spark结构化流有状态的聚合操作-创新互联
spark的结构化流的聚合操作主要有两种,一种是不限时间维度的聚合操作,也就是全局的聚合操作,另一种是带时间窗口的聚合操作,本文我们主要是来谈谈这两种操作的技术实现及应用场景
乳源网站建设公司创新互联建站,乳源网站设计制作,有大型网站制作公司丰富经验。已为乳源千余家提供企业网站建设服务。企业网站搭建\外贸网站制作要多少钱,请找那个售后服务好的乳源做网站的公司定做!技术原理 不根据时间维度进行聚合我们以统计输入单词的个数作为例子,wordstream.groupby('word').count()
这个代码会统计至今为止出现的输入的每个单词的个数信息,这种聚合方式会把当前的聚合结果作为分布式式状态维护到spark中(spark检查点目录中),它支持两种输出模式:更新模式和完整模式,但不支持追加模式
1.使用更新模式输出,每次聚合操作新增和更新的行都会输出到输出池中,这样就可以使用kafka数据池作为输出池,因为kafka数据池支持更新模式,但是不能使用只支持追加模式的输出池
2.使用完整模式输出,每次聚合操作后当前的完整的单词对应的个数的记录数据都会输出到输出池中,每次微批触发后都会输出完整的记录,至于输出池可以选择使用支持完整输出模式的kafka输出池
3.不支持追加模式,由于这种聚合方式会改变之前输出结果,所以这种类型的聚合不支持追加模式的输出方式.
假设我们想要统计每台容器每10分钟内的错误日志量,触发间隔为5分钟一次,那我们可以通过如下代码实现:
events.groupby('ip',window('eventtime','10 minutes','5 minutes')).count()
顺便提一下:使用窗口后,触发时间间隔配置.trgger(EventTime, '5 minutes')最好也是设置成和window函数中使用的值一样即可
这里关键在于window函数,他可以对数据进行分组统计,这种统计方式的优点是他不会丢失任何迟到的或者乱序到来的数据,而且会把他们正确的分组,因为他会保留历史的所有状态数据到检查点目录中,不过也是由于可能存在迟到了几天的数据到达,导致需要把这条迟到的数据归到几天前的对应窗口去,所以这种方式不支持追加输出模式,只支持更新和完整输出模式,此外这种统计方式的缺点是状态数据会一直累加,状态大小无限增大,包含最新时间的新分组不停的创建,而旧分组数据需要一直保留,以防止可能到来的迟到事件,这就引申出另一个问题:怎么清除旧的分组来限制状态大小的无限增长呢?
答案是指定水印
events.withWatermark('eventtime','60 minutes').groupby('ip',window('eventtime','10 minutes','5 minutes')).count()
顺便提一下:使用窗口后,触发时间间隔配置.trgger(EventTime, '5 minutes')最好也是设置成和window函数中使用的值一样即可
清除旧分组的关键在于怎么定义这个分组不再接收延迟到达的数据,假设我们认为数据达到的时间不会迟到超过1小时,那我们就可以如上所示编写程序,注意这里水位线的更新间隔是5分钟,也就是新的分组的水位线等于当前窗口开始创建之前已经收到的记录的大事件时间减去60分钟,这个值就作为新分组的水位线,我们可以知道根据水位线的定义,水位线是递增的,如果到来的记录的事件时间高于水位线就把它归到对应的某个旧时间窗口中,当前也可以归到当前或者未来的某个时间窗口分组中,如果低于水位线那么这条记录就被直接丢掉不处理(不过这里也不一定:水印提供的保证没有说一定会丢掉这条迟到太久的记录)。有了这个水印之后,spark就可以清理掉旧的分组数据了,因为低于水位线的那些旧分组已经不会被新来的数据更新了,可以安全的清理掉.
基于水位线的时间窗口聚合严格来说只能支持两种输出模式:1.追加模式:由于水印的存在,spark可以判断出哪个时间窗口内的统计数据不会在发生变化,然后他就可以把这个数据不会再变化的旧分组的数据输出到输出池中了,比如基于文件的输出池,不过使用这种追加模式的缺点是数据输出要推迟到水位线超过对应分组的窗口时间,才能输出这个分组的聚合数据
2.更新模式:这种输出模式下,新增分组的统计值和旧的分组中有变化的统计值会输出到输出池中.
其实严格来说,基于水位线的时间窗口聚合确实可以使用完整模式输出,但是由于这种输出模式需要保留过去所有的历史状态数据,即使指定了水印,spark也不会清理旧分组的状态数据,相当于水印没有指定一样,所以这个和使用水印的初衷是相违背的,会造成状态数据的无限增长,所以这里才说基于水位线的时间窗口聚合不支持完整输出模式
彩蛋:
提一个问题:
如果强行比如基于水位线的时间窗口聚合使用基于文件的追加模式输出结果会怎么样?
答案是:未定义,聚合类型支持的输出模式和输出池的输出模式不匹配时,有可能发生以下情况,第一种情况是spark直接报错,程序没法执行,第二种情况是输出结果未定义.
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
网站标题:spark结构化流有状态的聚合操作-创新互联
分享路径:http://myzitong.com/article/dssshs.html