wordpress建网站主页,深圳网站建设公司怎么做,福州招聘网站有哪几个,永泰建设工程网站目录 介绍关键特性应用场景核心概念部署方式kafka streams的处理模式 具体使用1、准备工作2、添加依赖3、代码实现3、测试 介绍
Kafka Streams是构建在Apache Kafka之上的客户端库#xff0c;用于构建高效、实时的流处理应用。它允许你以高吞吐量和低延迟的方式处理记录流用于构建高效、实时的流处理应用。它允许你以高吞吐量和低延迟的方式处理记录流并且可以容易地扩展和复制数据处理流程。这种流处理方式适用于从简单的数据转换到复杂的事件驱动的应用程序。
关键特性
**易用性**Kafka Streams提供了简洁的API允许开发者轻松构建复杂的流处理应用。这些API包括高级的DSLDomain Specific Language和低级的处理器API两者可以相互配合使用。**无需单独的处理集群**与其他流处理技术不同Kafka Streams应用是作为常规的Java应用运行的不需要维护一个专门的处理集群。你可以在你自己的应用中直接包含流处理逻辑这使得部署和维护变得更容易。**强大的状态处理能力**Kafka Streams支持状态化处理并允许容错、持久化的本地状态存储。这是通过管理和复制RocksDB实例来实现的为应用程序的状态提供了持久化和容错支持。**时间窗口处理**Kafka Streams支持多种类型的时间窗口操作如滑动窗口、跳跃窗口和会话窗口使得在处理时间敏感的数据流时非常有效。**流式表格双模型**Kafka Streams引入了一个流式表格双模型允许用户将流处理结果看作是一张动态更新的表。这个模型提供了一种理解流数据和转换流数据的直观方式。**可扩展和容错**由于Kafka Streams建立在Apache Kafka之上它继承了Kafka的可扩展性和高可用性。应用可通过增加实例来水平扩展故障转移由Kafka负责处理。
应用场景
Kafka Streams适用于多种实时数据处理场景包括
实时分析和监控对即时生成的数据进行聚合、过滤和分析。事件驱动的应用基于特定事件自动触发流程和操作。数据转换和清洗实时处理数据流并将结果输出到Kafka主题或其他存储系统中。个性化推荐根据用户行为实时更新推荐内容。Kafka Streams的设计目标是提供一种简单、强大且易于部署的流处理方式。通过利用Kafka本身的优点Kafka Streams可以帮助开发者更方便地构建和部署实时数据处理应用。
核心概念 DFP以数据为中心的流式出来的方式 Source Processor源头读的Processor Stream Processors进行流式处理的中间的Processors Sink Processors流中最后的一个Processors用于pull到本地或者另外一个新的Topic Topology多个Processors就构成了一个Topology的环形图 sub-Topologies获取数据分子的Topology Streams TaskStreams的最小单位真正处理数据的Streams ThreadStreams 处理数据的线程一般每个Streams Task会创建一个新的线程提高并行
部署方式
在一个服务里面起一个Instance实例这个实例里面创建两个线程一个线程处理两个Task对象这种方式先对并发最小 在一个服务里面启动两个Instance实例在每一个线程里面可以处理一个Task这样在处理上可以有效的提高并发避免一个实例出现问题有限其他的 起多个服务集群化部署去跑多个实例可以有效利用多核CPU的性能 kafka streams的处理模式
**Depth-First Processing深度优先处理模式**在处理拓扑中的节点时首先处理完一个节点的所有分支然后再处理下一个节点这种处理模式可以确保数据再处理过程中的一致性和正确性。避免数据混乱Breadth-First Processing广度优先处理与深度优先先反广度优先处理模式会优先处理一个节点的所以相邻节点然后再处理下一个节点。Time Windowing时间窗口处理按照时间窗口进行分组然后对每个窗口内的事件进行处理这种模式适用于需要对一段时间内的事件进行聚合处理或计算
具体使用
1、准备工作
默认已经安装kafka了啊如果还没通过我这篇文章去安装kafka安装
在使用的时候首先我们需要创建两个个topic
#进入kafka容器
docker exec -it kafka-server1 /bin/bash#创建主题topic-1
/opt/kafka/bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
#创建主题topic-2
/opt/kafka/bin/kafka-topics.sh --create --topic out-topic --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
2、添加依赖
我用的kafka-streams是3.1.2的
gradle implementation(org.apache.kafka:kafka-streams)implementation(org.springframework.kafka:spring-kafka)mavne
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactId
/dependency3、代码实现
/*** 通过streams实现数据流处理把字符串装为大写*/
Slf4j
public class KafkaStreamsYellingApp {
// appidprivate final static String APPLICATION_ID yelling_app_id;private final static String INPUT_TOPIC input-topic;private final static String OUTPUT_TOPIC out-topic;private final static String BOOTSTRAP_SERVERS localhost:9092;public static void main(String[] args) throws InterruptedException {// 配置kafka stream属性连接Properties properties new Properties();properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);StreamsConfig streamsConfig new StreamsConfig(properties);
// 配置键值对的序列化/反序列化Serdes对象SerdeString stringSerde Serdes.String();
// 构建流处理拓扑用于输出StreamsBuilder builder new StreamsBuilder();// 数据源处理器从指定的topic中取出数据KStreamString, String inputStream builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//KStreamString, String upperStream inputStream.peek((key, value) - {log.info([收集]key:{},value:{}, key, value);}).filter((key, value) - value.length() 5).mapValues(time - time.toUpperCase()).peek((key, value) - log.info([过滤结束]key:{},value:{}, key, value));
// 日志打印upperStream处理器的数据upperStream.print(Printed.toSysOut());
// 把upperStream处理器的数据输出到指定的topic中upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));KafkaStreams kafkaStreams new KafkaStreams(builder.build(), streamsConfig);
// jvm关闭时把流也关闭CountDownLatch downLatch new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread(() - {kafkaStreams.close();downLatch.countDown();log.info(关闭流处理);}));kafkaStreams.start();log.info(启动执行);}
}上面代码的重点具体步骤
创建Source Processor源去topic中读取消息 KStreamString, String inputStream builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));创建Stream Processors中间处理的流 KStreamString, String upperStream inputStream.peek((key, value) - {log.info([收集]key:{},value:{}, key, value);}).filter((key, value) - value.length() 5).mapValues(time - time.toUpperCase()).peek((key, value) - log.info([过滤结束]key:{},value:{}, key, value));创建Sink Processor流中最后的一个Processors用于pull到本地或者另外一个新的Topic upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
具体的含义后面会详细编写一篇这里先介绍简单使用
3、测试 进入生产者topic看发的最后三条 进入消费topic 日志输出