唐宋元明清 唐宋元明清
首页
  • 基础

    • Java基础
  • 工具

    • hutool
    • commons
  • 框架

    • Spring Boot相关
  • 设计模式

    • 设计模式入门
  • Hadoop

    • Hadoop分布式搭建
    • Hadoop高可用搭建
    • 集群端口
    • 代码demo
  • Zookeeper

    • Zookeeper集群搭建
  • Hive

    • Hive集群搭建
    • Hive相关
    • HSQL
  • Kafka

    • Kafka集群搭建
  • HBase

    • HBase集群搭建
    • HBase基础学习
  • Spark

    • Spark环境搭建
    • Spark相关知识
  • Flink

    • Flink环境搭建
    • Flink学习
  • Flume

    • Flume安装配置
    • Flume高可用集群安装
    • Flume相关学习
  • Sqoop

    • Sqoop安装配置
    • Sqoop使用
  • 其他

    • docker
  • Oracle

    • Oracle相关知识杂记
    • 系统函数篇
    • 与MySQL语法区别
  • MySQL

    • MySQL知识点
  • Python

    • Python简单语法
    • Python操作Office
    • Python类库学习
    • Python爬虫
  • Shell

    • Shell基础
    • Shell命令行
  • Scala

    • 语法学习
  • 正则表达式

    • 正则基础
  • 调度

    • 调度工具
  • 前端

    • 前端相关
  • 杂记

    • 常用工具或网站
    • 琐碎知识
  • 摘录

    • 摘录
GitHub (opens new window)
首页
  • 基础

    • Java基础
  • 工具

    • hutool
    • commons
  • 框架

    • Spring Boot相关
  • 设计模式

    • 设计模式入门
  • Hadoop

    • Hadoop分布式搭建
    • Hadoop高可用搭建
    • 集群端口
    • 代码demo
  • Zookeeper

    • Zookeeper集群搭建
  • Hive

    • Hive集群搭建
    • Hive相关
    • HSQL
  • Kafka

    • Kafka集群搭建
  • HBase

    • HBase集群搭建
    • HBase基础学习
  • Spark

    • Spark环境搭建
    • Spark相关知识
  • Flink

    • Flink环境搭建
    • Flink学习
  • Flume

    • Flume安装配置
    • Flume高可用集群安装
    • Flume相关学习
  • Sqoop

    • Sqoop安装配置
    • Sqoop使用
  • 其他

    • docker
  • Oracle

    • Oracle相关知识杂记
    • 系统函数篇
    • 与MySQL语法区别
  • MySQL

    • MySQL知识点
  • Python

    • Python简单语法
    • Python操作Office
    • Python类库学习
    • Python爬虫
  • Shell

    • Shell基础
    • Shell命令行
  • Scala

    • 语法学习
  • 正则表达式

    • 正则基础
  • 调度

    • 调度工具
  • 前端

    • 前端相关
  • 杂记

    • 常用工具或网站
    • 琐碎知识
  • 摘录

    • 摘录
GitHub (opens new window)
  • Python

    • Python简单语法学习
    • Python操作Office
    • Python类库学习
    • 自己写的python程序(减少工作量)
    • Python爬虫
    • Shell

      • Shell基础
      • Shell命令行
    • Scala

      • 语法学习
    • 正则表达式

      • 正则基础
    • 前端

      • 前端相关
    • 资料

      • 资料
    • 杂记

      • 常用工具或网站
      • 琐碎知识
      • 快捷键
      • FAQ
    • 摘录

      • 摘录
    • 其他
    • Python
    Ai
    2024-01-11
    目录

    Python爬虫

    # 琐碎代码


    # Url 编码解码
    # url 编码
    urllib.parse.quote()
    # url 解码
    urllib.parse.unquote()
    
    # 日期格式化

    特殊字符串转日期

    # pip install python-dateutil
    import datetime
    from dateutil import parser
    
    time_string = "Fri Jan 05 15:41:17 GMT+08:00 2024"
    dt_object = parser.parse(time_string)
    print(dt_object.strftime("%Y-%m-%d %H:%M:%S"))
    # 转换为 UTC 时间
    print(dt_object.astimezone(tz=datetime.timezone.utc))
    # 或者如果你想转换到本地时区(假设你的系统配置了正确的本地时区)
    print(dt_object.astimezone())
    

    日期转带时区的字符串

    from datetime import datetime, timezone
    import pytz
    
    # 获取当前时间(假设我们使用UTC时区作为示例,实际中你可能需要根据你的位置获取正确的时区)
    now_utc = datetime.now(timezone.utc)
    
    # 将UTC时间转换为北京时间(GMT+08:00)
    # 注意:这里我们使用了pytz库来获取北京时间的时区对象
    beijing_tz = pytz.timezone('Asia/Shanghai')
    now_beijing = now_utc.astimezone(beijing_tz)
    
    # 格式化时间,但注意strftime()不支持直接添加时区名称,所以我们手动添加
    # '%A' - 星期几的全名
    # '%b' - 月份的简写名
    # '%d' - 月中的一天(01至31)
    # '%Y' - 年份
    # '%H:%M:%S' - 时:分:秒
    # 然后我们手动添加 'GMT+0800 (中国标准时间)'
    formatted_time = now_beijing.strftime("%a %b %d %Y %H:%M:%S") + " GMT+0800 (中国标准时间)"
    
    print("当前时间(格式化后):", formatted_time)
    
    # json 转字符串乱码解决
    # 确保输出的结果中不包含非ASCII字符
    json.dumps(response["result"], ensure_ascii=False)
    
    # 使用 lxml 解析数据
    data="""
    <datastore>
    <recordset>
    <record><![CDATA[
    <div class="fyxx_lsbox">	<div class="btqz"></div>	<div class="fyxx_lsbox_r">		<a title="xxxxxxxxxxxxxxxxxxxxxx" target="_blank" href="/art/2024/1/12/art_1229682709_457018.html">xxxxxxxxxxxxxxxxxxxxxx</a> <span>2024-01-12</span> </div></div>]]></record>
    </recordset>
    </datastore>
    """
    
    # 方法一:工具直接解析(上述数据不太符合,但写法应如此)
    from xml.etree import ElementTree as ET
    root = ET.fromstring(data)
    for element in root.iter():
       # 简单地访问.text属性,因为在大多数情况下解析器已经处理好CDATA了
       if element.text:
          print(element.text)
    
    # 方法二:字符串混合xpath处理
    datastore = etree.HTML(data)
    for row in datastore.xpath('//record'):
       record = etree.tostring(row).replace(r'<record>', '').replace(r']]&gt;</record>', '')
       record1 = etree.HTML(record)
       print etree.tostring(record1)
    """
    

    # Requests


    # requests 库请求结果显示中文
    # 设置编码
    response.encoding='utf-8'
    print(response.text)
    
    # requests 库携带文件对象参数
    response = requests.post(
        url='http://xxx/captcha-ocr-yn/captcha',
        files={'captcha': open(img_path, 'rb')},
        data={'question': '请依次点击【工,确,海】'}
    )
    

    # Scrapy


    # 程序中主动退出爬虫
    # scrapy框架中手动退出爬虫程序
    self.crawler.engine.close_spider(self, '验证码尝试三次不正确,退出爬虫')
    
    # 代码中执行单个爬虫
    from scrapy import cmdline
    cmdline.execute("scrapy crawl xxx -o ./data/xxx.csv -s LOG_FILE=./logs/xxx.log".split())
    
    # 命令行中传参

    传入参数

    cmdline.execute("scrapy crawl jianyuSearch -a searchWord=xxx -a page=10".split())
    

    爬虫中接收参数(Spider中增加)

    def __init__(self, searchWord='', page=1,*args, **kwargs):
      super(xxxSpider, self).__init__(*args, **kwargs)
      self.searchWord = searchWord
      self.page = page
    
    # 两个请求见传递参数
    # 第一个请求携带参数
    yield Request(url=url, callback=self.parse, meta={"para1": xxx})
    
    # parse 回调函数中获取参数
    response.meta['url']
    
    # 发送请求并获取请求头中的Cookie
    response = requests.get(url, headers=headers)
    self.cookies = requests.utils.dict_from_cookiejar(response.cookies)
    
    # PipeLine 存储 Item 数据到 Excel
    class spiderNamePipeLine(object):
        def open_spider(self,spider):
            if spider.name in ['spiderName']:
                self.field_list = [field_name for field_name, field_meta in spiderNameItems.fields.items()]
                self.wb = openpyxl.Workbook()
                self.ws = self.wb.active
                self.ws.append(self.field_list)
    
        def close_spider(self,spider):
            if spider.name in ['spiderName']:
                self.wb.save("test.xlsx")
    
        def process_item(self, item, spider):
            if spider.name in ['spiderName']:
                self.ws.append([item.get(field_name) for field_name in self.field_list])
                return item
    
    # 中间件 from_crawler 方法中各个阶段执行前绑定函数
    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        # 绑定 spider_closed 信号的处理方法
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s
    
    # 中间件(或spider)中重新发送请求
    from scrapy.downloadermiddlewares.retry import get_retry_request
    
    return get_retry_request(request=request, spider=spider, reason='xxx')   # spider = self
    
    # 中间件中处理异常
    # 抛出异常
    raise IgnoreRequest("Filtered request: {}".format(request))
    raise ValueError('403 response')
    
    # 处理异常
    def process_exception(self, request, exception, spider):
        if isinstance(exception, IgnoreRequest):
            # 处理 IgnoreRequest 异常,例如记录日志、发送通知等
            # 返回 None 表示异常已经被处理
            return None
        else:
            # 重发请求
            request.meta['dont_filter'] = True
            request.meta['max_retry_times'] = 4   # 增加重试次数
            return get_retry_request(request=request, spider=spider, reason='process_exception')
    
    # ERROR:使用Rquests发请求可以,但是使用Scrapy不行
    1. 网站有请求头 Content-Length 的校验:Scrapy如果在header写死,发请求时会再增加一个 Content-Length(而不是替换原来的,Request应该是直接替换原来的),导致请求异常(400 Bad Request)。
    2. 能不能用 headers["Content-Length"] = len(urllib.parse.urlencode(params).encode('utf-8')) 计算请求长度???

    # Selenium


    # Selenium Chrome 最新驱动下载地址

    Chrome for Testing (opens new window)

    # Selenium 去掉爬虫标识(与之前不同)
    options = webdriver.ChromeOptions()
    options.add_argument("--disable-blink-features=AutomationControlled")  # 将window.navigator.webdriver设置为false
    browser = webdriver.Chrome(options=options)
    
    
    #=============== 之前是这样,没好使? =====================
    options = webdriver.ChromeOptions()
    # selenium 不显示正在被测试软件控制
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    browser = webdriver.Chrome(options=options)
    # 防止通过变量识别到 selenium
    browser.execute_cdp_cmd(
        'Page.addScriptToEvaluateOnNewDocument',
        {'source': 'Object.defineProperty(navigator, "webdriver, {get: () => undefined})'}
    )
    
    # Selenium 设置请求头
    browser.execute_cdp_cmd("Network.enable", {})
    browser.execute_cdp_cmd("Network.setExtraHTTPHeaders", {"headers": {"User-Agent": "browser1"}})
    
    # Selenium 设置页面超时时间
    # 设置页面加载和js加载超时时间,超时立即报错,如下设置超时时间为x秒
    browser.set_page_load_timeout(10)
    browser.set_script_timeout(10)
    
    # Selenium 在 Linux(Centos7) 上使用
    # 1. 在普通用户没有问题
    # 2. 在 root 用户一直报错 selenium.common.exceptions.WebDriverException: Message: unknown error: DevToolsActivePort file doesn't exist
    #    1. headless 以下办法解决
    options.add_argument('--no-sandbox')  # fix:DevToolsActivePort file doesn't exist
    options.add_argument('--disable-gpu')  # fix:DevToolsActivePort file doesn't exist
    options.add_argument('--disable-dev-shm-usage')  # fix:DevToolsActivePort file doesn't exist
    options.add_argument('--remote-debugging-port=9222')  # fix:DevToolsActivePort file doesn't exist
    
    # 注:(卡一整天的问题)
    # 百度 linux 下 chrome 打开报错,有给出以下解决办法的。千万不要修改!!!要不 Selenium 执行报错!!!
    # 将`/usr/bin/google-chrome` 中 `exec -a "$0" "$HERE/chrome" "$@"` 修改为 `exec -a "$0" "$HERE/chrome" "$@" --user-data-dir --no-sandbox`
    
    # Selenium 获取请求头
    # pip install selenium-wire
    
    from seleniumwire import webdriver
    
    driver = webdriver.Chrome()
    driver.get('https://www.baidu.com')
    for request in driver.requests:
        print('请求headers:')
        print(request.headers)
        # print(request.response.headers)
        break
    driver.quit()
    
    # Selenium 获取Cookies
    driver.get_cookies()
    
    # Selenium 获取 jQuery 中 localStorage

    localStorage:在HTML5中,新加入的一个l特性,这个特性主要是用来作为本地存储来使用的,解决了cookie存储空间不足的问题,localStorage中一般浏览器支持的是5M大小,这个在不同的浏览器中localStorage会有所不同

    local_storage = driver.execute_script("return window.localStorage;")
    
    # 爬虫自测网站
    # 可以查看与正常访问相比有哪些参数不同,用于优化爬虫程序
    https://bot.sannysoft.com/
    

    # 爬虫梳理


    1. Session 验证:
      • 模拟登录后获取 Cookie 中,每次请求带上改 Cookie
    2. 数据字母验证码: OCR 识别,后面请求每次带上验证码(有些需加密)
    3. IP封禁: 购买动态IP
    4. User-Agent验证:修改请求UA
    5. 数据动态加载:
      • 模拟接口请求,获取json数据
      • Selenium 获取动态渲染后网站源码
    6. 数据加密(理论上在网站都能找到key)
      • base64 加密:无秘钥,直接解密
      • 使用aes加密秘钥,再用ras加密正文
    7. Content-Length 验证:有些网站会验证请求头中的 Content-Length,要保证有且只有一个正确的长度
    8. 通过 window.open('xxx'') 可获取跳转链接
    9. 通过Referer的反扒爬虫
    10. 针对 Selenium 的 find_element_by_* 方法的 click 事件监听
    • 第三方库:pyautogui 等

    分析思路

    使用浏览器 > 元素 > 事件监听器,排查执行的js 使用移动端链接 www -> m

    # 加密解密函数


    # ras 解密
    from Crypto.PublicKey import RSA
    from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
    import base64
    
    def rsa_decrypt(cipher_text, private_key):
        decode_text = base64.b64decode(cipher_text)
        cipher = Cipher_pkcs1_v1_5.new(RSA.importKey(private_key))  # 创建用于执行pkcs1_v1_5加密或解密的密码
        decrypt_text = cipher.decrypt(decode_text, b"rsa")
        return decrypt_text.decode("utf-8")
    
    # aes 解密(AES-CTR 模式,其他类似)
    from Crypto.Cipher import AES
    from Crypto.Util import Counter
    import base64
    
    def decrypt_aes_ctr(encrypted_data, key, iv):
       ciphertext = base64.b64decode(encrypted_data)
        key = str.encode(key)
        # 设置 AES-CTR 的计数器
        ctr = Counter.new(128, initial_value=int.from_bytes(iv, byteorder='big'))
        # 使用 AES-CTR 模式创建 AES 对象
        aes = AES.new(key, AES.MODE_CTR, counter=ctr)
        # 解密密文
        decrypted_data = aes.decrypt(ciphertext)
        # 将解密后的数据转换为文本
        decrypted_text = decrypted_data.decode('utf-8')
        return decrypted_text
    
    # aes 加密(AES-ECB 模式)
    from Crypto.Cipher import AES
    
    def aes_encode(data, key):
      global aes
      while len(data) % 16 != 0:  # 补足字符串长度为16的倍数
          data += (16 - len(data) % 16) * chr(16 - len(data) % 16)
          data = str.encode(data)
          aes = AES.new(str.encode(key), AES.MODE_ECB)  # 初始化加密器
      return str(base64.encodebytes(aes.encrypt(data)), encoding='utf8').replace('\n', '')  # 加密
    

    # 附:代码示例


    # 爬虫代码
    from xxx.items import gpItems
    from scrapy.spiders import CrawlSpider, Rule
    from scrapy.http import Request
    
    import logging
    import base64
    from Crypto.Cipher import AES
    
    class XXXSpider(CrawlSpider):
    	name = 'XXX'
    	allowed_domains = ['www.xxx.com']
    
    	# key:数据接口地址   value:source type_code 一次爬取的页面数
    	categoryCodeDict = {
    		'网址1{}': '描述1 ID1 1 3',  # 17页    15-31页
    		'网址2{}': '描述2 ID2 1 6',  # 179页
    	}
    	headers = {
    	}
    
    	def start_requests(self):
    		for key, value in self.categoryCodeDict.items():
    			start_page = int(value.split(' ')[2])
    			page = int(value.split(' ')[3])
    			for index in range(start_page, page):
    				url = key.format(str(index))
    				logging.info('爬取总页数:{}  页面链接:{}'.format(page - 1, url))
    				yield Request(url=url, headers=self.headers, method='POST', callback=self.parse, meta={'categoryCode': key, 'categoryText': value})
    
    	def parse(self, response):
    		categoryText1 = response.meta['categoryText'].split(' ')[0]
    		categoryText2 = response.meta['categoryText'].split(' ')[1]
    
    		rows = response.xpath('//ul[@class="article-list-a"]/li')
    		for row in rows:
    			i = gpItems()
    			i['publishTime'] = row.xpath('div[1]/div[@class="list-times"]/text()').extract_first().strip()
    			url = row.xpath('div[1]/a/@href').extract_first().strip()
    			ccc = url.split('/')[-1].split('.')[0]
    			ddd = self.aes_encode(ccc, '公钥编码').replace('/', '^').replace("==", "")
    			i['url'] = url.replace(ccc, ddd)
    			i['title'] = ''.join(row.xpath('div[1]/a/text()').extract()).strip()
    			ad_name = row.xpath('div[1]/a/label/text()').extract_first()
    			if ad_name:
    				ad_name = ad_name.strip().replace('【', '').replace('】', '')
    				i['ad_name'] = 'sss' if ad_name == 'bbb' else ad_name
    			else:
    				i['ad_name'] = 'xxx'
    			i['source'] = categoryText1
    			i['type_code'] = categoryText2
    			yield Request(i['url'], headers=self.headers, meta={'item': i}, callback=self.parse_item)
    
    	def parse_item(self, response):
    		i = response.meta['item']
    		i['htmlcontent'] = response.xpath('//div[@class="class1111"]').extract_first().strip()
    		return i
    
    	def aes_encode(self, data, key):
    		global aes
    		while len(data) % 16 != 0:  # 补足字符串长度为16的倍数
    			data += (16 - len(data) % 16) * chr(16 - len(data) % 16)
    			data = str.encode(data)
    			aes = AES.new(str.encode(key), AES.MODE_ECB)  # 初始化加密器
    		return str(base64.encodebytes(aes.encrypt(data)), encoding='utf8').replace('\n', '')  # 加密
    
    # 中间件使用动态代理IP测试
    class AutotestDownloaderMiddleware(object):
        delay_sec = [0.1, 0.2, 0.3, 0.4, 0.5]   # , 0.6, 0.7, 0.8, 0.9
        proxy_ip_dict = {}
        # 需要走代理的爬虫
        spiders_list = ['XXX']
    
        def spider_opened(self, spider):
            # 初始化代理池
            if spider.name in self.spiders_list:
                while len(self.proxy_ip_dict) < 6:
                    time.sleep(5)
                    self.get_proxy()
    
        @classmethod
        def from_crawler(cls, crawler):
            # This method is used by Scrapy to create your spiders.
            s = cls()
            crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
            return s
    
        def process_request(self, request, spider):
            if spider.name in self.spiders_list:
                # time.sleep(random.choice(self.delay_sec))
                proxy_ip = self.determine(int(time.time()*1000))
                print('request.meta[\'proxy\'] = {proxy_ip}'.format(proxy_ip=proxy_ip))
                print('proxy_ip_list:{}'.format(self.proxy_ip_dict))
                request.meta['proxy'] = proxy_ip
    
        def process_response(self, request, response, spider):
            return response
    
        def process_exception(self, request, exception, spider):
            pass
    
        def get_proxy(self, proxy_pop=''):
            curtime = int(time.time()*1000)
            api_url = '购买的动态IP接口'   # 返回值为 IP:PORT,可用时间(毫秒)
            response = requests.get(api_url).text.strip()
            proxy = 'http://' +response.split(',')[0]
            mill = int(response.split(',')[1])
            print('从api获取动态ip:{}'.format(response))
            # self.proxy_ip_list.append((proxy, curtime + mill))
            # 已经有,更新;没有,删除旧的,插入新的   proxy_pop为空字符串时初始化字典
            if proxy_pop == '' or self.proxy_ip_dict.get(proxy) is not None:
                pass
            else:
                print("==================删除ip======{}".format(proxy_pop))
                self.proxy_ip_dict.pop(proxy_pop)
            self.proxy_ip_dict[proxy] = curtime + mill - 2000 # 2s预留时间差
            # return (proxy, curtime + mill)
            return proxy
    
        def determine(self, curmill):
            # 获取ip
            item1 = random.choice(list(self.proxy_ip_dict.keys()))
            print('可用时间:{}  当前时间:{}'.format(self.proxy_ip_dict[item1], curmill))
            if self.proxy_ip_dict[item1] > curmill:
                # 若可用,返回
                # return tuple1[0]
                return item1
            else:
                print("================== ip:{}  过期时间:{}  当前时间:{} ======".format(item1, self.proxy_ip_dict[item1], int(time.time()*1000) ))
                # 若不可用,从列表移除  重新获取
                return self.get_proxy(item1)
    
    # 中间件使用Selenium发送请求 + 动态代理IP
    from selenium import webdriver
    from scrapy.downloadermiddlewares.retry import get_retry_request
    from scrapy.exceptions import IgnoreRequest
    import pymysql
    from scrapy.utils.project import get_project_settings
    class SeleniumDownloaderMiddleware(object):
        # 需要走代理的爬虫
        spiders_list = ['xxxx', 'xxxxxx']
        proxy_ip_dict = {}  # key:代理ip  value:最大可用毫秒数 使用次数 错误次数(selenium中获取不到代理IP,故同一时间只能使用一个代理IP)
        proxy = False    # 是否走代理
        db = None
        cursorSeq = None
    
        def __init__(self):
            self.browser = None
    
        def spider_opened(self, spider):
            if spider.name in self.spiders_list:
                self.browser = self.create_chrome_driver()  # 创建浏览器对象
                self.connect()  # 创建数据库连接
                if self.proxy:
                    while len(self.proxy_ip_dict) == 0:   # 每次只有一个IP
                        self.get_proxy()
    
        def spider_closed(self, spider):
            if spider.name in self.spiders_list:
                if self.browser:   # 关闭浏览器连接
                    self.browser.close()
                if self.cursorSeq or self.db:   # 关闭数据库连接
                    self.cursorSeq.close()
                    self.db.close()
    
        @classmethod
        def from_crawler(cls, crawler):
            # This method is used by Scrapy to create your spiders.
            s = cls()
            crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
            # 绑定 spider_closed 信号的处理方法
            crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
            return s
    
        def process_request(self, request, spider):
            if spider.name in self.spiders_list:
                # 库中存在就不访问
                if self.should_filter(request):
                    # 如果需要过滤,则抛出 IgnoreRequest 异常
                    raise IgnoreRequest("Filtered request: {}".format(request))
                if self.proxy:
                    self.preparatory_work(int(time.time()*1000))
                self.browser.get(request.url)
                # time.sleep(random.uniform(0.1, 0.5))
                print("=" * 100)
                print("#########{}".format(self.browser.current_url))
                if len(self.browser.page_source) < 200:  # 空请求体或IP超过最大请求量
                    # <html lang="zh-CN"><head><meta charset="UTF-8"><title>too many request</title></head><body>IP超过最大请求量,详情参考<a href="http://www.data5u.com/help/article-54.html">www.data5u.com</a></body></html>
                    # <html><head></head><body></body></html>
                    print("================= Empty response: {url} =============================".format(url=self.browser.current_url))
                    print(self.browser.page_source)
                    raise ValueError('Empty response')
                return HtmlResponse(url=request.url, body=self.browser.page_source, encoding="utf-8",
                                    request=request)  # 然后把这个response对象返回给爬虫  self.browser.current_url
    
        def process_response(self, request, response, spider):
            if spider.name in self.spiders_list:
                print("====================== process_response ====================================")
                print(response.status)
                if self.proxy:
                    # 使用 next() 函数获取第一个键值对
                    first_key, first_value = next(iter(self.proxy_ip_dict.items()))
                    item_info = self.proxy_ip_dict[first_key].split(' ')
                    item_info[1] = str(int(item_info[1]) + 1)  # 使用次数+1
                    if response.status != 200:
                        item_info[2] = str(int(item_info[2]) + 1)  # 错误次数+1
                    self.proxy_ip_dict[first_key] = ' '.join(item_info)
            # 则返回原始的响应对象
            return response
    
        def process_exception(self, request, exception, spider):
            if spider.name in self.spiders_list:
                print("====================== process_exception ====================================")
                print(request.url)
                print(exception)
                if isinstance(exception, IgnoreRequest):
                    # 处理 IgnoreRequest 异常,例如记录日志、发送通知等
                    # 返回 None 表示异常已经被处理
                    return None
                if self.proxy:
                    # 使用 next() 函数获取第一个键值对
                    first_key, first_value = next(iter(self.proxy_ip_dict.items()))
                    item_info = self.proxy_ip_dict[first_key].split(' ')
                    item_info[1] = str(int(item_info[1]) + 1)  # 使用次数+1
                    item_info[2] = str(int(item_info[2]) + 1)  # 错误次数+1
                    self.proxy_ip_dict[first_key] = ' '.join(item_info)
                request.meta['dont_filter'] = True    # 可以不加,retry函数中会修改
                request.meta['max_retry_times'] = 4   # 增加重试次数
                return get_retry_request(request=request, spider=spider, reason='process_exception')
            pass
    
        #=========================================== 自定义函数 =====================================
    
        def should_filter(self, request):
            # 在这里编写过滤逻辑,根据请求的某些特征来判断是否需要过滤该请求
            sqlexists = "select count(1) from crawler_t_src_spider a where a.url= '"+request.url+"'"
            self.cursorSeq.execute(sqlexists)
            num1 = self.cursorSeq.fetchone()
            num = num1[0]
            # 返回 True 表示需要过滤,返回 False 表示不需要过滤
            if num == 0:
                return False  # 示例中默认不过滤任何请求
            else:
                print("======================= 过滤请求 =={}=============================".format(request.url))
                return True
    
        def connect(self):
            settings = get_project_settings()
            self.db = pymysql.connect(
                host=settings['DB_HOST'],
                port=settings['DB_PORT'],
                user=settings['DB_USER'],
                password=settings['DB_PASSWROD'],
                db=settings['DB_NAME'],
                charset=settings['DB_CHARSET']
            )
            self.cursorSeq = self.db.cursor()
    
        def create_chrome_driver(self, headless=False, proxy_ip=None):
            options = webdriver.ChromeOptions()
            if headless:
                options.add_argument('--headless')
                options.add_argument('--no-sandbox')  # fix:DevToolsActivePort file doesn't exist
                options.add_argument('--disable-gpu')  # fix:DevToolsActivePort file doesn't exist
                options.add_argument('--disable-dev-shm-usage')  # fix:DevToolsActivePort file doesn't exist
                options.add_argument('--remote-debugging-port=9222')  # fix:DevToolsActivePort file doesn't exist
                # headless 模式请求头与正常不一致,需修改请求头
                options.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
            else:
                options.add_argument("--start-maximized")
            if proxy_ip: options.add_argument('--proxy-server={}'.format(proxy_ip))
    
            options.add_argument("--disable-blink-features=AutomationControlled")  # 将window.navigator.webdriver设置为false
            options.add_experimental_option("prefs", {"profile.mamaged_default_content_settings.images": 2})  # 不加载图片,加快访问速度
            # selenium 不显示正在被测试软件控制
            options.add_experimental_option('excludeSwitches', ['enable-automation'])
            options.add_experimental_option('useAutomationExtension', False)
            browser = webdriver.Chrome(options=options)
            # 防止通过变量识别到 selenium
            browser.execute_cdp_cmd(
                'Page.addScriptToEvaluateOnNewDocument',
                {'source': 'Object.defineProperty(navigator, "webdriver, {get: () => undefined})'}
            )
    
            # browser.execute_cdp_cmd("Network.enable", {})
            # browser.execute_cdp_cmd("Network.setExtraHTTPHeaders", {"headers": {"User-Agent": "browser1"}})
    
            # 设置页面加载和js加载超时时间,超时立即报错,如下设置超时时间为x秒
            browser.set_page_load_timeout(10)
            browser.set_script_timeout(10)
            time.sleep(2)
            print("=========== 创建浏览器对象,代理IP为{} ================".format(proxy_ip))
            return browser
    
        def get_proxy(self, proxy_pop=''):
            api_url = '代理商API接口'
            curtime, proxy, mill = None, None, None
            for i in range(3):
                if i > 0: time.sleep(3)
                curtime = int(time.time()*1000)
                response = requests.get(api_url).text.strip()
                proxy = 'http://' + response.split(',')[0]
                mill = int(response.split(',')[1])
                if self.proxy_ip_dict.get(proxy) is None: break   # 获取到新的,跳出循环
            # proxy_pop为空字符串时初始化字典
            if proxy_pop == '':
                self.proxy_ip_dict[proxy] = '{} 0 0'.format(curtime + mill)
                if self.browser:
                    self.browser.close()
                self.browser = self.create_chrome_driver(proxy_ip=proxy)
            # IP已经存在,跳过
            elif self.proxy_ip_dict.get(proxy) is not None:
                pass
            # 删除旧的,插入新的
            else:
                print("==================删除ip======{}".format(proxy_pop))
                self.proxy_ip_dict.pop(proxy_pop)
                self.proxy_ip_dict[proxy] = '{} 0 0'.format(curtime + mill)
                if self.browser:
                    self.browser.close()
                self.browser = self.create_chrome_driver(proxy_ip=proxy)
            return proxy
    
        def preparatory_work(self, curmill):
            proxy_ip = random.choice(list(self.proxy_ip_dict.keys()))
            item_info = self.proxy_ip_dict[proxy_ip].split(' ')
            print('可用时间(使用次数、错误次数):{}  当前时间:{}'.format(self.proxy_ip_dict[proxy_ip], curmill))
            if int(item_info[0]) > curmill and int(item_info[2]) == 0:  # and int(item_info[1]) < 10
                return proxy_ip  # 若可用,返回
            else:
                print("================== ip:{}  过期时间:{}  当前时间:{} ======".format(proxy_ip, item_info[0], int(time.time()*1000)))
                return self.get_proxy(proxy_ip)   # 若不可用,从列表移除  重新获取
    
    编辑 (opens new window)
    上次更新: 2024/12/26, 14:30:54
    自己写的python程序(减少工作量)
    Shell基础

    ← 自己写的python程序(减少工作量) Shell基础→

    Theme by Vdoing | Copyright © 2022-2025 Ai | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式
    ×