Flink开发怎样进行实时处理应用程序

本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

专注于为中小企业提供成都网站设计、网站制作、外贸营销网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业准格尔免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上1000家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

使用Flink + java实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

使用上一节中的springboot-flink-train项目

开发步骤

第一步:创建流处理上下文环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第二步:读取数据,使用socket流方式读取数据

DataStreamSource text = env.socketTextStream("192.168.152.45", 9999);

第三步:transform

        text.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();

这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。

第四步:执行

env.execute("StreamingWCJavaApp");

整体代码如下:

/**
 * 使用Java API来开发Flink的实时处理应用程序
 * wc统计的数据源自socket
 */
public class StreamingWCJava02App {

    public static void main(String[] args) throws Exception {

        // 获取参数
        int port;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.out.println("端口未设置, 使用默认端口9999");
            port = 9999;
        }


        // step1: 获取流处理上下文环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: 读取数据
        DataStreamSource text = env.socketTextStream("192.168.152.45", port);
        // step3: transform
        text.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
        env.execute("StreamingWCJavaApp");
    }

}

运行

首先在192.168.152.45上运行命令

nc -l 9999

然后在运行main方法。在192.168.152.45的nc上输入

abc,def,abc,ddd

在idea控制台输出如下:

4> (abc,2)
1> (def,1)
4> (ddd,1)

这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:

        text.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

这样控制台的打印结果如下:

(abc,2)
(ddd,1)
(def,1)

这样一个简单的demo就成功了!

重构代码

上面的代码中localhost与port需要用参数传递进来。

代码如下:

        // 获取参数
        int port;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        } catch (Exception e) {
            System.out.println("端口未设置, 使用默认端口9999");
            port = 9999;
        }

使用Flink提供的ParameterTool来接收参数。

我们在运行时就可以指定参数列表了,其中的key必须以“-”或者“--”开头。

在运行时,配置参数:

Flink开发怎样进行实时处理应用程序

这样运行就可以从外界传递参数了

使用Flink + Scala实现需求

接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:

/**
  * 使用Scala开发Flink的实时处理应用程序
  */
object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 引入隐式转换
    import org.apache.flink.api.scala._

    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map((_,1))
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1)
        .print()
        .setParallelism(1)

    env.execute("StreamingWCScalaApp");
  }
}

这种方式比java实现更加简洁。

上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。


分享标题:Flink开发怎样进行实时处理应用程序
转载注明:http://myzitong.com/article/jhjgoo.html