allcode.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import requests
  2. import time
  3. from selenium import webdriver
  4. from selenium.webdriver.common.by import By
  5. from selenium.webdriver.chrome.service import Service
  6. from selenium.webdriver.support.ui import Select
  7. from selenium.webdriver.support import expected_conditions as EC
  8. from selenium.webdriver.support.ui import WebDriverWait
  9. from selenium.common.exceptions import TimeoutException
  10. from utils import abbreviation_to_index
  11. from bit_api import *
  12. from multiprocessing import Process # 替换 threading 为 multiprocessing
# PROFILE_ID is the environment (profile) ID.
# The window ID is copied from the window configuration screen, or is the
# value returned when the window is created through the API.
# NOTE(review): this call runs at import time, and `res` is not read anywhere
# in the visible part of this file — confirm it is still needed.
res = openBrowser("947c8036facb4e8a935296e57f36652b")
  15. def start_Ads_data(PROFILE_ID, data:list):
  16. BASE_URL = "http://127.0.0.1:54345"
  17. API_KEY = "704daf420f7244d08be51f61c987a232" # 替换为你的 API Key
  18. # 启动 Adspower
  19. start_profile_url = f"{BASE_URL}/api/v1/browser/start?user_id={PROFILE_ID}&api_key={API_KEY}"
  20. response = requests.get(start_profile_url).json()
  21. # 检查是否启动成功
  22. if response["code"] == 0:
  23. print(f"Adspower 启动成功,返回信息")
  24. chrome_driver_path = response["data"]["webdriver"] # 驱动地址
  25. debugger_address = response["data"]["ws"]["selenium"]
  26. print(chrome_driver_path)
  27. else:
  28. print(f"启动失败: {response['msg']}")
  29. exit(1)
  30. # 打开 Selenium -> Adspower
  31. options = webdriver.ChromeOptions()
  32. options.add_experimental_option("debuggerAddress", debugger_address)
  33. web = webdriver.Chrome(service=Service(chrome_driver_path),options=options)
  34. web.implicitly_wait(30)
  35. # 访问页面
  36. web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
  37. print("当前页面标题:", web.title)
  38. time.sleep(10)
  39. print("等待验证码识别")
  40. try:
  41. iframe = WebDriverWait(web, 10).until(
  42. EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
  43. )
  44. except TimeoutException:
  45. print("未找到reCAPTCHA iframe")
  46. exit()
  47. while True:
  48. try:
  49. # 等待直到响应值非空
  50. WebDriverWait(web, 10).until(
  51. lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
  52. )
  53. print("reCAPTCHA验证成功")
  54. break
  55. except TimeoutException:
  56. print("验证未完成或失败")
  57. finally:
  58. web.switch_to.default_content() # 切回主页面
  59. web.switch_to.default_content()
  60. # 姓名
  61. web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
  62. # 删除
  63. # web.find_element(By.XPATH, "//div[3]/div/div/input").send_keys("测试数据")
  64. # street
  65. web.find_element(By.ID,"street1").send_keys(data[2])
  66. # city
  67. web.find_element(By.ID, "city").send_keys(data[3])
  68. time.sleep(1)
  69. # 选择州
  70. Select(web.find_element(By.XPATH, "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(abbreviation_to_index[data[4]])
  71. # 选择code
  72. web.find_element(By.ID, "zip").send_keys(data[5])
  73. time.sleep(1)
  74. # web.find_element(By.XPATH, "//div[6]/div/div/input").send_keys("测试数据")
  75. # 邮箱
  76. web.find_element(By.ID, "email").send_keys(data[0])
  77. # 确认
  78. web.find_element(By.XPATH, "//input[@type='checkbox']").click()
  79. # 金额
  80. "//input[@value='$1-$500']"
  81. web.find_element(By.XPATH, "//input[@value='$501-$1,000']").click()
  82. # 签名
  83. web.find_element(By.ID, "signature").send_keys(data[1])
  84. submit_button = WebDriverWait(web, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
  85. submit_button.click()
  86. time.sleep(3)
  87. web.delete_all_cookies()
  88. # 清除 LocalStorage 和 SessionStorage
  89. web.execute_script("window.localStorage.clear();")
  90. web.execute_script("window.sessionStorage.clear();")
  91. web.quit()
  92. time.sleep(3)
  93. print("完成一次")
  94. def read_txt_file(file_path):
  95. data = []
  96. try:
  97. with open(file_path, 'r', encoding='utf-8') as file:
  98. for line in file:
  99. # 移除首尾空白字符并按制表符分割
  100. cleaned_line = line.strip()
  101. if not cleaned_line: # 跳过空行
  102. continue
  103. fields = cleaned_line.split('\t')
  104. # 验证字段数量(示例数据每行6个字段)
  105. if len(fields) != 6:
  106. print(f"警告:第 {len(data) + 1} 行字段数量异常: {len(fields)}")
  107. continue
  108. data.append(fields)
  109. except FileNotFoundError:
  110. print(f"错误:文件 {file_path} 未找到")
  111. return None
  112. except Exception as e:
  113. print(f"读取文件时发生错误: {str(e)}")
  114. return None
  115. return data
  116. def cont():
  117. datas = read_txt_file("generated_data.txt")
  118. # TODO:进程池
  119. for data in datas:
  120. print(data)
  121. start_Ads_data('kvgnckt', data)
  122. print("进入下次循环")
  123. return 0
# Script entry point: process the data file only when this module is executed
# directly, not when it is imported.
if __name__ == '__main__':
    cont()