Spring Boot 集成 Kafka Stream

Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 Spring Boot 集成 Kafka Streams 进行流式计算。

Spring Boot 集成 Kafka 的基本配置和用法在”Spring Boot 集成 Kafka“有介绍,这里不再详述。

依赖

使用 Kafka Streams 流处理,在集成 Spring Kafka 的基础下,还需要引入:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

配置

  • 在 application.yml 配置
spring:
  kafka:
    streams:
      application-id: test-kafka-stream # 默认取springboot应用名
      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
#      auto-startup: true
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde # 序列化key
          value:
            serde: org.springframework.kafka.support.serializer.JsonSerde # 序列化value
          timestamp:
            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
        spring:
          json:
            trusted:
              packages: com.engrz.lab.* # 允许json反序列化的包

流处理相关配置:spring.kafka.streams.*

更多配置参考:Spring Boot Integration Properties

  • 在 Java 代码中配置(与 application.yml 配置二选一)
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    return new KafkaStreamsConfiguration(props);
}

值使用 JsonSerde 序列化,需要配置信任包,否则 Spring 会报出:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

使用

  • 创建流

使用 @EnableKafkaStreams 注解装配

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
        @Bean
    public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Object> stream = streamsBuilder.stream("streamTopic");
        stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }
}

可以指定多个topic,把接收的内容存到myTopic中

流计算

本章只讲 Spring Boot 集成,关于 Kafka Streams 流计算会放在 Kafka 专题介绍。以下给出一个应用场景示例:

  • 定义一个订单model类
/**
 * @author Engr-Z
 * @since 2021/1/29
 */
@Data
public class OrderModel implements Serializable {

    /**
     * 用户id
     */
    private Integer userId;

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 订单时间
     */
    private LocalDateTime orderTime;

    /**
     * 订单金额
     */
    private BigDecimal orderAmt;

    /**
     * 订单状态
     */
    private String orderStatus;

}
  • 找出交易小于1元的订单,发送到 orderTopic
@Bean
public KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");
    stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    stream.filter((k, v) -> {
        BigDecimal orderAmt = v.getOrderAmt();
        return orderAmt.compareTo(new BigDecimal(1)) < 0;
    }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
}

通过实时计算,我们可以解决很多业务问题。如:实时数仓,实时风控等。


除非注明,否则均为"攻城狮·正"原创文章,转载请注明出处。

本文链接:https://engr-z.com/169.html

发表评论

邮箱地址不会被公开。