Tushare is an open Python API for financial data, and in my experience it exposes a huge amount of information.
Link: http://tushare.org/index.html#
However, it has no built-in support for downloading data, so every call fetches data online. That makes it slow, and the request rate cannot be pushed too high. So I wanted a script that fetches data online on the first call and saves it to a local database; a second request for the same data is then answered directly from the local copy.
Since this is for personal use, only tick caching and daily-bar caching are implemented.
The main difficulties in the implementation were:
- Python's logging module, a facility dedicated to emitting logs. It keeps producing odd surprises and I still haven't fully figured it out... (see the logging sketch after this list)
- a few pymongo rules (both sketched after this list):
  - inserting data
  - range queries
- how to construct a pandas DataFrame: a dict of lists is used here (example after this list)
- Python's datetime module: using datetime.timedelta (example after this list)
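
For reference, the "odd surprise" that bites most often is that `logger.debug(...)` prints nothing by default: the root logger only shows WARNING and above until a handler is configured. A minimal sketch of the setup the scripts below rely on:

```python
import logging

# Without configuration, DEBUG messages are silently dropped.
# One basicConfig call installs a handler and lowers the threshold.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)s %(levelname)s: %(message)s')

logger = logging.getLogger(__name__)
logger.debug("ReFetch stock <%s,%s>", '600848', '2014-12-22')
```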
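
A minimal sketch of the two pymongo operations in question, assuming a local mongod on the default port; the database, collection, and field names mirror the scripts that follow:

```python
from pymongo import MongoClient

client = MongoClient('127.0.0.1', port=27017)
collection = client.tushare_database.hist_data

# Insertion: each bar is a plain dict; insert_one stores it as one document.
collection.insert_one({'stock': '600848', 'date': '2016-11-01',
                       'ktype': 'D', 'close': 21.5})

# Range query: comparison operators need the leading '$'.
# Writing {'date': {'gte': ...}} (no '$') matches nothing here.
for bar in collection.find({'stock': '600848',
                            'date': {'$gte': '2016-11-01',
                                     '$lte': '2017-01-01'}}):
    print(bar['date'], bar['close'])
```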
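
The dict-of-lists DataFrame construction in isolation (illustrative values only): each key becomes a column and each list holds that column's values, row by row.

```python
import pandas as pd

# One key per column, one list entry per row.
data = {'date': ['2016-11-01', '2016-11-02'],
        'open': [21.3, 21.6],
        'close': [21.5, 21.4]}
df = pd.DataFrame(data)
print(df)
```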
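
And the datetime.timedelta usage: the day-bar script below walks the requested range backwards in 20-day windows, which boils down to arithmetic like this:

```python
import datetime

end_time = datetime.datetime.strptime('2017-01-01', '%Y-%m-%d')
window_start = end_time - datetime.timedelta(days=19)  # 20 days, inclusive
print(window_start.strftime('%Y-%m-%d'))               # 2016-12-13
```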
With those pieces in place, the first script caches tick data: on a cache miss it fetches the ticks from Tushare, stores them in MongoDB tagged with the stock code and date, and then rebuilds a DataFrame from the cached documents.

```python
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database

def clear_database():
    db.drop_collection('tick_data')

def get_tick_data(stock, date):
    logger = logging.getLogger(__name__)
    collection = db.tick_data
    ww = collection.find_one({'stock': stock, 'date': date})
    if ww is None:
        # Cache miss: fetch online and store each tick, tagged with the
        # stock code and date so it can be found again later.
        logger.debug("ReFetch stock <%s,%s>", stock, date)
        df = ts.get_tick_data(stock, date=date)
        store_data = json.loads(df.to_json(orient='records'))
        for record in store_data:
            record['stock'] = stock
            record['date'] = date
            collection.insert_one(record)
        ww = collection.find_one({'stock': stock, 'date': date})
    # Rebuild a DataFrame from the cache using the dict-of-lists
    # constructor: one list per column, keyed by field name.
    result = {w: [] for w in ww.keys() if w != '_id'}
    for item in collection.find({'stock': stock, 'date': date}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    return pd.DataFrame(result)

if __name__ == '__main__':
    clear_database()
    stock = '600848'
    date = '2014-12-22'
    result = get_tick_data(stock, date)
```
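
Since every cache lookup filters on the same two fields, a compound index would speed up `find_one` considerably once the collection grows. A hypothetical one-off setup step, not part of the original script:

```python
from pymongo import MongoClient

db = MongoClient('127.0.0.1', port=27017).tushare_database
# Index the fields every cache lookup filters on (ascending on both).
db.tick_data.create_index([('stock', 1), ('date', 1)])
```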
The second script caches daily bars. It looks up the oldest and newest cached dates for the stock, then walks the requested range backwards in 20-day windows, fetching only the windows the cache does not already cover.

```python
from pymongo import MongoClient
import pymongo
import json
import tushare as ts
import pandas as pd
import logging
import datetime

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database

def clear_database():
    db.drop_collection('hist_data')

def get_hist_data_day(stock, start, end):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.debug("CALL get_hist_data_day <%s,%s,%s>", stock, start, end)
    collection = db.hist_data

    # Oldest and newest cached dates for this stock, if any.
    mn_time = None
    mx_time = None
    for bar in collection.find({'stock': stock, 'ktype': 'D'}).sort('date', pymongo.ASCENDING).limit(1):
        mn_time = bar['date']
    for bar in collection.find({'stock': stock, 'ktype': 'D'}).sort('date', pymongo.DESCENDING).limit(1):
        mx_time = bar['date']

    # If the cache ends before the requested range starts, extend the
    # fetch backwards so the gap in between gets filled as well.
    if mx_time is not None and mx_time < start:
        start = mx_time

    start_time = datetime.datetime.strptime(start, '%Y-%m-%d')
    end_time = datetime.datetime.strptime(end, '%Y-%m-%d')

    # If the cache is non-empty, build an empty DataFrame with the cached
    # column layout; otherwise fetch one day just to obtain the columns.
    www = collection.find_one({'stock': stock, 'ktype': 'D'})
    if www is not None:
        df = pd.DataFrame({w: [] for w in www.keys() if w != '_id'})
        current_time = end_time
    else:
        df = ts.get_hist_data(stock, end, end)
        current_time = end_time - datetime.timedelta(days=1)

    # Walk backwards over the range in 20-day windows and fetch every
    # window that is not already fully covered by the cache.
    while current_time >= start_time:
        prev_time = current_time - datetime.timedelta(days=19)
        covered = (mn_time is not None
                   and prev_time.strftime('%Y-%m-%d') >= mn_time
                   and current_time.strftime('%Y-%m-%d') <= mx_time)
        if not covered:
            dff = ts.get_hist_data(stock,
                                   prev_time.strftime('%Y-%m-%d'),
                                   current_time.strftime('%Y-%m-%d'))
            df = pd.concat([df, dff])
        current_time = current_time - datetime.timedelta(days=20)

    # Store the newly fetched bars, skipping dates already in the cache.
    store_data = json.loads(df.to_json(orient='records'))
    for i in range(len(store_data)):
        if collection.find_one({'stock': stock, 'date': df.index[i], 'ktype': 'D'}) is not None:
            continue
        store_data[i]['stock'] = stock
        store_data[i]['date'] = df.index[i]
        store_data[i]['ktype'] = 'D'
        collection.insert_one(store_data[i])

    # Rebuild the result from the cache; note the '$gte' range operator.
    www = collection.find_one({'stock': stock, 'ktype': 'D'})
    result = {w: [] for w in www.keys() if w != '_id'}
    for item in collection.find({'stock': stock, 'ktype': 'D', 'date': {'$gte': start}}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    return pd.DataFrame(result)

def get_hist_data_sp(stock, date, ktype='D'):
    logger = logging.getLogger(__name__)
    logger.debug("CALL get_hist_data_sp <%s,%s,%s>", stock, date, ktype)
    collection = db.hist_data
    ww = collection.find_one({'stock': stock, 'date': date, 'ktype': ktype})
    if ww is None:
        # Cache miss: fetch everything from `date` onwards and store the
        # bars that are not cached yet.
        logger.debug("ReFetch... <%s,%s>", stock, date)
        df = ts.get_hist_data(stock, start=date, ktype=ktype)
        store_data = json.loads(df.to_json(orient='records'))
        for i in range(len(store_data)):
            if collection.find_one({'stock': stock, 'date': df.index[i], 'ktype': ktype}) is not None:
                continue
            store_data[i]['stock'] = stock
            store_data[i]['date'] = df.index[i]
            store_data[i]['ktype'] = ktype
            collection.insert_one(store_data[i])
    else:
        logger.debug("Use cache..")
    ww = collection.find_one({'stock': stock, 'date': date, 'ktype': ktype})
    if ww is None:
        ww = collection.find_one()
    result = {w: [] for w in ww.keys() if w != '_id'}
    for item in collection.find({'stock': stock, 'date': date, 'ktype': ktype}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    return pd.DataFrame(result)

if __name__ == '__main__':
    #clear_database()
    stock = '002082'
    date = '2016-10-30'
    result = get_hist_data_day(stock, '2016-11-01', '2017-01-01')
```