littlepythonsheep/蜜雪冰城.py
2024-08-16 11:11:15 +08:00

145 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import concurrent.futures
import hashlib
import logging
import threading
import time
from datetime import datetime, timedelta
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ----固定变量区----
MARKETING_ID = '1816854086004391938'
SALT = "c274bac6493544b89d9c4f9d8d542b84"
# 任务数和线程数
TASKS_NUM = 50 # 运行 50 次
THREADS_NUM = 5 # 最大线程数 5
# ----自定义变量区----
TOKEN = ('eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJ3eF8xNDU0Mzg5NzIyNTY1ODQ5MDg5IiwiaWF0IjoxNzIyMDcxNjcxfQ'
'.uQ9849O6WRN_Fu8B0U2KjrmL9qkknZTBJi4NNZfcZg6gJQfYU6N2l2mVnN3h0bsIiyQzffN7OpFOjWdl2vn7mQ')
# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s')
# 全局停止事件
stop_event = threading.Event()
def gen_sign(json_data: dict, salt: str) -> str:
sorted_keys = sorted(json_data.keys())
data_str = '&'.join(f'{key}={json_data[key]}' for key in sorted_keys)
data_str += salt
md5_hash = hashlib.md5(data_str.encode('utf-8')).hexdigest()
return md5_hash
def fetch_secretword():
headers1 = {
'Referer': 'https://mxsa-h5.mxbc.net/',
'Host': 'mxsa.mxbc.net',
'Origin': 'https://mxsa-h5.mxbc.net',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.50(0x18003233) NetType/4G Language/zh_CN miniProgram/wx7696c66d2245d107',
'Content-type': 'application/json;charset=UTF-8',
}
params1 = {
'marketingId': MARKETING_ID,
's': '2',
'stamp': str(int(time.time() * 1000)),
}
params1['sign'] = gen_sign(params1, SALT)
response = requests.get('https://mxsa.mxbc.net/api/v1/h5/marketing/secretword/info', params=params1,
headers=headers1, verify=False)
secretword = response.json()["data"]["hintWord"].replace("本场口令:", "")
logging.info(f"本场口令:{secretword}")
return secretword
HEADERS = {
'Host': 'mxsa.mxbc.net',
'Accept': 'application/json, text/plain, */*',
'Access-Token': TOKEN,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090b17)XWEB/11177',
'Origin': 'https://mxsa-h5.mxbc.net',
'Sec-Fetch-Site': 'same-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://mxsa-h5.mxbc.net/',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
def exchange(secretword):
if stop_event.is_set():
return
try:
url = 'https://mxsa.mxbc.net/api/v1/h5/marketing/secretword/confirm'
json_data = {
'marketingId': MARKETING_ID,
'round': start_time_str,
'secretword': secretword,
's': 2,
'stamp': str(int(time.time() * 1000)),
}
json_data['sign'] = gen_sign(json_data, SALT)
res = requests.post(url, headers=HEADERS, json=json_data, verify=False)
response_data = res.json()
logging.info(f'任务开始: {res.text}')
if response_data.get("code") == 5310:
logging.warning("检测到 code 5310停止所有线程")
stop_event.set()
except requests.RequestException as e:
logging.error(f'任务失败: {e}')
def threading_run(tasks, threads, secretword):
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
futures = [executor.submit(exchange, secretword) for _ in range(tasks)]
for future in concurrent.futures.as_completed(futures):
if stop_event.is_set():
break
try:
future.result()
except Exception as e:
logging.error(f'任务产生了异常: {e}')
def start_task_at_time(start_time: datetime, secretword: str):
# 计算提前 0.5 秒的时间
start_time = start_time - timedelta(seconds=0.5)
current_time = datetime.now()
sleep_time = (start_time - current_time).total_seconds()
while sleep_time > 0:
if sleep_time > 1:
countdown_time = int(sleep_time)
logging.info(f"倒计时 {countdown_time}")
time.sleep(1)
else:
logging.info(f"等待 {sleep_time} 秒后开始任务")
time.sleep(sleep_time)
break
current_time = datetime.now()
sleep_time = (start_time - current_time).total_seconds()
threading_run(TASKS_NUM, THREADS_NUM, secretword)
if __name__ == '__main__':
secretword = fetch_secretword()
# 设置任务开始的具体时间,格式为 "HH:MM"
start_time_str = "16:00"
today = datetime.today().strftime('%Y-%m-%d')
start_time = datetime.strptime(f"{today} {start_time_str}", '%Y-%m-%d %H:%M')
start_task_at_time(start_time, secretword)