采用协程的方式抓取非小号网站的相关数据,那么首先这里以csv文件为例,我们把需要抓取的所有信息的url存入csv文件中,那么读取的代码参考:
import asyncio
import csv
async def main():
#loop=asyncio.get_event_loop()
csv_reader=csv.reader(open('XXX.csv',encoding='utf-8'))
for row in csv_reader:
print(row)
if __name__=='__main__':
loop=asyncio.get_event_loop() #d定义事件循环
loop.create_task(main()) #创建任务
loop.run_forever() #一直运行
读取csv文件,这个csv文件其实就是通过非小号接口抓取每个币种详情页的url,我们要提取的数据就是在这个详情页,csv文件需要的话可以自己下载。
async def main():
csv_reader=csv.reader(open('feixiaohao.csv',encoding='utf-8'))
for row in csv_reader:
try:
if row[1].startswith('https'):#判断是否以https开头
await Common.task_queue.put(row)#将row放入队列里面
except:
pass
#print(Common.task_queue)
await get_market_cap()
print('总市值:{}'.format(Common.market_cap_all))
await get_currency_rate()
print('汇率:{}'.format(Common.currency_rate) )
for _ in range(10):#同时创建10个down_and_parse_task任务,提高效率
loop.create_task(down_and_parse_task(Common.task_queue))
loop.create_task(monitor_finish())
loop.create_task(speed_monitor()) #速度检测
loop.create_task(time_limit()) #5分钟跑完
if __name__=='__main__':
loop=asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever() #一直运行
main函数中包含有down_and_parse_task(获取网页html和状态码并解析除相关数据),monitor_finish,speed_monitor(速度检测),time_limit(时间限制),get__currency_rate(获取汇率),get_market_cap(获取总市值)函数,还有一个队列,队列的代码参考:
class Common():
task_queue=Queue()#创建实例队列,任务队列
result_queue=Queue()#结果队列
market_cap_all=0 #总市值初始值
currency_rate=0 #汇率初始值
那么其他一些不重要的函数代码参考:
async def get_market_cap():
#总市值的接口url
url='https://dncapi.bqiapp.com/api/home/global?webp=0'
response=requests.get(url)
#通过接口提取总市值
response_json=json.loads(response.text)
marketcap=response_json['data']['marketcapvol']
Common.market_cap_all=int(marketcap)#重新赋值给market_cap_all
async def get_currency_rate():
url_rate='https://dncapi.bqiapp.com/api/coin/web-rate'
response=requests.get(url_rate)
#通过接口提取人民币的汇率
currency_rate=json.loads(response.text)['data'][11]['cny']
Common.currency_rate=currency_rate#重新赋值给currency_rate
async def time_limit():#设置300秒上限
await asyncio.sleep(300)
raise SystemExit()
async def speed_monitor():#速度检测
while Common.task_queue.qsize()!=0: #判断队列大小
old_queue_len=Common.task_queue.qsize()
await asyncio.sleep(5)
new_queue_size=Common.task_queue.qsize()
print('《---------------》')
print('speed = ',(old_queue_len-new_queue_size)/5) #5秒中之内,算一下速度
async def monitor_finish():
while len(asyncio.Task.all_tasks())>3:#判断当前任务数量是否大于3
await asyncio.sleep(1)
await asyncio.sleep(5)
raise SystemExit()
down_and_parse_task函数参考代码:
async def down_and_parse_task(queue):
while 1 :#死循环
try:
name,url=queue.get_nowait()
except:
return
for retry_cnt in range(3):#重试3次
try:
html,status=await download(url)
if status ==200: #如果状态码不是200,重试
html,status=await download(url)
html_parse_result=await parse_html(name,url,html)
print(html_parse_result)
await Common.result_queue.put(html_parse_result)
break #成功则退出异常
except:
await asyncio.sleep(0.2)
continue
在这个函数中包含download,parse_html函数,具体的download函数代码参考:
async def download(url):
connector=ProxyConnector()#创建实例
async with aiohttp.ClientSession(
connector=connector,
request_class=ProxyClientRequest
) as session:
ret,status=await session_get(session,url)
if 'window.location.href=' in ret and len(ret)<1000:
url=ret.split("window.location.href='")[1].split("'")[0]
ret,status=await session_get(session,url)
return ret,status
其中download函数汇总包含有session_get函数,代码参考:
async def session_get(session,url):
headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)'}
timeout=aiohttp.ClientTimeout(total=20) #设置超时时间
response = await session.get(
url,timeout=timeout, headers=headers, ssl=ssl.SSLContext()
)
return await response.text(),response.status
然后是parse_html函数,这个就是具体解析网页中需要的数据的代码,使用xpath进行解析,跟以前的一样,具体代码参考:
async def parse_html(name,url,response):
coin_info={}
coin_value={}
coin_info['url']=url
coin_info['name']=name
coin_info['time']=int(time.time())
tree=etree.HTML(response)
#,美元价格,上下涨幅,流通市值,流通量,流通率,24小时成交额,换手率,发行时间,最大供应量。
try:#美元价格
price_usd=tree.xpath('//div[@class="priceInfo"]/div[@class="sub"]/span/span/text()')[0].strip()
coin_value['price']='$'+price_usd
except:
pass
try:#上下涨幅
updown=tree.xpath('//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()')[0].strip()
coin_value['updown']='$'+updown
except:
pass
try:#24小时成交额需要转换为美元
value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/span/text()')[0].strip()
coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
except:
pass
try:#换手率
value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
except:
pass
try:#流通量
circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[2]/text()')[0].strip()
coin_value['circulating_supply']=int(circulating_supply)
except:
pass
try:#流通率
circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
coin_value['circulating_supply']=int(circulating_supply)
except:
pass
try:#币的当前市值
bi_price=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[1]/div[3]/div[2]/div[1]/span[2]/span[2]/text()')[0].strip()
coin_value['bi_price']=bi_price
except:
pass
try:#发行时间
data_time=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[2]/div/div[3]/div[1]/div[1]/div[3]/div[1]/span[2]/text()')[0].strip()
coin_value['data_time']=data_time
except:
pass
try:#上架交易所
pass
except:
pass
coin_info['value']=coin_value
return coin_info
至此全部代码可参考:
import asyncio
import aiohttp
import requests,random,json,time
from lxml import etree
from asyncio.queues import Queue
import csv
import ssl
from aiosocksy import Socks5Auth #使用socks代理
from aiosocksy.connector import ProxyConnector
from aiosocksy.connector import ProxyClientRequest
class Common():
task_queue=Queue()#创建实例队列,任务队列
result_queue=Queue()#结果队列
market_cap_all=0 #总市值初始值
currency_rate=0 #汇率初始值
#免费socks5代理:http://31f.cn/socks-proxy/
#线上内网
socks5_address_prod=[
'socks5://221.232.233.159:1080',
'socks5://117.34.70.200:1081',
'socks5://120.197.179.170:1080'
]
async def get_market_cap():
#总市值的接口url
url='https://dncapi.bqiapp.com/api/home/global?webp=0'
response=requests.get(url)
#通过接口提取总市值
response_json=json.loads(response.text)
marketcap=response_json['data']['marketcapvol']
Common.market_cap_all=int(marketcap)#重新赋值给market_cap_all
async def get_currency_rate():
url_rate='https://dncapi.bqiapp.com/api/coin/web-rate'
response=requests.get(url_rate)
#通过接口提取人民币的汇率
currency_rate=json.loads(response.text)['data'][11]['cny']
Common.currency_rate=currency_rate#重新赋值给currency_rate
async def time_limit():#设置300秒上限
await asyncio.sleep(300)
raise SystemExit()
async def down_and_parse_task(queue):
while 1 :#死循环
try:
name,url=queue.get_nowait()
except:
return
for retry_cnt in range(3):#重试3次
try:
html,status=await download(url)
if status ==200: #如果状态码不是200,重试
html,status=await download(url)
html_parse_result=await parse_html(name,url,html)
print(html_parse_result)
await Common.result_queue.put(html_parse_result)
break #成功则退出异常
except:
await asyncio.sleep(0.2)
continue
async def download(url):
connector=ProxyConnector()#创建实例
async with aiohttp.ClientSession(
connector=connector,
request_class=ProxyClientRequest
) as session:
ret,status=await session_get(session,url)
if 'window.location.href=' in ret and len(ret)<1000:
url=ret.split("window.location.href='")[1].split("'")[0]
ret,status=await session_get(session,url)
return ret,status
async def parse_html(name,url,response):
coin_info={}
coin_value={}
coin_info['url']=url
coin_info['name']=name
coin_info['time']=int(time.time())
tree=etree.HTML(response)
#,美元价格,上下涨幅,流通市值,流通量,流通率,24小时成交额,换手率,发行时间,最大供应量。
try:#美元价格
price_usd=tree.xpath('//div[@class="priceInfo"]/div[@class="sub"]/span/span/text()')[0].strip()
coin_value['price']='$'+price_usd
except:
pass
try:#上下涨幅
updown=tree.xpath('//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()')[0].strip()
coin_value['updown']='$'+updown
except:
pass
try:#24小时成交额需要转换为美元
value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/span/text()')[0].strip()
coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
except:
pass
try:#换手率
value_24h=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[3]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
coin_value['value_24h']=(int(value_24h)/Common.currency_rate)
except:
pass
try:#流通量
circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[2]/text()')[0].strip()
coin_value['circulating_supply']=int(circulating_supply)
except:
pass
try:#流通率
circulating_supply=tree.xpath('//div[@class="priceInfo"]/div[@class="chart_list"]/div[2]/div[@class="charbox"]/div[2]/text()')[0].strip()
coin_value['circulating_supply']=int(circulating_supply)
except:
pass
try:#币的当前市值
bi_price=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[1]/div[3]/div[2]/div[1]/span[2]/span[2]/text()')[0].strip()
coin_value['bi_price']=bi_price
except:
pass
try:#发行时间
data_time=tree.xpath('//*[@id="__layout"]/section/div/div/div[1]/div[2]/div/div[3]/div[1]/div[1]/div[3]/div[1]/span[2]/text()')[0].strip()
coin_value['data_time']=data_time
except:
pass
try:#上架交易所
pass
except:
pass
coin_info['value']=coin_value
return coin_info
async def session_get(session,url):
headers={'User-Agent':'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)'}
timeout=aiohttp.ClientTimeout(total=20) #设置超时时间
response = await session.get(
url,timeout=timeout, headers=headers, ssl=ssl.SSLContext()
)
return await response.text(),response.status
async def speed_monitor():#速度检测
while Common.task_queue.qsize()!=0: #判断队列大小
old_queue_len=Common.task_queue.qsize()
await asyncio.sleep(5)
new_queue_size=Common.task_queue.qsize()
print('《---------------》')
print('speed = ',(old_queue_len-new_queue_size)/5) #5秒中之内,算一下速度
async def monitor_finish():
while len(asyncio.Task.all_tasks())>3:#判断当前任务数量是否大于3
await asyncio.sleep(1)
await asyncio.sleep(5)
raise SystemExit()
async def push_results():
pass
async def main():
csv_reader=csv.reader(open('feixiaohao.csv',encoding='utf-8'))
for row in csv_reader:
try:
if row[1].startswith('https'):#判断是否以https开头
await Common.task_queue.put(row)#将row放入队列里面
except:
pass
#print(Common.task_queue)
await get_market_cap()
print('总市值:{}'.format(Common.market_cap_all))
await get_currency_rate()
print('汇率:{}'.format(Common.currency_rate) )
for _ in range(10):#同时创建10个down_and_parse_task任务,提高效率
loop.create_task(down_and_parse_task(Common.task_queue))
loop.create_task(monitor_finish())
loop.create_task(speed_monitor()) #速度检测
loop.create_task(time_limit()) #5分钟跑完
if __name__=='__main__':
loop=asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever() #一直运行
在爬取的时候没有使用代理,这个貌似只能添加socks5的代理,一般来说,普通的网页没有相关的限制,就是在抓取csv文件的时候,有个数据的json接口,操作次数过多会封掉ip,但还是把csv文件抓取下来,供练习使用。抓取的的数据基本都在详情页,可随意提取数据,具体参考网址:https://www.feixiaohao.com/currencies/bitcoin/
声明:1. 本站所有资源来源于用户上传和网络,因此不包含技术服务请大家谅解!如有侵权请邮件联系客服!
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!