python爬虫
前言
之前突发奇想看过一些文章和视频来学习爬虫,但是都是囫囵吞枣没有深入实践和总结,这次借着js逆向爬虫这个课程,来系统学习并总结一下自己学的python爬虫吧。在这里记录下自己的学习笔记和心得,方便自己日后查阅,本文部分内容来自课程老师的笔记。根据自己的学习习惯本文采取的是实用主义,需要什么学什么。
正文
前置基础知识
能够学习到爬虫基本上默认你有一定的python基础,至少电脑有明白python的基础类型,对于任意一种语言有学习过流程控制,同时能够简单的实现print(),文件操作。那么这里只需要简单提及一下我们需要使用的python基础的数据结构和文件操作。
#pip安装依赖
pip install pyppeteer -i https://pypi.tuna.tsinghua.edu.cn/simple
# 第一个知识点,在python中,或者说所有语言的的数据类型中索引取值和切片取值都是一个很常见的知识点
s = "hello yuan"
print(s[6])
print(s[1:4])
#列表中可以存储任意类型的数据
#第二个知识点,编码
#- 计算机只能处理数字01,如果要处理文本,就必须先把文本转换为数字01二进制的形式,这种转换方式就称为字符编码。
#对于我们而言,你只需要简单记住下面几种编码就好:
# - ASCII编码:早期专门为英语语系编码,只有255个字符,每个字符需要8位也就是1个字节。不兼容汉字。
# - Unicode编码:又称万国码,国际组织制定的可以容纳世界上所有文字和符号的字符编码方案。用2个字节来表示汉字。
# - UTF-8编码:为了节省字节数,在Unicode的基础上进行优化的编码。用1个字节表示英文字符,3个字符表示汉字。天生兼容ASCII编码,所以最为流行。
# - GBK: 全称《汉字内码扩展规范》,向下与GB2312兼容,向上支持ISO10646.1国际标准,是前者向后者过渡过程中的一个承上启下的产物。windows中文版的汉字编码用的就是GBK。也非世界范围通用的编码
# - 其它编码:非以上类型者的统称。属于能不用就不要碰的编码。
#因为爬虫的特殊性质,我们会对文件,网页进行处理,所以编码这个是我们需要注意的一个点
#第三个知识点 数据结构
#列表中可以存储任意类型的数据,基础操作 增alist.append() 删alist.pop() 改按照下表直接赋值 查 print(s[1:4])
alist = [1,2,3,4,5]
items = [1,'bobo',12.34]
#字典的实现机制:Python的字典数据类型是基于hash散列算法实现的,采用键值对(key:value)的形式,根据key的值计算value的地址,具有非常快的查取和插入速 度,不是序列类型,因此没有索引下标的概念,更没有切片的说法。字典中无法存储重复的键值对
dict_1 = {'name':'bobo','age':18,'score':100,'age':18}
#注意:不要在字段中存储相同的key,value可以相同
dict_2 = {'name':'bobo','age':18,'age':20}
print(dict_2)
d = {'name':'bobo','age':20,"scores":[100,120,99]}
d['name'] = 'jay' #给存在的key修改对应的value值
d['address'] = 'Beijing' #给一个不存在的key赋值表示新增键值对
del d['age'] #删除age键值对
print(d)
#流程控制
if 表达式:
代码块 1
else:
代码块 2
while true:
循环体
for 迭代变量 in 字符串|列表|元组|字典|集合:
代码块
#文件操作
with open('name.type','w') as fp:
fp.write(page_text)
html
day前端无状态部分
当我们明白了上边的一些知识点后,我们就可以尝试写爬虫来尝试学习,正如主席说过,在战争中学习战争,我们通过不断的练习,来完善我们的知识点.引用路飞老师的笔记内容
- 什么是爬虫
- 就是通过编写程序,“模拟”浏览器上网,然后让其在互联网中“抓取”数据的过程。
- 模拟:浏览器本身就是一个纯天然的爬虫工具。爬虫相关的操作都是模拟/基于浏览器为基础开发实现出来的。
- 抓取:
- 一种是抓取一张页面中所有的数据
- 一种是抓取页面中局部的数据
- 提问:如果日后你的爬虫程序没有爬取到你想要的数据,why?
- 你的程序模拟浏览器的力度不够!
- 就是通过编写程序,“模拟”浏览器上网,然后让其在互联网中“抓取”数据的过程。
爬虫作为一种计算机技术就决定了它的中立性,因此爬虫本身在法律上并不被禁止,但是利用爬虫技术获取数据这一行为是具有违法甚至是犯罪的风险的。
requests操作
- requests是一个基于网络请求的模块。可以使用程序模拟浏览器上网。
我们爬虫的目的是为了获得数据,所以我们的流程就应该是:指定url---->发送请求--服务器-->返回相应数据包----持久化存储
我们直接对来一个案例来方便我们理解.
案例:东方财富首页数据爬取
-
https://www.eastmoney.com/
-
import requests #1.指定url main_url = 'https://www.eastmoney.com/' #2.发起请求: #get函数可以根据指定的url发起网络请求 #get函数会返回一个响应对象: response = requests.get(url=main_url) #3.获取响应数据 page_text = response.text #text是可以返回字符串形式的响应数据/爬取到的数据 #4.持久化存储 with open('dongfang.html','w') as fp: fp.write(page_text)
那么当你直接将上面的代码跑起来,你的编译软件可能会爆出一个错误
Traceback (most recent call last): File "c:\Users\amdma\Desktop\python\spider\dongfang.py", line 15, in <module> fp.write(page_text) UnicodeEncodeError: 'gbk' codec can't encode character '\ufeff' in position 0: illegal multibyte sequence
那么这就是一个典型的编码问题,根据gpt的解释
**默认编码:**你的系统或编辑器可能默认使用
gbk
编码(常见于简体中文Windows系统),但'\ufeff'
是一个 Unicode 字符,通常表示字节序标记(BOM,Byte Order Mark)。字符编码不匹配:你在写入文件时指定的编码与实际的文本编码不匹配。
当你使用下边的代码就能在dongfang.html就能正常显示了,这里添加了两个encoding,你可以自行调试删除其中一个的结果
import requests # 1. 指定url main_url = 'https://www.eastmoney.com/' # 2. 发起请求 response = requests.get(url=main_url) # 3. 设置响应编码为 'utf-8' response.encoding = 'utf-8' # 4. 获取响应数据 page_text = response.text # 5. 持久化存储,明确指定编码为 'utf-8' with open('dongfang.html', 'w', encoding='utf-8') as fp: fp.write(page_text)
案例:爬取51游戏中任何游戏对应的搜索结果页面数据
-
url:https://www.51.com/
-
import requests #1.指定url game_title = input('enter a game name:') params = { #字典是用于封装请求参数 'q':game_title } url = 'https://game.51.com/search/action/game/' #2.发起请求 #get是基于指定的url和携带了固定的请求参数进行请求发送 response = requests.get(url=url,params=params) response.encoding = 'utf-8' #3.获取响应数据 page_text = response.text #text表示获取字符串形式的响应数据 # print(page_text) #4.持久化存储 fileName = game_title + '.html' with open(fileName,'w',encoding = 'utf-8') as fp: fp.write(page_text)
这里引入一个字典params,用于封装请求参数.request能够直接识别,所以你的 URL就不需要包含了查询参数例如这样url='https://www.xiachufang.com/search/?keyword=key&cat=1001'这就是一个错误的写法requests 库会自动处理 params,
案例:中国人事考试网(UA检测)
-
url:http://www.cpta.com.cn/
-
这里你可以尝试着按照上边的逻辑写一个爬虫去观察一下反馈在继续.
- 爬虫模拟浏览器主要是模拟请求参数和主要的请求头。
- User-Agent:请求载体的身份标识。
- 使用浏览器发请求,则请求载体就是浏览器
- 使用爬虫程序发请求,则请求载体就是爬虫程序
- User-Agent:请求载体的身份标识。
- 反爬机制:UA检测
- 网站后台会检测请求的载体是不是浏览器,如果是则返回正常数据,不是则返回错误数据。
- 反反爬机制:UA伪装
- 将爬虫发起请求的User-Agent伪装成浏览器的身份。
- 爬虫模拟浏览器主要是模拟请求参数和主要的请求头。
-
import requests url = 'http://www.cpta.com.cn/' #User-Agent:请求载体(浏览器,爬虫程序)的身份表示 header = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36' } #伪装了浏览器的请求头 response = requests.get(url=url,headers=header) response.encoding = 'utf-8' page_text = response.text with open('kaoshi.html','w',encoding = 'utf-8') as fp: fp.write(page_text)
-
中国人事考试网---站内搜索(post请求+请求参数)
-
response.encoding = 'utf-8' import requests url = 'http://www.cpta.com.cn/category/search' param = { "keywords": "人力资源", "搜 索": "搜 索" } header = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36' } #发起了post请求:通过data参数携带了请求参数 response = requests.post(url=url,data=param,headers=header) response.encoding = 'utf-8' page_text = response.text with open('renshi.html','w',encoding = 'utf-8') as fp: fp.write(page_text) #通过抓包工具定位了指定的数据包: #提取:url,请求方式,请求参数,请求头信息
-
小试牛刀
- url:https://www.xiachufang.com/
- 实现爬取下厨房网站中任意菜谱搜索结果数据爬取
import requests
import urllib
from urllib.parse import quote
#url='https://www.eastmoney.com/'
url='https://www.xiachufang.com/search/?keyword={key}&cat=1001'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36',
}
key=input("enter your want eat:")
#key='炒肉'
key = quote(key, 'utf-8')
print (url.format(key=key))
repose=requests.get(url= url.format(key=key),headers=headers)
text=repose.content
decoded_text = urllib.parse.unquote(text)
print(decoded_text)
with open('eastmoney.html','w',encoding='utf-8') as f:
f.write(decoded_text)
案例:智慧职教(动态加载数据爬取)
-
抓取智慧职教官网中的专业群板块下的所有数据
- url : https://www.icve.com.cn/portal_new/course/course.html
-
测试:直接使用浏览器地址栏中的url,进行请求发送查看是否可以爬取到电影详情数据?
- 不用写程序,基于抓包工具测试观察即可。
-
经过测试发现,我们爬取到的数据并没有包含电影详情数据,why?
-
动态加载数据:
- 在一个网页中看到的数据,并不一定是通过浏览器地址栏中的url发起请求请求到的。如果请求不到,一定是基于其他的请求请求到的数据。
- 动态加载数据值的就是:
- 不是直接通过浏览器地址栏的url请求到的数据,这些数据叫做动态加载数据。
- 如何获取动态加载数据?
- 确定动态加载的数据是基于哪一个数据包请求到的?
- 数据包数据的全局搜索:
- 点击抓包工具中任何一个数据包
- control+f进行全局搜索(弹出全局搜索框)
- 目的:定位动态加载数据是在哪一个数据包中
- 定位到动态加载数据对应的数据包,模拟该数据包进行请求发送即可:
- 从数据包中提取出:
- url
- 请求参数
- 从数据包中提取出:
注意:请求头中需要携带Referer。(体现模拟浏览器的力度)
-
import requests url = 'https://www.icve.com.cn/portal/course/getNewCourseInfo' data = { "kczy": "", "order": "", "printstate": "", "keyvalue": "" } headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', 'Referer':'https://www.icve.com.cn/portal_new/course/course.html' } response = requests.post(url=url,headers=headers,data=data) #json()可以直接将请求到的响应数据进行反序列化 page_text = response.json() #解析人名 for dic in page_text['list']: name = dic['TeacherDisplayname'] print(name)
案例:肯德基(POST请求、动态加载数据、UA检测)
-
http://www.kfc.com.cn/kfccda/storelist/index.aspx
-
将餐厅的位置信息进行数据爬取
-
爬取多页数据
-
import requests # POST请求的URL url = "http://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword" # 自定义请求头,模拟浏览器访问 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' } # 模拟多页请求 page_num = 1 keyword = input("请输入关键字(回车退出):") while True: # 构建POST请求的参数(你可能需要根据实际情况调整) data = { 'cname': '', 'pid': '', 'keyword': keyword, 'pageIndex': page_num, 'pageSize': 10, # 每页10条数据 } # 发送POST请求 response = requests.post(url, headers=headers, data=data) # 解析返回的JSON数据 stores = response.json()['Table1'] # 如果没有更多数据,退出循环 if not stores: break # 处理并保存数据 for store in stores: print(f"餐厅名: {store['storeName']}, 地址: {store['addressDetail']}, 城市: {store['cityName']}") # 下一页 page_num += 1 print("数据爬取完成")
-
数据解析
数据解析通用原理:
- 在一张页面源码中,想要爬取的数据是存在于相关的html的标签中。
- 可以将指定的标签进行定位,然后提取该标签中或者标签属性中存储的数据即可。
python中可以实现数据解析的技术:
- xpath(重要、常用和便捷)
- Bs4(自行了解学习)
- re正则
- pyquery(自行了解学习)
而我们本文主要使用的xpath来进行数据解析
- xpath解析的编码流程:
- 1.创建一个etree类型的对象,然后把即将被解析的页面源码数据加载到该对象中。
- 2.调用etree对象中的xpath函数,让其结合着不同形式的xpath表达式进行标签定位和数据提取。
关于xpath的使用我认为最好的是先尝试理解,可以看看八爪鱼xpath教程然后通过不断练习,就能有更清晰的认识
我们在爬虫时经常会遇到不同的数据格式,例如图片视频压缩包等,当我们请求他们的地址时,会给我们返回的是二进制数据,而我们只需要将他们写入文件并添加上后缀即可. data为我们的数据
file_name = 'img/'+img_name+'.jpg'
with open(file_name,'wb') as fp:
fp.write(date)
print(img_name,':下载保存成功!')
案例:碧血剑小说数据爬取
#https://bixuejian.5000yan.com/
#需求:将每一个章节的标题和内容进行爬取然后存储到文件中
import requests
from lxml import etree
headers={
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
}
url="https://bixuejian.5000yan.com/"
response=requests.get(url,headers=headers)
response.encoding = 'utf-8'
tree = etree.HTML(response.text)
title_list=tree.xpath('//div[@class="p-2 my-2 bg-white rounded"]/ul/li')
with open('xiaoshuo.txt','w',encoding='utf-8') as fp:
for title in title_list:
tiltle_name=title.xpath('./a/text()')[0]
print(tiltle_name)
title_link=title.xpath('./a/@href')[0]
print(title_link)
artick=requests.get(url=title_link,headers=headers)
artick.encoding='utf-8'
#print(artick.text)
tree2=etree.HTML(artick.text)
content=tree2.xpath('/html/body/div[2]/div/div[1]/div[3]/div[4]/p//text()')
content = ''.join(content).strip()
#content.encode('utf-8')
print(content)
fp.write(tiltle_name + ':' + content+'\n')
print("章节下载完成")
案例:简历数据爬取
# 载当前页所有的建立模板
#url:https://sc.chinaz.com/jianli/free.html
# - 简历名称+简历的下载链接
# - 根据简历的下载链接 下载简历文件
# - 根据下载地址下载的压缩包,压缩包是二进制的数据
import requests
from lxml import etree
headers={
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.128 Safari/537.36 Edg/89.0.774.77',
}
for i in range(1,5):
if i==1:
url='https://sc.chinaz.com/jianli/free.html'
else:
url='https://sc.chinaz.com/jianli/free_%d.html'%i
print('正在爬取第%d页'%i)
# 解析数据
response=requests.get(url=url,headers=headers)
response.encoding='utf-8'
selector=etree.HTML(response.text)
# 获取简历名称和简历的下载链接
li_list=selector.xpath('//*[@id="container"]/div/p')
for li in li_list:
title=li.xpath('./a/text()')[0]
href=li.xpath('./a/@href')[0]
#print(title,href)
# print(li)
detail_html=requests.get(url=href,headers=headers)
# 下载简历文件
detail_html.encoding='utf-8'
selector2=etree.HTML(detail_html.text)
# 获取简历的下载链接
href2=selector2.xpath('//*[@id="down"]/div[2]/ul/li/a/@href')[0]
#print(title,href2)
# 下载简历文件
detail_html2=requests.get(url=href2,headers=headers).content
# 保存简历文件
file_name = 'jianli/'+title+'.rar'
with open(file_name,'wb') as fp:
fp.write(detail_html2)
print(title,':下载保存成功!')
案例:图片数据爬取
#url: http://pic.netbian.com/4kmeinv/
#图片数据爬取:
# - 将爬取到的图片存储到指定的文件夹中
# - 抓取详情页大图
# - 爬取多页
import requests
from lxml import etree
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36',
}
url='https://pic.netbian.com/4kfengjing/'
respones=requests.get(url,headers=header)
respones.encoding='gbk'
tree=etree.HTML(respones.text)
img_list=tree.xpath('//*[@id="main"]/div[3]/ul/li')
print(img_list)
for img in img_list:
img_url=img.xpath('.//@src')[0]
img_name=img.xpath('.//@alt')[0]
img_name=img_name.replace('*','x')
print(img_name)
img_url='https://pic.netbian.com/'+img_url
print(img_url)
date=requests.get(url=img_url,headers=header).content
file_name = 'img/'+img_name+'.jpg'
with open(file_name,'wb') as fp:
fp.write(date)
print(img_name,':下载保存成功!')
反爬虫思考
正如前文提到的如果日后你的爬虫程序没有爬取到你想要的数据,why?你的程序模拟浏览器的力度不够!
那么只要在你在浏览器能够看到的所有数据东西都是能够通过python爬取的,如果不行,那一定是你模拟浏览器的程度不够,一定是你目前有什么技术问题没有解决掉。所以我们就要看需要怎么加大模拟的力度,无论是增加请求的参数,使用新技术等,这里我们讲讲请求参数问题,面对爬虫的过程中需要那些数据来请求可以看看浏览器开发者工具中header中有哪些参数,当然我们并不需要全部写上去,我们可以控制变量调试,选择最少的参数

-
referer
referer是告诉服务器你是从哪个地方来,接下来这个案例如果你直接访问这个视频地址,显然是行不通的,因为你直接访问时是没有referer的,而你通过主页访问,带上了referer就可以,所以对应过来,你的请求有也需要带上referer\
案例:视频爬取
# url:https://www.51miz.com/shipin/
# 爬取当前url页面中营销日期下的几个视频数据。
import requests
from lxml import etree
url='https://www.51miz.com/shipin/'
headers={
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.128 Safari/537.36 Edg/89.0.774.77',
'referer':'https://www.51miz.com/shipin/'
}
response=requests.get(url,headers=headers)
response.encoding='utf-8'
tree=etree.HTML(response.text)
# 获取当前页面中营销日期下的视频数据
video_list=tree.xpath('/html/body/div[2]/div[2]/div[1]/div[2]/div[2]/div')
for video in video_list:
video_list2=video.xpath('./a')
#print(video_list2)
for one in video_list2:
title=one.xpath('div/div/div/div/text()')
url=one.xpath('.//source/@src')
url='https:'+url[0]
print(url)
data=requests.get(url=url,headers=headers)
with open(f'video/{title}.mp4','wb') as f:
f.write(data.content)
print(f'{title}下载成功')
-
cookie
案例:雪球数据爬取
这里我们引入session这个对象,它能够存储自动存储我们的cookie
import requests headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36', } param = { #如果遇到了动态变化的请求参数?必须经过测试才知道需不需要处理 "since_id": "-1", "max_id": "553059", #动态变化的请求参数 "size": "25" } url = 'https://xueqiu.com/statuses/hot/listV2.json' #session对象会实时保存跟踪服务器端给客户端创建的cookie #创建一个session对象 session = requests.Session() #空白的session对象 print(session.cookies) first_url = 'https://xueqiu.com/' #使用session对象进行请求发送:如果该次请求时,服务器端给客户端创建cookie的话,则该cookie就会被保存seesion对象中 session.get(url=first_url,headers=headers) #使用保存了cookie的session对象进行后续请求发送 ret = session.get(url=url,headers=headers,params=param).json() print(ret)
-
proxy代理
代理也是爬虫一个特别重要的工具,很多网站为了保护自己的服务器正常运行对于,一些异常的ip会采取一些临时封禁的策略.那么这个时候,我们如果没有别的ip那么爬虫任务不久失败了吗?我们可以在网上搜代理ip会发现很多,关于代理中的介绍可以看看快代理的产品介绍
#proxy参数接受的是一个字典,字典的key是协议类型,value是ip地址 dic['https'] = ips.strip() import requests from lxml import etree import random import time headers ={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.128 Safari/537.36', } proxy_url = '' page_text = requests.get(url=proxy_url,headers=headers).text proxy_list = [] #代理池 for ips in page_text.split('\n')[0:-1]: dic = {} dic['https'] = ips.strip() proxy_list.append(dic) url='http://www.cip.cc' response=requests.get(url,headers=headers) tree=etree.HTML(response.text) result=tree.xpath('/html/body/div/div/div[3]/pre/text()')[0] print(result.split('\n')[0]) #print(result) page_text = requests.get(url=proxy_url,headers=headers).text proxy_list = [] #代理池 for ips in page_text.split('\n')[0:-1]: dic = {} dic['https'] = ips.strip() proxy_list.append(dic) for page in range(1, 5001): print('正在爬取第%d页的ip数据......' % page) #生成不同页码对应的url url = 'https://www.kuaidaili.com/free/inha/%d/' % page page_text = requests.get(url=url, headers=headers,proxies=random.choice(proxy_list)).text time.sleep(0.5) tree = etree.HTML(page_text) ip = tree.xpath('//*[@id="list"]/div[1]/table/tbody/tr[1]/td[1]/text()')[0] print(ip)
并发编程
面对大量数据爬取过程中我们肯定是想尽可能提升自己的爬取效率,这里就会引入一些新的概念,包括线程,进程,并发,并行,同步,异步。这里我们简答解释方便理解就行,关于底层的原理我们不深入分析这个是操作系统这门课程会讲的。
进程&线程
计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。CPU的核数(多核计算机多个CPU,大部分情况下也只是用了一核CPU)
- 假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单核CPU一次只能运行一个任务(同一时刻只能干一件事)。
进程就相当于工厂里边的车间,他代表的是cpu所能处理的单个任务。那么在车间里边可以有很多的流水线或者说工人来协助完成一个任务。一个进程可以包括多个线程。
拓展:车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是被该进程下所有线程共享的,每个线程都可以使用这些共享内存。
进程调度
进程就是计算机中正在运行的一个程序或者软件,并且在上述工厂案例中,我们说单个CPU一次只能运行一个任务,那么你有没有在电脑上一边聊微信一边听音乐一边打游戏的场景啊?why?
要想多个进程交替运行,操作系统必须对这些进程进行调度,这个调度也不是随机进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。
- 目前已实现的调度算法有:先来先服务(FCFS)调度算法、短作业优先调度算法和时间片轮转法。不过被公认的一种比较好的进程调度算法是"时间片轮转法"。
这里不做深入解释,想要了解的可以再去单独查询资料
并行&并发
通过进程之间的调度,也就是进程之间的切换,我们用户感知到的好像是两个视频文件同时在播放,或者音乐和游戏同时在进行,那就让我们来看一下什么叫做并发和并行。
- **并行:**同时运行,只有具备多个cpu才能实现并行
- **并发:**是伪并行,即看起来是同时运行(时间片轮转法)。
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真实干活的是cpu,而一个cpu同一时刻只能执行一个任务。
举例说明
你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。
你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。
总结
并发的关键是你有处理多个任务的能力,不一定要同时。
并行的关键是你有同时处理多个任务的能力。
所以它们最关键的点就是:是否是『同时』。
进程的状态
在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行、阻塞和终止。
-
就绪(Ready)状态
- 进程已经准备好,已分配到所需资源/内存。
-
执行/运行(Running)状态
- 进程处于就绪状态被调度后,进程进入执行状态
-
阻塞(Blocked)状态(耗时操作)
- 正在执行的进程由于某些事件而暂时无法运行,进程受到阻塞,则进入就绪状态等待系统调用
- 网络请求,input等
- 正在执行的进程由于某些事件而暂时无法运行,进程受到阻塞,则进入就绪状态等待系统调用
-
终止状态
- 进程结束,或出现错误,或被系统终止,进入终止状态。无法再执行
同步&异步
-
同步:
- CPU在执行一个任务的时候,任务的每一个步骤是顺序执行的,并且必须是每前一个步骤执行完毕后才可以执行下一个步骤,这就是同步的含义。
-
异步:
- 异步是指,任务的所有步骤也是顺序被执行,但是与同步不同的是,异步的模式下,不会等前一个步骤执行完毕后才会执行下一个步骤,而是当一个步骤一旦被执行,无论该步骤是否被执行结束,都会马上执行下一个步骤。
-
案例理解:
- 以做饭为例:
- 同步方式就是按照步骤依次做,先烧水,然后煮饭,最后炒菜。只有前面的步骤完成后才能进行下一个步骤。在一个步骤未完成的情况下,你不可以干任何事情。
- 异步的方式就是,在烧水且水还没开的情况下,你可以去干其他事,比如刷手机、发邮件等。也就是在等待每个步骤完成的过程中,你可以干其他事,不必傻傻的等下去。
- 该案例中的,烧水、煮饭和炒菜都是一些耗时操作,可以被称为阻塞操作!
- 以做饭为例:
-
注意:同步和异步针对是cup在执行任务时遇到阻塞操作时,所产生的不同行为!
接下来就来到python当中具体的实现部分
Python进程的实现
multiprocessing包
multiprocess是python中管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块,提供的子模块非常多。
Process模块
Process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
之前我们说过,运行一个py文件就相当于启动了一个进程,这个进程我们成为**"主进程"**
而在主进程对应的py文件中,可以通过Process模块创建另一个进程,这个进程是基于主进程创建的,因此可以被称为**"子进程"**
当有了两个进程后,我们其实就可以实现异步机制了!
from multiprocessing import Process
def func():
print('我是绑定给子进程的一组任务!')
if __name__ == '__main__':
print('主进程开始执行!')
#创建一个进程p,给该进程绑定一组任务
p = Process(target=func)
#启动创建好的进程
p.start()
print('主进程执行结束!')
具体实现过程:
1.导入模块:from multiprocessing import Process
2.基于Process创建一个子进程对象(当前运行的整个py文件表示主进程),然后可以基于target参数将外部的一个函数注册到该子进程中
3.基于start()方法启动创建好的子进程
那么这里我们看得到没有参数传递,我们要如何给函数传递参数呢?——通过args传递参数
from multiprocessing import Process
def func(num1,num2):
print('我是绑定给子进程的一组任务!',num1,num2)
if __name__ == '__main__':
print('主进程开始执行!')
#创建一个进程p,给该进程绑定一组任务
p = Process(target=func,args=(123,456))
#启动创建好的进程
p.start()
print('主进程执行结束!')
我们学习异步就是应为他能极大的提升效率,他能解决时间,接下来我们来说说进程实现异步效果。
首先我们用一个同步程序来进行对比
import time
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)
if __name__ == "__main__":
start = time.time()
urls = ['www.1.com','www.2.com','www.3.com']
for url in urls:
get_request(url)
print('总耗时:',time.time()-start)
这个程序会请求三个网站运行时间应该在6秒多点点
而我们使用异步效果
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)
if __name__ == "__main__":
urls = ['www.1.com','www.2.com','www.3.com']
for url in urls:
#创建了三个进程,表示三组任务
p = Process(target=get_request,args=(url,))
p.start()
为了不让增大大家对异步理解,我们就不在上边+计时,计时的在下边这个代码里边。因为这里不能简单地在urls前边加个 start = time.time(),程序后边加一个print答应,因为你启动了进程,但没有等待它们结束。主程序会立即执行完 for
循环并打印出总耗时,而不会等待所有子进程完成。这可能导致你看到的总耗时小于实际所有请求完成所需的时间。并确保你计算的是所有请求完成的时间,你需要在启动每个进程后调用 p.join()
方法。这会阻塞主进程,直到对应的子进程完成。以下是修改后的代码:
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:', url)
time.sleep(2)
print('请求结束:', url)
if __name__ == "__main__":
start = time.time()
urls = ['www.1.com', 'www.2.com', 'www.3.com']
processes = [] # 创建一个列表来存储所有的进程
for url in urls:
# 创建进程
p = Process(target=get_request, args=(url,))
p.start() # 启动进程
processes.append(p) # 将进程添加到列表中
# 等待所有进程完成#p就是列表中的每一个进程
for p in processes:
p.join()#每一个子进程都执行了join操作
print('总耗时:', time.time() - start)
进程数据通信
#观察下述代码出现的问题是什么?(了解)
from multiprocessing import Process
import time
ticketNum = 10 #全部的车票
def func(num):
print('我是子进程,我要购买%d张票!'%num)
global ticketNum
ticketNum -= num
time.sleep(2)
if __name__ == '__main__':
p = Process(target=func,args=(3,))
p.start()
#主进程在子进程结束之后在结束
p.join() #只有当子进程结束后,join的调用结束,才会执行join后续的操作
print('目前剩余车票数量为:',ticketNum) #输出结果依然是10
#进程和进程之间是完全独立。两个进程对应的是两块独立的内存空间,每一个进程只可以访问自己内存空间里的数据。
-
如果主进程的查询结果是在2s中后才出现的,则join生效了。但是查询结果为什么是这样的呢?
- 首先,ticketNum = 10这个变量是存在于主进程中的,然后再func函数中ticketNum则是将全局变量ticketNum的值拷贝到了子进程中的ticketNum变量中,因此在func中的减法操作只能作用在子进程的变量中。最终,最后一行主进程打印的ticketNum则是原来主进程未发生变量的值。
-
如何解决?(自己可以尝试文件共享)
- 进程通信机制,管道,信号量等(没必要掌握,日后用不到)
-
继续思考:一个子进程函数的返回值如何被主进程获取?
-
总结:进程之间的数据是隔离的,也就是数据不共享
守护进程
那么如果有一天我们的需求是我的主进程结束了,由主进程创建的那些子进程必须跟着结束,怎么办?守护进程就来了!
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)
if __name__ == "__main__":
start = time.time()
p = Process(target=get_request,args=('www.1.com',))
# 将当前p这个子进程设置为了守护进程
p.daemon = True #该操作必须放置在子进程启动操作之前
p.start()
print('主进程执行结束')
主进程创建守护进程后:
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:主进程代码运行结束,守护进程随即终止!
线程的实现
基本概念
**线程:**线程是操作系统能够进行运算调度的最小单位(车间里的工人),它被包含在进程之中,线程是进程中的实际运作单位。
注意:
1.同一个进程内的多个线程是共享该进程的资源的,不同进程内的线程资源肯定是隔离的
2.创建线程的开销比创建进程的开销要小的多
3.每一个进程中至少会包含有一个线程,该线程叫做"主线程"
python线程模块的选择
Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。但是threading模块更为先进,对线程的支持更为完善,因此推荐大家使用该模块!
threading模块
- 线程的创建
from threading import Thread
def func(num):
print('num的值是:',num)
if __name__ == '__main__':
#创建好了一个子线程(在主线程中创建)
t = Thread(target=func,args=(1,))
t.start()
和上边进程类似的join方法
from threading import Thread
import time
class MyThread(Thread):
def __init__(self):
super().__init__()
def run(self):
print('当前子线程正在执行')
time.sleep(2)
print('当前子线程执行结束')
if __name__ == '__main__':
start = time.time()
ts = []
for i in range(3):
t = MyThread() #创建线程对象
t.start() #启动线程对象
ts.append(t)
for t in ts:
t.join()
print('总耗时:',time.time()-start)
关于通讯,线程的内存数据是共享的,我们可以看这个例子
from threading import Thread
import time
def work():
global n
n = 0 #将全局变量修改为了0
if __name__ == '__main__':
n = 1 #全局变量
t = Thread(target=work)
t.start()
print(n) #在进程中输出全局变量的值就是线程修改后的结果为0
守护线程
- 无论是进程还是线程,都遵循:守护xx会在主xx运行完毕后被销毁,不管守护xx时候被执行结束。
from threading import Thread
import time
def work():
time.sleep(1)
print('子线程正在执行!')
if __name__ == '__main__':
t = Thread(target=work)
t.daemon = True #当前的子线程设置为了守护线程
t.start()
print('主线程结束!')
那么到这里我们已经学习了异步和多线程,那么现在来试试多线程实现的异步效果
案例:多线程实现的异步效果:
#urls = ['www.1.com','www.2.com','www.3.com','www.4.com','www.5.com']
from threading import Thread
import time
def work(url):
print('正在请求',url)
time.sleep(2)
print('爬取数据结束')
start = time.time()
urls = ['www.1.com', 'www.2.com', 'www.3.com', 'www.4.com', 'www.5.com']
ts=[]
for url in urls:
t=url=Thread(target=work,args=(url,))
t.start()
ts.append(t)
for t in ts:
t.join()
print('总耗时:',time.time()-start)
线程池
线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。
from multiprocessing.dummy import Pool #导入了线程池模块
import time
urls = ['www.1.com','www.2.com','www.3.com','www.4.com','www.5.com']
def get_reqeust(url):
print('正在请求数据:',url)
time.sleep(2)
print('请求结束:',url)
start = time.time()
#创建一个线程池,开启了5个线程
pool = Pool(5)
#可以利用线程池中三个线程不断的去处理5个任务
pool.map(get_reqeust,urls)
#get_reqeust函数调用的次数取决urls列表元素的个数
#get_requests每次执行都会接收urls列表中的一个元素作为参数
print('总耗时:',time.time()-start)
pool.close() #释放线程池
协程(重要!)
协程可以实现在单进程或者单线程的模式下,大幅度提升程序的运行效率!
- 假设我们有一个需求:从一个URL列表中下载多个网页内容,假设下载一个网页内容需要耗时2秒。
- 在传统的多线程或多进程模型中,我们会为每个URL创建一个线程或进程来进行异步的下载操作。但是这样做会有一个问题:
- 计算机中肯定不会只有下载URL的这几个进程/线程,还会有其他的进程/线程(Pycharm、音乐播放器、微信、网盘等)。
- 将每一个下载网页的操作封装成一个进程/线程的目的就是为了实现异步的网页数据下载,也就是当一个下载网页的操作出现阻塞后,可以不必等待阻塞操作结束后就可以让计算机去下载其他网页内容(CPU切换到其他网页下载的进程/线程中)。
- 但是,计算机中启动的进程/线程那么多,你确定每次CPU进行进程/线程切换,都会切换到网页下载的进程/线程中吗?答案是不一定,因为这个进程/线程切换是由操作系统实现的,无法人为干涉。那么,这些网页下载任务的执行的效率就降低下来了。因此,可以使用协程来解决该问题!
- 协程处理多个网页内容下载任务:
- 将所有的网页下载任务全部封装在一个进程/线程中,基于单进程/单线程来实现多个网页下载的任务。
- 在这个下载任务的单进程/单线程,需要我们自己主动监测出所有的阻塞环节,使得cpu在这些阻塞环节切换执行,这样当前下载任务的单进程/单线程处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为当前的下载任务是阻塞比较少的单进程/单线程,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。
- 因此,有了协程后,在单进程或者单线程的模式下,就可以大幅度提升程序的运行效率了!
- 总而言之,就是想尽一切办法留住CPU在我们自己的程序中,从而提升整个程序的执行效率!
- 在传统的多线程或多进程模型中,我们会为每个URL创建一个线程或进程来进行异步的下载操作。但是这样做会有一个问题:
asyncio模块
-
在python3.6之后新增了asyncio模块,可以帮我们检测阻塞(只能是网络阻塞),实现应用程序 级别的切换。
-
接下来让我们来了解下协程的实现,从 Python 3.6 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.6 则增加了 asyncio,使得协程的实现更加方便。首先我们需要了解下面几个概念:
-
特殊函数:
- 在函数定义前添加一个async关键字,则该函数就变为了一个特殊的函数!
- 特殊函数的特殊之处是什么?
- 1.特殊函数被调用后,函数内部的程序语句(函数体)没有被立即执行
- 2.特殊函数被调用后,会返回一个协程对象
-
协程:
- 协程对象,特殊函数调用后就可以返回/创建了一个协程对象。
- 协程对象 == 特殊的函数 == 一组指定形式的操作
- 协程对象 == 一组指定形式的操作
-
任务:
- 任务对象就是一个高级的协程对象。高级之处,后面讲,不着急!
- 任务对象 == 协程对象 == 一组指定形式的操作
- 任务对象 == 一组指定形式的操作
-
事件循环:
- 事件循环对象(Event Loop),可以将其当做是一个容器,该容器是用来装载任务对象的。所以说,让创建好了一个或多个任务对象后,下一步就需要将任务对象全部装载在事件循环对象中。loop就可以将其内部装载的任务对象进行异步的执行。
直接说显得比较抽象,我们用几段代码来展示。这是一个get_requests是一个异步函数
import asyncio import time #特殊的函数 async def get_request(url): print('正在请求的网址是:',url) time.sleep(2) print('请求网址结束!') return 123 #创建了一个协程对象 c = get_request('www.1.com') #创建任务对象 task = asyncio.ensure_future(c) #创建事件循环对象 loop = asyncio.get_event_loop() #将任务对象装载在loop对象中且启动事件循环对象 loop.run_until_complete(task)
那么接下来我们就来试试异步的效果吧
import asyncio import time start = time.time() urls = [ 'www.1.com','www.2.com','www.3.com' ] async def get_request(url): print('正在请求:',url) time.sleep(2) print('请求结束:',url) #有了三个任务对象和一个事件循环对象 if __name__ == '__main__': tasks = [] for url in urls: c = get_request(url) task = asyncio.ensure_future(c) tasks.append(task) #将三个任务对象,添加到一个事件循环对象中 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print('总耗时:',time.time()-start)
当我们运行代码的时候会发现,程序并没有实现异步效果
-
这是为什么呢,那是因为在特殊函数内部,不可以出现不支持异步的木块代码,否则就会中断整个异步效果。在Python中,有些标准库模块和函数是同步执行的,这意味着它们会阻塞当前线程直到操作完成,因此它们不适用于异步编程环境。常见的有
time.sleep():这是一个同步函数,它会暂停程序执行指定的时间,阻塞事件循环。
标准I/O操作:
open():用于打开文件的同步函数。
read():从文件中读取数据的同步函数。
write():向文件写入数据的同步函数。
socket模块:标准的socket模块提供的是同步的网络操作。
select模块:用于同步I/O多路复用。
threading模块:用于创建线程的同步模块。
multiprocessing模块:用于创建进程的同步模块。
subprocess模块:用于创建子进程的同步模块,其中的函数如subprocess.run()、subprocess.call()等会阻塞直到子进程完成。
requests模块:用于同步HTTP请求的模块。
urllib模块:用于同步URL处理的模块。
json模块:虽然json.loads()和json.dumps()本身是同步的,但它们可以用于异步环境,只要它们处理的数据不是来自异步I/O操作。
xml模块:用于解析XML数据的同步模块。
数据库连接:大多数数据库连接库(如sqlite3、psycopg2等)提供的是同步连接和操作。
在异步编程中,通常需要使用支持异步操作的库来替代这些同步库。例如:
使用aiohttp库替代requests进行异步HTTP请求。
使用aiofiles库进行异步文件操作。
使用asyncio库中的asyncio.sleep()代替time.sleep()。
使用asyncio库中的asyncio.open_connection()、asyncio.start_server()等函数进行异步网络操作。
使用asyncio库中的子进程管理功能,如asyncio.create_subprocess_exec()来替代subprocess模块。
那么显然这里是应为出现了 time.sleep(2),我们替换为asyncio.sleep(2),前面需要加await

那么接下是多任务的异步协程作用在爬虫当中。前文提到request不支持到异步,那么我们需要用到aiohttp
with aiohttp.ClientSession() as sess:
#基于请求对象发起请求
#此处的get是发起get请求,常用参数:url,headers,params,proxy
#post方法发起post请求,常用参数:url,headers,data,proxy
#发现处理代理的参数和requests不一样(注意),此处处理代理使用proxy='http://ip:port'
with sess.get(url=url) as response:
page_text = response.text()
#text():获取字符串形式的响应数据
#read():获取二进制形式的响应数据
return page_text
-
在每一个with前加上async关键字
-
在阻塞操作前加上await关键字
async def get_request(url):
#requests是不支持异步的模块
# response = await requests.get(url=url)
# page_text = response.text
#创建请求对象(sess)
async with aiohttp.ClientSession() as sess:
#基于请求对象发起请求
#此处的get是发起get请求,常用参数:url,headers,params,proxy
#post方法发起post请求,常用参数:url,headers,data,proxy
#发现处理代理的参数和requests不一样(注意),此处处理代理使用proxy='http://ip:port'
async with await sess.get(url=url) as response:
page_text = await response.text()
#text():获取字符串形式的响应数据
#read():获取二进制形式的响应数据
return page_text
那么后续结合着实战来理解可能会更好吗,这里先将到这。
M3U8流视频数据爬虫
HLS技术介绍
现在大部分视频客户端都采用HTTP Live Streaming(HLS,Apple为了提高流播效率开发的技术),而不是直接播放MP4等视频文件。HLS技术的特点是将流媒体切分为若干【TS片段】(比如几秒一段),然后通过一个【M3U8列表文件】将这些TS片段批量下载供客户端播放器实现实时流式播放。因此,在爬取HLS的流媒体文件的思路一般是先【下载M3U8文件】并分析其中内容,然后在批量下载文件中定义的【TS片段】,最后将其【组合】成mp4文件或者直接保存TS片段。
M3U8文件
M3U8也是一种M3U的扩展格式(高级的M3U,所以也属于M3U)。
**M3U8示例:**大家会看到在该文件中有大量的ts文件的链接地址,这个就是我们之前描述的真正的视频文件。其中任何一个ts文件都是一小段视频,可以单独播放。我们做视频爬虫的目标就是把这些ts文件都爬取下来。
案例:美剧天堂视频
具体操作
- 进入视频播放页
- 点击播放按钮,定位ts数据包,从中提取ts片段的url,探究url的规律
- 打开抓包工具,刷新页面,全局搜索m3u8定位到找到m3u8文件
- 解析m3u8文件提取文件中ts片段链接
- ts文件的合并,最好网上找专业的工具进行合并,自己手动合并会经常出问题
寻找到关于切片的信息后就可以开始写代码

正常同步版本
import os.path
import requests
import re
dirname='tlsfile'
if not os.path.exists(dirname):
os.mkdir(dirname)
headers={
'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
}
url='https://cdn13.tvtvgood.com/202308/03/ccc19bdb33b6/playlist.m3u8?token=bPDBdBggejAgNQP52VwPOA&expires=1726477655'
response_text=requests.get(url=url,headers=headers).text
print(response_text)
tls_list=[]
for line in response_text.split('\n'):
if not line.startswith('#'):
ts_url=line
ts_url='https://cdn13.tvtvgood.com/202308/03/ccc19bdb33b6/'+ts_url
print('请求url:',ts_url)
tls_list.append(ts_url)
for url in tls_list:
response=requests.get(url=url,headers=headers)
ts_data=response.content
ts_name=url.split('/')[-1]
ts_path=dirname+'/'+ts_name
with open(ts_path,'wb') as fp:
fp.write(ts_data)
print(ts_name,'下载成功')
print('文件下载完成')
异步版本(只是将不支持异步的部分换一下,你会发现速度快了很多)
# -*- coding =utf-8 -*-
# @time: 2024-09-16 16:59
# @anthor: jasun
# @file:m3u8.py
# @Asoftware:PyCharm
import os.path
import requests
import re
import asyncio
import aiohttp
dirname='tlsfile'
if not os.path.exists(dirname):
os.mkdir(dirname)
headers={
'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
}
url='https://cdn13.tvtvgood.com/202308/03/ccc19bdb33b6/playlist.m3u8?token=bPDBdBggejAgNQP52VwPOA&expires=1726477655'
response_text=requests.get(url=url,headers=headers).text
print(response_text)
tls_list=[]
for line in response_text.split('\n'):
if not line.startswith('#'):
ts_url=line
ts_url='https://cdn13.tvtvgood.com/202308/03/ccc19bdb33b6/'+ts_url
print('请求url:',ts_url)
tls_list.append(ts_url)
async def get_requests(url):
async with aiohttp.ClientSession() as req:
async with await req.get(url=url,headers=headers) as response:
ts_data = await response.read()
dic = {'ts_data':ts_data,'ts_title':url.split('/')[-1]}
return dic
def save_ts_data(t):
dic = t.result()
ts_data = dic['ts_data']
ts_title = dic['ts_title']
ts_path = dirname + '/' + ts_title
with open(ts_path,'wb') as fp:
fp.write(ts_data)
print(ts_title,':保存下载成功!')
tasks = []
for url in tls_list:
c = get_requests(url)
task = asyncio.ensure_future(c)
task.add_done_callback(save_ts_data)
tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
前文我们还提到了线程池,这里也可以试试
import os.path
import requests
import re
import asyncio
import aiohttp
from multiprocessing import Pool
dirname='tlsfile'
if not os.path.exists(dirname):
os.mkdir(dirname)
headers={
'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
}
url='https://cdn13.tvtvgood.com/202308/03/ccc19bdb33b6/playlist.m3u8?token=bPDBdBggejAgNQP52VwPOA&expires=1726477655'
response_text=requests.get(url=url,headers=headers).text
print(response_text)
tls_list=[]
for line in response_text.split('\n'):
if not line.startswith('#'):
ts_url=line
ts_url='https://cdn13.tvtvgood.com/202308/03/ccc19bdb33b6/'+ts_url
print('请求url:',ts_url)
tls_list.append(ts_url)
def get_reqeust(url):#参数url就是ts片段的请求url
ts_data = requests.get(url=url,headers=headers,verify=False).content
ts_path = dirname + '/' + url.split('/')[-1]
with open(ts_path,'wb') as fp:
fp.write(ts_data)
print(ts_path,':保存下载成功!')
#HTTPSConnectionPool异常原因:
#网络请求的并发量太大(减少并发or在headers中添加一个Connection:closed)
pool = Pool(100)
pool.map(get_reqeust,tls_list)
Selenium
selenium是一种浏览器自动化的工具,所谓的自动化是指,我们可以通过代码的形式制定一系列的行为动作,然后执行代码,这些动作就会同步触发在浏览器中。他是模拟人去打开一个浏览器去进行操作。
环境安装
-
下载安装selenium:
- pip install selenium
-
下载浏览器驱动程序:
- http://chromedriver.storage.googleapis.com/index.html
- win64选win32即可(根据浏览器版本来选择)
对selenium我认为不需要你去从0开始学习他的命令,而是当你有什么需求的时候再去查找相关的命令,针对的使用和学习。所以这里只是简单的举个例子。同时如果使用selenium一定要将浏览器的版本控制住,目前chromdriver最高只能支持到114版本
from selenium import webdriver
from time import sleep
from selenium.webdriver.common.by import By
# 后面是你的浏览器驱动位置,记得前面加r'','r'是防止字符转义的
driver = webdriver.Chrome(executable_path='./chromedriver')
# 用get打开百度页面
driver.get("http://www.baidu.com")
# 查找页面的“设置”选项,并进行点击
# driver.find_element_by_xpath('//*[@id="s-usersetting-top"]').click()
driver.find_element(By.XPATH,'//*[@id="s-usersetting-top"]').click()
sleep(1)
# # 打开设置后找到“搜索设置”选项,设置为每页显示50条
# driver.find_elements_by_link_text('搜索设置')[0].click()
driver.find_element(By.LINK_TEXT,'搜索设置').click()
sleep(1)
# 选中每页显示50条
m = driver.find_element_by_xpath('//*[@id="nr_3"]').click()
sleep(1)
# 点击保存设置
driver.find_element_by_xpath('//*[@id="se-setting-7"]/a[2]').click()
sleep(1)
# 处理弹出的警告页面 确定accept() 和 取消dismiss()
driver.switch_to.alert.accept()
sleep(1)
# 找到百度的输入框,并输入 美女
driver.find_element_by_id('kw').send_keys('美女')
sleep(1)
# 点击搜索按钮
driver.find_element_by_id('su').click()
sleep(1)
driver.find_element_by_xpath('//*[@id="1"]/div/h3/a').click()
sleep(3)
# 关闭浏览器
driver.quit()
元素定位
webdriver 提供了一系列的元素定位方法,常用的有以下几种:
find_element_by_id()
find_element_by_name()
find_element_by_class_name()
find_element_by_tag_name()
find_element_by_link_text()
find_element_by_xpath()
find_element_by_css_selector()
或者
from selenium.webdriver.common.by import By
driver.find_element(By.xxx,value) #返回定位到的标签
driver.find_elements(By.xx, value) #返回列表
执行js
对于某些操作,Selenium API并没有提供。比如,下拉进度条,它可以直接模拟运行JavaScript,此时使用execute_script()
方法即可实现。
from selenium import webdriver
from time import sleep
from selenium.webdriver.common.by import By
#1.创建一个浏览器对象,executable_path指定当前浏览器的驱动程序
#注意:我当前是mac系统,驱动程序也是mac版本的,如果是window系统注意更换驱动
bro = webdriver.Chrome(executable_path='./chromedriver')
#2.浏览器的请求发送
bro.get('https://www.jd.com/')
#3.标签定位:调用find系列的函数进行标签定位
# search_box = bro.find_element_by_xpath('//*[@id="key"]')
search_box = bro.find_element(By.XPATH,'//*[@id="key"]')
#4.节点交互
search_box.send_keys('mac pro m1')#向指定标签中录入内容
sleep(2)
# btn = bro.find_element_by_xpath('//*[@id="search"]/div/div[2]/button')
btn = bro.find_element(By.XPATH,'//*[@id="search"]/div/div[2]/button')
btn.click() #点击按钮
sleep(2)
#js注入
bro.execute_script('document.documentElement.scrollTo(0,2000)')
sleep(5)
#关闭浏览器
bro.quit()
selenium能够实现可见即可得,所以正常用户能够看到的selenium都能看到,调用解析,就能获取到。
动作链
在上面的实例中,一些交互动作都是针对某个节点执行的。比如,对于输入框,我们就调用它的输入文字和清空文字方法;对于按钮,就调用它的点击方法。其实,还有另外一些操作,它们没有特定的执行对象,比如鼠标拖曳、键盘按键等,这些动作用另一种方式来执行,那就是动作链。
from selenium.webdriver import ActionChains
from selenium import webdriver
from time import sleep
bro = webdriver.Chrome(executable_path='./chromedriver')
bro.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
sleep(1)
#注意:如果定位的标签是存在于iframe表示的子页面中,则常规的标签定位报错
#处理:使用如下指定操作
bro.switch_to.frame('iframeResult')
div_tag = bro.find_element_by_id('draggable')
#实例化一个动作链对象且将该对象绑定到指定的浏览器中
action = ActionChains(bro)
action.click_and_hold(div_tag) #对指定标签实现点击且长按操作
for i in range(5):
action.move_by_offset(10,10).perform() #perform让动作链立即执行
sleep(0.5)
sleep(3)
bro.quit()
页面等待
-
为什么需要等待
如果网站采用了动态html技术,那么页面上的部分元素出现/加载的时间便不能确定,这个时候就可以设置一个等待时间,强制等待指定时间,等待结束之后进行元素定位,如果还是无法定位到则报错 -
页面等待的三种方法
-
强制等待
import time time.sleep(n) # 阻塞等待设定的秒数之后再继续往下执行
-
显式等待
也称为智能等待,针对指定元素定位指定等待时间,在指定时间范围内进行元素查找,找到元素则直接返回,如果在超时还没有找到元素,则抛出异常,显示等待是 selenium 当中比较灵活的一种等待方式,他的实现原理其实是通过 while 循环不停的尝试需要进行的操作。
from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # 每隔 0.5s 检查一次(默认就是 0.5s), 最多等待 10 秒,否则报错。如果定位到元素则直接结束等待,如果在10秒结束之后仍未定位到元素则报错 wait = WebDriverWait(chrome, 10,0.5) wait.until(EC.presence_of_element_located((By.ID, 'J_goodsList')))
-
隐式等待:
设置超时时间为10秒,使用了implicitlyWait后,如果第一次没有找到元素,会在10秒之内不断循环去找元素,如果超过10秒还没有找到,则抛出异常,隐式等待比较智能。
driver.implicitly_wait(10) # 在指定的n秒内每隔一段时间尝试定位元素,如果n秒结束还未被定位出来则报错
-
滑动验证
import time
from selenium import webdriver
from selenium.webdriver.common.by import By # 按照什么方式查找,By.ID,By.CSS_SELECTOR
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait # 等待页面加载某些元素
import cv2 #pip install opencv-python
from urllib import request
from selenium.webdriver.common.action_chains import ActionChains
#获取要滑动的距离
def get_distance():
#滑动验证码的整体背景图片
background = cv2.imread("background.png", 0)
#缺口图片
gap = cv2.imread("gap.png", 0)
res = cv2.matchTemplate(background, gap, cv2.TM_CCOEFF_NORMED)
value = cv2.minMaxLoc(res)[2][0]
print(value)
#单位换算
return value * 278 / 360
def main():
chrome = webdriver.Chrome(executable_path='./chromedriver')
chrome.implicitly_wait(5)
chrome.get('https://passport.jd.com/new/login.aspx?')
login = chrome.find_element(By.ID, 'pwd-login')
login.click()
loginname = chrome.find_element(By.ID, 'loginname')
loginname.send_keys("123@qq.com")
nloginpwd = chrome.find_element(By.ID, 'nloginpwd')
nloginpwd.send_keys("987654321")
loginBtn = chrome.find_element(By.CLASS_NAME, 'login-btn')
loginBtn.click()
#带缺口的大图
img_src = chrome.find_element(By.XPATH, '//*[@class="JDJRV-bigimg"]/img').get_attribute("src")
#缺口图片
temp_src = chrome.find_element(By.XPATH, '//*[@class="JDJRV-smallimg"]/img').get_attribute("src")
#两张图片保存起来
request.urlretrieve(img_src, "background.png")
request.urlretrieve(temp_src, "gap.png")
distance = int(get_distance())
print("distance:", distance)
print('第一步,点击滑动按钮')
element = chrome.find_element(By.CLASS_NAME, 'JDJRV-slide-btn')
ActionChains(chrome).click_and_hold(on_element=element).perform() # 点击鼠标左键,按住不放
ActionChains(chrome).move_by_offset(xoffset=distance, yoffset=0).perform()
ActionChains(chrome).release(on_element=element).perform()
time.sleep(2)
if __name__ == '__main__':
main()
验证没有通过原因:
没有模拟人的行为动作
检测出是selenium
验证码问题
- 第三方收费验证码模块封装
- python自带的cv2模块
规避检测(重要)
-
现在不少大网站有对selenium采取了监测机制。比如正常情况下我们用浏览器访问淘宝等网站的 window.navigator.webdriver的值为 undefined或者为false。而使用selenium访问则该值为true。那么如何解决这个问题呢?
- 实现js注入,绕过检测
from selenium.webdriver import ActionChains
from selenium.webdriver import Chrome
driver = Chrome('./chromedriver',options=chrome_options)
#Selenium在打开任何页面之前,先运行这个Js文件。
with open('./stealth.min.js') as f:
js = f.read()
#进行js注入,绕过检测
#execute_cdp_cmd执行cdp命令(在浏览器开发者工具中执行相关指令,完成相关操作)
#Page.addScriptToEvaluateOnNewDocument执行脚本
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": js
})
driver.get('https://www.taobao.com')
stealth.min.js文件内容较多,就不放在这了
Pyppetee
那么selenium模拟人的请求,速度相对来说较慢,我们想想能不能也使用上异步呢?这里就引入一个模块pyppeteer
Pyppeteer 就是依赖于 Chromium 这个浏览器来运行的。那么有了 Pyppeteer 之后,我们就可以免去那些繁琐的环境配置等问题。如果第一次运行的时候,Chromium 浏览器没有安装,那么程序会帮我们自动安装和配置,就免去了繁琐的环境配置等工作。另外 Pyppeteer 是基于 Python 的新特性 async 实现的,所以它的一些执行也支持异步操作,效率相对于 Selenium 来说也提高了。
案例爬取http://quotes.toscrape.com/js/ 全部页面数据
import asyncio
from pyppeteer import launch
from lxml import etree
#创建一个特殊的函数
async def main():
#对应的pyppeteer相关的操作要写在特殊函数内部
#1.创建一个浏览器对象
bro = await launch(headless=True)
#2.创建一个新的page
page = await bro.newPage()
#3.发起请求
await page.goto('http://quotes.toscrape.com/js/')
#4.获取页面源码数据
page_text = await page.content()
#5.数据解析
tree = etree.HTML(page_text)
div_list = tree.xpath('//div[@class="quote"]')
print(len(div_list))
await asyncio.sleep(3)
await bro.close()
#创建一个协程对象
c = main()
#创建且启动事件循环对象
loop = asyncio.get_event_loop()
loop.run_until_complete(c)
解释:launch 方法会新建一个 Browser 对象,然后赋值给 browser,然后调用 newPage 方法相当于浏览器中新建了一个选项卡,同时新建了一个 Page 对象。然后 Page 对象调用了 goto 方法就相当于在浏览器中输入了这个 URL,浏览器跳转到了对应的页面进行加载,加载完成之后再调用 content 方法,返回当前浏览器页面的源代码。在这个过程中,我们没有配置 Chrome 浏览器,没有配置浏览器驱动,免去了一些繁琐的步骤,同样达到了 Selenium 的效果,还实现了异步抓取
关闭提示条:”Chrome 正受到自动测试软件的控制”,这个提示条有点烦,那咋关闭呢?这时候就需要用到 args 参数了,禁用操作如下:
browser = await launch(headless=False, args=['--disable-infobars'])
发现页面显示出现了问题,需要手动调用setViewport方法设置显示页面的长宽像素。设置如下:
width, height = 1366, 768
规避检测:执行js程序执行指定的js程序
-
正常情况下我们用浏览器访问淘宝等网站的 window.navigator.webdriver的值为 undefined或者为false。而使用pyppeteer访问则该值为true。那么如何解决这个问题呢?
import asyncio from pyppeteer import launch width, height = 1366, 768 async def main(): #规避检测 browser = await launch(headless=False, args=['--disable-infobars']) page = await browser.newPage() await page.setViewport({'width': width, 'height': height}) await page.goto('https://login.taobao.com/member/login.jhtml?redirectURL=https://www.taobao.com/') #规避检测 await page.evaluate( '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''') await asyncio.sleep(20) await browser.close() asyncio.get_event_loop().run_until_complete(main())
-
节点交互:(可以很好的模拟人的行为)
-
import asyncio from pyppeteer import launch async def main(): # headless参数设为False,则变成有头模式 browser = await launch( headless=False ) page = await browser.newPage() # 设置页面视图大小 await page.setViewport(viewport={'width': 1280, 'height': 800}) await page.goto('https://www.baidu.com/') # 节点交互 await page.type('#kw', '周杰伦', {'delay': 1000}) await asyncio.sleep(3) #点击搜索按钮 await page.click('#su') await asyncio.sleep(3) # 使用选择器选中标签进行点击 alist = await page.querySelectorAll('.s_tab_inner > a') a = alist[3] await a.click() await asyncio.sleep(3) await browser.close() asyncio.get_event_loop().run_until_complete(main())
-
案例爬取:异步爬取网易新闻首页的新闻标题
#https://news.163.com/domestic/
import asyncio
from pyppeteer import launch
from lxml import etree
#创建一个特殊的函数
async def main():
#对应的pyppeteer相关的操作要写在特殊函数内部
#1.创建一个浏览器对象
bro = await launch(headless=True, executablePath='D:/download/chrome-win/chrome.exe')
#2.创建一个新的page
page = await bro.newPage()
#3.发起请求
await page.goto('https://news.163.com/domestic/')
#4.获取页面源码数据
page_text = await page.content()
return page_text
#5.数据解析
def parse(task):
page_text = task.result()
tree = etree.HTML(page_text)
div_list = tree.xpath('//div[@class="data_row news_article clearfix "]')
for div in div_list:
title = div.xpath('.//div[@class="news_title"]/h3/a/text()')[0]
print('wangyi:', title)
#创建一个协程对象
tasks = []
task1 = asyncio.ensure_future(main())
task1.add_done_callback(parse)
tasks.append(task1)
asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))
Scapy