当前位置: 首页 > news >正文

有知道做网站的吗自己做网站前期困难吗

有知道做网站的吗,自己做网站前期困难吗,大佛寺广州网站,网站开发项目章程示例前言 kafka生产者负责将数据发布到kafka集群的主题#xff1b;kafka生产者消息发送方式有两种#xff1a; 同步发送异步回调发送 流程 流程说明#xff1a; Kafka Producer整体可看作是一个异步处理操作#xff1b;消息发送过程中涉及两个线程#xff1a;main线程和se…前言 kafka生产者负责将数据发布到kafka集群的主题kafka生产者消息发送方式有两种 同步发送异步回调发送 流程 流程说明 Kafka Producer整体可看作是一个异步处理操作消息发送过程中涉及两个线程main线程和sender线程main线程负责将消息发送至一个双端队列sender线程负责从双端队列取消息并发送至kafka broker 消息可靠性 producer的acks参数表示生产者生产消息时写入到副本的严格程度。决定了生产者的性能与可靠性。 0生产者发送过来的数据不等待broker确认直接发送下一条数据性能最高但可能存在丢数据 1生产者发送过来的数据等待Leader副本确认后发送下一条数据性能中等 -1all生产者发送过来的数据等待所有副本将数据同步后发送下一条数据性能最慢安全性最高 消息有序性 消息保序策略按key分区可以实现局部有序但这又可能会导致数据倾斜可根据实际情况选择。 示例 // 指定消息key,即倒数第二个参数,当有相同的两条消息先后存储同一个key,消费者可按顺序消费到RdKafka::ErrorCode errorCode m_producer-produce(m_topic, // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload, // 消息本身len, // 消息长度key, // 消息keyNULL);Main线程与Sender线程 Main线程 流程 创建消息 // librdkafka源码 rdkafka_msg.c/* Create message */ rkm rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,key, keylen, msg_opaque, err, errnox, NULL, 0,rd_clock()); if (unlikely(!rkm)) {/* errno is already set by msg_new() */rd_kafka_set_last_error(err, errnox);return -1; }选择分区 /* Partition the message */ err rd_kafka_msg_partitioner(rkt, rkm, 1); if (likely(!err)) {rd_kafka_set_last_error(0, 0);return 0; }调用拦截器 /* Interceptor: unroll failing messages by triggering on_ack.. */ rkm-rkm_err err; rd_kafka_interceptors_on_acknowledgement(rkt-rkt_rk,rkm-rkm_rkmessage);Sender线程 参数说明 batch.size缓冲区一批数据最大值默认16k。适当增加该值可以提高吞吐量但是如果该值设置太大会导致数据传输延迟增加。linger.ms如果数据迟迟未达到batch.sizesender等待linger.time之后就会发送数据。单位ms默认值是0ms表示没有延迟。生产环境建议该值大小为5-100ms之间。acks见“消息可靠性”章节max.in.flight.requests.per.connection允许最多没有返回ack的次数默认为5开启幂等性要保证该值是 1-5的数字。retries当消息发送出现错误的时候系统会重发消息。retries表示重试次数。默认是int最大值2147483647。 如果设置了重试还想保证消息的有序性需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION1否则在重试此失败消息的时候其他的消息可能发送成功了。retry.backoff.ms两次重试之间的时间间隔默认是100ms。enable.idempotence是否开启幂等性默认true开启幂等性。 流程 达到batch.size大小或满足linger.ms时间发送消息消息发送至的kafka服务器后如果kafka没有应答默认每个broker节点队列最多缓存 5 个请求与“max.in.flight.requests.per.connection”参数有关如配置了“retries”、“ retry.backoff.ms”参数消息发送失败由kafka内部自动重试无需手动在回调函数中重试 同步和异步流程 同步流程 流程说明 通过produce方法将消息推送至双端队列通过flush方法等待发送结果如outq_len()大于0说明存在未发送成功的消息 代码示例 int KafkaProducer::PushMessage(const std::string str, const std::string key) {int32_t len (int32_t)str.length();void *payload const_castvoid *(static_castconst void *(str.data()));// produce 方法生产和发送单条消息到 Broker// 如果不加时间戳内部会自动加上当前的时间戳RdKafka::ErrorCode errorCode m_producer-produce(m_topic, // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload, // 消息本身len, // 消息长度key, // 消息keyNULL);if (RdKafka::ERR_NO_ERROR ! errorCode) {// kafka 队列满等待 100 msif (RdKafka::ERR__QUEUE_FULL errorCode) {m_producer-poll(100);}return -1;}// 同步等待200msm_producer-flush(200);if(m_producer-outq_len() 0) // 用于调试{printf(Existed not send message.size:%d\n, m_producer-outq_len());return -1;}return 0; }异步流程 流程说明 设置生产者投递报告回调设置生产者自定义分区策略回调消息发送 代码示例 设置生产者投递回调 // 生产者投递报告回调 class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb { public:void dr_cb(RdKafka::Message message){ if (message.err()) // 出错回调{// TODO} else // 正常回调{ // TODO}} };// 设置生产者投递报告回调 m_dr_cb new ProducerDeliveryReportCb; // 创建投递报告回调 errCode m_config-set(dr_cb, m_dr_cb, errorStr); // 异步方式发送数据 if (RdKafka::Conf::CONF_OK ! errCode) {printf(Conf set(dr_cb) failed, errorStr:%s, errorStr.c_str());break; }设置生产者自定义分区策略回调 // 生产者自定义分区策略回调partitioner_cb class HashPartitionerCb : public RdKafka::PartitionerCb { public:// brief 返回 topic 中使用 key 的分区msg_opaque 置 NULL// return 返回分区(0, partition_cnt)int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque) {// 用于自定义分区策略这里用 hash。例轮询方式p_id % partition_cntint32_t partition_id generate_hash(key-c_str(), key-size()) % partition_cnt;return partition_id;}private:// 自定义哈希函数 static inline unsigned int generate_hash(const char *str, size_t len) {unsigned int hash 5381;for (size_t i 0; i len; i)hash ((hash 5) hash) str[i];return hash;} };// 设置生产者自定义分区策略回调 m_partitioner_cb new HashPartitionerCb; // 创建自定义分区投递回调 errCode m_topicConfig-set(partitioner_cb, m_partitioner_cb, errorStr); if (RdKafka::Conf::CONF_OK ! errCode) {printf(Conf set(partitioner_cb) failed, errorStr:%s, errorStr.c_str());break; }消息发送 注意此处produce执行成功不代表消息发送成功需根据dr_cb消息回调结果判断消息是否发送成功。 int KafkaProducer::PushMessage(const std::string str, const std::string key) {int32_t len (int32_t)str.length();void *payload const_castvoid *(static_castconst void *(str.data()));// produce 方法生产和发送单条消息到 Broker// 如果不加时间戳内部会自动加上当前的时间戳RdKafka::ErrorCode errorCode m_producer-produce(m_topic, // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload, // 消息本身len, // 消息长度key, // 消息keyNULL);// 轮询处理m_producer-poll(0);if (RdKafka::ERR_NO_ERROR ! errorCode) {// kafka 队列满等待 100 msif (RdKafka::ERR__QUEUE_FULL errorCode) {m_producer-poll(100);}return -1;}return 0; }
http://www.pierceye.com/news/643369/

相关文章:

  • 重庆专业网站设计服务做染料的网站
  • 长春模板建站公司浙江住房和建设厅网站
  • 网站建设公司 佛山南京移动网站建设
  • 网站建设目录规范微信h5网站开发
  • 做ppt卖给网站枣庄做网站优化
  • 新乡营销型网站建设做软件的中介网站
  • 延边州建设局网站软件公司主要做哪些
  • 建设网站带后台管理程序制作软件
  • 榆林市住房和城市建设局网站梁志天设计公司项目
  • 建设网站怎么搞做非法网站判刑多少年
  • 做查询网站 发布数据wordpress nextapp
  • 福鼎建设局网站首页上海社区网站建设
  • 企业网站免费推广方案wordpress文章类模板
  • 从化区住房和建设局网站网站开发所需要的的环境
  • 深圳微商城网站制作联系电话国家信息网
  • 网站没有收录怎么办巴中城乡和住房建设厅网站
  • 做个网站要钱吗wordpress动漫网站模板
  • 高性能网站建设进阶指南下载wdcp 快速迁移网站
  • 建设教育协会网站房产资讯的网站怎么做
  • 网站网页怎么做如何查看网站做没做竞价
  • 济南建网站的网站l临沂建设工程信息网站
  • 网站建设美词原创php网站开发实验总结
  • 遵义建设厅网站如何申请个人网站域名
  • 济南建设网官方网站合肥市建设行政主管部门网站
  • 书怎么做pdf下载网站信息流优化师需要具备哪些能力
  • 专业制作公司网站公司公积金网站建设方案
  • 专门做产品定制的网站自豪得用wordpress删
  • 佳木斯做网站网站空间试用
  • 南京建站平台wordpress 主题 自适应
  • 广东建设职业注册中心网站wordpress 500一片空白