三、flink--DataStreamAPI原理以及用法

一、DataStream基本概述

1.1 datastream是什么?

datastream是flink提供给用户使用的用于进行流计算和批处理的api,是对底层流式计算模型的api封装,便于用户编程。

成都创新互联公司长期为超过千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为吴兴企业提供专业的做网站、网站建设,吴兴网站改版等技术服务。拥有十年丰富建站经验和众多成功案例,为您定制开发。

1.2 datastream运行模型

一个完整的datastream运行模型一般由三部分组成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取(也就是从数据源读取,可以批数据源,也可以是流式数据数据源),Transformation主要负责对属于的转换操作(也就是正常的业务处逻辑),Sink负责最终数据的输出(计算结果的导出)。

1.3 datastream程序架构

一般来说,使用datastream api编写flink程序,包括以下流程:
1、获得一个执行环境;(Execution Environment)
2、加载/创建初始数据;(Source)
3、指定转换这些数据;(Transformation)
4、指定放置计算结果的位置;(Sink)
5、触发程序执行(这是流式计算必须的操作,如果是批处理则不需要)

二、DataStream api的使用

2.1 maven依赖配置



    4.0.0

    SparkDemo
    SparkDemoTest
    1.0-SNAPSHOT

    
        UTF-8

        2.11.8
        2.7.3
        2.11
        1.6.1

    

    

        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        

        
            MySQL
            mysql-connector-java
            8.0.12
        
        
            junit
            junit
            4.12
        

        
            org.apache.logging.log4j
            log4j-core
            2.9.0
        

        
        
        
            org.apache.flink
            flink-java
            1.6.1
        

        
        
            org.apache.flink
            flink-streaming-java_2.11
            1.6.1
            
        

        
        
            org.apache.flink
            flink-streaming-scala_2.11
            1.6.1
        

        
        
            org.apache.flink
            flink-scala_2.11
            1.6.1
        

        
        
            org.apache.flink
            flink-clients_2.11
            1.6.1
        

        
        
            org.apache.flink
            flink-table_2.11
            1.6.1
            provided
        

        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        

        
            com.alibaba
            fastjson
            1.2.22
        

        
            org.apache.flink
            flink-connector-kafka-0.10_${scala.binary.version}
            ${flink.version}
        

    

    
    
        

            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
            

            
                maven-compiler-plugin
                3.6.0
                
                    1.8
                    1.8
                
            

            
                org.apache.maven.plugins
                maven-surefire-plugin
                2.19
                
                    true
                
            

        
    

2.2 获取执行环境(Execution Environment)

有三种类型的执行环境:

1、StreamExecutionEnvironment.getExecutionEnvironment()
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

2、StreamExecutionEnvironment.createLocalEnvironment()
返回本地执行环境,需要在调用时指定默认的并行度。

3、StreamExecutionEnvironment.createRemoteEnvironment()
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

2.3 常用数据源(source)

2.3.1 基于file的数据源

1、env.readTextFile(path)
一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。

package flinktest;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDemo {
    public static void main(String[] args) throws Exception {
        //1、创建环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、读取文件作为数据源
        DataStreamSource fileSource = env.readTextFile("/tmp/test.txt");
        //3、打印数据
        fileSource.print();
        //4、启动任务执行
        env.execute("test file source");
    }
}

2、env.readFile(fileInputFormat,path)
按照指定的fileinputformat格式来读取文件。这里的fileinputformat可以自定义类

package flinktest;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDemo {
    public static void main(String[] args) throws Exception {
        //1、创建环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、读取文件作为数据源
        DataStreamSource fileSource = env.readFile(new TextInputFormat(new Path("/tmp/test.txt")), "/tmp/test.txt");
        //3、打印数据
        fileSource.print();
        //4、启动任务执行
        env.execute("test file source");
    }
}

2.3.2 基于socket数据源

socketTextStream(host,port)

package flinktest;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDemo {
    public static void main(String[] args) throws Exception {
        //1、创建环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、读取socket作为数据源
        DataStreamSource sourceSocket = env.socketTextStream("127.0.0.1", 1000);
        //3、打印数据
        sourceSocket.print();
        //4、启动任务执行
        env.execute("test socket source");
    }
}

2.3.3 基于集合collection的数据源

1、fromCollection(Collection)
从集合中创建一个数据流,集合中所有元素的类型是一致的。

List list = new ArrayList<>();
DataStreamSource sourceCollection = env.fromCollection(list);       

2、fromCollection(Iterator)
从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回。

3、fromElements(Object)
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的

4、generateSequence(from, to)
从给定的间隔中并行地产生一个数字序列。读取一定范围的sequnce对象

2.3.4 自定义数据源

env.addSource(SourceFuntion)
自定义一个数据源实现类,然后 addSource 到到env中。比如场景的从kafka读取数据,从mysql读取数据

2.4 常用输出(sink)

Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。Flink有许多封装在DataStream操作里的内置输出格式。
1、 writeAsText
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。

2、WriteAsCsv
将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。

3、print/printToErr
打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。

4、 writeUsingOutputFormat
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。

5、writeToSocket
根据SerializationSchema 将元素写入到socket中。

6、stream.addSink(SinkFunction)
使用自定义的sink类

2.5 常用算子(transformation operator)

2.5.1 map

DataStream → DataStream:输入一个参数经过处理产生一个新的参数

DataStream dataStream = //...
dataStream.map(new MapFunction() {
    @Override
    //这里将每个参数 * 2,然后返回
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

2.5.2 flatMap

DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。

dataStream.flatMap(new FlatMapFunction() {
    @Override
    public void flatMap(String value, Collector out)
        throws Exception {
        //切割字符串,将处理之后的数据放到 collector 中。
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

2.5.3 filter

DataStream → DataStream:计算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:

dataStream.filter(new FilterFunction() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

2.5.4 keyBy

DataStream → KeyedStream:要求输入是tuple,或者是一个复合对象,里面有多个属性(例如student类,里面有name、age等2个以上的属性),反正就是必须有作为key和value的数据。根据key进行分区,相同key的在同一个分区,在内部使用hash实现。

//有不同方式指定key
dataStream.keyBy("someKey") // 指定key的字段名称,常用于复合对象中
dataStream.keyBy(0) // 指定key的位置,常用于tuple中

2.5.5 reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果,也就是每一次聚合的结果都会返回,直到最后一次聚合结束,所以不是只返回最后一个聚合结果。

keyedStream.reduce(new ReduceFunction() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

2.5.6 fold

KeyedStream → DataStream
一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

DataStream result =
  keyedStream.fold("start", new FoldFunction() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

运行结果为:
假设数据源为 (1,2,3,4,5)
结果为:start-1,start-1-2...... 

2.5.7 aggregations

KeyedStream →DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

注意:在2.3.10之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的操作,但是到2.3.10后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。

2.5.8 connect、coMap、coFlatMap

1、connect:
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

DataStream someStream = //...
DataStream otherStream = //...

ConnectedStreams connectedStreams = someStream.connect(otherStream);

2、coMap、coFlatMap
ConnectedStreams → DataStream:专门用于connect之后的stream操作的map和flatmap算子。

connectedStreams.map(new CoMapFunction() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction() {

   @Override
   public void flatMap1(Integer value, Collector out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

2.5.9 split和select

split:
DataStream → SplitStream:将一个数据流拆分成两个或者多个数据流.并且会给每个数据流起一个别名

select:SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

SplitStream split = someDataStream.split(new OutputSelector() {
    @Override
    public Iterable select(Integer value) {
        List output = new ArrayList();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

split.select("even").print();
split.select("odd").print();

2.5.10 union

DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。这和connect不一样,connect并没有合并多个stream

dataStream.union(otherStream1, otherStream2, ...);

当前题目:三、flink--DataStreamAPI原理以及用法
本文来源:http://myzitong.com/article/jdgcdi.html