SpringCloudStream总结-创新互联

概念

1、group:

10多年的长沙网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。全网营销推广的优势是能够根据用户设备显示端的尺寸不同,自动调整长沙建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。创新互联从事“长沙网站设计”,“长沙网站推广”以来,每个客户项目都认真落实执行。

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费

组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group

2、destination binder:

与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder

3、destination binding:

Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建

4、partition

一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上

注:严格来说partition不属于概念,而是一种Stream提高伸缩性、吞吐量的一种方式


注解

1、@Input,使用示例:

public interface MySink {
    @Input("my-input")
    SubscribableChannel input();
}

作用:

  • 用于接收消息
  • 为每个binding生成channel实例
  • 指定input channel的名称
  • 在spring容器中生成一个名为my-input,类型为SubscribableChannel的bean
  • 在spring容器中生成一个类,实现MySink接口。

2、@Output,使用示例:

public interface MySource {
    @Output("my-output")
    MessageChannel output();
}

作用:

  • @Input类似,只不过是用来生产消息

3、@StreamListener,使用示例:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void receive(String messageBody) {
    log.info("Received: {}", messageBody);
}

作用:

  • 用于消费消息
  • condition的作用:用于过滤消息,只有符合条件表达式的消息才会被处理
  • condition起作用的两个条件:
    • 注解的方法没有返回值
    • 方法是一个独立方法,不支持Reactive API

4、@SendTo,使用示例:

// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
   return "handle...";
}

作用:

  • 用于发送消息

4、@InboundChannelAdapter,使用示例:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource producer() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}

作用:

  • 让添加该注解的方法生产消息
  • fixedDelay:多少毫秒发送1次
  • maxMessagesPerPoll:每次发送多少条消息

5、@ServiceActivator,使用示例:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
    return payload.toUpperCase();
}

作用:

  • 标注该注解的方法能够处理消息或消息的有效内容,通过监听input消息,用方法体的代码处理后,输出到output中

6、@Transformer,使用示例:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
  return message.toUpperCase();
}

作用:

  • @ServiceActivator类似,标注该注解的方法能够转换消息,消息头,或消息有效内容

PollableMessageSource

PollableMessageSource允许消费者可以控制消费速率。举个例子简单演示一下,首先定义一个接口:

public interface PolledProcessor {
    @Input("pollable-input")
    PollableMessageSource input();
}

使用示例:

@Autowired
private PolledProcessor polledProcessor;

@Scheduled(fixedDelay = 5_000)
public void poll() {
    polledProcessor.input().poll(message -> {
        byte[] bytes = (byte[]) message.getPayload();
        String payload = new String(bytes);
        System.out.println(payload);
    });
}

参考:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


文章名称:SpringCloudStream总结-创新互联
URL分享:http://myzitong.com/article/dspegi.html