import aiohttp import argparse import asyncio from api.qinglong import QlApi, QlOpenApi from api.send import SendApi from utils.ck import get_invalid_ck_ids from config import ( qinglong_data, user_datas, ) import cv2 import json from loguru import logger import os from playwright.async_api import Playwright, async_playwright from playwright._impl._errors import TimeoutError import random import re from PIL import Image # 用于图像处理 import traceback from typing import Union from utils.consts import ( jd_login_url, supported_types, supported_colors, supported_sms_func ) from utils.tools import ( get_tmp_dir, get_img_bytes, get_forbidden_users_dict, filter_forbidden_users, save_img, get_ocr, get_word, get_shape_location_by_type, get_shape_location_by_color, rgba2rgb, send_msg, new_solve_slider_captcha, ddddocr_find_files_pic, expand_coordinates, cv2_save_img, ddddocr_find_bytes_pic, solve_slider_captcha, validate_proxy_config, is_valid_verification_code, filter_cks, extract_pt_pin, desensitize_account ) """ 基于playwright做的 """ logger.add( sink="main.log", level="DEBUG" ) try: # 账号是否脱敏的开关 from config import enable_desensitize except ImportError: enable_desensitize = False async def download_image(url, filepath): async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: with open(filepath, 'wb') as f: f.write(await response.read()) print(f"Image downloaded to {filepath}") else: print(f"Failed to download image. Status code: {response.status}") async def check_notice(page): try: logger.info("检查登录是否报错") notice = await page.wait_for_function( """ () => { const notice = document.querySelectorAll('.notice')[1]; return notice && notice.textContent.trim() !== '' ? notice.textContent.trim() : false; } """, timeout = 3000 ) raise RuntimeError(notice) except TimeoutError: logger.info("登录未发现报错") return async def auto_move_slide_v2(page, retry_times: int = 2, slider_selector: str = 'img.move-img', move_solve_type: str = ""): for i in range(retry_times): logger.info(f'第{i + 1}次开启滑块验证') # 查找小图 try: # 查找小图 await page.wait_for_selector('.captcha_drop', state='visible', timeout=3000) except Exception as e: logger.info('未找到验证码框, 退出滑块验证') return await auto_move_slide(page, retry_times=5, slider_selector = slider_selector, move_solve_type = move_solve_type) # 判断是否一次过了滑块 captcha_drop_visible = await page.is_visible('.captcha_drop') # 存在就重新滑一次 if captcha_drop_visible: if i == retry_times - 1: return logger.info('一次过滑块失败, 再次尝试滑块验证') await page.wait_for_selector('.captcha_drop', state='visible', timeout=3000) # 点外键 sign_locator = page.locator('#header').locator('.text-header') sign_locator_box = await sign_locator.bounding_box() sign_locator_left_x = sign_locator_box['x'] sign_locator_left_y = sign_locator_box['y'] await page.mouse.click(sign_locator_left_x, sign_locator_left_y) await asyncio.sleep(1) # 提交键 submit_locator = page.locator('.btn.J_ping.active') await submit_locator.click() await asyncio.sleep(1) continue return async def auto_move_slide(page, retry_times: int = 2, slider_selector: str = 'img.move-img', move_solve_type: str = ""): """ 自动识别移动滑块验证码 """ logger.info("开始滑块验证") for i in range(retry_times + 1): try: # 查找小图 await page.wait_for_selector('#slot_img', state='visible', timeout=3000) except Exception as e: # 未找到元素,认为成功,退出循环 logger.info('未找到滑块,退出滑块验证') break # 滑块验证失败了 if i + 1 == retry_times + 1: raise Exception("滑块验证失败了") logger.info(f'第{i + 1}次尝试自动移动滑块中...') # 获取 src 属性 small_src = await page.locator('#slot_img').get_attribute('src') background_src = await page.locator('#main_img').get_attribute('src') # 获取 bytes small_img_bytes = get_img_bytes(small_src) background_img_bytes = get_img_bytes(background_src) # 保存小图 small_img_path = save_img('small_img', small_img_bytes) small_img_width = await page.evaluate('() => { return document.getElementById("slot_img").clientWidth; }') # 获取网页的图片尺寸 small_img_height = await page.evaluate('() => { return document.getElementById("slot_img").clientHeight; }') # 获取网页的图片尺寸 small_image = Image.open(small_img_path) # 打开图像 resized_small_image = small_image.resize((small_img_width, small_img_height)) # 调整图像尺寸 resized_small_image.save(small_img_path) # 保存调整后的图像 # 保存大图 background_img_path = save_img('background_img', background_img_bytes) background_img_width = await page.evaluate('() => { return document.getElementById("main_img").clientWidth; }') # 获取网页的图片尺寸 background_img_height = await page.evaluate('() => { return document.getElementById("main_img").clientHeight; }') # 获取网页的图片尺寸 background_image = Image.open(background_img_path) # 打开图像 resized_background_image = background_image.resize((background_img_width, background_img_height)) # 调整图像尺寸 resized_background_image.save(background_img_path) # 保存调整后的图像 # 获取滑块 slider = page.locator(slider_selector) await asyncio.sleep(1) # 这里是一个标准算法偏差 slide_difference = 10 if move_solve_type == "old": # 用于调试 distance = ddddocr_find_bytes_pic(small_img_bytes, background_img_bytes) await asyncio.sleep(1) await solve_slider_captcha(page, slider, distance, slide_difference) await asyncio.sleep(1) continue # 获取要移动的长度 distance = ddddocr_find_files_pic(small_img_path, background_img_path) await asyncio.sleep(1) # 移动滑块 await new_solve_slider_captcha(page, slider, distance, slide_difference) await asyncio.sleep(1) async def auto_shape(page, retry_times: int = 5): logger.info("开始二次验证") # 图像识别 ocr = get_ocr(beta=True) # 文字识别 det = get_ocr(det=True) # 自己训练的ocr, 提高文字识别度 my_ocr = get_ocr(det=False, ocr=False, import_onnx_path="myocr_v1.onnx", charsets_path="charsets.json") """ 自动识别滑块验证码 """ for i in range(retry_times + 1): try: # 查找小图 await page.wait_for_selector('div.captcha_footer img', state='visible', timeout=3000) except Exception as e: # 未找到元素,认为成功,退出循环 logger.info('未找到二次验证图,退出二次验证识别') break # 二次验证失败了 if i + 1 == retry_times + 1: raise Exception("二次验证失败了") logger.info(f'第{i + 1}次自动识别形状中...') tmp_dir = get_tmp_dir() background_img_path = os.path.join(tmp_dir, f'background_img.png') # 获取大图元素 background_locator = page.locator('#cpc_img') # 获取元素的位置和尺寸 backend_bounding_box = await background_locator.bounding_box() backend_top_left_x = backend_bounding_box['x'] backend_top_left_y = backend_bounding_box['y'] # 截取元素区域 await page.screenshot(path=background_img_path, clip=backend_bounding_box) # 获取 图片的src 属性和button按键 word_img_src = await page.locator('div.captcha_footer img').get_attribute('src') button = page.locator('div.captcha_footer button#submit-btn') # 找到刷新按钮 refresh_button = page.locator('.jcap_refresh') # 获取文字图并保存 word_img_bytes = get_img_bytes(word_img_src) rgba_word_img_path = save_img('rgba_word_img', word_img_bytes) # 图像识别的解法,东哥求放过啊,写不动了 if page.locator("div.sp_msg.tip_text", has_text="请点击上图中的").is_visible(): logger.info("检测为图像, 开始图像识别......") from utils.tools import crop_center_contour, ddddocr_find_files_pic_v2 small_img_path = os.path.join(tmp_dir, f'small_img.png') # 这里是一个标准算法偏差 slide_difference = 10 try: # 将中间的图截取出来,才能更好的识别 result = crop_center_contour(rgba_word_img_path, small_img_path, min_area=100, padding=1) if result is None: raise IndexError("截图异常") # 获取要移动的长度 target_dict = ddddocr_find_files_pic_v2(small_img_path, background_img_path) # 提取坐标 x1, y1, x2, y2 = target_dict["target"] center_x = (x1 + slide_difference + x2) // 2 center_y = (y1 + y2) // 2 await asyncio.sleep(random.uniform(0, 1)) logger.info("已检测到图像,尝试点击中") x, y = backend_top_left_x + center_x, backend_top_left_y + center_y # 点击图片 await page.mouse.click(x, y) except IndexError: logger.info(f'识别图像出错,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue # 文字图是RGBA的,有蒙板识别不了,需要转成RGB rgb_word_img_path = rgba2rgb('rgb_word_img', rgba_word_img_path) # 获取问题的文字 word = get_word(ocr, rgb_word_img_path) if word.find('色') > 0: target_color = word.split('请选出图中')[1].split('的图形')[0] if target_color in supported_colors: logger.info(f'正在点击中......') # 获取点的中心点 center_x, center_y = get_shape_location_by_color(background_img_path, target_color) if center_x is None and center_y is None: logger.info(f'识别失败,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue # 得到网页上的中心点 x, y = backend_top_left_x + center_x, backend_top_left_y + center_y # 点击图片 await page.mouse.click(x, y) await asyncio.sleep(random.uniform(1, 4)) # 点击确定 await button.click() await asyncio.sleep(random.uniform(2, 4)) continue else: logger.info(f'不支持{target_color},刷新中......') # 刷新 await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue # 这里是文字验证码了 elif word.find('依次') > 0 or word.find('按照次序点选') > 0: logger.info(f'开始文字识别,点击中......') # 获取文字的顺序列表 try: if word.find('依次') > 0: target_char_list = list(re.findall(r'[\u4e00-\u9fff]+', word)[1]) if word.find('按照次序点选') > 0: target_char_list = list(word.split('请按照次序点选')[1]) except IndexError: logger.info(f'识别文字出错,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue target_char_len = len(target_char_list) # 识别字数不对 if target_char_len < 4: logger.info(f'识别的字数小于4,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue # 取前4个的文字 target_char_list = target_char_list[:4] # 定义【文字, 坐标】的列表 target_list = [[x, []] for x in target_char_list] # 获取大图的二进制 background_locator = page.locator('#cpc_img') background_locator_src = await background_locator.get_attribute('src') background_locator_bytes = get_img_bytes(background_locator_src) bboxes = det.detection(background_locator_bytes) count = 0 im = cv2.imread(background_img_path) for bbox in bboxes: # 左上角 x1, y1, x2, y2 = bbox # 做了一下扩大 expanded_x1, expanded_y1, expanded_x2, expanded_y2 = expand_coordinates(x1, y1, x2, y2, 10) im2 = im[expanded_y1:expanded_y2, expanded_x1:expanded_x2] img_path = cv2_save_img('word', im2) image_bytes = open(img_path, "rb").read() result = my_ocr.classification(image_bytes) if result in target_char_list: for index, target in enumerate(target_list): if result == target[0] and target[0] is not None: x = x1 + (x2 - x1) / 2 y = y1 + (y2 - y1) / 2 target_list[index][1] = [x, y] count += 1 if count != target_char_len: logger.info(f'文字识别失败,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue await asyncio.sleep(random.uniform(0, 1)) try: for char in target_list: center_x = char[1][0] center_y = char[1][1] # 得到网页上的中心点 x, y = backend_top_left_x + center_x, backend_top_left_y + center_y # 点击图片 await page.mouse.click(x, y) await asyncio.sleep(random.uniform(1, 4)) except IndexError: logger.info(f'识别文字出错,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue # 点击确定 await button.click() await asyncio.sleep(random.uniform(2, 4)) else: shape_type = word.split('请选出图中的')[1] if shape_type in supported_types: logger.info(f'已找到图形,点击中......') if shape_type == "圆环": shape_type = shape_type.replace('圆环', '圆形') # 获取点的中心点 center_x, center_y = get_shape_location_by_type(background_img_path, shape_type) if center_x is None and center_y is None: logger.info(f'识别失败,刷新中......') await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue # 得到网页上的中心点 x, y = backend_top_left_x + center_x, backend_top_left_y + center_y # 点击图片 await page.mouse.click(x, y) await asyncio.sleep(random.uniform(1, 4)) # 点击确定 await button.click() await asyncio.sleep(random.uniform(2, 4)) continue else: logger.info(f'不支持{shape_type},刷新中......') # 刷新 await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) continue async def sms_recognition(page, user, mode): try: from config import sms_func except ImportError: sms_func = "no" sms_func = user_datas[user].get("sms_func", sms_func) if sms_func not in supported_sms_func: raise Exception(f"sms_func只支持{supported_sms_func}") if mode == "cron" and sms_func == "manual_input": sms_func = "no" if sms_func == "no": raise Exception("sms_func为no关闭, 跳过短信验证码识别环节") logger.info('点击【获取验证码】中') await page.click('button.getMsg-btn') await asyncio.sleep(1) # 自动识别滑块 await auto_move_slide(page, retry_times=5) await auto_shape(page, retry_times=30) # 识别是否成功发送验证码 await page.wait_for_selector('button.getMsg-btn:has-text("重新发送")', timeout=3000) logger.info("发送短信验证码成功") # 手动输入 # 用户在60S内,手动在终端输入验证码 if sms_func == "manual_input": logger.info("启用手动输入验证码模式") from inputimeout import inputimeout, TimeoutOccurred try: verification_code = inputimeout(prompt="请输入验证码:", timeout=60) except TimeoutOccurred: return # 通过调用web_hook的方式来实现全自动输入验证码 elif sms_func == "webhook": logger.info("启用webhook获取验证码模式") from utils.tools import send_request try: from config import sms_webhook except ImportError: sms_webhook = "" sms_webhook = user_datas[user].get("sms_webhook", sms_webhook) if sms_webhook is None: raise Exception(f"sms_webhook未配置") headers = { 'Content-Type': 'application/json', } data = {"phone_number": user} response = await send_request(url=sms_webhook, method="post", headers=headers, data=data) verification_code = response['data']['code'] await asyncio.sleep(1) if not is_valid_verification_code(verification_code): logger.error(f"验证码需为6位数字, 输入的验证码为{verification_code}, 异常") raise Exception(f"验证码异常") logger.info('填写验证码中...') verification_code_input = page.locator('input.acc-input.msgCode') for v in verification_code: await verification_code_input.type(v, no_wait_after=True) await asyncio.sleep(random.random() / 10) logger.info('点击提交中...') await page.click('a.btn') async def voice_verification(page, user, mode): from utils.consts import supported_voice_func try: from config import voice_func except ImportError: voice_func = "no" voice_func = user_datas[user].get("voice_func", voice_func) if voice_func not in supported_voice_func: raise Exception(f"voice_func只支持{supported_voice_func}") if mode == "cron" and voice_func == "manual_input": voice_func = "no" if voice_func == "no": raise Exception("voice_func为no关闭, 跳过手机语音识别") logger.info('点击获取验证码中') await page.click('button.getMsg-btn:has-text("点击获取验证码")') await asyncio.sleep(1) # 自动识别滑块 await auto_move_slide(page, retry_times=5, slider_selector='#slider') await auto_shape(page, retry_times=30) # 识别是否成功发送验证码 await page.wait_for_selector('button.getMsg-btn:has-text("重新发送")', timeout=3000) logger.info("发送手机语音识别验证码成功") # 手动输入 # 用户在60S内,手动在终端输入验证码 if voice_func == "manual_input": from inputimeout import inputimeout, TimeoutOccurred try: verification_code = inputimeout(prompt="请输入验证码:", timeout=60) except TimeoutOccurred: return await asyncio.sleep(1) if not is_valid_verification_code(verification_code): logger.error(f"验证码需为6位数字, 输入的验证码为{verification_code}, 异常") raise Exception(f"验证码异常") logger.info('填写验证码中...') verification_code_input = page.locator('input.acc-input.msgCode') for v in verification_code: await verification_code_input.type(v, no_wait_after=True) await asyncio.sleep(random.random() / 10) logger.info('点击提交中...') await page.click('a.btn') async def check_dialog(page): logger.info("开始弹窗检测") try: # 等待 dialog 出现 await page.wait_for_selector(".dialog", timeout=4000) except Exception as e: logger.info('未找到弹框, 退出弹框检测') return # 获取 dialog-des 的文本内容 dialog_text = await page.locator(".dialog-des").text_content() if dialog_text == "您的账号存在风险,为了账号安全需实名认证,是否继续?": raise Exception("检测到实名认证弹窗,请前往移动端做实名认证") else: raise Exception("检测到不支持的弹窗, 更新异常") async def get_jd_pt_key(playwright: Playwright, user, mode) -> Union[str, None]: """ 获取jd的pt_key """ try: from config import headless except ImportError: headless = False args = '--no-sandbox', '--disable-setuid-sandbox', '--disable-software-rasterizer', '--disable-gpu' try: # 引入代理 from config import proxy # 检查代理的配置 is_proxy_valid, msg = validate_proxy_config(proxy) if not is_proxy_valid: logger.error(msg) proxy = None if msg == "未配置代理": logger.info(msg) proxy = None except ImportError: logger.info("未配置代理") proxy = None browser = await playwright.chromium.launch(headless=headless, args=args, proxy=proxy) try: # 引入UA from config import user_agent except ImportError: from utils.consts import user_agent context = await browser.new_context(user_agent=user_agent) try: page = await context.new_page() await page.set_viewport_size({"width": 360, "height": 640}) await page.goto(jd_login_url) if user_datas[user].get("user_type") == "qq": await page.get_by_role("checkbox").check() await asyncio.sleep(1) # 点击QQ登录 await page.locator("a.quick-qq").click() await asyncio.sleep(1) # 等待 iframe 加载完成 await page.wait_for_selector("#ptlogin_iframe") # 切换到 iframe iframe = page.frame(name="ptlogin_iframe") # 通过 id 选择 "密码登录" 链接并点击 await iframe.locator("#switcher_plogin").click() await asyncio.sleep(1) # 填写账号 username_input = iframe.locator("#u") # 替换为实际的账号 for u in user: await username_input.type(u, no_wait_after=True) await asyncio.sleep(random.random() / 10) await asyncio.sleep(1) # 填写密码 password_input = iframe.locator("#p") # 替换为实际的密码 password = user_datas[user]["password"] for p in password: await password_input.type(p, no_wait_after=True) await asyncio.sleep(random.random() / 10) await asyncio.sleep(1) # 点击登录按钮 await iframe.locator("#login_button").click() await asyncio.sleep(1) # 这里检测安全验证 new_vcode_area = iframe.locator("div#newVcodeArea") style = await new_vcode_area.get_attribute("style") if style and "display: block" in style: if await new_vcode_area.get_by_text("安全验证").text_content() == "安全验证": logger.error(f"QQ号{user}需要安全验证, 登录失败,请使用其它账号类型") raise Exception(f"QQ号{user}需要安全验证, 登录失败,请使用其它账号类型") else: await page.get_by_text("账号密码登录").click() username_input = page.locator("#username") for u in user: await username_input.type(u, no_wait_after=True) await asyncio.sleep(random.random() / 10) password_input = page.locator("#pwd") password = user_datas[user]["password"] for p in password: await password_input.type(p, no_wait_after=True) await asyncio.sleep(random.random() / 10) await asyncio.sleep(random.random()) await page.locator('.policy_tip-checkbox').click() await asyncio.sleep(random.random()) await page.locator('.btn.J_ping.active').click() if user_datas[user].get("auto_switch", True): # 自动识别移动滑块验证码 await asyncio.sleep(1) await auto_move_slide(page, retry_times=30) # 自动验证形状验证码 await asyncio.sleep(1) await auto_shape(page, retry_times=30) # 进行短信验证识别 await asyncio.sleep(1) if await page.locator('text="手机短信验证"').count() != 0: logger.info("开始短信验证码识别环节") await sms_recognition(page, user, mode) # 进行手机语音验证识别 if await page.locator('div#header .text-header:has-text("手机语音验证")').count() > 0: logger.info("检测到手机语音验证页面,开始识别") await voice_verification(page, user, mode) # 弹窗检测 await check_dialog(page) # 检查警告,如账号存在风险或账密不正确等 await check_notice(page) else: logger.info("自动过验证码开关已关, 请手动操作") # 等待验证码通过 logger.info("等待获取cookie...") await page.wait_for_selector('#msShortcutMenu', state='visible', timeout=120000) cookies = await context.cookies() for cookie in cookies: if cookie['name'] == 'pt_key': pt_key = cookie["value"] return pt_key return None except Exception as e: traceback.print_exc() return None finally: await context.close() await browser.close() async def get_ql_api(ql_data): """ 封装了QL的登录 """ logger.info("开始获取QL登录态......") # 优化client_id和client_secret client_id = ql_data.get('client_id') client_secret = ql_data.get('client_secret') if client_id and client_secret: logger.info("使用client_id和client_secret登录......") qlapi = QlOpenApi(ql_data["url"]) response = await qlapi.login(client_id=client_id, client_secret=client_secret) if response['code'] == 200: logger.info("client_id和client_secret正常可用......") return qlapi else: logger.info("client_id和client_secret异常......") qlapi = QlApi(ql_data["url"]) # 其次用token token = ql_data.get('token') if token: logger.info("已设置TOKEN,开始检测TOKEN状态......") qlapi.login_by_token(token) # 如果token失效,就用账号密码登录 response = await qlapi.get_envs() if response['code'] == 401: logger.info("Token已失效, 正使用账号密码获取QL登录态......") response = await qlapi.login_by_username(ql_data.get("username"), ql_data.get("password")) if response['code'] != 200: logger.error(f"账号密码登录失败. response: {response}") raise Exception(f"账号密码登录失败. response: {response}") else: logger.info("Token正常可用......") else: # 最后用账号密码 logger.info("正使用账号密码获取QL登录态......") response = await qlapi.login_by_username(ql_data.get("username"), ql_data.get("password")) if response['code'] != 200: logger.error(f"账号密码登录失败. response: {response}") raise Exception(f"账号密码登录失败.response: {response}") return qlapi async def main(mode: str = None): """ :param mode 运行模式, 当mode = cron时,sms_func为 manual_input时,将自动传成no """ try: qlapi = await get_ql_api(qinglong_data) send_api = SendApi("ql") # 拿到禁用的用户列表 response = await qlapi.get_envs() if response['code'] == 200: logger.info("获取环境变量成功") else: logger.error(f"获取环境变量失败, response: {response}") raise Exception(f"获取环境变量失败, response: {response}") env_data = response['data'] # 获取值为JD_COOKIE的环境变量 jd_ck_env_datas = filter_cks(env_data, name='JD_COOKIE') # 从value中过滤出pt_pin, 注意只支持单行单pt_pin jd_ck_env_datas = [ {**x, 'pt_pin': extract_pt_pin(x['value'])} for x in jd_ck_env_datas if extract_pt_pin(x['value'])] try: logger.info("检测CK任务开始") # 先获取启用中的env_data up_jd_ck_list = filter_cks(jd_ck_env_datas, status=0, name='JD_COOKIE') # 这一步会去检测这些JD_COOKIE invalid_cks_id_list = await get_invalid_ck_ids(up_jd_ck_list) if invalid_cks_id_list: # 禁用QL的失效环境变量 ck_ids_datas = bytes(json.dumps(invalid_cks_id_list), 'utf-8') await qlapi.envs_disable(data=ck_ids_datas) # 更新jd_ck_env_datas jd_ck_env_datas = [{**x, 'status': 1} if x.get('id') in invalid_cks_id_list or x.get('_id') in invalid_cks_id_list else x for x in jd_ck_env_datas] logger.info("检测CK任务完成") except Exception as e: traceback.print_exc() logger.error(f"检测CK任务失败, 跳过检测, 报错原因为{e}") # 获取需强制更新pt_pin force_update_pt_pins = [user_datas[key]["pt_pin"] for key in user_datas if user_datas[key].get("force_update") is True] # 获取禁用和需要强制更新的users forbidden_users = [x for x in jd_ck_env_datas if (x['status'] == 1 or x['pt_pin'] in force_update_pt_pins)] if not forbidden_users: logger.info("所有COOKIE环境变量正常,无需更新") return # 获取需要的字段 filter_users_list = filter_forbidden_users(forbidden_users, ['_id', 'id', 'value', 'remarks', 'name']) # 生成字典 user_dict = get_forbidden_users_dict(filter_users_list, user_datas) if not user_dict: logger.info("失效的CK信息未配置在user_datas内,无需更新") return # 登录JD获取pt_key async with async_playwright() as playwright: for user in user_dict: logger.info(f"开始更新{desensitize_account(user, enable_desensitize)}") pt_key = await get_jd_pt_key(playwright, user, mode) if pt_key is None: logger.error(f"获取pt_key失败") await send_msg(send_api, send_type=1, msg=f"{desensitize_account(user, enable_desensitize)} 更新失败") continue req_data = user_dict[user] req_data["value"] = f"pt_key={pt_key};pt_pin={user_datas[user]['pt_pin']};" logger.info(f"更新内容为{req_data}") data = json.dumps(req_data) response = await qlapi.set_envs(data=data) if response['code'] == 200: logger.info(f"{desensitize_account(user, enable_desensitize)}更新成功") else: logger.error(f"{desensitize_account(user, enable_desensitize)}更新失败, response: {response}") await send_msg(send_api, send_type=1, msg=f"{desensitize_account(user, enable_desensitize)} 更新失败") continue req_id = f"[{req_data['id']}]" if 'id' in req_data.keys() else f'[\"{req_data["_id"]}\"]' data = bytes(req_id, 'utf-8') response = await qlapi.envs_enable(data=data) if response['code'] == 200: logger.info(f"{desensitize_account(user, enable_desensitize)}启用成功") await send_msg(send_api, send_type=0, msg=f"{desensitize_account(user, enable_desensitize)} 更新成功") else: logger.error(f"{desensitize_account(user, enable_desensitize)}启用失败, response: {response}") except Exception as e: traceback.print_exc() def parse_args(): """ 解析参数 """ parser = argparse.ArgumentParser() parser.add_argument('-m', '--mode', choices=['cron'], help="运行的main的模式(例如: 'cron')") return parser.parse_args() if __name__ == '__main__': # 使用解析参数的函数 args = parse_args() asyncio.run(main(mode=args.mode))