# -*- coding:utf-8 -*-
"""
@author:code37
@file:crawler_stable.py
@time:2018/4/1815:20
"""
import asyncio
import aiohttp
import async_timeout
import pandas as pd
import time
import os
from pandas.compat import StringIO
from tushare.stock import cons as ct
from factorset.Util.configutil import GetConfig
def balance_sheet_url(code):
    """Build the Sina balance-sheet URL from the first 6 chars of *code*."""
    return ct.SINA_BALANCESHEET_URL % code[:6]
def profit_statement_url(code):
    """Build the Sina profit-statement URL from the first 6 chars of *code*."""
    return ct.SINA_PROFITSTATEMENT_URL % code[:6]
def cash_flow_url(code):
    """Build the Sina cash-flow URL from the first 6 chars of *code*."""
    return ct.SINA_CASHFLOW_URL % code[:6]
class FundCrawler(object):
    """Coroutine-based crawler for A-share fundamental data.

    Fetches balance-sheet ('BS'), income-statement ('IS') or cash-flow
    ('CF') pages from Sina for a list of tickers through a rotating proxy
    pool, cleans the raw text into DataFrames, and persists them to
    MongoDB (Arctic) and/or CSV files depending on configuration.

    NOTE: the stray ``[docs]`` tokens in the original were Sphinx-viewcode
    scraping artifacts and have been removed — they were not valid Python.
    """

    def __init__(self, TYPE):
        """
        :param TYPE: statement type, one of 'BS', 'IS', 'CF'
        """
        ############ SETTING #############
        self.config = GetConfig()
        self.TYPE = TYPE  # 'BS', 'IS', 'CF'
        self.MONGO = self.config.MONGO        # persist to MongoDB (Arctic)?
        self.CSV = self.config.CSV            # persist to per-ticker CSV files?
        self.RAW = False                      # also dump the raw crawled text?
        self.outdir = self.config.fund_dir    # output directory for CSV files
        self.encode = self.config.encode      # CSV output encoding
        self.proxypool = self.config.proxypool  # host:port of the proxy-pool service
        ############ CHANGE ABOVE SETTING #############
        if self.MONGO:
            from arctic import Arctic
            # mongod --dbpath D:/idwzx/project/arctic
            a = Arctic(self.config.ahost)
            a.initialize_library('ashare_{}'.format(self.TYPE))
            self.lib = a['ashare_{}'.format(self.TYPE)]
        # ticker -> raw response text, filled by the consumer coroutines
        self.result_dict = {}

    async def get_proxy(self):
        """Fetch one proxy address from the proxy-pool service.

        :return: proxy url string, e.g. ``http://1.2.3.4:8080``
        """
        async with aiohttp.ClientSession() as session:
            async with session.get("http://{}/get/".format(self.proxypool)) as response:
                proxy_str = await response.text()
                return "http://{}".format(proxy_str)

    async def fetch(self, queue, session, url, ticker):
        """Fetch the statement page for a single ticker through a proxy.

        On any failure (bad status, timeout, network error) the ticker is
        put back on the queue so another consumer retries it.

        :param queue: asyncio.Queue of pending tickers
        :param session: shared aiohttp.ClientSession
        :param url: statement url for this ticker
        :param ticker: stock code
        :return: response text on success, otherwise None
        """
        proxy_url = await self.get_proxy()
        print('proxy: ' + proxy_url)
        try:
            async with async_timeout.timeout(15):
                async with session.get(url, proxy=proxy_url, allow_redirects=True) as resp:
                    # BUG FIX: the original tested `resp.status == 200 or 201`,
                    # which is always true because `or 201` is truthy — error
                    # responses were silently treated as success.
                    if resp.status in (200, 201):
                        return await resp.text()
                    else:
                        await queue.put(ticker)
        except Exception as e:
            print(e)
            print('Put {} in queue!'.format(ticker))
            await queue.put(ticker)

    async def consume(self, queue):
        """Consume tickers from the queue until the task is cancelled.

        :param queue: asyncio.Queue of pending tickers
        :return: None (runs until cancelled by :meth:`run`)
        """
        while True:
            ticker = await queue.get()
            async with aiohttp.ClientSession() as session:
                if self.TYPE == 'BS':
                    target_url = balance_sheet_url(ticker)
                elif self.TYPE == 'IS':
                    target_url = profit_statement_url(ticker)
                elif self.TYPE == 'CF':
                    target_url = cash_flow_url(ticker)
                else:
                    # More informative than the original bare `raise Exception`;
                    # ValueError is still caught by any `except Exception`.
                    raise ValueError("TYPE must be one of 'BS', 'IS', 'CF'")
                html = await self.fetch(queue, session, target_url, ticker)
                if html:
                    self.result_dict[ticker] = html
                else:
                    # BUG FIX: the original called `queue.put(ticker)` without
                    # awaiting it, so the coroutine was never executed and the
                    # failed ticker was dropped instead of retried.
                    await queue.put(ticker)
                queue.task_done()

    async def run(self, queue, max_tasks):
        """Schedule the consumers and wait until the queue is drained.

        :param queue: asyncio.Queue of pending tickers
        :param max_tasks: maximum number of concurrent consumer coroutines
        :return: None
        """
        tasks = [asyncio.ensure_future(self.consume(queue)) for _ in range(max_tasks)]
        await queue.join()
        for w in tasks:
            w.cancel()

    def write_to_MongoDB(self, symbol, df, source='Tushare'):
        """Write one ticker's cleaned DataFrame to the Arctic library.

        :param symbol: ticker
        :param df: cleaned fundamental data, pd.DataFrame
        :param source: metadata tag describing the data source, default 'Tushare'
        :return: None
        """
        try:
            self.lib.write(symbol, df, metadata={'source': source})
            print(symbol + '写入完成')
        except Exception as e:
            print("Failed for ", str(e))

    def data_clean(self, text):
        """Clean raw tab-separated statement text into a DataFrame.

        The raw text is tab-delimited with metric names in the first column
        and report dates in the header row; the result is transposed so the
        dates become a DatetimeIndex named 'date' and the metrics become
        columns.

        :param text: raw text fetched by the coroutines
        :return: pd.DataFrame
        """
        text = text.replace('\t\n', '\r\n')
        text = text.replace('\t', ',')
        df = pd.read_csv(StringIO(text), dtype={'code': 'object'})
        # Transpose so dates index the rows; drop duplicated report columns.
        df = df.T.drop_duplicates()
        # Promote the first transposed row (metric names) to column labels.
        df.rename(columns=df.iloc[0], inplace=True)
        df.drop(df.index[0], inplace=True)
        df.index = pd.to_datetime(df.index)
        df.index.name = 'date'
        return df

    def main(self, Ashare, num=10, retry=2):
        """Crawl entry point: fetch, clean and persist all tickers.

        :param Ashare: tickers to crawl
        :type Ashare: list
        :param num: maximum number of concurrent consumer coroutines
        :type num: int
        :param retry: number of crawl passes; failed tickers are retried
        :type retry: int
        :return: None
        """
        fail = []
        asyncio.set_event_loop(asyncio.new_event_loop())
        for i in range(retry):
            if fail:
                # The event loop was closed by the previous pass, so retry
                # the failed tickers on a fresh loop.
                Ashare = list(set(fail))
                # Reset fail
                fail = []
                asyncio.set_event_loop(asyncio.new_event_loop())
            loop = asyncio.get_event_loop()
            # The explicit `loop=` argument was dropped: it is deprecated and
            # removed in modern asyncio, and the queue binds to the current
            # loop (set just above) anyway.
            queue = asyncio.Queue()
            for ticker in Ashare:
                queue.put_nowait(ticker)
            # Check if loop is closed, if it is, then break
            if loop.is_closed():
                return
            loop.run_until_complete(self.run(queue, num))
            loop.close()
            # makedirs(..., exist_ok=True) avoids the exists/mkdir race and
            # also creates missing parent directories.
            os.makedirs(self.outdir, exist_ok=True)
            if self.RAW:
                try:
                    pd.DataFrame([self.result_dict]).to_csv(
                        os.path.abspath("./fund/raw.csv"), encoding=self.encode)
                except Exception as e:
                    print(e)
            for key in self.result_dict:
                try:
                    df = self.data_clean(self.result_dict[key])
                    if len(df) == 0:
                        # Empty result: remember the ticker for the next pass.
                        fail.append(key)
                    if self.MONGO:
                        self.write_to_MongoDB(key, df)
                    if self.CSV:
                        df.to_csv(
                            os.path.abspath("{}/{}_{}.csv".format(self.outdir, self.TYPE, key)),
                            encoding=self.encode)
                    print("{0}写入{1}条数据,".format(key, len(df)))
                except Exception as e:
                    print(e)
                    fail.append(key)
                    print("{}写入失败".format(key))
        if fail:
            print("{}表中{}爬取失败".format(self.TYPE, fail))
        else:
            print("{}表数据导入成功!".format(self.TYPE))
if __name__ == '__main__':
    start = time.time()
    # Redundant local `import os` removed: os is already imported at module level.
    from factorset.data.OtherData import code_to_symbol
    from factorset.data import CSVParser as cp
    import tushare as ts
    # allAshare = pd.read_csv(os.path.abspath('./allAShare.csv'))
    # allAshare = allAshare['0']
    hs300 = ts.get_hs300s()
    hs300.code = hs300.code.apply(code_to_symbol)
    # Crawl the HS300 constituents that have not been stored yet.
    # NOTE(review): the already-stored set is read for type 'IS' but the
    # crawler below is created with 'BS' — confirm this mismatch is intended.
    Ashare = list(set(hs300.code.tolist()) - set(cp.all_fund_symbol(os.path.abspath('.'), 'IS')))
    # BS table has duplicated dates:
    # Ashare = ['300671.SZ', '002886.SZ', '300696.SZ', '603055.SH', '300670.SZ', '300692.SZ',
    # '002889.SZ', '603882.SH', '603801.SH', '603938.SH', '300687.SZ', '603535.SH', '603043.SH']
    # BS duplicated dates with differing values (prospectus vs. filing draft):
    # Ashare = ['002886.SZ', '300696.SZ', '603938.SH', '300692.SZ', '300670.SZ', '603882.SH']
    # IS duplicated dates with differing values (prospectus vs. filing draft):
    # Ashare = ['002886.SZ', '300696.SZ', '300670.SZ', '300692.SZ', '603055.SH', '603938.SH', '603882.SH']
    # CF duplicated dates with differing values (prospectus vs. filing draft):
    # Ashare = ['002386.SZ', '603882.SH', '603018.SH', '300671.SZ', '603938.SH', '300537.SZ', '300670.SZ' ,
    # '002086.SZ', '000568.SZ', '600612.SH', '300696.SZ', '600552.SH', '300687.SZ', '600983.SH', '002889.SZ',
    # '603801.SH', '300692.SZ', '603055.SH', '002886.SZ', '002852.SZ', '603505.SH', '300365.SZ', '603535.SH',
    # '300214.SZ', '300135.SZ', '603043.SH']
    FundCrawler('BS').main(Ashare, num=20)
    print(time.time() - start)