广告网站建设报价,重庆网站推广哪家好,企业网络搭建技能大赛,国内html5网站建设导语
Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案#xff0c;支持多租户、低延时、读写分离、跨地域复制#xff08;GEO replication#xff09;、快速扩容、灵活容错等特性。同时为了达到高性能#xff0c;低延时、高可用#xff0c;Pulsar 在客户端也…导语
Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案支持多租户、低延时、读写分离、跨地域复制GEO replication、快速扩容、灵活容错等特性。同时为了达到高性能低延时、高可用Pulsar 在客户端也做了很多的优化本文主要讲述 PulsarClient 基本原理和实现。
PulsarClient 简介
Pulsar 客户端 API 设计优雅简洁使用 PulsarClient 作为客户端的总入口方便用户记忆和构建出具体的客户端例如 Producer: 生产者用来发送消息到指定 Topic。 Consumer: 消费者通过订阅关联到指定 Topic 并接收消息。 Reader: 手动管理 Cursors 的消费者。(内部使用 Consumer 实现)。
PulsarClient 还统一管理客户端系统资源为具体的客户端提供了部分通用化处理包括连接管理、线程管理、内存管理等。接下来让我们了解一下 PulsarClient 是如何实现的。
PulsarClient 有哪些功能
作为客户端的统一入口下面代码片段不难看出 PulsarClient 主要功能是构建、销毁 PulsarClient 实例以及构建各种具体 Client 和事务实例。
public interface PulsarClient extends Closeable {ProducerBuilderbyte[] newProducer();T ProducerBuilderT newProducer(SchemaT schema);ConsumerBuilderbyte[] newConsumer();T ConsumerBuilderT newConsumer(SchemaT schema);ReaderBuilderbyte[] newReader();T ReaderBuilderT newReader(SchemaT schema);void updateServiceUrl(String serviceUrl) throws PulsarClientException;CompletableFutureListString getPartitionsForTopic(String topic);CompletableFutureVoid closeAsync();void shutdown() throws PulsarClientException;boolean isClosed();TransactionBuilder newTransaction() throws PulsarClientException;
}
实现原理
初始化过程
PulsarClient 可以使用以下代码来实例化。
PulsarClient client PulsarClient.builder().serviceUrl(pulsar://broker:6650).build();PulsarClient 以及具体客户端都使用 Builder 模式构建每种客户端都有对应的 ConfigurationData 来管理配置PulsarClient 核心配置如下:
public class ClientConfigurationData implements Serializable, Cloneable {private String serviceUrl;// 用来在运行时外部改变urlprivate transient ServiceUrlProvider serviceUrlProvider;private long operationTimeoutMs 30000;private long statsIntervalSeconds 60;private int numIoThreads 1;private int numListenerThreads 1;private int connectionsPerBroker 1;private boolean useTcpNoDelay true;private int concurrentLookupRequest 5000;private int maxLookupRequest 50000;private int maxLookupRedirects 20;private int maxNumberOfRejectedRequestPerConnection 50;private int keepAliveIntervalSeconds 30;private int connectionTimeoutMs 10000;private int requestTimeoutMs 60000;private long initialBackoffIntervalNanos TimeUnit.MILLISECONDS.toNanos(100);private long maxBackoffIntervalNanos TimeUnit.SECONDS.toNanos(60);private boolean enableBusyWait false;private String listenerName;// 全局内存限制(producer使用)private long memoryLimitBytes 0;private String proxyServiceUrl;private ProxyProtocol proxyProtocol;long tickDuration 1;// transactionprivate boolean enableTransaction false;
}PulsarClient 的初始化过程比较简单逐个初始化内部模块以下代码片段展示了 Client 内部主要的模块。
public class PulsarClientImpl implements PulsarClient {// 配置protected final ClientConfigurationData conf;// 本地元数据管理器主要负责topic分区个数、topic对应的owner节点以及schema信息private LookupService lookup;// 共享连接池 双层map结构private final ConnectionPool cnxPool;// 时间轮private final Timer timer;// 执行外部逻辑线程组(主要消费使用)private final ExecutorProvider externalExecutorProvider;// 执行内部逻辑线程组(主要消费使用)private final ExecutorProvider internalExecutorService;private final AtomicReferenceState state new AtomicReference();//producer集合private final SetProducerBase? producers;//consumer集合private final SetConsumerBase? consumers;//producer自增Idprivate final AtomicLong producerIdGenerator new AtomicLong();//consumer自增Idprivate final AtomicLong consumerIdGenerator new AtomicLong();// 请求自增Idprivate final AtomicLong requestIdGenerator new AtomicLong();// netty 线程组protected final EventLoopGroup eventLoopGroup;// 生产本地buffer内存限制器private final MemoryLimitController memoryLimitController;...
}
PulsarClient 初始化时主要创建了 Netty 客户端连接池、时间轮等对象只是准备好资源并没有和服务端建立连接进行任何交互。只有在创建具体的客户端时才会和服务端有交互。
Producer 创建
Pulsar 是以 Topic 粒度对外提供服务多分区 Topic 等同于多个不同数字后缀的 Topic 集合。下文提到的 Topic-Partition 包含了单分区 Topic 和多分区 Topic 中的一个 Partition。Pulsar 客户端的实现 Topic-Partition 之间是相互独立的SDK 内部会为每个 Topic-Partition 单独创建一个具体的客户端。我们在这里只介绍 Producer 的初始化流程(Consumer 类似)。
可以用以下代码构建 Producer。
Producerbyte[] producer client.newProducer().topic(my-topic).create();当 My-topic 为 Non-partitioned Topic会实例化一个 ProducerImpl 对象并返回当 My-topic 分区数量大于0时则会创建 PartitionedProducerImpl 对象。PartitionedProducerImpl 对象内包含了 List。可以理解为 PulsarClient 创建 Producer 时最终会创建和分区数量一致的 ProducerImpl 对象每个 ProducerImpl 都独立工作互不影响(Consumer 类似)。
在创建 Producer 时客户端与服务端命令字交互如下 PulsarClient 通过用户指定的 ServiceUrl 挑选一个 url 来连接服务端并做认证相关操作。 使用 LookupService 发送 PARTITIONED_METADATA 命令字查询给定 Topic 的分区数。 根据 Metadata 返回结果中的分区数循环创建 ProducerImpl 对象。 3.1 ProducerImpl 对象初始化时会使用 LookupService 发送 LOOKUP 请求查询对应的分区的 Owner 节点 Lookup 过程可参考https://km.woa.com/articles/show/555638。 3.2 根据 LOOKUP 响应连接到 Owner 节点并发送 PRODUCER 请求向服务端创建 Producer。 到这里 Producer 就已经创建完毕可以正式使用来发送消息了。
ps: 如果创建好 Producer 后分区数量有变化了比如服务端扩容了客户端可以感知到并增加 ProducerImpl 对象数量吗。答案是可以的默认会定时1分钟发起一次检测有分区变化会做相应处理。
连接管理
与大部分组件一样客户端和服务端使用长连接通信。Pulsar 协议设计上不是传统的应答模式可以同时支持多个客户端使用同一个连接并行发送接收请求(服务端会串行处理单个 Topic-partition 上的请求来保证消息顺序性)。
得益于连接共享客户端消耗的连接数是很少的PulsarClient 会为每台 Broker 创建一个连接池,默认连接数为1 用户可以使用 ConnectionsPerBroker 配置来设置每台 Broker 最大连接数。ProducerImpl、ConsumerImpl 在初始化时会随机从连接池中获取一个连接用来和服务端通信。
下图中 maxConnectionsPerHosts2, 连接池中为每个 Broker 创建2个连接6个客户端会在对应 T opic owner 节点里随机挑选一个连接绑定。 连接健康管理
Pulsar keepAlive 检测是双向的连接创建成功后客户端和服务端都会定时30s(KeepAliveIntervalSeconds 配置可修改)发送 Ping 请求到对端接收到 Ping 请求后会回应 Pong 来标识存活。在以下几种情况下客户端、服务端都会主动断开连接: 超时时间内没有完成握手动作。 发送 Ping 或者 Pong 命令时Netty 回调发送失败。 连接 isAutoRead 打开并且超时时间内没有收到任何请求(包含 Ping、Pong)。
连接断开后会通知绑定在该连接上的所有客户端这些客户端会重新从连接池中获取健康的连接。Pulsar 中空闲连接不会自动回收。
线程模型
PulsarClient 使用 Netty 作为网络通信框架, 是标准的 Netty 客户端。协议处理和事件驱动都是依托于 Netty。核心处理类直接继承于 Netty Handler。 所以线程模型也主要围绕于 Netty 的 EventLoopGroup。上文提到客户端资源管理都收敛于 PulsarClient也就是使用同一个 PulsarClient 创建出来的具体客户端都共享该 PulsarClient 中的线程等资源比如使用 ClientA 对象分别创建一个或多个 Producer、Consuemer、Reader 客户端这些客户端都共享 Client 中的线程资源。
PulsarClient 线程、线程组如下 图中实线表示客户端会从线程池中挑选一个线程绑定运行。 Pulsar-client-io: io 线程( Netty 内部线程)负责网络连接和读写。NumIoThreads 参数配置默认值为1。客户端不直接绑定 IO 线程而是由其内部的连接来绑定 IO 线程所以 IO 线程数配置最好小于或者等于总连接数否则有些线程不会使用到。 Pulsar-client-internal: 主要用于 Consumer 内部处理比如接收到消息后放置到接收队列等。也是通过 NumIoThreads 参数配置默认值为1。 Pulsar-external-listener: 主要用于 Consumer 外部处理比如用户消费逻辑回调。NumListenerThreads 参数配置默认值为1。 Pulsar-timer: 时间轮内部线程负责所有定时操作比如连接重连发送超时检测等。一个 PulsarClient 对应一个线程。
简单描述一下生产消费时线程是如何交互 生产: 用户线程创建消息并放置到本地缓存IO 线程负责把消息发送到服务端。 消费: IO 线程接收到服务端的消息推送使用 Pulsar-client-internal 线程把消息放在本地缓存队列然后使用 Pulsar-external-listener 线程执行用户消息处理逻辑。
总结和思考
本文介绍了 Pulsar 整体客户端架构讲解了 PulsarClient、Producer 初始化过程以及客户端的连接管理和线程模型。并没有涉及到详细的生产消费过程。大家不难发现 Pulsar 客户端和其他组件客户端相比较大的区别就是会给每个 Topic-partition 创建 Producer/consumer。如果客户端关联的 Topic-partition 数量很大Producer/consumer 数量会急剧膨胀从而导致客户端需要消耗更多的资源。也正是因为 Producer/consumer 数量可能较大连接和线程等资源不可能做到独立只能是 Producer/consumer 共享。而资源共享就不可避免出现客户端之间会相互影响比如限流是控制在连接维度但是由于连接是共享的某些 Topic 的限流就会影响到该连接上的全部客户端。建议用户客户端关联的 Topic-partition 数量较大时可以适当调大连接池和线程池大小来缓解影响或者使用不同的 PulsarClient 来做客户端隔离。