当前位置: 首页 > news >正文

建设网站企业网上银行张家口认证助手app

建设网站企业网上银行,张家口认证助手app,威海seo,wordpress 特色文章目录 案例信息介绍后端异步处理请求和后端同步处理请求同步方式异步方式 环境文件目录配置.envrequirements.txt 完整代码ext.pyapp.pykafka_create_user.py 运行方式本地安装 kafka运行 app.py使用 postman 测试建立 http 长连接#xff0c;等待后端处理结果发送 RAW DAT… 文章目录 案例信息介绍后端异步处理请求和后端同步处理请求同步方式异步方式 环境文件目录配置.envrequirements.txt 完整代码ext.pyapp.pykafka_create_user.py 运行方式本地安装 kafka运行 app.py使用 postman 测试建立 http 长连接等待后端处理结果发送 RAW DATA 在看这个文章之前建议先学习 kafka的工作原理 这个系列视频讲得很好虽然基于 Java 但是理解原理并不用区分语言。只需要看懂工作原理即可。 案例信息介绍 假设我的网站需要高并发地处理 user 注册这个简单的功能。前端会发送 {user_id: xxxx, psw:xxx} 的信息到后端完成创建 前端用 postman 来模拟后端用 flask 框架来简单演示 下面我用一张大致的图来表示代码的架构 前端的原始数据进入后端之后后端要用 kafka 的架构在有序地处理 user 的请求在这个任务中所有 user 的请求都是 register因此我们就创建一个 kafka 的 topic 专门用来处理 user 的这类请求同时由于 kafka 是通过队列的方式 异步地处理 user 的请求所以当 kafka 处理完 user 的请求后我们需要找到这个处理结果并返回给对应的 user 如果大家对于 异步处理 user 请求和同步处理没有概念那么下面一章我先给大家讲一下同步处理请求和异步处理的区别 后端异步处理请求和后端同步处理请求 同步方式 file: app.py.pyTime : 2024/3/30Author : Peinuan qinimport threading import json from flask import Flask, jsonify, request from flask_cors import CORS from ext import FLASK_HOST, FLASK_PORTapp Flask(__name__) CORS(app)app.route(/login, methods[POST]) def create_user_post():data request.jsonregister user code ....return jsonify({status: 200, msg: success})if __name__ __main__:app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue)上述方式可以看到我的 create_user_post 负责接受前端的数据并且即刻处理处理之后将结果返回前端 jsonify({status: 200, msg: success})这个过程是一行接着一行发生的如果中途出现了很耗时的操作那么程序会一直等着。在 Flask 应用中如果 register user code .... 处理需要20秒这确实会阻塞处理该请求的线程直到该过程完成。由于 Flask 开发服务器默认是单线程的这意味着在这20秒内服务器将无法处理来自其他用户的任何其他请求。为了允许 Flask 同时处理多个请求你可以启用多线程模式。这可以通过在 app.run() 中设置 threadedTrue 来实现 app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue, threadedTrue)。这样Flask 将能够为每个请求启动一个新的线程从而允许同时处理多个请求。但这仍然并不是一种很好的方法因为整个服务器来看不具备扩展性。 假设我们服务器为每个 user 的请求开一个线程那么服务器资源是有限的当服务器宕机也并不能很快的恢复这就导致扩展性很差。 异步方式 import threading import json from flask import Flask, jsonify, request from flask_cors import CORS from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer from kafka_create_user import kafka_consumer_taskapp Flask(__name__) CORS(app)app.route(/login, methods[POST]) def create_user_post():data request.json# 发送数据到Kafkaproducer.produce(LOGIN_TOPIC, keystr(data[user_id]).encode(utf-8), valuejson.dumps(data).encode(utf-8))producer.flush()logger.info(send message to consumer)return jsonify({msg: 你好请求正在处理})我们先忽略其他的代码只看这一部分。这里相当于我们接受 user 的请求之后通过 kafka 把处理请求的需要转移到外部的服务器集群上去了。而 kafka 的特性在于非常高的可扩展性。增加 kafka 的节点就可以线性地将任务处理的数量提高。如果你看我上面给的那张图kafka 可以通过无限制增加 consumer 的数量来提高数据的处理能力。而后端的服务器需要做的就是把这些数据不断地派发出去这个步骤相比于直接在后端将所有的请求处理来说可以忽略不计。 环境文件目录 . ├── app.py ├── ext.py ├── kafka_create_user.py └── requirements.txt配置 .env 首先构建一个配置文件 .env 来存放基础的配置信息 FLASK_HOST0.0.0.0 FLASK_PORT9300 # LOGIN 这个 topic 是用来处理用户注册这个业务的 LOGIN_TOPICLOGIN# RESPONSE_TOPIC 则是用来构建 response 来返回前端成功或者失败 RESPONSE_TOPICRESPONSE_TOPICrequirements.txt kafka-python2.0.0 colorlog6.7.0 configparser5.3.0 flask2.3.2 flask_basicauth0.2.0 Flask-JWT-Extended4.6.0 Flask-Limiter3.5.1 Flask-PyMongo2.3.0 requests2.31.0 gunicorn21.0.0 pymongo4.6.0 pdfminer.six20231228 flask_cors4.0.0 python-dotenv orjson3.10.0 langchain langchain-community langchain_openai chromadbpython3.10 完整代码 ext.py file: ext.py.pyTime : 2024/3/30Author : Peinuan qin import json import logging import os import queue import threading import colorlog from confluent_kafka import Producer, Consumer, KafkaError from dotenv import load_dotenv from confluent_kafka.admin import AdminClient, NewTopic# 加载 .env 中的变量 load_dotenv()FLASK_HOST os.environ[FLASK_HOST] FLASK_PORT os.environ[FLASK_PORT] LOGIN_TOPIC os.environ[LOGIN_TOPIC] RESPONSE_TOPIC os.environ[RESPONSE_TOPIC]TOPICS [LOGIN_TOPIC, RESPONSE_TOPIC]def create_topic():# Kafka服务器配置admin_client AdminClient({bootstrap.servers: localhost:9092})# 创建新主题的配置topic_list [NewTopic(topic, num_partitions3, replication_factor1) for topic in TOPICS]# 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整# 创建主题fs admin_client.create_topics(topic_list)# 处理结果for topic, f in fs.items():try:f.result() # The result itself is Nonelogger.info(fTopic {topic} created)except Exception as e:logger.error(fFailed to create topic {topic}: {e})# Handler for logging handler colorlog.StreamHandler() formatter colorlog.ColoredFormatter(%(log_color)s%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s,datefmt%Y-%m-%d %H:%M:%S,log_colors{DEBUG: cyan,INFO: green,WARNING: yellow,ERROR: red,CRITICAL: red,bg_white,} ) handler.setFormatter(formatter)# Logger logger colorlog.getLogger(__name__) logger.addHandler(handler) logger.setLevel(logging.INFO) 尝试创建 topiccreate_topic()# 初始化Kafka生产者 producer_config {bootstrap.servers: localhost:9092 } producer Producer(**producer_config) 定义专门用来回复 response 的 consumer class ResponseConsumer:专门用来将各种处理好的结果返回给 user 作为 response 也就是图中针对 RESPONSE TOPIC 的 consumerdef __init__(self):self.users_streams {}self.config {bootstrap.servers: localhost:9092,group.id: user-response, # 设置 groupid如果不知道为什么要设置 groupid 可以去先看 kafka 的讲解视频auto.offset.reset: earliest} # 告诉 Kafka 消费者在找不到初始偏移量offset或者偏移量无效时比如指定的偏移量已经被删除应该从哪里开始消费消息。它可以设置为 earliest 或 latest。设置为 earliest 意味着消费者将从主题的开始处开始读取数据即尽可能不漏掉任何消息设置为 latest 意味着消费者将从新产生的消息开始读取即只消费自启动之后产生的消息。self.consumer Consumer(**self.config)logger.info(Create Response Consumer)self.consumer.subscribe([RESPONSE_TOPIC])logger.info(Subscribe Response Topic)# 因为可能有多个线程一起操作 consumer所以通过 lock 来保证线程安全self.lock threading.Lock()def get_or_make(self, user_id):获取某个 user_id 的 response queue 如果当前 user_id 的 response queue 不存在就创建一个每个 user_id 的 response queue 中都是返回给前端 user 的信息也就是图中的 RESPONSE MSG:param user_id::return:with self.lock:# 如果当前 user_id 还没有 queue就构建一个q self.users_streams.get(user_id, queue.Queue())self.users_streams[user_id] qreturn qdef pop(self, user_id):with self.lock:self.users_streams.pop(user_id, None)def put(self, user_id, msg_dict):当 user_id 的请求处理完产生的 RESPONSE MSG 放到 user_id 的队列里面:param user_id: :param msg_dict: :return: q self.get_or_make(user_id)if q:with self.lock:self.users_streams[user_id].put(msg_dict)logger.info(fput {msg_dict} into {user_id}s queue)return Trueelse:return Falsedef listen_for_response(self):不断拉取 RESPONSE TOPIC 的 producer 生成的结果:return:try:while True:msg self.consumer.poll(timeout1.0) # 1秒超时if msg is None:continueif msg.error():if msg.error().code() KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())break如果拉取到了就放到对应的 user_id 的 queue 里面if msg:logger.info(freceived data: {msg})msg_data json.loads(msg.value().decode(utf-8))user_id msg.key().decode(utf-8)logger.info(fmsg_data: {msg_data})logger.info(fuser_id: {user_id})put_flag self.put(user_id, msg_data)if not put_flag:logger.error(fCreate RESPONSE MSG for {user_id} failed)else:logger.info(fcreate RESPONSE MSG response for {user_id})except Exception as e:self.consumer.close() app.py file: app.py.pyTime : 2024/3/30Author : Peinuan qinimport threading import json from flask import Flask, jsonify, request from flask_cors import CORS from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer from kafka_create_user import kafka_consumer_taskapp Flask(__name__) CORS(app)response_consumer ResponseConsumer()app.route(/login, methods[POST]) def create_user_post():data request.json# 发送数据到Kafkaproducer.produce(LOGIN_TOPIC, keystr(data[user_id]).encode(utf-8), valuejson.dumps(data).encode(utf-8))producer.flush()logger.info(send message to login consumer)return jsonify({msg: 你好请求正在处理})app.route(/stream) def stream():user_id request.args.get(user_id) # 假设用户ID通过查询参数传入logger.info(fuid: {user_id})logger.info(fuser_streams: {response_consumer.users_streams})def event_stream(user_id):# 这里需要一种机制来持续发送数据给特定用户的流q response_consumer.get_or_make(user_id)logger.info(f{user_id} s queue is: {q})while True:if not q.empty():message q.get()logger.info(fmessage: {message})yield fdata: {json.dumps(message)}\n\nreturn app.response_class(event_stream(user_id), content_typetext/event-stream)def run_multi_thread():consumer_thread threading.Thread(targetkafka_consumer_task)response_thread threading.Thread(targetresponse_consumer.listen_for_response, daemonTrue)logger.info(Start APP ...)consumer_thread.start()logger.info(Create User Consumer start ...)response_thread.start()logger.info(Response Consumer start ...)app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue, use_reloaderFalse)if __name__ __main__:run_multi_thread()kafka_create_user.py file: kafka_create_user.pyTime : 2024/3/30Author : Peinuan qin import json import os from queue import Queue import threading# 初始化全局消息队列 from confluent_kafka import Consumer, KafkaError from kafka import KafkaConsumer, KafkaProducer from dotenv import load_dotenv from ext import logger, LOGIN_TOPIC, RESPONSE_TOPIC, producerdef kafka_consumer_task():这里定义了 LOGIN TOPIC 的 consumer 的行为也就是对 user_id 传过来的 RAW DATA 如何处理:return: # Kafka配置config {bootstrap.servers: localhost:9092,group.id: user-login-group,auto.offset.reset: earliest}consumer Consumer(**config)consumer.subscribe([LOGIN_TOPIC])# 读取数据try:while True:msg consumer.poll(timeout1.0) # 1秒超时if msg is None:continueif msg.error():if msg.error().code() KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())breakif msg:data json.loads(msg.value().decode(utf-8))key msg.key().decode(utf-8)print(key:, key)为了观察我们将 user 传过来的数据保存到本地with open(f{key}.json, w) as f:json.dump(data, f, ensure_asciiFalse, indent4)logger.info(fsuccessfully saved the {key}.json)完成任务后通过 RESPONSE TOPIC 的 producer 生成 response并发送给 RESPONSE TOPIC 等待对应的 consumer 来取并且返回给前端producer.produce(RESPONSE_TOPIC, keymsg.key(), valuejson.dumps({msg: fsuccessfully create user {key}}).encode(utf-8))producer.flush()logger.info(send processed data to response consumer)except KeyboardInterrupt:passfinally:# 清理操作consumer.close()producer.flush()producer.close()强调一下 如果你也是基于 Flask 框架虽然这里的 debugTrue 可以保证每次更改代码后对代码进行重载方便你进行调试。但是关于内存中的一些变量会消失所以保证我上面的代码能够顺利运行我设置了 use_reloaderFalse 否则 response_consumer.users_streams 总是为空因为重载变量会造成混淆引发未知的程序错误。app.py 中的 stream 是以 SSE 的方式让服务器可以主动通知 user本质是 user 向服务器建立长连接然后 kafka 完成任务后通过这个端口将信息发送给 user 运行方式 本地安装 kafka 不知道如何安装请 参考 运行 app.py 直接用 pycharm 运行就可以 使用 postman 测试 建立 http 长连接等待后端处理结果 新建窗口建立 http 连接针对 stream 端口并且是 GET 方法注意选中 http 协议哦通过左上角的符号不要选择其他协议同时在 Params 下面的 key 和 value 输入你 user_id 的信息要和下面的 /login 的一致然后点击 send长连接就会成功建立了 发送 RAW DATA 打开另一个新的窗口输入你本地运行的地址和端口并且选择 post 方法选择 body 和 raw 选择 json 的格式并在文本框中键入 json 数据发送就会收到 阶段性的服务器回复 这代表后端已经通过 kafka 来异步处理数据 这个时候很快你应该可以看到在长连接的那个 postman 窗口里面出现 {msg: successfully create user peinuan}并且每次你在 /login send 一次这里就会成功获得一次结果前端获得成功的信息
http://www.pierceye.com/news/18335/

相关文章:

  • 金融理财管理网站源码 dedecms长春找工作哪个网站好
  • 合肥网站建设技术支持网页版哔哩哔哩
  • 工厂网站建设公司游戏后端开发需要学什么
  • php 网站 手机版ui设计那个培训班好
  • 清河网站建设设计费用房地产网
  • asp.net网站开发四酷全书建筑网站、
  • 莒县城阳网站建设好看的wordpress文章模板下载
  • 企业网站的建立的目的淮阴网站建设
  • 海口网站建设费用中国核工业华兴建设有限公司
  • 建站优化内容网页界面设计布局
  • 手机怎么网站建设网站对服务器要求
  • 网站建设 荆州自贡建设网站
  • mysql的网站开发北京海淀建设工程律师哪个好
  • 好网站在哪里深圳网站建设公司联系方式
  • 建网站空间专业设计服务网站
  • 网站开发与管理大作业行业论坛网站
  • wordpress添加悬浮按钮搜索关键词优化
  • 中国建设银行网站查询密码是什么意思网站pc转移动端代码
  • 网站收录什么意思按城市亭湖建设局网站
  • 南京做网站南京乐识最优一半都有哪些做影视外包的网站
  • 开源免费的网站程序装饰设计网站建设
  • 南宁本地有几家网站开发资讯文章减肥健康wordpress
  • 文化投资的微网站怎么做wordpress 3.8.3 下载
  • 菏泽建设局网站青海住房与城乡建设厅网站
  • 网站建设出错1004株洲网站优化找哪家
  • 百度只收录网站首页石家庄新钥匙网站建设
  • 鲸影视官方网站下载手机网站制作平台有哪些
  • cetos做网站视频网站怎么做
  • 购物商城网站开发公司黄骅贴吧新闻
  • 浏阳做网站公司百度网站建设多钱