Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。 Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。本章介绍 Spring Boot 集成 Kafka 收发消息。
依赖
Spring 有专门的项目支持 Kafka ,引入依赖包时需要注意版本兼容问题,以下是 Spring for Apache Kafka 版本兼容列表:

对照以上列表,选择自己 Spring Kafka 版本,在 pom.xml 中引入:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
指定 Scala 版本解决 Jackson/Scala 兼容问题:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>{version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>{version}</version>
<scope>test</scope>
</dependency>
配置
- Spring Kafka 通用配置
spring:
kafka:
bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # kafka连接接地址
# client-id: # 发送请求时传给服务器的id
通用配置:spring.kafka.*
admin、producer、consumer、streams配置会覆盖通用配置 spring.kafka.* 中相同的属性
- 生产者
spring:
kafka:
producer:
bootstrap-servers: localhost:9092 # 会覆盖 spring.kafka.bootstrap-servers 配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 序列化key的类
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 反序列化value的类
生产者相关配置:spring.kafka.producer.*
- 消费者
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092 # 会覆盖 spring.kafka.bootstrap-servers 配置
group-id: kafka-test # 消费者所属消息组
key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类
消费者相关配置:spring.kafka.consumer.*
默认 value-serializer 使用 org.apache.kafka.common.serialization.StringSerializer ,只支持文本消息。自定义 org.springframework.kafka.support.serializer.JsonSerializer 可以让消息支持其他类型。生产者与消费者需同时设置,注意不是同一个类。(StringSerializer/StringDeserializer,JsonSerializer/JsonDeserializer)。
更多配置参考:Spring Boot Integration Properties
使用
在 Spring Boot 启动入口加入 @EnableKafka
注解启用
@SpringBootApplication
@EnableKafka
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
- 发送消息
Spring Kafka 提供 KafkaTemplate 类发送消息,在 Spring Boot 中配置好 Kafka 就可以在任何地方使用它
@Autowired
KafkaTemplate kafkaTemplate;
public void send() {
// 发送消息
kafkaTemplate.send(topic, msg);
}
指定 topic ,msg 为消息内容,也可以是对象。
- 接收消息
接收消息使用 @KafkaListener
注解
@KafkaListener(topics = "topic", groupId = "testGroup", topicPartitions = {})
public void processMessage(String content) {
log.info("收到消息 -> " + content);
}
参数说明:
topics
与发送消息topic相同,可以指定多个
groupId
消费组唯一id
topicPartitions
topic分区,可指定多个
关于 KafkaTemplate 更多方法参考:KafkaTemplate
Topic 分区
了解 Kafka 应该都知道, Kafka 每个主题可以有多个分区,每个分区只能有一个消费者,保证消息消费是有序的。在 Spring Kafka 中可以创建 Topic 并指定分区数。
@Bean("myTopic")
public NewTopic myTopic() {
return TopicBuilder.name("myTopic")
.partitions(10)
.compact()
.build();
}
myTopic主题中创建10个分区
- 自定义分区发送
我们可以定义规则,把消息发送到指定的分区,需实现 Partitioner 接口
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partitionNum = 0;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (StringUtils.isEmpty(key) || numPartitions <= 1) {
return partitionNum;
} else {
partitionNum = Integer.parseInt(key.toString().substring(0, 1));
partitionNum = partitionNum >= numPartitions ? 0 : partitionNum;
}
log.info("partitionNum -> {}", partitionNum);
return partitionNum;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
根据发送的key,取第一位数,做为指定的分区编号
在 application.yaml 中增加配置
spring:
kafka:
producer:
properties:
partitioner:
class: com.engrz.lab.springboot.kafka.CustomPartitioner # 指定分区规则
以下是我测试时全部配置
spring:
application:
name: spring-boot-lab-kafka
kafka:
bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # kafka连接接地址
# client-id: # 发送请求时传给服务器的id
producer:
# bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 序列化key的类
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 序列化value的类
properties:
partitioner:
class: com.engrz.lab.springboot.kafka.CustomPartitioner # 指定分区规则
spring:
json:
value:
default:
type:
trusted:
packages: com.engrz.lab.* # 允许json反序列化的包
add:
type:
headers: false
consumer:
# bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
group-id: kafka-test # 消费者所属消息组
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类
- 也可以用java代码配置生产者
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// kafka连接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 分区规则
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
关于 kafka 其他属性参考 Kafka Producer Configs
- 发送消息
// kafkaTemplate.send(topic, key, msg);
kafkaTemplate.send("myTopic", "1", "hellow world");
kafkaTemplate.send("myTopic", "2", "hellow world");
kafkaTemplate.send("myTopic", "3", "hellow world");
因为在自定义分区配置,我把 key 的第一个字符做为指定分区num。指定key就可以把消息发到指定分区。可以根据自己的策略去修改规则。
Kafka 消息支持更高性能的流处理。关于 Kafka Streams 将在新的章节讲解。
《“Spring Boot 集成 Kafka”》 有 2 条评论
[…] Spring Boot 集成 Kafka 的基本配置和用法在Spring Boot 集成 Kafka有介绍,这里不再详述。 […]
[…] 除非注明,否则均为”攻城狮·正“原创文章,转载请注明出处。 本文链接:https://engr-z.com/150.html […]