qinglong_auto_tools/scripts_trigger_ckorder.py
spiritysdx e58978c0f3 init
2022-03-08 17:52:00 +08:00

351 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
# 作者仓库:https://jihulab.com/spiritlhl/qinglong_auto_tools.git
# 觉得不错麻烦点个star谢谢
# 频道https://t.me/qinglong_auto_tools
'''
cron: 1
new Env('单容器 二叉树随机ck顺序');
'''
# 初次运行生成原始顺序模板文件 trigger_cookies.json备份原始ck文件 allck.txt
# 可每次运行重新生成模板,在配置文件中配置 export ec_write_cks="true" 开启该功能
# 默认保持前6位ck顺序不变有需要在配置文件中配置 export ec_head_cks="具体几个" 更改数量
# 可配置随机顺序时给ck备注标上原始顺序如果备注已存在则保留原始备注不更改
# 禁用的ck自动后置非ck变量全部自动前置
# 默认任务定时自行修改
import os
import time
import json
import re
import random
print("不要在主容器运行本脚本每运行一次少一个ck请在分容器运行")
try:
import requests
except Exception as e:
print(e, "\n缺少requests 模块请执行命令安装python3 -m pip install requests")
exit(3)
requests.packages.urllib3.disable_warnings()
ql_auth_path = '/ql/config/auth.json'
try:
head = int(os.environ["ec_head_cks"])
print("已配置保留前{}位ck顺序做车头".format(head))
except:
head = 6
print("#默认只保留前6位ck做车头有需求")
print("#请在配置文件中配置\nexport ec_head_cks=\"具体几个\" \n#更改车头数量\n")
print("===================================")
print("已设置保留前{}位顺序不改变\n".format(head))
def __get_token() -> str or None:
with open(ql_auth_path, 'r', encoding='utf-8') as f:
j_data = json.load(f)
return j_data.get('token')
def __get__headers() -> dict:
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json;charset=UTF-8',
'Authorization': 'Bearer ' + __get_token()
}
return headers
# 查询环境变量
def get_envs(name: str = None) -> list:
params = {
't': int(time.time() * 1000)
}
if name is not None:
params['searchValue'] = name
res = requests.get(ql_url + '/api/envs', headers=__get__headers(), params=params)
j_data = res.json()
if j_data['code'] == 200:
return j_data['data']
return []
def gettimestamp():
return str(int(time.time() * 1000))
def gettoken(self):
self.headers.update({"Authorization": "Bearer " + __get_token()})
def login(self):
gettoken(self)
def getallenv(self, baseurl, typ):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def getitem(self, baseurl, key, typ):
url = baseurl + typ + "/envs?searchValue=%s&t=%s" % (key, gettimestamp())
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def getckitem(self, baseurl, key, typ):
url = baseurl + typ + "/envs?searchValue=JD_COOKIE&t=%s" % gettimestamp() # JD_COOKIE为默认的环境变量名该变量里的值默认含pt_pin和pt_key其他类似默认按照下面注释改
r = self.get(url)
for i in json.loads(r.text)["data"]:
if key in i["value"]:
return i
return []
def update(self, baseurl, typ, value, qlid, remarks):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
data = {
"name": "JD_COOKIE",
"value": value,
"_id": qlid,
"remarks": remarks
}
r = self.put(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return True
else:
data = {
"name": "JD_COOKIE",
"value": value,
"id": qlid,
"remarks": remarks
}
r = self.put(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return True
return False
def insert(self, baseurl, typ, name, value, remarks):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
data = []
if remarks == None:
data_json = {
"value": value,
"name": name
}
else:
data_json = {
"value": value,
"name": name,
'remarks': remarks
}
data.append(data_json)
r = self.post(url, json.dumps(data))
if json.loads(r.text)["code"] == 200:
return r.json()['data']
else:
return False
def delete(self, baseurl, typ, value):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
data = value
r = self.delete(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return True
else:
return False
def disable(self, baseurl, typ, ids):
url = baseurl + typ + "/envs/disable?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.put(url, data=json.dumps(ids))
if json.loads(r.text)["code"] == 200:
return True
else:
return False
if __name__ == '__main__':
s = requests.session()
login(s)
try:
ql_url = 'http://localhost:5700/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
except:
ql_url = 'http://localhost:5600/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
# # 备份ck
# if os.path.exists("./allck.txt") == False:
# temp = []
# for i in cookies:
# temp.append(i["value"] + "\n")
# with open("./allck.txt", "w") as ffp:
# ffp.writelines(temp)
# print("初次运行自动备份ck到allck.txt文件里有需要恢复初始状态自取\n")
# else:
# print("脚本管理根目录下allck.txt文件已存在有需要恢复初始状态自取\n")
# pass
# 备份
tt = "trigger_cookies.json"
allenv = getallenv(s, ql_url, "api")
try:
os.environ['ec_backup_ck']
ec_backup_ck = os.environ['ec_backup_ck']
except:
ec_backup_ck = 'true'
if ec_backup_ck == 'true' and os.path.exists('./' + tt) != True:
with open(tt, "w", encoding="utf-8") as fp:
json.dump(allenv, fp)
print("已备份原有环境变量至{},有需要还原请使用二叉树还原环境变量脚本还原".format(tt))
print("如果不需要备份请设置export ec_backup_ck=\"false\"")
else:
print("上次备份文件{}还存在或已配置无需备份,本次运行不进行备份".format(tt))
####################################################################################
try:
ql_url = 'http://localhost:5700/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
except:
ql_url = 'http://localhost:5600/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
allenv = getallenv(s, ql_url, "api")
jdcount = 0
result_list = []
he_list = []
head_list = []
random_list = []
disable_list = []
for i in allenv:
if "JD_COOKIE" == i["name"]:
jdcount += 1
else:
he_list.append(i)
continue
if jdcount <= head:
head_list.append(i)
continue
if i["status"] != 0:
disable_list.append(i)
continue
head_len = len(head_list) + len(he_list)
# 随机化启用的cookie顺序保留指定位置不变
res = random.sample(range(head, len(cookies)), len(cookies) - head)
print("固定的ck顺序")
print("pin\t\t\t\t上次\t 本次")
for i in range(0, head):
c = re.findall(r"pt_pin=(.*?);", cookies[i]["value"])[0][-16:].ljust(16)
print("{}\t\t{}\t\t{}".format(c, i + 1, i + 1))
print("本次随机ck的顺序")
print("pin\t\t\t\t上次\t 本次")
temp = []
ct = 1
for i in res:
random_list.append(cookies[i - 1])
c = re.findall(r"pt_pin=(.*?);", cookies[i - 1]["value"])[0][-16:].ljust(16)
print("{}\t\t{}\t\t{}".format(c, i, ct + head))
ct += 1
result_list.extend(head_list)
# 与后置黑号脚本匹配
tp1 = []
tp2 = []
try:
ec_blackkey = os.environ["ec_blackkey"]
for i in random_list:
try:
if i['remarks'] == ec_blackkey:
tp2.append(i)
else:
tp1.append(i)
except:
tp1.append(i)
random_list = tp1 + tp2
except:
pass
tp1 = []
tp2 = []
try:
for i in random_list:
try:
if i['remarks'] == '黑号':
tp2.append(i)
else:
tp1.append(i)
except:
tp1.append(i)
random_list = tp1 + tp2
except:
pass
# 去重
tp1 = []
value_tp = []
for i in random_list:
if i['value'] not in value_tp:
tp1.append(i)
value_tp.append(i['value'])
random_list = tp1
he_count = len(he_list)
result_list.extend(random_list)
result_list.extend(disable_list)
# 去重
tp1 = []
value_tp = []
for i in result_list:
if i['value'] not in value_tp:
tp1.append(i)
value_tp.append(i['value'])
result_list = tp1
he_list.extend(result_list)
result_list = he_list
for i in allenv:
try:
c = delete(s, ql_url, "api", [i['_id']])
except:
c = delete(s, ql_url, "api", [i['id']])
time.sleep(2)
print(len(result_list))
for i in result_list:
try:
i['remarks']
c = insert(s, ql_url, "api", i['name'], i['value'], i['remarks'])
if i["status"] != 0:
try:
disable(s, ql_url, 'api', [c[0]['_id']])
except:
disable(s, ql_url, 'api', [c[0]['id']])
except:
c = insert(s, ql_url, "api", i['name'], i['value'], None)
if i["status"] != 0:
try:
disable(s, ql_url, 'api', [c[0]['_id']])
except:
disable(s, ql_url, 'api', [c[0]['id']])
print("已前置非ck变量共{}车头ck共{}随机顺序ck共{}后置禁用ck共{}".format(he_count, len(head_list), len(random_list),
len(disable_list)))
print("============================================")
print("脚本执行完毕")