Flink-使用合流操作进行实时对账需求的实现-创新互联
学Flink第八章多流转换的时候,进行合流操作.connect()使用到了第九章状态编程的知识,感觉总体不是很清晰,因此学完状态编程后现在进行重温并细化一些细节
创新互联公司专注于中大型企业的网站设计、成都网站设计和网站改版、网站营销服务,追求商业策划与数据分析、创意艺术与技术开发的融合,累计客户超过千家,服务满意度达97%。帮助广大客户顺利对接上互联网浪潮,准确优选出符合自己需要的互联网运用,我们将一直专注品牌网站制作和互联网程序开发,在前进的路上,与客户一起成长!- 业务背景
- 步骤一:
用户进行支付的时候,后台是需要调用第三方服务平台进行服务,即用户支付请求,页面将会跳转到第三方支付平台支付
- 步骤二:
用户进行支付之后,第三方支付平台给到用户前端支出反馈,并且给我们平台发送用户已经付款的消息
- 步骤三:
第三方支付平台需要将钱再转入到我们平台账户
- 出现的问题以及需求
- 问题
如果进行到图中④,如果发生数据丢失,那么用户已经支付的消息无法传达给到后台,而后不能关闭订单
- 需求
因此需要进行实时对账操作,即用户提交的支付请求(客户端),以及第三方支付平台给到的请求(三方端),两者可以当成两条流
- 结果
如果进行两条流的操作后不匹配,那么将进行预警
- 一些细节考虑
两个流都给他标上时间戳(使用watermark标志)
使用状态编程保存状态以及设置定时器,来进行两条流的连接以及等待
如果对方流中有我流的数据,那么直接输出成功;如果没有则更新我流状态,注册定时器等待另一个流
然后用ontimer()触发定时器:判断条件如果两条流中还有状态没被清空,说明没匹配上
- 上代码
- 代码
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//来自app的支付日志
SingleOutputStreamOperator>appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L),
Tuple3.of("order-3", "app", 3500L)
).assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
public long extractTimestamp(Tuple3element, long recordTimestamp) {return element.f2;
}
})
);
//来自第三方平台的支付日志
SingleOutputStreamOperator>thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.
>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
public long extractTimestamp(Tuple4element, long recordTimestamp) {return element.f3;
}
})
);
//检测同一支付单在两条流中是否匹配,等待一段时间后,不匹配就报警
// //这种也可以
// appStream.keyBy(data->data.f0)
// .connect(thirdpartStream.keyBy(data ->data.f0));
//
appStream.connect(thirdpartStream)
.keyBy(data->data.f0,data->data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
//自定义实现CoFunction
public static class OrderMatchResult extends CoProcessFunction,
Tuple4,String>{//定义状态变量,用来保存已经到达的事件
private ValueState>appEventState;
private ValueState>thirdPartyEventState;
//运行上下文环境中获取状态
@Override
public void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(
new ValueStateDescriptor>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
);
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
);
}
@Override
public void processElement1(Tuple3value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//来的是app event,看另一条流中事件是否来过
if(thirdPartyEventState.value()!=null){out.collect("对账成功:"+value+" "+thirdPartyEventState.value());
//清空状态
thirdPartyEventState.clear();
}else{//如果没来就等待,并且更新状态
appEventState.update(value);
//注册一个5秒后的定时器,开始等待另一条的事件
ctx.timerService().registerEventTimeTimer(value.f2+5000L);
}
}
@Override
public void processElement2(Tuple4value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//来的是app event,看另一条流中事件是否来过
if(appEventState.value()!=null){out.collect("对账成功:"+appEventState.value()+" "+value);
//清空状态
appEventState.clear();
}else{//如果没来就等待,并且更新状态
thirdPartyEventState.update(value);
//注册一个5秒后的定时器,开始等待另一条的事件
ctx.timerService().registerEventTimeTimer(value.f3);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collectorout) throws Exception {//定时器触发,判断状态,如果某个状态不为空,说明另一条中事件没来
//并且不会存在两个都不为空,因为其中一个不为空后会被清除
//没有没清空表示失败
if(appEventState.value()!=null){out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到");
}
if(thirdPartyEventState.value()!=null){out.collect("对账失败:"+thirdPartyEventState.value()+" "+"APP信息信息未到");
}
//清空所有数据
appEventState.clear();
thirdPartyEventState.clear();
}
}
}
- 结果
对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账成功:(order-3,app,3500) (order-3,third-party,success,4000)
对账失败:(order-2,app,2000) 第三方支付平台信息未到
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
名称栏目:Flink-使用合流操作进行实时对账需求的实现-创新互联
文章起源:http://myzitong.com/article/ihjpe.html