网站首页 > 技术文章 正文
Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 Spring Boot 集成 Kafka Streams 进行流式计算。
Spring Boot 集成 Kafka 的基本配置和用法在“Spring Boot 集成 Kafka”有介绍,这里不再详述。
依赖
使用 Kafka Streams 流处理,在集成 Spring Kafka 的基础下,还需要引入:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
配置
- 在 application.yml 配置
spring:
kafka:
streams:
application-id: test-kafka-stream # 默认取springboot应用名
bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
# auto-startup: true
properties:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde # 序列化key
value:
serde: org.springframework.kafka.support.serializer.JsonSerde # 序列化value
timestamp:
extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
spring:
json:
trusted:
packages: com.engrz.lab.* # 允许json反序列化的包
流处理相关配置:spring.kafka.streams.*
更多配置参考:Spring Boot Integration Properties
- 在 Java 代码中配置(与 application.yml 配置二选一)
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
值使用 JsonSerde 序列化,需要配置信任包,否则 Spring 会报出:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
使用
- 创建流
使用 @EnableKafkaStreams 注解装配
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
@Bean
public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {
KStream<String, Object> stream = streamsBuilder.stream("streamTopic");
stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
return stream;
}
}
可以指定多个topic,把接收的内容存到myTopic中
流计算
本章只讲 Spring Boot 集成,关于 Kafka Streams 流计算会放在 Kafka 专题介绍。以下给出一个应用场景示例:
- 定义一个订单model类
/**
* @author Engr-Z
* @since 2021/1/29
*/
@Data
public class OrderModel implements Serializable {
/**
* 用户id
*/
private Integer userId;
/**
* 订单号
*/
private String orderNo;
/**
* 订单时间
*/
private LocalDateTime orderTime;
/**
* 订单金额
*/
private BigDecimal orderAmt;
/**
* 订单状态
*/
private String orderStatus;
}
- 找出交易小于1元的订单,发送到 orderTopic
@Bean
public KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {
KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");
stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
stream.filter((k, v) -> {
BigDecimal orderAmt = v.getOrderAmt();
return orderAmt.compareTo(new BigDecimal(1)) < 0;
}).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
}
通过实时计算,我们可以解决很多业务问题。如:实时数仓,实时风控等。
除非注明,否则均为"攻城狮·正"原创文章,转载请注明出处。
本文链接:https://engr-z.com/169.html
猜你喜欢
- 2024-12-12 Kafka监控与指标解析-UnderReplicatedPartitions
- 2024-12-12 聊聊 Kafka: Consumer 源码解析之 ConsumerNetworkClient
- 2024-12-12 为什么Kafka依赖ZooKeeper?
- 2024-12-12 一款Kafka可视化Web界面管理工具:CMAK
- 2024-12-12 MongoDB 数据同步kafka
- 2024-12-12 kafka快速入门到精通
- 2024-12-12 SpringBoot集成Kafka+Kafka优化问题
- 2024-12-12 kafka consumer 配置详解
- 2024-12-12 kafka生产者配置详解
- 2024-12-12 Kafka两种集群详解和搭建教程
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (87)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)