|
- import time
- import tkinter as tk
- from tkinter import ttk, filedialog, messagebox
- import json
- import os
- import threading
- import logging
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.chrome.service import Service
- from selenium.webdriver.support.ui import Select
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from selenium.common.exceptions import TimeoutException, ElementClickInterceptedException
- from utils import abbreviation_to_index
- from bit_api import *
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import datetime
- from enum import Enum
- current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- result_logger = logging.getLogger("normal")
- logger1 = logging.getLogger('result_log')
- logger1.setLevel(logging.INFO)
- handler1 = logging.FileHandler("result_log" + ".log")
- formatter1 = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
- handler1.setFormatter(formatter1)
- logger1.addHandler(handler1)
- class TextHandler(logging.Handler):
- def __init__(self, text_widget, adjust_height_callback):
- super().__init__()
- self.text_widget = text_widget
- self.adjust_height_callback = adjust_height_callback
- def emit(self, record):
- msg = self.format(record)
- self.text_widget.insert(tk.END, msg + '\n')
- self.text_widget.see(tk.END)
- self.adjust_height_callback()
- class AdsDataGUI:
- def __init__(self, root):
- self.root = root
- self.root.title("自动化注册 - BitBrowser")
-
- self.running = False
- self.current_web = None
-
- self.config_file = "config_th.json"
- self.load_config()
-
- self.params = {
- "WINDOW_ID": tk.StringVar(value=self.config.get("WINDOW_ID", "5c2216ed3b3e4bf0bc144b5ad0c87a5b")),
- "BASE_URL": tk.StringVar(value=self.config.get("BASE_URL", "http://127.0.0.1:54345")),
- "API_KEY": tk.StringVar(value=self.config.get("API_KEY", "704daf420f7244d08be51f61c987a232")),
- "FILE_PATH": tk.StringVar(value=self.config.get("FILE_PATH", "generated_data.txt")),
- "AMOUNT": tk.StringVar(value=self.config.get("AMOUNT", "$501-$1,000")),
- "SLEEP_TIME_1": tk.DoubleVar(value=self.config.get("SLEEP_TIME_1", 60.0)),
- "SLEEP_TIME_2": tk.DoubleVar(value=self.config.get("SLEEP_TIME_2", 3.0)),
- "SLEEP_TIME_3": tk.DoubleVar(value=self.config.get("SLEEP_TIME_3", 10.0)),
- "NO_SANDBOX": tk.BooleanVar(value=self.config.get("NO_SANDBOX", True)),
- "DISABLE_DEV_SHM": tk.BooleanVar(value=self.config.get("DISABLE_DEV_SHM", True)),
- "START_MAXIMIZED": tk.BooleanVar(value=self.config.get("START_MAXIMIZED", True))
- }
-
- self.main_frame = ttk.Frame(self.root)
- self.main_frame.pack(fill=tk.BOTH, expand=True)
-
- self.canvas = tk.Canvas(self.main_frame)
- self.scrollbar = ttk.Scrollbar(self.main_frame, orient=tk.VERTICAL, command=self.canvas.yview)
- self.scrollable_frame = ttk.Frame(self.canvas)
- self.scrollable_frame.bind(
- "<Configure>",
- lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
- )
- self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
- self.canvas.configure(yscrollcommand=self.scrollbar.set)
- self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
-
- row = 0
- tk.Label(self.scrollable_frame, text="自动化注册 - BitBrowser", font=("Arial", 16)).grid(row=row, column=0,
- columnspan=3, pady=10)
- row += 1
-
- labels = [
- ("窗口 ID:", self.params["WINDOW_ID"]),
- ("API 地址:", self.params["BASE_URL"]),
- ("API 密钥:", self.params["API_KEY"]),
- ("文件路径:", self.params["FILE_PATH"]),
- ("金额范围:", self.params["AMOUNT"]),
- ("等待时间 1 (验证码, 秒):", self.params["SLEEP_TIME_1"]),
- ("等待时间 2 (提交后, 秒):", self.params["SLEEP_TIME_2"]),
- ("等待时间 3 (关闭后, 秒):", self.params["SLEEP_TIME_3"])
- ]
- for label_text, var in labels:
- tk.Label(self.scrollable_frame, text=label_text).grid(row=row, column=0, sticky="e", padx=5, pady=5)
- if label_text == "金额范围:":
- amount_options = ["$1-$500", "$501-$1,000", "$1,001-$5,000", "超过 $5,000"]
- ttk.Combobox(self.scrollable_frame, textvariable=var, values=amount_options, state="readonly").grid(
- row=row, column=1, padx=5, pady=5, sticky="ew")
- else:
- tk.Entry(self.scrollable_frame, textvariable=var).grid(row=row, column=1, padx=5, pady=5, sticky="ew")
- if label_text == "文件路径:":
- tk.Button(self.scrollable_frame, text="浏览", command=self.browse_file).grid(row=row, column=2, padx=5,
- pady=5)
- row += 1
- tk.Checkbutton(self.scrollable_frame, text="无沙盒模式", variable=self.params["NO_SANDBOX"]).grid(row=row,
- column=0,
- columnspan=3,
- pady=5)
- row += 1
- tk.Checkbutton(self.scrollable_frame, text="禁用 Dev SHM", variable=self.params["DISABLE_DEV_SHM"]).grid(
- row=row, column=0, columnspan=3, pady=5)
- row += 1
- tk.Checkbutton(self.scrollable_frame, text="启动时最大化", variable=self.params["START_MAXIMIZED"]).grid(
- row=row, column=0, columnspan=3, pady=5)
- row += 1
- self.status_label = tk.Label(self.scrollable_frame, text="状态: 就绪", fg="blue")
- self.status_label.grid(row=row, column=0, columnspan=3, pady=10)
- row += 1
- tk.Label(self.scrollable_frame, text="日志输出:").grid(row=row, column=0, columnspan=3, pady=5)
- row += 1
-
- self.log_frame = ttk.Frame(self.scrollable_frame)
- self.log_frame.grid(row=row, column=0, columnspan=3, padx=5, pady=5, sticky="nsew")
- self.log_text = tk.Text(self.log_frame, height=0, width=60, wrap=tk.WORD)
- self.log_scrollbar = ttk.Scrollbar(self.log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
- self.log_text.configure(yscrollcommand=self.log_scrollbar.set)
- self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- self.log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
- row += 1
-
- button_frame = ttk.Frame(self.scrollable_frame)
- button_frame.grid(row=row, column=0, columnspan=3, pady=10)
- tk.Button(button_frame, text="开始", command=self.start_script).pack(side=tk.LEFT, padx=5)
- tk.Button(button_frame, text="停止", command=self.stop_script).pack(side=tk.LEFT, padx=5)
- tk.Button(button_frame, text="清除日志", command=self.clear_log).pack(side=tk.LEFT, padx=5)
- tk.Button(button_frame, text="保存参数", command=self.save_config).pack(side=tk.LEFT, padx=5)
- row += 1
-
- for i in range(3):
- self.scrollable_frame.grid_columnconfigure(i, weight=1)
-
- self.log_handler = TextHandler(self.log_text, self.adjust_log_height)
- logger.addHandler(self.log_handler)
-
- self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
-
- self.root.update_idletasks()
- self.root.minsize(self.scrollable_frame.winfo_reqwidth(), self.scrollable_frame.winfo_reqheight())
- def adjust_log_height(self):
- """动态调整日志框高度"""
- content = self.log_text.get("1.0", tk.END).strip()
- if not content:
- self.log_text.configure(height=0)
- else:
- lines = content.count('\n') + 1
- max_height = 20
- new_height = min(lines, max_height)
- self.log_text.configure(height=new_height)
- def load_config(self):
- """加载配置文件"""
- self.config = {}
- if os.path.exists(self.config_file):
- try:
- with open(self.config_file, 'r', encoding='utf-8') as f:
- self.config = json.load(f)
- except Exception as e:
- logger.error(f"无法加载配置文件: {str(e)}")
- def save_config(self):
- """保存配置"""
- config = {
- "WINDOW_ID": self.params["WINDOW_ID"].get(),
- "BASE_URL": self.params["BASE_URL"].get(),
- "API_KEY": self.params["API_KEY"].get(),
- "FILE_PATH": self.params["FILE_PATH"].get(),
- "AMOUNT": self.params["AMOUNT"].get(),
- "SLEEP_TIME_1": self.params["SLEEP_TIME_1"].get(),
- "SLEEP_TIME_2": self.params["SLEEP_TIME_2"].get(),
- "SLEEP_TIME_3": self.params["SLEEP_TIME_3"].get(),
- "NO_SANDBOX": self.params["NO_SANDBOX"].get(),
- "DISABLE_DEV_SHM": self.params["DISABLE_DEV_SHM"].get(),
- "START_MAXIMIZED": self.params["START_MAXIMIZED"].get()
- }
- try:
- with open(self.config_file, 'w', encoding='utf-8') as f:
- json.dump(config, f, indent=4, ensure_ascii=False)
- messagebox.showinfo("提示", "参数已保存!")
- except Exception as e:
- logger.error(f"无法保存配置文件: {str(e)}")
- messagebox.showerror("错误", f"无法保存参数: {str(e)}")
- def on_closing(self):
- """关闭窗口时保存配置"""
- self.save_config()
- self.root.destroy()
- def browse_file(self):
- """文件选择器"""
- file_path = filedialog.askopenfilename(filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")])
- if file_path:
- self.params["FILE_PATH"].set(file_path)
- def clear_log(self):
- """清除日志"""
- self.log_text.delete(1.0, tk.END)
- self.adjust_log_height()
- def start_script(self):
- if self.running:
- messagebox.showinfo("提示", "脚本已经在运行!")
- return
- self.running = True
- self.status_label.config(text="状态: 运行中", fg="green")
- threading.Thread(target=self.run_script, daemon=True).start()
- def stop_script(self):
- self.running = False
- self.status_label.config(text="状态: 已停止", fg="red")
-
- if self.current_web:
- try:
- self.current_web.quit()
- logger.info("因用户停止脚本,关闭 Selenium 驱动...")
- except Exception as e:
- logger.error(f"关闭 Selenium 驱动失败: {str(e)}")
- self.current_web = None
- def run_script(self):
- try:
- logger.info("开始运行脚本...")
- datas = read_txt_file(self.params["FILE_PATH"].get())
- if not datas:
- self.status_label.config(text="状态: 错误 - 文件未找到", fg="red")
- logger.error("文件未找到或为空")
- return
- logger.info(f"从文件中读取了 {len(datas)} 条记录")
- window_ids = self.params["WINDOW_ID"].get().split(',')
- logger.info(f"窗口ID列表: {window_ids}")
- from queue import Queue
- data_queue = Queue()
- for data in datas:
- data = [s.replace("#", "") for s in data]
- data_queue.put(data)
-
- threads = []
- for window_id in window_ids:
- thread = threading.Thread(target=self.process_window, args=(window_id, data_queue))
- thread.start()
- threads.append(thread)
-
- for thread in threads:
- thread.join()
- self.status_label.config(text="状态: 已完成", fg="blue")
- logger.info("脚本运行完成")
- except Exception as e:
- self.status_label.config(text=f"状态: 错误 - {str(e)}", fg="red")
- logger.error(f"运行脚本时出错: {str(e)}")
- finally:
- self.running = False
- def process_window(self, window_id, data_queue):
- while not data_queue.empty() and self.running:
- data = data_queue.get()
- success = self.process_data_for_window(data, window_id)
- if not success:
- data_queue.put(data)
- data_queue.task_done()
- def process_data_for_window(self, data, window_id):
- """为每个窗口 ID 处理数据"""
- attempt_success = False
- max_retries = 3
- for attempt in range(max_retries):
- try:
-
- chrome_options = {
- "no_sandbox": self.params["NO_SANDBOX"].get(),
- "disable_dev_shm": self.params["DISABLE_DEV_SHM"].get(),
- "start_maximized": self.params["START_MAXIMIZED"].get()
- }
- sleep_times = {
- "sleep1": max(self.params["SLEEP_TIME_1"].get(), 60.0),
- "sleep2": self.params["SLEEP_TIME_2"].get(),
- "sleep3": self.params["SLEEP_TIME_3"].get()
- }
- logger.info(f"使用的等待时间: sleep1={sleep_times['sleep1']}, sleep2={sleep_times['sleep2']}, sleep3={sleep_times['sleep3']}")
- result = start_Ads_data(self, data, window_id, sleep_times, chrome_options, self.params["AMOUNT"].get(),
- self.params["API_KEY"].get())
- if result:
- logger.info("操作成功!")
-
- with open(self.params["FILE_PATH"].get(), "r") as f:
- lines = f.readlines()
-
- filtered_lines = [line for line in lines if data[0] not in line]
- with open(self.params["FILE_PATH"].get(), 'w') as f:
- f.writelines(filtered_lines)
- attempt_success = True
- break
- else:
- logger.warning(f"查询超时已记录")
- attempt_success = True
- break
- except Exception as e:
- if not self.running:
- logger.info(f"{window_id} 用户停止了脚本")
- break
- if attempt < max_retries - 1:
- logger.warning(f"{window_id} 尝试 {attempt + 1} 失败: {str(e)},等待 10 秒后重试...")
-
- if self.current_web:
- try:
- self.current_web.quit()
- logger.info(f"关闭 Selenium 驱动以准备重试...")
- except Exception as close_err:
- logger.error(f"关闭 Selenium 驱动失败: {str(close_err)}")
- self.current_web = None
- closeBrowser(window_id)
- time.sleep(sleep_times["sleep3"])
- else:
- logger.error(f"处理 {window_id} 记录 {data[0]} 时出错: {str(e)},达到最大重试次数,失败!")
- break
- if not attempt_success:
- logger1.info(f"{data} 在 {window_id} 注册失败,放入其他线程再次填写")
- logger.info("完成一条记录,进入下一条记录")
- return attempt_success
- def start_Ads_data(gui, data, window_id, sleep_times, chrome_options, amount, API_KEY):
-
- web = start_ADS(window_id, chrome_options, gui, sleep_times, API_KEY)
-
- web.execute_script("window.scrollTo(0, document.body.scrollHeight);")
-
- logger.info("等待验证码插件初始化...")
- time.sleep(20)
-
- logger.info("开始填写表单...")
- web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
- logger.info("填写姓名完成")
- web.find_element(By.ID, "street1").send_keys(data[2])
- logger.info("填写街道地址完成")
- web.find_element(By.ID, "city").send_keys(data[3])
- logger.info("填写城市完成")
- time.sleep(1)
- Select(web.find_element(By.XPATH,
- "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(
- abbreviation_to_index[data[4]])
- logger.info("选择州完成")
- web.find_element(By.ID, "zip").send_keys(data[5])
- logger.info("填写邮编完成")
- time.sleep(1)
- web.find_element(By.ID, "email").send_keys(data[0])
- logger.info("填写邮箱完成")
-
- logger.info("点击复选框...")
- try:
- checkbox = WebDriverWait(web, 10).until(
- EC.presence_of_element_located((By.XPATH, "//input[@type='checkbox']"))
- )
- web.execute_script("arguments[0].scrollIntoView(true);", checkbox)
- web.execute_script("arguments[0].click();", checkbox)
- logger.info("通过 JavaScript 点击复选框完成")
- except Exception as e:
- logger.warning(f"点击复选框失败: {str(e)},尝试使用更复杂的 JavaScript 点击...")
- web.execute_script("""
- var checkbox = document.querySelector('input[type="checkbox"]');
- if (checkbox) {
- var event = new Event('click', { bubbles: true });
- checkbox.dispatchEvent(event);
- }
- """)
- logger.info("通过复杂 JavaScript 点击复选框完成")
-
- logger.info("选择金额范围...")
- try:
- amount_radio = WebDriverWait(web, 10).until(
- EC.presence_of_element_located((By.XPATH, f"//input[@value='{amount}']"))
- )
- web.execute_script("arguments[0].scrollIntoView(true);", amount_radio)
- web.execute_script("arguments[0].click();", amount_radio)
- logger.info("通过 JavaScript 选择金额范围完成")
- except Exception as e:
- logger.warning(f"选择金额范围失败: {str(e)},尝试使用更复杂的 JavaScript 点击...")
- web.execute_script(f"""
- var radio = document.querySelector('input[value="{amount}"]');
- if (radio) {{
- var event = new Event('click', {{ bubbles: true }});
- radio.dispatchEvent(event);
- }}
- """)
- logger.info("通过复杂 JavaScript 选择金额范围完成")
- web.find_element(By.ID, "signature").send_keys(data[1])
- logger.info("填写签名完成")
- logger.info("表单填写完成,开始检测验证码...")
-
- logger.info("注入全局监听器以检测验证码...")
- web.execute_script("""
- window.recaptchaVerified = false;
- function checkRecaptcha() {
- var recaptchaResponse = document.getElementById('g-recaptcha-response');
- if (recaptchaResponse && recaptchaResponse.value && recaptchaResponse.value.trim() !== '') {
- window.recaptchaVerified = true;
- console.log('reCAPTCHA 验证成功: ' + recaptchaResponse.value);
- }
- }
- // 每 500 毫秒检查一次
- window.recaptchaInterval = setInterval(checkRecaptcha, 500);
- """)
-
- logger.info("开始检测验证码...")
- max_attempts = 3
- attempts = 0
- total_wait_time = sleep_times["sleep1"]
- poll_interval = 1
- elapsed_time = 0
- while attempts < max_attempts and elapsed_time < total_wait_time:
-
- try:
- element = web.find_element(By.ID, 'g-recaptcha-response')
- response_value = element.get_attribute('value')
- logger.info(f"g-recaptcha-response 元素状态: display={element.value_of_css_property('display')}, value={response_value}")
- if response_value and response_value.strip() != '':
- logger.info("reCAPTCHA 验证成功(通过 Selenium 检测)")
- break
- except Exception as e:
- logger.debug(f"无法找到 g-recaptcha-response 元素: {str(e)}")
-
- verified = web.execute_script("return window.recaptchaVerified;")
- if verified:
- logger.info("reCAPTCHA 验证成功(通过全局监听器)")
- break
-
- try:
- checkbox = WebDriverWait(web, 2).until(
- EC.presence_of_element_located((By.CSS_SELECTOR, "span.recaptcha-checkbox-checked"))
- )
- logger.info("reCAPTCHA 勾选框已变成绿色,额外等待 10 秒...")
- time.sleep(10)
- elapsed_time += 10
- logger.info(f"已等待 {elapsed_time} 秒,剩余 {total_wait_time - elapsed_time} 秒...")
- continue
- except TimeoutException:
- logger.debug("reCAPTCHA 勾选框未变成绿色,继续等待...")
-
- elapsed_time += poll_interval
- logger.info(f"已等待 {elapsed_time} 秒,剩余 {total_wait_time - elapsed_time} 秒...")
- time.sleep(poll_interval)
- if elapsed_time >= total_wait_time:
- attempts += 1
- logger.warning(f"第 {attempts} 次尝试:验证码验证未完成或失败")
- if attempts >= max_attempts:
- logger.error("验证码识别失败,达到最大尝试次数,跳过当前记录")
-
- web.execute_script("if (window.recaptchaInterval) clearInterval(window.recaptchaInterval);")
- if web:
- try:
- web.quit()
- logger.info("关闭 Selenium 驱动...")
- except Exception as close_err:
- logger.error(f"关闭 Selenium 驱动失败: {str(close_err)}")
- gui.current_web = None
- closeBrowser(window_id)
- time.sleep(sleep_times["sleep3"])
- return False
- else:
- logger.info("刷新页面并重试...")
-
- web.execute_script("if (window.recaptchaInterval) clearInterval(window.recaptchaInterval);")
- web.refresh()
- time.sleep(5)
-
- web.execute_script("""
- window.recaptchaVerified = false;
- function checkRecaptcha() {
- var recaptchaResponse = document.getElementById('g-recaptcha-response');
- if (recaptchaResponse && recaptchaResponse.value && recaptchaResponse.value.trim() !== '') {
- window.recaptchaVerified = true;
- console.log('reCAPTCHA 验证成功: ' + recaptchaResponse.value);
- }
- }
- window.recaptchaInterval = setInterval(checkRecaptcha, 500);
- """)
-
- logger.info("重新填写表单...")
- web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
- logger.info("重新填写姓名完成")
- web.find_element(By.ID, "street1").send_keys(data[2])
- logger.info("重新填写街道地址完成")
- web.find_element(By.ID, "city").send_keys(data[3])
- logger.info("重新填写城市完成")
- time.sleep(1)
- Select(web.find_element(By.XPATH,
- "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(
- abbreviation_to_index[data[4]])
- logger.info("重新选择州完成")
- web.find_element(By.ID, "zip").send_keys(data[5])
- logger.info("重新填写邮编完成")
- time.sleep(1)
- web.find_element(By.ID, "email").send_keys(data[0])
- logger.info("重新填写邮箱完成")
-
- logger.info("重新点击复选框...")
- try:
- checkbox = WebDriverWait(web, 10).until(
- EC.presence_of_element_located((By.XPATH, "//input[@type='checkbox']"))
- )
- web.execute_script("arguments[0].scrollIntoView(true);", checkbox)
- web.execute_script("arguments[0].click();", checkbox)
- logger.info("通过 JavaScript 重新点击复选框完成")
- except Exception as e:
- logger.warning(f"重新点击复选框失败: {str(e)},尝试使用更复杂的 JavaScript 点击...")
- web.execute_script("""
- var checkbox = document.querySelector('input[type="checkbox"]');
- if (checkbox) {
- var event = new Event('click', { bubbles: true });
- checkbox.dispatchEvent(event);
- }
- """)
- logger.info("通过复杂 JavaScript 重新点击复选框完成")
-
- logger.info("重新选择金额范围...")
- try:
- amount_radio = WebDriverWait(web, 10).until(
- EC.presence_of_element_located((By.XPATH, f"//input[@value='{amount}']"))
- )
- web.execute_script("arguments[0].scrollIntoView(true);", amount_radio)
- web.execute_script("arguments[0].click();", amount_radio)
- logger.info("通过 JavaScript 重新选择金额范围完成")
- except Exception as e:
- logger.warning(f"重新选择金额范围失败: {str(e)},尝试使用更复杂的 JavaScript 点击...")
- web.execute_script(f"""
- var radio = document.querySelector('input[value="{amount}"]');
- if (radio) {{
- var event = new Event('click', {{ bubbles: true }});
- radio.dispatchEvent(event);
- }}
- """)
- logger.info("通过复杂 JavaScript 重新选择金额范围完成")
- web.find_element(By.ID, "signature").send_keys(data[1])
- logger.info("重新填写签名完成")
- elapsed_time = 0
-
- web.execute_script("if (window.recaptchaInterval) clearInterval(window.recaptchaInterval);")
-
- logger.info("提交表单...")
- try:
- submit_button = WebDriverWait(web, 10).until(
- EC.presence_of_element_located((By.XPATH, "//button[@type='submit']"))
- )
- web.execute_script("arguments[0].scrollIntoView(true);", submit_button)
- web.execute_script("arguments[0].click();", submit_button)
- logger.info("通过 JavaScript 提交表单完成")
- except Exception as e:
- logger.warning(f"提交表单失败: {str(e)},尝试使用更复杂的 JavaScript 点击...")
- web.execute_script("""
- var button = document.querySelector('button[type="submit"]');
- if (button) {
- var event = new Event('click', { bubbles: true });
- button.dispatchEvent(event);
- }
- """)
- logger.info("通过复杂 JavaScript 提交表单完成")
-
- result = check_submission_success(data, web)
-
- time.sleep(sleep_times["sleep2"])
- web.delete_all_cookies()
- web.execute_script("window.localStorage.clear();")
- web.execute_script("window.sessionStorage.clear();")
- if web:
- try:
- web.quit()
- logger.info("关闭 Selenium 驱动...")
- except Exception as close_err:
- logger.error(f"关闭 Selenium 驱动失败: {str(close_err)}")
- gui.current_web = None
- closeBrowser(window_id)
-
- logger.info("等待窗口完全关闭...")
- time.sleep(sleep_times["sleep3"])
- logger.info("完成一次填表循环")
- return result
- def start_ADS(window_id, chrome_options, gui, sleep_times, API_KEY):
-
- max_attempts = 3
- for attempt in range(max_attempts):
- try:
- res = openBrowser(window_id)
-
- logger.info(f"openBrowser Response: {res}")
-
- if 'data' not in res:
- logger.error("BitBrowser API 返回中没有 'data' 键")
- raise Exception("BitBrowser API 返回中没有 'data' 键")
- driverPath = res['data']['driver']
- debuggerAddress = res['data']['http']
- if res["success"] is True:
- logger.info("启动成功,返回信息")
- else:
- logger.error(f"{window_id} 启动失败")
- raise Exception("启动失败")
- chrome_opts = webdriver.ChromeOptions()
- chrome_opts.add_experimental_option("debuggerAddress", debuggerAddress)
- if chrome_options["no_sandbox"]:
- chrome_opts.add_argument('--no-sandbox')
- if chrome_options["disable_dev_shm"]:
- chrome_opts.add_argument('--disable-dev-shm-usage')
- if chrome_options["start_maximized"]:
- chrome_opts.add_argument('--start-maximized')
- chrome_service = Service(driverPath)
- web = webdriver.Chrome(service=chrome_service, options=chrome_opts)
- web.implicitly_wait(0)
-
- gui.current_web = web
- logger.info("加载页面...")
- web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
- original_window = web.current_window_handle
- WebDriverWait(web, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'body')))
- logger.info(f"当前页面标题: {web.title}")
- return web
- except Exception as e:
- logger.info(f"加载页面失败: {str(e)},将重启窗口 {window_id}")
-
- if gui.current_web:
- try:
- gui.current_web.quit()
- logger.info("关闭 Selenium 驱动以准备重试...")
- except Exception as close_err:
- logger.error(f"关闭 Selenium 驱动失败: {str(close_err)}")
- gui.current_web = None
- closeBrowser(window_id)
- time.sleep(sleep_times["sleep3"])
- if attempt == max_attempts - 1:
- logger.error(f"启动窗口 {window_id} 失败,达到最大尝试次数")
- raise Exception("无法启动浏览器")
- return web
- def check_submission_success(data, web):
-
- try:
-
- WebDriverWait(web, 30).until(
- EC.visibility_of_element_located((By.XPATH, "//h2[contains(., 'THANK YOU FOR SUBMITTING')]"))
- )
-
- ref_number = web.find_element(By.XPATH, "//p[contains(., 'Your Reference Number is')]/b").text
-
- logger1.info(", ".join(data) + "提交成功参考号为:" + ref_number)
- return True
- except Exception as e:
- logger.warning({str(e)})
- logger.warning(", ".join(data) + "验证失败")
- logger1.info(", ".join(data) + "提交后查询参考号失败")
-
- return False
- def read_txt_file(file_path):
- data = []
- try:
- with open(file_path, 'r', encoding='utf-8') as file:
- for line in file:
- cleaned_line = line.strip()
- if not cleaned_line:
- continue
- fields = cleaned_line.split('\t')
- if len(fields) != 6:
- logger.warning(f"第 {len(data) + 1} 行字段数量异常: {len(fields)}")
- continue
- data.append(fields)
- except FileNotFoundError:
- logger.error(f"文件 {file_path} 未找到")
- return None
- except Exception as e:
- logger.error(f"读取文件时发生错误: {str(e)}")
- return None
- return data
- if __name__ == '__main__':
- root = tk.Tk()
- app = AdsDataGUI(root)
- root.mainloop()
|