mhy's Quant Notes: Caching Tushare Data

Tushare is an open financial-data API for Python; personally I find the range of information it exposes impressive.

Link: http://tushare.org/index.html#

However, Tushare has no built-in support for downloading data in bulk: every call fetches over the network, so retrieval is slow and the request rate must be kept low. I therefore wanted a script that fetches data online on first access and saves it to a local database, so that any later request for the same data is answered straight from the local cache.

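The core pattern in both scripts below is a check-fetch-store wrapper around the online API: look the key up locally, fetch and persist on a miss, then always answer from the local store. Distilled to a skeleton (the names cached_get, cache, and fetch_online are hypothetical stand-ins, not part of the real scripts):

# Hypothetical skeleton of the cache-first pattern; `cache` stands in for a
# pymongo collection and `fetch_online` for the Tushare call.
def cached_get(cache, key, fetch_online):
    doc = cache.find_one(key)            # 1. look in the local cache
    if doc is None:
        for record in fetch_online():    # 2. on a miss, fetch over the network
            record.update(key)
            cache.insert_one(record)     # 3. ...and persist each row locally
    return list(cache.find(key))         # 4. always answer from the cache
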
Since this is for personal use, I only implemented caching for tick data and for daily bars.

The main difficulties during implementation were as follows (all four are illustrated in a short sketch right after this list):

  • Python's logging module: a dedicated log-output module that kept throwing odd surprises at me; I still haven't fully untangled it
  • pymongo's various conventions
    • inserting documents
    • range queries
  • how to construct a pandas DataFrame: here I build one from a dict of lists
  • Python's datetime module: using datetime.timedelta
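
Before the full scripts, here is a minimal self-contained sketch of all four pieces, assuming a MongoDB instance on the default local port; the database, collection, and sample values (demo_database, demo_collection, the quotes) are made up for illustration:

import datetime
import logging

import pandas as pd
from pymongo import MongoClient

# logging: DEBUG messages are silently dropped unless a level (and handler)
# is configured; this is a common source of "weird" logging behavior.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

client = MongoClient('127.0.0.1', port=27017)
coll = client.demo_database.demo_collection  # hypothetical names

# pymongo: insert a document, then query a date range with $gte/$lte.
# ISO-formatted 'YYYY-MM-DD' strings compare correctly as plain strings.
coll.insert_one({'stock': '600848', 'date': '2014-12-22', 'close': 10.0})
in_range = list(coll.find({'stock': '600848',
                           'date': {'$gte': '2014-01-01', '$lte': '2014-12-31'}}))
logger.debug("matched %d documents", len(in_range))

# pandas: build a DataFrame from a dict of lists (column name -> values).
df = pd.DataFrame({'date': ['2014-12-22', '2014-12-23'], 'close': [10.0, 10.2]})

# datetime: date arithmetic with timedelta.
day = datetime.datetime.strptime('2014-12-22', '%Y-%m-%d')
window_start = day - datetime.timedelta(days=19)

The first script is the tick-data cache:
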
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database

def clear_database():
    db.drop_collection('tick_data')

def get_tick_data(stock, date):
    logger = logging.getLogger(__name__)
    collection = db.tick_data
    # Cache miss: fetch the ticks online and store each row as one document.
    ww = collection.find_one({'stock': stock, 'date': date})
    if ww is None:
        logger.debug("ReFetch stock <%s,%s>", stock, date)
        df = ts.get_tick_data(stock, date=date)
        store_data = json.loads(df.to_json(orient='records'))
        for record in store_data:
            record['stock'] = stock
            record['date'] = date
            collection.insert_one(record)
    # Cache hit (or freshly stored): rebuild a DataFrame from the documents
    # via a dict of lists, skipping MongoDB's internal _id field.
    ww = collection.find_one({'stock': stock, 'date': date})
    result = {w: [] for w in ww.keys() if w != '_id'}
    for item in collection.find({'stock': stock, 'date': date}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    return pd.DataFrame(result)

if __name__ == '__main__':
    clear_database()
    stock = '600848'
    date = '2014-12-22'
    result = get_tick_data(stock,date)
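
Both scripts move a DataFrame into MongoDB through a records-oriented JSON round trip: df.to_json(orient='records') serializes the frame as a JSON array of row objects, and json.loads turns that back into a list of plain dicts that can be tagged and passed to insert_one. A minimal sketch with made-up numbers:

import json
import pandas as pd

df = pd.DataFrame({'price': [10.0, 10.2], 'volume': [300, 150]})
records = json.loads(df.to_json(orient='records'))
# records == [{'price': 10.0, 'volume': 300}, {'price': 10.2, 'volume': 150}]
for record in records:
    record['stock'] = '600848'  # tag each row before inserting it

The second script caches daily bars. It follows the same check-fetch-store pattern, but additionally has to stitch an arbitrary date range together out of 20-day fetch windows:
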
import pymongo
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging
import datetime

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database


def clear_database():
    db.drop_collection('hist_data')

def get_hist_data_day(stock, start, end):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.debug("CALL get_hist_data_day <%s,%s,%s>", stock, start, end)

    collection = db.hist_data
    # Find the earliest and latest dates already cached for this stock.
    mn_time = None
    mx_time = None
    for bar in collection.find({'stock': stock, 'ktype': 'D'}).sort("date", pymongo.ASCENDING).limit(1):
        mn_time = bar['date']
    for bar in collection.find({'stock': stock, 'ktype': 'D'}).sort("date", pymongo.DESCENDING).limit(1):
        mx_time = bar['date']

    # If the cached range ends before the requested start, pull the fetch
    # window back to the last cached date so the cache stays contiguous.
    if mx_time is not None and mx_time < start:
        start = mx_time
    start_time = datetime.datetime.strptime(start, '%Y-%m-%d')
    end_time = datetime.datetime.strptime(end, '%Y-%m-%d')

    # If the database is non-empty, derive the column header from an
    # existing document; otherwise bootstrap it with a one-day fetch.
    www = collection.find_one({'stock': stock, 'ktype': 'D'})
    if www is not None:
        result = {w: [] for w in www.keys() if w != '_id'}
        df = pd.DataFrame(result)
        current_time = end_time
    else:
        df = ts.get_hist_data(stock, end, end)
        current_time = end_time - datetime.timedelta(days=1)

    # Walk back over the requested interval in 20-day windows, fetching any
    # window that is not already fully covered by the cache.
    while current_time >= start_time:
        prev_time = current_time - datetime.timedelta(days=19)
        if mn_time is None or not (prev_time.strftime('%Y-%m-%d') >= mn_time
                                   and current_time.strftime('%Y-%m-%d') <= mx_time):
            dff = ts.get_hist_data(stock,
                                   prev_time.strftime('%Y-%m-%d'),
                                   current_time.strftime('%Y-%m-%d'))
            df = pd.concat([df, dff])
        current_time = current_time - datetime.timedelta(days=20)

    # Store every fetched bar that is not already cached; the bar's date
    # lives in the DataFrame index, not in the JSON records.
    store_data = json.loads(df.to_json(orient='records'))
    for i in range(len(store_data)):
        www = collection.find_one({'stock': stock, 'date': df.index[i], 'ktype': 'D'})
        if www is not None:
            continue
        store_data[i]['stock'] = stock
        store_data[i]['date'] = df.index[i]
        store_data[i]['ktype'] = 'D'
        collection.insert_one(store_data[i])

    # Read the requested range back from the cache. Note that MongoDB's
    # range operators must be spelled '$gte'/'$lte'.
    www = collection.find_one()
    result = {w: [] for w in www.keys() if w != '_id'}
    for item in collection.find({'stock': stock, 'ktype': 'D',
                                 'date': {'$lte': end, '$gte': start}}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    return pd.DataFrame(result)
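
The 20-day windowing above is the trickiest part: the loop walks backwards from end, and each iteration covers the 20 calendar days [current_time - 19 days, current_time]. A standalone sketch of just the window arithmetic, using an illustrative date range:

import datetime

start_time = datetime.datetime.strptime('2016-11-01', '%Y-%m-%d')
current_time = datetime.datetime.strptime('2017-01-01', '%Y-%m-%d')
while current_time >= start_time:
    prev_time = current_time - datetime.timedelta(days=19)
    print(prev_time.strftime('%Y-%m-%d'), '..', current_time.strftime('%Y-%m-%d'))
    current_time -= datetime.timedelta(days=20)

# Prints:
#   2016-12-13 .. 2017-01-01
#   2016-11-23 .. 2016-12-12
#   2016-11-03 .. 2016-11-22
#   2016-10-14 .. 2016-11-02
# The last window overshoots the requested start; that is harmless, since the
# extra bars are merely cached and then filtered out by the final $gte/$lte query.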



def get_hist_data_sp(stock, date, ktype='D'):
    logger = logging.getLogger(__name__)
    logger.debug("CALL get_hist_data_sp <%s,%s,%s>", stock, date, ktype)
    collection = db.hist_data
    ww = collection.find_one({'stock': stock, 'date': date, 'ktype': ktype})
    if ww is None:
        # Cache miss: fetch from `date` onwards and store every new bar.
        logger.debug("ReFetch... <%s,%s>", stock, date)
        df = ts.get_hist_data(stock, start=date, ktype=ktype)
        store_data = json.loads(df.to_json(orient='records'))
        for i in range(len(store_data)):
            www = collection.find_one({'stock': stock, 'date': df.index[i], 'ktype': ktype})
            if www is not None:
                continue
            store_data[i]['stock'] = stock
            store_data[i]['date'] = df.index[i]
            store_data[i]['ktype'] = ktype
            collection.insert_one(store_data[i])
    else:
        logger.debug("Use cache..")
    # Rebuild the result as a DataFrame; fall back to any document just to
    # recover the column header when the exact date is still missing.
    ww = collection.find_one({'stock': stock, 'date': date, 'ktype': ktype})
    if ww is None:
        ww = collection.find_one()
    result = {w: [] for w in ww.keys() if w != '_id'}
    for item in collection.find({'stock': stock, 'date': date, 'ktype': ktype}):
        for w in item.keys():
            if w == '_id':
                continue
            result[w].append(item[w])
    return pd.DataFrame(result)

if __name__ == '__main__':
    #clear_database()
    stock = '002082'
    date = '2016-10-30'
    result = get_hist_data_day(stock,'2016-11-01','2017-01-01')
