littlepythonsheep/md_ktv.py
2024-04-03 09:56:48 +08:00

295 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 咪哒KTV v1.00
#
# 微信小程序-咪哒KTV
# 只有签到得咪豆, 每天跑一两次就行
# 咪豆可以兑换会员
#
# 抓取请求https://mk-gateway-pro.singworld.cn/mk-outside/api/sign/toSign的请求头
# uid是咪哒号 token是登录后的token
# 多账号换行隔开
# export md_ktv_cookie="uid@token@wxpusher_uid"
#
# cron "30 7,18 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('咪哒KTV签到')
import requests, json
import os
import urllib3
import random
from wxpusher import WxPusher
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
md_ktv_cookie = os.getenv("md_ktv_cookie").split('\n')
# md_ktv_cookie = ['1093049864@PZhaGyo6ao8oVw03_Re-LkKH98_kLZzngBzNSZC2bB0.7X7SpCW4Tz30Wf_tbXpeJc8zJokr_a34fYYNoxzPP3U@UID_iEX04ORBuLG5ch5B3tbqBUx3G8SP']
# AI评分主题id
ai_themeId = ''
# 投票活动主题id
vote_themeId = ''
# 点赞歌曲id
paramId = "45829_LOW_20231210183212_206925"
# 投票歌曲id
vote_optionId = 20486
# 获取活动主题列表
def get_theme_list():
global ai_themeId, vote_themeId # 声明全局变量
data = {
"page": 1,
"rows": 5
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/theme/getThemeListJson"
response = requests.post(url, headers=headers, json=data, verify=False)
theme_list = json.loads(response.text)['data']['list']
# 只取前两个活动
for i in range(2):
theme = theme_list[i]
print('' + str(i + 1) + '个主题名称:' + theme['name'] + ',主题id:' + str(theme['id']) + ',主题类型:' + str(
theme['themeType']))
if theme['themeType'] == 1:
vote_themeId = theme['id']
elif theme['themeType'] == 2:
ai_themeId = theme['id']
# 签到
def to_sign():
data = {
"uid": uid
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/sign/toSign"
# 添加 verify=False 来忽略SSL证书验证
response = requests.post(url, headers=headers, json=data, verify=False)
print(response.text)
if json.loads(response.text)['msg'] == 'invalid token':
# print('token失效,跳过该帐号,账号为:' + uid)
# 给推送uid推送消息
WxPusher.send_message('token失效,请重新获取',
uids=[wxpusher_uid],
token='AT_uj0Ezms54MZVD3xO5R5i1Wh3vR7nMety')
return False
if json.loads(response.text)['success']:
print('签到成功')
return True
# 播放录音15s +2
def play_record():
data = {
"taskId": 4
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/task/doneTask"
# 添加 verify=False 来忽略SSL证书验证
response = requests.post(url, headers=headers, json=data, verify=False)
if json.loads(response.text)['success']:
print('完成播放录音15s任务')
# 点赞 +1
def to_like():
data = {
"paramId": paramId,
"favourUid": uid,
"favourType": 3
}
del_url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/favor/delete"
# 先取消然后再点赞
response = requests.post(del_url, headers=headers, json=data, verify=False)
like_url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/favor/create"
response = requests.post(like_url, headers=headers, json=data, verify=False)
if json.loads(response.text)['success']:
print('点赞成功')
# 投票 +1
def to_vote():
# 获取投票列表
data = {
"page": 1,
"rows": 10,
"themeId": vote_themeId,
"uid": uid,
"sortType": 3
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/option/searchVoteRecordResult"
response = requests.post(url, headers=headers, json=data, verify=False)
# 获取投票列表
voteList = json.loads(response.text)['data']['list']
# 获取列表中投票数最少的投票id
min_vote = min(voteList, key=lambda x: x['voteCount'])
optionId = min_vote['optionId']
# 判断是否有投票id如果有就用投票id投票没有就用默认的投票id
if vote_optionId != '':
optionId = vote_optionId
data = {
"optionId": optionId
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/vote/vote"
# 循环投票直到返回json中的success为false
while True:
# 添加 verify=False 来忽略SSL证书验证
response = requests.post(url, headers=headers, json=data, verify=False)
if not json.loads(response.text)['success']:
print('投票成功')
break
# AI评分 +1
def ai_score():
# 获取评分列表
data = {
"page": 1,
"rows": 10,
"themeId": ai_themeId,
"uid": uid
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/event/notScoreList"
response = requests.post(url, headers=headers, json=data, verify=False)
# 获取评分列表
scoreList = json.loads(response.text)['data']['list']
# 随机选择一个进行评分
scoreUid = scoreList[random.randint(1, 9)]['uid']
# 获取对应的optionId
data = {
"uid": scoreUid,
"themeId": ai_themeId
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/event/getDetailList"
response = requests.post(url, headers=headers, json=data, verify=False)
# 获取对应的optionId
optionId = json.loads(response.text)['data'][0]['optionId']
# 进行评分 5~10分
score = random.randint(5, 8)
data = {
"optionId": optionId,
"score": score
}
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/event/setScore"
response = requests.post(url, headers=headers, json=data, verify=False)
if json.loads(response.text)['success']:
print('AI评分成功')
# 领取所有咪豆
def get_all_mido():
data = {} # 如果需要发送数据,可以在这里填写你的数据字典
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/score/receiveWaitScore"
# 添加 verify=False 来忽略SSL证书验证
response = requests.post(url, headers=headers, json=data, verify=False)
# print(response.status_code)
print(json.loads(response.text)['msg'])
# 成功后推送,如果有推送uid则推送没有则直接跳过
if wxpusher_uid != '' and json.loads(response.text)['msg'] == '领取成功':
# 获取咪豆数量
url = "https://mk-gateway-pro.singworld.cn/mk-outside/api/score/getCurScore?uid=1093049864"
response = requests.get(url, headers=headers, json=data, verify=False)
score = json.loads(response.text)['data']
# 获取当前可兑换的商品列表
url = "https://wtmall-outside-pro.singworld.net/wtmall/shop/index?platformId=2"
data = {
"page": 1,
"rows": 10,
"shopId": 0,
"productTypeCode": "2003",
"uid": uid,
"platformId": 2
}
headers2 = {
"Host": "wtmall-outside-pro.singworld.net",
"token": token,
"xweb_xhr": "1",
"uid": uid,
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100",
"authorization": "Bearer " + token,
"user_token": token,
"content-type": "application/json",
"accept": "*/*",
"sec-fetch-site": "cross-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://servicewechat.com/wx689a4b8fe3fccc63/196/page-frame.html",
"accept-language": "zh-CN,zh;q=0.9"
}
response = requests.post(url, headers=headers2, json=data, verify=False)
details = json.loads(response.text)['data']['list']
# 获取商品名称,价格
content = ''
# 将所有价格存入列表,用来判断是否有可兑换的商品
price_list = []
for j in range(len(details)):
detail = details[j]
name = detail['goodsName']
price = detail['currentScore']
price_list.append(price)
# 拼接推送内容
content += '' + str(j + 1) + '件商品名称:' + name + ',价格:' + str(price) + '咪豆' + '\n'
# 判断是否有可兑换的商品
if min(price_list) <= score:
# 获取最兑换的商品名称
min_price = min(price_list)
index = price_list.index(min_price)
name = details[index]['goodsName']
content += '当前咪豆数量:' + str(
score) + ',有可兑换的商品' + '\n' + '最低价格商品名称:' + name + ',价格:' + str(
min_price) + '咪豆' + '\n'
else:
content += '当前咪豆数量:' + str(score) + ',没有可兑换的商品'
content += '注意兑换商品需要至少消费过1次否则无法兑换'
WxPusher.send_message('咪哒K歌签到成功,当前咪豆数量:' + str(score) + '\n' + content + '\n',
uids=[wxpusher_uid],
token='AT_uj0Ezms54MZVD3xO5R5i1Wh3vR7nMety')
print(content)
if __name__ == '__main__':
# 遍历所有帐号
for i in range(len(md_ktv_cookie)):
cookie = md_ktv_cookie[i]
uid = cookie.split('@')[0]
token = cookie.split('@')[1]
# 判断是否有推送uid
if len(cookie.split('@')) == 3:
wxpusher_uid = cookie.split('@')[2]
else:
wxpusher_uid = ''
headers = {
"Host": "mk-gateway-pro.singworld.cn",
"token": token,
"xweb_xhr": "1",
"uid": uid,
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100",
"authorization": "Bearer " + token,
"user_token": token,
"content-type": "application/json",
"accept": "*/*",
"sec-fetch-site": "cross-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://servicewechat.com/wx689a4b8fe3fccc63/196/page-frame.html",
"accept-language": "zh-CN,zh;q=0.9"
}
print(f'开始第{i + 1}个帐号签到')
# 获取活动主题列表
get_theme_list()
# 1.签到
if not to_sign():
continue
# 2.播放录音15s +2
play_record()
# 3.点赞 +1
to_like()
# 4.投票 +1
to_vote()
# 5.AI评分 +1
ai_score()
# 6.领取咪豆
get_all_mido()