mirror of
https://github.com/mahdibland/V2RayAggregator.git
synced 2026-06-03 21:01:49 +08:00
841 lines
46 KiB
Python
841 lines
46 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import re
|
||
import yaml
|
||
import json
|
||
import base64
|
||
import requests
|
||
import socket
|
||
import urllib.parse
|
||
from requests.adapters import HTTPAdapter
|
||
|
||
import geoip2.database
|
||
|
||
|
||
class sub_convert():
|
||
|
||
# {'input_type': ['url', 'content'],'output_type': ['url', 'YAML', 'Base64']}
|
||
def main(raw_input, input_type='url', output_type='url', custom_set={'dup_rm_enabled': False, 'format_name_enabled': False}):
|
||
"""Convert subscribe content to YAML or Base64 or url.
|
||
首先获取到订阅内容,然后对其进行格式化处理。如果内容不是 “订阅内容解析错误”,在进行去重、改名操作后(可选)输出目标格式,否则输出 “订阅内容解析错误”。
|
||
"""
|
||
if input_type == 'url': # 获取 URL 订阅链接内容
|
||
sub_content = ''
|
||
if isinstance(raw_input, list):
|
||
a_content = []
|
||
for url in raw_input:
|
||
s = requests.Session()
|
||
s.mount('http://', HTTPAdapter(max_retries=5))
|
||
s.mount('https://', HTTPAdapter(max_retries=5))
|
||
try:
|
||
print('Downloading from:' + url)
|
||
resp = s.get(url, timeout=5)
|
||
s_content = sub_convert.yaml_decode(
|
||
sub_convert.format(resp.content.decode('utf-8')))
|
||
a_content.append(s_content)
|
||
except Exception as err:
|
||
print(err)
|
||
return 'Url 解析错误'
|
||
sub_content = sub_convert.format(''.join(a_content))
|
||
else:
|
||
s = requests.Session()
|
||
s.mount('http://', HTTPAdapter(max_retries=5))
|
||
s.mount('https://', HTTPAdapter(max_retries=5))
|
||
try:
|
||
print('Downloading from:' + raw_input)
|
||
resp = s.get(raw_input, timeout=5)
|
||
sub_content = sub_convert.format(
|
||
resp.content.decode('utf-8'))
|
||
except Exception as err:
|
||
print(err)
|
||
return 'Url 解析错误'
|
||
elif input_type == 'content': # 解析订阅内容
|
||
sub_content = sub_convert.format(raw_input)
|
||
|
||
if sub_content != '订阅内容解析错误':
|
||
dup_rm_enabled = custom_set['dup_rm_enabled']
|
||
format_name_enabled = custom_set['format_name_enabled']
|
||
final_content = sub_convert.makeup(
|
||
sub_content, dup_rm_enabled, format_name_enabled)
|
||
if output_type == 'YAML':
|
||
return final_content
|
||
elif output_type == 'Base64':
|
||
return sub_convert.base64_encode(sub_convert.yaml_decode(final_content))
|
||
elif output_type == 'url':
|
||
return sub_convert.yaml_decode(final_content)
|
||
elif output_type == 'content':
|
||
return sub_convert.yaml_decode(final_content)
|
||
else:
|
||
print('Please define right output type.')
|
||
return '订阅内容解析错误'
|
||
else:
|
||
return '订阅内容解析错误'
|
||
|
||
# 对链接文本(Base64, url, YAML)进行格式化处理, 输出节点的配置字典(Clash 配置), output 为真是输出 YAML 文本
|
||
# 对链接文本(Base64, url, YAML)进行格式化处理, 输出节点的配置字典(Clash 配置), output 为真是输出 YAML 文本
|
||
def format(sub_content, output=False):
|
||
if '</b>' not in sub_content:
|
||
if 'proxies:' not in sub_content: # 对 URL 内容进行格式化处理
|
||
url_list = []
|
||
try:
|
||
if '://' not in sub_content:
|
||
sub_content = sub_convert.base64_decode(sub_content)
|
||
|
||
raw_url_list = re.split(r'\r?\n+', sub_content)
|
||
|
||
for url in raw_url_list:
|
||
while len(re.split('ss://|ssr://|vmess://|trojan://|vless://', url)) > 2:
|
||
try:
|
||
url_to_split = url[8:]
|
||
if 'ss://' in url_to_split and 'vmess://' not in url_to_split and 'vless://' not in url_to_split:
|
||
# https://www.runoob.com/python/att-string-replace.html
|
||
url_splited = url_to_split.replace(
|
||
'ss://', '\nss://', 1)
|
||
elif 'ssr://' in url_to_split:
|
||
url_splited = url_to_split.replace(
|
||
'ssr://', '\nssr://', 1)
|
||
elif 'vmess://' in url_to_split:
|
||
url_splited = url_to_split.replace(
|
||
'vmess://', '\nvmess://', 1)
|
||
elif 'trojan://' in url_to_split:
|
||
url_splited = url_to_split.replace(
|
||
'trojan://', '\ntrojan://', 1)
|
||
elif 'vless://' in url_to_split:
|
||
url_splited = url_to_split.replace(
|
||
'vless://', '\nvless://', 1)
|
||
url_split = url_splited.split('\n')
|
||
|
||
front_url = url[:8] + url_split[0]
|
||
url_list.append(front_url)
|
||
url = url_split[1]
|
||
except Exception as e:
|
||
print(
|
||
f"failed to fix one line in formatting line: {url}")
|
||
|
||
url_list.append(url)
|
||
|
||
url_content = '\n'.join(url_list)
|
||
return sub_convert.yaml_encode(url_content, output=False)
|
||
except:
|
||
print('Sub_content 格式错误')
|
||
return '订阅内容解析错误'
|
||
|
||
elif 'proxies:' in sub_content: # 对 Clash 内容进行格式化处理
|
||
try:
|
||
# fix clash servers from https://github.com/kxswa/k
|
||
if '!<str> ' in sub_content:
|
||
sub_content = sub_content.replace(
|
||
'!<str> ', '').replace('!<str>', '')
|
||
|
||
try_load = yaml.safe_load(sub_content)
|
||
if output:
|
||
raise ValueError
|
||
else:
|
||
content_yaml_dic = try_load
|
||
return content_yaml_dic # 返回字典, output 值为 True 时返回修饰过的 YAML 文本
|
||
except Exception:
|
||
try:
|
||
sub_content = sub_content.replace(
|
||
'\'', '').replace('"', '')
|
||
url_list = []
|
||
il_chars = ['|', '?', '[', ']', '@', '!', '%', ':']
|
||
lines = re.split(r'\n+', sub_content)
|
||
line_fix_list = []
|
||
for line in lines:
|
||
value_list = re.split(r': |, ', line)
|
||
if len(value_list) > 6:
|
||
value_list_fix = []
|
||
for value in value_list:
|
||
for char in il_chars:
|
||
value_il = False
|
||
if char in value:
|
||
value_il = True
|
||
break
|
||
if value_il == True and ('{' not in value and '}' not in value):
|
||
value = '"' + value + '"'
|
||
value_list_fix.append(value)
|
||
elif value_il == True and '}' in value:
|
||
if '}}}' in value:
|
||
host_part = value.replace(
|
||
'}}}', '')
|
||
host_value = '"'+host_part+'"}}}'
|
||
value_list_fix.append(host_value)
|
||
elif '}}' not in value:
|
||
host_part = value.replace('}', '')
|
||
host_value = '"'+host_part+'"}'
|
||
value_list_fix.append(host_value)
|
||
else:
|
||
value_list_fix.append(value)
|
||
line_fix = line
|
||
for index in range(len(value_list_fix)):
|
||
line_fix = line_fix.replace(
|
||
value_list[index], value_list_fix[index])
|
||
line_fix_list.append(line_fix)
|
||
elif len(value_list) == 2:
|
||
value_list_fix = []
|
||
for value in value_list:
|
||
for char in il_chars:
|
||
value_il = False
|
||
if char in value:
|
||
value_il = True
|
||
break
|
||
if value_il == True:
|
||
value = '"' + value + '"'
|
||
value_list_fix.append(value)
|
||
line_fix = line
|
||
for index in range(len(value_list_fix)):
|
||
line_fix = line_fix.replace(
|
||
value_list[index], value_list_fix[index])
|
||
line_fix_list.append(line_fix)
|
||
elif len(value_list) == 1:
|
||
if ':' in line:
|
||
line_fix_list.append(line)
|
||
else:
|
||
line_fix_list.append(line)
|
||
|
||
sub_content = '\n'.join(line_fix_list).replace(
|
||
'False', 'false').replace('True', 'true')
|
||
if output:
|
||
return sub_content
|
||
else:
|
||
content_yaml_dic = yaml.safe_load(sub_content)
|
||
return content_yaml_dic # 返回字典, output 值为 True 时返回修饰过的 YAML 文本
|
||
except:
|
||
print('Sub_content 格式错误')
|
||
return '订阅内容解析错误'
|
||
else:
|
||
print('订阅内容解析错误')
|
||
return '订阅内容解析错误'
|
||
else:
|
||
print('订阅内容解析错误')
|
||
return '订阅内容解析错误'
|
||
|
||
# 输入节点配置字典, 对节点进行区域的筛选和重命名,输出 YAML 文本
|
||
def makeup(input, dup_rm_enabled=False, format_name_enabled=False):
|
||
# 区域判断(Clash YAML): https://blog.csdn.net/CSDN_duomaomao/article/details/89712826 (ip-api)
|
||
if isinstance(input, dict):
|
||
sub_content = input
|
||
else:
|
||
sub_content = sub_convert.format(input)
|
||
proxies_list = sub_content['proxies']
|
||
|
||
if dup_rm_enabled: # 去重
|
||
print("\nBefore was " + str(proxies_list.__len__()) + "\n")
|
||
begin = 0
|
||
raw_length = len(proxies_list)
|
||
length = len(proxies_list)
|
||
while begin < length:
|
||
if (begin + 1) == 1:
|
||
print(f'\n-----Restart-----\nStarting Quantity {length}')
|
||
elif (begin + 1) % 100 == 0:
|
||
print(
|
||
f'Current Benchmark {begin + 1}-----Current Quantity {length}')
|
||
elif (begin + 1) == length and (begin + 1) % 100 != 0:
|
||
repetition = raw_length - length
|
||
print(
|
||
f'Current Benchmark {begin + 1}-----Current Quantity {length}\nNumber of Repetition {repetition}\n-----Deduplication Completed-----\n')
|
||
proxy_compared = proxies_list[begin]
|
||
|
||
begin_2 = begin + 1
|
||
while begin_2 <= (length - 1):
|
||
check = False
|
||
if proxy_compared['server'] == proxies_list[begin_2]['server'] and proxy_compared['port'] == proxies_list[begin_2]['port']:
|
||
check = True
|
||
if 'net' in proxies_list[begin_2] and 'net' in proxy_compared:
|
||
if proxy_compared['net'] != proxies_list[begin_2]['net']:
|
||
check = False
|
||
|
||
if 'tls' in proxies_list[begin_2] and 'tls' in proxy_compared:
|
||
if proxy_compared['tls'] != proxies_list[begin_2]['tls']:
|
||
check = False
|
||
|
||
if 'id' in proxies_list[begin_2] and 'id' in proxy_compared:
|
||
if proxy_compared['id'] != proxies_list[begin_2]['id']:
|
||
check = False
|
||
|
||
if 'password' in proxies_list[begin_2] and 'password' in proxy_compared:
|
||
if proxy_compared['password'] != proxies_list[begin_2]['password']:
|
||
check = False
|
||
|
||
if 'cipher' in proxies_list[begin_2] and 'cipher' in proxy_compared:
|
||
if proxy_compared['cipher'] != proxies_list[begin_2]['cipher']:
|
||
check = False
|
||
|
||
if 'type' in proxies_list[begin_2] and 'type' in proxy_compared:
|
||
if proxy_compared['type'] != proxies_list[begin_2]['type']:
|
||
check = False
|
||
|
||
if check:
|
||
proxies_list.pop(begin_2)
|
||
length -= 1
|
||
|
||
begin_2 += 1
|
||
begin += 1
|
||
|
||
print("\nNow is " + str(proxies_list.__len__()) + "\n")
|
||
|
||
url_list = []
|
||
|
||
for proxy in proxies_list: # 改名
|
||
if format_name_enabled:
|
||
emoji = {
|
||
'AD': '🇦🇩', 'AE': '🇦🇪', 'AF': '🇦🇫', 'AG': '🇦🇬',
|
||
'AI': '🇦🇮', 'AL': '🇦🇱', 'AM': '🇦🇲', 'AO': '🇦🇴',
|
||
'AQ': '🇦🇶', 'AR': '🇦🇷', 'AS': '🇦🇸', 'AT': '🇦🇹',
|
||
'AU': '🇦🇺', 'AW': '🇦🇼', 'AX': '🇦🇽', 'AZ': '🇦🇿',
|
||
'BA': '🇧🇦', 'BB': '🇧🇧', 'BD': '🇧🇩', 'BE': '🇧🇪',
|
||
'BF': '🇧🇫', 'BG': '🇧🇬', 'BH': '🇧🇭', 'BI': '🇧🇮',
|
||
'BJ': '🇧🇯', 'BL': '🇧🇱', 'BM': '🇧🇲', 'BN': '🇧🇳',
|
||
'BO': '🇧🇴', 'BQ': '🇧🇶', 'BR': '🇧🇷', 'BS': '🇧🇸',
|
||
'BT': '🇧🇹', 'BV': '🇧🇻', 'BW': '🇧🇼', 'BY': '🇧🇾',
|
||
'BZ': '🇧🇿', 'CA': '🇨🇦', 'CC': '🇨🇨', 'CD': '🇨🇩',
|
||
'CF': '🇨🇫', 'CG': '🇨🇬', 'CH': '🇨🇭', 'CI': '🇨🇮',
|
||
'CK': '🇨🇰', 'CL': '🇨🇱', 'CM': '🇨🇲', 'CN': '🇨🇳',
|
||
'CO': '🇨🇴', 'CR': '🇨🇷', 'CU': '🇨🇺', 'CV': '🇨🇻',
|
||
'CW': '🇨🇼', 'CX': '🇨🇽', 'CY': '🇨🇾', 'CZ': '🇨🇿',
|
||
'DE': '🇩🇪', 'DJ': '🇩🇯', 'DK': '🇩🇰', 'DM': '🇩🇲',
|
||
'DO': '🇩🇴', 'DZ': '🇩🇿', 'EC': '🇪🇨', 'EE': '🇪🇪',
|
||
'EG': '🇪🇬', 'EH': '🇪🇭', 'ER': '🇪🇷', 'ES': '🇪🇸',
|
||
'ET': '🇪🇹', 'EU': '🇪🇺', 'FI': '🇫🇮', 'FJ': '🇫🇯',
|
||
'FK': '🇫🇰', 'FM': '🇫🇲', 'FO': '🇫🇴', 'FR': '🇫🇷',
|
||
'GA': '🇬🇦', 'GB': '🇬🇧', 'GD': '🇬🇩', 'GE': '🇬🇪',
|
||
'GF': '🇬🇫', 'GG': '🇬🇬', 'GH': '🇬🇭', 'GI': '🇬🇮',
|
||
'GL': '🇬🇱', 'GM': '🇬🇲', 'GN': '🇬🇳', 'GP': '🇬🇵',
|
||
'GQ': '🇬🇶', 'GR': '🇬🇷', 'GS': '🇬🇸', 'GT': '🇬🇹',
|
||
'GU': '🇬🇺', 'GW': '🇬🇼', 'GY': '🇬🇾', 'HK': '🇭🇰',
|
||
'HM': '🇭🇲', 'HN': '🇭🇳', 'HR': '🇭🇷', 'HT': '🇭🇹',
|
||
'HU': '🇭🇺', 'ID': '🇮🇩', 'IE': '🇮🇪', 'IL': '🇮🇱',
|
||
'IM': '🇮🇲', 'IN': '🇮🇳', 'IO': '🇮🇴', 'IQ': '🇮🇶',
|
||
'IR': '🇮🇷', 'IS': '🇮🇸', 'IT': '🇮🇹', 'JE': '🇯🇪',
|
||
'JM': '🇯🇲', 'JO': '🇯🇴', 'JP': '🇯🇵', 'KE': '🇰🇪',
|
||
'KG': '🇰🇬', 'KH': '🇰🇭', 'KI': '🇰🇮', 'KM': '🇰🇲',
|
||
'KN': '🇰🇳', 'KP': '🇰🇵', 'KR': '🇰🇷', 'KW': '🇰🇼',
|
||
'KY': '🇰🇾', 'KZ': '🇰🇿', 'LA': '🇱🇦', 'LB': '🇱🇧',
|
||
'LC': '🇱🇨', 'LI': '🇱🇮', 'LK': '🇱🇰', 'LR': '🇱🇷',
|
||
'LS': '🇱🇸', 'LT': '🇱🇹', 'LU': '🇱🇺', 'LV': '🇱🇻',
|
||
'LY': '🇱🇾', 'MA': '🇲🇦', 'MC': '🇲🇨', 'MD': '🇲🇩',
|
||
'ME': '🇲🇪', 'MF': '🇲🇫', 'MG': '🇲🇬', 'MH': '🇲🇭',
|
||
'MK': '🇲🇰', 'ML': '🇲🇱', 'MM': '🇲🇲', 'MN': '🇲🇳',
|
||
'MO': '🇲🇴', 'MP': '🇲🇵', 'MQ': '🇲🇶', 'MR': '🇲🇷',
|
||
'MS': '🇲🇸', 'MT': '🇲🇹', 'MU': '🇲🇺', 'MV': '🇲🇻',
|
||
'MW': '🇲🇼', 'MX': '🇲🇽', 'MY': '🇲🇾', 'MZ': '🇲🇿',
|
||
'NA': '🇳🇦', 'NC': '🇳🇨', 'NE': '🇳🇪', 'NF': '🇳🇫',
|
||
'NG': '🇳🇬', 'NI': '🇳🇮', 'NL': '🇳🇱', 'NO': '🇳🇴',
|
||
'NP': '🇳🇵', 'NR': '🇳🇷', 'NU': '🇳🇺', 'NZ': '🇳🇿',
|
||
'OM': '🇴🇲', 'PA': '🇵🇦', 'PE': '🇵🇪', 'PF': '🇵🇫',
|
||
'PG': '🇵🇬', 'PH': '🇵🇭', 'PK': '🇵🇰', 'PL': '🇵🇱',
|
||
'PM': '🇵🇲', 'PN': '🇵🇳', 'PR': '🇵🇷', 'PS': '🇵🇸',
|
||
'PT': '🇵🇹', 'PW': '🇵🇼', 'PY': '🇵🇾', 'QA': '🇶🇦',
|
||
'RE': '🇷🇪', 'RO': '🇷🇴', 'RS': '🇷🇸', 'RU': '🇷🇺',
|
||
'RW': '🇷🇼', 'SA': '🇸🇦', 'SB': '🇸🇧', 'SC': '🇸🇨',
|
||
'SD': '🇸🇩', 'SE': '🇸🇪', 'SG': '🇸🇬', 'SH': '🇸🇭',
|
||
'SI': '🇸🇮', 'SJ': '🇸🇯', 'SK': '🇸🇰', 'SL': '🇸🇱',
|
||
'SM': '🇸🇲', 'SN': '🇸🇳', 'SO': '🇸🇴', 'SR': '🇸🇷',
|
||
'SS': '🇸🇸', 'ST': '🇸🇹', 'SV': '🇸🇻', 'SX': '🇸🇽',
|
||
'SY': '🇸🇾', 'SZ': '🇸🇿', 'TC': '🇹🇨', 'TD': '🇹🇩',
|
||
'TF': '🇹🇫', 'TG': '🇹🇬', 'TH': '🇹🇭', 'TJ': '🇹🇯',
|
||
'TK': '🇹🇰', 'TL': '🇹🇱', 'TM': '🇹🇲', 'TN': '🇹🇳',
|
||
'TO': '🇹🇴', 'TR': '🇹🇷', 'TT': '🇹🇹', 'TV': '🇹🇻',
|
||
'TW': '🇹🇼', 'TZ': '🇹🇿', 'UA': '🇺🇦', 'UG': '🇺🇬',
|
||
'UM': '🇺🇲', 'US': '🇺🇸', 'UY': '🇺🇾', 'UZ': '🇺🇿',
|
||
'VA': '🇻🇦', 'VC': '🇻🇨', 'VE': '🇻🇪', 'VG': '🇻🇬',
|
||
'VI': '🇻🇮', 'VN': '🇻🇳', 'VU': '🇻🇺', 'WF': '🇼🇫',
|
||
'WS': '🇼🇸', 'XK': '🇽🇰', 'YE': '🇾🇪', 'YT': '🇾🇹',
|
||
'ZA': '🇿🇦', 'ZM': '🇿🇲', 'ZW': '🇿🇼',
|
||
'RELAY': '🏁',
|
||
'NOWHERE': '🇦🇶',
|
||
}
|
||
|
||
server = proxy['server']
|
||
if server.replace('.', '').isdigit():
|
||
ip = server
|
||
else:
|
||
try:
|
||
# https://cloud.tencent.com/developer/article/1569841
|
||
ip = socket.gethostbyname(server)
|
||
except Exception:
|
||
ip = server
|
||
|
||
with geoip2.database.Reader('./utils/Country.mmdb') as ip_reader:
|
||
try:
|
||
response = ip_reader.country(ip)
|
||
country_code = response.country.iso_code
|
||
except Exception:
|
||
ip = '0.0.0.0'
|
||
country_code = 'NOWHERE'
|
||
|
||
if country_code == 'CLOUDFLARE':
|
||
country_code = 'RELAY'
|
||
elif country_code == 'PRIVATE':
|
||
country_code = 'RELAY'
|
||
|
||
if country_code in emoji:
|
||
name_emoji = emoji[country_code]
|
||
else:
|
||
name_emoji = emoji['NOWHERE']
|
||
|
||
proxy_index = proxies_list.index(proxy)
|
||
if len(proxies_list) >= 999:
|
||
proxy['name'] = f'{name_emoji}{country_code}-{ip}-{proxy_index:0>4d}'
|
||
elif len(proxies_list) <= 999 and len(proxies_list) > 99:
|
||
proxy['name'] = f'{name_emoji}{country_code}-{ip}-{proxy_index:0>3d}'
|
||
elif len(proxies_list) <= 99:
|
||
proxy['name'] = f'{name_emoji}{country_code}-{ip}-{proxy_index:0>2d}'
|
||
|
||
if proxy['server'] != '127.0.0.1':
|
||
proxy_str = str(proxy)
|
||
url_list.append(proxy_str)
|
||
elif format_name_enabled == False:
|
||
if proxy['server'] != '127.0.0.1': # 防止加入无用节点
|
||
proxy_str = str(proxy)
|
||
url_list.append(proxy_str)
|
||
|
||
yaml_content_dic = {'proxies': url_list}
|
||
# yaml.dump 显示中文方法 https://blog.csdn.net/weixin_41548578/article/details/90651464 yaml.dump 各种参数 https://blog.csdn.net/swinfans/article/details/88770119
|
||
yaml_content_raw = yaml.dump(yaml_content_dic, default_flow_style=False,
|
||
sort_keys=False, allow_unicode=True, width=750, indent=2)
|
||
yaml_content = sub_convert.format(yaml_content_raw, output=True)
|
||
|
||
return yaml_content # 输出 YAML 格式文本
|
||
|
||
# 将 URL 内容转换为 YAML 文本, output 为 False 时输出节点配置字典
|
||
# 将 URL 内容转换为 YAML 文本, output 为 False 时输出节点配置字典
|
||
# to yaml
|
||
def yaml_encode(url_content, output=True):
|
||
try:
|
||
url_list = []
|
||
|
||
lines = re.split(r'\n+', url_content)
|
||
|
||
for line in lines:
|
||
try:
|
||
yaml_url = {}
|
||
if 'vmess://' in line:
|
||
try:
|
||
vmess_json_config = json.loads(
|
||
sub_convert.base64_decode(line.replace('vmess://', '')))
|
||
vmess_default_config = {
|
||
'v': 'Vmess Node', 'ps': 'Vmess Node', 'add': '0.0.0.0', 'port': 0, 'id': '',
|
||
'aid': 0, 'scy': 'auto', 'net': '', 'type': '', 'host': '', 'path': '/', 'tls': ''
|
||
}
|
||
vmess_default_config.update(vmess_json_config)
|
||
vmess_config = vmess_default_config
|
||
|
||
yaml_url = {}
|
||
#yaml_config_str = ['name', 'server', 'port', 'type', 'uuid', 'alterId', 'cipher', 'tls', 'skip-cert-verify', 'network', 'ws-path', 'ws-headers']
|
||
#vmess_config_str = ['ps', 'add', 'port', 'id', 'aid', 'scy', 'tls', 'net', 'host', 'path']
|
||
# 生成 yaml 节点字典
|
||
if vmess_config['id'] == '' or vmess_config['id'] is None:
|
||
print('节点格式错误')
|
||
else:
|
||
yaml_url.setdefault(
|
||
'name', urllib.parse.unquote(str(vmess_config['ps'])))
|
||
yaml_url.setdefault(
|
||
'server', vmess_config['add'])
|
||
yaml_url.setdefault(
|
||
'port', int(vmess_config['port']))
|
||
yaml_url.setdefault('type', 'vmess')
|
||
yaml_url.setdefault('uuid', vmess_config['id'])
|
||
yaml_url.setdefault(
|
||
'alterId', int(vmess_config['aid']))
|
||
yaml_url.setdefault(
|
||
'cipher', vmess_config['scy'])
|
||
yaml_url.setdefault('skip-cert-verify', True)
|
||
if vmess_config['net'] == '' or vmess_config['net'] is False or vmess_config['net'] is None:
|
||
yaml_url.setdefault('network', 'tcp')
|
||
else:
|
||
yaml_url.setdefault(
|
||
'network', vmess_config['net'])
|
||
if vmess_config['tls'] == 'tls' or vmess_config['net'] == 'h2' or vmess_config['net'] == 'grpc':
|
||
yaml_url.setdefault('tls', True)
|
||
# elif vmess_config['tls'] == '':
|
||
# yaml_url.setdefault('tls', False)
|
||
# else:
|
||
# yaml_url.setdefault('tls', 'tls')
|
||
|
||
yaml_url.setdefault('ws-opts', {})
|
||
if vmess_config['path'] == '' or vmess_config['path'] is False or vmess_config['path'] is None:
|
||
# yaml_url['ws-opts'].setdefault('path', '/')
|
||
pass
|
||
else:
|
||
yaml_url['ws-opts'].setdefault(
|
||
'path', vmess_config['path'])
|
||
if vmess_config['host'] == '':
|
||
pass
|
||
# yaml_url['ws-opts'].setdefault(
|
||
# 'headers', {'Host': vmess_config['add']})
|
||
else:
|
||
yaml_url['ws-opts'].setdefault(
|
||
'headers', {'Host': vmess_config['host']})
|
||
|
||
url_list.append(yaml_url)
|
||
except Exception as err:
|
||
print(f'yaml_encode 解析 vmess 节点发生错误: {err}')
|
||
pass
|
||
|
||
if 'ss://' in line and 'vless://' not in line and 'vmess://' not in line:
|
||
if '#' not in line:
|
||
line = line + '#SS%20Node'
|
||
try:
|
||
ss_content = line.replace('ss://', '')
|
||
# https://www.runoob.com/python/att-string-split.html
|
||
part_list = ss_content.split('#', 1)
|
||
yaml_url.setdefault(
|
||
'name', urllib.parse.unquote(part_list[1]))
|
||
if '@' in part_list[0]:
|
||
mix_part = part_list[0].split('@', 1)
|
||
method_part = sub_convert.base64_decode(
|
||
mix_part[0])
|
||
server_part = f'{method_part}@{mix_part[1]}'
|
||
else:
|
||
server_part = sub_convert.base64_decode(
|
||
part_list[0])
|
||
|
||
# 使用多个分隔符 https://blog.csdn.net/shidamowang/article/details/80254476 https://zhuanlan.zhihu.com/p/92287240
|
||
server_part_list = server_part.split(':', 1)
|
||
method_part = server_part_list[0]
|
||
server_part_list = server_part_list[1].rsplit(
|
||
'@', 1)
|
||
password_part = server_part_list[0]
|
||
server_part_list = server_part_list[1].split(
|
||
':', 1)
|
||
|
||
yaml_url.setdefault('server', server_part_list[0])
|
||
yaml_url.setdefault('port', server_part_list[1])
|
||
yaml_url.setdefault('type', 'ss')
|
||
yaml_url.setdefault('cipher', method_part)
|
||
yaml_url.setdefault('password', password_part)
|
||
|
||
url_list.append(yaml_url)
|
||
except Exception as err:
|
||
print(f'yaml_encode 解析 ss 节点发生错误: {err}')
|
||
pass
|
||
|
||
if 'ssr://' in line:
|
||
try:
|
||
ssr_content = sub_convert.base64_decode(
|
||
line.replace('ssr://', ''))
|
||
|
||
parts = re.split(':', ssr_content)
|
||
if len(parts) != 6:
|
||
print('SSR 格式错误: %s' % ssr_content)
|
||
password_and_params = parts[5]
|
||
password_and_params = re.split(
|
||
'/\?', password_and_params)
|
||
password_encode_str = password_and_params[0]
|
||
params = password_and_params[1]
|
||
|
||
param_parts = re.split('\&', params)
|
||
param_dic = {'remarks': 'U1NSIE5vZGU=',
|
||
'obfsparam': '', 'protoparam': '', 'group': ''}
|
||
for part in param_parts:
|
||
key_and_value = re.split('\=', part)
|
||
param_dic.update(
|
||
{key_and_value[0]: key_and_value[1]})
|
||
yaml_url.setdefault(
|
||
'name', sub_convert.base64_decode(param_dic['remarks']))
|
||
yaml_url.setdefault('server', parts[0])
|
||
yaml_url.setdefault('port', parts[1])
|
||
yaml_url.setdefault('type', 'ssr')
|
||
yaml_url.setdefault('cipher', parts[3])
|
||
yaml_url.setdefault(
|
||
'password', sub_convert.base64_decode(password_encode_str))
|
||
yaml_url.setdefault('obfs', parts[4])
|
||
yaml_url.setdefault('protocol', parts[2])
|
||
yaml_url.setdefault(
|
||
'obfsparam', sub_convert.base64_decode(param_dic['obfsparam']))
|
||
yaml_url.setdefault(
|
||
'protoparam', sub_convert.base64_decode(param_dic['protoparam']))
|
||
yaml_url.setdefault(
|
||
'group', sub_convert.base64_decode(param_dic['group']))
|
||
|
||
url_list.append(yaml_url)
|
||
except Exception as err:
|
||
print(f'yaml_encode 解析 ssr 节点发生错误: {err}')
|
||
pass
|
||
|
||
if 'trojan://' in line:
|
||
try:
|
||
url_content = line.replace('trojan://', '')
|
||
# https://www.runoob.com/python/att-string-split.html
|
||
part_list = re.split('#', url_content, maxsplit=1)
|
||
yaml_url.setdefault(
|
||
'name', urllib.parse.unquote(part_list[1]))
|
||
|
||
server_part = part_list[0].replace('trojan://', '')
|
||
# 使用多个分隔符 https://blog.csdn.net/shidamowang/article/details/80254476 https://zhuanlan.zhihu.com/p/92287240
|
||
server_part_list = re.split(
|
||
':|@|\?|&', server_part)
|
||
yaml_url.setdefault('server', server_part_list[1])
|
||
yaml_url.setdefault('port', server_part_list[2])
|
||
yaml_url.setdefault('type', 'trojan')
|
||
yaml_url.setdefault(
|
||
'password', server_part_list[0])
|
||
server_part_list = server_part_list[3:]
|
||
|
||
for config in server_part_list:
|
||
if 'sni=' in config:
|
||
yaml_url.setdefault('sni', config[4:])
|
||
elif 'allowInsecure=' in config or 'tls=' in config:
|
||
if config[-1] == 0:
|
||
yaml_url.setdefault('tls', False)
|
||
elif 'type=' in config:
|
||
if config[5:] != 'tcp':
|
||
yaml_url.setdefault(
|
||
'network', config[5:])
|
||
elif 'path=' in config:
|
||
yaml_url.setdefault('ws-path', config[5:])
|
||
elif 'security=' in config:
|
||
if config[9:] != 'tls':
|
||
yaml_url.setdefault('tls', False)
|
||
|
||
yaml_url.setdefault('skip-cert-verify', True)
|
||
|
||
url_list.append(yaml_url)
|
||
except Exception as err:
|
||
print(f'yaml_encode 解析 trojan 节点发生错误: {err}')
|
||
pass
|
||
|
||
except Exception as e:
|
||
print(
|
||
f'failed to proccess yaml encoding the raw line: {line} & error: {e}')
|
||
|
||
yaml_content_dic = {'proxies': url_list}
|
||
if output:
|
||
yaml_content = yaml.dump(yaml_content_dic, default_flow_style=False,
|
||
sort_keys=False, allow_unicode=True, width=750, indent=2)
|
||
else:
|
||
yaml_content = yaml_content_dic
|
||
return yaml_content
|
||
|
||
except Exception as err:
|
||
print(f'yaml encode error: {err}')
|
||
|
||
def base64_encode(url_content): # 将 URL 内容转换为 Base64
|
||
if url_content == None:
|
||
url_content = ''
|
||
base64_content = base64.b64encode(
|
||
url_content.encode('utf-8')).decode('ascii')
|
||
return base64_content
|
||
|
||
# to url
|
||
def yaml_decode(url_content): # YAML 文本转换为 URL 链接内容
|
||
try:
|
||
if isinstance(url_content, dict):
|
||
sub_content = url_content
|
||
else:
|
||
sub_content = sub_convert.format(url_content)
|
||
|
||
print("Formatting Completed!")
|
||
proxies_list = sub_content['proxies']
|
||
|
||
protocol_url = []
|
||
# 不同节点订阅链接内容 https://github.com/hoochanlon/fq-book/blob/master/docs/append/srvurl.md
|
||
for index in range(len(proxies_list)):
|
||
try:
|
||
proxy = proxies_list[index]
|
||
|
||
# Vmess 节点提取, 由 Vmess 所有参数 dump JSON 后 base64 encode 得来。
|
||
if proxy['type'] == 'vmess':
|
||
|
||
yaml_default_config = {
|
||
'name': 'Vmess Node', 'server': '0.0.0.0', 'port': 0, 'uuid': '', 'alterId': 0,
|
||
'cipher': 'auto', 'network': 'ws',
|
||
'ws-opts': {'path': '', 'headers': {'Host': ''}},
|
||
'tls': '', 'sni': ''
|
||
}
|
||
#
|
||
yaml_default_config.update(proxy)
|
||
proxy_config = yaml_default_config
|
||
|
||
vmess_value = {
|
||
'v': 2, 'ps': proxy_config['name'], 'add': proxy_config['server'],
|
||
'port': proxy_config['port'], 'id': proxy_config['uuid'], 'aid': proxy_config['alterId'],
|
||
'scy': proxy_config['cipher'], 'net': proxy_config['network'], 'type': None, 'sni': proxy_config['sni']
|
||
}
|
||
|
||
if 'tls' in proxy:
|
||
if proxy['tls'] == 'true' or proxy['tls'] == True:
|
||
vmess_value['tls'] = 'tls'
|
||
# else:
|
||
# vmess_value['tls'] = ''
|
||
|
||
if 'ws-opts' in proxy:
|
||
if proxy['ws-opts'] != None and proxy['ws-opts'] != {} and proxy['ws-opts'] != '':
|
||
|
||
if 'headers' in proxy_config['ws-opts']:
|
||
if proxy_config['ws-opts']['headers']['Host'] != '':
|
||
vmess_value['host'] = proxy_config['ws-opts']['headers']['Host']
|
||
|
||
if 'path' in proxy_config['ws-opts']:
|
||
if proxy_config['ws-opts']['path'] != '':
|
||
vmess_value['path'] = proxy_config['ws-opts']['path']
|
||
|
||
vmess_raw_proxy = json.dumps(
|
||
vmess_value, sort_keys=False, indent=2, ensure_ascii=False)
|
||
vmess_proxy = str(
|
||
'\nvmess://' + sub_convert.base64_encode(vmess_raw_proxy) + '\n')
|
||
protocol_url.append(vmess_proxy)
|
||
|
||
# SS 节点提取, 由 ss_base64_decoded 部分(参数: 'cipher', 'password', 'server', 'port') Base64 编码后 加 # 加注释(URL_encode)
|
||
elif proxy['type'] == 'ss':
|
||
ss_base64_decoded = str(proxy['cipher']) + ':' + str(
|
||
proxy['password']) + '@' + str(proxy['server']) + ':' + str(proxy['port'])
|
||
ss_base64 = sub_convert.base64_encode(
|
||
ss_base64_decoded)
|
||
ss_proxy = str('\nss://' + ss_base64 + '#' +
|
||
str(urllib.parse.quote(proxy['name'])) + '\n')
|
||
protocol_url.append(ss_proxy)
|
||
|
||
# Trojan 节点提取, 由 trojan_proxy 中参数再加上 # 加注释(URL_encode) # trojan Go https://p4gefau1t.github.io/trojan-go/developer/url/
|
||
elif proxy['type'] == 'trojan':
|
||
if 'tls' in proxy.keys() and 'network' in proxy.keys():
|
||
if proxy['tls'] == True and proxy['network'] != 'tcp':
|
||
network_type = proxy['network']
|
||
trojan_go = f'?security=tls&type={network_type}&headerType=none'
|
||
elif proxy['tls'] == False and proxy['network'] != 'tcp':
|
||
trojan_go = f'??allowInsecure=0&type={network_type}&headerType=none'
|
||
else:
|
||
trojan_go = '?allowInsecure=1'
|
||
if 'sni' in proxy.keys():
|
||
trojan_go = trojan_go+'&sni='+proxy['sni']
|
||
trojan_proxy = str('\ntrojan://' + str(proxy['password']) + '@' + str(proxy['server']) + ':' + str(
|
||
proxy['port']) + trojan_go + '#' + str(urllib.parse.quote(proxy['name'])) + '\n')
|
||
protocol_url.append(trojan_proxy)
|
||
|
||
# ssr 节点提取, 由 ssr_base64_decoded 中所有参数总体 base64 encode
|
||
elif proxy['type'] == 'ssr':
|
||
ssr_default_config = {}
|
||
remarks = sub_convert.base64_encode(
|
||
proxy['name']).replace('+', '-')
|
||
server = proxy['server']
|
||
port = str(proxy['port'])
|
||
password = sub_convert.base64_encode(proxy['password'])
|
||
cipher = proxy['cipher']
|
||
protocol = proxy['protocol']
|
||
obfs = proxy['obfs']
|
||
param_dic = {'group': 'U1NSUHJvdmlkZXI',
|
||
'obfsparam': '', 'protoparam': ''}
|
||
for key in param_dic.keys():
|
||
try:
|
||
param_dic.update(
|
||
{key: sub_convert.base64_encode(proxy[key])})
|
||
except Exception:
|
||
pass
|
||
group, obfsparam, protoparam = param_dic[
|
||
'group'], param_dic['obfsparam'], param_dic['protoparam']
|
||
"""
|
||
for key in {'group', 'obfsparam', 'protoparam'}:
|
||
if key in proxy:
|
||
if key == 'group':
|
||
group = sub_convert.base64_encode(proxy[key])
|
||
elif key == 'obfsparam':
|
||
obfsparam = sub_convert.base64_encode(proxy[key])
|
||
elif key == 'protoparam':
|
||
protoparam = sub_convert.base64_encode(proxy[key])
|
||
else:
|
||
if key == 'group':
|
||
group = 'U1NSUHJvdmlkZXI'
|
||
elif key == 'obfsparam':
|
||
obfsparam = ''
|
||
elif key == 'protoparam':
|
||
protoparam = ''
|
||
"""
|
||
|
||
ssr_proxy = '\nssr://'+sub_convert.base64_encode(server+':'+port+':'+protocol+':'+cipher+':'+obfs+':' +
|
||
password+'/?group='+group+'&remarks='+remarks+'&obfsparam='+obfsparam+'&protoparam='+protoparam+'\n')
|
||
protocol_url.append(ssr_proxy)
|
||
|
||
except Exception as e:
|
||
print(f'yaml decode Error in coverting servers {e} 错误')
|
||
|
||
yaml_content = ''.join(protocol_url)
|
||
|
||
# note added here
|
||
yaml_content = list(
|
||
filter(lambda x: x != '', yaml_content.split("\n")))
|
||
yaml_content = "\n".join(yaml_content)
|
||
|
||
return yaml_content
|
||
except Exception as err:
|
||
print(f'yaml decode 发生 {err} 错误')
|
||
return '订阅内容解析错误'
|
||
|
||
def base64_decode(url_content): # Base64 转换为 URL 链接内容
|
||
if '-' in url_content:
|
||
url_content = url_content.replace('-', '+')
|
||
if '_' in url_content:
|
||
url_content = url_content.replace('_', '/')
|
||
# print(len(url_content))
|
||
missing_padding = len(url_content) % 4
|
||
if missing_padding != 0:
|
||
# 不是4的倍数后加= https://www.cnblogs.com/wswang/p/7717997.html
|
||
url_content += '='*(4 - missing_padding)
|
||
try:
|
||
base64_content = base64.b64decode(url_content.encode(
|
||
'utf-8')).decode('utf-8', 'ignore') # https://www.codenong.com/42339876/
|
||
base64_content_format = base64_content
|
||
return base64_content_format
|
||
except UnicodeDecodeError:
|
||
base64_content = base64.b64decode(url_content)
|
||
base64_content_format = base64_content
|
||
return str(base64_content)
|
||
|
||
# {url='订阅链接', output_type={'clash': 输出 Clash 配置, 'base64': 输出 Base64 配置, 'url': 输出 url 配置}, host='远程订阅转化服务地址'}
|
||
def convert_remote(url='', output_type='clash', host='http://127.0.0.1:25500'):
|
||
# 使用远程订阅转换服务,输出相应配置。
|
||
sever_host = host
|
||
# https://docs.python.org/zh-cn/3/library/urllib.parse.html
|
||
url = urllib.parse.quote(url, safe='')
|
||
if output_type == 'clash':
|
||
converted_url = sever_host+'/sub?target=clash&url=' + \
|
||
url+'&insert=false&emoji=true&list=true'
|
||
try:
|
||
resp = requests.get(converted_url)
|
||
except Exception as err:
|
||
print(err)
|
||
return 'Url 解析错误'
|
||
if resp.text == 'No nodes were found!':
|
||
sub_content = 'Url 解析错误'
|
||
else:
|
||
sub_content = sub_convert.makeup(sub_convert.format(
|
||
resp.text), dup_rm_enabled=False, format_name_enabled=True)
|
||
elif output_type == 'base64':
|
||
converted_url = sever_host+'/sub?target=mixed&url=' + \
|
||
url+'&insert=false&emoji=true&list=true'
|
||
try:
|
||
resp = requests.get(converted_url)
|
||
except Exception as err:
|
||
print(err)
|
||
return 'Url 解析错误'
|
||
if resp.text == 'No nodes were found!':
|
||
sub_content = 'Url 解析错误'
|
||
else:
|
||
sub_content = sub_convert.base64_encode(resp.text)
|
||
elif output_type == 'url':
|
||
converted_url = sever_host+'/sub?target=mixed&url=' + \
|
||
url+'&insert=false&emoji=true&list=true'
|
||
try:
|
||
resp = requests.get(converted_url)
|
||
except Exception as err:
|
||
print(err)
|
||
return 'Url 解析错误'
|
||
if resp.text == 'No nodes were found!':
|
||
sub_content = 'Url 解析错误'
|
||
else:
|
||
sub_content = resp.text
|
||
|
||
return sub_content
|
||
|
||
|
||
if __name__ == '__main__':
|
||
subscribe = 'https://cdn.jsdelivr.net/gh/mahdibland/ShadowsocksAggregator@master/sub/sub_merge.txt'
|
||
output_path = './output.txt'
|
||
|
||
content = sub_convert.main(subscribe, 'url', 'YAML')
|
||
|
||
file = open(output_path, 'w', encoding='utf-8')
|
||
file.write(content)
|
||
file.close()
|
||
print(f'Writing content to output.txt\n')
|