有知道做网站的吗,自己做网站前期困难吗,大佛寺广州网站,网站开发项目章程示例前言
kafka生产者负责将数据发布到kafka集群的主题#xff1b;kafka生产者消息发送方式有两种#xff1a; 同步发送异步回调发送
流程 流程说明#xff1a;
Kafka Producer整体可看作是一个异步处理操作#xff1b;消息发送过程中涉及两个线程#xff1a;main线程和se…前言
kafka生产者负责将数据发布到kafka集群的主题kafka生产者消息发送方式有两种 同步发送异步回调发送
流程 流程说明
Kafka Producer整体可看作是一个异步处理操作消息发送过程中涉及两个线程main线程和sender线程main线程负责将消息发送至一个双端队列sender线程负责从双端队列取消息并发送至kafka broker
消息可靠性
producer的acks参数表示生产者生产消息时写入到副本的严格程度。决定了生产者的性能与可靠性。
0生产者发送过来的数据不等待broker确认直接发送下一条数据性能最高但可能存在丢数据 1生产者发送过来的数据等待Leader副本确认后发送下一条数据性能中等 -1all生产者发送过来的数据等待所有副本将数据同步后发送下一条数据性能最慢安全性最高 消息有序性
消息保序策略按key分区可以实现局部有序但这又可能会导致数据倾斜可根据实际情况选择。
示例
// 指定消息key,即倒数第二个参数,当有相同的两条消息先后存储同一个key,消费者可按顺序消费到RdKafka::ErrorCode errorCode m_producer-produce(m_topic, // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload, // 消息本身len, // 消息长度key, // 消息keyNULL);Main线程与Sender线程
Main线程
流程
创建消息
// librdkafka源码 rdkafka_msg.c/* Create message */
rkm rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,key, keylen, msg_opaque, err, errnox, NULL, 0,rd_clock());
if (unlikely(!rkm)) {/* errno is already set by msg_new() */rd_kafka_set_last_error(err, errnox);return -1;
}选择分区
/* Partition the message */
err rd_kafka_msg_partitioner(rkt, rkm, 1);
if (likely(!err)) {rd_kafka_set_last_error(0, 0);return 0;
}调用拦截器
/* Interceptor: unroll failing messages by triggering on_ack.. */
rkm-rkm_err err;
rd_kafka_interceptors_on_acknowledgement(rkt-rkt_rk,rkm-rkm_rkmessage);Sender线程
参数说明
batch.size缓冲区一批数据最大值默认16k。适当增加该值可以提高吞吐量但是如果该值设置太大会导致数据传输延迟增加。linger.ms如果数据迟迟未达到batch.sizesender等待linger.time之后就会发送数据。单位ms默认值是0ms表示没有延迟。生产环境建议该值大小为5-100ms之间。acks见“消息可靠性”章节max.in.flight.requests.per.connection允许最多没有返回ack的次数默认为5开启幂等性要保证该值是 1-5的数字。retries当消息发送出现错误的时候系统会重发消息。retries表示重试次数。默认是int最大值2147483647。 如果设置了重试还想保证消息的有序性需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION1否则在重试此失败消息的时候其他的消息可能发送成功了。retry.backoff.ms两次重试之间的时间间隔默认是100ms。enable.idempotence是否开启幂等性默认true开启幂等性。
流程
达到batch.size大小或满足linger.ms时间发送消息消息发送至的kafka服务器后如果kafka没有应答默认每个broker节点队列最多缓存 5 个请求与“max.in.flight.requests.per.connection”参数有关如配置了“retries”、“ retry.backoff.ms”参数消息发送失败由kafka内部自动重试无需手动在回调函数中重试
同步和异步流程
同步流程
流程说明
通过produce方法将消息推送至双端队列通过flush方法等待发送结果如outq_len()大于0说明存在未发送成功的消息
代码示例
int KafkaProducer::PushMessage(const std::string str, const std::string key)
{int32_t len (int32_t)str.length();void *payload const_castvoid *(static_castconst void *(str.data()));// produce 方法生产和发送单条消息到 Broker// 如果不加时间戳内部会自动加上当前的时间戳RdKafka::ErrorCode errorCode m_producer-produce(m_topic, // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload, // 消息本身len, // 消息长度key, // 消息keyNULL);if (RdKafka::ERR_NO_ERROR ! errorCode) {// kafka 队列满等待 100 msif (RdKafka::ERR__QUEUE_FULL errorCode) {m_producer-poll(100);}return -1;}// 同步等待200msm_producer-flush(200);if(m_producer-outq_len() 0) // 用于调试{printf(Existed not send message.size:%d\n, m_producer-outq_len());return -1;}return 0;
}异步流程
流程说明
设置生产者投递报告回调设置生产者自定义分区策略回调消息发送
代码示例
设置生产者投递回调
// 生产者投递报告回调
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:void dr_cb(RdKafka::Message message){ if (message.err()) // 出错回调{// TODO} else // 正常回调{ // TODO}}
};// 设置生产者投递报告回调
m_dr_cb new ProducerDeliveryReportCb; // 创建投递报告回调
errCode m_config-set(dr_cb, m_dr_cb, errorStr); // 异步方式发送数据
if (RdKafka::Conf::CONF_OK ! errCode)
{printf(Conf set(dr_cb) failed, errorStr:%s, errorStr.c_str());break;
}设置生产者自定义分区策略回调
// 生产者自定义分区策略回调partitioner_cb
class HashPartitionerCb : public RdKafka::PartitionerCb
{
public:// brief 返回 topic 中使用 key 的分区msg_opaque 置 NULL// return 返回分区(0, partition_cnt)int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque) {// 用于自定义分区策略这里用 hash。例轮询方式p_id % partition_cntint32_t partition_id generate_hash(key-c_str(), key-size()) % partition_cnt;return partition_id;}private:// 自定义哈希函数 static inline unsigned int generate_hash(const char *str, size_t len) {unsigned int hash 5381;for (size_t i 0; i len; i)hash ((hash 5) hash) str[i];return hash;}
};// 设置生产者自定义分区策略回调
m_partitioner_cb new HashPartitionerCb; // 创建自定义分区投递回调
errCode m_topicConfig-set(partitioner_cb, m_partitioner_cb, errorStr);
if (RdKafka::Conf::CONF_OK ! errCode)
{printf(Conf set(partitioner_cb) failed, errorStr:%s, errorStr.c_str());break;
}消息发送
注意此处produce执行成功不代表消息发送成功需根据dr_cb消息回调结果判断消息是否发送成功。
int KafkaProducer::PushMessage(const std::string str, const std::string key)
{int32_t len (int32_t)str.length();void *payload const_castvoid *(static_castconst void *(str.data()));// produce 方法生产和发送单条消息到 Broker// 如果不加时间戳内部会自动加上当前的时间戳RdKafka::ErrorCode errorCode m_producer-produce(m_topic, // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload, // 消息本身len, // 消息长度key, // 消息keyNULL);// 轮询处理m_producer-poll(0);if (RdKafka::ERR_NO_ERROR ! errorCode) {// kafka 队列满等待 100 msif (RdKafka::ERR__QUEUE_FULL errorCode) {m_producer-poll(100);}return -1;}return 0;
}