Python Web Scraping
# Miscellaneous Snippets
# URL Encoding and Decoding
# URL encode
urllib.parse.quote()
# URL decode
urllib.parse.unquote()
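A quick round-trip check (the sample keyword below is arbitrary):
from urllib.parse import quote, unquote

encoded = quote('关键词 测试')   # percent-encoded UTF-8, e.g. '%E5%85%B3%E9%94%AE%E8%AF%8D%20%E6%B5%8B%E8%AF%95'
print(encoded)
print(unquote(encoded))          # -> 关键词 测试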
# Date Formatting
Parsing a non-standard date string
# pip install python-dateutil
import datetime
from dateutil import parser
time_string = "Fri Jan 05 15:41:17 GMT+08:00 2024"
dt_object = parser.parse(time_string)
print(dt_object.strftime("%Y-%m-%d %H:%M:%S"))
# Convert to UTC
print(dt_object.astimezone(tz=datetime.timezone.utc))
# Or convert to the local time zone (assuming the system is configured with the correct local time zone)
print(dt_object.astimezone())
Formatting a datetime as a string with a time zone suffix
from datetime import datetime, timezone
import pytz
# Get the current time (UTC is used here as an example; in practice use the time zone for your location)
now_utc = datetime.now(timezone.utc)
# Convert the UTC time to Beijing time (GMT+08:00)
# Note: pytz is used here to obtain the Beijing time zone object
beijing_tz = pytz.timezone('Asia/Shanghai')
now_beijing = now_utc.astimezone(beijing_tz)
# Format the time; strftime() cannot append a time zone name directly, so it is added manually
# '%a' - abbreviated weekday name
# '%b' - abbreviated month name
# '%d' - day of the month (01 to 31)
# '%Y' - year
# '%H:%M:%S' - hours:minutes:seconds
# then 'GMT+0800 (中国标准时间)' is appended by hand
formatted_time = now_beijing.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)"
print("Formatted current time:", formatted_time)
# Fixing Garbled Characters When Dumping JSON to a String
# ensure_ascii=False keeps non-ASCII characters (e.g. Chinese) readable instead of escaping them as \uXXXX
json.dumps(response["result"], ensure_ascii=False)
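For comparison, a minimal example (the sample dict is made up) showing what ensure_ascii changes:
import json

data = {"title": "招标公告"}
print(json.dumps(data))                      # {"title": "\u62db\u6807\u516c\u544a"}
print(json.dumps(data, ensure_ascii=False))  # {"title": "招标公告"}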
# Parsing Data with lxml
data="""
<datastore>
<recordset>
<record><![CDATA[
<div class="fyxx_lsbox"> <div class="btqz"></div> <div class="fyxx_lsbox_r"> <a title="xxxxxxxxxxxxxxxxxxxxxx" target="_blank" href="/art/2024/1/12/art_1229682709_457018.html">xxxxxxxxxxxxxxxxxxxxxx</a> <span>2024-01-12</span> </div></div>]]></record>
</recordset>
</datastore>
"""
# Method 1: parse directly with a standard parser (the data above is not an ideal fit, but this is the general pattern)
from xml.etree import ElementTree as ET
root = ET.fromstring(data)
for element in root.iter():
# Simply access the .text attribute; in most cases the parser has already handled the CDATA
if element.text:
print(element.text)
# Method 2: mix string manipulation with XPath
from lxml import etree
datastore = etree.HTML(data)
for row in datastore.xpath('//record'):
record = etree.tostring(row).replace(r'<record>', '').replace(r']]></record>', '')
record1 = etree.HTML(record)
print(etree.tostring(record1))
"""
# Requests
# Displaying Chinese correctly in a requests response
# Set the encoding
response.encoding='utf-8'
print(response.text)
# Passing a file object as a parameter with requests
response = requests.post(
url='http://xxx/captcha-ocr-yn/captcha',
files={'captcha': open(img_path, 'rb')},
data={'question': '请依次点击【工,确,海】'}
)
# Scrapy
# Stopping a Spider from Code
# Manually close the spider from inside the Scrapy framework
self.crawler.engine.close_spider(self, 'Captcha failed three times, closing spider')
# Running a single spider from code
from scrapy import cmdline
cmdline.execute("scrapy crawl xxx -o ./data/xxx.csv -s LOG_FILE=./logs/xxx.log".split())
# Passing Arguments on the Command Line
Passing the arguments:
cmdline.execute("scrapy crawl jianyuSearch -a searchWord=xxx -a page=10".split())
Receiving the arguments in the spider (add this to the Spider class):
def __init__(self, searchWord='', page=1,*args, **kwargs):
super(xxxSpider, self).__init__(*args, **kwargs)
self.searchWord = searchWord
self.page = page
# Passing Data Between Two Requests
# The first request carries the parameter
yield Request(url=url, callback=self.parse, meta={"para1": xxx})
# Retrieve the parameter in the parse callback
response.meta['para1']
# Send a request and extract the cookies from the response
response = requests.get(url, headers=headers)
self.cookies = requests.utils.dict_from_cookiejar(response.cookies)
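A sketch of reusing those cookies on a follow-up request; the URL and the parse_next callback below are placeholders:
# With requests: pass the dict straight back
next_response = requests.get('http://xxx/next-page', headers=headers, cookies=self.cookies)
# With Scrapy: attach the same dict to the Request
yield Request(url='http://xxx/next-page', cookies=self.cookies, callback=self.parse_next)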
# Storing Item Data to Excel in a Pipeline
import openpyxl
from xxx.items import spiderNameItems  # hypothetical import; adjust to the project's items module
class spiderNamePipeLine(object):
def open_spider(self,spider):
if spider.name in ['spiderName']:
self.field_list = [field_name for field_name, field_meta in spiderNameItems.fields.items()]
self.wb = openpyxl.Workbook()
self.ws = self.wb.active
self.ws.append(self.field_list)
def close_spider(self,spider):
if spider.name in ['spiderName']:
self.wb.save("test.xlsx")
def process_item(self, item, spider):
if spider.name in ['spiderName']:
self.ws.append([item.get(field_name) for field_name in self.field_list])
return item
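The pipeline only runs once it is enabled in the project settings; a minimal sketch, assuming the project package is called xxx as in the other examples on this page:
# settings.py
ITEM_PIPELINES = {
    'xxx.pipelines.spiderNamePipeLine': 300,   # lower number = runs earlier
}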
# Binding Handlers to Lifecycle Signals in a Middleware's from_crawler Method
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
# Bind the handler for the spider_closed signal
crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
return s
# Re-sending a Request from a Middleware (or Spider)
from scrapy.downloadermiddlewares.retry import get_retry_request
return get_retry_request(request=request, spider=spider, reason='xxx') # spider = self
# Handling Exceptions in a Middleware
# Raising an exception
raise IgnoreRequest("Filtered request: {}".format(request))
raise ValueError('403 response')
# Handling the exception
def process_exception(self, request, exception, spider):
if isinstance(exception, IgnoreRequest):
# Handle the IgnoreRequest exception here, e.g. log it or send a notification
# Returning None means the exception has been handled
return None
else:
# Resend the request
request.meta['dont_filter'] = True
request.meta['max_retry_times'] = 4  # increase the retry limit
return get_retry_request(request=request, spider=spider, reason='process_exception')
# ERROR: a request that works with Requests fails with Scrapy
- The site validates the Content-Length request header. If Content-Length is hard-coded in the headers, Scrapy adds a second Content-Length when sending the request (instead of replacing the original one, as Requests does), which breaks the request (400 Bad Request).
- Could the request length be computed with `headers["Content-Length"] = len(urllib.parse.urlencode(params).encode('utf-8'))`? See the sketch below.
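A minimal sketch of that idea, assuming `params`, `base_headers` and `url` exist: build the body first and derive the header from the exact bytes being sent. Whether Scrapy still appends its own Content-Length depends on the Scrapy version, so the safer fallback is to drop the header entirely and let the downloader fill it in.
import urllib.parse
from scrapy import Request

body = urllib.parse.urlencode(params).encode('utf-8')
headers = dict(base_headers)                # hypothetical base headers without Content-Length
headers['Content-Length'] = str(len(body))  # must match the exact body being sent
yield Request(url=url, method='POST', body=body, headers=headers, callback=self.parse)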
# Selenium
# Download location for the latest Chrome driver
Chrome for Testing
# Removing the Automation Flag in Selenium (differs from the old approach)
options = webdriver.ChromeOptions()
options.add_argument("--disable-blink-features=AutomationControlled") # 将window.navigator.webdriver设置为false
browser = webdriver.Chrome(options=options)
#=============== The old approach, which no longer seems to work =====================
options = webdriver.ChromeOptions()
# Hide the 'Chrome is being controlled by automated test software' infobar
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
browser = webdriver.Chrome(options=options)
# Prevent detection of Selenium via the navigator.webdriver variable
browser.execute_cdp_cmd(
'Page.addScriptToEvaluateOnNewDocument',
{'source': 'Object.defineProperty(navigator, "webdriver", {get: () => undefined})'}
)
# Setting Request Headers in Selenium
browser.execute_cdp_cmd("Network.enable", {})
browser.execute_cdp_cmd("Network.setExtraHTTPHeaders", {"headers": {"User-Agent": "browser1"}})
# Setting Page Timeouts in Selenium
# Set the page-load and script timeouts; an error is raised as soon as a timeout is hit. Both are set to 10 seconds below.
browser.set_page_load_timeout(10)
browser.set_script_timeout(10)
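With these timeouts set, browser.get() raises TimeoutException on slow pages; a sketch of catching it and continuing with whatever has already rendered (url is a placeholder):
from selenium.common.exceptions import TimeoutException

try:
    browser.get(url)
except TimeoutException:
    browser.execute_script("window.stop();")   # stop loading and work with the partial page
    print(len(browser.page_source))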
# Using Selenium on Linux (CentOS 7)
# 1. Works fine as a regular user
# 2. As root it kept failing with: selenium.common.exceptions.WebDriverException: Message: unknown error: DevToolsActivePort file doesn't exist
# In headless mode the options below fix it
options.add_argument('--no-sandbox') # fix:DevToolsActivePort file doesn't exist
options.add_argument('--disable-gpu') # fix:DevToolsActivePort file doesn't exist
options.add_argument('--disable-dev-shm-usage') # fix:DevToolsActivePort file doesn't exist
options.add_argument('--remote-debugging-port=9222') # fix:DevToolsActivePort file doesn't exist
# Note (this one cost a whole day):
# Some search results for the Chrome-on-Linux startup error suggest the fix below. Do NOT apply it, or Selenium will fail!
# i.e. changing `exec -a "$0" "$HERE/chrome" "$@"` in `/usr/bin/google-chrome` to `exec -a "$0" "$HERE/chrome" "$@" --user-data-dir --no-sandbox`
# Capturing Request Headers with Selenium
# pip install selenium-wire
from seleniumwire import webdriver
driver = webdriver.Chrome()
driver.get('https://www.baidu.com')
for request in driver.requests:
print('Request headers:')
print(request.headers)
# print(request.response.headers)
break
driver.quit()
# Getting Cookies from Selenium
driver.get_cookies()
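A sketch of carrying those cookies over into a requests session (domain handling kept minimal; the target URL is a placeholder):
import requests

session = requests.Session()
for c in driver.get_cookies():
    # Selenium returns a list of dicts with at least 'name' and 'value'
    session.cookies.set(c['name'], c['value'], domain=c.get('domain', ''))
resp = session.get('http://xxx/api/list')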
# Reading localStorage with Selenium
localStorage is a feature introduced in HTML5 for local (client-side) storage; it addresses the limited space available to cookies. Most browsers allow around 5 MB of localStorage, though the exact limit varies by browser.
local_storage = driver.execute_script("return window.localStorage;")
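If returning window.localStorage directly comes back as an unhelpful object in some driver versions, the entries can be copied out explicitly; a minimal sketch:
local_storage = driver.execute_script(
    "var out = {};"
    "for (var i = 0; i < window.localStorage.length; i++) {"
    "    var k = window.localStorage.key(i);"
    "    out[k] = window.localStorage.getItem(k);"
    "}"
    "return out;"
)
for k, v in local_storage.items():
    print(k, v)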
# Bot Self-Test Site
# Shows which parameters differ from a normal browser visit; useful for tuning the crawler
https://bot.sannysoft.com/
# Crawler Techniques Overview
- Session validation:
  - Simulate login to obtain the cookie, then attach that cookie to every request
- Alphanumeric captchas: recognize with OCR and attach the captcha value to every subsequent request (some sites require it encrypted)
- IP bans: buy rotating proxy IPs
- User-Agent checks: change the request UA
- Dynamically loaded data:
  - Call the underlying API directly and consume the JSON
  - Use Selenium to grab the page source after dynamic rendering
- Encrypted data (in theory the key can always be found somewhere on the site)
  - base64 encoding: no key, decode directly
  - AES/RSA hybrid: the body is encrypted with AES and the AES key is protected with RSA
- Content-Length checks: some sites validate the request's Content-Length header; make sure there is exactly one header with the correct length
- Redirect links can be obtained via window.open('xxx')
- Anti-scraping based on the Referer header
- Click-event listeners targeting Selenium's find_element_by_* methods
- Third-party libraries: pyautogui, etc.
Analysis approach
Use the browser DevTools > Elements > Event Listeners to trace which JS runs; try the mobile version of the site (www -> m).
# Encryption / Decryption Helpers
# RSA decryption
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
import base64
def rsa_decrypt(cipher_text, private_key):
decode_text = base64.b64decode(cipher_text)
cipher = Cipher_pkcs1_v1_5.new(RSA.importKey(private_key))  # create a PKCS#1 v1.5 cipher for encryption/decryption
decrypt_text = cipher.decrypt(decode_text, b"rsa")
return decrypt_text.decode("utf-8")
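A self-contained round trip (reusing the imports above) to exercise rsa_decrypt(); the throwaway key pair and message exist only for this demo:
key = RSA.generate(2048)                                   # demo key pair
private_key = key.export_key()
public_key = key.publickey().export_key()

cipher = Cipher_pkcs1_v1_5.new(RSA.importKey(public_key))
cipher_text = base64.b64encode(cipher.encrypt('hello'.encode('utf-8')))

print(rsa_decrypt(cipher_text, private_key))               # -> hello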
# AES decryption (AES-CTR mode; other modes are similar)
from Crypto.Cipher import AES
from Crypto.Util import Counter
import base64
def decrypt_aes_ctr(encrypted_data, key, iv):
ciphertext = base64.b64decode(encrypted_data)
key = str.encode(key)
# Set up the AES-CTR counter
ctr = Counter.new(128, initial_value=int.from_bytes(iv, byteorder='big'))
# Create the AES object in CTR mode
aes = AES.new(key, AES.MODE_CTR, counter=ctr)
# Decrypt the ciphertext
decrypted_data = aes.decrypt(ciphertext)
# Decode the decrypted bytes to text
decrypted_text = decrypted_data.decode('utf-8')
return decrypted_text
# AES encryption (AES-ECB mode)
from Crypto.Cipher import AES
import base64
def aes_encode(data, key):
while len(data) % 16 != 0:  # pad the string to a multiple of 16
data += (16 - len(data) % 16) * chr(16 - len(data) % 16)
data = str.encode(data)
aes = AES.new(str.encode(key), AES.MODE_ECB)  # initialize the cipher
return str(base64.encodebytes(aes.encrypt(data)), encoding='utf8').replace('\n', '')  # encrypt
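A matching decrypt sketch for the function above (the name aes_decode is my own); it reverses the base64 step and strips the chr(n)-style padding, assuming the plaintext length was not already a multiple of 16 (in that case aes_encode adds no padding and nothing should be stripped):
def aes_decode(data, key):
    aes = AES.new(str.encode(key), AES.MODE_ECB)
    decrypted = aes.decrypt(base64.decodebytes(data.encode('utf8')))
    pad = decrypted[-1]                      # padding byte count written by aes_encode
    return decrypted[:-pad].decode('utf8')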
# Appendix: Code Examples
# Spider code
from xxx.items import gpItems
from scrapy.spiders import CrawlSpider, Rule
from scrapy.http import Request
import logging
import base64
from Crypto.Cipher import AES
class XXXSpider(CrawlSpider):
name = 'XXX'
allowed_domains = ['www.xxx.com']
# key: data API URL   value: 'source type_code start_page end_page' (the page range crawled in one run)
categoryCodeDict = {
'网址1{}': '描述1 ID1 1 3',  # 17 pages; crawling pages 15-31
'网址2{}': '描述2 ID2 1 6',  # 179 pages
}
headers = {
}
def start_requests(self):
for key, value in self.categoryCodeDict.items():
start_page = int(value.split(' ')[2])
page = int(value.split(' ')[3])
for index in range(start_page, page):
url = key.format(str(index))
logging.info('Total pages: {}  Page URL: {}'.format(page - 1, url))
yield Request(url=url, headers=self.headers, method='POST', callback=self.parse, meta={'categoryCode': key, 'categoryText': value})
def parse(self, response):
categoryText1 = response.meta['categoryText'].split(' ')[0]
categoryText2 = response.meta['categoryText'].split(' ')[1]
rows = response.xpath('//ul[@class="article-list-a"]/li')
for row in rows:
i = gpItems()
i['publishTime'] = row.xpath('div[1]/div[@class="list-times"]/text()').extract_first().strip()
url = row.xpath('div[1]/a/@href').extract_first().strip()
ccc = url.split('/')[-1].split('.')[0]
ddd = self.aes_encode(ccc, '公钥编码').replace('/', '^').replace("==", "")
i['url'] = url.replace(ccc, ddd)
i['title'] = ''.join(row.xpath('div[1]/a/text()').extract()).strip()
ad_name = row.xpath('div[1]/a/label/text()').extract_first()
if ad_name:
ad_name = ad_name.strip().replace('【', '').replace('】', '')
i['ad_name'] = 'sss' if ad_name == 'bbb' else ad_name
else:
i['ad_name'] = 'xxx'
i['source'] = categoryText1
i['type_code'] = categoryText2
yield Request(i['url'], headers=self.headers, meta={'item': i}, callback=self.parse_item)
def parse_item(self, response):
i = response.meta['item']
i['htmlcontent'] = response.xpath('//div[@class="class1111"]').extract_first().strip()
return i
def aes_encode(self, data, key):
while len(data) % 16 != 0:  # pad the string to a multiple of 16
data += (16 - len(data) % 16) * chr(16 - len(data) % 16)
data = str.encode(data)
aes = AES.new(str.encode(key), AES.MODE_ECB)  # initialize the cipher
return str(base64.encodebytes(aes.encrypt(data)), encoding='utf8').replace('\n', '')  # encrypt
# Middleware: Testing with Rotating Proxy IPs
class AutotestDownloaderMiddleware(object):
delay_sec = [0.1, 0.2, 0.3, 0.4, 0.5] # , 0.6, 0.7, 0.8, 0.9
proxy_ip_dict = {}
# Spiders that should go through the proxy
spiders_list = ['XXX']
def spider_opened(self, spider):
# Initialize the proxy pool
if spider.name in self.spiders_list:
while len(self.proxy_ip_dict) < 6:
time.sleep(5)
self.get_proxy()
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_request(self, request, spider):
if spider.name in self.spiders_list:
# time.sleep(random.choice(self.delay_sec))
proxy_ip = self.determine(int(time.time()*1000))
print('request.meta[\'proxy\'] = {proxy_ip}'.format(proxy_ip=proxy_ip))
print('proxy_ip_list:{}'.format(self.proxy_ip_dict))
request.meta['proxy'] = proxy_ip
def process_response(self, request, response, spider):
return response
def process_exception(self, request, exception, spider):
pass
def get_proxy(self, proxy_pop=''):
curtime = int(time.time()*1000)
api_url = 'proxy provider API URL'  # returns IP:PORT,valid-time-in-milliseconds
response = requests.get(api_url).text.strip()
proxy = 'http://' +response.split(',')[0]
mill = int(response.split(',')[1])
print('Got a rotating proxy IP from the API: {}'.format(response))
# self.proxy_ip_list.append((proxy, curtime + mill))
# If the proxy already exists, just refresh it; otherwise drop the old one and insert the new one. An empty proxy_pop means the dict is being initialized.
if proxy_pop == '' or self.proxy_ip_dict.get(proxy) is not None:
pass
else:
print("==================删除ip======{}".format(proxy_pop))
self.proxy_ip_dict.pop(proxy_pop)
self.proxy_ip_dict[proxy] = curtime + mill - 2000 # 2s预留时间差
# return (proxy, curtime + mill)
return proxy
def determine(self, curmill):
# Pick an IP
item1 = random.choice(list(self.proxy_ip_dict.keys()))
print('Valid until: {}  Now: {}'.format(self.proxy_ip_dict[item1], curmill))
if self.proxy_ip_dict[item1] > curmill:
# Still valid, return it
# return tuple1[0]
return item1
else:
print("================== ip:{} 过期时间:{} 当前时间:{} ======".format(item1, self.proxy_ip_dict[item1], int(time.time()*1000) ))
# 若不可用,从列表移除 重新获取
return self.get_proxy(item1)
# Middleware: Sending Requests with Selenium + Rotating Proxy IPs
from selenium import webdriver
from scrapy.downloadermiddlewares.retry import get_retry_request
from scrapy.exceptions import IgnoreRequest
import pymysql
from scrapy.utils.project import get_project_settings
class SeleniumDownloaderMiddleware(object):
# Spiders that should go through the proxy
spiders_list = ['xxxx', 'xxxxxx']
proxy_ip_dict = {}  # key: proxy IP  value: 'expiry-millis use-count error-count' (the proxy in use cannot be read back from Selenium, so only one proxy IP is active at a time)
proxy = False  # whether to use a proxy at all
db = None
cursorSeq = None
def __init__(self):
self.browser = None
def spider_opened(self, spider):
if spider.name in self.spiders_list:
self.browser = self.create_chrome_driver()  # create the browser instance
self.connect()  # open the database connection
if self.proxy:
while len(self.proxy_ip_dict) == 0:  # only one IP at a time
self.get_proxy()
def spider_closed(self, spider):
if spider.name in self.spiders_list:
if self.browser:  # close the browser
self.browser.close()
if self.cursorSeq or self.db:  # close the database connection
self.cursorSeq.close()
self.db.close()
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
# Bind the handler for the spider_closed signal
crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
return s
def process_request(self, request, spider):
if spider.name in self.spiders_list:
# Skip URLs that are already stored in the database
if self.should_filter(request):
# Raise IgnoreRequest if the request should be filtered out
raise IgnoreRequest("Filtered request: {}".format(request))
if self.proxy:
self.preparatory_work(int(time.time()*1000))
self.browser.get(request.url)
# time.sleep(random.uniform(0.1, 0.5))
print("=" * 100)
print("#########{}".format(self.browser.current_url))
if len(self.browser.page_source) < 200:  # empty body, or the IP exceeded its request quota
# <html lang="zh-CN"><head><meta charset="UTF-8"><title>too many request</title></head><body>IP超过最大请求量,详情参考<a href="http://www.data5u.com/help/article-54.html">www.data5u.com</a></body></html>
# <html><head></head><body></body></html>
print("================= Empty response: {url} =============================".format(url=self.browser.current_url))
print(self.browser.page_source)
raise ValueError('Empty response')
return HtmlResponse(url=request.url, body=self.browser.page_source, encoding="utf-8",
request=request)  # return this response object to the spider (self.browser.current_url)
def process_response(self, request, response, spider):
if spider.name in self.spiders_list:
print("====================== process_response ====================================")
print(response.status)
if self.proxy:
# Use next() to grab the first key-value pair
first_key, first_value = next(iter(self.proxy_ip_dict.items()))
item_info = self.proxy_ip_dict[first_key].split(' ')
item_info[1] = str(int(item_info[1]) + 1)  # use count +1
if response.status != 200:
item_info[2] = str(int(item_info[2]) + 1)  # error count +1
self.proxy_ip_dict[first_key] = ' '.join(item_info)
# Return the original response object
return response
def process_exception(self, request, exception, spider):
if spider.name in self.spiders_list:
print("====================== process_exception ====================================")
print(request.url)
print(exception)
if isinstance(exception, IgnoreRequest):
# Handle the IgnoreRequest exception here, e.g. log it or send a notification
# Returning None means the exception has been handled
return None
if self.proxy:
# Use next() to grab the first key-value pair
first_key, first_value = next(iter(self.proxy_ip_dict.items()))
item_info = self.proxy_ip_dict[first_key].split(' ')
item_info[1] = str(int(item_info[1]) + 1)  # use count +1
item_info[2] = str(int(item_info[2]) + 1)  # error count +1
self.proxy_ip_dict[first_key] = ' '.join(item_info)
request.meta['dont_filter'] = True  # optional; the retry helper adjusts this anyway
request.meta['max_retry_times'] = 4  # increase the retry limit
return get_retry_request(request=request, spider=spider, reason='process_exception')
pass
#=========================================== Helper functions =====================================
def should_filter(self, request):
# Put the filtering logic here: decide from the request's attributes whether it should be dropped
sqlexists = "select count(1) from crawler_t_src_spider a where a.url = %s"
self.cursorSeq.execute(sqlexists, (request.url,))
num1 = self.cursorSeq.fetchone()
num = num1[0]
# Return True to filter the request out, False to let it through
if num == 0:
return False  # by default nothing is filtered in this example
else:
print("======================= 过滤请求 =={}=============================".format(request.url))
return True
def connect(self):
settings = get_project_settings()
self.db = pymysql.connect(
host=settings['DB_HOST'],
port=settings['DB_PORT'],
user=settings['DB_USER'],
password=settings['DB_PASSWORD'],
db=settings['DB_NAME'],
charset=settings['DB_CHARSET']
)
self.cursorSeq = self.db.cursor()
def create_chrome_driver(self, headless=False, proxy_ip=None):
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox') # fix:DevToolsActivePort file doesn't exist
options.add_argument('--disable-gpu') # fix:DevToolsActivePort file doesn't exist
options.add_argument('--disable-dev-shm-usage') # fix:DevToolsActivePort file doesn't exist
options.add_argument('--remote-debugging-port=9222') # fix:DevToolsActivePort file doesn't exist
# The headless User-Agent differs from a normal browser's, so override it
options.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
else:
options.add_argument("--start-maximized")
if proxy_ip: options.add_argument('--proxy-server={}'.format(proxy_ip))
options.add_argument("--disable-blink-features=AutomationControlled") # 将window.navigator.webdriver设置为false
options.add_experimental_option("prefs", {"profile.mamaged_default_content_settings.images": 2}) # 不加载图片,加快访问速度
# selenium 不显示正在被测试软件控制
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
browser = webdriver.Chrome(options=options)
# Prevent detection of Selenium via the navigator.webdriver variable
browser.execute_cdp_cmd(
'Page.addScriptToEvaluateOnNewDocument',
{'source': 'Object.defineProperty(navigator, "webdriver", {get: () => undefined})'}
)
# browser.execute_cdp_cmd("Network.enable", {})
# browser.execute_cdp_cmd("Network.setExtraHTTPHeaders", {"headers": {"User-Agent": "browser1"}})
# Set the page-load and script timeouts; an error is raised as soon as a timeout is hit (10 seconds here)
browser.set_page_load_timeout(10)
browser.set_script_timeout(10)
time.sleep(2)
print("=========== 创建浏览器对象,代理IP为{} ================".format(proxy_ip))
return browser
def get_proxy(self, proxy_pop=''):
api_url = 'proxy provider API URL'
curtime, proxy, mill = None, None, None
for i in range(3):
if i > 0: time.sleep(3)
curtime = int(time.time()*1000)
response = requests.get(api_url).text.strip()
proxy = 'http://' + response.split(',')[0]
mill = int(response.split(',')[1])
if self.proxy_ip_dict.get(proxy) is None: break  # got a new proxy, stop retrying
# An empty proxy_pop means the dict is being initialized
if proxy_pop == '':
self.proxy_ip_dict[proxy] = '{} 0 0'.format(curtime + mill)
if self.browser:
self.browser.close()
self.browser = self.create_chrome_driver(proxy_ip=proxy)
# The proxy already exists, skip
elif self.proxy_ip_dict.get(proxy) is not None:
pass
# Drop the old proxy and insert the new one
else:
print("==================删除ip======{}".format(proxy_pop))
self.proxy_ip_dict.pop(proxy_pop)
self.proxy_ip_dict[proxy] = '{} 0 0'.format(curtime + mill)
if self.browser:
self.browser.close()
self.browser = self.create_chrome_driver(proxy_ip=proxy)
return proxy
def preparatory_work(self, curmill):
proxy_ip = random.choice(list(self.proxy_ip_dict.keys()))
item_info = self.proxy_ip_dict[proxy_ip].split(' ')
print('Valid until (use count, error count): {}  Now: {}'.format(self.proxy_ip_dict[proxy_ip], curmill))
if int(item_info[0]) > curmill and int(item_info[2]) == 0: # and int(item_info[1]) < 10
return proxy_ip  # still valid, return it
else:
print("================== ip:{} 过期时间:{} 当前时间:{} ======".format(proxy_ip, item_info[0], int(time.time()*1000)))
return self.get_proxy(proxy_ip) # 若不可用,从列表移除 重新获取