mirror of
https://jihulab.com/spiritlhl/qinglong_auto_tools.git
synced 2026-06-05 21:07:21 +08:00
351 lines
11 KiB
Python
351 lines
11 KiB
Python
# -*- coding:utf-8 -*-
|
||
# 作者仓库:https://jihulab.com/spiritlhl/qinglong_auto_tools.git
|
||
# 觉得不错麻烦点个star谢谢
|
||
# 频道:https://t.me/qinglong_auto_tools
|
||
|
||
'''
|
||
cron: 1
|
||
new Env('单容器 二叉树随机ck顺序');
|
||
'''
|
||
|
||
# 初次运行生成原始顺序模板文件 trigger_cookies.json,备份原始ck文件 allck.txt
|
||
# 可每次运行重新生成模板,在配置文件中配置 export ec_write_cks="true" 开启该功能
|
||
# 默认保持前6位ck顺序不变,有需要在配置文件中配置 export ec_head_cks="具体几个" 更改数量
|
||
# 可配置随机顺序时给ck备注标上原始顺序,如果备注已存在,则保留原始备注不更改
|
||
# 禁用的ck自动后置,非ck变量全部自动前置
|
||
# 默认任务定时自行修改
|
||
|
||
|
||
import os
|
||
import time
|
||
import json
|
||
import re
|
||
import random
|
||
|
||
print("不要在主容器运行!!!本脚本每运行一次少一个ck,请在分容器运行!!!")
|
||
|
||
|
||
try:
|
||
import requests
|
||
except Exception as e:
|
||
print(e, "\n缺少requests 模块,请执行命令安装:python3 -m pip install requests")
|
||
exit(3)
|
||
|
||
requests.packages.urllib3.disable_warnings()
|
||
|
||
ql_auth_path = '/ql/config/auth.json'
|
||
|
||
try:
|
||
head = int(os.environ["ec_head_cks"])
|
||
print("已配置保留前{}位ck顺序做车头".format(head))
|
||
except:
|
||
head = 6
|
||
print("#默认只保留前6位ck做车头,有需求")
|
||
print("#请在配置文件中配置\nexport ec_head_cks=\"具体几个\" \n#更改车头数量\n")
|
||
|
||
print("===================================")
|
||
|
||
print("已设置保留前{}位顺序不改变\n".format(head))
|
||
|
||
|
||
def __get_token() -> str or None:
|
||
with open(ql_auth_path, 'r', encoding='utf-8') as f:
|
||
j_data = json.load(f)
|
||
return j_data.get('token')
|
||
|
||
|
||
def __get__headers() -> dict:
|
||
headers = {
|
||
'Accept': 'application/json',
|
||
'Content-Type': 'application/json;charset=UTF-8',
|
||
'Authorization': 'Bearer ' + __get_token()
|
||
}
|
||
return headers
|
||
|
||
|
||
# 查询环境变量
|
||
def get_envs(name: str = None) -> list:
|
||
params = {
|
||
't': int(time.time() * 1000)
|
||
}
|
||
if name is not None:
|
||
params['searchValue'] = name
|
||
res = requests.get(ql_url + '/api/envs', headers=__get__headers(), params=params)
|
||
j_data = res.json()
|
||
if j_data['code'] == 200:
|
||
return j_data['data']
|
||
return []
|
||
|
||
|
||
def gettimestamp():
|
||
return str(int(time.time() * 1000))
|
||
|
||
|
||
def gettoken(self):
|
||
self.headers.update({"Authorization": "Bearer " + __get_token()})
|
||
|
||
|
||
def login(self):
|
||
gettoken(self)
|
||
|
||
|
||
def getallenv(self, baseurl, typ):
|
||
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
|
||
r = self.get(url)
|
||
item = json.loads(r.text)["data"]
|
||
return item
|
||
|
||
|
||
def getitem(self, baseurl, key, typ):
|
||
url = baseurl + typ + "/envs?searchValue=%s&t=%s" % (key, gettimestamp())
|
||
r = self.get(url)
|
||
item = json.loads(r.text)["data"]
|
||
return item
|
||
|
||
|
||
def getckitem(self, baseurl, key, typ):
|
||
url = baseurl + typ + "/envs?searchValue=JD_COOKIE&t=%s" % gettimestamp() # JD_COOKIE为默认的环境变量名,该变量里的值默认含pt_pin和pt_key,其他类似默认按照下面注释改
|
||
r = self.get(url)
|
||
for i in json.loads(r.text)["data"]:
|
||
if key in i["value"]:
|
||
return i
|
||
return []
|
||
|
||
|
||
def update(self, baseurl, typ, value, qlid, remarks):
|
||
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
|
||
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
|
||
data = {
|
||
"name": "JD_COOKIE",
|
||
"value": value,
|
||
"_id": qlid,
|
||
"remarks": remarks
|
||
}
|
||
r = self.put(url, data=json.dumps(data))
|
||
if json.loads(r.text)["code"] == 200:
|
||
return True
|
||
else:
|
||
data = {
|
||
"name": "JD_COOKIE",
|
||
"value": value,
|
||
"id": qlid,
|
||
"remarks": remarks
|
||
}
|
||
r = self.put(url, data=json.dumps(data))
|
||
if json.loads(r.text)["code"] == 200:
|
||
return True
|
||
return False
|
||
|
||
|
||
def insert(self, baseurl, typ, name, value, remarks):
|
||
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
|
||
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
|
||
data = []
|
||
if remarks == None:
|
||
data_json = {
|
||
"value": value,
|
||
"name": name
|
||
}
|
||
else:
|
||
data_json = {
|
||
"value": value,
|
||
"name": name,
|
||
'remarks': remarks
|
||
}
|
||
data.append(data_json)
|
||
r = self.post(url, json.dumps(data))
|
||
if json.loads(r.text)["code"] == 200:
|
||
return r.json()['data']
|
||
else:
|
||
return False
|
||
|
||
|
||
def delete(self, baseurl, typ, value):
|
||
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
|
||
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
|
||
data = value
|
||
r = self.delete(url, data=json.dumps(data))
|
||
if json.loads(r.text)["code"] == 200:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
def disable(self, baseurl, typ, ids):
|
||
url = baseurl + typ + "/envs/disable?t=%s" % gettimestamp()
|
||
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
|
||
r = self.put(url, data=json.dumps(ids))
|
||
if json.loads(r.text)["code"] == 200:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
if __name__ == '__main__':
|
||
s = requests.session()
|
||
login(s)
|
||
try:
|
||
ql_url = 'http://localhost:5700/'
|
||
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
|
||
except:
|
||
ql_url = 'http://localhost:5600/'
|
||
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
|
||
|
||
# # 备份ck
|
||
# if os.path.exists("./allck.txt") == False:
|
||
# temp = []
|
||
# for i in cookies:
|
||
# temp.append(i["value"] + "\n")
|
||
# with open("./allck.txt", "w") as ffp:
|
||
# ffp.writelines(temp)
|
||
# print("初次运行,自动备份ck到allck.txt文件里,有需要恢复初始状态自取\n")
|
||
# else:
|
||
# print("脚本管理根目录下allck.txt文件已存在,有需要恢复初始状态自取\n")
|
||
# pass
|
||
|
||
# 备份
|
||
tt = "trigger_cookies.json"
|
||
allenv = getallenv(s, ql_url, "api")
|
||
try:
|
||
os.environ['ec_backup_ck']
|
||
ec_backup_ck = os.environ['ec_backup_ck']
|
||
except:
|
||
ec_backup_ck = 'true'
|
||
if ec_backup_ck == 'true' and os.path.exists('./' + tt) != True:
|
||
with open(tt, "w", encoding="utf-8") as fp:
|
||
json.dump(allenv, fp)
|
||
print("已备份原有环境变量至{},有需要还原请使用二叉树还原环境变量脚本还原".format(tt))
|
||
print("如果不需要备份,请设置export ec_backup_ck=\"false\"")
|
||
else:
|
||
print("上次备份文件{}还存在或已配置无需备份,本次运行不进行备份".format(tt))
|
||
|
||
####################################################################################
|
||
|
||
try:
|
||
ql_url = 'http://localhost:5700/'
|
||
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
|
||
except:
|
||
ql_url = 'http://localhost:5600/'
|
||
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
|
||
|
||
allenv = getallenv(s, ql_url, "api")
|
||
jdcount = 0
|
||
result_list = []
|
||
he_list = []
|
||
head_list = []
|
||
random_list = []
|
||
disable_list = []
|
||
for i in allenv:
|
||
if "JD_COOKIE" == i["name"]:
|
||
jdcount += 1
|
||
else:
|
||
he_list.append(i)
|
||
continue
|
||
if jdcount <= head:
|
||
head_list.append(i)
|
||
continue
|
||
if i["status"] != 0:
|
||
disable_list.append(i)
|
||
continue
|
||
head_len = len(head_list) + len(he_list)
|
||
# 随机化启用的cookie顺序,保留指定位置不变
|
||
res = random.sample(range(head, len(cookies)), len(cookies) - head)
|
||
print("固定的ck顺序")
|
||
print("pin\t\t\t\t上次\t 本次")
|
||
for i in range(0, head):
|
||
c = re.findall(r"pt_pin=(.*?);", cookies[i]["value"])[0][-16:].ljust(16)
|
||
print("{}\t\t{}\t\t{}".format(c, i + 1, i + 1))
|
||
print("本次随机ck的顺序")
|
||
print("pin\t\t\t\t上次\t 本次")
|
||
temp = []
|
||
ct = 1
|
||
for i in res:
|
||
random_list.append(cookies[i - 1])
|
||
c = re.findall(r"pt_pin=(.*?);", cookies[i - 1]["value"])[0][-16:].ljust(16)
|
||
print("{}\t\t{}\t\t{}".format(c, i, ct + head))
|
||
ct += 1
|
||
|
||
result_list.extend(head_list)
|
||
|
||
# 与后置黑号脚本匹配
|
||
tp1 = []
|
||
tp2 = []
|
||
try:
|
||
ec_blackkey = os.environ["ec_blackkey"]
|
||
for i in random_list:
|
||
try:
|
||
if i['remarks'] == ec_blackkey:
|
||
tp2.append(i)
|
||
else:
|
||
tp1.append(i)
|
||
except:
|
||
tp1.append(i)
|
||
random_list = tp1 + tp2
|
||
except:
|
||
pass
|
||
tp1 = []
|
||
tp2 = []
|
||
try:
|
||
for i in random_list:
|
||
try:
|
||
if i['remarks'] == '黑号':
|
||
tp2.append(i)
|
||
else:
|
||
tp1.append(i)
|
||
except:
|
||
tp1.append(i)
|
||
random_list = tp1 + tp2
|
||
except:
|
||
pass
|
||
|
||
# 去重
|
||
tp1 = []
|
||
value_tp = []
|
||
for i in random_list:
|
||
if i['value'] not in value_tp:
|
||
tp1.append(i)
|
||
value_tp.append(i['value'])
|
||
random_list = tp1
|
||
he_count = len(he_list)
|
||
result_list.extend(random_list)
|
||
result_list.extend(disable_list)
|
||
|
||
# 去重
|
||
tp1 = []
|
||
value_tp = []
|
||
for i in result_list:
|
||
if i['value'] not in value_tp:
|
||
tp1.append(i)
|
||
value_tp.append(i['value'])
|
||
result_list = tp1
|
||
|
||
he_list.extend(result_list)
|
||
result_list = he_list
|
||
|
||
for i in allenv:
|
||
try:
|
||
c = delete(s, ql_url, "api", [i['_id']])
|
||
except:
|
||
c = delete(s, ql_url, "api", [i['id']])
|
||
time.sleep(2)
|
||
print(len(result_list))
|
||
for i in result_list:
|
||
try:
|
||
i['remarks']
|
||
c = insert(s, ql_url, "api", i['name'], i['value'], i['remarks'])
|
||
if i["status"] != 0:
|
||
try:
|
||
disable(s, ql_url, 'api', [c[0]['_id']])
|
||
except:
|
||
disable(s, ql_url, 'api', [c[0]['id']])
|
||
except:
|
||
c = insert(s, ql_url, "api", i['name'], i['value'], None)
|
||
if i["status"] != 0:
|
||
try:
|
||
disable(s, ql_url, 'api', [c[0]['_id']])
|
||
except:
|
||
disable(s, ql_url, 'api', [c[0]['id']])
|
||
print("已前置非ck变量共{}个,车头ck共{}个,随机顺序ck共{}个,后置禁用ck共{}个".format(he_count, len(head_list), len(random_list),
|
||
len(disable_list)))
|
||
print("============================================")
|
||
print("脚本执行完毕") |