出口外贸网站建设,会展类网站模板,帝国网站管理系统前台,益阳seo网站建设企业微信机器人是一种可以在企业微信工作群中执行特定任务的自动化工具。它具备丰富的功能#xff0c;可以帮助企业提高团队协作效率#xff0c;简化工作流程#xff0c;并为员工提供更好的工作体验。
获取企业 ID 信息
企业信息页面链接地址#xff1a;https://work.wei…企业微信机器人是一种可以在企业微信工作群中执行特定任务的自动化工具。它具备丰富的功能可以帮助企业提高团队协作效率简化工作流程并为员工提供更好的工作体验。
获取企业 ID 信息
企业信息页面链接地址https://work.weixin.qq.com/wework_admin/frame#profile 创建企业微信机器人
后台应用管理页面链接地址https://work.weixin.qq.com/wework_admin/frame#apps
点击创建应用 填写配置机器人应用信息 机器人功能配置 配置密钥 Secret
获取机器人应用 agentId 和 Secret 信息 发送 Secret 到企业微信查看详细 Secret 创建 config.ini 配置文件配置企业微信机器人应用的配置信息
corpId【企业 ID】
corpSecret【应用密钥】获取 access_token
API 开发文档https://developer.work.weixin.qq.com/resource/devtool
获取 access_token 是调用企业微信API接口的第一步相当于创建了一个登录凭证其它的业务 API 接口都需要依赖于 access_token 来鉴权调用者身份。
因此开发者在使用业务接口前要明确 access_token 的颁发来源使用 正确的access_token。
请求方式 GETHTTPS
请求地址 https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpidIDcorpsecretSECRET权限说明
每个应用有独立的 secret获取到的 access_token 只能本应用使用所以每个应用的 access_token 应该分开来获取。 注意事项
开发者需要缓存access_token用于后续接口的调用注意不能频繁调用gettoken接口否则会受到频率拦截。当access_token失效或过期时需要重新获取。
access_token的有效期通过返回的expires_in来传达正常情况下为7200秒2小时有效期内重复获取返回相同结果过期后获取会返回新的access_token。 由于企业微信每个应用的access_token是彼此独立的所以进行缓存时需要区分应用来进行存储。 access_token至少保留512字节的存储空间。 企业微信可能会出于运营需要提前使access_token失效开发者应实现access_token失效时重新获取的逻辑。
返回结果 实现代码展示
import os
import requests # 从环境变量中读取 corpid 和 corpsecret
corpid os.environ.get(corpid)
corpsecret os.environ.get(corpsecret) # 发送 GET 请求获取 access_token
url fhttps://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid{corpid}corpsecret{corpsecret}
response requests.get(url) # 检查响应状态
if response.status_code 200: data response.json() if data.get(errcode) 0: # 将 access_token 写入文件 with open(access_token.txt, w) as file: file.write(data[access_token]) else: # 输出错误信息 print(data)
else: # 输出请求失败信息 print(fRequest failed with status code {response.status_code})接收消息服务器配置
为了能够让自建应用和企业微信进行双向通信企业可以在应用的管理后台开启接收消息模式。开启接收消息模式的企业需要提供可用的接收消息服务器 URL建议使用 https。
开启接收消息模式后用户在应用里发送的消息会推送给企业后台。还可配置地理位置上报等事件消息当事件触发时企业微信会把相应的数据推送到企业的后台。企业后台接收到消息后可在回复该消息请求的响应包里带上新消息企业微信会将该被动回复消息推送给用户。
配置消息服务器配置 企业微信在推送消息给企业时会对消息内容做 AES 加密以 XML 格式 POST 到企业应用的 URL 上。企业在被动响应时也需要对数据加密以 XML 格式返回给企业微信。
配置说明展示
属性说明URL企业后台接收企业微信推送请求的访问协议和地址 支持 http 或 https 协议为了提高安全性建议使用httpsToken企业任意填写用于生成签名EncodingAESKey用于消息体的加密
加解密方案说明
文档地址https://developer.work.weixin.qq.com/document/path/90968
术语说明
msg_signature 消息签名用于验证请求是否来自企业微信防止攻击者伪造。
EncodingAESKey用于消息体的加密长度固定为43个字符从a-z, A-Z, 0-9共62个字符中选取是AESKey的Base64编码。解码后即为32字节长的AESKey
AESKeyBase64_Decode(EncodingAESKey “”)AESKeyAES 算法的密钥长度为 32 字节。AES 采用 CBC 模式数据采用 PKCS#7 填充至 32 字节的倍数IV 初始向量大小为 16 字节取 AESKey 前 16 字节详见http://tools.ietf.org/html/rfc2315
msg为消息体明文格式为XML
msg_encrypt明文消息msg加密处理后的Base64编码。
初始化加解密类
WXBizMsgCrypt wxcpt(sToken,sEncodingAESKey,sReceiveId);要求传参数sToken,sEncodingAESKey,sReceiveId。
sToken,sEncodingAESKey即设置接收消息的参数章节所述配置的Token、EncodingAESKey。
验证 URL 函数
① 签名校验 ② 解密数据包得到明文消息内容。
int VerifyURL(const string sMsgSignature, const string sTimeStamp, const string sNonce, const string sEchoStr, string sReplyEchoStr);解密函数
① 签名校验 ② 解密数据包得到明文消息结构体。
int DecryptMsg(const string sMsgSignature, const string sTimeStamp, const string sNonce, const string sPostData, string sMsg);加密函数
① 加密明文消息结构体 ② 生成签名 ③ 构造被动响应包
int EncryptMsg(const string sReplyMsg, const string sTimeStamp, const string sNonce, string sEncryptMsg);项目结构目录 配置文件 config.ini
[wechat]
AgentId 企业微信应用ID
Secret 企业微信应用Secret
CorpID 企业微信ID
Token 企业微信应用Token
EncodingAESKey 企业微信应用EncodingAESKey[dev]
debug falserequirements.txt
blinker1.6.3
certifi2023.7.22
charset-normalizer3.3.1
click8.1.7
colorama0.4.6
configobj5.0.8
Flask3.0.0
idna3.4
itsdangerous2.1.2
Jinja23.1.2
lxml4.9.3
MarkupSafe2.1.3
Naked0.1.32
pycryptodomex3.19.0
PyYAML6.0.1
requests2.31.0
shellescape3.8.1
six1.16.0
urllib32.0.7
Werkzeug3.0.0加解密算法算法库
鉴于加解密算法相对复杂企业微信提供了算法库。
./callback/ierror.py
WXBizMsgCrypt_OK 0
WXBizMsgCrypt_ValidateSignature_Error -40001
WXBizMsgCrypt_ParseXml_Error -40002
WXBizMsgCrypt_ComputeSignature_Error -40003
WXBizMsgCrypt_IllegalAesKey -40004
WXBizMsgCrypt_ValidateCorpid_Error -40005
WXBizMsgCrypt_EncryptAES_Error -40006
WXBizMsgCrypt_DecryptAES_Error -40007
WXBizMsgCrypt_IllegalBuffer -40008
WXBizMsgCrypt_EncodeBase64_Error -40009
WXBizMsgCrypt_DecodeBase64_Error -40010
WXBizMsgCrypt_GenReturnXml_Error -40011./callback/WXBizMsgCrypt3.py
import logging
import base64
import random
import hashlib
import time
import struct
from Cryptodome.Cipher import AES
import xml.etree.cElementTree as ET
import socketfrom callback import ierror
关于Crypto.Cipher模块ImportError: No module named Crypto解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后按照README中的“Installation”小节的提示进行pycrypto安装。
class FormatException(Exception):passdef throw_exception(message, exception_classFormatException):my define raise exception functionraise exception_class(message)class SHA1:计算企业微信的消息签名接口def getSHA1(self, token, timestamp, nonce, encrypt):用SHA1算法生成安全签名param token: 票据param timestamp: 时间戳param encrypt: 密文param nonce: 随机字符串return: 安全签名try:sortlist [token, timestamp, nonce, encrypt]sortlist.sort()sha hashlib.sha1()sha.update(.join(sortlist).encode())return ierror.WXBizMsgCrypt_OK, sha.hexdigest()except Exception as e:logger logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_ComputeSignature_Error, Noneclass XMLParse:提供提取消息格式中的密文及生成回复消息格式的接口# xml消息模板AES_TEXT_RESPONSE_TEMPLATE xml
Encrypt![CDATA[%(msg_encrypt)s]]/Encrypt
MsgSignature![CDATA[%(msg_signaturet)s]]/MsgSignature
TimeStamp%(timestamp)s/TimeStamp
Nonce![CDATA[%(nonce)s]]/Nonce
/xmldef extract(self, xmltext):提取出xml数据包中的加密消息param xmltext: 待提取的xml字符串return: 提取出的加密消息字符串try:xml_tree ET.fromstring(xmltext)encrypt xml_tree.find(Encrypt)return ierror.WXBizMsgCrypt_OK, encrypt.textexcept Exception as e:logger logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_ParseXml_Error, Nonedef generate(self, encrypt, signature, timestamp, nonce):生成xml消息param encrypt: 加密后的消息密文param signature: 安全签名param timestamp: 时间戳param nonce: 随机字符串return: 生成的xml字符串resp_dict {msg_encrypt: encrypt,msg_signaturet: signature,timestamp: timestamp,nonce: nonce,}resp_xml self.AES_TEXT_RESPONSE_TEMPLATE % resp_dictreturn resp_xmlclass PKCS7Encoder():提供基于PKCS7算法的加解密接口block_size 32def encode(self, text): 对需要加密的明文进行填充补位param text: 需要进行填充补位操作的明文return: 补齐明文字符串text_length len(text)# 计算需要填充的位数amount_to_pad self.block_size - (text_length % self.block_size)if amount_to_pad 0:amount_to_pad self.block_size# 获得补位所用的字符pad chr(amount_to_pad)return text (pad * amount_to_pad).encode()def decode(self, decrypted):删除解密后明文的补位字符param decrypted: 解密后的明文return: 删除补位字符后的明文pad ord(decrypted[-1])if pad 1 or pad 32:pad 0return decrypted[:-pad]class Prpcrypt(object):提供接收和推送给企业微信消息的加解密接口def __init__(self, key):# self.key base64.b64decode(key)self.key key# 设置加解密模式为AES的CBC模式self.mode AES.MODE_CBCdef encrypt(self, text, receiveid):对明文进行加密param text: 需要加密的明文return: 加密得到的字符串# 16位随机字符串添加到明文开头text text.encode()text self.get_random_str() struct.pack(I, socket.htonl(len(text))) text receiveid.encode()# 使用自定义的填充方式对明文进行补位填充pkcs7 PKCS7Encoder()text pkcs7.encode(text)# 加密cryptor AES.new(self.key, self.mode, self.key[:16])try:ciphertext cryptor.encrypt(text)# 使用BASE64对加密后的字符串进行编码return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)except Exception as e:logger logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_EncryptAES_Error, Nonedef decrypt(self, text, receiveid):对解密后的明文进行补位删除param text: 密文return: 删除填充补位后的明文try:cryptor AES.new(self.key, self.mode, self.key[:16])# 使用BASE64对密文进行解码然后AES-CBC解密plain_text cryptor.decrypt(base64.b64decode(text))except Exception as e:logger logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_DecryptAES_Error, Nonetry:pad plain_text[-1]# 去掉补位字符串# pkcs7 PKCS7Encoder()# plain_text pkcs7.encode(plain_text)# 去除16位随机字符串content plain_text[16:-pad]xml_len socket.ntohl(struct.unpack(I, content[: 4])[0])xml_content content[4: xml_len 4]from_receiveid content[xml_len 4:]except Exception as e:logger logging.getLogger()logger.error(e)return ierror.WXBizMsgCrypt_IllegalBuffer, Noneif from_receiveid.decode(utf8) ! receiveid:return ierror.WXBizMsgCrypt_ValidateCorpid_Error, Nonereturn 0, xml_contentdef get_random_str(self): 随机生成16位字符串return: 16位字符串return str(random.randint(1000000000000000, 9999999999999999)).encode()class WXBizMsgCrypt(object):# 构造函数def __init__(self, sToken, sEncodingAESKey, sReceiveId):try:self.key base64.b64decode(sEncodingAESKey )assert len(self.key) 32except:throw_exception([error]: EncodingAESKey unvalid !, FormatException)# return ierror.py.WXBizMsgCrypt_IllegalAesKey,Noneself.m_sToken sTokenself.m_sReceiveId sReceiveId# 验证URL# param sMsgSignature: 签名串对应URL参数的msg_signature# param sTimeStamp: 时间戳对应URL参数的timestamp# param sNonce: 随机串对应URL参数的nonce# param sEchoStr: 随机串对应URL参数的echostr# param sReplyEchoStr: 解密之后的echostr当return返回0时有效# return成功0失败返回对应的错误码def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):sha1 SHA1()ret, signature sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)if ret ! 0:return ret, Noneif not signature sMsgSignature:return ierror.WXBizMsgCrypt_ValidateSignature_Error, Nonepc Prpcrypt(self.key)ret, sReplyEchoStr pc.decrypt(sEchoStr, self.m_sReceiveId)return ret, sReplyEchoStrdef EncryptMsg(self, sReplyMsg, sNonce, timestampNone):# 将企业回复用户的消息加密打包# param sReplyMsg: 企业号待回复用户的消息xml格式的字符串# param sTimeStamp: 时间戳可以自己生成也可以用URL参数的timestamp,如为None则自动用当前时间# param sNonce: 随机串可以自己生成也可以用URL参数的nonce# sEncryptMsg: 加密后的可以直接回复用户的密文包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,# return成功0sEncryptMsg,失败返回对应的错误码Nonepc Prpcrypt(self.key)ret, encrypt pc.encrypt(sReplyMsg, self.m_sReceiveId)encrypt encrypt.decode(utf8)if ret ! 0:return ret, Noneif timestamp is None:timestamp str(int(time.time()))# 生成安全签名sha1 SHA1()ret, signature sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)if ret ! 0:return ret, NonexmlParse XMLParse()return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):# 检验消息的真实性并且获取解密后的明文# param sMsgSignature: 签名串对应URL参数的msg_signature# param sTimeStamp: 时间戳对应URL参数的timestamp# param sNonce: 随机串对应URL参数的nonce# param sPostData: 密文对应POST请求的数据# xml_content: 解密后的原文当return返回0时有效# return: 成功0失败返回对应的错误码# 验证安全签名xmlParse XMLParse()ret, encrypt xmlParse.extract(sPostData)if ret ! 0:return ret, Nonesha1 SHA1()ret, signature sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)if ret ! 0:return ret, Noneif not signature sMsgSignature:return ierror.WXBizMsgCrypt_ValidateSignature_Error, Nonepc Prpcrypt(self.key)ret, xml_content pc.decrypt(encrypt, self.m_sReceiveId)return ret, xml_content原理介绍可略
from WXBizMsgCrypt import WXBizMsgCrypt
import xml.etree.cElementTree as ET
import sysif __name__ __main__:# 假设企业在企业微信后台上设置的参数如下sToken hJqcu3uJ9Tn2gXPmxx2w9kkCkCE2EPYosEncodingAESKey 6qkdMrq68nTKduznJYO1A37W2oEgpkMUvkttRToqhUtsCorpID ww1436e0e65a779aee------------使用示例一验证回调URL---------------*企业开启回调模式时企业号会向验证url发送一个get请求 假设点击验证时企业收到类似请求* GET /cgi-bin/wxpush?msg_signature5c45ff5e21c57e6ad56bac8758b79b1d9ac89fd3timestamp1409659589nonce263014780echostrP9nAzCzyDtyTWESHep1vC5X9xho%2FqYX3Zpb4yKa9SKld1DsH3Iyt3tP3zNdtp%2B4RPcs8TgAE7OaBO%2BFZXvnaqQ%3D%3D * HTTP/1.1 Host: qy.weixin.qq.com接收到该请求时企业应 1.解析出Get请求的参数包括消息体签名(msg_signature)时间戳(timestamp)随机数字串(nonce)以及企业微信推送过来的随机加密字符串(echostr),这一步注意作URL解码。2.验证消息体签名的正确性 3. 解密出echostr原文将原文当作Get请求的response返回给企业微信第23步可以用企业微信提供的库函数VerifyURL来实现。wxcpt WXBizMsgCrypt(sToken, sEncodingAESKey, sCorpID)# sVerifyMsgSigHttpUtils.ParseUrl(msg_signature)# ret wxcpt.VerifyAESKey()# print retsVerifyMsgSig 012bc692d0a58dd4b10f8dfe5c4ac00ae211ebeb# sVerifyTimeStampHttpUtils.ParseUrl(timestamp)sVerifyTimeStamp 1476416373# sVerifyNonceHttpUitls.ParseUrl(nonce)sVerifyNonce 47744683# sVerifyEchoStrHttpUtils.ParseUrl(echostr)sVerifyEchoStr fsi1xnbH4yQh0PJxcOdhhK6TDXkjMyhEPA7xB2TGz6bg7xyAbEkRxN/3cNXW9qdqjnoVzEtpbhnFyq6SVHyAret, sEchoStr wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)if (ret ! 0):printERR: VerifyURL ret: str(ret)sys.exit(1)# 验证URL成功将sEchoStr返回给企业号# HttpUtils.SetResponse(sEchoStr)------------使用示例二对用户回复的消息解密---------------用户回复消息或者点击事件响应时企业会收到回调消息此消息是经过企业微信加密之后的密文以post形式发送给企业密文格式请参考官方文档假设企业收到企业微信的回调消息如下POST /cgi-bin/wxpush? msg_signature477715d11cdb4164915debcba66cb864d751f3e6timestamp1409659813nonce1372623149 HTTP/1.1Host: qy.weixin.qq.comContent-Length: 613xml ToUserName![CDATA[wx5823bf96d3bd56c7]]/ToUserNameEncrypt![CDATA[RypEvHKD8QQKFhvQ6QleEB4J58tiPdvortK1I9qca6aM/wvqnLSV5zEPeusUiX5L5X/0lWfrf0QADHHhGd3QczcdCUpj911L3vg3W/sYYvuJTs3TUUkSUXxaccAS0qhxchrRYt66wiSpGLYL42aM6A8dTT6k4aSknmPj48kzJs8qLjvd4Xgpue06DOdnLxAUHzM6kDZHMZfJYuRLtwGc2hgf5gsijff0ekUNXZiqATP7PF5mZxZ3Izoun1s4zG4LUMnvw2rKqCKIw3IQH03vBCA9nMELNqbSf6tiWSrXJB3LAVGUcallcrw8V2t9EL4EhzJWrQUax5wLVMNS0rUPA3k22Ncx4XXZS9o0MBH27Bo6BpNelZpS/uh9KsNlY6bHCmJU9p8g7m3fVKn28H3KDYA5Pl/T8Z1ptDAVe0lXdQ2YoyyH2uyPIGHBZZIs2pDBS8R07qNE7Q]]/EncryptAgentID![CDATA[218]]/AgentID/xml企业收到post请求之后应该 1.解析出url上的参数包括消息体签名(msg_signature)时间戳(timestamp)以及随机数字串(nonce)2.验证消息体签名的正确性。 3.将post请求的数据进行xml解析并将Encrypt标签的内容进行解密解密出来的明文即是用户回复消息的明文明文格式请参考官方文档第23步可以用企业微信提供的库函数DecryptMsg来实现。# sReqMsgSig HttpUtils.ParseUrl(msg_signature)sReqMsgSig 0c3914025cb4b4d68103f6bfc8db550f79dcf48esReqTimeStamp 1476422779sReqNonce 1597212914sReqData xmlToUserName![CDATA[ww1436e0e65a779aee]]/ToUserName\nEncrypt![CDATA[Kl7kjoSf6DMD1zh7rtrHjFaDapSCkaOnwu3bqLc5tAybhhMl9pFeK8NslNPVdMwmBQTNoW4mY7AIjeLvEl3NyeTkAgGzBhzTtRLNshw2AEewkkYcDFq72Kt00fT0WnN87hGrW8SqGcNcT3mu87Ha3dz1pSDi6GaUA6A0sqfde0VJPQbZ9U3JWcoD4Z5jaU0y9GSh010wsHF8KZD24YhmZH4ch4Ka7ilEbjbfvhKkNL65HHL0J6EYJIZUC2pFrdkJ7MhmEbU2qARR4iQHE7wy24qy0cRX3Mfp6iELcDNfSsPGjUQVDGxQDCWjayJOpcwocugux082f49HKYg84EpHSGXAyh/oxwaWbvL6aSDPOYuPDGOCI8jmnKiypE]]/Encrypt\nAgentID![CDATA[1000002]]/AgentID\n/xmlret, sMsg wxcpt.DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)printret, sMsgif (ret ! 0):printERR: DecryptMsg ret: str(ret)sys.exit(1)# 解密成功sMsg即为xml格式的明文# TODO: 对明文的处理# For example:xml_tree ET.fromstring(sMsg)content xml_tree.find(Content).textprintcontent# ...# ...------------使用示例三企业回复用户消息的加密---------------企业被动回复用户的消息也需要进行加密并且拼接成密文格式的xml串。假设企业需要回复用户的明文如下xmlToUserName![CDATA[mycreate]]/ToUserNameFromUserName![CDATA[wx5823bf96d3bd56c7]]/FromUserNameCreateTime1348831860/CreateTimeMsgType![CDATA[text]]/MsgTypeContent![CDATA[this is a test]]/ContentMsgId1234567890123456/MsgIdAgentID128/AgentID/xml为了将此段明文回复给用户企业应 1.自己生成时间时间戳(timestamp),随机数字串(nonce)以便生成消息体签名也可以直接用从企业微信的post url上解析出的对应值。2.将明文加密得到密文。 3.用密文步骤1生成的timestamp,nonce和企业在企业微信设定的token生成消息体签名。 4.将密文消息体签名时间戳随机数字串拼接成xml格式的字符串发送给企业号。以上234步可以用企业微信提供的库函数EncryptMsg来实现。sRespData xmlToUserNameww1436e0e65a779aee/ToUserNameFromUserNameChenJiaShun/FromUserNameCreateTime1476422779/CreateTimeMsgTypetext/MsgTypeContent你好/ContentMsgId1456453720/MsgIdAgentID1000002/AgentID/xmlret, sEncryptMsg wxcpt.EncryptMsg(sRespData, sReqNonce, sReqTimeStamp)if (ret ! 0):printERR: EncryptMsg ret: str(ret)sys.exit(1)# ret 0 加密成功企业需要将sEncryptMsg返回给企业号# TODO:# HttpUitls.SetResponse(sEncryptMsg)验证 URL 有效性
文档地址https://developer.work.weixin.qq.com/document/10514
当点击“保存”提交以上信息时企业微信会发送一条验证消息到填写的 URL发送方法为 GET。企业的接收消息服务器接收到验证请求后需要作出正确的响应才能通过 URL 验证。
企业在获取请求时需要做Urldecode处理否则会验证不成功
你可以访问接口调试工具进行调试依次选择 建立连接 测试回调模式。假设接收消息地址设置为http://api.3dept.com/企业微信将向该地址发送如下验证请求
请求方式GET
请求地址http://api.3dept.com/?msg_signatureASDFQWEXZCVAQFASDFASDFSStimestamp13500001234nonce123412323echostrENCRYPT_STR企业后台收到请求后需要做如下操作
对收到的请求做Urldecode处理
通过参数msg_signature对请求进行校验确认调用者的合法性。
解密echostr参数得到消息内容(即msg字段)
在1秒内原样返回明文消息内容(不能加引号不能带bom头不能带换行符)Python 实现验证 URL 有效性源码
from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import timefrom configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecoveryconfig ConfigObj(./config.ini, encodingutf-8)
debug_mode config[dev].as_bool(debug)app Flask(__name__)app.route(/, methods[GET])
def receive_callback():# 获取参数msg_signature request.args.get(msg_signature)timestamp request.args.get(timestamp)nonce request.args.get(nonce)echostr request.args.get(echostr)print(msg_signature, msg_signature)print(timestamp, timestamp)print(nonce, nonce)print(echostr, echostr) print(config[wechat][Token], config[wechat][EncodingAESKey], config[wechat][CorpID])# 创建 WXBizMsgCrypt 对象wxcpt WXBizMsgCrypt(config[wechat][Token], config[wechat][EncodingAESKey], config[wechat][CorpID])ret, sEchoStr wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)print(sEchoStr, sEchoStr)if ret ! 0:print(str(ret))return ERR: VerifyURL ret: str(ret)return sEchoStrif __name__ __main__:app.run(host0.0.0.0, debugdebug_mode, port8888)使用接收消息
开启接收消息模式后企业微信会将消息发送给企业填写的URL企业后台需要做正确的响应。
接收消息协议的说明
企业微信服务器在五秒内收不到响应会断掉连接并且重新发起请求总共重试三次。如果企业在调试中发现成员无法收到被动回复的消息可以检查是否消息处理超时。
当接收成功后http头部返回200表示接收ok其他错误码企业微信后台会一律当做失败并发起重试。
关于重试的消息排重有msgid的消息推荐使用msgid排重。事件类型消息推荐使用FromUserName CreateTime排重。
假如企业无法保证在五秒内处理并回复或者不想回复任何内容可以直接返回200即以空串为返回包。企业后续可以使用主动发消息接口进行异步回复。接收消息请求的说明 假设企业的接收消息的URL设置为http://api.3dept.com。
请求方式POST
请求地址 http://api.3dept.com/?msg_signatureASDFQWEXZCVAQFASDFASDFSStimestamp13500001234nonce123412323接收数据格式
xml ToUserName![CDATA[toUser]]/ToUserNameAgentID![CDATA[toAgentID]]/AgentIDEncrypt![CDATA[msg_encrypt]]/Encrypt
/xml企业收到消息后需要作如下处理
对msg_signature进行校验
解密Encrypt得到明文的消息结构体消息结构体后面章节会详说
如果需要被动回复消息构造被动响应包
正确响应本次请求被动响应包的数据格式
xmlEncrypt![CDATA[msg_encrypt]]/EncryptMsgSignature![CDATA[msg_signature]]/MsgSignatureTimeStamptimestamp/TimeStampNonce![CDATA[nonce]]/Nonce
/xmlPython 实现接收回复消息源码
from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import timefrom configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecoveryconfig ConfigObj(./config.ini, encodingutf-8)
debug_mode config[dev].as_bool(debug)app Flask(__name__)app.route(/, methods[POST])
def callback_message():# 获取参数msg_signature request.args.get(msg_signature)timestamp request.args.get(timestamp)nonce request.args.get(nonce)# 获取 POST 的原始数据sReqData request.data# 创建 WXBizMsgCrypt 对象wxcpt WXBizMsgCrypt(config[wechat][Token], config[wechat][EncodingAESKey], config[wechat][CorpID])ret, sMsg wxcpt.DecryptMsg(sReqData, msg_signature, timestamp, nonce)if ret ! 0:return ERR: DecryptMsg ret: str(ret)# 解析 XMLxml_tree etree.fromstring(sMsg)print(xml_tree)from_user xml_tree.find(FromUserName).textto_user xml_tree.find(ToUserName).textmsg_type xml_tree.find(MsgType).textprint(from_user)print(to_user)print(msg_type)# 消息被动回复if msg_type text:content xml_tree.find(Content).textprint(content)reply_xml PassiveRecovery(to_user, from_user, msg_type, msg_contentcontent).text()else:media_id xml_tree.find(MediaId).textprint(media_id)reply_xml PassiveRecovery(to_user, from_user, msg_type, media_idmedia_id).image()# 加密回复的 XMLret, sEncryptMsg wxcpt.EncryptMsg(reply_xml, nonce, timestamp)if ret ! 0:return ERR: EncryptMsg ret: str(ret)return sEncryptMsgif __name__ __main__:app.run(debugdebug_mode, port8888)
企业微信服务器 IP 段
企业微信在回调企业指定的URL时是通过特定的IP发送出去的。如果企业需要做防火墙配置那么可以通过这个接口获取到所有相关的IP段。
请求方式GETHTTPS
请求地址 https://qyapi.weixin.qq.com/cgi-bin/getcallbackip?access_tokenACCESS_TOKEN返回结果
{errcode: 0,errmsg: ok,ip_list: [101.226.103.*, 101.226.62.*]
}源码部署服务器
接收消息 ./receive/PassiveRecovery.py
import timeclass PassiveRecovery(object):# 传入数据 to_user, from_user, msg_type, msg_content, media_iddef __init__(self, to_user, from_user, msg_type, msg_contentNone, media_idNone):self.to_user to_userself.from_user from_userself.msg_type msg_typeself.msg_content msg_contentself.media_id media_iddef text(self):return fxmlToUserName![CDATA[{self.to_user}]]/ToUserNameFromUserName![CDATA[{self.from_user}]]/FromUserNameCreateTime{int(time.time())}/CreateTimeMsgType![CDATA[text]]/MsgTypeContent![CDATA[{self.msg_content}]]/Content/xmldef image(self):return fxmlToUserName![CDATA[{self.to_user}]]/ToUserNameFromUserName![CDATA[{self.from_user}]]/FromUserNameCreateTime{int(time.time())}/CreateTimeMsgType![CDATA[image]]/MsgTypeImageMediaId![CDATA[{self.media_id}]]/MediaId/Image/xmldef video(self):return fxmlToUserName![CDATA[{self.to_user}]]/ToUserNameFromUserName![CDATA[{self.from_user}]]/FromUserNameCreateTime{int(time.time())}/CreateTimeMsgType![CDATA[video]]/MsgTypeVideoMediaId![CDATA[{self.media_id}]]/MediaIdTitle![CDATA[title]]/TitleDescription![CDATA[description]]/Description/Video/xmlmain.py
from flask import Flask, request
from callback.WXBizMsgCrypt3 import WXBizMsgCrypt
from lxml import etree
import timefrom configobj import ConfigObj
from receive.PassiveRecovery import PassiveRecoveryconfig ConfigObj(./config.ini, encodingutf-8)
debug_mode config[dev].as_bool(debug)app Flask(__name__)app.route(/, methods[GET])
def receive_callback():# 获取参数msg_signature request.args.get(msg_signature)timestamp request.args.get(timestamp)nonce request.args.get(nonce)echostr request.args.get(echostr)# 创建 WXBizMsgCrypt 对象wxcpt WXBizMsgCrypt(config[wechat][Token], config[wechat][EncodingAESKey], config[wechat][CorpID])ret, sEchoStr wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)if ret ! 0:return ERR: VerifyURL ret: str(ret)return sEchoStrapp.route(/, methods[POST])
def callback_message():# 获取参数msg_signature request.args.get(msg_signature)timestamp request.args.get(timestamp)nonce request.args.get(nonce)# 获取 POST 的原始数据sReqData request.data# 创建 WXBizMsgCrypt 对象wxcpt WXBizMsgCrypt(config[wechat][Token], config[wechat][EncodingAESKey], config[wechat][CorpID])ret, sMsg wxcpt.DecryptMsg(sReqData, msg_signature, timestamp, nonce)if ret ! 0:return ERR: DecryptMsg ret: str(ret)# 解析 XMLxml_tree etree.fromstring(sMsg)print(xml_tree)from_user xml_tree.find(FromUserName).textto_user xml_tree.find(ToUserName).textmsg_type xml_tree.find(MsgType).textprint(from_user)print(to_user)print(msg_type)# 消息被动回复if msg_type text:content xml_tree.find(Content).textprint(content)reply_xml PassiveRecovery(to_user, from_user, msg_type, msg_contentcontent).text()else:media_id xml_tree.find(MediaId).textprint(media_id)reply_xml PassiveRecovery(to_user, from_user, msg_type, media_idmedia_id).image()# 加密回复的 XMLret, sEncryptMsg wxcpt.EncryptMsg(reply_xml, nonce, timestamp)if ret ! 0:return ERR: EncryptMsg ret: str(ret)return sEncryptMsgif __name__ __main__:app.run(debugdebug_mode, port8888)参考项目https://github.com/Waite0603/enterprise_wechat_bot
执行 main.py
python main.py使用 Gunicorn 运行 Flask
gevent 是一个基于 libevent 的 Python 协程库用于实现高性能的并发服务器。与多线程或多进程相比使用 gevent 可以更有效地处理大量的并发连接因为它通过协程实现非阻塞 I/O从而避免了线程切换的开销。
安装必要的库:
首先确保你已经安装了 Flask 和 gevent。你可以使用 pip 来安装它们
pip install Flask gevent使用 Gunicorn gevent worker
对于生产环境更常见的做法是使用 Gunicorn 作为反向代理服务器并配置它使用 gevent worker。这样你可以利用 Gunicorn 的许多高级功能如进程管理、日志记录等同时仍然享受 gevent 带来的性能优势。
安装 Gunicorn
pip install gunicorn使用命令启动你的 Flask 应用
gunicorn -w 4 -k gevent app:main这里-w 4 指定了 4 个 worker 进程-k gevent 指定了使用 gevent worker。‘app:main’ 是你的 Flask 应用的位置其中 app 是 Python 模块名后面的 main 是 Flask 应用实例的变量名。
配置 nginx 服务器
域名解析http://wxbot.willwaking.com - 服务器 IP 注意防止和静态路由冲突需要配置 nginx 请求转发。 location /wxbot/ { proxy_pass http://localhost:8888/; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme;
}测试结果 配置成功