KafkaProducer拦截器-创新互联
Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器。本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理。
成都创新互联服务项目包括温岭网站建设、温岭网站制作、温岭网页制作以及温岭网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,温岭网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到温岭省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!使用Kafka Producer端的拦截器非常简单,主要是实现ProducerInterceptor接口,此接口包含4个方法:
- ProducerRecord
onSend(ProducerRecord record):Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作。一般来说最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样也会影响Broker端日志压缩(Log Compaction)的功能。
- ProducerRecord
- void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被应答(Acknowledgement)之前或者消息发送失败时调用,优先于用户设定的Callback之前执行。这个方法运行在Producer的IO线程中,所以这个方法里实现的代码逻辑越简单越好,否则会影响消息的发送速率。
- void close():关闭当前的拦截器,此方法主要用于执行一些资源的清理工作。
- configure(Map
configs):用来初始化此类的方法,这个是ProducerInterceptor接口的父接口Configurable中的方法。
- configure(Map
一般情况下只需要关注并实现onSend或者onAcknowledgement方法即可。下面我们来举个案例,通过onSend方法来过滤消息体为空的消息以及通过onAcknowledgement方法来计算发送消息的成功率。
public class ProducerInterceptorDemo implements ProducerInterceptor {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord onSend(ProducerRecord record) {
if(record.value().length()<=0)
return null;
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
sendSuccess++;
} else {
sendFailure ++;
}
}
@Override
public void close() {
double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率="+String.format("%f", succe***atio * 100)+"%");
}
@Override
public void configure(Map configs) {}
}
自定义的ProducerInterceptorDemo类实现之后就可以在Kafka Producer的主程序中指定,示例代码如下:
public class ProducerMain {
public static final String brokerList = "localhost:9092";
public static final String topic = "hidden-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");
Producer producer = new KafkaProducer(properties);
for(int i=0;i<100;i++) {
ProducerRecord producerRecord = new ProducerRecord(topic, "msg-" + i);
producer.send(producerRecord).get();
}
producer.close();
}
}
Kafka Producer不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链,这个拦截链会按照其中的拦截器的加入顺序一一执行。比如上面的程序多添加一个拦截器,示例如下:
properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1
这样Kafka Producer会先执行拦截器ProducerInterceptorDemo,之后再执行ProducerInterceptorDemoPlus。
有关interceptor.classes参数,在kafka 1.0.0版本中的定义如下:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
interceptor.calssses | A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. | list | null | low |
本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助
需要更详细思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取
创新互联www.cdcxhl.cn,专业提供香港、美国云服务器,动态BGP最优骨干路由自动选择,持续稳定高效的网络助力业务部署。公司持有工信部办法的idc、isp许可证, 机房独有T级流量清洗系统配攻击溯源,准确进行流量调度,确保服务器高可用性。佳节活动现已开启,新人活动云服务器买多久送多久。
文章题目:KafkaProducer拦截器-创新互联
网站链接:http://myzitong.com/article/ceegoe.html