Spring Boot 集成 Kafka

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。 Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。本章介绍 Spring Boot 集成 Kafka 收发消息。

依赖

Spring 有专门的项目支持 Kafka ,引入依赖包时需要注意版本兼容问题,以下是 Spring for Apache Kafka 版本兼容列表:

对照以上列表,选择自己 Spring Kafka 版本,在 pom.xml 中引入:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

指定 Scala 版本解决 Jackson/Scala 兼容问题:

<dependency>
   <groupId>org.scala-lang</groupId>
   <artifactId>scala-library</artifactId>
   <version>{version}</version>
   <scope>test</scope>
</dependency>

<dependency>
   <groupId>org.scala-lang</groupId>
   <artifactId>scala-reflect</artifactId>
   <version>{version}</version>
   <scope>test</scope>
</dependency>

配置

  • Spring Kafka 通用配置
spring:
  kafka:
    bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # kafka连接接地址
#    client-id: # 发送请求时传给服务器的id

通用配置:spring.kafka.*
admin、producer、consumer、streams配置会覆盖通用配置 spring.kafka.* 中相同的属性

  • 生产者
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092 # 会覆盖 spring.kafka.bootstrap-servers 配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 反序列化value的类

生产者相关配置:spring.kafka.producer.*

  • 消费者
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092 # 会覆盖 spring.kafka.bootstrap-servers 配置
      group-id: kafka-test # 消费者所属消息组
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类

消费者相关配置:spring.kafka.consumer.*

默认 value-serializer 使用 org.apache.kafka.common.serialization.StringSerializer ,只支持文本消息。自定义 org.springframework.kafka.support.serializer.JsonSerializer 可以让消息支持其他类型。生产者与消费者需同时设置,注意不是同一个类。(StringSerializer/StringDeserializer,JsonSerializer/JsonDeserializer)。

更多配置参考:Spring Boot Integration Properties

使用

在 Spring Boot 启动入口加入 @EnableKafka 注解启用

@SpringBootApplication
@EnableKafka
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}
  • 发送消息

Spring Kafka 提供 KafkaTemplate 类发送消息,在 Spring Boot 中配置好 Kafka 就可以在任何地方使用它

@Autowired
KafkaTemplate kafkaTemplate;

public void send() {
    // 发送消息
    kafkaTemplate.send(topic, msg);
}

指定 topic ,msg 为消息内容,也可以是对象。

  • 接收消息

接收消息使用 @KafkaListener 注解

@KafkaListener(topics = "topic", groupId = "testGroup", topicPartitions = {})
public void processMessage(String content) {
    log.info("收到消息 -> " + content);
}

参数说明:
topics
与发送消息topic相同,可以指定多个
groupId
消费组唯一id
topicPartitions
topic分区,可指定多个

关于 KafkaTemplate 更多方法参考:KafkaTemplate

Topic 分区

了解 Kafka 应该都知道, Kafka 每个主题可以有多个分区,每个分区只能有一个消费者,保证消息消费是有序的。在 Spring Kafka 中可以创建 Topic 并指定分区数。

@Bean("myTopic")
public NewTopic myTopic() {
    return TopicBuilder.name("myTopic")
            .partitions(10)
            .compact()
            .build();
}

myTopic主题中创建10个分区

  • 自定义分区发送

我们可以定义规则,把消息发送到指定的分区,需实现 Partitioner 接口

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        int partitionNum = 0;
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (StringUtils.isEmpty(key) || numPartitions <= 1) {
            return partitionNum;
        } else {
            partitionNum = Integer.parseInt(key.toString().substring(0, 1));
            partitionNum = partitionNum >= numPartitions ? 0 : partitionNum;
        }
        log.info("partitionNum -> {}", partitionNum);
        return partitionNum;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

根据发送的key,取第一位数,做为指定的分区编号

在 application.yaml 中增加配置

spring:
  kafka:
    producer:
      properties:
        partitioner:
          class: com.engrz.lab.springboot.kafka.CustomPartitioner # 指定分区规则

以下是我测试时全部配置

spring:
  application:
    name: spring-boot-lab-kafka
  kafka:
    bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # kafka连接接地址
#    client-id: # 发送请求时传给服务器的id
    producer:
#      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 序列化value的类
      properties:
        partitioner:
          class: com.engrz.lab.springboot.kafka.CustomPartitioner # 指定分区规则
        spring:
          json:
            value:
              default:
                type:
            trusted:
              packages: com.engrz.lab.* # 允许json反序列化的包
            add:
              type:
                headers: false
    consumer:
#      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
      group-id: kafka-test # 消费者所属消息组
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类
  • 也可以用java代码配置生产者
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}


@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // kafka连接地址
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // key序列化
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // value序列化
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    // 分区规则
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
}

关于 kafka 其他属性参考 Kafka Producer Configs

  • 发送消息
// kafkaTemplate.send(topic, key, msg);
kafkaTemplate.send("myTopic", "1", "hellow world");
kafkaTemplate.send("myTopic", "2", "hellow world");
kafkaTemplate.send("myTopic", "3", "hellow world");

因为在自定义分区配置,我把 key 的第一个字符做为指定分区num。指定key就可以把消息发到指定分区。可以根据自己的策略去修改规则。

Kafka 消息支持更高性能的流处理。关于 Kafka Streams 将在新的章节讲解。


除非注明,否则均为"攻城狮·正"原创文章,请注明出处。

本文链接:https://engr-z.com/150.html

1条评论

发表评论

您的电子邮箱地址不会被公开。