"""Tkinter front end that drives a BitBrowser window through Selenium.

The GUI collects connection/timing parameters, reads tab-separated records
from a text file and, for every record, opens the BitBrowser window, waits
for the reCAPTCHA response to be filled in, fills in and submits the claim
form, and records the outcome in ``result_log.log``.
"""

# NOTE: the original import order is kept on purpose. ``from bit_api import *``
# may re-bind names, so the imports before and after it stay where they were.
import time
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import json
import os
import threading
import queue
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException
from utils import abbreviation_to_index
from bit_api import *
from multiprocessing import Process
import datetime

# Timestamp captured at import time; not referenced elsewhere in this file.
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
result_logger = logging.getLogger("normal")

# Persistent result log: one line per submission outcome, written to a file.
logger1 = logging.getLogger('result_log')
logger1.setLevel(logging.INFO)
handler1 = logging.FileHandler("result_log" + ".log")
formatter1 = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler1.setFormatter(formatter1)
logger1.addHandler(handler1)

# How often (milliseconds) the Tk thread drains the cross-thread queues.
_UI_POLL_INTERVAL_MS = 100


class TextHandler(logging.Handler):
    """Logging handler that appends formatted records to a Tk text widget.

    ``emit`` may be called from any thread, so it only enqueues the message.
    The widget itself is updated by a poller that runs on the Tk thread
    (scheduled with ``after``); Tk widgets must not be touched from worker
    threads.  The handler must therefore be constructed on the Tk thread.
    """

    def __init__(self, text_widget, adjust_height_callback):
        super().__init__()
        self.text_widget = text_widget
        self.adjust_height_callback = adjust_height_callback
        self._pending = queue.Queue()
        self._drain()

    def emit(self, record):
        """Queue the formatted record for display on the Tk thread."""
        try:
            self._pending.put(self.format(record))
        except Exception:
            self.handleError(record)

    def _drain(self):
        """Write all queued messages to the widget, then reschedule itself."""
        try:
            wrote = False
            while True:
                try:
                    msg = self._pending.get_nowait()
                except queue.Empty:
                    break
                self.text_widget.insert(tk.END, msg + '\n')
                wrote = True
            if wrote:
                self.text_widget.see(tk.END)
                self.adjust_height_callback()
            self.text_widget.after(_UI_POLL_INTERVAL_MS, self._drain)
        except tk.TclError:
            # The widget has been destroyed; stop polling.
            pass


class AdsDataGUI:
    """Main window: parameter form, status line, log view and start/stop buttons."""

    def __init__(self, root):
        self.root = root
        self.root.title("自动化注册 - BitBrowser")

        # Run state shared with the worker thread.
        self.running = False
        self.current_web = None  # current Selenium WebDriver, so "stop" can quit it

        # Status updates requested by any thread; applied on the Tk thread.
        self._status_queue = queue.Queue()

        # Load the saved configuration.
        self.config_file = "config.json"
        self.load_config()

        # Tk variables backing the input widgets.
        # NOTE(review): the window ID and API key defaults are hard-coded
        # identifiers; consider moving them out of the source.
        self.params = {
            "WINDOW_ID": tk.StringVar(value=self.config.get("WINDOW_ID", "5c2216ed3b3e4bf0bc144b5ad0c87a5b")),
            "BASE_URL": tk.StringVar(value=self.config.get("BASE_URL", "http://127.0.0.1:54345")),
            "API_KEY": tk.StringVar(value=self.config.get("API_KEY", "704daf420f7244d08be51f61c987a232")),
            "FILE_PATH": tk.StringVar(value=self.config.get("FILE_PATH", "generated_data.txt")),
            "AMOUNT": tk.StringVar(value=self.config.get("AMOUNT", "$501-$1,000")),
            "SLEEP_TIME_1": tk.DoubleVar(value=self.config.get("SLEEP_TIME_1", 10.0)),  # captcha wait
            "SLEEP_TIME_2": tk.DoubleVar(value=self.config.get("SLEEP_TIME_2", 3.0)),  # wait after submit
            "SLEEP_TIME_3": tk.DoubleVar(value=self.config.get("SLEEP_TIME_3", 10.0)),  # wait after close
            "NO_SANDBOX": tk.BooleanVar(value=self.config.get("NO_SANDBOX", True)),
            "DISABLE_DEV_SHM": tk.BooleanVar(value=self.config.get("DISABLE_DEV_SHM", True)),
            "START_MAXIMIZED": tk.BooleanVar(value=self.config.get("START_MAXIMIZED", True)),
        }

        # Main frame
        self.main_frame = ttk.Frame(self.root)
        self.main_frame.pack(fill=tk.BOTH, expand=True)

        # Canvas + scrollbar hosting the scrollable content frame
        self.canvas = tk.Canvas(self.main_frame)
        self.scrollbar = ttk.Scrollbar(self.main_frame, orient=tk.VERTICAL, command=self.canvas.yview)
        self.scrollable_frame = ttk.Frame(self.canvas)
        self.scrollable_frame.bind(
            "<Configure>",
            lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
        )
        self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)
        self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # GUI elements (Chinese user interface)
        row = 0
        tk.Label(self.scrollable_frame, text="自动化注册 - BitBrowser", font=("Arial", 16)).grid(
            row=row, column=0, columnspan=3, pady=10)
        row += 1

        # Parameter inputs
        labels = [
            ("窗口 ID:", self.params["WINDOW_ID"]),
            ("API 地址:", self.params["BASE_URL"]),
            ("API 密钥:", self.params["API_KEY"]),
            ("文件路径:", self.params["FILE_PATH"]),
            ("金额范围:", self.params["AMOUNT"]),
            ("等待时间 1 (验证码, 秒):", self.params["SLEEP_TIME_1"]),
            ("等待时间 2 (提交后, 秒):", self.params["SLEEP_TIME_2"]),
            ("等待时间 3 (关闭后, 秒):", self.params["SLEEP_TIME_3"]),
        ]
        for label_text, var in labels:
            tk.Label(self.scrollable_frame, text=label_text).grid(row=row, column=0, sticky="e", padx=5, pady=5)
            if label_text == "金额范围:":
                amount_options = ["$1-$500", "$501-$1,000", "$1,001-$5,000", "超过 $5,000"]
                ttk.Combobox(self.scrollable_frame, textvariable=var, values=amount_options, state="readonly").grid(
                    row=row, column=1, padx=5, pady=5, sticky="ew")
            else:
                tk.Entry(self.scrollable_frame, textvariable=var).grid(row=row, column=1, padx=5, pady=5, sticky="ew")
            if label_text == "文件路径:":
                tk.Button(self.scrollable_frame, text="浏览", command=self.browse_file).grid(
                    row=row, column=2, padx=5, pady=5)
            row += 1

        tk.Checkbutton(self.scrollable_frame, text="无沙盒模式", variable=self.params["NO_SANDBOX"]).grid(
            row=row, column=0, columnspan=3, pady=5)
        row += 1
        tk.Checkbutton(self.scrollable_frame, text="禁用 Dev SHM", variable=self.params["DISABLE_DEV_SHM"]).grid(
            row=row, column=0, columnspan=3, pady=5)
        row += 1
        tk.Checkbutton(self.scrollable_frame, text="启动时最大化", variable=self.params["START_MAXIMIZED"]).grid(
            row=row, column=0, columnspan=3, pady=5)
        row += 1

        self.status_label = tk.Label(self.scrollable_frame, text="状态: 就绪", fg="blue")
        self.status_label.grid(row=row, column=0, columnspan=3, pady=10)
        row += 1

        tk.Label(self.scrollable_frame, text="日志输出:").grid(row=row, column=0, columnspan=3, pady=5)
        row += 1

        # Log view: starts with height 0 and grows with its content.
        self.log_frame = ttk.Frame(self.scrollable_frame)
        self.log_frame.grid(row=row, column=0, columnspan=3, padx=5, pady=5, sticky="nsew")
        self.log_text = tk.Text(self.log_frame, height=0, width=60, wrap=tk.WORD)
        self.log_scrollbar = ttk.Scrollbar(self.log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=self.log_scrollbar.set)
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        row += 1

        # Button row
        button_frame = ttk.Frame(self.scrollable_frame)
        button_frame.grid(row=row, column=0, columnspan=3, pady=10)
        tk.Button(button_frame, text="开始", command=self.start_script).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="停止", command=self.stop_script).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="清除日志", command=self.clear_log).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="保存参数", command=self.save_config).pack(side=tk.LEFT, padx=5)
        row += 1

        # Column weights so the entry widgets stretch with the window.
        for i in range(3):
            self.scrollable_frame.grid_columnconfigure(i, weight=1)

        # Route module log records into the log view.
        self.log_handler = TextHandler(self.log_text, self.adjust_log_height)
        logger.addHandler(self.log_handler)

        # Save the configuration when the window is closed.
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        # Size the window to fit its content.
        self.root.update_idletasks()
        self.root.minsize(self.scrollable_frame.winfo_reqwidth(), self.scrollable_frame.winfo_reqheight())

        # Start applying queued status updates on the Tk thread.
        self._poll_status()

    def _set_status(self, text, fg):
        """Request a status-label update; safe to call from any thread."""
        self._status_queue.put((text, fg))

    def _poll_status(self):
        """Apply the most recent queued status on the Tk thread and reschedule."""
        try:
            latest = None
            while True:
                try:
                    latest = self._status_queue.get_nowait()
                except queue.Empty:
                    break
            if latest is not None:
                self.status_label.config(text=latest[0], fg=latest[1])
            self.root.after(_UI_POLL_INTERVAL_MS, self._poll_status)
        except tk.TclError:
            # The window has been destroyed; stop polling.
            pass

    def _collect_settings(self):
        """Return the current parameter values as a plain dict.

        Must be called on the Tk thread.  Raises ``tk.TclError`` when a
        numeric field does not contain a valid number.
        """
        return {name: var.get() for name, var in self.params.items()}

    def adjust_log_height(self):
        """Resize the log view to its content, capped at 20 lines."""
        content = self.log_text.get("1.0", tk.END).strip()
        if not content:
            self.log_text.configure(height=0)
            return
        max_height = 20
        lines = content.count('\n') + 1
        self.log_text.configure(height=min(lines, max_height))

    def load_config(self):
        """Load ``self.config`` from the config file; fall back to ``{}``."""
        self.config = {}
        if not os.path.exists(self.config_file):
            return
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                # __init__ calls self.config.get(...), so anything else is unusable.
                raise ValueError("config root must be a JSON object")
            self.config = loaded
        except Exception as e:
            logger.error(f"无法加载配置文件: {str(e)}")

    def save_config(self):
        """Write the current parameters to the config file and notify the user."""
        try:
            # Read the values inside the try block: an invalid number in a
            # DoubleVar raises TclError, which must be reported, not propagated.
            config = self._collect_settings()
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4, ensure_ascii=False)
            messagebox.showinfo("提示", "参数已保存!")
        except Exception as e:
            logger.error(f"无法保存配置文件: {str(e)}")
            messagebox.showerror("错误", f"无法保存参数: {str(e)}")

    def on_closing(self):
        """Save the configuration, then close the window even if saving fails."""
        try:
            self.save_config()
        finally:
            self.root.destroy()

    def browse_file(self):
        """Let the user pick the data file."""
        file_path = filedialog.askopenfilename(filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")])
        if file_path:
            self.params["FILE_PATH"].set(file_path)

    def clear_log(self):
        """Empty the log view."""
        self.log_text.delete("1.0", tk.END)
        self.adjust_log_height()

    def start_script(self):
        """Start the worker thread unless a run is already in progress."""
        if self.running:
            messagebox.showinfo("提示", "脚本已经在运行!")
            return
        try:
            # Snapshot on the Tk thread so the worker never reads Tk variables.
            settings = self._collect_settings()
        except tk.TclError as exc:
            logger.error(f"参数无效: {exc}")
            messagebox.showerror("错误", f"参数无效: {exc}")
            return
        self.running = True
        self._set_status("状态: 运行中", "green")
        threading.Thread(target=self.run_script, args=(settings,), daemon=True).start()

    def stop_script(self):
        """Ask the worker to stop and quit the current browser immediately."""
        self.running = False
        self._set_status("状态: 已停止", "red")
        if self.current_web:
            try:
                self.current_web.quit()
                logger.info("因用户停止脚本,关闭 Selenium 驱动...")
            except Exception as e:
                logger.error(f"关闭 Selenium 驱动失败: {str(e)}")
            self.current_web = None

    def run_script(self, settings=None):
        """Process every record of the data file (runs on the worker thread).

        Args:
            settings: dict of parameter values keyed like ``self.params``.
                When omitted the values are read from the Tk variables, which
                is only safe on the Tk thread.  The values are fixed for the
                whole run; edits made in the GUI affect the next run.

        Each record is attempted up to three times.  A record that still
        raises on the last attempt is written to the result log and the
        error aborts the run (reported on the status line).
        """
        try:
            if settings is None:
                settings = self._collect_settings()
            logger.info("开始运行脚本...")
            file_path = settings["FILE_PATH"]
            datas = read_txt_file(file_path)
            if not datas:
                self._set_status("状态: 错误 - 文件未找到", "red")
                logger.error("文件未找到或为空")
                return
            logger.info(f"从文件中读取了 {len(datas)} 条记录")

            sleep_times = {
                "sleep1": settings["SLEEP_TIME_1"],
                "sleep2": settings["SLEEP_TIME_2"],
                "sleep3": settings["SLEEP_TIME_3"],
            }
            chrome_options = {
                "no_sandbox": settings["NO_SANDBOX"],
                "disable_dev_shm": settings["DISABLE_DEV_SHM"],
                "start_maximized": settings["START_MAXIMIZED"],
            }
            max_retries = 3

            for data in datas:
                if not self.running:
                    logger.info("用户停止了脚本")
                    break
                self._set_status(f"状态: 正在处理 {data[0]}", "green")
                logger.info(f"处理记录: {data}")

                attempt_success = False
                for attempt in range(max_retries):
                    if not self.running:
                        logger.info("用户停止了脚本")
                        break
                    try:
                        result = start_Ads_data(
                            self, data,
                            window_id=settings["WINDOW_ID"],
                            sleep_times=sleep_times,
                            chrome_options=chrome_options,
                            amount=settings["AMOUNT"],
                            API_KEY=settings["API_KEY"],
                        )
                    except Exception as e:
                        if not self.running:
                            logger.info("用户停止了脚本")
                            break
                        if attempt < max_retries - 1:
                            logger.warning(f"尝试 {attempt + 1} 失败: {str(e)},等待 10 秒后重试...")
                            time.sleep(10)
                            continue
                        # Last attempt failed: record it before aborting the run.
                        logger1.info(f"finally{data}注册失败")
                        raise

                    if result:
                        logger.info("操作成功!")
                        # Kept outside the retry handler: a file error here must
                        # not cause the already-submitted form to be re-submitted.
                        try:
                            _remove_record(file_path, data[0])
                        except (OSError, UnicodeError) as exc:
                            logger.error(f"无法从文件中删除记录: {exc}")
                        attempt_success = True
                    else:
                        # A False result is not retried; it has already been recorded.
                        logger.warning("查询超时已记录")
                    break

                if not attempt_success and attempt == max_retries - 1:
                    # Failed after the final retry.
                    logger1.info(f"finally{data}注册失败")
                logger.info("完成一条记录,进入下一条记录")

            self._set_status("状态: 已完成", "blue")
            logger.info("脚本运行完成")
        except Exception as e:
            self._set_status(f"状态: 错误 - {str(e)}", "red")
            logger.error(f"运行脚本时出错: {str(e)}")
        finally:
            self.running = False
            self.current_web = None


def _remove_record(file_path, email):
    """Rewrite ``file_path`` without the lines whose first tab field is ``email``.

    The first field is compared exactly (after the same strip/split that
    ``read_txt_file`` applies) so that a record is not dropped merely because
    another record's e-mail is a substring of its line.
    """
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        lines = f.readlines()
    kept = [line for line in lines if line.strip().split('\t')[0] != email]
    with open(file_path, 'w', encoding='utf-8') as f:
        f.writelines(kept)


def _abandon_browser(web, window_id):
    """Best-effort shutdown of the driver and BitBrowser window after a failure."""
    for action in (web.quit, lambda: closeBrowser(window_id)):
        try:
            action()
        except Exception as exc:
            # The driver may already be gone (e.g. the user pressed stop).
            logger.warning(f"清理浏览器失败: {exc}")


def _wait_for_captcha(web):
    """Block until the ``g-recaptcha-response`` field holds a value.

    Raises:
        Exception: when the reCAPTCHA iframe does not appear within 10 seconds.
    """
    try:
        WebDriverWait(web, 10).until(
            EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
        )
    except TimeoutException as exc:
        logger.error("未找到reCAPTCHA iframe")
        raise Exception("未找到reCAPTCHA iframe") from exc

    max_attempts = 10  # timeouts tolerated before the page is refreshed
    attempts = 0
    while True:
        try:
            WebDriverWait(web, 10).until(
                lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
            )
            logger.info("reCAPTCHA验证成功")
            break
        except TimeoutException:
            attempts += 1
            logger.warning("验证未完成或失败")
            if attempts >= max_attempts:
                logger.info("达到最大尝试次数,刷新页面...")
                web.refresh()
                attempts = 0  # reset the counter
        finally:
            # Runs on every pass (including the final break), so the driver is
            # back on the top-level document when this function returns.
            web.switch_to.default_content()


def _fill_and_submit(web, data, amount):
    """Type the record into the claim form and click the submit button."""
    web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
    web.find_element(By.ID, "street1").send_keys(data[2])
    web.find_element(By.ID, "city").send_keys(data[3])
    time.sleep(1)
    Select(web.find_element(
        By.XPATH,
        "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]"
    )).select_by_index(abbreviation_to_index[data[4]])
    web.find_element(By.ID, "zip").send_keys(data[5])
    time.sleep(1)
    web.find_element(By.ID, "email").send_keys(data[0])
    web.find_element(By.XPATH, "//input[@type='checkbox']").click()
    web.find_element(By.XPATH, f"//input[@value='{amount}']").click()
    web.find_element(By.ID, "signature").send_keys(data[1])
    submit_button = WebDriverWait(web, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
    submit_button.click()


def start_Ads_data(gui, data, window_id, sleep_times, chrome_options, amount, API_KEY):
    """Submit one record through the claim form and report whether it succeeded.

    Args:
        gui: object whose ``current_web`` receives the driver (see ``start_ADS``).
        data: six-field record; ``data[0]`` goes into the ``email`` field,
            ``data[1]`` into the first text input and the signature,
            ``data[2]``/``data[3]``/``data[5]`` into street, city and zip, and
            ``data[4]`` is looked up in ``abbreviation_to_index`` for the
            drop-down (presumably a state abbreviation - confirm in ``utils``).
        window_id: BitBrowser window ID passed to ``openBrowser``/``closeBrowser``.
        sleep_times: dict with ``sleep1``, ``sleep2`` and ``sleep3`` in seconds.
        chrome_options: dict with ``no_sandbox``, ``disable_dev_shm`` and
            ``start_maximized`` booleans.
        amount: value of the amount radio input to select.
        API_KEY: key appended to the NopeCHA setup URL.

    Returns:
        The result of ``check_submission_success``.

    Raises:
        Exception: any failure while filling the form.  The driver and the
            BitBrowser window are closed (best effort) before re-raising so a
            retry starts from a fresh window.
    """
    web = start_ADS(window_id, chrome_options, gui, sleep_times, API_KEY)
    try:
        # Scroll to the bottom of the page.
        web.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(sleep_times["sleep1"])
        logger.info("等待验证码识别")
        _wait_for_captcha(web)

        _fill_and_submit(web, data, amount)
        logger.info("填写表单完成,提交并开始验证")

        # Check whether the submission went through.
        result = check_submission_success(data, web)
        time.sleep(sleep_times["sleep2"])

        web.delete_all_cookies()
        web.execute_script("window.localStorage.clear();")
        web.execute_script("window.sessionStorage.clear();")
        web.quit()
    except Exception:
        _abandon_browser(web, window_id)
        raise

    time.sleep(sleep_times["sleep3"])
    closeBrowser(window_id)

    # Extra delay so the window is fully closed before the next record.
    logger.info("等待窗口完全关闭...")
    time.sleep(sleep_times["sleep3"])
    logger.info("完成一次填表循环")
    return result


def start_ADS(window_id, chrome_options, gui, sleep_times, API_KEY):
    """Open the BitBrowser window, attach Selenium and load the claim form.

    Retries until the page loads.  Between attempts the window is closed and
    ``sleep_times["sleep3"]`` seconds are waited.

    Returns:
        The attached ``webdriver.Chrome`` instance (also stored in
        ``gui.current_web``).

    Raises:
        RuntimeError: when ``gui.running`` is false, so that a stop request
            ends the retry loop instead of reopening the browser forever.
    """
    while True:
        if not getattr(gui, "running", True):
            raise RuntimeError("用户停止了脚本")
        try:
            res = openBrowser(window_id)
            # Debug: log the raw openBrowser response.
            logger.info(f"openBrowser Response: {res}")

            if 'data' not in res:
                logger.error("BitBrowser API 返回中没有 'data' 键")
                raise Exception("BitBrowser API 返回中没有 'data' 键")
            driverPath = res['data']['driver']
            debuggerAddress = res['data']['http']

            if res["success"] is True:
                logger.info("启动成功,返回信息")
            else:
                logger.error(f"{window_id}启动失败")
                raise Exception("启动失败")

            chrome_opts = webdriver.ChromeOptions()
            chrome_opts.add_experimental_option("debuggerAddress", debuggerAddress)
            if chrome_options["no_sandbox"]:
                chrome_opts.add_argument('--no-sandbox')
            if chrome_options["disable_dev_shm"]:
                chrome_opts.add_argument('--disable-dev-shm-usage')
            if chrome_options["start_maximized"]:
                chrome_opts.add_argument('--start-maximized')

            chrome_service = Service(driverPath)
            web = webdriver.Chrome(service=chrome_service, options=chrome_opts)
            web.implicitly_wait(60)

            # Expose the driver to the GUI so "stop" can quit it.
            gui.current_web = web

            logger.info("加载插件")
            web.get(f"https://nopecha.com/setup#{API_KEY}")
            web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
            # The handle itself is unused; the property access is kept because
            # it performs a driver round trip, as in the original code.
            original_window = web.current_window_handle
            WebDriverWait(web, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'body')))
            logger.info(f"当前页面标题: {web.title}")
            break
        except Exception as e:
            logger.info(f"加载页面失败{str(e)},将重启窗口{window_id}")
            # Close the window before trying again.
            closeBrowser(window_id)
            time.sleep(sleep_times["sleep3"])
    return web


def check_submission_success(data, web):
    """Return True when the confirmation page with a reference number appears.

    The outcome is written to the result log either way; failures are
    reported through the return value and never raised.
    """
    try:
        # Wait for the success heading.
        WebDriverWait(web, 30).until(
            EC.visibility_of_element_located((By.XPATH, "//h2[contains(., 'THANK YOU FOR SUBMITTING')]"))
        )
        # Read the reference number.
        ref_number = web.find_element(By.XPATH, "//p[contains(., 'Your Reference Number is')]/b").text
        logger1.info(", ".join(data) + "提交成功参考号为:" + ref_number)
        return True
    except Exception as e:
        logger.warning(str(e))
        logger.warning(", ".join(data) + "验证失败")
        logger1.info(", ".join(data) + "提交后查询参考号失败")
        # Deliberately not re-raised.
        return False


def read_txt_file(file_path):
    """Read tab-separated records with exactly six fields from ``file_path``.

    Blank lines are skipped and lines with a different field count are
    logged (with their real line number) and skipped.  A leading UTF-8 BOM
    is ignored.

    Returns:
        A list of six-element string lists, or ``None`` when the file cannot
        be read.
    """
    data = []
    try:
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            for line_no, line in enumerate(file, start=1):
                cleaned_line = line.strip()
                if not cleaned_line:
                    continue
                fields = cleaned_line.split('\t')
                if len(fields) != 6:
                    logger.warning(f"第 {line_no} 行字段数量异常: {len(fields)}")
                    continue
                data.append(fields)
    except FileNotFoundError:
        logger.error(f"文件 {file_path} 未找到")
        return None
    except Exception as e:
        logger.error(f"读取文件时发生错误: {str(e)}")
        return None
    return data


if __name__ == '__main__':
    root = tk.Tk()
    app = AdsDataGUI(root)
    root.mainloop()

# Command to package as an exe (bundles all Python packages):
# Pyinstaller -F -w -i pp.ico allcode_by.py
# 5c2216ed3b3e4bf0bc144b5ad0c87a5b
# fd2014bb64b142a5a553d275567f573c