Tushare is an open Python API for financial data, and in my experience it offers a lot of information.
Link: http://tushare.org/index.html#
However, it has no built-in support for downloading data for offline use, so every call fetches data over the network. That makes it slow, and the request rate cannot be pushed too high either. I therefore wanted a script that fetches data online on the first call and saves it to a local database, so that a second request for the same data is answered directly from the local copy.
Since this is for personal use, I only implemented caching for tick data and daily bar data.
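The overall pattern is the classic cache-aside flow. As a minimal sketch (get_cached, fetch_remote, and the query shape here are placeholders of mine, not part of the scripts below):

def get_cached(collection, query, fetch_remote):
    # First try the local MongoDB collection
    if collection.find_one(query) is None:
        # Cache miss: fetch online once, then persist locally
        for record in fetch_remote():
            record.update(query)  # tag rows so they can be found again
            collection.insert_one(record)
    # Every later call with the same query is served from MongoDB
    return list(collection.find(query))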
The main difficulties during implementation were:
- Python's logging module: a dedicated logging module that keeps producing odd bugs for me, and I still haven't figured them out...
- the various rules of pymongo
- how to construct a pandas DataFrame: here I build one from a dict of lists (see the sketch after this list)
- Python's datetime module: using datetime.timedelta (also shown in the sketch below)
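To make the last two items concrete, here is a small self-contained sketch (the column names are invented for illustration):

import datetime
import pandas as pd

# DataFrame from a dict of lists: every key becomes a column,
# every list becomes that column's values
df = pd.DataFrame({'price': [10.0, 10.2], 'volume': [300, 150]})

# datetime.timedelta: step a parsed date back by 19 days,
# exactly as the 20-day fetch windows below do
end_time = datetime.datetime.strptime('2017-01-01', '%Y-%m-%d')
prev_time = end_time - datetime.timedelta(days=19)
print(prev_time.strftime('%Y-%m-%d'))  # 2016-12-13

The first script below caches tick data: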
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging
client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database
def clear_database():
    db.drop_collection('tick_data')
def get_tick_data(stock, date):
    logger = logging.getLogger(__name__)
    collection = db.tick_data
    ww = collection.find_one({'stock': stock, 'date': date})
    if ww is None:
        # Cache miss: fetch the ticks online once and persist every row
        logger.debug("ReFetch stock <%s,%s>", stock, date)
        df = ts.get_tick_data(stock, date=date)
        store_data = json.loads(df.to_json(orient='records'))
        for i in range(len(store_data)):
            store_data[i]['stock'] = stock
            store_data[i]['date'] = date
            collection.insert_one(store_data[i])
        ww = collection.find_one({'stock': stock, 'date': date})
    # Rebuild a DataFrame from the cached documents: one list per column
    result = dict()
    for w in ww.keys():
        if w == '_id':
            continue
        result[w] = []
    for item in collection.find({'stock': stock, 'date': date}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    result = pd.DataFrame(result)
    return result
if __name__ == '__main__':
    clear_database()
    stock = '600848'
    date = '2014-12-22'
    result = get_tick_data(stock, date)
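One pymongo detail worth adding (this index is my addition, not part of the original script): every cache lookup above is a find_one/find on stock and date, so a compound index keeps those queries fast as the collection grows:

import pymongo

# Speeds up the cache-lookup queries on (stock, date)
db.tick_data.create_index([('stock', pymongo.ASCENDING),
                           ('date', pymongo.ASCENDING)])

The second script caches daily bar data along the same lines: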
import pymongo
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging
import datetime

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database
def clear_database():
    db.drop_collection('hist_data')
def get_hist_data_day(stock, start, end):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.debug("CALL get_hist_data_day <%s,%s,%s>", stock, start, end)
    collection = db.hist_data
    # Earliest and latest cached dates for this stock
    mn_time = None
    mx_time = None
    for bar in collection.find({'stock': stock, 'ktype': 'D'}).sort('date', pymongo.ASCENDING).limit(1):
        mn_time = bar['date']
    for bar in collection.find({'stock': stock, 'ktype': 'D'}).sort('date', pymongo.DESCENDING).limit(1):
        mx_time = bar['date']
    # If the cache ends before the requested window, pull the window back
    # to the last cached date so no gap is left between cache and request
    if mx_time is not None and mx_time < start:
        start = mx_time
    start_time = datetime.datetime.strptime(start, '%Y-%m-%d')
    end_time = datetime.datetime.strptime(end, '%Y-%m-%d')
    # If the database is non-empty, build an empty DataFrame header from it
    www = collection.find_one({'stock': stock, 'ktype': 'D'})
    if www is not None:
        result = dict()
        for w in www.keys():
            if w == '_id':
                continue
            result[w] = []
        df = pd.DataFrame(result)
        current_time = end_time
    else:
        df = ts.get_hist_data(stock, end, end)
        current_time = end_time - datetime.timedelta(days=1)
    # Walk backwards over the interval in 20-day windows and fetch online
    # any window that is not fully covered by the cache
    while current_time >= start_time:
        prev_time = current_time - datetime.timedelta(days=19)
        if mn_time is None or not (prev_time.strftime('%Y-%m-%d') >= mn_time
                                   and current_time.strftime('%Y-%m-%d') <= mx_time):
            dff = ts.get_hist_data(stock,
                                   prev_time.strftime('%Y-%m-%d'),
                                   current_time.strftime('%Y-%m-%d'))
            df = pd.concat([df, dff])
        current_time = current_time - datetime.timedelta(days=20)
    # Persist any fetched rows that are not yet in the database
    store_data = json.loads(df.to_json(orient='records'))
    for i in range(len(store_data)):
        www = collection.find_one({'stock': stock, 'date': df.index[i], 'ktype': 'D'})
        if www is not None:
            continue
        store_data[i]['stock'] = stock
        store_data[i]['date'] = df.index[i]
        store_data[i]['ktype'] = 'D'
        collection.insert_one(store_data[i])
    # Rebuild the result DataFrame from the cache (everything from start on)
    www = collection.find_one()
    result = dict()
    for w in www.keys():
        if w == '_id':
            continue
        result[w] = []
    for item in collection.find({'stock': stock, 'ktype': 'D', 'date': {'$gte': start}}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    result = pd.DataFrame(result)
    return result
def get_hist_data_sp(stock, date, ktype='D'):
    logger = logging.getLogger(__name__)
    logger.debug("CALL get_hist_data_sp <%s,%s,%s>", stock, date, ktype)
    collection = db.hist_data
    ww = collection.find_one({'stock': stock, 'date': date, 'ktype': ktype})
    if ww is None:
        # Cache miss: fetch everything from `date` onwards, persist new rows
        logger.debug("ReFetch... <%s,%s>", stock, date)
        df = ts.get_hist_data(stock, start=date, ktype=ktype)
        store_data = json.loads(df.to_json(orient='records'))
        for i in range(len(store_data)):
            www = collection.find_one({'stock': stock, 'date': df.index[i], 'ktype': ktype})
            if www is not None:
                continue
            store_data[i]['stock'] = stock
            store_data[i]['date'] = df.index[i]
            store_data[i]['ktype'] = ktype
            collection.insert_one(store_data[i])
    else:
        logger.debug("Use cache..")
    ww = collection.find_one({'stock': stock, 'date': date, 'ktype': ktype})
    if ww is None:
        # `date` may be a non-trading day; borrow any document for the header
        ww = collection.find_one()
    result = dict()
    for w in ww.keys():
        if w == '_id':
            continue
        result[w] = []
    for item in collection.find({'stock': stock, 'date': date, 'ktype': ktype}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    result = pd.DataFrame(result)
    return result
if __name__ == '__main__':
    #clear_database()
    stock = '002082'
    date = '2016-10-30'
    result = get_hist_data_day(stock, '2016-11-01', '2017-01-01')
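Finally, since the whole motivation is that Tushare requests cannot be issued too frequently, one cautious refinement would be to pause between the online fetches inside the while loop. A hedged sketch (fetch_with_delay is my own helper, and the one-second delay is an arbitrary guess, not a documented Tushare limit):

import time

def fetch_with_delay(stock, start_str, end_str, delay=1.0):
    # Throttle successive online requests to stay under the rate limit
    dff = ts.get_hist_data(stock, start_str, end_str)
    time.sleep(delay)
    return dff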