kafka消息丢失怎么办
本篇内容主要讲解“kafka消息丢失怎么办”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“kafka消息丢失怎么办”吧!
创新互联建站坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都做网站、网站建设、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的个旧网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
在处理生产环境问题的过程中发现12号那天某kafka集群有少量的数据丢失,概率大致在千万分之三。 数据写入kafka之后,就完全消失了,消费者完全没有消费到这个数据。 通过找到那天的数据,查看有问题的数据在写入kafka的时候上下文应用日志发现有少量以下报错:
[2019-10-12 11:03:43,xxx] This is not the correct coordinator.
理论上正常情况下kafka是不太可能丢数据的,如果出现这种情况,必然是开发人员或者硬件引发了什么问题,因为写入日志是有的,看了下应用配置
acks=1
马上意识到,问题突破口应该在这里。
acks=0 生产者能够通过网络吧消息发送出去,那么就认为消息已成功写入Kafka,一定会丢失一些数据 acks=1 master在疏导消息并把它写到分区数据问津是会返回确认或者错误响应,还是可能会丢数据 acks=all master在返回确认或错误响应之前,会等待所有同步副本都收到消息
可能以前是为了保证性能够快,选择了折中的应用配置 acks=1
。
马上想到去看下kafka的日志,猜测这个时间段必然出现出现了 master 不可用的情况才会导致数据丢失。
在kafka集群的57号节点的机器上看到这样一段日志:
[2019-10-12 11:03:39,427] WARN Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaadbbxx00 (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:40,908] INFO Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaabbxx00, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:41,253] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2019-10-12 11:03:41,962] INFO Opening socket connection to server xx.xx.xx.59/xx.xx.xx.59:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:41,962] INFO Socket connection established to xx.xx.xx.59/10.xx.xx.xx:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,293] WARN Unable to reconnect to ZooKeeper service, session 0x396cf664cdbb0000 has expired (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,293] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2019-10-12 11:03:42,294] INFO Initiating client connection, connectString=xx.xx.xx.55:2181,xx.xx.xx.56:2181,xx.xx.xx.57:2181,xx.xx.xx.58:2181,xx.xx.xx.59:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@342xxx2d (org.apache.zookeeper.ZooKeeper) [2019-10-12 11:03:42,313] INFO Unable to reconnect to ZooKeeper service, session 0x396cxxxxxb0000 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,323] INFO EventThread shut down for session: 0x396cxxxx000 (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,342] INFO Opening socket connection to server xx.xx.xx.58/xx.xx.xx.58:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:42,343] INFO Socket connection established to xx.xx.xx.58/xx.xx.xx.58:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:43,516] INFO Session establishment complete on server xx.xx.xx.58/xx.xx.xx.58:2181, sessionid = 0x3ax4xxxxx01, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2019-10-12 11:03:43,517] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
看来和猜测的情况是一样的,这个kafka的节点机器出现了连接不到zk,所以这台机器处于‘丢失’状态。又去看了下kafka的controller日志:
[2019-10-12 11:03:42,671] INFO [Controller id=56] Newly added brokers: , deleted brokers: 57, all live brokers: ...,55,56,58,59,xx.... (kafka.controller.KafkaController) [2019-10-12 11:03:42,678] INFO [Controller-56-to-broker-57-send-thread]: Shutting down (kafka.controller.RequestSendThread) [2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Stopped (kafka.controller.RequestSendThread) [2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Shutdown completed (kafka.controller.RequestSendThread) [2019-10-12 11:03:42,828] INFO [Controller id=56] Broker failure callback for 57 (kafka.controller.KafkaController) [2019-10-12 11:03:42,836] INFO [Controller id=56] Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController)
再次看了下这个节点系统的网络层面监控:
真相浮出水面,和猜测吻合,这个问题和之前遇到的一个问题有点类似,不一样的情况是这次只丢失了一个节点,上次的问题是所有的网络节点都短暂的丢失,详情可以看我之前写的博客: https://my.oschina.net/110NotFound/blog/3105190
这次的原因分析:
57号节点丢失之后,立马进行了选举,这个时候数据在生产的时候到达了master,恰好选举的时候变更了master,而 acks=1
,数据已经进入master,但是还没有来得及同步到slave,这样就导致了数据的丢失。
总结:
如果有对数据有强一致性的要求,一定要选择 acks=all
否则指不定哪天硬件的轻微系统抖动就会导致 kafka
集群重新选举,丢失数据。
到此,相信大家对“kafka消息丢失怎么办”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
新闻标题:kafka消息丢失怎么办
分享路径:http://myzitong.com/article/joheeg.html