qinglong_auto_tools/Script/ecs_trigger_task.py
spiritysdx e58978c0f3 init
2022-03-08 17:52:00 +08:00

208 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# 作者仓库:https://jihulab.com/spiritlhl/qinglong_auto_tools.git
# 觉得不错麻烦点个star谢谢
# 频道https://t.me/qinglong_auto_tools
'''
cron: 1
new Env('二叉树轮换车头运行脚本');
'''
client_id1 = ""
client_secret1 = ""
url1 = "http://xxxxxxxx:xxxx/"
scname = "脚本名字"
start = 1 # 开始号
name = "任务名字"
sleep_time1 = 60 # 任务执行时长
sleep_time2 = 30 # 完成后间隔
team_nums = 30 # 一队几人
import requests
import time
import json
requests.packages.urllib3.disable_warnings()
def gettimestamp():
return str(int(time.time() * 1500))
def gettoken(self, url_token):
r = requests.get(url_token).text
res = json.loads(r)["data"]["token"]
self.headers.update({"Authorization": "Bearer " + res})
def login(self, baseurl, client_id_temp, client_secret_temp):
url_token = baseurl + 'open/auth/token?client_id=' + client_id_temp + '&client_secret=' + client_secret_temp
gettoken(self, url_token)
def getitem(self, baseurl, typ, path):
url = baseurl + typ + "/scripts/files?path={}".format(path)
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def getenvs(self, baseurl, typ):
url = baseurl + typ + "/envs"
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def getscript(self, baseurl, typ, filename):
url = baseurl + typ + "/scripts/" + filename + "?t=%s" % gettimestamp()
r = self.get(url)
response = json.loads(r.text)["code"]
if response == 500:
url = baseurl + typ + "/scripts/" + filename + "?path="
r = self.get(url)
script = json.loads(r.text)["data"]
return script
def pushscript(self, baseurl, typ, data, path):
url = baseurl + typ + "/scripts?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.put(url, data=json.dumps(data))
response = json.loads(r.text)["code"]
if response == 500:
data["path"] = path
r = self.put(url, data=json.dumps(data))
return r.text
def getcrons(self, baseurl, typ):
url = baseurl + typ + "/crons?t=%s" % gettimestamp()
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def addcron(self, baseurl, typ, data):
url = baseurl + typ + "/crons?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.post(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return r.text
else:
return r.text
def runcron(self, baseurl, typ, data):
url = baseurl + typ + "/crons/run?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.put(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return r.text
else:
return r.text
def stopcron(self, baseurl, typ, data):
url = baseurl + typ + "/crons/stop?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.put(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return r.text
else:
return r.text
def deletecron(self, baseurl, typ, data):
url = baseurl + typ + "/crons?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.delete(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return r.text
else:
return r.text
if __name__ == '__main__':
# 主容器
s = requests.session()
login(s, url1, client_id1, client_secret1)
tasks = getcrons(s, url1, "open")
envs = getenvs(s, url1, "open")
count = 0
for i in envs:
if i["name"] == "JD_COOKIE" and i["status"] == 0:
count += 1
print("共计有效账号 {}\n".format(count))
status = 0
for i in tasks:
if scname in i["command"]:
stopcron(s, url1, "open", [i["_id"]])
time.sleep(1)
deletecron(s, url1, "open", [i["_id"]])
time.sleep(1)
for j in range(start, count):
for k in range(j, j + team_nums + 1):
if j + 11 < count - 1 and k == j + 20:
wzz = " " + str(j) + "-" + str(k)
elif k == j + 20:
wzz = " " + str(j) + "-" + str(k - 30)
data = {
"command": "task {} desi JD_COOKIE ".format(scname) + str(wzz),
"schedule": "1",
"name": name
}
res1 = addcron(s, url1, "open", data)
time.sleep(1)
id = json.loads(res1)["data"]["_id"]
runcron(s, url1, "open", [id])
print("运行账号 {}".format(j))
time.sleep(sleep_time1)
stopcron(s, url1, "open", [id])
time.sleep(1)
deletecron(s, url1, "open", [id])
time.sleep(1)
time.sleep(sleep_time2)
if j == count - 1:
status = 1
break
if status == 0:
for j in range(start, count + 1):
for k in range(j, j + team_nums + 1):
if j + 11 < count - 1 and k == j + 20:
wzz = " " + str(j) + "-" + str(k)
elif k == j + 20:
wzz = " " + str(j) + "-" + str(k - 30)
data = {
"command": "task {} desi JD_COOKIE ".format(scname) + str(wzz),
"schedule": "1",
"name": name
}
res1 = addcron(s, url1, "open", data)
time.sleep(1)
id = json.loads(res1)["data"]["_id"]
runcron(s, url1, "open", [id])
print("运行账号 {}".format(j))
time.sleep(sleep_time1)
stopcron(s, url1, "open", [id])
time.sleep(1)
deletecron(s, url1, "open", [id])
time.sleep(1)
time.sleep(sleep_time2)
if j == count - 1:
status = 1
break