Update send.py 增加企业微信应用通道来发送消息

增加企业微信应用通道来发送消息
This commit is contained in:
Shining Hu 2024-11-18 23:15:07 +08:00 committed by GitHub
parent 02d52c1d01
commit 10bd0c90bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,6 +4,8 @@ import hashlib
import hmac
import time
from typing import Dict, Any
import asyncio
import json
import urllib
def generate_sign(secret):
@ -28,6 +30,61 @@ async def send_message(url: str, data: Dict[str, Any]) -> Dict[str, Any]:
async with session.post(url, json=data, headers=headers) as response:
return await response.json()
class WeCom:
def __init__(self, corpid, corpsecret, agentid):
self.CORPID = corpid
self.CORPSECRET = corpsecret
self.AGENTID = agentid
async def get_access_token(self):
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
params = {'corpid': self.CORPID, 'corpsecret': self.CORPSECRET}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
data = await response.json()
return data.get("access_token") # 使用 get 方法避免 KeyError
async def send_text(self, message, touser="@all"):
access_token = await self.get_access_token()
send_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
send_values = {
"touser": touser,
"msgtype": "text",
"agentid": self.AGENTID,
"text": {
"content": message
},
"safe": "0"
}
async with aiohttp.ClientSession() as session:
async with session.post(send_url, json=send_values) as response:
response_data = await response.json()
return response_data.get("errmsg", "未知错误") # 使用 get 方法避免 KeyError
async def send_mpnews(self, title, message, media_id, touser="@all"):
access_token = await self.get_access_token()
send_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
send_values = {
"touser": touser,
"msgtype": "mpnews",
"agentid": self.AGENTID,
"mpnews": {
"articles": [
{
"title": title,
"thumb_media_id": media_id,
"author": "Author",
"content_source_url": "",
"content": message.replace('\n', '<br/>'),
"digest": message
}
]
}
}
async with aiohttp.ClientSession() as session:
async with session.post(send_url, json=send_values) as response:
response_data = await response.json()
return response_data.get("errmsg", "未知错误") # 使用 get 方法避免 KeyError
class SendApi(object):
def __init__(self, name):
@ -91,3 +148,29 @@ class SendApi(object):
}
}
return await send_message(url, data)
@staticmethod
async def send_wecom_app(url: str, msg: str) -> Dict[str, Any]:
"""
企业微信应用发送消息
"""
params = url.split(",")
corpid = params[0]
corpsecret = params[1]
touser = params[2]
agentid = params[3]
try:
mediaid = params[4]
except IndexError:
mediaid = None # 用 None 表示没有 media_id
wx = WeCom(corpid, corpsecret, agentid)
# 检查是否存在 mediaid如果存在则发送图文消息反之发送文本消息
if mediaid:
response = await wx.send_mpnews("京东账号自动更新报告", msg, mediaid, touser)
else:
response = await wx.send_text(msg, touser)
return {"status": response}