修改巨量签到driver路径

This commit is contained in:
zhangdongxu 2024-02-23 11:02:10 +08:00
parent 2ef19cd3b3
commit 2a35ed7a5b
3 changed files with 213 additions and 1 deletions

BIN
drivers/chromedriver Normal file

Binary file not shown.

212
githubAuto.py Normal file
View File

@ -0,0 +1,212 @@
import email
import imaplib
import re
import threading
import time
from threading import Thread
from flask import Flask, jsonify, request
FETCHING = "Fetching"
LOGIN_FAILED = "LOGIN failed."
app = Flask(__name__)
class EmailManager:
def __init__(self):
self.verification_codes = {}
self.credentials = []
def read_credentials(self, filename):
with open(filename, 'r') as file:
lines = file.readlines()
accounts = []
threads = []
for line in lines:
line = line.strip()
if line:
parts = line.split('----')
if len(parts) < 2:
continue
account, password = parts[0], parts[1]
# 创建并启动一个新的线程来检查账户健康状态
thread = threading.Thread(target=self.check_and_append_account,
args=(account, password, accounts, line, parts))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
with open(filename, 'w') as file:
file.write('\n'.join(accounts))
self.credentials = [line.split('----')[:2] for line in accounts if not line.endswith(LOGIN_FAILED)]
def check_and_append_account(self, account, password, accounts, line, parts):
if line.endswith(LOGIN_FAILED):
accounts.append(line)
elif self.check_account_health(account, password):
accounts.append('----'.join(parts[:2]))
else:
accounts.append('----'.join(parts[:2]) + '----' + LOGIN_FAILED)
def check_account_health(self, account, password):
try:
mail = imaplib.IMAP4_SSL('imap-mail.outlook.com')
mail.login(account, password)
mail.logout()
print(f"Login {account} for account: Success!")
return True
except imaplib.IMAP4.error:
print(f"Login {account} for account: Failed!")
return False
def extract_email_body(self, msg):
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
return part.get_payload(decode=True).decode()
elif content_type == "text/html" and "attachment" not in content_disposition:
return part.get_payload(decode=True).decode()
else:
return msg.get_payload(decode=True).decode()
def extract_verification_code_from_body(self, email_body):
# https://github.com/users/xasds/emails/xxxx/confirm_verification/19769720?via_launch_code_email=true
match = re.search(r'confirm_verification/(\d+)\?via_launch_code_email=true', email_body)
if match:
print(f"Verification code found: {match.group(1)}")
return match.group(1)
return "No code found"
def fetch_and_process_emails_thread(self, account, password):
thread = Thread(target=self.fetch_and_process_emails, args=(account, password))
thread.start()
def fetch_and_process_emails(self, account, password):
try:
mail = imaplib.IMAP4_SSL('imap-mail.outlook.com')
mail.login(account, password)
last_email_id = None
found_code = False
while not found_code:
mail.select('inbox')
result, data = mail.search(None, 'ALL')
mail.close()
email_ids = data[0].split()
print(f"Email ids for account {account}: {email_ids}\n\n")
for eid in email_ids:
mail.select('inbox')
result, data = mail.fetch(eid, '(RFC822)')
mail.close()
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
email_body = self.extract_email_body(msg)
verification_code = self.extract_verification_code_from_body(email_body)
if verification_code != "No code found":
self.verification_codes[account] = verification_code
found_code = True
mail.logout()
break
last_email_id = email_ids[-1] if email_ids else last_email_id
time.sleep(5)
except imaplib.IMAP4.error:
self.verification_codes[account] = LOGIN_FAILED
except Exception as e:
print(f"An error occurred with account {account}: {e}")
@app.route('/get_email', methods=['GET'])
def get_email():
if email_manager.credentials:
account, password = email_manager.credentials.pop(0)
if email_manager.verification_codes.get(account) == FETCHING:
return jsonify({'message': 'Verification code is already being fetched for account: ' + account}), 200
email_manager.fetch_and_process_emails_thread(account, password)
email_manager.verification_codes[account] = FETCHING
return jsonify({'account': account}), 200
else:
return jsonify({'error': 'No more emails available'}), 400
@app.route('/get_verification_code', methods=['GET'])
def get_verification_code():
account = request.args.get('account')
verification_code = email_manager.verification_codes.get(account)
if verification_code is None:
for i, (acc, password) in enumerate(email_manager.credentials):
if acc == account:
email_manager.fetch_and_process_emails_thread(acc, password)
email_manager.credentials.pop(i)
email_manager.verification_codes[account] = FETCHING
return jsonify({'message': 'Verification code fetching started for account: ' + account}), 200
return jsonify({'error': 'Account not found in credentials'}), 404
elif verification_code == LOGIN_FAILED:
return jsonify({'error': 'LOGIN failed for account: ' + account}), 403
elif verification_code == FETCHING:
return jsonify({'message': 'Verification code is still being fetched for account: ' + account}), 200
else:
return jsonify({'account': account, 'verification_code': verification_code}), 200
if name == "__main__":
email_manager = EmailManager()
email_manager.read_credentials('mail.txt')
app.run(debug=True)
email_body = self.extract_email_body(msg)
verification_code = self.extract_verification_code_from_body(email_body)
if verification_code != "No code found":
self.verification_codes[account] = verification_code
found_code = True
mail.logout()
break
last_email_id = email_ids[-1] if email_ids else last_email_id
time.sleep(5)
except imaplib.IMAP4.error:
self.verification_codes[account] = LOGIN_FAILED
except Exception as e:
print(f"An error occurred with account {account}: {e}")
@app.route('/get_email', methods=['GET'])
def get_email():
if email_manager.credentials:
account, password = email_manager.credentials.pop(0)
if email_manager.verification_codes.get(account) == FETCHING:
return jsonify({'message': 'Verification code is already being fetched for account: ' + account}), 200
email_manager.fetch_and_process_emails_thread(account, password)
email_manager.verification_codes[account] = FETCHING
return jsonify({'account': account}), 200
else:
return jsonify({'error': 'No more emails available'}), 400
@app.route('/get_verification_code', methods=['GET'])
def get_verification_code():
account = request.args.get('account')
verification_code = email_manager.verification_codes.get(account)
if verification_code is None:
for i, (acc, password) in enumerate(email_manager.credentials):
if acc == account:
email_manager.fetch_and_process_emails_thread(acc, password)
email_manager.credentials.pop(i)
email_manager.verification_codes[account] = FETCHING
return jsonify({'message': 'Verification code fetching started for account: ' + account}), 200
return jsonify({'error': 'Account not found in credentials'}), 404
elif verification_code == LOGIN_FAILED:
return jsonify({'error': 'LOGIN failed for account: ' + account}), 403
elif verification_code == FETCHING:
return jsonify({'message': 'Verification code is still being fetched for account: ' + account}), 200
else:
return jsonify({'account': account, 'verification_code': verification_code}), 200
if name == "__main__":
email_manager = EmailManager()
email_manager.read_credentials('mail.txt')
app.run(debug=True)

View File

@ -40,7 +40,7 @@ class Tencent:
"""
profile = webdriver.FirefoxOptions() # 配置无头
profile.add_argument('-headless')
self.browser = webdriver.Chrome()
self.browser = webdriver.Chrome(executable_path='./drivers/chromedriver')
self.wait = WebDriverWait(self.browser, 20)
self.url = url # 目标url
self.username = username # 用户名