当前位置: 首页 > news >正文

做运营必看的网站设计装修网站大全

做运营必看的网站,设计装修网站大全,百度网站的安全建设方案,深圳易百讯网站建设公司说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1#xff0c;我有点java绝缘体的体质#xff0c;碰到和java相关的安装部署总会碰到点奇怪的问题#xff0c;不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素#xff0…说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1我有点java绝缘体的体质碰到和java相关的安装部署总会碰到点奇怪的问题不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素可以简单认为kafka的速度是10万/秒级的。 本次文章的目的是 1 搭建一个平时工作中常用的队列服务2 方便自己或者其他同事再次搭建 内容 1 搭建过程 共要搭建两个服务zookeeper和kafka。 1.1 创建zookeeper 这个是基础服务必须要最先启动 docker run -d --name zookeeper -e \ ZOOKEEPER_CLIENT_PORT2181 -e \ ZOOKEEPER_TICK_TIME2000 -p 2181:2181 \ registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100通常来说这个服务启动后就不用管了但是偶尔如果需要debug的时候 docker exec -it zookeeper bash bin/zkCli.sh -server 127.0.0.1:2181 ls /brokers/ids1.2 创建持久化路径 这个会实际保存kafka的消息 mkdir -p /data/kafka-logs1.3 创建kafka 一种场景是只监听外网IP(WAN_IP)另一种场景是同时监听内外网(LAN_IP)。 只监听外网的比较简单 WAN_IP111 LAN_IP222 docker run -it --rm --name kafka \-p 24666:24666 \--link zookeeper:zk \-e HOST_IPlocalhost \-e KAFKA_BROKER_ID1 \-e KAFKA_ZOOKEEPER_CONNECTzk:2181 \-e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://${WAN_IP}:24666 \-e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:24666 \-e KAFKA_LOG_DIRS/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100 同时监听内外网的比较麻烦(且要求端口不同) WAN_IP111 LAN_IP222 docker run -d --name kafka \-p 24666:24666 \-p 9092:9092 \--link zookeeper:zk \-e HOST_IPlocalhost \-e KAFKA_BROKER_ID1 \-e KAFKA_ZOOKEEPER_CONNECTzk:2181 \-e KAFKA_ADVERTISED_LISTENERSINTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \-e KAFKA_LISTENERSINTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAPINTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAMEINTERNAL \-e KAFKA_LISTENER_NAMEEXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAMEINTERNAL \-e KAFKA_LOG_DIRS/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100配置解释 KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092 用于所有网络接口监听。 EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。 KAFKA_ADVERTISED_LISTENERS: INTERNAL://IP:9092 用于内网客户端。 EXTERNAL://IP:24666 用于外网客户端。 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。 注释 • docker run -d --name kafka启动一个名为 kafka 的容器并在后台运行。 • -p 9092:9092将主机的 9092 端口映射到容器的 9092 端口这是 Kafka 的默认端口。 • --link zookeeper:zk将名为 zookeeper 的容器链接到当前容器并在当前容器中以 zk 作为别名进行访问。 • -e HOST_IPlocalhost设置环境变量 HOST_IP 为 localhost。 • -e KAFKA_BROKER_ID1设置 Kafka 的 broker ID 为 1。【如果有多个应该在这里区分】 • -e KAFKA_ZOOKEEPER_CONNECTzk:2181指定 Zookeeper 的连接地址。 • -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://xxx:9092设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】 • -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092设置 Kafka 的监听地址。 • -e KAFKA_LOG_DIRS/data/kafka-logs指定 Kafka 日志存储目录。 • -v /data/kafka-logs:/data/kafka-logs将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录以持久化存储 Kafka 日志。2 测试 2.1 生产者测试 from pydantic import BaseModel, field_validator import json import pandas as pd class KafkaJsonMsgList(BaseModel):json_list : listpropertydef msg_list(self):return pd.Series(self.json_list).apply(json.loads).to_list()from func_timeout import func_set_timeout,FunctionTimedOutimport json from confluent_kafka import Producer # func_set_timeout(60)def send_messages(bootstrap_servers None, topic None, messages None):发送消息到 Kafka 主题:param bootstrap_servers: Kafka 服务器地址:param topic: Kafka 主题:param messages: 要发送的消息列表# 创建 Producer 实例producer Producer(**{bootstrap.servers: bootstrap_servers,acks: 1 })for msg in messages:try:producer.produce(topic, msg)except BufferError:# 如果队列已满等待队列空出空间producer.poll(1)# 定期调用poll以确保消息传递producer.poll(0)# 确保所有消息都被发送producer.flush()msg_list [json.dumps({id:i ,value:aaa,aa:this is test}) for i in range(3)] topic my_test6 # 外网 ## bootstrap_servers WAN_IP:24666 # 内网 bootstrap_servers LAN_IP:9092send_messages(bootstrap_serversbootstrap_servers,topictopic,messages msg_list)2.2 消费者测试 from confluent_kafka import Consumer# 如果是非json的直接拿到就可以了 # func_set_timeout(60)def consume_messages(config None, topic None, max_messages 3):# Create Consumer instanceconsumer Consumer(config)# Subscribe to topicconsumer.subscribe([topic])consumed_count 0res_list []try:while consumed_count max_messages:msg consumer.poll(1.0)if msg is None:print(Empty Q)break else:res_list.append(msg.value().decode(utf-8))consumed_count 1if consumed_count max_messages:breakexcept KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()return res_list # 外网 config { # User-specific properties that you must set bootstrap.servers: WAN_IP:24666, group.id:group1, auto.offset.reset: earliest, enable.auto.commit: True } # 内网 config { # User-specific properties that you must set bootstrap.servers: LAN_IP:9092, group.id:group1, auto.offset.reset: earliest, enable.auto.commit: True } topic my_test6 import time tick1 time.time() max_messages 100 # 这里设置要消费的消息数量 json_list consume_messages(config, topic, max_messages) tick2 time.time() kj KafkaJsonMsgList(json_list json_list) msg_list kj.msg_list tick3 time.time()2.3 性能测试 发送端1.48秒发送10万条消息稍微弱了点不过考虑这个是一台仅仅4核8G且繁忙的机器那就还好(我默认的方式是需要json序列化的)。 tick1 time.time() msg_list_10w [json.dumps({id:i ,value:aaa,aa:this is test}) for i in range(100000)] topic my_test6 send_messages(bootstrap_serversbootstrap_servers,topictopic,messages msg_list_10w) tick2 time.time() print(takes %.2f to send 100000 % (tick2-tick1)) takes 1.48 to send 100000 接收端 python topic my_test6 import time tick1 time.time() max_messages 100000 # 这里设置要消费的消息数量 json_list consume_messages(config, topic, max_messages) tick2 time.time() kj KafkaJsonMsgList(json_list json_list) msg_list kj.msg_list tick3 time.time() print(tick2-tick1, get_time) print(tick3-tick2, parse-time)1.3391587734222412 get_time 0.24841904640197754 parse-time 总体上还是满意的可以了。
http://www.pierceye.com/news/472960/

相关文章:

  • 牡丹江市建设行业协会网站广西住房城乡建设厅网站首页
  • 重庆网站关键词排名优化免费网页代理的推荐
  • 定制型网站怎么做重庆软件开发公司有哪些
  • 自适应型网站建设网站建设搭建是什么意思
  • 网站建设能够不同地方网址大全12345
  • 做网批那个网站好校园网站界面建设
  • 免费网站建设php济南网站建设公司官网
  • 徐汇网站推广网络营销的四个特点
  • 简易做网站wordpress插件tag
  • 红酒 公司 网站建设青岛安装建设股份公司网站
  • 小米路由hd 做网站营销型网站策划 建设的考试题
  • 运河网站制作自主建站平台
  • 万网 网站建设合同最好的网站开发语言
  • 网站备案密码收不到典当 网站
  • 东莞网站建设推广服务网站建设开票单位
  • 贵港公司做网站东莞凤岗企业网站建设推广
  • 网站制作过程中碰到的问题微信怎么做链接推广产品
  • 做网站留后门是怎么回事视频网站开发需求分析
  • 关于做网站的了解点电子商务应用平台包括哪些
  • 垂直门户网站都有什么网站首页index.html
  • wordpress网站加载效果线上推销的方法
  • 网站都有什么语言杭州网络营销公司
  • 济南高新网站制作正规seo排名外包
  • 网站方案讲解技巧ppt的免费网站
  • 个人网站名称有哪些WordPress dux修改
  • 普法网站建设方案app制作开发公司怎么收费
  • 网站平台建设哪家公司好网站建设建站在线建站
  • 龙岗区住房和建设局在线网站网站如何做团购
  • 河南省建设监理协会网站证书查询wordpress 修改链接
  • 做网站业务员怎么样深圳福田最新新闻事件