LittlePythonSheep/md_ktv.py
2024-01-07 21:19:34 +08:00

255 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 咪哒KTV v1.00
#
# 微信小程序-咪哒KTV
# 只有签到得咪豆, 每天跑一两次就行
# 咪豆可以兑换会员
#
# 抓取请求https://mk-gateway-pro.singworld.cn/mk-outside/api/sign/toSign的请求头
# uid是咪哒号 token是登录后的token
# 多账号换行隔开
# export md_ktv_cookie="uid@token@wxpusher_uid"
#
# cron "30 7,18 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('咪哒KTV签到')
import requests, json
import os
import urllib3
import random
from wxpusher import WxPusher
# Silence urllib3's InsecureRequestWarning; every request in this script is
# sent with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Account list, one string per account in the form "uid@token" or
# "uid@token@wxpusher_uid" (the entry point splits each string on '@').
# NOTE(review): a real-looking uid/token pair is hard-coded here; the
# commented-out line below reads the same list from the md_ktv_cookie
# environment variable instead — confirm which one is intended and rotate the
# token if this file has been published.
md_ktv_cookie = [
'1093049864@AY0-xf4s59zE_Cf6FtqrmLNNXrukcSdWyKFz3AdMEAY.KkAKKvYSMgofx6Kbm8aZ_huwqedgOLU_0kcmYkdzNqo@UID_iEX04ORBuLG5ch5B3tbqBUx3G8SP']
# md_ktv_cookie = os.getenv("md_ktv_cookie").split('\n')
# Theme id sent by ai_score() when listing and rating entries.
ai_themeId = 129
# Theme id sent by to_vote() when listing vote options.
vote_themeId = 130
# 签到
def to_sign():
    """Send the daily check-in request for the current account.

    Reads the module-level ``uid`` and ``headers``. Prints a confirmation
    when the JSON reply carries a truthy ``success`` field and prints
    nothing otherwise.
    """
    endpoint = "https://mk-gateway-pro.singworld.cn/mk-outside/api/sign/toSign"
    payload = {"uid": uid}
    # verify=False disables TLS certificate verification for this call.
    reply = requests.post(endpoint, headers=headers, json=payload, verify=False)
    result = json.loads(reply.text)
    if not result['success']:
        return
    print('签到成功')
# 播放录音15s +2
def play_record():
    """Report the "play a recording for 15s" task (task id 4) as done.

    Uses the module-level ``headers``. Prints a confirmation when the JSON
    reply carries a truthy ``success`` field.
    """
    reply = requests.post(
        "https://mk-gateway-pro.singworld.cn/mk-outside/api/task/doneTask",
        headers=headers,
        json={"taskId": 4},
        verify=False,  # skip TLS certificate verification
    )
    outcome = json.loads(reply.text)
    if outcome['success']:
        print('完成播放录音15s任务')
# 点赞 +1
def to_like():
    """Like a fixed recording on behalf of the current account.

    The same payload is first posted to the ``favor/delete`` endpoint and
    then to ``favor/create``; only the reply of the second call is checked.
    Uses the module-level ``uid`` and ``headers``.
    """
    payload = {
        "paramId": "45829_LOW_20231210183212_206925",
        "favourUid": uid,
        "favourType": 3
    }
    request_options = {"headers": headers, "json": payload, "verify": False}

    # Cancel any existing like first, then like again (presumably so the new
    # like is counted — the reply of the delete call is ignored).
    requests.post(
        "https://mk-gateway-pro.singworld.cn/mk-outside/api/favor/delete",
        **request_options,
    )
    created = requests.post(
        "https://mk-gateway-pro.singworld.cn/mk-outside/api/favor/create",
        **request_options,
    )
    if json.loads(created.text)['success']:
        print('点赞成功')
# 投票 +1
def to_vote():
    """Vote repeatedly for the least-voted option of the voting theme.

    Fetches the first page (10 rows) of vote options for ``vote_themeId``,
    picks the option with the smallest ``voteCount`` and keeps posting votes
    for it until the server answers with a falsy ``success``.

    Uses the module-level ``uid``, ``headers`` and ``vote_themeId``.
    Prints '投票成功' when at least one vote was accepted, '投票失败' when the
    very first vote was rejected, and a notice when there is nothing to
    vote for.
    """
    # Fetch the list of vote options.
    query = {
        "page": 1,
        "rows": 10,
        "themeId": vote_themeId,
        "uid": uid,
        "sortType": 3
    }
    url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/option/searchVoteRecordResult"
    response = requests.post(url, headers=headers, json=query, verify=False)
    voteList = (json.loads(response.text).get('data') or {}).get('list') or []
    if not voteList:
        # min() would raise ValueError on an empty list.
        print('没有可投票的选项')
        return

    # Pick the option that currently has the fewest votes.
    min_vote = min(voteList, key=lambda option: option['voteCount'])
    data = {
        "optionId": min_vote['optionId']
    }
    url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/vote/vote"

    # Keep voting until the server reports success == false.
    # NOTE(review): the loop has no upper bound; it relies on the server
    # eventually rejecting a vote.
    accepted = 0
    while True:
        # verify=False disables TLS certificate verification.
        response = requests.post(url, headers=headers, json=data, verify=False)
        if not json.loads(response.text)['success']:
            break
        accepted += 1

    # Only claim success if the server accepted at least one vote.
    print('投票成功' if accepted else '投票失败')
# AI评分 +1
def ai_score():
    """Rate one randomly chosen entry of the AI-scoring theme.

    Fetches the first page (10 rows) of not-yet-scored entries for
    ``ai_themeId``, picks one of them at random, looks up its ``optionId``
    and submits a random score between 5 and 8 (inclusive).

    Uses the module-level ``uid``, ``headers`` and ``ai_themeId``.
    Prints 'AI评分成功' when the score is accepted; prints a notice and
    returns early when there is nothing to score.
    """
    # Fetch the list of entries that have not been scored yet.
    data = {
        "page": 1,
        "rows": 10,
        "themeId": ai_themeId,
        "uid": uid
    }
    url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/event/notScoreList"
    response = requests.post(url, headers=headers, json=data, verify=False)
    scoreList = (json.loads(response.text).get('data') or {}).get('list') or []
    if not scoreList:
        print('没有可评分的作品')
        return

    # Pick any returned entry. A fixed index range (1..9) would skip the
    # first entry and raise IndexError whenever fewer than 10 rows come back.
    scoreUid = random.choice(scoreList)['uid']

    # Look up the optionId that belongs to the chosen entry.
    data = {
        "uid": scoreUid,
        "themeId": ai_themeId
    }
    url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/event/getDetailList"
    response = requests.post(url, headers=headers, json=data, verify=False)
    details = json.loads(response.text).get('data') or []
    if not details:
        print('未获取到评分选项')
        return
    optionId = details[0]['optionId']

    # Submit a score in the range 5..8 (both ends included).
    score = random.randint(5, 8)
    data = {
        "optionId": optionId,
        "score": score
    }
    url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/event/setScore"
    response = requests.post(url, headers=headers, json=data, verify=False)
    if json.loads(response.text)['success']:
        print('AI评分成功')
# 领取所有咪豆
def _format_goods_report(details, score):
    """Build the notification text for the redeemable-goods list.

    Args:
        details: list of goods dicts, each read via its 'goodsName' and
            'currentScore' keys.
        score: the account's current bean balance as returned by the API.

    Returns:
        One line per item followed by a summary line that says whether the
        cheapest item can be afforded with ``score``.
    """
    content = ''.join(
        f"{position}件商品名称:{item['goodsName']},价格:{item['currentScore']}咪豆\n"
        for position, item in enumerate(details, start=1)
    )
    if not details:
        # Nothing on offer: min() below would raise ValueError.
        return content + f"当前咪豆数量:{score},没有可兑换的商品"

    # min() returns the first item with the lowest price.
    cheapest = min(details, key=lambda item: item['currentScore'])
    if cheapest['currentScore'] <= score:
        return content + (
            f"当前咪豆数量:{score},有可兑换的商品\n"
            f"最低价格商品名称:{cheapest['goodsName']},价格:{cheapest['currentScore']}咪豆\n"
        )
    return content + f"当前咪豆数量:{score},没有可兑换的商品"


def get_all_mido():
    """Claim all pending beans and optionally push a summary via WxPusher.

    Posts to the ``receiveWaitScore`` endpoint and prints the reply's
    ``msg``. When a WxPusher uid is configured and the message is
    '领取成功', it additionally fetches the current balance and the list of
    redeemable goods, sends both as a WxPusher message and prints the
    goods report.

    Uses the module-level ``uid``, ``headers`` and ``wxpusher_uid``.
    """
    data = {}  # the claim endpoint is called with an empty JSON body
    url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/score/receiveWaitScore"
    # verify=False disables TLS certificate verification.
    response = requests.post(url, headers=headers, json=data, verify=False)
    msg = json.loads(response.text)['msg']
    print(msg)

    # Push only after a successful claim and only when a push uid is set.
    if wxpusher_uid == '' or msg != '领取成功':
        return

    # Current balance of *this* account (the uid must not be hard-coded,
    # otherwise every account reports the balance of the same user).
    url = f"https://mk-gateway-pro.singworld.cn/mk-outside/api/score/getCurScore?uid={uid}"
    # The (empty) JSON body is kept so the request matches what was sent before.
    response = requests.get(url, headers=headers, json=data, verify=False)
    score = json.loads(response.text)['data']

    # Goods that can currently be redeemed.
    url = "https://wtmall-outside-pro.singworld.net/wtmall/shop/index?platformId=2"
    data = {
        "page": 1,
        "rows": 10,
        "shopId": 0,
        "productTypeCode": "2003",
        "uid": uid,
        "platformId": 2
    }
    # Same headers as for the gateway, only the Host differs for the mall.
    mall_headers = {**headers, "Host": "wtmall-outside-pro.singworld.net"}
    response = requests.post(url, headers=mall_headers, json=data, verify=False)
    details = (json.loads(response.text).get('data') or {}).get('list') or []

    content = _format_goods_report(details, score)
    # NOTE(review): the WxPusher application token is hard-coded here.
    WxPusher.send_message('咪哒K歌签到成功,当前咪豆数量:' + str(score) + '\n' + content + '\n',
                          uids=[wxpusher_uid],
                          token='AT_uj0Ezms54MZVD3xO5R5i1Wh3vR7nMety')
    print(content)
if __name__ == '__main__':
    # Run the task sequence once for every configured account.
    for i, cookie in enumerate(md_ktv_cookie):
        # Entry layout: uid@token, optionally followed by @wxpusher_uid.
        parts = cookie.split('@')
        uid = parts[0]
        token = parts[1]
        # A push uid is only taken when the entry has exactly three fields.
        wxpusher_uid = parts[2] if len(parts) == 3 else ''
        # Request headers shared by all task functions (read as a global).
        headers = {
            "Host": "mk-gateway-pro.singworld.cn",
            "token": token,
            "xweb_xhr": "1",
            "uid": uid,
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100",
            "authorization": f"Bearer {token}",
            "user_token": token,
            "content-type": "application/json",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wx689a4b8fe3fccc63/196/page-frame.html",
            "accept-language": "zh-CN,zh;q=0.9"
        }
        print(f'开始第{i + 1}个帐号签到')
        # Steps, in order: 1. check in, 2. play a recording for 15s (+2),
        # 3. like (+1), 5. AI scoring (+1), 6. claim the beans.
        # Step 4, voting (+1), is disabled: to_vote()
        for step in (to_sign, play_record, to_like, ai_score, get_all_mido):
            step()