# main.py
import sys
import time
from multiprocessing import Process  # switched from threading to multiprocessing

import requests
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait
# PROFILE_ID is the browser environment (profile) ID.
  12. def start_Ads(PROFILE_ID):
  13. BASE_URL = "http://127.0.0.1:54345"
  14. API_KEY = "dc954750b89edd7e421639832bff3151" # 替换为你的 API Key
  15. # 启动 Adspower
  16. start_profile_url = f"{BASE_URL}/api/v1/browser/start?user_id={PROFILE_ID}&api_key={API_KEY}"
  17. response = requests.get(start_profile_url).json()
  18. # 检查是否启动成功
  19. if response["code"] == 0:
  20. print(f"Adspower 启动成功,返回信息")
  21. chrome_driver_path = response["data"]["webdriver"]
  22. debugger_address = response["data"]["ws"]["selenium"]
  23. print(chrome_driver_path)
  24. else:
  25. print(f"启动失败: {response['msg']}")
  26. exit(1)
  27. # 打开 Selenium -> Adspower
  28. options = webdriver.ChromeOptions()
  29. options.add_experimental_option("debuggerAddress", debugger_address)
  30. web = webdriver.Chrome(service=Service("/Users/zangtuo/Library/Application Support/adspower_global/cwd_global/chrome_131/chromedriver.app/Contents/MacOS/chromedriver"),options=options)
  31. web.implicitly_wait(30)
  32. # 访问页面
  33. web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
  34. print("当前页面标题:", web.title)
  35. time.sleep(10)
  36. print("等待验证码识别")
  37. # while True:
  38. # try:
  39. # WebDriverWait(web, 5).until(
  40. # EC.frame_to_be_available_and_switch_to_it(
  41. # (By.XPATH, "//iframe[contains(@title, 'recaptcha challenge')]"))
  42. # )
  43. # # 切换回主界面
  44. # # web.switch_to.default_content()
  45. # print("等待 10 秒后再次尝试")
  46. # time.sleep(10)
  47. # except TimeoutException:
  48. # print("验证码识别成功")
  49. # break # 超时后退出循环,继续执行后续代码
  50. try:
  51. iframe = WebDriverWait(web, 10).until(
  52. EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
  53. )
  54. except TimeoutException:
  55. print("未找到reCAPTCHA iframe")
  56. exit()
  57. while True:
  58. try:
  59. # 等待直到响应值非空
  60. WebDriverWait(web, 10).until(
  61. lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
  62. )
  63. print("reCAPTCHA验证成功")
  64. break
  65. except TimeoutException:
  66. print("验证未完成或失败")
  67. finally:
  68. web.switch_to.default_content() # 切回主页面
  69. web.switch_to.default_content()
  70. # 姓名
  71. web.find_element(By.XPATH, "//input[@type='text']").send_keys("测试数据") # Claimants Name: *
  72. # 删除
  73. web.find_element(By.XPATH, "//div[3]/div/div/input").send_keys("测试数据") # Claimants Name: *
  74. # street
  75. web.find_element(By.ID,"street1").send_keys("测试数据")
  76. # city
  77. web.find_element(By.ID, "city").send_keys("测试数据")
  78. # 选择州
  79. Select(web.find_element(By.XPATH, "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(37)
  80. # 选择code
  81. web.find_element(By.ID, "zip").send_keys("测试数据")
  82. #
  83. web.find_element(By.XPATH, "//div[6]/div/div/input").send_keys("测试数据")
  84. # 邮箱
  85. web.find_element(By.ID, "email").send_keys("8888888888@gmail.com")
  86. # 确认
  87. web.find_element(By.XPATH, "//input[@type='checkbox']").click()
  88. # 金额
  89. web.find_element(By.XPATH, "//input[@value='$501-$1,000']").click()
  90. # 签名
  91. web.find_element(By.ID, "signature").send_keys("测试数据")
  92. # 切换frame,点击验证按钮
  93. # WebDriverWait(web, 10).until(EC.frame_to_be_available_and_switch_to_it((By.XPATH, "//iframe[contains(@title, 'reCAPTCHA')]")))
  94. # recaptcha_anchor = WebDriverWait(web, 10).until(
  95. # EC.element_to_be_clickable((By.XPATH, "//span[@id='recaptcha-anchor']/div"))
  96. # )
  97. # recaptcha_anchor.click()
  98. # # 验证码处理逻辑,切换页面
  99. # EC.frame_to_be_available_and_switch_to_it((By.XPATH, "//iframe[contains(@title, 'reCAPTCHA')]"))
  100. # # 测试
  101. # EC.frame_to_be_available_and_switch_to_it((By.XPATH, "/html/body/div/div[4]/iframe"))
  102. # # 验证码指令
  103. # dataw = web.find_element(By.XPATH,"//*[@id='rc-imageselect']/div[2]/div[1]/div[1]/div/text()[1]")
  104. # # 加载图片
  105. # img = web.find_element(By.XPATH, "//div[@id='rc-imageselect-target']/table/tbody/tr/td[2]/div/div/img")
  106. # 点击提交按钮
  107. submit_button = WebDriverWait(web, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
  108. submit_button.click()
  109. time.sleep(3)
  110. web.quit()
  111. print("完成一次")
  112. def start_Ads_data(PROFILE_ID, data:list):
  113. BASE_URL = "http://127.0.0.1:54345"
  114. API_KEY = "dc954750b89edd7e421639832bff3151" # 替换为你的 API Key
  115. # 启动 Adspower
  116. start_profile_url = f"{BASE_URL}/api/v1/browser/start?user_id={PROFILE_ID}&api_key={API_KEY}"
  117. response = requests.get(start_profile_url).json()
  118. # 检查是否启动成功
  119. if response["code"] == 0:
  120. print(f"Adspower 启动成功,返回信息")
  121. chrome_driver_path = response["data"]["webdriver"]
  122. debugger_address = response["data"]["ws"]["selenium"]
  123. print(chrome_driver_path)
  124. else:
  125. print(f"启动失败: {response['msg']}")
  126. exit(1)
  127. # 打开 Selenium -> Adspower
  128. options = webdriver.ChromeOptions()
  129. options.add_experimental_option("debuggerAddress", debugger_address)
  130. web = webdriver.Chrome(service=Service("/Users/zangtuo/Library/Application Support/adspower_global/cwd_global/chrome_131/chromedriver.app/Contents/MacOS/chromedriver"),options=options)
  131. web.implicitly_wait(30)
  132. # 访问页面
  133. web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
  134. print("当前页面标题:", web.title)
  135. time.sleep(10)
  136. print("等待验证码识别")
  137. try:
  138. iframe = WebDriverWait(web, 10).until(
  139. EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
  140. )
  141. except TimeoutException:
  142. print("未找到reCAPTCHA iframe")
  143. exit()
  144. while True:
  145. try:
  146. # 等待直到响应值非空
  147. WebDriverWait(web, 10).until(
  148. lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
  149. )
  150. print("reCAPTCHA验证成功")
  151. break
  152. except TimeoutException:
  153. print("验证未完成或失败")
  154. finally:
  155. web.switch_to.default_content() # 切回主页面
  156. web.switch_to.default_content()
  157. # 姓名
  158. web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
  159. # 删除
  160. # web.find_element(By.XPATH, "//div[3]/div/div/input").send_keys("测试数据")
  161. # street
  162. web.find_element(By.ID,"street1").send_keys(data[2])
  163. # city
  164. web.find_element(By.ID, "city").send_keys(data[3])
  165. # 选择州
  166. # TODO: 缺个逻辑根据州名判断点击哪个按钮
  167. Select(web.find_element(By.XPATH, "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(37)
  168. # 选择code
  169. web.find_element(By.ID, "zip").send_keys(data[5])
  170. # web.find_element(By.XPATH, "//div[6]/div/div/input").send_keys("测试数据")
  171. # 邮箱
  172. web.find_element(By.ID, "email").send_keys(data[0])
  173. # 确认
  174. web.find_element(By.XPATH, "//input[@type='checkbox']").click()
  175. # 金额
  176. web.find_element(By.XPATH, "//input[@value='$501-$1,000']").click()
  177. # 签名
  178. web.find_element(By.ID, "signature").send_keys(data[1])
  179. submit_button = WebDriverWait(web, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
  180. submit_button.click()
  181. time.sleep(3)
  182. web.quit()
  183. time.sleep(3)
  184. print("完成一次")
  185. def read_data(path):
  186. records = []
  187. with open(path, 'r', encoding='utf-8') as f:
  188. lines = [line.strip() for line in f if line.strip()]
  189. for i in range(0, len(lines), 2):
  190. if i + 1 >= len(lines):
  191. break
  192. email_line = lines[i]
  193. data_line = lines[i + 1]
  194. # 处理电子邮件
  195. email = email_line.replace('\t', '').strip()
  196. # 分割数据行字段
  197. fields = data_line.split('\t')
  198. fields = [f.strip() for f in fields if f.strip()]
  199. # 确保字段足够
  200. if len(fields) < 5:
  201. print(f"跳过字段不足的行: {data_line}")
  202. continue
  203. # 提取姓名等字段
  204. name_parts = fields[:len(fields) - 5]
  205. name = ' '.join(name_parts)
  206. try:
  207. address = fields[-5]
  208. city = fields[-4]
  209. state = fields[-3]
  210. zip_code = fields[-2]
  211. phone = fields[-1]
  212. except IndexError as e:
  213. print(f"字段提取错误: {e}, 行内容: {data_line}")
  214. continue
  215. records.append({
  216. 'email': email,
  217. 'name': name,
  218. 'address': address,
  219. 'city': city,
  220. 'state': state,
  221. 'zip': zip_code,
  222. 'phone': phone
  223. })
  224. return records
  225. def read_data(path):
  226. records = []
  227. with open(path, 'r', encoding='utf-8') as f:
  228. lines = [line.strip() for line in f if line.strip()]
  229. for i in range(0, len(lines), 2):
  230. if i + 1 >= len(lines):
  231. break
  232. email_line = lines[i]
  233. data_line = lines[i + 1]
  234. # 处理电子邮件
  235. email = email_line.replace('\t', '').strip()
  236. # 分割数据行字段
  237. fields = data_line.split('\t')
  238. fields = [f.strip() for f in fields if f.strip()]
  239. # 确保字段足够
  240. if len(fields) < 5:
  241. print(f"跳过字段不足的行: {data_line}")
  242. continue
  243. # 提取姓名等字段
  244. name_parts = fields[:len(fields) - 5]
  245. name = ' '.join(name_parts)
  246. try:
  247. address = fields[-5]
  248. city = fields[-4]
  249. state = fields[-3]
  250. zip_code = fields[-2]
  251. phone = fields[-1]
  252. except IndexError as e:
  253. print(f"字段提取错误: {e}, 行内容: {data_line}")
  254. continue
  255. records.append({
  256. 'email': email,
  257. 'name': name,
  258. 'address': address,
  259. 'city': city,
  260. 'state': state,
  261. 'zip': zip_code,
  262. 'phone': phone
  263. })
  264. return records
  265. def read_txt_file(file_path):
  266. """
  267. 读取并解析制表符分隔的文本文件
  268. :param file_path: 文件路径(例如:data.txt)
  269. :return: 包含所有记录的列表,每个记录是字段列表
  270. """
  271. data = []
  272. try:
  273. with open(file_path, 'r', encoding='utf-8') as file:
  274. for line in file:
  275. # 移除首尾空白字符并按制表符分割
  276. cleaned_line = line.strip()
  277. if not cleaned_line: # 跳过空行
  278. continue
  279. fields = cleaned_line.split('\t')
  280. # 验证字段数量(示例数据每行6个字段)
  281. if len(fields) != 6:
  282. print(f"警告:第 {len(data) + 1} 行字段数量异常: {len(fields)}")
  283. continue
  284. data.append(fields)
  285. except FileNotFoundError:
  286. print(f"错误:文件 {file_path} 未找到")
  287. return None
  288. except Exception as e:
  289. print(f"读取文件时发生错误: {str(e)}")
  290. return None
  291. return data
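# Earlier manual entry points, kept for reference only.
# NOTE(review): read_data() returns dicts, while start_Ads_data() indexes its
# record by position, so the loop below would not work as written.
# start_Ads('kvavcrf')
# path = '模拟数据.txt'
#
# records = read_data(path)
# for record in records:
#     start_Ads_data('kvavcrf', record)
# print("全部完成")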
  299. def cont():
  300. datas = read_txt_file("generated_data.txt")
  301. for data in datas:
  302. start_Ads_data('kvavcrf', data)
  303. print("进入下次循环")
  304. return 0
  305. if __name__ == '__main__':
  306. cont()