Spring Boot 集成 Kafka Stream

Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 Spring Boot 集成 Kafka Streams 进行流式计算。

Spring Boot 集成 Kafka 的基本配置和用法在”Spring Boot 集成 Kafka“有介绍,这里不再详述。

依赖

使用 Kafka Streams 流处理,在集成 Spring Kafka 的基础下,还需要引入:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

配置

  • 在 application.yml 配置
spring:
  kafka:
    streams:
      application-id: test-kafka-stream # 默认取springboot应用名
      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
#      auto-startup: true
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde # 序列化key
          value:
            serde: org.springframework.kafka.support.serializer.JsonSerde # 序列化value
          timestamp:
            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
        spring:
          json:
            trusted:
              packages: com.engrz.lab.* # 允许json反序列化的包

流处理相关配置:spring.kafka.streams.*

更多配置参考:Spring Boot Integration Properties

  • 在 Java 代码中配置(与 application.yml 配置二选一)
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    return new KafkaStreamsConfiguration(props);
}

值使用 JsonSerde 序列化,需要配置信任包,否则 Spring 会报出:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

使用

  • 创建流

使用 @EnableKafkaStreams 注解装配

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
        @Bean
    public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Object> stream = streamsBuilder.stream("streamTopic");
        stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }
}

可以指定多个topic,把接收的内容存到myTopic中

流计算

本章只讲 Spring Boot 集成,关于 Kafka Streams 流计算会放在 Kafka 专题介绍。以下给出一个应用场景示例:

  • 定义一个订单model类
/**
 * @author Engr-Z
 * @since 2021/1/29
 */
@Data
public class OrderModel implements Serializable {

    /**
     * 用户id
     */
    private Integer userId;

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 订单时间
     */
    private LocalDateTime orderTime;

    /**
     * 订单金额
     */
    private BigDecimal orderAmt;

    /**
     * 订单状态
     */
    private String orderStatus;

}
  • 找出交易小于1元的订单,发送到 orderTopic
@Bean
public KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");
    stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    stream.filter((k, v) -> {
        BigDecimal orderAmt = v.getOrderAmt();
        return orderAmt.compareTo(new BigDecimal(1)) < 0;
    }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
}

通过实时计算,我们可以解决很多业务问题。如:实时数仓,实时风控等。


已发布

分类

来自

标签:

评论

《 “Spring Boot 集成 Kafka Stream” 》 有 2 条评论

  1. 看了你的,集成方式,我感觉,我不会了,

    1. 有什么不会的,能正常集成就行。Spring框架一个功能往往有不同切入点,有的是写配置,有的用继承重写方法,理解不同方式优缺点,能符合需求完成工作才是最重要的。

回复 wangzhengzhen 取消回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注