侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 94 篇文章
  • 累计创建 42 个标签
  • 累计收到 1 条评论

SpringCloud Stream消息驱动微服务

qingtian
2022-05-04 / 0 评论 / 0 点赞 / 176 阅读 / 22,817 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-04,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

SpringCloud Stream消息驱动微服务

SpringBoot集成Kafka构建消息驱动微服务

Kafka的工作流程图

image-20220427000049714

配置文件

server:
  port: 8001
  servlet:
    context-path: /imooc-study-ecommerce-dev
spring:
  # Spring Boot + Kafka settings; the minimal setup only needs spring.kafka.bootstrap-servers
  kafka:
    bootstrap-servers: 192.168.0.103:9092
       # Kafka can also be configured declaratively here; uncomment the lines below to enable it
#    consumer:
#      # If a consumer does not specify a group-id, the one configured here is used; per the original note, if none is configured either, the framework generates one
#      group-id: imooc-study-ecommerce
#      auto-offset-reset: latest
#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#    producer:
#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer

创建Kafka相关的配置

/**
 * <h1>Programmatic Kafka configuration</h1>
 * Registers the producer factory, the {@link KafkaTemplate}, the consumer factory and the
 * listener container factory as Spring beans, all pointing at the broker list read from
 * {@code spring.kafka.bootstrap-servers}.
 * */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * <h2>Producer factory: String keys and String values</h2>
     * */
    @Bean
    public ProducerFactory<String, String> producerFactory() {

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * <h2>Producer client built on top of {@link #producerFactory()}</h2>
     * */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * <h2>Consumer factory: String keys and String values, at most 50 records per poll</h2>
     * */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * <h2>Listener container factory used by {@code @KafkaListener} methods</h2>
     * */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        // Concurrency = number of threads started per consumer instance
        containerFactory.setConcurrency(3);

        return containerFactory;
    }
}

Kafka消息生产者

/**
 * <h1>Kafka producer</h1>
 * Thin wrapper around {@link KafkaTemplate} that sends a String message and reports the
 * outcome both through an asynchronous callback and by waiting synchronously for the result.
 * */
@Slf4j
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * <h2>Send a Kafka message</h2>
     * The record is sent without a key when {@code key} is blank. The outcome is logged twice
     * on purpose (demo code): once from the asynchronous callback and once after blocking for
     * up to 5 seconds on the returned future.
     *
     * @param key   optional record key; blank means "send without key"
     * @param value record value, must not be blank
     * @param topic target topic, must not be blank
     * @throws IllegalArgumentException if {@code value} or {@code topic} is blank
     * */
    public void sendMessage(String key, String value, String topic) {

        if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("value or topic is null or empty");
        }

        ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
                kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);

        // Asynchronous notification through callbacks
        future.addCallback(
                success -> {
                    assert null != success && null != success.getRecordMetadata();
                    logSuccess(success);
                },
                // The throwable is passed as the trailing argument so the cause is logged
                failure -> log.error("send kafka message failure: [{}], [{}], [{}]",
                        key, value, topic, failure)
        );

        // Synchronous notification by waiting on the future
        try {
            logSuccess(future.get(5, TimeUnit.SECONDS));
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Restore the interrupt flag so callers up the stack can still observe it
                Thread.currentThread().interrupt();
            }
            log.error("send kafka message failure: [{}], [{}], [{}]",
                    key, value, topic, ex);
        }
    }

    /**
     * Logs the topic, partition and in-partition offset of a successfully sent record.
     * */
    private void logSuccess(SendResult<String, String> sendResult) {
        // Topic the record was written to
        String sentTopic = sendResult.getRecordMetadata().topic();
        // Partition the record was written to
        int partition = sendResult.getRecordMetadata().partition();
        // Offset of the record inside that partition
        long offset = sendResult.getRecordMetadata().offset();

        log.info("send kafka message success: [{}], [{}], [{}]",
                sentTopic, partition, offset);
    }
}

Kafka消息消费者

/**
 * <h1>Kafka consumer</h1>
 * Two listeners on the same topic that belong to different consumer groups; each one parses
 * the record value as a {@code QinyiMessage} and logs it.
 * */
@Slf4j
@Component
public class KafkaConsumer {

    private final ObjectMapper mapper;

    public KafkaConsumer(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    /**
     * <h2>Consume with typed String key / String value records</h2>
     * */
    @KafkaListener(topics = {"qingtian-springboot"}, groupId = "qingtian-springboot-kafka")
    public void listener01(ConsumerRecord<String, String> record) throws Exception {

        String recordKey = record.key();
        QinyiMessage parsed = mapper.readValue(record.value(), QinyiMessage.class);
        String json = mapper.writeValueAsString(parsed);

        log.info("in listener01 consume kafka message: [{}], [{}]",
                recordKey, json);
    }

    /**
     * <h2>Consume with untyped records; records without a value are ignored</h2>
     * */
    @KafkaListener(topics = {"qingtian-springboot"}, groupId = "qingtian-springboot-kafka-1")
    public void listener02(ConsumerRecord<?, ?> record) throws Exception {

        Object rawValue = record.value();
        if (rawValue == null) {
            return;
        }

        QinyiMessage parsed = mapper.readValue(rawValue.toString(), QinyiMessage.class);
        String json = mapper.writeValueAsString(parsed);
        log.info("in listener02 consume kafka message: [{}]",
                json);
    }
}

如何使用

/**
 * <h1>REST entry point that publishes a fixed demo message to Kafka</h1>
 * */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final ObjectMapper mapper;
    private final KafkaProducer kafkaProducer;

    public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
        this.mapper = mapper;
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * <h2>Serialize the demo message to JSON and hand it to the producer</h2>
     * The record key is optional; the topic is required.
     * */
    @GetMapping("/send-message")
    public void sendMessage(@RequestParam(required = false) String key,
                            @RequestParam String topic) throws Exception {

        QinyiMessage demo = new QinyiMessage(1, "Imooc-Study-Ecommerce");
        String payload = mapper.writeValueAsString(demo);
        kafkaProducer.sendMessage(key, payload, topic);
    }
}

使用方式

/**
 * <h1>Sending Kafka messages from a Spring Boot controller</h1>
 * */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final ObjectMapper mapper;
    private final KafkaProducer kafkaProducer;

    public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
        this.mapper = mapper;
    }

    /**
     * <h2>Publish a sample message to the given topic</h2>
     * {@code key} may be omitted by the caller; {@code topic} is mandatory.
     * */
    @GetMapping("/send-message")
    public void sendMessage(@RequestParam(required = false) String key,
                            @RequestParam String topic) throws Exception {

        QinyiMessage sample = new QinyiMessage(1, "Imooc-Study-Ecommerce");
        String json = mapper.writeValueAsString(sample);

        kafkaProducer.sendMessage(key, json, topic);
    }
}

SpringBoot集成RocketMQ构建消息驱动微服务

消息生产者

/**
 * <h1>Message producer backed by RocketMQ</h1>
 * Built on the Spring Messaging abstractions; every method publishes to the same topic.
 * */
@Slf4j
@Component
public class RocketMQProducer {

    /** Comparable to a Kafka topic; per the original note it has 4 read and 4 write queues by default */
    private static final String TOPIC = "imooc-study-rocketmq";

    /** RocketMQ client */
    private final RocketMQTemplate rocketMQTemplate;

    public RocketMQProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * <h2>Synchronous send without key or tag</h2>
     * Sends the value twice: once with a plain sync send and once with an ordered sync send.
     * */
    public void sendMessageWithValue(String value) {

        // Plain send: the message queue of the topic is picked for us
        SendResult plainResult = rocketMQTemplate.syncSend(TOPIC, value);
        log.info("sendMessageWithValue result: [{}]", JSON.toJSONString(plainResult));

        // Ordered send: the hash key ("Qinyi") is used to select the queue
        SendResult orderedResult = rocketMQTemplate.syncSendOrderly(TOPIC, value, "Qinyi");
        log.info("sendMessageWithValue orderly result: [{}]",
                JSON.toJSONString(orderedResult));
    }

    /**
     * <h2>Asynchronous send with a key</h2>
     * */
    public void sendMessageWithKey(String key, String value) {

        Message<String> keyedMessage = MessageBuilder
                .withPayload(value)
                .setHeader(RocketMQHeaders.KEYS, key)
                .build();

        // The outcome is reported through this callback
        SendCallback callback = new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("sendMessageWithKey success result: [{}]",
                        JSON.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                log.error("sendMessageWithKey failure: [{}]", e.getMessage(), e);
            }
        };

        rocketMQTemplate.asyncSend(TOPIC, keyedMessage, callback);
    }

    /**
     * <h2>Synchronous send with a tag; the payload is sent as a Java POJO</h2>
     * */
    public void sendMessageWithTag(String tag, String value) {

        QinyiMessage pojo = JSON.parseObject(value, QinyiMessage.class);
        // Destination has the form "topic:tag"
        String destination = TOPIC + ":" + tag;

        SendResult result = rocketMQTemplate.syncSend(destination, pojo);
        log.info("sendMessageWithTag result: [{}]", JSON.toJSONString(result));
    }

    /**
     * <h2>Synchronous send with both key and tag</h2>
     * */
    public void sendMessageWithAll(String key, String tag, String value) {

        Message<String> keyedMessage = MessageBuilder
                .withPayload(value)
                .setHeader(RocketMQHeaders.KEYS, key)
                .build();
        // Destination has the form "topic:tag"
        String destination = TOPIC + ":" + tag;

        SendResult result = rocketMQTemplate.syncSend(destination, keyedMessage);
        log.info("sendMessageWithAll result: [{}]", JSON.toJSONString(result));
    }
}

消息消费者

/**
 * <h1>First RocketMQ consumer: receives the payload as a String</h1>
 * */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "imooc-study-rocketmq",
        consumerGroup = "qinyi-springboot-rocketmq-string"
)
public class RocketMQConsumerString implements RocketMQListener<String> {

    /**
     * Parses the JSON payload into a {@code QinyiMessage} and logs it re-serialized.
     * */
    @Override
    public void onMessage(String message) {

        QinyiMessage parsed = JSON.parseObject(message, QinyiMessage.class);
        String json = JSON.toJSONString(parsed);
        log.info("consume message in RocketMQConsumerString: [{}]",
                json);
    }
}

---------------------------------------------------------------------------------
/**
 * <h1>Second RocketMQ consumer: only receives messages carrying the configured tag</h1>
 * */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "imooc-study-rocketmq",
        consumerGroup = "qinyi-springboot-rocketmq-tag-string",
        selectorExpression = "qinyi"        // filter by tag
)
public class RocketMQConsumerTagString implements RocketMQListener<String> {

    /**
     * Parses the JSON payload into a {@code QinyiMessage} and logs it re-serialized.
     * */
    @Override
    public void onMessage(String message) {

        QinyiMessage parsed = JSON.parseObject(message, QinyiMessage.class);
        String json = JSON.toJSONString(parsed);
        log.info("consume message in RocketMQConsumerTagString: [{}]",
                json);
    }
}

----------------------------------------------------------------------------------
/**
 * <h1>Third RocketMQ consumer: receives the raw {@code MessageExt}</h1>
 * Gives access to the message keys in addition to the body bytes.
 * */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "imooc-study-rocketmq",
        consumerGroup = "qinyi-springboot-rocketmq-message-ext"
)
public class RocketMQConsumerMessageExt implements RocketMQListener<MessageExt> {

    /**
     * Logs the message keys with the decoded body, then the whole message as JSON.
     * */
    @Override
    public void onMessage(MessageExt message) {

        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        // NOTE(review): assumes the producer encodes String payloads as UTF-8 - confirm.
        String value = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("consume message in RocketMQConsumerMessageExt: [{}], [{}]",
                message.getKeys(), value);
        log.info("MessageExt: [{}]", JSON.toJSONString(message));   // slower, per the original note
    }
}

----------------------------------------------------------------------------------
/**
 * <h1>Fourth RocketMQ consumer: filters by tag and receives the payload as a Java POJO</h1>
 * */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "imooc-study-rocketmq",
        consumerGroup = "qinyi-springboot-rocketmq-tag-object",
        selectorExpression = "qinyi"    // filter by tag
)
public class RocketMQConsumerObject implements RocketMQListener<QinyiMessage> {

    /**
     * Logs the received {@code QinyiMessage} as JSON.
     * */
    @Override
    public void onMessage(QinyiMessage message) {

        String json = JSON.toJSONString(message);
        log.info("consume message in RocketMQConsumerObject: [{}]",
                json);
        // do something with the message here
    }
}

使用方式

/**
 * <h1>REST endpoints demonstrating the Spring Boot + RocketMQ producer</h1>
 * Every endpoint sends the same fixed demo message through a different producer method.
 * */
@Slf4j
@RestController
@RequestMapping("/rocket-mq")
public class RocketMQController {

    /** Fixed demo message shared by all endpoints */
    private static final QinyiMessage DEMO_MESSAGE = new QinyiMessage(
            1,
            "Qinyi-Study-RocketMQ-In-SpringBoot"
    );

    private final RocketMQProducer rocketMQProducer;

    public RocketMQController(RocketMQProducer rocketMQProducer) {
        this.rocketMQProducer = rocketMQProducer;
    }

    /** Value only. */
    @GetMapping("/message-with-value")
    public void sendMessageWithValue() {
        rocketMQProducer.sendMessageWithValue(demoPayload());
    }

    /** Value plus key. */
    @GetMapping("/message-with-key")
    public void sendMessageWithKey() {
        rocketMQProducer.sendMessageWithKey("Qinyi", demoPayload());
    }

    /** Value plus tag. */
    @GetMapping("/message-with-tag")
    public void sendMessageWithTag() {
        rocketMQProducer.sendMessageWithTag("qinyi", demoPayload());
    }

    /** Value plus key and tag. */
    @GetMapping("/message-with-all")
    public void sendMessageWithAll() {
        rocketMQProducer.sendMessageWithAll("Qinyi", "qinyi", demoPayload());
    }

    /** Serializes the demo message to JSON; called once per request. */
    private static String demoPayload() {
        return JSON.toJSONString(DEMO_MESSAGE);
    }
}

SpringCloud Stream消息驱动组件

为什么会出现SpringCloud Stream

  • 没有SpringCloud Stream

    image-20220502165333262

SpringCloud Stream应用模型

  • SpringCloud Stream 中的核心概念

    • 负责与中间件交互的抽象绑定器:Binder
    • 发送消息与接收消息的应用通信信道:Input、Output
  • 经典的 SpringCloud Stream发布-订阅模型

    • Topic可以认为就是Kafka中的Topic的概念
    • Producer通过Output信道发布消息到Topic上
    • Consumer通过Input信道消费Topic上的消息

基于SpringCloud Stream消息驱动的应用

Maven配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ecommerce-springcloud</artifactId>
        <groupId>com.imooc.ecommerce</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>


    <artifactId>e-commerce-stream-client</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <!-- Module name and description -->
    <name>e-commerce-stream-client</name>
    <description>Stream Client</description>

    <dependencies>
        <!-- The two dependencies needed to create the project -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- Web project -->
        <dependency>
            <groupId>com.imooc.ecommerce</groupId>
            <artifactId>e-commerce-mvc-config</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- zipkin = spring-cloud-starter-sleuth + spring-cloud-sleuth-zipkin -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zipkin</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>
        <!-- SpringCloud Stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <!-- SpringCloud Stream + Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <!-- SpringCloud Stream + RocketMQ -->
        <!--        <dependency>-->
        <!--            <groupId>com.alibaba.cloud</groupId>-->
        <!--            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>-->
        <!--        </dependency>-->
    </dependencies>

    <!--
        Spring Boot Maven plugin: adds Spring Boot support to the Maven build so the
        application can be packaged as an executable jar or war and run the usual way.
     -->
    <build>
        <!-- ${project.artifactId} replaces the deprecated ${artifactId} expression -->
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

bootstrap.yml

server:
  port: 8006
  servlet:
    context-path: /ecommerce-stream-client

spring:
  application:
    name: e-commerce-stream-client
  cloud:
    nacos:
      # Service registration and discovery
      discovery:
        enabled: true # set to false to opt out of Nacos service registration and discovery
        server-addr: 192.168.0.103:8848
        namespace: f2dfb4c7-a6cf-45d5-92e2-3912daf808d0
        metadata:
          management:
            context-path: ${server.servlet.context-path}/actuator
    # Message-driven (Spring Cloud Stream) settings
    stream:
      # SpringCloud Stream + Kafka
      kafka:
        binder:
          brokers: 192.168.0.103:9092
          auto-create-topics: true  # when false, topics are not created automatically and must exist before use
      # SpringCloud Stream + RocketMQ
      #      rocketmq:
      #        binder:
      #          name-server: 192.168.0.103:9876
      # Stream partitioning support
      instanceCount: 1  # total number of consumer instances
      instanceIndex: 0  # index of the current consumer instance
      bindings:
        # Default sender
        output:      # the default output channel provided by Stream
          destination: ecommerce-stream-client-default    # where messages are sent; for Kafka this is the topic
          content-type: text/plain    # message format; needed on the sending side, not on the receiving side
          # Message partitioning (disabled in this version of the file)
#          producer:
            # partitionKeyExpression: payload.author  # partition key; payload is the object being sent, author is one of its properties
#            partitionCount: 1   # number of partitions
#            # Use a custom partitioning strategy; comment out partitionKeyExpression
#            partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
#            partitionSelectorName: qinyiPartitionSelectorStrategy
        # Default receiver
        input:      # the default input channel provided by Stream
          destination: ecommerce-stream-client-default
#          group: e-commerce-guank-default
#          # Enable partitioning support on the consumer
#          consumer:
#            partitioned: true

        # Custom (guank) sender
        guankOutput:
          destination: ecommerce-stream-client-guank
          content-type: text/plain
        # Custom (guank) receiver
        guankInput:
          destination: ecommerce-stream-client-guank
#          group: e-commerce-guank-guank

  # spring-kafka settings
  kafka:
    bootstrap-servers: 192.168.0.103:9092
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  sleuth:
    sampler:
      # ProbabilityBasedSampler sampling strategy
      probability: 1.0  # sampling ratio; 1.0 means 100% (the original note states the default is 0.1)
      # RateLimitingSampler sampling strategy
      rate: 100  # number of traces accepted per second
  zipkin:
    sender:
      type: kafka # per the original note, the default is http
    base-url: http://192.168.0.103:9411/

# Expose actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

使用 SpringCloud Stream 默认的信道发送和消费

默认的发送信道

/**
 * @author qingtian
 * @version 1.0
 * @description: Sends messages through the default output channel ({@code Source})
 * @date 2022/5/3 9:09
 */
@Slf4j
@EnableBinding(Source.class)
public class DefaultSendService {

    /**
     * Default channel source, injected by Spring
     */
    @Autowired
    private Source source;

    /**
     * Serializes the message to JSON, logs it and sends it on the default output channel.
     * @param message the message to publish
     */
    public void sendMessage(GuankMessage message) {

        String payload = JSON.toJSONString(message);
        log.info("in DefaultSendService send message : [{}]",payload);

        // Wrap the JSON string in a Spring Messaging message before handing it to the channel
        source.output().send(
                MessageBuilder.withPayload(payload).build()
        );
    }
}

默认的消费信道

/**
 * @author qingtian
 * @version 1.0
 * @description: Receives messages from the default input channel ({@code Sink})
 * @date 2022/5/3 9:24
 */
@Slf4j
@EnableBinding(Sink.class)
public class DefaultReceiveService {

    /**
     * Parses the incoming payload as a {@code GuankMessage} and logs it.
     * @param payload the received payload; its {@code toString()} form is parsed as JSON
     */
    @StreamListener(Sink.INPUT)
    public void receiveMessage(Object payload) {
        log.info("in defaultReceiveService consume message start");

        String rawJson = payload.toString();
        GuankMessage received = JSON.parseObject(rawJson, GuankMessage.class);
        String echoed = JSON.toJSONString(received);

        log.info("in defaultReceiveService consume message success : [{}]",echoed);
    }
}

使用自定义的信道

/**
 * @author qingtian
 * @version 1.0
 * @description: Custom output channel definition
 * @date 2022/5/4 9:07
 */
public interface GuankSource {

    /** Name of the output channel; used as the {@code @Output} value below. */
    String OUTPUT = "guankOutput";

    /**
     * The output channel is named guankOutput and has to be bound through the Stream binder.
     * @return the message channel declared under the name {@link #OUTPUT}
     */
    @Output(GuankSource.OUTPUT)
    MessageChannel guankOutput();
}

------------------------------------------------------------------------------
/**
 * Sends messages through the custom output channel declared by {@code GuankSource}.
 */
@Slf4j
@EnableBinding(GuankSource.class)
public class GuankSendService {

    /** Custom channel source, injected by Spring */
    @Autowired
    private GuankSource guankSource;

    /**
     * Serializes the message to JSON, logs it and sends it on the custom output channel.
     * @param message the message to publish
     */
    public void sendMessage(GuankMessage message) {
        String payload = JSON.toJSONString(message);
        log.info("in guankSendService sned message : [{}]",payload);

        guankSource.guankOutput().send(
                MessageBuilder.withPayload(payload).build()
        );
    }
}

------------------------------------------------------------------------------
/**
 * Custom input channel definition.
 */
public interface GuankSink {

    /** Name of the input channel; used as the {@code @Input} value below. */
    String INPUT = "guankInput";

    /**
     * The input channel is named guankInput and has to be bound through the Stream binder
     * in the yml file.
     * @return the subscribable channel declared under the name {@link #INPUT}
     */
    @Input(GuankSink.INPUT)
    SubscribableChannel guankInput();
}

--------------------------------------------------------------------------------
/**
 * @author qingtian
 * @version 1.0
 * @description: Receives messages from the custom input channel ({@code GuankSink})
 * @date 2022/5/4 9:17
 */
@Slf4j
@EnableBinding(GuankSink.class)
public class GuankReceiveService {

    /**
     * Parses the incoming payload as a {@code GuankMessage} and logs it.
     * @param payload the received payload; its {@code toString()} form is parsed as JSON
     */
    @StreamListener(GuankSink.INPUT)
    public void receiveMessage(@Payload Object payload) {
        log.info("in guankReceiveService consume message start");

        String rawJson = payload.toString();
        GuankMessage received = JSON.parseObject(rawJson, GuankMessage.class);
        String echoed = JSON.toJSONString(received);

        log.info("in guankReceiveService consume message success : [{}]",echoed);
    }
}

-------------------------------------------------------------------------------
    
// Usage example
/**
 * REST endpoints that trigger a send on the default channel and on the custom channel.
 */
@Slf4j
@RestController
@RequestMapping("/message")
public class MessageController {

    @Autowired
    private DefaultSendService defaultSendService;

    @Autowired
    private GuankSendService guankSendService;

    /**
     * Sends the default message through the default channel.
     */
    @GetMapping("/default")
    public void defaultSend() {
        GuankMessage message = GuankMessage.defaultMessage();
        defaultSendService.sendMessage(message);
    }

    /**
     * Sends the default message through the custom channel.
     */
    @GetMapping("/guank")
    public void guankSend() {
        GuankMessage message = GuankMessage.defaultMessage();
        guankSendService.sendMessage(message);
    }
}

SpringCloud Stream消息分组和消费分区的配置和说明

  • SpringCloud Stream消费者组模型
  • 应用的不同实例放在一个消费者组中,每一条消息只会被一个实例消费
  • 消费者组的思想是通过多实例扩展服务吞吐量,且不会造成消息的重复消费

也就是说,对于信道中的同一条消息,同一个消费者组内有且只有一个实例会进行消费,以此保证在分布式多实例部署的情况下,消息不会被重复消费。

image-20220504212718628

SpringCloud Stream 消费分区

  • 消费分区的作用就是为了确保具有相同特征标识的数据由同一个消费者实例进行处理

    image-20220504212851419

server:
  port: 8006
  servlet:
    context-path: /ecommerce-stream-client

spring:
  application:
    name: e-commerce-stream-client
  cloud:
    nacos:
      # Service registration and discovery
      discovery:
        enabled: true # set to false to opt out of Nacos service registration and discovery
        server-addr: 192.168.0.103:8848
        namespace: f2dfb4c7-a6cf-45d5-92e2-3912daf808d0
        metadata:
          management:
            context-path: ${server.servlet.context-path}/actuator
    # Message-driven (Spring Cloud Stream) settings
    stream:
      # SpringCloud Stream + Kafka
      kafka:
        binder:
          brokers: 192.168.0.103:9092
          auto-create-topics: true  # when false, topics are not created automatically and must exist before use
      # SpringCloud Stream + RocketMQ
      #      rocketmq:
      #        binder:
      #          name-server: 192.168.0.103:9876
      # Stream partitioning support
      instanceCount: 1  # total number of consumer instances
      instanceIndex: 0  # index of the current consumer instance
      bindings:
        # Default sender
        output:      # the default output channel provided by Stream
          destination: ecommerce-stream-client-default    # where messages are sent; for Kafka this is the topic
          content-type: text/plain    # message format; needed on the sending side, not on the receiving side
          # Message partitioning
          producer:
#            partitionKeyExpression: payload.author  # partition key; payload is the object being sent, author is one of its properties
            partitionCount: 1   # number of partitions
            # Use a custom partitioning strategy; partitionKeyExpression stays commented out
            partitionKeyExtractorName: guankPartitionKeyExtractorStrategy
            partitionSelectorName: guankPartitionSelectorStrategy
        # Default receiver
        input:      # the default input channel provided by Stream
          destination: ecommerce-stream-client-default
          group: e-commerce-guank-default
          # Enable partitioning support on the consumer
          consumer:
            partitioned: true

        # Custom (guank) sender
        guankOutput:
          destination: ecommerce-stream-client-guank
          content-type: text/plain
        # Custom (guank) receiver
        guankInput:
          destination: ecommerce-stream-client-guank
          group: e-commerce-guank-guank

  # spring-kafka settings
  kafka:
    bootstrap-servers: 192.168.0.103:9092
    producer:
      retries: 3
    consumer:
      auto-offset-reset: latest
  sleuth:
    sampler:
      # ProbabilityBasedSampler sampling strategy
      probability: 1.0  # sampling ratio; 1.0 means 100% (the original note states the default is 0.1)
      # RateLimitingSampler sampling strategy
      rate: 100  # number of traces accepted per second
  zipkin:
    sender:
      type: kafka # per the original note, the default is http
    base-url: http://192.168.0.103:9411/

# Expose actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

编写对应的分区策略

/**
 * @author qingtian
 * @version 1.0
 * @description: Custom strategy that extracts the partition key from a message
 * @date 2022/5/4 22:01
 */
@Slf4j
@Component
public class GuankPartitionKeyExtractorStrategy implements PartitionKeyExtractorStrategy {

    /**
     * Parses the payload as a {@code GuankMessage} and uses its project name as the key.
     * @param message the outgoing message
     * @return the project name of the parsed payload
     */
    @Override
    public Object extractKey(Message<?> message) {
        String rawPayload = message.getPayload().toString();

        // The partition key is the projectName property of the payload
        String projectName = JSON.parseObject(rawPayload, GuankMessage.class).getProjectName();

        log.info("SpringCloud Stream guank partition key : [{}]",projectName);
        return projectName;
    }
}
-------------------------------------------------------------------------------
/**
 * @author qingtian
 * @version 1.0
 * @description: Strategy that decides which partition a payload is sent to
 * @date 2022/5/4 22:04
 */
@Slf4j
@Component
public class GuankPartitionSelectorStrategy implements PartitionSelectorStrategy {

    /**
     * Selects the partition from the hash code of the key's string form.
     * The result is always in the range {@code [0, partitionCount)}; a {@code null} key
     * is mapped to partition 0.
     * @param key the partition key
     * @param partitionCount the number of partitions to choose from
     * @return the index of the selected partition
     */
    @Override
    public int selectPartition(Object key, int partitionCount) {
        int hash = (key == null) ? 0 : key.toString().hashCode();
        // String.hashCode() may be negative and '%' keeps the sign of the dividend,
        // so floorMod is used to keep the index non-negative.
        int partition = Math.floorMod(hash, partitionCount);
        log.info("SpringCloud Stream guank Selector info : [{}], [{}], [{}]",
                key, partitionCount, partition);
        return partition;
    }
}

0

评论区