# V2RayAggregator/utils/get_subs.py
# 2023-07-13 15:06:02 +03:30
#
# 653 lines
# 35 KiB
# Python

from sub_convert import sub_convert
from subs_function import subs_function
import json
import re
import os
import yaml
# Path of the subscription list JSON file (not referenced by the code visible
# in this part of the file).
sub_list_json = './sub/sub_list.json'
# Directory that receives the merged output files (sub_merge*.txt / *.yml).
sub_merge_path = './sub/'
# Directory holding one "<id>.txt" file per subscription source; the
# get_subs* functions delete files from it before gathering.
sub_list_path = './sub/list/'
# Dotted-quad pattern; it only checks the digit grouping, not the 0-255 range.
ipv4 = r"([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})"
# IPv6 address pattern. Together with `ipv4` it is used to drop proxy lines
# that contain an IPv6 match but no IPv4 match.
ipv6 = r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
# Characters treated as illegal in proxy names; in the code shown here this
# list is referenced only from commented-out name-fixing code.
ill = ['|', '?', '[', ']', '@', '!', '%', ':']
# Cipher names accepted for "ss"/"ssr" proxies by get_subs_v3.
valid_ss_cipher_methods = ["aes-128-gcm", "aes-192-gcm", "aes-256-gcm", "aes-128-cfb", "aes-192-cfb", "aes-256-cfb", "aes-128-ctr", "aes-192-ctr", "aes-256-ctr", "rc4-md5", "chacha20-ietf", "xchacha20", "chacha20-ietf-poly1305", "xchacha20-ietf-poly1305"]
# Plugin names accepted for "ss" proxies by get_subs_v3.
valid_ss_plugins = ["obfs","v2ray-plugin"]
class subs:
    """Collect proxy nodes from subscription URLs and write merged outputs.

    The three entry points are called on the class itself (for example
    ``subs.get_subs([])``) and use no instance state, so they are static
    methods.

    NOTE(review): the copy of this file under review had lost its
    indentation, so the nesting of the branches below was reconstructed from
    statement order - confirm against the upstream file.
    """

    # Address of the converter service handed to subs_function.convert_sub.
    _CONVERTER_HOST = "http://0.0.0.0:25500"
    # Error strings that convert_sub results are compared against.
    _CONVERTER_ERRORS = ('Err: No nodes found', 'Err: failed to parse sub')
    # Lines whose "path" carries a query string are dropped. Raw string: the
    # regex is unchanged, but "\?" is no longer an invalid string escape.
    _QUERY_PATH_PATTERN = r"path: /(.*?)\?(.*?)=(.*?)}"

    # ------------------------------------------------------------------ #
    # helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _source_file(ids):
        """Return the per-source file path for the numeric source id."""
        return f'{sub_list_path}{ids:0>2d}.txt'

    @staticmethod
    def _append_text(path, text):
        """Append *text* to *path* (UTF-8), closing the file afterwards."""
        with open(path, 'a+', encoding='utf-8') as handle:
            handle.write(text)

    @staticmethod
    def _write_text(path, text):
        """Overwrite *path* with *text* (UTF-8), closing the file afterwards."""
        with open(path, 'w+', encoding='utf-8') as handle:
            handle.write(text)

    @staticmethod
    def _write_merged(content, content_base64, content_yaml):
        """Write the plain, base64 and YAML merge results under sub_merge_path."""
        outputs = (
            (f'{sub_merge_path}/sub_merge.txt', content),
            (f'{sub_merge_path}/sub_merge_base64.txt', content_base64),
            (f'{sub_merge_path}/sub_merge_yaml.yml', content_yaml),
        )
        for path, text in outputs:
            subs._write_text(path, text)

    @staticmethod
    def _remove_files(directory, should_remove):
        """Delete every file under *directory* whose name satisfies *should_remove*.

        os.path.join is used so that files found in sub-directories get a
        valid path (plain concatenation only worked for the top directory).
        """
        for folder, _dirs, names in os.walk(directory):
            for name in names:
                if should_remove(name):
                    os.remove(os.path.join(folder, name))

    @staticmethod
    def _non_empty_lines(text):
        """Split *text* on newlines and drop the empty strings."""
        return [line for line in text.split('\n') if line != '']

    @staticmethod
    def _proxies_document(lines):
        """Join proxy lines under a ``proxies:`` header.

        One trailing newline (left by an empty last line) is removed first.
        """
        joined = "\n".join(lines)
        if joined.endswith('\n'):
            joined = joined[:-1]
        return 'proxies:\n' + joined

    @staticmethod
    def _passes_line_filters(line):
        """Return False for lines that the gather steps drop before parsing.

        A line is dropped when it matches the IPv6 pattern without matching
        the IPv4 pattern, or when its ``path`` contains a query string.
        """
        has_ipv6 = re.search(ipv6, line) is not None
        has_ipv4 = re.search(ipv4, line) is not None
        if has_ipv6 and not has_ipv4:
            return False
        return re.search(subs._QUERY_PATH_PATTERN, line) is None

    @staticmethod
    def _is_supported_proxy(proxy):
        """Return True when a parsed clash proxy passes the get_subs_v3 checks.

        Lookups of required fields are deliberately unguarded: a missing key
        (or a value of the wrong type) raises, and the caller counts the line
        as bad.
        """
        if 'uuid' in proxy and len(proxy['uuid']) != 36:
            return False

        proxy_type = proxy['type']
        if proxy_type == "ss" or proxy_type == "ssr":
            if proxy["cipher"] not in valid_ss_cipher_methods:
                return False
            if proxy_type != "ss" or 'plugin' not in proxy:
                return True
            plugin = proxy['plugin']
            if plugin not in valid_ss_plugins:
                return False
            if plugin == 'obfs':
                allowed_modes = ('http', 'tls')
            elif plugin == 'v2ray-plugin':
                allowed_modes = ('websocket',)
            else:
                return True
            if 'plugin-opts' not in proxy:
                return True
            return proxy['plugin-opts']['mode'] in allowed_modes

        if proxy_type == "vmess":
            if proxy["network"] == "h2" or proxy["network"] == "grpc":
                # "== False" (not "is False") kept on purpose: same comparison
                # as before, so a tls value of 0 is rejected too.
                return not ("tls" in proxy and proxy['tls'] == False)  # noqa: E712
            return True

        return True

    @staticmethod
    def _pair_safe_lines(clash_lines, mixed_lines):
        """Pair parsed clash lines with the mixed line at the same position.

        Returns ``(pairs, bad)`` where *pairs* is a list of
        ``(parsed_clash, mixed_line)`` and *bad* counts lines whose YAML
        parsing raised. Both inputs must have the same length.
        """
        pairs = []
        bad = 0
        for clash_line, mixed_line in zip(clash_lines, mixed_lines):
            if not subs._passes_line_filters(clash_line):
                continue
            try:
                parsed = yaml.safe_load(clash_line)
            except Exception:
                bad += 1
                continue
            if parsed is not None:
                pairs.append((parsed, mixed_line))
        return pairs, bad

    @staticmethod
    def _validate_clash_lines(clash_lines):
        """Parse and validate clash lines for get_subs_v3.

        Returns ``(safe, bad)``: *safe* holds the parsed YAML objects that
        passed, *bad* counts lines that were rejected or raised. Filtered-out
        and empty lines are skipped without being counted.
        """
        safe = []
        bad = 0
        for line in clash_lines:
            if not subs._passes_line_filters(line):
                continue
            try:
                parsed = yaml.safe_load(line)
                if parsed is None:
                    continue
                # The first item is re-parsed from its str() form, exactly as
                # before, so values that do not survive that round trip are
                # still counted as bad.
                proxy = yaml.safe_load(str(parsed[0]))
                if proxy is None:
                    continue
                supported = subs._is_supported_proxy(proxy)
            except Exception:
                bad += 1
                continue
            if supported:
                safe.append(parsed)
            else:
                bad += 1
        return safe, bad

    # ------------------------------------------------------------------ #
    # entry points
    # ------------------------------------------------------------------ #
    @staticmethod
    def get_subs(content_urls: list):
        """Gather nodes in "mixed" form, then write text, base64 and YAML merges.

        Args:
            content_urls: list of dicts with the keys ``id``, ``remarks`` and
                ``url``; entries whose ``url`` is not a list are skipped.
                Nothing happens when the list is empty.
        """
        if not content_urls:
            return
        subs._remove_files(sub_list_path, lambda name: True)

        content_list = []
        for url_container in content_urls:
            ids = url_container['id']
            remarks = url_container['remarks']
            if not isinstance(url_container['url'], list):
                continue
            for each_url in url_container["url"]:
                print("gather server from " + each_url)
                content = subs_function.convert_sub(
                    each_url, 'mixed', subs._CONVERTER_HOST)
                # "content or ''" keeps this report from crashing on None,
                # which the branches below already allow for.
                print("added content: %s" %
                      str(len((content or '').split('\n'))))

                if content in subs._CONVERTER_ERRORS:
                    print("host convertor failed. trying manually...")
                    content = sub_convert.main(each_url, 'url', 'url')
                    if content != 'Url 解析错误' and content != '订阅内容解析错误':
                        if subs_function.is_line_valid(content, False) != '':
                            content_list.append(content)
                        else:
                            print(f'this url failed{each_url}')
                        print(
                            f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                    else:
                        print(
                            f'Writing error of {remarks} to {ids:0>2d}.txt\n')
                    # NOTE(review): this tests the fallback's result, not the
                    # converter's original error, so it presumably never
                    # matches - confirm what sub_convert.main can return.
                    if content in subs._CONVERTER_ERRORS:
                        subs._append_text(subs._source_file(ids), content)
                elif content is not None and content != '':
                    if subs_function.is_line_valid(content, False) != '':
                        content_list.append(content)
                    else:
                        print(f'this url failed {each_url}')
                    subs._append_text(subs._source_file(ids), content)
                    print(
                        f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                else:
                    subs._append_text(subs._source_file(ids),
                                      'Url Subscription could not be parsed')
                    print(
                        f'Writing error of {remarks} to {ids:0>2d}.txt\n')

                print('already gathered ' +
                      str(len(''.join(content_list).split('\n'))))
                print('\n')

        print('Merging nodes...\n')
        # NOTE(review): joining without a separator relies on every gathered
        # entry ending with a newline - TODO confirm.
        content_list = subs._non_empty_lines(''.join(content_list))
        content_raw = "\n".join(content_list)
        print(f"it's fine till here with {len(content_list)} lines")

        content_yaml = sub_convert.main(content_raw, 'content', 'YAML', {
            'dup_rm_enabled': True, 'format_name_enabled': True})
        # The first line of the converter output is skipped.
        yaml_proxies = content_yaml.split('\n')[1:]
        kept = []
        for line in yaml_proxies:
            if not subs._passes_line_filters(line):
                continue
            try:
                yaml.safe_load(line)
            except Exception as error:
                print(error)
            else:
                kept.append(line)
        print(f"found {len(yaml_proxies) - len(kept)} bad lines :)")

        # Strings are immutable, so the trailing newline is removed by
        # slicing inside the helper (slice assignment raised TypeError).
        content_yaml = subs._proxies_document(kept)
        content_raw = sub_convert.yaml_decode(content_yaml)
        content_base64 = sub_convert.base64_encode(content_raw)

        subs._write_merged(content_raw, content_base64, content_yaml)
        print('Done!\n')

    @staticmethod
    def get_subs_v2(content_urls: list):
        """Gather each URL in both "mixed" and "clash" form and merge the pairs.

        A node is kept only when both conversions yield the same number of
        lines and its clash line parses as YAML; names are then fixed and
        duplicates removed through ``subs_function`` before the three merge
        files are written.

        Args:
            content_urls: list of dicts with the keys ``id``, ``remarks`` and
                ``url``; entries whose ``url`` is not a list are skipped.
                Nothing happens when the list is empty.
        """
        if not content_urls:
            return
        subs._remove_files(sub_list_path, lambda name: True)

        no_nodes, parse_failed = subs._CONVERTER_ERRORS
        content_list = []
        corresponding_list = []
        corresponding_id = 0
        bad_lines = 0
        for url_container in content_urls:
            ids = url_container['id']
            remarks = url_container['remarks']
            if not isinstance(url_container['url'], list):
                continue
            for each_url in url_container["url"]:
                print("gather server from " + each_url)
                content = subs_function.convert_sub(
                    each_url, 'mixed', subs._CONVERTER_HOST, False)
                content_clash = subs_function.convert_sub(
                    each_url, 'clash', subs._CONVERTER_HOST, False)
                results = (content, content_clash)

                if no_nodes in results or parse_failed in results:
                    print("host convertor failed. just continue & ignore...")
                    if no_nodes in results:
                        subs._append_text(subs._source_file(ids), no_nodes)
                    if parse_failed in results:
                        subs._append_text(subs._source_file(ids), parse_failed)
                elif content is not None and content != '':
                    mixed_content = subs._non_empty_lines(content)
                    print(
                        f"added content of current url : {len(mixed_content)}")
                    # the mixed result should be a valid ss Url
                    if subs_function.is_line_valid(content, False) != '':
                        content_list.append(content)
                        subs._append_text(subs._source_file(ids), content)
                        print(
                            f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                        # The first clash line is skipped; a missing clash
                        # result is treated as empty and so lands in the
                        # length-mismatch branch instead of crashing.
                        clash_content = [
                            line
                            for line in (content_clash or '').split('\n')[1:]
                            if line != '']
                        if clash_content and len(mixed_content) == len(clash_content):
                            safe_pairs, rejected = subs._pair_safe_lines(
                                clash_content, mixed_content)
                            bad_lines += rejected
                            if safe_pairs:
                                print("Check Points Passed 👍\n")
                                for parsed, mixed_line in safe_pairs:
                                    if subs_function.is_line_valid(mixed_line, False):
                                        corresponding_list.append(
                                            {"id": corresponding_id, "c_clash": parsed, "c_mixed": mixed_line})
                                        corresponding_id += 1
                            else:
                                print(
                                    f'unmatched length in sources {each_url}')
                                subs._append_text(subs._source_file(ids),
                                                  "unmatched length in sources")
                                print(
                                    f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                        else:
                            print(
                                f'unmatch length in both sources first stage {each_url}')
                            subs._append_text(
                                subs._source_file(ids),
                                "unmatch length in both sources first stage")
                            print(
                                f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                    else:
                        print(f'started with a invalid url {each_url}')
                        subs._append_text(subs._source_file(ids),
                                          "started with a invalid url")
                        print(
                            f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                else:
                    subs._append_text(subs._source_file(ids),
                                      'Url Subscription could not be parsed')
                    print(
                        f'Writing error of {remarks} to {ids:0>2d}.txt\n')

                gather_quantity = len(
                    subs._non_empty_lines(''.join(content_list)))
                print(f"already gathered {gather_quantity}")
                print('\n')
                print('----------------------------------------------')
                print('\n')

        print('Merging nodes...\n')
        total_lines = len(subs._non_empty_lines(''.join(content_list)))
        print(f"{total_lines} lines - {bad_lines} bad lines => total is {total_lines - bad_lines}")

        # corresponding_list now pairs every kept node's clash form with its
        # mixed form. Two clean-up steps run on it:
        #   1. give the proxies better names
        #   2. remove duplicate proxies
        # so both output formats are produced without another conversion.
        corresponding_list = subs_function.fix_proxies_name(
            corresponding_proxies=corresponding_list)
        corresponding_list = subs_function.fix_proxies_duplication(
            corresponding_proxies=corresponding_list)
        print(f"\nfinal sub length => {len(corresponding_list)}")

        clash = [f" - {item['c_clash']}" for item in corresponding_list]
        mixed = [item["c_mixed"] for item in corresponding_list]
        content_raw = "\n".join(mixed)
        content_yaml = 'proxies:\n' + "\n".join(clash)
        content_base64 = sub_convert.base64_encode(content_raw)

        subs._write_merged(content_raw, content_base64, content_yaml)
        print('Done!\n')

    # eject mixed proxies and use only clash
    @staticmethod
    def get_subs_v3(content_urls: list, output_path="sub_merge_yaml", should_cleanup=True, specific_files_cleanup=["05.txt"]):
        """Gather nodes in "clash" form only and write a single YAML merge file.

        Args:
            content_urls: list of dicts with the keys ``id``, ``remarks`` and
                ``url``; entries whose ``url`` is not a list are skipped.
                Nothing happens when the list is empty.
            output_path: file name (without ``.yml``) written under
                ``sub_merge_path``.
            should_cleanup: when True, delete every per-source file except
                those named in *specific_files_cleanup*; when False, delete
                only the files named there.
            specific_files_cleanup: file names used by the rule above. The
                default list is only read, never modified.
        """
        if not content_urls:
            return
        if should_cleanup:
            subs._remove_files(
                sub_list_path, lambda name: name not in specific_files_cleanup)
        else:
            subs._remove_files(
                sub_list_path, lambda name: name in specific_files_cleanup)

        no_nodes, parse_failed = subs._CONVERTER_ERRORS
        content_list = []
        corresponding_list = []
        corresponding_id = 0
        bad_lines = 0
        for url_container in content_urls:
            ids = url_container['id']
            remarks = url_container['remarks']
            if not isinstance(url_container['url'], list):
                continue
            for each_url in url_container["url"]:
                print("gather server from " + each_url)
                content_clash = subs_function.convert_sub(
                    each_url, 'clash', subs._CONVERTER_HOST, False, extra_options="&udp=false")

                if content_clash == no_nodes:
                    print(
                        "host convertor was unable to find any nodes. just continue & ignore...\n")
                elif content_clash == parse_failed:
                    print(
                        "host convertor failed. just continue & ignore...\n")
                elif content_clash is not None and content_clash != '':
                    single_url_gather_quantity = len(
                        subs._non_empty_lines(content_clash))
                    # minus one: the first line is skipped below
                    print(
                        f"added content of current url : {single_url_gather_quantity - 1}")
                    clash_content = [
                        line for line in content_clash.split('\n')[1:]
                        if line != '']
                    if clash_content:
                        safe_clash, rejected = subs._validate_clash_lines(
                            clash_content)
                        bad_lines += rejected
                        if safe_clash:
                            # The per-source file receives every clash line;
                            # only validated ones go to the merged list.
                            source_text = "\n".join(clash_content) + "\n"
                            content_list.append(source_text)
                            subs._append_text(
                                subs._source_file(ids), source_text)
                            print(
                                f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                            print("Check Points Passed 👍\n")
                            for each_clash_proxy in safe_clash:
                                # c_clash type is a list with one item
                                corresponding_list.append(
                                    {"id": corresponding_id, "c_clash": each_clash_proxy})
                                corresponding_id += 1
                        else:
                            print(
                                f'there is no clash lines {each_url}')
                            print(
                                f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                    else:
                        print(
                            f'there is no clash lines first stage {each_url}')
                        print(
                            f'Writing content of {remarks} to {ids:0>2d}.txt\n')
                else:
                    print(
                        f'Writing error of {remarks} to {ids:0>2d}.txt\n')

                gather_quantity = len(
                    subs._non_empty_lines(''.join(content_list)))
                print(f"already gathered {gather_quantity}")
                print('\n')
                print('----------------------------------------------')
                print('\n')

        print('Merging nodes...\n')
        total_lines = len(subs._non_empty_lines(''.join(content_list)))
        print(f"{total_lines} lines - {bad_lines} bad lines => total is {total_lines - bad_lines}")

        # The validated clash proxies get two clean-up steps before writing:
        #   1. give the proxies better names
        #   2. remove duplicate proxies
        corresponding_list = subs_function.fix_proxies_name(
            corresponding_proxies=corresponding_list)
        corresponding_list = subs_function.fix_proxies_duplication(
            corresponding_proxies=corresponding_list)
        print(f"\nfinal sub length => {len(corresponding_list)}")

        clash = [f" - {item['c_clash']}" for item in corresponding_list]
        content_yaml = 'proxies:\n' + "\n".join(clash)

        subs._write_text(f'{sub_merge_path}/{output_path}.yml', content_yaml)
        print('Done!\n')
if __name__ == "__main__":
    # get_subs returns immediately for an empty list, so running this module
    # directly gathers nothing.
    subs.get_subs([])