Kafka怎么在SpringBoot中使用

这篇文章给大家介绍Kafka怎么在Spring Boot中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名注册虚拟主机、营销软件、网站建设、张掖网站维护、网站推广。

系统环境

使用远程服务器上搭建的kafka服务

  1. Ubuntu 16.04 LTS

  2. kafka_2.12-0.11.0.0.tgz

  3. zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:



  4.0.0

  com.laravelshao.springboot
  spring-boot-integration-kafka
  0.0.1-SNAPSHOT
  jar

  spring-boot-integration-kafka
  Demo project for Spring Boot

  
    org.springframework.boot
    spring-boot-starter-parent
    2.0.0.RELEASE
     
  

  
    UTF-8
    UTF-8
    1.8
  

  
    
      org.springframework.boot
      spring-boot-starter
    
    
    
      org.springframework.kafka
      spring-kafka
    
    
      org.springframework.boot
      spring-boot-starter-json
    
    
      org.springframework.boot
      spring-boot-starter-test
      test
    
  

  
    
      
        org.springframework.boot
        spring-boot-maven-plugin
      
    
  

2.添加配置信息,这里使用yml文件

spring:
 kafka:
  bootstrap-servers:X.X.X.X:9092
  producer:
   value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  consumer:
   group-id: test
   auto-offset-reset: earliest
   value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

3.创建消息对象

public class Message {
  private Integer id;
  private String msg;

  public Message() {
  }

  public Message(Integer id, String msg) {
    this.id = id;
    this.msg = msg;
  }

  public Integer getId() {
    return id;
  }

  public void setId(Integer id) {
    this.id = id;
  }

  public String getMsg() {
    return msg;
  }

  public void setMsg(String msg) {
    this.msg = msg;
  }

  @Override
  public String toString() {
    return "Message{" +
        "id=" + id +
        ", msg='" + msg + '\'' +
        '}';
  }
}

4.创建生产者

package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Producer {
  private static Logger log = LoggerFactory.getLogger(Producer.class);

  @Autowired
  private KafkaTemplate kafkaTemplate;

  public void send(String topic, Message message) {
    kafkaTemplate.send(topic, message);
    log.info("Producer->topic:{}, message:{}", topic, message);
  }

}

5.创建消费者,使用@ KafkaListener注解监听主题

package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Consumer {
  private static Logger log = LoggerFactory.getLogger(Consumer.class);

  @KafkaListener(topics = "test_topic")
  public void receive(ConsumerRecord consumerRecord) {
    log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
  }

}

6.发送消费测试

package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class IntegrationKafkaApplication {

  public static void main(String[] args) throws InterruptedException {
    ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
    Producer producer = context.getBean(Producer.class);

    for (int i = 1; i < 10; i++) {
      producer.send("test_topic", new Message(i, "test topic message " + i));
      Thread.sleep(2000);
    }
  }

}

可以依次看到发送消息,消费消息

Kafka怎么在Spring Boot中使用

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:745)

解决方法:将当前包添加到kafka信任的包路径下

spring:
 kafka:
  consumer:
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

关于Kafka怎么在Spring Boot中使用就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


文章名称:Kafka怎么在SpringBoot中使用
网站链接:http://myzitong.com/article/pigsop.html