"""Small Flask service that hands out mailbox accounts and polls them for codes.

Accounts are loaded from a text file with one ``account----password`` entry per
line.  ``/get_email`` hands out the next unused account and starts a background
poller for it; ``/get_verification_code`` reports the poller's progress/result.
"""
import email
import imaplib
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread

from flask import Flask, jsonify, request

# Status markers stored in ``EmailManager.verification_codes`` in place of a code.
FETCHING = "Fetching"
LOGIN_FAILED = "LOGIN failed."
FETCH_FAILED = "Fetch failed."

# Sentinel returned by ``extract_verification_code_from_body`` when nothing matches.
NO_CODE_FOUND = "No code found"

IMAP_HOST = 'imap-mail.outlook.com'
FIELD_SEPARATOR = '----'
# Upper bound on simultaneous IMAP logins while validating the credentials file.
MAX_HEALTH_CHECK_WORKERS = 16

# Example of a link this pattern matches:
# https://github.com/users/xasds/emails/xxxx/confirm_verification/19769720?via_launch_code_email=true
VERIFICATION_LINK_RE = re.compile(
    r'confirm_verification/(\d+)\?via_launch_code_email=true'
)

app = Flask(__name__)


class EmailManager:
    """Holds the pool of usable mailbox credentials and per-account results.

    Attributes:
        verification_codes: Maps account -> extracted code, or one of the
            status markers ``FETCHING`` / ``LOGIN_FAILED`` / ``FETCH_FAILED``.
        credentials: ``[account, password]`` pairs not handed out yet.
    """

    def __init__(self):
        self.verification_codes = {}
        self.credentials = []

    def read_credentials(self, filename):
        """Load accounts from *filename*, health-check them, and rewrite the file.

        Each non-empty line must look like ``account----password``; lines with
        fewer than two fields are dropped.  Lines already ending in
        ``LOGIN_FAILED`` are kept verbatim and not re-checked.  Accounts whose
        login is rejected get ``----LOGIN failed.`` appended.  The file is
        rewritten in its original order, and ``self.credentials`` is replaced
        by the accounts that logged in successfully.

        Args:
            filename: Path of the credentials file to read and rewrite.
        """
        with open(filename, 'r') as file:
            stripped_lines = [raw.strip() for raw in file]

        entries = []
        for line in stripped_lines:
            if not line:
                continue
            parts = line.split(FIELD_SEPARATOR)
            if len(parts) < 2:
                continue
            entries.append((parts[0], parts[1], line, parts))

        results = []
        if entries:
            # A bounded pool keeps the number of concurrent IMAP logins sane,
            # and ``map`` yields results in input order so the rewritten file
            # keeps the order of the original.
            workers = min(MAX_HEALTH_CHECK_WORKERS, len(entries))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                results = list(
                    pool.map(lambda entry: self._classify_entry(*entry), entries)
                )

        with open(filename, 'w') as file:
            file.write('\n'.join(text for text, _ in results))

        self.credentials = [
            text.split(FIELD_SEPARATOR)[:2] for text, usable in results if usable
        ]

    def _classify_entry(self, account, password, line, parts):
        """Return ``(line_to_write, usable)`` for one credentials entry."""
        if line.endswith(LOGIN_FAILED):
            return line, False
        try:
            healthy = self.check_account_health(account, password)
        except OSError as exc:
            # Network/TLS trouble says nothing about the credentials, so keep
            # the entry untouched in the file instead of losing or flagging it.
            print(f"Could not verify {account} ({exc}); keeping entry unchanged.")
            return line, False
        normalized = FIELD_SEPARATOR.join(parts[:2])
        if healthy:
            return normalized, True
        return normalized + FIELD_SEPARATOR + LOGIN_FAILED, False

    def check_and_append_account(self, account, password, accounts, line, parts):
        """Health-check one entry and append the resulting file line to *accounts*.

        Args:
            account: Mailbox login name.
            password: Mailbox password.
            accounts: List that receives the line to write back to the file.
            line: The stripped original line from the credentials file.
            parts: ``line`` split on ``----``.
        """
        text, _ = self._classify_entry(account, password, line, parts)
        accounts.append(text)

    def check_account_health(self, account, password):
        """Return True if an IMAP login with the given credentials succeeds.

        Returns False when the server rejects the login (``IMAP4.error``).
        Connection-level failures (``OSError``) propagate to the caller.
        """
        try:
            # The context manager logs out even when login() raises.
            with imaplib.IMAP4_SSL(IMAP_HOST) as mail:
                mail.login(account, password)
        except imaplib.IMAP4.error:
            print(f"Login {account} for account: Failed!")
            return False
        print(f"Login {account} for account: Success!")
        return True

    def extract_email_body(self, msg):
        """Return the text of the first non-attachment text part of *msg*.

        For multipart messages the first ``text/plain`` or ``text/html`` part
        that is not an attachment is used.  Returns ``""`` when the message
        has no such part or the payload is empty.
        """
        if not msg.is_multipart():
            return self._decode_part(msg)
        for part in msg.walk():
            if part.get_content_type() not in ("text/plain", "text/html"):
                continue
            if "attachment" in str(part.get("Content-Disposition")):
                continue
            return self._decode_part(part)
        return ""

    @staticmethod
    def _decode_part(part):
        """Decode a message part to ``str`` using its declared charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Unknown charset label in the message headers.
            return payload.decode("utf-8", errors="replace")

    def extract_verification_code_from_body(self, email_body):
        """Return the numeric code from a confirmation link in *email_body*.

        Returns the string ``"No code found"`` when the body is empty or
        contains no matching link.
        """
        if not email_body:
            return NO_CODE_FOUND
        match = VERIFICATION_LINK_RE.search(email_body)
        if match is None:
            return NO_CODE_FOUND
        code = match.group(1)
        print(f"Verification code found: {code}")
        return code

    def fetch_and_process_emails_thread(self, account, password):
        """Start ``fetch_and_process_emails`` for the account in the background."""
        # Daemon thread: an endless poll must not keep the process alive on exit.
        thread = Thread(
            target=self.fetch_and_process_emails,
            args=(account, password),
            daemon=True,
        )
        thread.start()

    def fetch_and_process_emails(self, account, password, poll_interval=5,
                                 timeout=None):
        """Poll the account's inbox until a verification code shows up.

        The outcome is stored in ``self.verification_codes[account]``: the
        code on success, ``LOGIN_FAILED`` if the login is rejected, or
        ``FETCH_FAILED`` on any other error or when *timeout* elapses.

        Args:
            account: Mailbox login name.
            password: Mailbox password.
            poll_interval: Seconds to sleep between inbox scans.
            timeout: Give up after this many seconds; ``None`` polls forever.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        last_seen_uid = 0
        try:
            with imaplib.IMAP4_SSL(IMAP_HOST) as mail:
                try:
                    mail.login(account, password)
                except imaplib.IMAP4.error:
                    self.verification_codes[account] = LOGIN_FAILED
                    return
                while True:
                    code, last_seen_uid = self._scan_new_messages(
                        mail, account, last_seen_uid
                    )
                    if code is not None:
                        self.verification_codes[account] = code
                        return
                    if deadline is not None and time.monotonic() >= deadline:
                        self.verification_codes[account] = FETCH_FAILED
                        return
                    time.sleep(poll_interval)
        except Exception as e:
            print(f"An error occurred with account {account}: {e}")
            # Never leave the status stuck at FETCHING after the poller died.
            self.verification_codes[account] = FETCH_FAILED

    def _scan_new_messages(self, mail, account, last_seen_uid):
        """Scan messages newer than *last_seen_uid*; return ``(code, last_uid)``.

        ``code`` is ``None`` when no scanned message contains a verification
        link.
        """
        # Re-select on every scan so newly delivered mail is visible.  Read-only
        # so fetching does not flag messages as seen or expunge anything.
        mail.select('inbox', readonly=True)
        _, data = mail.uid('search', None, 'ALL')
        uids = sorted((data[0] or b'').split(), key=int) if data else []
        print(f"Email ids for account {account}: {uids}\n\n")
        for uid in uids:
            uid_number = int(uid)
            if uid_number <= last_seen_uid:
                continue
            _, fetched = mail.uid('fetch', uid.decode(), '(RFC822)')
            last_seen_uid = uid_number
            # A vanished message yields no (header, body) tuple.
            if not fetched or not isinstance(fetched[0], tuple):
                continue
            msg = email.message_from_bytes(fetched[0][1])
            email_body = self.extract_email_body(msg)
            code = self.extract_verification_code_from_body(email_body)
            if code != NO_CODE_FOUND:
                return code, last_seen_uid
        return None, last_seen_uid


# Created at import time (no I/O happens here) so the routes also work when the
# app is served by a WSGI runner instead of being run as a script.
email_manager = EmailManager()


@app.route('/get_email', methods=['GET'])
def get_email():
    """Hand out the next unused account and start polling it for a code."""
    try:
        account, password = email_manager.credentials.pop(0)
    except IndexError:
        return jsonify({'error': 'No more emails available'}), 400

    if email_manager.verification_codes.get(account) == FETCHING:
        return jsonify({'message': 'Verification code is already being fetched for account: ' + account}), 200

    # Mark as fetching before the worker starts so its result is never overwritten.
    email_manager.verification_codes[account] = FETCHING
    email_manager.fetch_and_process_emails_thread(account, password)
    return jsonify({'account': account}), 200


@app.route('/get_verification_code', methods=['GET'])
def get_verification_code():
    """Report the verification code (or fetch status) for ``?account=...``."""
    account = request.args.get('account')
    verification_code = email_manager.verification_codes.get(account)

    if verification_code is None:
        # Unknown so far: start polling if the account is still in the pool.
        for i, (acc, password) in enumerate(email_manager.credentials):
            if acc == account:
                email_manager.credentials.pop(i)
                email_manager.verification_codes[account] = FETCHING
                email_manager.fetch_and_process_emails_thread(acc, password)
                return jsonify({'message': 'Verification code fetching started for account: ' + account}), 200
        return jsonify({'error': 'Account not found in credentials'}), 404
    elif verification_code == LOGIN_FAILED:
        return jsonify({'error': 'LOGIN failed for account: ' + account}), 403
    elif verification_code == FETCH_FAILED:
        return jsonify({'error': 'Fetching failed for account: ' + account}), 500
    elif verification_code == FETCHING:
        return jsonify({'message': 'Verification code is still being fetched for account: ' + account}), 200
    else:
        return jsonify({'account': account, 'verification_code': verification_code}), 200


if __name__ == "__main__":
    email_manager.read_credentials('mail.txt')
    # NOTE(review): debug=True enables the Werkzeug debugger and the reloader
    # (which re-runs this startup in a child process); do not use in production.
    app.run(debug=True)