|
- import time
- import tkinter as tk
- from tkinter import ttk, filedialog, messagebox
- import json
- import os
- import threading
- import logging
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.chrome.service import Service
- from selenium.webdriver.support.ui import Select
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from selenium.common.exceptions import TimeoutException
- from utils import abbreviation_to_index
- from bit_api import *
- from multiprocessing import Process
- import datetime
# Timestamp captured once at import time, formatted as YYYYmmdd_HHMMSS.
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# Console logging for the process. ``logger`` carries the progress messages;
# AdsDataGUI attaches an extra handler to it so they also show in the window.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
result_logger = logging.getLogger("normal")

# ``logger1`` persists per-record outcomes to result_log.log.  The file is
# opened as UTF-8 explicitly: the messages contain non-ASCII text and the
# platform default encoding is not guaranteed to be able to encode it.
logger1 = logging.getLogger('result_log')
logger1.setLevel(logging.INFO)
handler1 = logging.FileHandler("result_log" + ".log", encoding="utf-8")
formatter1 = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler1.setFormatter(formatter1)
logger1.addHandler(handler1)
class TextHandler(logging.Handler):
    """Logging handler that appends formatted records to a Tk text widget.

    Args:
        text_widget: Widget exposing ``insert`` and ``see`` (a ``tk.Text``).
        adjust_height_callback: Zero-argument callable invoked after each
            appended record so the owner can resize the widget.
    """

    def __init__(self, text_widget, adjust_height_callback):
        super().__init__()
        self.text_widget = text_widget
        self.adjust_height_callback = adjust_height_callback

    def emit(self, record):
        """Append *record* to the widget and scroll to the end.

        Any failure (for example the widget has already been destroyed) is
        handed to ``handleError`` so that a logging call never raises into
        the code that issued it.
        """
        # NOTE(review): emit runs on whichever thread logged the record, so
        # the widget may be touched from a non-GUI thread - confirm this is
        # acceptable for the Tk build in use.
        try:
            msg = self.format(record)
            self.text_widget.insert(tk.END, msg + '\n')
            self.text_widget.see(tk.END)
            self.adjust_height_callback()
        except Exception:
            self.handleError(record)
class AdsDataGUI:
    """Tkinter front end for the BitBrowser/Selenium form-filling run.

    The window edits the run parameters, persists them to ``config.json``,
    shows log output and starts/stops a worker thread that submits one form
    per record of the selected data file.
    """

    def __init__(self, root):
        """Build the window, restore saved settings and hook up logging.

        Args:
            root: The Tk window that hosts the user interface.
        """
        self.root = root
        self.root.title("自动化注册 - BitBrowser")

        # ``running`` is the cooperative stop flag checked by the worker;
        # ``current_web`` holds the live Selenium driver (assigned by
        # start_ADS) so that stop_script can shut it down.
        self.running = False
        self.current_web = None

        self.config_file = "config.json"
        self.load_config()

        # NOTE(review): the WINDOW_ID / API_KEY fallbacks look like real
        # identifiers committed to source - confirm they are placeholders.
        self.params = {
            "WINDOW_ID": tk.StringVar(value=self.config.get("WINDOW_ID", "5c2216ed3b3e4bf0bc144b5ad0c87a5b")),
            "BASE_URL": tk.StringVar(value=self.config.get("BASE_URL", "http://127.0.0.1:54345")),
            "API_KEY": tk.StringVar(value=self.config.get("API_KEY", "704daf420f7244d08be51f61c987a232")),
            "FILE_PATH": tk.StringVar(value=self.config.get("FILE_PATH", "generated_data.txt")),
            "AMOUNT": tk.StringVar(value=self.config.get("AMOUNT", "$501-$1,000")),
            "SLEEP_TIME_1": tk.DoubleVar(value=self.config.get("SLEEP_TIME_1", 10.0)),
            "SLEEP_TIME_2": tk.DoubleVar(value=self.config.get("SLEEP_TIME_2", 3.0)),
            "SLEEP_TIME_3": tk.DoubleVar(value=self.config.get("SLEEP_TIME_3", 10.0)),
            "NO_SANDBOX": tk.BooleanVar(value=self.config.get("NO_SANDBOX", True)),
            "DISABLE_DEV_SHM": tk.BooleanVar(value=self.config.get("DISABLE_DEV_SHM", True)),
            "START_MAXIMIZED": tk.BooleanVar(value=self.config.get("START_MAXIMIZED", True)),
        }

        # Scrollable container: a canvas hosting one frame, with the scroll
        # region re-synchronised whenever that frame changes size.
        self.main_frame = ttk.Frame(self.root)
        self.main_frame.pack(fill=tk.BOTH, expand=True)

        self.canvas = tk.Canvas(self.main_frame)
        self.scrollbar = ttk.Scrollbar(self.main_frame, orient=tk.VERTICAL, command=self.canvas.yview)
        self.scrollable_frame = ttk.Frame(self.canvas)
        self.scrollable_frame.bind(
            "<Configure>",
            lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
        )
        self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)
        self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        row = 0
        tk.Label(self.scrollable_frame, text="自动化注册 - BitBrowser", font=("Arial", 16)).grid(
            row=row, column=0, columnspan=3, pady=10)
        row += 1

        # One labelled input per parameter, one grid row each.
        labels = [
            ("窗口 ID:", self.params["WINDOW_ID"]),
            ("API 地址:", self.params["BASE_URL"]),
            ("API 密钥:", self.params["API_KEY"]),
            ("文件路径:", self.params["FILE_PATH"]),
            ("金额范围:", self.params["AMOUNT"]),
            ("等待时间 1 (验证码, 秒):", self.params["SLEEP_TIME_1"]),
            ("等待时间 2 (提交后, 秒):", self.params["SLEEP_TIME_2"]),
            ("等待时间 3 (关闭后, 秒):", self.params["SLEEP_TIME_3"]),
        ]
        for label_text, var in labels:
            tk.Label(self.scrollable_frame, text=label_text).grid(row=row, column=0, sticky="e", padx=5, pady=5)
            if label_text == "金额范围:":
                # The amount is restricted to the fixed set of choices.
                amount_options = ["$1-$500", "$501-$1,000", "$1,001-$5,000", "超过 $5,000"]
                ttk.Combobox(self.scrollable_frame, textvariable=var, values=amount_options, state="readonly").grid(
                    row=row, column=1, padx=5, pady=5, sticky="ew")
            else:
                tk.Entry(self.scrollable_frame, textvariable=var).grid(row=row, column=1, padx=5, pady=5, sticky="ew")
                if label_text == "文件路径:":
                    tk.Button(self.scrollable_frame, text="浏览", command=self.browse_file).grid(
                        row=row, column=2, padx=5, pady=5)
            row += 1

        # Boolean options, one check button per row.
        for option_text, option_key in (
            ("无沙盒模式", "NO_SANDBOX"),
            ("禁用 Dev SHM", "DISABLE_DEV_SHM"),
            ("启动时最大化", "START_MAXIMIZED"),
        ):
            tk.Checkbutton(self.scrollable_frame, text=option_text, variable=self.params[option_key]).grid(
                row=row, column=0, columnspan=3, pady=5)
            row += 1

        self.status_label = tk.Label(self.scrollable_frame, text="状态: 就绪", fg="blue")
        self.status_label.grid(row=row, column=0, columnspan=3, pady=10)
        row += 1
        tk.Label(self.scrollable_frame, text="日志输出:").grid(row=row, column=0, columnspan=3, pady=5)
        row += 1

        # Log pane; it starts collapsed (height 0) and grows with its content
        # through adjust_log_height.
        self.log_frame = ttk.Frame(self.scrollable_frame)
        self.log_frame.grid(row=row, column=0, columnspan=3, padx=5, pady=5, sticky="nsew")
        self.log_text = tk.Text(self.log_frame, height=0, width=60, wrap=tk.WORD)
        self.log_scrollbar = ttk.Scrollbar(self.log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=self.log_scrollbar.set)
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        row += 1

        button_frame = ttk.Frame(self.scrollable_frame)
        button_frame.grid(row=row, column=0, columnspan=3, pady=10)
        tk.Button(button_frame, text="开始", command=self.start_script).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="停止", command=self.stop_script).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="清除日志", command=self.clear_log).pack(side=tk.LEFT, padx=5)
        tk.Button(button_frame, text="保存参数", command=self.save_config).pack(side=tk.LEFT, padx=5)

        for i in range(3):
            self.scrollable_frame.grid_columnconfigure(i, weight=1)

        # Mirror everything sent to the module logger into the log pane.
        self.log_handler = TextHandler(self.log_text, self.adjust_log_height)
        logger.addHandler(self.log_handler)

        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        self.root.update_idletasks()
        self.root.minsize(self.scrollable_frame.winfo_reqwidth(), self.scrollable_frame.winfo_reqheight())

    def adjust_log_height(self):
        """Resize the log pane to its line count, capped at 20 lines."""
        content = self.log_text.get("1.0", tk.END).strip()
        if not content:
            new_height = 0
        else:
            max_height = 20
            new_height = min(content.count('\n') + 1, max_height)
        self.log_text.configure(height=new_height)

    def load_config(self):
        """Load ``self.config`` from the config file.

        ``self.config`` is always a dict afterwards: it stays empty when the
        file is missing, unreadable, or does not contain a JSON object.
        """
        self.config = {}
        if not os.path.exists(self.config_file):
            return
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            logger.error(f"无法加载配置文件: {str(e)}")
            return
        if isinstance(loaded, dict):
            self.config = loaded
        else:
            # __init__ calls self.config.get(...), so anything but a JSON
            # object would crash start-up; ignore it instead.
            logger.error(f"无法加载配置文件: 顶层不是对象 ({type(loaded).__name__})")

    def save_config(self):
        """Write the current parameter values to the config file.

        Reading the Tk variables happens inside the ``try`` because a numeric
        field holding invalid text makes ``get()`` raise; that is reported
        like any other save failure instead of escaping to the caller.
        """
        try:
            # self.params preserves the key order used by the saved file.
            config = {key: var.get() for key, var in self.params.items()}
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4, ensure_ascii=False)
            messagebox.showinfo("提示", "参数已保存!")
        except Exception as e:
            logger.error(f"无法保存配置文件: {str(e)}")
            messagebox.showerror("错误", f"无法保存参数: {str(e)}")

    def on_closing(self):
        """Save the configuration and close the window.

        The window is destroyed even if saving fails, and the log handler is
        detached first so later log records are not written to a destroyed
        widget.
        """
        self.running = False
        try:
            self.save_config()
        finally:
            logger.removeHandler(self.log_handler)
            self.root.destroy()

    def browse_file(self):
        """Let the user pick the data file and store its path."""
        file_path = filedialog.askopenfilename(filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")])
        if file_path:
            self.params["FILE_PATH"].set(file_path)

    def clear_log(self):
        """Empty the log pane and collapse it."""
        self.log_text.delete("1.0", tk.END)
        self.adjust_log_height()

    def start_script(self):
        """Start the worker thread unless a run is already in progress."""
        if self.running:
            messagebox.showinfo("提示", "脚本已经在运行!")
            return
        self.running = True
        self.status_label.config(text="状态: 运行中", fg="green")
        threading.Thread(target=self.run_script, daemon=True).start()

    def stop_script(self):
        """Request the worker to stop and shut down the active driver."""
        self.running = False
        self.status_label.config(text="状态: 已停止", fg="red")

        if self.current_web:
            # Quitting the driver makes the worker's pending Selenium call
            # fail, which is what interrupts it promptly.
            try:
                self.current_web.quit()
                logger.info("因用户停止脚本,关闭 Selenium 驱动...")
            except Exception as e:
                logger.error(f"关闭 Selenium 驱动失败: {str(e)}")
            self.current_web = None

    @staticmethod
    def _remove_record(file_path, key):
        """Rewrite *file_path* without the lines whose first field is *key*.

        The file is handled as UTF-8, the encoding read_txt_file loads it
        with, and the first tab-separated field is compared exactly so that a
        key which is a substring of another record's text does not delete
        that record.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
        kept = [line for line in lines if line.strip().split('\t')[0] != key]
        with open(file_path, "w", encoding="utf-8") as f:
            f.writelines(kept)

    def _process_record(self, data, file_path):
        """Submit one record, retrying a failed attempt up to three times.

        Raises:
            Exception: The error of the third failed attempt, after the
                failure has been written to the result log.
        """
        max_retries = 3
        attempt_success = False
        for attempt in range(max_retries):
            if not self.running:
                logger.info("用户停止了脚本")
                break
            try:
                result = start_Ads_data(
                    self,
                    data,
                    window_id=self.params["WINDOW_ID"].get(),
                    sleep_times={
                        "sleep1": self.params["SLEEP_TIME_1"].get(),
                        "sleep2": self.params["SLEEP_TIME_2"].get(),
                        "sleep3": self.params["SLEEP_TIME_3"].get()
                    },
                    chrome_options={
                        "no_sandbox": self.params["NO_SANDBOX"].get(),
                        "disable_dev_shm": self.params["DISABLE_DEV_SHM"].get(),
                        "start_maximized": self.params["START_MAXIMIZED"].get()
                    },
                    amount=self.params["AMOUNT"].get(), API_KEY=self.params["API_KEY"].get()
                )
            except Exception as e:
                if not self.running:
                    logger.info("用户停止了脚本")
                    break
                if attempt < max_retries - 1:
                    logger.warning(f"尝试 {attempt + 1} 失败: {str(e)},等待 10 秒后重试...")
                    time.sleep(10)
                    continue
                # Record the exhausted retries before the error aborts the run.
                logger1.info(f"finally{data}注册失败")
                raise

            if result:
                logger.info("操作成功!")
                # Kept outside the retry ``try``: a failure to update the file
                # must not cause an already submitted form to be sent again.
                try:
                    self._remove_record(file_path, data[0])
                except (OSError, UnicodeError) as e:
                    logger.error(f"无法从文件中移除已处理记录: {str(e)}")
                attempt_success = True
            else:
                logger.warning("查询超时已记录")
            break

        if not attempt_success and attempt == max_retries - 1:
            logger1.info(f"finally{data}注册失败")

    def run_script(self):
        """Process every record of the data file (worker-thread entry point).

        Stops early when ``self.running`` is cleared.  Any error ends the run
        and is shown in the status label.  ``running`` and ``current_web``
        are always reset on exit.
        """
        # NOTE(review): this method runs on the thread created by
        # start_script and touches Tk widgets/variables directly - confirm
        # that is safe for the Tk build in use.
        try:
            logger.info("开始运行脚本...")
            # Read the path once so the file cleaned up after a success is
            # the same file the records were loaded from.
            file_path = self.params["FILE_PATH"].get()
            datas = read_txt_file(file_path)
            if not datas:
                self.status_label.config(text="状态: 错误 - 文件未找到", fg="red")
                logger.error("文件未找到或为空")
                return
            logger.info(f"从文件中读取了 {len(datas)} 条记录")
            for data in datas:
                if not self.running:
                    logger.info("用户停止了脚本")
                    break
                self.status_label.config(text=f"状态: 正在处理 {data[0]}", fg="green")
                logger.info(f"处理记录: {data}")
                self._process_record(data, file_path)
                logger.info("完成一条记录,进入下一条记录")
            # Only report completion when the run was not stopped; otherwise
            # the status set by stop_script would be overwritten.
            if self.running:
                self.status_label.config(text="状态: 已完成", fg="blue")
                logger.info("脚本运行完成")
        except Exception as e:
            self.status_label.config(text=f"状态: 错误 - {str(e)}", fg="red")
            logger.error(f"运行脚本时出错: {str(e)}")
        finally:
            self.running = False
            self.current_web = None
def start_Ads_data(gui, data, window_id, sleep_times, chrome_options, amount, API_KEY):
    """Open the browser window, then fill in and submit the form for one record.

    Args:
        gui: GUI object, forwarded to ``start_ADS``.
        data: One record, indexed 0-5.  As used below: [0] goes into the
            e-mail field, [1] into the first text input and the signature,
            [2] street, [3] city, [4] key into ``abbreviation_to_index`` for
            the state drop-down, [5] zip.
        window_id: BitBrowser window id used to open and close the window.
        sleep_times: Mapping with ``sleep1`` (wait before the captcha check),
            ``sleep2`` (wait after submitting) and ``sleep3`` (wait around
            closing the window), in seconds.
        chrome_options: Forwarded to ``start_ADS``.
        amount: ``value`` attribute of the input element to click.
        API_KEY: Forwarded to ``start_ADS``.

    Returns:
        The result of ``check_submission_success`` for this record.

    Raises:
        Exception: When the reCAPTCHA iframe does not become available within
            10 seconds.  Any other error from the browser steps propagates
            too.  In both cases the driver and the window are released first.
    """
    web = start_ADS(window_id, chrome_options, gui, sleep_times, API_KEY)
    try:
        web.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(sleep_times["sleep1"])
        logger.info("等待验证码识别")
        try:
            WebDriverWait(web, 10).until(
                EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
            )
        except TimeoutException as e:
            logger.error("未找到reCAPTCHA iframe")
            raise Exception("未找到reCAPTCHA iframe") from e

        # Poll until the captcha response field is filled in; after ten
        # timed-out polls the page is refreshed and the count starts over.
        max_attempts = 10
        attempts = 0
        while True:
            try:
                WebDriverWait(web, 10).until(
                    lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
                )
                logger.info("reCAPTCHA验证成功")
                break
            except TimeoutException:
                attempts += 1
                logger.warning("验证未完成或失败")
                if attempts >= max_attempts:
                    logger.info("达到最大尝试次数,刷新页面...")
                    web.refresh()
                    attempts = 0
            finally:
                # Leave the iframe context after every poll.
                web.switch_to.default_content()
        web.switch_to.default_content()

        web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
        web.find_element(By.ID, "street1").send_keys(data[2])
        web.find_element(By.ID, "city").send_keys(data[3])
        time.sleep(1)
        Select(web.find_element(
            By.XPATH,
            "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]"
        )).select_by_index(abbreviation_to_index[data[4]])
        web.find_element(By.ID, "zip").send_keys(data[5])
        time.sleep(1)
        web.find_element(By.ID, "email").send_keys(data[0])
        web.find_element(By.XPATH, "//input[@type='checkbox']").click()
        web.find_element(By.XPATH, f"//input[@value='{amount}']").click()
        web.find_element(By.ID, "signature").send_keys(data[1])
        submit_button = WebDriverWait(web, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
        submit_button.click()
        logger.info("填写表单完成,提交并开始验证")

        result = check_submission_success(data, web)
        time.sleep(sleep_times["sleep2"])
        # Wipe session state before the driver is shut down.
        web.delete_all_cookies()
        web.execute_script("window.localStorage.clear();")
        web.execute_script("window.sessionStorage.clear();")
        web.quit()
    except Exception:
        # The caller retries by calling this function again, which opens the
        # window afresh; release this attempt's driver and window first so
        # they are not left behind.  Cleanup is best-effort.
        for label, release in (("web.quit", web.quit),
                               ("closeBrowser", lambda: closeBrowser(window_id))):
            try:
                release()
            except Exception as cleanup_error:
                logger.warning(f"清理浏览器资源失败 ({label}): {str(cleanup_error)}")
        raise

    time.sleep(sleep_times["sleep3"])

    closeBrowser(window_id)

    logger.info("等待窗口完全关闭...")
    time.sleep(sleep_times["sleep3"])
    logger.info("完成一次填表循环")
    return result
def start_ADS(window_id, chrome_options, gui, sleep_times, API_KEY):
    """Open the BitBrowser window and return a Selenium driver on the form page.

    The window is opened through ``openBrowser``, a Chrome driver is attached
    to the debugger address it reports, and the setup page and the claim
    form are loaded.  A failed attempt closes the window, waits
    ``sleep_times["sleep3"]`` seconds and tries again.

    Args:
        window_id: BitBrowser window id.
        chrome_options: Mapping with the booleans ``no_sandbox``,
            ``disable_dev_shm`` and ``start_maximized``.
        gui: Object that receives the driver as ``gui.current_web``.  Its
            ``running`` attribute, when present, is the stop flag.
        sleep_times: Mapping providing ``sleep3`` (seconds between attempts).
        API_KEY: Key appended to the setup URL.

    Returns:
        The connected ``webdriver.Chrome`` instance.

    Raises:
        RuntimeError: When ``gui.running`` is false before an attempt starts,
            so that a stop request ends the otherwise unbounded retry loop.
    """
    while True:
        if not getattr(gui, "running", True):
            raise RuntimeError("用户停止了脚本")

        web = None
        try:
            res = openBrowser(window_id)

            logger.info(f"openBrowser Response: {res}")

            if 'data' not in res:
                logger.error("BitBrowser API 返回中没有 'data' 键")
                raise Exception("BitBrowser API 返回中没有 'data' 键")
            driverPath = res['data']['driver']
            debuggerAddress = res['data']['http']
            if res["success"] is True:
                logger.info("启动成功,返回信息")
            else:
                logger.error(f"{window_id}启动失败")
                raise Exception("启动失败")

            chrome_opts = webdriver.ChromeOptions()
            chrome_opts.add_experimental_option("debuggerAddress", debuggerAddress)
            if chrome_options["no_sandbox"]:
                chrome_opts.add_argument('--no-sandbox')
            if chrome_options["disable_dev_shm"]:
                chrome_opts.add_argument('--disable-dev-shm-usage')
            if chrome_options["start_maximized"]:
                chrome_opts.add_argument('--start-maximized')
            chrome_service = Service(driverPath)
            web = webdriver.Chrome(service=chrome_service, options=chrome_opts)
            web.implicitly_wait(60)

            # Published on the GUI so that its stop button can quit the driver.
            gui.current_web = web
            logger.info("加载插件")
            web.get(f"https://nopecha.com/setup#{API_KEY}")
            web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
            WebDriverWait(web, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'body')))
            logger.info(f"当前页面标题: {web.title}")
            return web
        except Exception as e:
            logger.info(f"加载页面失败{str(e)},将重启窗口{window_id}")

            # Shut down the driver created by this attempt; the next attempt
            # creates a new one.
            if web is not None:
                try:
                    web.quit()
                except Exception as quit_error:
                    logger.warning(f"关闭 Selenium 驱动失败: {str(quit_error)}")
                if getattr(gui, "current_web", None) is web:
                    gui.current_web = None

            closeBrowser(window_id)
            time.sleep(sleep_times["sleep3"])
def check_submission_success(data, web):
    """Wait for the confirmation page and record the outcome in the result log.

    Args:
        data: The submitted record, a sequence of strings.
        web: Selenium driver showing the page that follows the submission.

    Returns:
        True when the thank-you heading became visible within 30 seconds and
        the reference number could be read; False on any error.
    """
    try:
        WebDriverWait(web, 30).until(
            EC.visibility_of_element_located((By.XPATH, "//h2[contains(., 'THANK YOU FOR SUBMITTING')]"))
        )

        ref_number = web.find_element(By.XPATH, "//p[contains(., 'Your Reference Number is')]/b").text

        logger1.info(", ".join(data) + "提交成功参考号为:" + ref_number)
        return True
    except Exception as e:
        # Log the message itself; wrapping it in braces built a set and
        # logged its repr instead.
        logger.warning(str(e))
        logger.warning(", ".join(data) + "验证失败")
        logger1.info(", ".join(data) + "提交后查询参考号失败")

        return False
def read_txt_file(file_path):
    """Load tab-separated records from a UTF-8 text file.

    Blank lines are skipped.  A line that does not split into exactly six
    tab-separated fields is skipped with a warning that names its line
    number in the file.

    Args:
        file_path: Path of the file to read.

    Returns:
        A list of six-item field lists (possibly empty), or ``None`` when the
        file does not exist or cannot be read.
    """
    records = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            # Number the physical lines so the warning points at the line the
            # user has to fix, not at the count of records accepted so far.
            for line_number, line in enumerate(file, start=1):
                cleaned_line = line.strip()
                if not cleaned_line:
                    continue
                fields = cleaned_line.split('\t')
                if len(fields) != 6:
                    logger.warning(f"第 {line_number} 行字段数量异常: {len(fields)}")
                    continue
                records.append(fields)
    except FileNotFoundError:
        logger.error(f"文件 {file_path} 未找到")
        return None
    except Exception as e:
        logger.error(f"读取文件时发生错误: {str(e)}")
        return None
    return records
def main():
    """Create the Tk root window, attach the GUI and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the GUI object for as long as the loop runs.
    _app = AdsDataGUI(window)
    window.mainloop()


if __name__ == '__main__':
    main()
|