diff --git a/daily/sfsy.py b/daily/sfsy.py new file mode 100644 index 0000000..cf187a5 --- /dev/null +++ b/daily/sfsy.py @@ -0,0 +1,1198 @@ +""" +顺丰速运日常积分任务 +Author: 爱学习的呆子 +Version: 1.3.0 +Date: 2026-04-04 +new Env("顺丰速运日常任务") +cron 0 12 * * * +""" +# 扫码获取ck地址:https://sm.9999.blue/ +import hashlib +import json +import os +import random +import time +from datetime import datetime +from typing import Dict, List, Optional, Any +from dataclasses import dataclass +from urllib.parse import unquote, urlparse, parse_qs +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock +import requests +from requests.packages.urllib3.exceptions import InsecureRequestWarning + +# 禁用SSL警告 +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + +# ==================== 代理相关配置常量 ==================== +PROXY_TIMEOUT = 15 # 代理超时时间(秒) +MAX_PROXY_RETRIES = 5 # 最大代理重试次数 +REQUEST_RETRY_COUNT = 3 # 请求重试次数 + +# ==================== 并发配置常量 ==================== +CONCURRENT_NUM = int(os.getenv('SFBF', '1')) # 并发数量,默认为1(串行),最大20 +if CONCURRENT_NUM > 20: + CONCURRENT_NUM = 20 + print(f'⚠️ 并发数量超过最大值20,已自动调整为20') +elif CONCURRENT_NUM < 1: + CONCURRENT_NUM = 1 + print(f'⚠️ 并发数量小于1,已自动调整为1(串行模式)') + +# 全局线程锁 +print_lock = Lock() # 用于保护打印输出 + + +# ==================== 配置类 ==================== +@dataclass +class Config: + """全局配置""" + APP_NAME: str = "顺丰速运" + VERSION: str = "1.2.0" + ENV_NAME: str = "sfsyUrl" + PROXY_API_URL: str = os.getenv('SF_PROXY_API_URL', '') + + # 代理相关配置常量 + PROXY_TIMEOUT = 15 # 代理时间(秒) + MAX_PROXY_RETRIES = 5 # 最大代理重试次数 + REQUEST_RETRY_COUNT = 3 # 请求重试次数 + + # API签名配置 + TOKEN: str = 'wwesldfs29aniversaryvdld29' + SYS_CODE: str = 'MCS-MIMP-CORE' + + # 任务跳过列表 + SKIP_TASKS: List[str] = None + + def __post_init__(self): + if self.SKIP_TASKS is None: + # 尝试直接提交所有任务,看看能否领取奖励 + # 原本跳过的任务:'用行业模板寄件下单'、'去新增一个收件偏好' + self.SKIP_TASKS = ['用行业模板寄件下单','用积分兑任意礼品','参与积分活动','每月累计寄件','完成每月任务','去使用AI寄件'] + + +# ==================== 日志系统 ==================== +class Logger: + """ + 日志管理器 - 实现图片中的日志风格 + """ + + # 日志图标 + ICONS = { + 'task_found': '🎯', # 发现任务 + 'task_skip': '⏭️', # 跳过任务 + 'task_complete': '✅', # 任务完成 + 'reward_get': '🎁', # 奖励领取 + 'info': '📝', # 普通信息 + 'success': '✨', # 成功 + 'error': '❌', # 错误 + 'warning': '⚠️', # 警告 + 'user': '👤', # 用户信息 + 'money': '💰', # 积分/金币 + 'gift': '🎁', # 礼物 + 'target': '🎯', # 目标 + } + + def __init__(self): + self.messages: List[str] = [] + self.current_account_msg: List[str] = [] + self.lock = Lock() # 每个Logger实例独立的锁 + + def _format_msg(self, icon: str, content: str) -> str: + """格式化消息""" + return f"{icon} {content}" + + def _safe_print(self, msg: str): + """线程安全的打印""" + with print_lock: + print(msg) + + def task_found(self, task_name: str, status: int = 2): + """发现任务""" + msg = self._format_msg(self.ICONS['task_found'], f"发现任务: {task_name} (状态: {status})") + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def task_skip(self, task_name: str): + """跳过任务""" + msg = self._format_msg(self.ICONS['task_skip'], f"[{task_name}] 已跳过") + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def task_complete(self, task_name: str): + """任务完成""" + msg = self._format_msg(self.ICONS['task_complete'], f"[{task_name}] 提交成功") + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def reward_get(self, task_name: str): + """奖励领取成功""" + msg = self._format_msg(self.ICONS['reward_get'], f"[{task_name}] 奖励领取成功") + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def info(self, content: str): + """普通信息""" + msg = self._format_msg(self.ICONS['info'], content) + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def success(self, content: str): + """成功信息""" + msg = self._format_msg(self.ICONS['success'], content) + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def error(self, content: str): + """错误信息""" + msg = self._format_msg(self.ICONS['error'], content) + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def warning(self, content: str): + """警告信息""" + msg = self._format_msg(self.ICONS['warning'], content) + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def user_info(self, account_index: int, mobile: str): + """用户信息""" + msg = self._format_msg(self.ICONS['user'], f"账号{account_index}: 【{mobile}】登录成功") + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def points_info(self, points: int, prefix: str = "当前积分"): + """积分信息""" + msg = self._format_msg(self.ICONS['money'], f"{prefix}: 【{points}】") + self._safe_print(msg) + with self.lock: + self.current_account_msg.append(msg) + self.messages.append(msg) + + def reset_account_msg(self): + """重置当前账号消息""" + self.current_account_msg = [] + + def get_all_messages(self) -> str: + """获取所有消息""" + return '\n'.join(self.messages) + + def get_account_messages(self) -> str: + """获取当前账号消息""" + return '\n'.join(self.current_account_msg) + + +# ==================== 代理管理器 ==================== +class ProxyManager: + """代理管理器""" + + def __init__(self, api_url: str): + self.api_url = api_url + self.logger = Logger() + + def get_proxy(self) -> Optional[Dict[str, str]]: + """获取代理 + 返回格式:{'http': 'http://ip:port', 'https': 'http://ip:port'} + """ + try: + if not self.api_url: + print('⚠️ 未配置代理API地址,将不使用代理') + return None + + response = requests.get(self.api_url, timeout=10) + if response.status_code == 200: + proxy_text = response.text.strip() + if ':' in proxy_text: + # 构建代理URL + if proxy_text.startswith('http://') or proxy_text.startswith('https://'): + proxy = proxy_text + else: + proxy = f'http://{proxy_text}' + + # 隐藏认证信息用于显示(如果有的话) + display_proxy = proxy + if '@' in proxy: + # 格式: http://user:pass@ip:port + parts = proxy.split('@') + if len(parts) == 2: + display_proxy = f"http://***:***@{parts[1]}" + + print(f"✅ 成功获取代理: {display_proxy}") + return {'http': proxy, 'https': proxy} + + print(f'❌ 获取代理失败: {response.text}') + return None + except Exception as e: + print(f'❌ 获取代理异常: {str(e)}') + return None + + +# ==================== HTTP客户端 ==================== +class SFHttpClient: + """顺丰HTTP客户端""" + + def __init__(self, config: Config, proxy_manager: ProxyManager): + self.config = config + self.proxy_manager = proxy_manager + self.session = requests.Session() + self.session.verify = False + + # 设置代理 + proxy = self.proxy_manager.get_proxy() + if proxy: + self.session.proxies = proxy + + # 默认请求头 + self.headers = { + 'Host': 'mcs-mimp-web.sf-express.com', + 'upgrade-insecure-requests': '1', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 NetType/WIFI MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090551) XWEB/6945 Flue', + 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', + 'sec-fetch-site': 'none', + 'sec-fetch-mode': 'navigate', + 'sec-fetch-user': '?1', + 'sec-fetch-dest': 'document', + 'accept-language': 'zh-CN,zh', + 'platform': 'MINI_PROGRAM', + } + + def _generate_sign(self) -> Dict[str, str]: + """生成API签名""" + timestamp = str(int(round(time.time() * 1000))) + data = f'token={self.config.TOKEN}×tamp={timestamp}&sysCode={self.config.SYS_CODE}' + signature = hashlib.md5(data.encode()).hexdigest() + + return { + 'sysCode': self.config.SYS_CODE, + 'timestamp': timestamp, + 'signature': signature + } + + def request( + self, + url: str, + method: str = 'POST', + data: Optional[Dict] = None, + max_retries: int = REQUEST_RETRY_COUNT + ) -> Optional[Dict[str, Any]]: + """发送HTTP请求,带双层重试机制 + + Args: + url: 请求URL + method: 请求方法 GET/POST + data: 请求数据 + max_retries: 最大重试次数 + + Returns: + 响应JSON数据或None + """ + # 更新签名 + sign_data = self._generate_sign() + self.headers.update(sign_data) + + retry_count = 0 + proxy_retry_count = 0 + + while proxy_retry_count < MAX_PROXY_RETRIES: + try: + # 如果请求重试次数达到2次,尝试切换代理 + if retry_count >= 2: + print('请求已失败2次,尝试切换代理IP') + new_proxy = self.proxy_manager.get_proxy() + if new_proxy: + self.session.proxies = new_proxy + else: + print('⚠️ 切换代理失败,无可用代理') + retry_count = 0 # 重置请求重试计数 + + try: + if method.upper() == 'GET': + response = self.session.get(url, headers=self.headers, timeout=PROXY_TIMEOUT) + elif method.upper() == 'POST': + response = self.session.post(url, headers=self.headers, json=data or {}, timeout=PROXY_TIMEOUT) + else: + raise ValueError(f'不支持的请求方法: {method}') + + # 检查响应状态码 + response.raise_for_status() + + try: + res = response.json() + if res is None: + print(f'响应内容为空,正在重试 ({retry_count + 1}/{max_retries})') + retry_count += 1 + time.sleep(2) + continue + return res + except (json.JSONDecodeError, ValueError) as e: + print(f'JSON解析失败: {str(e)}, 响应内容: {response.text[:200]}') + retry_count += 1 + if retry_count < max_retries: + print(f'正在进行第{retry_count + 1}次重试...') + time.sleep(2) + continue + return None + + except requests.exceptions.RequestException as e: + retry_count += 1 + print(f'请求失败,正在重试 ({retry_count}/{max_retries}): {str(e)}') + # 如果是代理错误或SSL错误,增加代理重试计数 + if 'ProxyError' in str(e) or 'SSLError' in str(e): + proxy_retry_count += 1 + print(f'代理连接失败,尝试切换代理 ({proxy_retry_count}/{MAX_PROXY_RETRIES})') + if proxy_retry_count < MAX_PROXY_RETRIES: + new_proxy = self.proxy_manager.get_proxy() + if new_proxy: + self.session.proxies = new_proxy + time.sleep(2) + continue + + except Exception as e: + print(f'请求发生异常: {str(e)}') + proxy_retry_count += 1 + if proxy_retry_count < MAX_PROXY_RETRIES: + print(f'尝试切换代理 ({proxy_retry_count}/{MAX_PROXY_RETRIES})') + time.sleep(2) + continue + else: + print('达到最大代理重试次数,返回None') + return None + + print('请求最终失败,返回None') + return None + + def login(self, url: str, timeout: int = PROXY_TIMEOUT) -> tuple[bool, str, str]: + """ + 登录(兼容URL和CK格式) + + Args: + url: 登录URL 或 CK字符串(sessionId=xxx;_login_mobile_=xxx;_login_user_id_=xxx) + timeout: 超时时间(秒) + + Returns: + tuple: (是否成功, user_id, 手机号) + """ + try: + decoded_input = unquote(url) + if decoded_input.startswith('sessionId=') or '_login_mobile_=' in decoded_input: + cookie_dict = {} + for item in decoded_input.split(';'): + item = item.strip() + if '=' in item: + k, v = item.split('=', 1) + cookie_dict[k] = v + for k, v in cookie_dict.items(): + self.session.cookies.set(k, v, domain='mcs-mimp-web.sf-express.com') + user_id = cookie_dict.get('_login_user_id_', '') + phone = cookie_dict.get('_login_mobile_', '') + if phone: + return True, user_id, phone + else: + return False, '', '' + else: + decoded_url = unquote(url) + self.session.get(decoded_url, headers=self.headers, timeout=timeout) + cookies = self.session.cookies.get_dict() + user_id = cookies.get('_login_user_id_', '') + phone = cookies.get('_login_mobile_', '') + if phone: + return True, user_id, phone + else: + return False, '', '' + except Exception as e: + print(f'登录异常: {str(e)}') + return False, '', '' + + +# ==================== 任务执行器 ==================== +class TaskExecutor: + """任务执行器""" + + def __init__( + self, + http_client: SFHttpClient, + logger: Logger, + config: Config, + user_id: str + ): + self.http = http_client + self.logger = logger + self.config = config + self.user_id = user_id + self.total_points = 0 + + # 任务相关属性 + self.taskId = "" + self.taskCode = "" + self.strategyId = "" + self.title = "" + + @staticmethod + def generate_device_id(characters: str = 'abcdef0123456789') -> str: + """生成设备ID""" + result = '' + for char in 'xxxxxxxx-xxxx-xxxx': + if char == 'x': + result += random.choice(characters) + else: + result += char + return result + + def _extract_task_id_from_url(self, url: str) -> str: + """从URL中提取taskId""" + try: + from urllib.parse import parse_qs, urlparse, unquote + import json + + # 处理_ug_view_param参数 + parsed = urlparse(url) + params = parse_qs(parsed.query) + + if '_ug_view_param' in params: + ug_params = json.loads(unquote(params['_ug_view_param'][0])) + if 'taskId' in ug_params: + return str(ug_params['taskId']) # 确保返回字符串类型 + + # 如果URL是JSON格式的,尝试解析 + if url.startswith('com.sf-express://'): + json_str = url.split('_ug_view_param=')[1] + ug_params = json.loads(unquote(json_str)) + if 'taskId' in ug_params: + return str(ug_params['taskId']) # 确保返回字符串类型 + + except Exception as e: + self.logger.warning(f'从URL提取taskId失败: {e}') + + return '' + + def _set_task_attrs(self, task: Dict) -> None: + """设置任务属性""" + self.taskId = str(task.get('taskId', '')) # 确保是字符串类型 + self.taskCode = str(task.get('taskCode', '')) # 确保是字符串类型 + self.strategyId = int(task.get('strategyId', 0)) # 确保是整数类型 + self.title = str(task.get('title', '未知任务')) + self.point = int(task.get('point', 0)) # 确保是整数类型 + + # 如果taskCode为空,尝试从buttonRedirect中提取 + if not self.taskCode and 'buttonRedirect' in task: + extracted_task_id = self._extract_task_id_from_url(task['buttonRedirect']) + if extracted_task_id: + self.taskCode = extracted_task_id + self.logger.info(f'从buttonRedirect中提取到taskId: {self.taskCode}') + + def app_sign_in(self) -> tuple[bool, str]: + """APP每日签到(使用getUnFetchPointAndDiscount接口触发签到+领取) + + Returns: + tuple[bool, str]: (是否成功, 错误信息) + """ + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskSignPlusService~getUnFetchPointAndDiscount' + data = {} + + # 保存原有的platform头 + original_platform = self.http.headers.get('platform', 'MINI_PROGRAM') + + # 临时切换为APP平台 + self.http.headers['platform'] = 'SFAPP' + + try: + response = self.http.request(url, data=data) + if response and response.get('success'): + obj = response.get('obj', []) + + # 响应是一个数组,包含待领取的奖励 + if obj and isinstance(obj, list) and len(obj) > 0: + total_points = 0 + reward_names = [] + for item in obj: + packet_name = item.get('packetName', '未知奖励') + detail_value = item.get('detailValue', '0') + reward_names.append(packet_name) + try: + total_points += int(detail_value) + except: + pass + + self.logger.success(f'[APP签到] 签到成功,获得【{", ".join(reward_names)}】') + else: + self.logger.info(f'[APP签到] 今日已签到或无可领取奖励') + + return True, '' + else: + error_msg = response.get('errorMessage', '未知错误') if response else '请求失败' + + # 如果返回"没有待领取礼包",等待1秒后再次调用接口 + if '没有待领取礼包' in error_msg: + self.logger.info(f'[APP签到] 检测到需要二次领取,等待1秒后重试...') + time.sleep(1) + + # 再次调用getUnFetchPointAndDiscount接口 + response2 = self.http.request(url, data=data) + if response2 and response2.get('success'): + obj2 = response2.get('obj', []) + + if obj2 and isinstance(obj2, list) and len(obj2) > 0: + total_points = 0 + reward_names = [] + for item in obj2: + packet_name = item.get('packetName', '未知奖励') + detail_value = item.get('detailValue', '0') + reward_names.append(packet_name) + try: + total_points += int(detail_value) + except: + pass + + self.logger.success(f'[APP签到] 二次领取成功,获得【{", ".join(reward_names)}】') + else: + self.logger.info(f'[APP签到] 二次领取完成,但无可领取奖励') + + return True, '' + else: + error_msg2 = response2.get('errorMessage', '未知错误') if response2 else '请求失败' + self.logger.error(f'[APP签到] 二次领取失败: {error_msg2}') + return False, error_msg2 + else: + self.logger.error(f'[APP签到] 失败: {error_msg}') + return False, error_msg + finally: + # 恢复原有的platform头 + self.http.headers['platform'] = original_platform + + def sign_in(self) -> tuple[bool, str]: + """小程序每日签到 + + Returns: + tuple[bool, str]: (是否成功, 错误信息) + """ + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskSignPlusService~automaticSignFetchPackage' + data = {"comeFrom": "vioin", "channelFrom": "WEIXIN"} + + response = self.http.request(url, data=data) + if response and response.get('success'): + count_day = response.get('obj', {}).get('countDay', 0) + packet_list = response.get('obj', {}).get('integralTaskSignPackageVOList', []) + + if packet_list: + packet_name = packet_list[0].get('packetName', '未知奖励') + self.logger.success(f'签到成功,获得【{packet_name}】,本周累计签到【{count_day + 1}】天') + else: + self.logger.info(f'今日已签到,本周累计签到【{count_day + 1}】天') + return True, '' + else: + error_msg = response.get('errorMessage', '未知错误') if response else '请求失败' + self.logger.error(f'签到失败: {error_msg}') + return False, error_msg + + def new_sign_in(self) -> tuple[bool, str]: + """新签到(integralSignV2Service) + + Returns: + tuple[bool, str]: (是否成功, 错误信息) + """ + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralSignV2Service~sign' + data = {} + + original_platform = self.http.headers.get('platform', 'MINI_PROGRAM') + self.http.headers['platform'] = 'SFAPP' + + try: + response = self.http.request(url, data=data) + if response and response.get('success'): + obj = response.get('obj', {}) + signed = obj.get('signed', False) + day_count = obj.get('dayCount', 0) + total_count = obj.get('totalCount', 0) + award = obj.get('award', {}) + award_type = obj.get('awardType', '') + award_num = obj.get('awardNum', 0) + + if signed and award: + gift_bag_name = award.get('giftBagName', '未知奖励') + self.logger.success(f'[新签到] 签到成功,连续第{day_count}天,获得【{gift_bag_name}】') + elif signed: + self.logger.info(f'[新签到] 今日已签到,连续第{day_count}天') + else: + self.logger.info(f'[新签到] 签到完成') + + return True, '' + else: + error_msg = response.get('errorMessage', '未知错误') if response else '请求失败' + self.logger.error(f'[新签到] 失败: {error_msg}') + return False, error_msg + finally: + self.http.headers['platform'] = original_platform + + def get_task_list(self) -> List[Dict]: + """获取任务列表""" + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskStrategyService~queryPointTaskAndSignFromES' + + all_tasks = [] + task_codes_seen = set() + + for channel_type in ['1', '2', '3', '4','01','02','03','04']: + data = { + 'channelType': channel_type, + 'deviceId': self.generate_device_id(), + } + + response = self.http.request(url, data=data) + + if response and response.get('success') and response.get('obj'): + # 只在第一次请求时获取总积分 + if channel_type == '1': + self.total_points = response['obj'].get('totalPoint', 0) + + tasks = response['obj'].get('taskTitleLevels', []) + + # 去重添加任务 + for task in tasks: + task_code = task.get('taskCode') + task_title = task.get('title', '未知任务') + + # 尝试提取taskId + if 'buttonRedirect' in task: + extracted_id = self._extract_task_id_from_url(task['buttonRedirect']) + if extracted_id and not task_code: + task_code = extracted_id + task['taskCode'] = extracted_id + + # 如果taskCode为空,但能从buttonRedirect中提取到taskId,则使用提取的taskId + if not task_code and 'buttonRedirect' in task: + extracted_id = self._extract_task_id_from_url(task['buttonRedirect']) + if extracted_id: + task['taskCode'] = extracted_id + task_code = extracted_id + + # 如果taskCode仍然为空,则跳过 + if not task_code: + continue + + # 检查是否已存在相同taskCode的任务 + if task_code not in task_codes_seen: + task_codes_seen.add(task_code) + all_tasks.append(task) + else: + error_msg = response.get('errorMessage', '未知错误') if response else '请求失败' + self.logger.warning(f'获取 channelType={channel_type} 的任务失败: {error_msg}') + + return all_tasks + + def execute_task(self) -> bool: + """执行单个任务""" + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonRoutePost/memberEs/taskRecord/finishTask' + data = {'taskCode': self.taskCode} + + response = self.http.request(url, data=data) + if response and response.get('success'): + return True + return False + + def _update_points(self): + """更新积分显示""" + tasks = self.get_task_list() + if tasks: + self.logger.points_info(self.total_points, "当前积分") + + def receive_task_reward(self) -> bool: + """领取任务奖励""" + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberNonactivity~integralTaskStrategyService~fetchIntegral' + data = { + "strategyId": self.strategyId, + "taskId": self.taskId, + "taskCode": self.taskCode, + "deviceId": self.generate_device_id() + } + + response = self.http.request(url, data=data) + if response: + if response.get('success'): + self.logger.success(f'成功领取任务奖励: {self.title}') + return True + return False + + def get_welfare_list(self) -> List[Dict]: + """获取生活特权列表""" + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberGoods~mallGoodsLifeService~list' + data = { + "memGrade": 3, + "categoryCode": "SHTQ", + "showCode": "SHTQWNTJ" + } + + response = self.http.request(url, data=data) + if response and response.get('success'): + obj_list = response.get('obj', []) + # 收集所有可领取的特权 + welfare_list = [] + for module in obj_list: + goods_list = module.get('goodsList', []) + for goods in goods_list: + # exchangeStatus=1 表示可以领取 + if goods.get('exchangeStatus') == 1: + welfare_list.append({ + 'goodsId': goods.get('goodsId'), + 'goodsNo': goods.get('goodsNo'), + 'goodsName': goods.get('goodsName'), + 'showName': goods.get('showName', ''), + 'id': goods.get('id') + }) + return welfare_list + return [] + + def receive_welfare(self, goods_no: str, goods_name: str, task_code: str) -> bool: + """领取生活特权""" + url = 'https://mcs-mimp-web.sf-express.com/mcs-mimp/commonPost/~memberGoods~pointMallService~createOrder' + data = { + "from": "Point_Mall", + "orderSource": "POINT_MALL_EXCHANGE", + "goodsNo": goods_no, + "quantity": 1, + "taskCode": task_code + } + + response = self.http.request(url, data=data) + if response and response.get('success'): + order_no = response.get('obj', {}).get('orderNo', '') + self.logger.success(f'成功领取生活特权: {goods_name} (订单号: {order_no})') + return True + else: + error_msg = response.get('errorMessage', '未知错误') if response else '请求失败' + self.logger.error(f'领取生活特权失败: {goods_name} - {error_msg}') + return False + + def handle_welfare_task(self, task_title: str) -> bool: + """处理领取生活特权任务""" + self.logger.info('正在获取生活特权列表...') + + welfare_list = self.get_welfare_list() + if not welfare_list: + self.logger.warning('没有可领取的生活特权') + return False + + self.logger.info(f'找到 {len(welfare_list)} 个可领取的生活特权') + + # 尝试领取第一个可用的特权 + for welfare in welfare_list: + goods_no = welfare.get('goodsNo') + goods_name = welfare.get('goodsName') + show_name = welfare.get('showName') + + if not goods_no: + continue + + display_name = f"{show_name} - {goods_name}" if show_name else goods_name + + # 使用任务的 taskCode + if self.receive_welfare(goods_no, display_name, self.taskCode): + return True + + # 如果领取失败,等待一下再尝试下一个 + time.sleep(1) + + return False + + def run_all_tasks(self) -> tuple[int, int]: + """执行所有任务 + + Returns: + tuple: (执行前积分, 执行后积分) + """ + print('-'*50) + + # 只在这里显示一次任务列表更新信息 + self.logger.info('正在获取任务列表...') + tasks = self.get_task_list() + if not tasks: + self.logger.error('获取任务列表失败') + return (0, 0) + + points_before = self.total_points + self.logger.points_info(points_before, "执行前积分") + + for task in tasks: + task_title = task.get('title', '未知任务') + task_status = task.get('status') + + # 状态3表示已完成 + if task_status == 3: + self.logger.success(f'{task_title} - 已完成') + continue + + # 跳过特定任务 + if task_title in self.config.SKIP_TASKS: + self.logger.task_skip(task_title) + continue + + # 提取任务属性 + self._set_task_attrs(task) + + # 检查是否成功提取 taskCode + if not self.taskCode: + # 如果taskCode为空,尝试从buttonRedirect中提取 + if 'buttonRedirect' in task: + self.logger.info(f'尝试从buttonRedirect中提取taskCode: {task_title}') + extracted_task_id = self._extract_task_id_from_url(task['buttonRedirect']) + if extracted_task_id: + self.taskCode = extracted_task_id + self.logger.info(f'成功从buttonRedirect中提取到taskCode: {self.taskCode}') + else: + self.logger.warning(f'{task_title} - 无法从buttonRedirect提取taskCode,跳过') + continue + else: + self.logger.warning(f'{task_title} - 无法提取taskCode,跳过') + continue + + # 发现任务 + self.logger.task_found(task_title, task_status) + + # 特殊任务处理 - 需要在状态判断之前处理 + if '领任意生活特权福利' in task_title: + # 先处理生活特权领取 + if self.handle_welfare_task(task_title): + time.sleep(2) + # 然后执行任务提交 + if self.execute_task(): + self.logger.task_complete(task_title) + time.sleep(2) + # 领取奖励 + if self.receive_task_reward(): + self.logger.reward_get(task_title) + self._update_points() + else: + self.logger.warning(f'任务执行失败: {task_title}') + else: + self.logger.warning(f'{task_title} - 无法完成,跳过') + time.sleep(3) + continue + + # 状态1表示需要先执行任务 + if task_status == 1: + # 特殊处理连签7天任务 + if '连签7天' in task_title and 'process' in task: + current, total = map(int, task['process'].split('/')) + if current < total: + self.logger.info(f'【{task_title}】进度: {task["process"]},还需{total - current}天') + continue + + if self.execute_task(): + self.logger.task_complete(task_title) + time.sleep(2) + # 执行成功后,将状态更新为2(可领取奖励) + task_status = 2 + else: + self.logger.warning(f'任务执行失败: {task_title}') + continue + + # 状态2表示可领取奖励 + if task_status == 2: + # 先尝试直接领取奖励 + if self.receive_task_reward(): + self.logger.reward_get(task_title) + # 更新积分 + self._update_points() + continue + + # 如果直接领取失败,尝试先执行任务再领取 + if self.execute_task(): + self.logger.task_complete(task_title) + time.sleep(2) + # 再次尝试领取奖励 + if self.receive_task_reward(): + self.logger.reward_get(task_title) + self._update_points() + else: + self.logger.warning(f'任务执行失败: {task_title}') + continue + + time.sleep(3) + + # 获取最新积分 + tasks = self.get_task_list() + points_after = self.total_points if tasks else points_before + if tasks: + self.logger.points_info(points_after, "执行后积分") + + return (points_before, points_after) + + +# ==================== 账号管理器 ==================== +class AccountManager: + """账号管理器""" + + def __init__(self, account_url: str, account_index: int, config: Config): + self.account_url = account_url + self.account_index = account_index + 1 + self.config = config + self.logger = Logger() + self.proxy_manager = ProxyManager(config.PROXY_API_URL) + + # 登录重试机制(参考顺丰代理.py的实现) + self.login_success = False + self.user_id = None + self.phone = None + self.http_client = None + + retry_count = 0 + while retry_count < MAX_PROXY_RETRIES and not self.login_success: + try: + # 每次重试都重新获取代理和创建HTTP客户端 + self.http_client = SFHttpClient(config, self.proxy_manager) + + # 尝试登录(带超时) + success, self.user_id, self.phone = self.http_client.login(account_url) + + if success: + masked_phone = self.phone[:3] + "*" * 4 + self.phone[7:] + self.logger.user_info(self.account_index, masked_phone) + self.login_success = True + break + else: + if retry_count < MAX_PROXY_RETRIES - 1: + print(f'账号{self.account_index} 登录失败,尝试重新获取代理 ({retry_count + 1}/{MAX_PROXY_RETRIES})') + time.sleep(2) + except Exception as e: + print(f'账号{self.account_index} 登录异常: {str(e)[:100]}') + + retry_count += 1 + + # 如果所有代理重试都失败,记录错误 + if not self.login_success: + self.logger.error(f'账号{self.account_index} 登录失败,已重试{MAX_PROXY_RETRIES}次,所有代理均不可用') + + def run(self) -> Dict[str, Any]: + """运行账号任务 + + Returns: + Dict: 包含账号统计信息的字典 + """ + if not self.login_success: + return { + 'success': False, + 'phone': '', + 'points_before': 0, + 'points_after': 0, + 'points_earned': 0 + } + + # 随机延迟 + wait_time = random.randint(1000, 3000) / 1000.0 + time.sleep(wait_time) + + # 初始化任务执行器 + executor = TaskExecutor(self.http_client, self.logger, self.config, self.user_id) + + # 先执行APP签到 + app_sign_success, app_error_msg = executor.app_sign_in() + time.sleep(1) + + # 执行新签到 + new_sign_success, new_sign_error = executor.new_sign_in() + time.sleep(1) + + # 再执行小程序签到 + sign_success, error_msg = executor.sign_in() + + # 如果签到失败且错误信息包含“活动太火爆”,尝试重新登录 + if not sign_success and '活动太火爆' in error_msg: + max_retries = 3 + for retry in range(max_retries): + self.logger.warning(f'签到失败(代理IP问题),{2}秒后重新获取代理并重试(第{retry + 1}次)...') + time.sleep(2) + + try: + # 重新创建HTTP客户端(会自动获取新代理) + self.http_client = SFHttpClient(self.config, self.proxy_manager) + + # 重新登录 + success, self.user_id, self.phone = self.http_client.login(self.account_url) + + if success: + # 更新执行器的HTTP客户端 + executor.http = self.http_client + executor.user_id = self.user_id + + # 重试签到 + sign_success, error_msg = executor.sign_in() + + if sign_success: + self.logger.success('重新登录后签到成功') + break + elif '活动太火爆' not in error_msg: + # 如果不是代理问题,则不再重试 + break + else: + if retry == max_retries - 1: + self.logger.error(f'重新登录失败,已重试{max_retries}次') + except Exception as e: + if retry == max_retries - 1: + self.logger.error(f'重新登录异常: {str(e)[:100]},已重试{max_retries}次') + + # 执行其他任务 + points_before, points_after = executor.run_all_tasks() + points_earned = points_after - points_before + + # 返回统计信息 + return { + 'success': True, + 'phone': self.phone, + 'points_before': points_before, + 'points_after': points_after, + 'points_earned': points_earned + } + + +# ==================== 单账号执行函数 ==================== +def run_single_account(account_info: str, index: int, config: Config) -> Dict[str, Any]: + """ + 执行单个账号的任务(线程安全) + + Args: + account_info: 账号信息 + index: 账号索引 + config: 配置对象 + + Returns: + Dict: 包含账号统计信息的字典 + """ + try: + with print_lock: + print(f"🚀 开始执行账号{index + 1}") + + account = AccountManager(account_info, index, config) + result = account.run() + + if result['success']: + with print_lock: + print(f"✅ 账号{index + 1}执行完成") + else: + with print_lock: + print(f"❌ 账号{index + 1}执行失败") + + result['index'] = index + return result + except Exception as e: + error_msg = f"账号{index + 1}执行异常: {str(e)}" + with print_lock: + print(f"❌ {error_msg}") + return { + 'index': index, + 'success': False, + 'phone': '', + 'points_before': 0, + 'points_after': 0, + 'points_earned': 0, + 'error': error_msg + } + + +# ==================== 主程序 ==================== +def main(): + """主函数""" + config = Config() + + env_value = os.getenv(config.ENV_NAME) + if not env_value: + print(f"❌ 未找到环境变量 {config.ENV_NAME},请检查配置") + return + + account_urls = [url.strip() for url in env_value.split('&') if url.strip()] + if not account_urls: + print(f"❌ 环境变量 {config.ENV_NAME} 为空或格式错误") + return + + # 随机打乱账号顺序 + random.shuffle(account_urls) + print(f"🔀 已随机打乱账号执行顺序") + + print("=" * 50) + print(f"🎉 {config.APP_NAME} v{config.VERSION}") + print(f"👨‍💻 作者: 爱学习的呆子") + print(f"📱 共获取到 {len(account_urls)} 个账号") + print(f"⚙️ 并发数量: {CONCURRENT_NUM}") + print(f"⏰ 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("=" * 50) + + # 收集所有账号的统计信息 + all_results = [] + + if CONCURRENT_NUM <= 1: + # 串行执行模式 + print("🔄 使用串行模式执行...") + for index, account_url in enumerate(account_urls): + account = AccountManager(account_url, index, config) + result = account.run() + result['index'] = index + all_results.append(result) + + if index < len(account_urls) - 1: + print("=" * 50) + print(f"⏳ 等待 2 秒后执行下一个账号...") + time.sleep(2) + else: + # 并发执行模式 + print(f"🔄 使用并发模式执行,并发数: {CONCURRENT_NUM}") + + # 使用线程池执行 + with ThreadPoolExecutor(max_workers=CONCURRENT_NUM) as executor: + # 提交所有任务 + future_to_index = { + executor.submit(run_single_account, account_url, index, config): index + for index, account_url in enumerate(account_urls) + } + + # 等待任务完成 + for future in as_completed(future_to_index): + result = future.result() + all_results.append(result) + + # 按索引排序结果 + all_results.sort(key=lambda x: x['index']) + + # 统计成功和失败数量 + success_count = sum(1 for r in all_results if r['success']) + fail_count = len(all_results) - success_count + total_earned = sum(r['points_earned'] for r in all_results if r['success']) + + # 显示汇总统计表格 + print(f"\n" + "=" * 80) + print(f"📊 积分统计汇总") + print("=" * 80) + print(f"{'序号':<6} {'手机号':<15} {'今日获得积分':<15} {'总积分':<15} {'状态':<10}") + print("-" * 80) + + for result in all_results: + index = result['index'] + 1 + phone = result['phone'][:3] + "****" + result['phone'][7:] if result['phone'] else "未登录" + earned = result['points_earned'] + total = result['points_after'] + status = "✅成功" if result['success'] else "❌失败" + + print(f"{index:<6} {phone:<15} {earned:<15} {total:<15} {status:<10}") + + print("-" * 80) + print(f"{'汇总':<6} {'账号总数: ' + str(len(all_results)):<15} {'今日总获得: ' + str(total_earned):<15} {'':<15} {'成功: ' + str(success_count):<10}") + print("=" * 80) + + print("\n🎊 所有账号任务执行完成!") + + +if __name__ == '__main__': + main() \ No newline at end of file