在实际工作中,爬虫处理的数据量是非常大的,如果仅仅使用单线程爬虫,效率是非常低的!这篇文章主要来说一下多线程爬虫、多进程爬虫和多协程爬虫。
并发和并行
并发(concurrency)和并行(parallelism)是两个相似的概念。并发:在一个时间段内发生若干事件的情况;并行:在同一时刻发生若干事件的情况。
在单核CPU上,多个任务是以并发的方式运行的,因为只有一个CPU,各个人物会分别占用CPU的一段时间一次执行。如果某个任务在时间段内没有完成,就会切换到另一个任务,然后再下一次得到CPU的使用权的时候再继续执行,直到完成。这种情况下,各个任务的时间段很短,经常切换,看起来多个任务像是在同时进行的。在多核CPU上,多个任务真正能够同时运行,而不是看起来像,这就是并行。
同步和异步
在理解并发和并行的基础上,同步就是并发或并行的各个任务不是独自运行的,任务之间有一定的交替顺序,可能在运行完一个任务得到结果后,另一个任务才会开始运行,可以理解为串行。类似于4×4接力赛,要拿到交接棒之后,下一个选手才可以开始跑。
异步则是并发或并行的各个人物可以独立运行,一个任务的运行不受另一个任务的影响。类似于400赛跑,任务之间就像比赛的各个选手在不同的赛道比赛一样,跑步的速度不受其他赛道选手的影响!
假设现在打开4个不同的网站,IO(Input输入/Ouput输出)的过程相当于打开网站,cpu执行单击的动作。同步IO指你每单击个网站,需要等待该网站彻底显示才可以单击下一个网站;异步IO是指单击完一个网站,不用等待对方服务器返回结果,立即可以打开另一个网站,以此类推,最后同时等待4个网站彻底打开。
多线程、多进程、多协程都是使用异步的方式,“同时”获取多个网页,从而提升爬虫效率。
多线程爬虫多线程爬虫的原理,这里简单说一下。Python在设计的时候是没有多核的电脑的,所以作者也就没有考虑多核的情况,所以设置了GIL(全局解释器锁)。在Python中,一个线程的执行过程包括获取GIL、执行代码直到挂起和释放GIL。
某个线程想要执行,必须要先申请GIL,可以把GIL看作“通行证”,并且在一个Python进程中,只有一个GI。拿不到GIL锁,线程就不允许进入CPU执行。每次释放GIL锁,线程之间都会进行锁竞争,线程的来回切换回消耗资源。

爬虫属于IO密集型,多线程能够有效地提升效率,因为单线程下IO操作会进行IO等待,开启多线程能在线程等待的时候切换到另一个线程,充分利用CPU的资源。
单线程爬虫
以单线程(单进程)为例,抓取1000个网页,这1000个网页我们可以从:中文网站排行榜进行提取,该网站一页有20个数据,我们只需要爬虫1000个,就是50页,代码参考:
import requests
from lxml import etree
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
for a in range(1,51):
url='http://www.alexa.cn/siterank/{0}'.format(a)
res=requests.get(url,headers=headers)
tree=etree.HTML(res.text)
for i in tree.xpath('//ul[@class="siterank-sitelist"]/li'):
href=i.xpath('./div[2]/div[1]/span/a/@href')[0]
with open('./alexa.txt','a',encoding='utf-8') as f:
f.write(href+','+'\n')
将1000个网站的URL保存在alexa.txt文件,现在我使用单线程来抓取这1000个网页,之前先编写一个测试时间的装饰器,因为我们需要多次使用:
import time
def test_time(func):
def wrapper(*args,**kwargs):
start_time=time.time()
func(*args,**kwargs)
end_time=time.time()
print('time cost:%s,%s'% (func.__name__,str(end_time-start_time)))
return wrapper
然后定义单线程函数:
import requests,time
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
file_list=f.readlines() #逐行读取
for i in file_list:
link=i.split(',')[0]
link_list.append(link)
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
@test_time
def single_thread(link_list):
for url in link_list:
try:
res=requests.get(url,headers=headers)
print(res.status_code,url)
except Exception as e:
print('error',e)
single_thread(link_list)
运行代码,就可以测试出请求1000个网站所花费的时间,具体多少自己尝试,反正不快!
多线程爬虫
在Python中使用多线程有两种方式:
- 函数式:调用_thread模块的中的start_new_thread()方法产生新线程
- 继承式:调用Threading库创建线程,从threading.Thread类继承
threading模块提供了Thread类来处理线程,包括以下几个方法:
- run():表示线程活动的方法
- start():启动线程活动
- join([time]):等待到线程终止。阻塞调用线程直至线程的join方法被调用为止
- isAlive():返回线程是否是活动的
- getName():返回线程名
- setName():设置线程名
import threading
import time
class myThread(threading.Thread):
def __init__(self,name,delay):
threading.Thread.__init__(self)
self.name=name
self.delay=delay
def run(self): # 方法固定
print("starting "+self.name)
print_time(self.name,self.delay)
print("exiting "+self.name)
def print_time(threadNam,delay):
count=0
while count<3:
time.sleep(delay)
print(threadNam,time.ctime())
count+=1
threads=[]
# 创建新线程
thread1=myThread("thread-1",2)
thread2=myThread("thread-2",3)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
for i in threads:
i.join()
print("exit main thread")
代码中,我们将任务分到两个线程中,即thread1=myThread(“thread-1”,2),然后在myThread类中对线程进行设置,使用run方法表示线程的运行方法,名字固定,当count小于3时,打印该线程的名称和时间!然后使用thread.start()开启线程,使用threads.append()将线程加入线程列表,使用join()等待所有的子线程执行完成才会执行主线程。
现在知道了如何使用多线程,呢么我们修改一下最上面的爬虫代码,使用多线程抓取1000个网页:import requests,time,threading
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
file_list=f.readlines() #逐行读取
for i in file_list:
link=i.split(',')[0]
link_list.append(link)
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
start_time=time.time()
class myThread(threading.Thread):
def __init__(self,name,link_range):
threading.Thread.__init__(self)
self.name=name
self.link_range=link_range
def run(self):
print("starting "+self.name)
crawler(self.name,self.link_range)
print("exiting "+self.name)
def crawler(threadName,link_range):
for i in range(link_range[0],link_range[1]+1):# (0, 201)
try:
res=requests.get(link_list[i],timeout=20,headers=headers)
print(threadName,res.status_code,link_list[i])
except Exception as e:
print(threadName,'error',e)
thread_list = []
# 将1000个网页分成5份,每一份200个
link_range_list = [(0,200), (201,400), (401,600), (601,800), (801,1000)]
# 利用for循环创建5个线程,将这些网页分别交给这5个线程运行
for i in range(1,6):
thread = myThread("Thread-" + str(i), link_range_list[i-1]) # 循环link_range_list
thread.start()
thread_list.append(thread)
# 等待所有线程完成
for thread in thread_list:
thread.join()
end_time=time.time()
print('多线程花费时间:%s'% str(end_time-start_time))
print ("Exiting Main Thread")
由于这个测试耗时,需要在类上添加装饰器,鳄鱼君Ba不太会,所以暂时抛弃在类上使用装饰器,这里就直接计算花费的时间了。在上面的代码中,我们将1000个网页分成5份,启动5个线程,每个线程执行200个。在每一个线程中,我们调用了爬虫函数crawler。为了让所有的子线程执行完毕再执行主进程,这里使用join方法等待所有的子线程执行完毕!这里耗时大约在420-430之间!
这样你可能就会思考,假设某一个线程执行完成,就会退出线程,这样就只剩4个线程在运行,一次类推,最后会剩下一个线程在运行,就变成了单线程。
使用Queue的多线程爬虫
为了充分利用多线程,这时候就需要使用Queue队列。python的Queue模块中提供了同步的,线程安全的队列类,包括FIFO(先入先出)队列,LIFO(后入先出)队列等等,具体可参考:threading模块、线程锁、信号量、Event、Queue队列
将这1000个网址放到队列中,各个线程都从这个队列获取网址,直到完成所有的抓取为止,代码参考:
import requests,time,threading
import queue
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
file_list=f.readlines() #逐行读取
for i in file_list:
link=i.split(',')[0]
link_list.append(link)
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
start_time=time.time()
class myThread(threading.Thread):
def __init__(self,name,task_queue):
threading.Thread.__init__(self)
self.name=name
self.q=task_queue
def run(self):
print("starting "+self.name)
while True:
try:
crawler(self.name,self.q)
except:
break
print("exiting "+self.name)
def crawler(threadName,q):
url=q.get(timeout=2)
try:
res=requests.get(url,timeout=20,headers=headers)
print(q.qsize(),threadName,res.status_code,url)
except Exception as e:
print(threadName,'error',e)
thread_list = ["thread-1","thread-2","thread-3","thread-4","thread-5"]
threads=[]
task_queue=queue.Queue(1000)
# 填充队列
for url in link_list:
task_queue.put(url)
for i in thread_list:
thread = myThread(i,task_queue)
thread.start()
threads.append(thread)
# 等待所有线程完成
for a in threads:
a.join()
end_time=time.time()
print('queue多线程花费时间:%s'% str(end_time-start_time))
print ("Exiting Main Thread")
耗时350-360左右,这个会受网速和机器配置影响!实际工作中,这种爬虫方法使用的也是比较多的!
多进程爬虫
由于python全局解释器锁的存在,多线程爬虫并不能充分的利用多核CPU,呢么就需要使用多进程爬冲来利用多核CPU。在python中,使用多进程需要用到multiprocess库。multiprocess库有两种方法:Process+Queue或者Pool+Queue,我们分别测试一下!
查看CPU的进程,这里可以借助multiprocessing:
from multiprocessing import cpu_count
print(cpu_count())
Process+Queue多进程
这里根据自己情况,不要占满核心,鳄鱼君Ba的是12核心数,所以在这里使用10个进程:
from multiprocessing import Process,Queue
import requests,time
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
file_list=f.readlines() #逐行读取
for i in file_list:
link=i.split(',')[0]
link_list.append(link)
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
class myProcess(Process):
def __init__(self,task_queue):
Process.__init__(self)
self.q=task_queue
def run(self):
print("starting ",self.pid)
while not self.q.empty():
crawler(self.q)
print("exiting ",self.pid)
def crawler(q):
url=q.get(timeout=2)
try:
res=requests.get(url,timeout=20,headers=headers)
print(q.qsize(),res.status_code,url)
except Exception as e:
print('error',e)
if __name__ == '__main__':
start_time = time.time()
Process_list = ["process-1","process-2","process-3","process-4","process-5"]
threads=[]
task_queue=Queue(1000)
# 填充队列
for url in link_list:
task_queue.put(url)
for i in range(0,9):
p = myProcess(task_queue)
p.daemon=True # 父进程结束,子进程就会被终止
p.start()
p.join()
end_time=time.time()
print('queue多进程花费时间:%s'% str(end_time-start_time))
print ("Exiting Main Process")
具体的花费时间,自己测试
Pool+Queue多进程
Pool可以提供指定数量的进程供用户调用。当有新的请求提交到pool中时,如果池还没有满,就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定的最大值,该请求就会继续等待,直到池中有进程结束才能够创建新的进程。
阻塞和非阻塞关注的是程序在等待调用结果(消息、返回值)时的状态。阻塞要等到回调结果出来,在有结果之前,当前进程会被挂起。非阻塞为添加进程后,不一定非要等到结果出来就可以添加其他进程运行。
我们可以使用Pool的非阻塞方法和Queue获取网页数据,代码如下:
from multiprocessing import Pool,Manager
import time,requests
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
file_list=f.readlines() #逐行读取
for i in file_list:
link=i.split(',')[0]
link_list.append(link)
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
start_time=time.time()
def crawler(q,index):
process_id='process-'+str(index)
while not q.empty():
url=q.get(timeout=2)
try:
res=requests.get(url,headers=headers)
print(process_id,q.qsize(),res.status_code,url)
except Exception as e:
print(process_id,'error',e)
if __name__ == '__main__':
with Manager() as manager:# 父进程和子进程之间通过管道manager通信
task_queue=manager.Queue(1000)
# 填充队列
for url in link_list:
task_queue.put(url)
pool=Pool(processes=3)
for i in range(4):
pool.apply_async(crawler,args=(task_queue,i))
# pool.apply(crawler,args=(task_queue,i)) 串行
print('starting process')
pool.close()
pool.join()
end_time=time.time()
print('pool+process多进程爬虫耗时:',end_time-start_time)
print('exiting main process ')
多协程爬虫
协程是一种用户态的轻量级线程,使用协程有以下优点:- 由于是单线程,没有上下文切换,资源消耗少
- 高扩展性和高并发性,一个CPU支持上万个协程,甚至更多
- 方便切换控制流,简化编程模型
协程的本质上是一个单线程,不能使用单个CPU的多核,需要和进程配合才能运行在多核CPU上。IO操作阻塞时间太长不要使用协程,可能会阻塞整个程序。
在python的协程中,可以使用gevent。使用pip命令就可以安装,安装完成就可以使用协程了:
import gevent
from gevent.queue import Queue
from gevent import monkey # 有可能有IO操作的单独坐上标记
monkey.patch_all()# 将IO转为异步执行的函数
import time,requests # gevent模块上面
link_list=[] # 读取文件的URL,存放为list
with open('./alexa.txt','r') as f:
file_list=f.readlines() #逐行读取
for i in file_list:
link=i.split(',')[0]
link_list.append(link)
headers={
"User-Agent":"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
}
start_time=time.time()
def crawler(index):
process_id='process-'+str(index)
while not task_queue.empty():
url=task_queue.get(timeout=2)
try:
res=requests.get(url,headers=headers,timeout=20)
print(process_id,url,res.status_code)
except Exception as e:
print('error',e,url)
def boss():
for url in link_list:
task_queue.put_nowait(url)
if __name__ == '__main__':
task_queue=Queue(1000) # 生成一个队列
gevent.spawn(boss).join() #将队列加入到gevent协程中
jobs=[]
for i in range(10): # 10个协程
jobs.append(gevent.spawn(crawler,i)) # 生成一个协程
gevent.joinall(jobs) # 将
end_time=time.time()
print('gevent+queue多协程爬虫耗时:',end_time-start_time)
经过测试,感觉协程速度更快一些!具体效果,可以自己尝试!
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!