123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- import requests
- import time
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.chrome.service import Service
- from selenium.webdriver.support.ui import Select
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from selenium.common.exceptions import TimeoutException
- from multiprocessing import Process
- def start_Ads(PROFILE_ID):  # Start a browser profile through the local HTTP API, attach Selenium to it, then fill and submit the form with placeholder text.
- BASE_URL = "http://127.0.0.1:54345"
- API_KEY = "dc954750b89edd7e421639832bff3151"  # NOTE(review): credential is hard-coded in source
- # PROFILE_ID is sent as the user_id query parameter of the browser/start endpoint.
- start_profile_url = f"{BASE_URL}/api/v1/browser/start?user_id={PROFILE_ID}&api_key={API_KEY}"
- response = requests.get(start_profile_url).json()
- # code == 0 is treated as success: read the webdriver path and the ws.selenium address; otherwise print msg and exit(1).
- if response["code"] == 0:
- print(f"Adspower 启动成功,返回信息")
- chrome_driver_path = response["data"]["webdriver"]
- debugger_address = response["data"]["ws"]["selenium"]
- print(chrome_driver_path)
- else:
- print(f"启动失败: {response['msg']}")
- exit(1)  # NOTE(review): terminates the whole interpreter from inside a function
- # debugger_address is passed as Chrome's debuggerAddress option. NOTE(review): chrome_driver_path is only printed; the Service path below is hard-coded.
- options = webdriver.ChromeOptions()
- options.add_experimental_option("debuggerAddress", debugger_address)
- web = webdriver.Chrome(service=Service("/Users/zangtuo/Library/Application Support/adspower_global/cwd_global/chrome_131/chromedriver.app/Contents/MacOS/chromedriver"),options=options)
- web.implicitly_wait(30)
- # Load the form page, print its title, then pause 10 s before looking for the captcha frame.
- web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
- print("当前页面标题:", web.title)
- time.sleep(10)
- print("等待验证码识别")
-
-
-
-
-
-
-
-
-
-
- # NOTE(review): indentation is lost in this copy, so the nesting of the try/except/finally statements below cannot be confirmed.
- # NOTE(review): this flow submits to a live third-party form; confirm the run is authorized before use.
- # Wait up to 10 s for the reCAPTCHA iframe and switch into it; on timeout, report and exit.
- try:
- iframe = WebDriverWait(web, 10).until(
- EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
- )
- except TimeoutException:
- print("未找到reCAPTCHA iframe")
- exit()
- while True:
- try:
- # Wait up to 10 s per attempt for the g-recaptcha-response element to hold a non-empty value.
- WebDriverWait(web, 10).until(
- lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
- )
- print("reCAPTCHA验证成功")
- break
- except TimeoutException:
- print("验证未完成或失败")
- finally:
- web.switch_to.default_content()  # NOTE(review): presumably runs on every attempt, so a retry would search the top-level document — verify
- web.switch_to.default_content()
- # Fill the form; every text field receives the same placeholder string.
- web.find_element(By.XPATH, "//input[@type='text']").send_keys("测试数据")
-
- web.find_element(By.XPATH, "//div[3]/div/div/input").send_keys("测试数据")
-
- web.find_element(By.ID,"street1").send_keys("测试数据")
-
- web.find_element(By.ID, "city").send_keys("测试数据")
- # Chooses the option at index 37 of the <select> found by this XPath; which option that is cannot be told from here.
- Select(web.find_element(By.XPATH, "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(37)
-
- web.find_element(By.ID, "zip").send_keys("测试数据")
-
- web.find_element(By.XPATH, "//div[6]/div/div/input").send_keys("测试数据")
-
- web.find_element(By.ID, "email").send_keys("8888888888@gmail.com")
- # Click the first checkbox and the radio input whose value is '$501-$1,000'.
- web.find_element(By.XPATH, "//input[@type='checkbox']").click()
-
- web.find_element(By.XPATH, "//input[@value='$501-$1,000']").click()
-
- web.find_element(By.ID, "signature").send_keys("测试数据")
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- # Wait up to 10 s for the submit button to become clickable, click it, pause 3 s, then end the WebDriver session.
- submit_button = WebDriverWait(web, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
- submit_button.click()
- time.sleep(3)
- web.quit()
- print("完成一次")
- def start_Ads_data(PROFILE_ID, data:list):  # Same flow as a single form submission, but field values come from `data` (indices 0, 1, 2, 3 and 5 are read).
- BASE_URL = "http://127.0.0.1:54345"
- API_KEY = "dc954750b89edd7e421639832bff3151"  # NOTE(review): credential is hard-coded in source
- # PROFILE_ID is sent as the user_id query parameter of the browser/start endpoint.
- start_profile_url = f"{BASE_URL}/api/v1/browser/start?user_id={PROFILE_ID}&api_key={API_KEY}"
- response = requests.get(start_profile_url).json()
- # code == 0 is treated as success: read the webdriver path and the ws.selenium address; otherwise print msg and exit(1).
- if response["code"] == 0:
- print(f"Adspower 启动成功,返回信息")
- chrome_driver_path = response["data"]["webdriver"]
- debugger_address = response["data"]["ws"]["selenium"]
- print(chrome_driver_path)
- else:
- print(f"启动失败: {response['msg']}")
- exit(1)  # NOTE(review): terminates the whole interpreter from inside a function
- # debugger_address is passed as Chrome's debuggerAddress option. NOTE(review): chrome_driver_path is only printed; the Service path below is hard-coded.
- options = webdriver.ChromeOptions()
- options.add_experimental_option("debuggerAddress", debugger_address)
- web = webdriver.Chrome(service=Service("/Users/zangtuo/Library/Application Support/adspower_global/cwd_global/chrome_131/chromedriver.app/Contents/MacOS/chromedriver"),options=options)
- web.implicitly_wait(30)
- # Load the form page, print its title, then pause 10 s. NOTE(review): this submits to a live third-party form; confirm the run is authorized.
- web.get("https://claimform.savingsclubsettlement.com/consumerb-claimants")
- print("当前页面标题:", web.title)
- time.sleep(10)
- print("等待验证码识别")
- try:  # wait up to 10 s for the reCAPTCHA iframe and switch into it; on timeout, report and exit
- iframe = WebDriverWait(web, 10).until(
- EC.frame_to_be_available_and_switch_to_it((By.CSS_SELECTOR, "iframe[src*='google.com/recaptcha']"))
- )
- except TimeoutException:
- print("未找到reCAPTCHA iframe")
- exit()
- while True:  # NOTE(review): indentation is lost in this copy, so the nesting of the try/except/finally below cannot be confirmed
- try:
- # Wait up to 10 s per attempt for the g-recaptcha-response element to hold a non-empty value.
- WebDriverWait(web, 10).until(
- lambda d: d.find_element(By.ID, 'g-recaptcha-response').get_attribute('value') != ''
- )
- print("reCAPTCHA验证成功")
- break
- except TimeoutException:
- print("验证未完成或失败")
- finally:
- web.switch_to.default_content()  # NOTE(review): presumably runs on every attempt, so a retry would search the top-level document — verify
- web.switch_to.default_content()
- # data[1] goes into the first text input and, further down, into the signature field.
- web.find_element(By.XPATH, "//input[@type='text']").send_keys(data[1])
-
-
- # data[2] -> street1, data[3] -> city.
- web.find_element(By.ID,"street1").send_keys(data[2])
-
- web.find_element(By.ID, "city").send_keys(data[3])
-
- # NOTE(review): data[4] is never read; the select always uses the option at index 37.
- Select(web.find_element(By.XPATH, "(.//*[normalize-space(text()) and normalize-space(.)='*'])[5]/preceding::select[1]")).select_by_index(37)
- # data[5] -> zip, data[0] -> email.
- web.find_element(By.ID, "zip").send_keys(data[5])
-
-
- web.find_element(By.ID, "email").send_keys(data[0])
- # Click the first checkbox and the radio input whose value is '$501-$1,000'.
- web.find_element(By.XPATH, "//input[@type='checkbox']").click()
-
- web.find_element(By.XPATH, "//input[@value='$501-$1,000']").click()
-
- web.find_element(By.ID, "signature").send_keys(data[1])
- submit_button = WebDriverWait(web, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))  # wait up to 10 s for the submit button
- submit_button.click()
- time.sleep(3)
- web.quit()
- time.sleep(3)
- print("完成一次")
def read_data(path):
    """Parse a text file of two-line records into a list of dicts.

    Blank lines are ignored. The remaining lines are consumed in pairs: the
    first line of each pair is the e-mail (tabs removed, whitespace
    stripped) and the second is a tab-separated data line. Empty fields on
    the data line are dropped. The last five remaining fields are address,
    city, state, zip and phone; everything before them is joined with single
    spaces to form the name. A pair whose data line has fewer than five
    fields is reported on stdout and skipped. A trailing line without a
    partner is ignored.

    NOTE(review): another ``read_data`` with the same behaviour is defined
    later in this file and replaces this one at import time.

    :param path: path of a UTF-8 encoded text file.
    :return: list of dicts with the keys ``email``, ``name``, ``address``,
        ``city``, ``state``, ``zip`` and ``phone``.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        lines = [line.strip() for line in handle if line.strip()]

    records = []
    # zip() stops at the shorter slice, so an unpaired trailing line is dropped.
    for email_line, data_line in zip(lines[::2], lines[1::2]):
        fields = [part.strip() for part in data_line.split('\t') if part.strip()]
        if len(fields) < 5:
            print(f"跳过字段不足的行: {data_line}")
            continue

        # The length check above guarantees the five trailing fields exist,
        # so no IndexError handling is needed here.
        *name_parts, address, city, state, zip_code, phone = fields
        records.append({
            'email': email_line.replace('\t', '').strip(),
            'name': ' '.join(name_parts),
            'address': address,
            'city': city,
            'state': state,
            'zip': zip_code,
            'phone': phone,
        })
    return records
def read_data(path):
    """Load e-mail/data line pairs from ``path`` and return them as dicts.

    Non-blank lines are taken two at a time: an e-mail line followed by a
    tab-separated data line. Empty fields on the data line are discarded.
    The final five remaining fields become ``address``, ``city``, ``state``,
    ``zip`` and ``phone``; any fields before them are space-joined into
    ``name``. Data lines with fewer than five fields are printed and
    skipped, and an odd line left over at the end is not used.

    :param path: path of a UTF-8 encoded text file.
    :return: list of record dicts, in file order.
    """
    with open(path, 'r', encoding='utf-8') as source:
        stripped = (raw.strip() for raw in source)
        lines = [text for text in stripped if text]

    tail_keys = ('address', 'city', 'state', 'zip', 'phone')
    parsed = []
    # Integer division discards a final line that has no partner.
    for pair_no in range(len(lines) // 2):
        email_line = lines[2 * pair_no]
        data_line = lines[2 * pair_no + 1]

        pieces = (piece.strip() for piece in data_line.split('\t'))
        fields = [piece for piece in pieces if piece]
        if len(fields) < len(tail_keys):
            print(f"跳过字段不足的行: {data_line}")
            continue

        # With at least five fields present, the tail slice is always full.
        record = {
            'email': email_line.replace('\t', '').strip(),
            'name': ' '.join(fields[:-5]),
        }
        record.update(zip(tail_keys, fields[-5:]))
        parsed.append(record)
    return parsed
def read_txt_file(file_path):
    """Read and parse a tab-separated text file.

    Each line is stripped of surrounding whitespace; blank lines are
    skipped. A line is accepted only if it splits into exactly six
    tab-separated fields. Any other line is reported on stdout together with
    its 1-based line number in the file and then skipped.

    :param file_path: path of the file (for example ``data.txt``).
    :return: list of records, each a list of six field strings, or ``None``
        when the file does not exist or any other error occurs while
        reading it.
    """
    data = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            # Number the physical lines so the warning points at the real
            # position in the file, even after blank or rejected lines.
            for line_number, line in enumerate(file, start=1):
                cleaned_line = line.strip()
                if not cleaned_line:
                    continue
                fields = cleaned_line.split('\t')

                if len(fields) != 6:
                    print(f"警告:第 {line_number} 行字段数量异常: {len(fields)}")
                    continue
                data.append(fields)
    except FileNotFoundError:
        print(f"错误:文件 {file_path} 未找到")
        return None
    except Exception as e:
        # Kept broad on purpose: callers receive None for any read failure.
        print(f"读取文件时发生错误: {str(e)}")
        return None
    return data
- def cont():  # Read rows from "generated_data.txt" and call start_Ads_data with profile id 'kvavcrf' for each row.
- datas = read_txt_file("generated_data.txt")  # NOTE(review): read_txt_file returns None on read errors, which the for loop below cannot iterate
- for data in datas:
- start_Ads_data('kvavcrf', data)
- print("进入下次循环")  # NOTE(review): indentation is lost in this copy; presumably inside the loop — verify
- return 0
- if __name__ == '__main__':  # Run cont() only when this file is executed as a script.
- cont()
|