"""Submit rows of a tab-separated data file to a web claim form.

Each row is submitted through a Selenium session attached to a browser
profile that is started via a local HTTP API.
"""
import sys
import time
from multiprocessing import Process  # replaces threading with multiprocessing

import requests
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait

from utils import abbreviation_to_index
# The wildcard import stays after the explicit imports; it is the only
# visible source of `openBrowser`.
from bit_api import *

# PROFILE_ID is the environment ID.
# NOTE(review): this call runs at import time and `res` is not read anywhere
# in this file -- confirm it is still required.
# The window ID is copied from the window settings page, or taken from the
# value returned when the window is created through the API.
res = openBrowser("947c8036facb4e8a935296e57f36652b")


def _wait_for_recaptcha(web):
    """Block until the reCAPTCHA response field holds a non-empty value.

    Exits the process with status 1 when the reCAPTCHA iframe does not become
    available within 10 seconds. Otherwise polls indefinitely, in 10-second
    rounds, and always leaves the driver focused on the top-level document.
    """
    try:
        WebDriverWait(web, 10).until(
            EC.frame_to_be_available_and_switch_to_it(
                (By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']")
            )
        )
    except TimeoutException:
        print("未找到reCAPTCHA iframe")
        sys.exit(1)

    # NOTE(review): the first poll runs inside the reCAPTCHA iframe; every
    # later poll runs in the top-level document because of the `finally`
    # below. Confirm which context actually contains #g-recaptcha-response.
    while True:
        try:
            # Wait until the response value is non-empty.
            WebDriverWait(web, 10).until(
                lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
            )
            print("reCAPTCHA验证成功")
            break
        except TimeoutException:
            print("验证未完成或失败")
        finally:
            # Runs on `break` as well, so the caller always resumes in the
            # main page.
            web.switch_to.default_content()


def _fill_and_submit(web, data):
    """Type one data row into the claim form and click the submit button."""
    # Name
    web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
    # Street
    web.find_element(By.ID, "street1").send_keys(data[2])
    # City
    web.find_element(By.ID, "city").send_keys(data[3])
    time.sleep(1)
    # State: the <select> is located relative to the fifth '*' marker.
    state_select = web.find_element(
        By.XPATH,
        "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]",
    )
    Select(state_select).select_by_index(abbreviation_to_index[data[4]])
    # Zip code
    web.find_element(By.ID, "zip").send_keys(data[5])
    time.sleep(1)
    # Email
    web.find_element(By.ID, "email").send_keys(data[0])
    # Confirmation checkbox
    web.find_element(By.XPATH, "//input[@type='checkbox']").click()
    # Amount; the alternative option is "//input[@value='$1-$500']".
    web.find_element(By.XPATH, "//input[@value='$501-$1,000']").click()
    # Signature (same value as the name field)
    web.find_element(By.ID, "signature").send_keys(data[1])

    submit_button = WebDriverWait(web, 10).until(
        EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']"))
    )
    submit_button.click()


def start_Ads_data(PROFILE_ID, data: list):
    """Start a browser profile, submit one claim form, and close the session.

    Args:
        PROFILE_ID: Profile identifier sent as ``user_id`` to the local
            browser-start endpoint.
        data: One row of six fields, indexed as used below: ``[0]`` email,
            ``[1]`` name (also typed as the signature), ``[2]`` street,
            ``[3]`` city, ``[4]`` state key looked up in
            ``abbreviation_to_index``, ``[5]`` zip code.

    Raises:
        SystemExit: With status 1 when the profile fails to start or the
            reCAPTCHA iframe cannot be found.
    """
    BASE_URL = "http://127.0.0.1:54345"
    API_KEY = "704daf420f7244d08be51f61c987a232"  # replace with your API key

    # Start the browser profile through the local API.
    response = requests.get(
        f"{BASE_URL}/api/v1/browser/start",
        params={"user_id": PROFILE_ID, "api_key": API_KEY},
    ).json()

    # Check whether the profile started successfully.
    if response["code"] != 0:
        print(f"启动失败: {response['msg']}")
        sys.exit(1)

    print("Adspower 启动成功,返回信息")
    chrome_driver_path = response["data"]["webdriver"]  # driver binary path
    debugger_address = response["data"]["ws"]["selenium"]
    print(chrome_driver_path)

    # Attach Selenium to the already running browser.
    options = webdriver.ChromeOptions()
    options.add_experimental_option("debuggerAddress", debugger_address)
    web = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
    try:
        # NOTE(review): a 30 s implicit wait is combined with 10 s explicit
        # waits below; a single element lookup may therefore block longer
        # than the explicit timeout suggests.
        web.implicitly_wait(30)

        # Open the form page.
        web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
        print("当前页面标题:", web.title)
        time.sleep(10)

        print("等待验证码识别")
        _wait_for_recaptcha(web)
        _fill_and_submit(web, data)
        time.sleep(3)

        web.delete_all_cookies()
        # Clear LocalStorage and SessionStorage.
        web.execute_script("window.localStorage.clear();")
        web.execute_script("window.sessionStorage.clear();")
    finally:
        # Close the session even when a step above raises or exits, so a
        # failed row does not leave the driver session open.
        web.quit()

    time.sleep(3)
    print("完成一次")


def read_txt_file(file_path):
    """Read a UTF-8, tab-separated text file into a list of six-field rows.

    Blank lines are skipped. Lines that do not split into exactly six fields
    are reported with their line number in the file and skipped.

    Args:
        file_path: Path of the file to read.

    Returns:
        A list of rows, each a list of six strings, or ``None`` when the file
        is missing or cannot be read or decoded.
    """
    data = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_number, line in enumerate(file, start=1):
                # Strip surrounding whitespace, then split on tabs.
                cleaned_line = line.strip()
                if not cleaned_line:  # skip blank lines
                    continue
                fields = cleaned_line.split('\t')
                # Every row is expected to carry six fields.
                if len(fields) != 6:
                    print(f"警告:第 {line_number} 行字段数量异常: {len(fields)}")
                    continue
                data.append(fields)
    except FileNotFoundError:
        print(f"错误:文件 {file_path} 未找到")
        return None
    except (OSError, UnicodeDecodeError) as e:
        print(f"读取文件时发生错误: {e}")
        return None
    return data


def cont():
    """Submit every row of ``generated_data.txt``, one after another.

    Returns:
        0 after all rows were processed, 1 when the data file could not be
        read (the reason has already been printed by ``read_txt_file``).
    """
    datas = read_txt_file("generated_data.txt")
    if datas is None:
        return 1
    # TODO: process pool
    for data in datas:
        print(data)
        start_Ads_data('kvgnckt', data)
        print("进入下次循环")
    return 0


if __name__ == '__main__':
    sys.exit(cont())