VHDL Study Notes

  • entity/architecture:
    • entity: the entity declares the design's external interface, i.e. every input and output port of the circuit
    • architecture: the architecture describes the internal structure and behaviour of the design; you can think of it as a derived class of the entity — the entity only fixes the types of the input/output signals, while how the inputs and outputs are actually related is decided by the architecture
    • generic: a generic attaches compile-time parameters to an entity (for example a bus width) that can be overridden when the entity is instantiated (see the sketch after this list)
  • Port directions
    • in
    • out
    • inout
    • buffer
  • process: processes run concurrently with each other, while the statements inside a process execute sequentially; the process's sensitivity list names the signals whose changes wake the process up
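
A minimal sketch tying these together (my own illustrative example, not taken from any course material): a WIDTH-bit register with one generic, in/out ports, and a single clocked process.

-- minimal sketch: a WIDTH-bit register (illustrative example)
library ieee;
use ieee.std_logic_1164.all;

entity reg is
  generic (WIDTH : integer := 8);                -- generic: compile-time parameter of the entity
  port (clk : in  std_logic;                     -- port directions: in / out
        d   : in  std_logic_vector(WIDTH-1 downto 0);
        q   : out std_logic_vector(WIDTH-1 downto 0));
end entity reg;

architecture rtl of reg is
begin
  process (clk)                                  -- sensitivity list: the process wakes on clk changes
  begin
    if rising_edge(clk) then
      q <= d;                                    -- statements inside a process execute sequentially
    end if;
  end process;
end architecture rtl;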

Related resources

VHDL tutorials (in English): https://www.nandland.com/vhdl/tutorials/index.html

 

Link exchange~~~

As the title says >_<

I am a university student in computer science and a former OIer, and I like to think my CS skills are decent. I would love to talk with all kinds of people and with other interested bloggers; the main point, of course, is that I would like to swap blogroll links, with similar sites and personal sites getting priority.

Also, I want to maintain this blog well and keep it personal, so I will politely decline link exchanges with any site of a commercial or promotional nature~

QQ: 519954392

Email: maohanyang789@163.com

mhy's Quant Notes: Spotting "Rat Trading" (老鼠仓)

I recently came across a very interesting pattern in the stock market: the so-called 老鼠仓 ("rat trading", a form of insider front-running).

You can look up the mechanics yourself; the short version is that roughly every 10 days the stock in question prints a lower shadow of about 9%. In other words, if you place a limit buy order at 9% below that day's open in advance and it gets filled, you collect almost a full limit-up's worth of profit the next day.
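
As a quick sanity check of that claim, here is the arithmetic with made-up round numbers (the 9% shadow and the roughly-one-limit-up payoff come from the observation above; the prices are hypothetical):

open_price = 10.00                   # hypothetical opening price
buy_price = open_price * 0.91        # limit buy placed 9% below the open
exit_price = open_price              # assume the price recovers to around the open the next day
profit = exit_price / buy_price - 1
print("%.1f%%" % (profit * 100))     # about 9.9%, roughly one daily limit-up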

So now I would like to write a script that finds every stock that may be forming such a "rat trap".

A candle that qualifies as a rat-trading candidate has to satisfy all of the following:

  • the open-to-close change of the day is under 2%
  • the intraday low is more than 8% below the open (the long lower shadow)
  • the drop from roughly -2% down to near the low takes less than 5 minutes
  • the day's average price is down by less than 3% overall

While writing the code I noticed a problem: the price may barely move on the intraday chart, yet the daily candle shows a very long lower shadow, because a single irrational limit order happened to get filled. This also made me question what a candle really tells you: any single resting order can drastically change a candle's shape, and (if I am not misunderstanding something) that could itself be used to manipulate how a chart looks.
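
A minimal sketch of that effect with made-up tick prices: one out-of-line fill is enough to give an otherwise flat session a long lower shadow on the daily candle.

import pandas as pd

ticks = pd.Series([10.02, 10.01, 10.00, 9.15, 10.01, 10.00])  # hypothetical ticks; 9.15 is a single irrational fill
daily = {
    'open': ticks.iloc[0],
    'high': ticks.max(),
    'low': ticks.min(),      # the lone 9.15 print becomes the day's low
    'close': ticks.iloc[-1],
}
print(daily)                 # a flat session, yet the daily candle has a long lower shadow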

Code

import tushare as ts
import time
import numpy as np
import logging
'''
An arbitrage screen based on spotting "rat trading" candles.
Note: get_hist_data_sp / get_tick_data are the MongoDB-cached wrappers
defined in the tushare caching post further down.
'''

def check_rat(stock,date,end_day):
    '''Return True if the daily bar of `stock` on `date` looks like a rat-trading candle.'''
    logger = logging.getLogger(__name__)
    logger.info("Check Pit <%s,%s,%s>..."%(stock,date,end_day))
    if end_day:
        summary = get_hist_data_sp(stock,date)
        #summary = ts.get_hist_data(stock,date,date)
        if len(summary['low']) != 1:
            logger.warning("Error occurred at <%s>"%stock)
            return False
        # the daily low must be at least 8% below the open (the long lower shadow)
        if (float(summary['low'][0])/float(summary['open'][0]) > 0.92):
            return False
        logger.debug("Big Hole...checked")
        # the open-to-close change of the day must stay under 2%
        if (abs(1.0-float(summary['open'][0])/float(summary['close'][0])) > .02):
            return False
        # the high must not be more than 2% above the open either
        if (float(summary['high'][0])/float(summary['open'][0]) > 1.02):
            return False
        logger.debug("Slightly fluctuating...checked")
        # tick-level data for the day: the tick low must also be more than 8% below the open
        rt = get_tick_data(stock,date = date)
        rt = rt[['time','price','change']]
        low_value = np.min(rt['price'])
        if (low_value/float(summary['open'][0]) > 0.92):
            return False
        #rt.to_csv('save.csv',sep='\t')

        # pos_y: a tick within 1% of the low; pos_x: a tick still close to the open (1.5% below it)
        lower_bound = float(low_value) * 1.01
        upper_bound = float(summary['open'][0]) * 0.985
        pos_x = rt['time'].size-1
        pos_y = -1
        for i in range(rt['time'].size-1,-1,-1):
            if (rt['price'][i] > upper_bound):
                pos_x = i
            if (rt['price'][i] < lower_bound):
                pos_y = i
                break
        time_x = time.strptime('2000 '+rt['time'][pos_x],'%Y %H:%M:%S')
        time_y = time.strptime('2000 '+rt['time'][pos_y],'%Y %H:%M:%S')
        time_x = time.mktime(time_x)
        time_y = time.mktime(time_y)
        delta = time_y-time_x
        if (delta > 5 * 60):
            return False
        logger.debug("Fast down...checked")
        return True


def build_list():
    logger = logging.getLogger(__name__)
    info = ts.get_stock_basics()
    stocks = info.index
    for stock in stocks:
        realtime = ts.get_realtime_quotes(stock)
        logger.info("Begin Stock <%s>"%stock)
        # pre-filter: today's intraday low must already be more than 8% below the open
        check_result = False
        if (float(realtime['low']) < float(realtime['open'])*0.92):
            check_result = True
        if (check_result):
            recent =ts.get_k_data(stock)
            check_range = 80
            if (recent.size < 100):
                continue
            recentId = recent.index[-check_range:]
            # count how many of the last check_range days show the pattern
            count = 0
            for w in recentId:
                check_h_result = check_rat(stock,end_day = True, date = recent['date'][w])
                if (check_h_result):
                    logger.warning("Pit found <%s,%s>"%(stock,recent['date'][w]))
                    count += 1
            if (count > 2):
                logger.error("Pit Stock Found <%s,%s/%s>"%(stock,count,check_range))
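
The post does not show how the scan is launched; a minimal, hypothetical entry point would be something like:

if __name__ == '__main__':
    # enable logging so the "Pit found" / "Pit Stock Found" messages are visible
    logging.basicConfig(level=logging.INFO)
    build_list()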

Result: high-frequency rat-trading candles showed up in only one stock, 002072, which was rather disappointing.

As an extension of the algorithm, I also tried to pick out every stock with long shadows but only a small open-to-close difference. It turned out that the stocks found this way were all simply in violent oscillation; in other words, the method cannot tell a shadow apart from an ordinary sharp drop, so it has little practical value.

mhy's Quant Notes: Caching tushare Data

Tushare is an open financial-data API for Python; personally I find it contains a great deal of information.

Link: http://tushare.org/index.html#

However, it has no built-in support for downloading data for offline use, so every call fetches data over the network. That makes it slow, and you also cannot issue requests too frequently. So I wanted a script that fetches the data online the first time and stores it locally; the second time the same data is requested, it is looked up directly in the local database.

Since this is only for personal use, I implemented caching just for tick data and for daily bars.

The main difficulties during the implementation were:

  • Python's logging module: it is the standard way to emit logs, but it keeps throwing odd errors at me and I still have not figured them out...
  • a few pymongo basics (see the sketch after this list):
    • inserting documents
    • range queries
  • how to construct a pandas DataFrame: here it is built from a dict of lists
  • Python's datetime module, in particular datetime.timedelta
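
Before the full scripts, here is a minimal standalone sketch of those four points (the collection name demo_bars and the sample values are illustrative only):

import datetime
import pandas as pd
from pymongo import MongoClient

coll = MongoClient('127.0.0.1', port=27017).demo_db.demo_bars

# pymongo: insert a document, then run a date-range query with $gte/$lte
coll.insert_one({'stock': '600848', 'date': '2016-12-01', 'close': 10.0})
cursor = coll.find({'stock': '600848',
                    'date': {'$gte': '2016-11-01', '$lte': '2017-01-01'}})

# pandas: construct a DataFrame from a dict of lists
df = pd.DataFrame({'date': ['2016-12-01'], 'close': [10.0]})

# datetime: step a 20-day window backwards with timedelta
end = datetime.datetime.strptime('2017-01-01', '%Y-%m-%d')
start = end - datetime.timedelta(days=20)

The actual caching code follows: first the tick-data cache, then the daily-bar cache.
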
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database

def clear_database():
    db.drop_collection('tick_data')

def get_tick_data(stock,date):
    '''Cached version of ts.get_tick_data: fetch once, then serve from MongoDB.'''
    logger = logging.getLogger(__name__)
    collection = db.tick_data
    ww = collection.find_one({'stock':stock,'date':date})
    if ww is None:
        logger.debug("ReFetch stock <%s,%s>",stock,date)
        df = ts.get_tick_data(stock,date=date)
        # store each tick as one document, tagged with the stock code and date
        store_data = json.loads(df.to_json(orient='records'))
        for i in range(0,len(store_data)):
            store_data[i]['stock'] = stock
            store_data[i]['date'] = date
            collection.insert_one(store_data[i])
    ww = collection.find_one({'stock':stock,'date':date})
    # rebuild a DataFrame (dict of lists) from the cached documents, dropping Mongo's _id
    result = dict()
    for w in ww.keys():
        if (w=='_id'):
            continue
        result[w] = []
    for item in collection.find({'stock':stock,'date':date}):
        for w in item.keys():
            if (w=='_id'):
                continue
            result[w].append(item[w])
    result = pd.DataFrame(result)
    return result

if __name__ == '__main__':
    clear_database()
    stock = '600848'
    date = '2014-12-22'
    result = get_tick_data(stock,date)

import pymongo
from pymongo import MongoClient
import json
import tushare as ts
import pandas as pd
import logging
import datetime
import time

client = MongoClient('127.0.0.1', port=27017)
db = client.tushare_database


def clear_database():
    db.drop_collection('hist_data')

def get_hist_data_day(stock,start,end):
    '''Cached daily bars for [start, end]: fill any gaps from tushare, then read from MongoDB.'''
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.debug("CALL get_hist_data_day <%s,%s,%s>",stock,start,end)

    collection = db.hist_data
    result = []
    mx_time = None
    mn_time = None
    for bar in collection.find({'stock':stock,'ktype':'D'}).sort("date",pymongo.ASCENDING).limit(1):
        mn_time = bar['date']
    for bar in collection.find({'stock':stock,'ktype':'D'}).sort("date",pymongo.DESCENDING).limit(1):
        mx_time = bar['date']

    # if the newest cached bar is older than the requested start, pull the request
    # back to the cache boundary so the gap between them gets filled too
    if mx_time != None and mx_time < start:
        start = mx_time
    start_time = datetime.datetime.strptime(start,'%Y-%m-%d')
    end_time = datetime.datetime.strptime(end,'%Y-%m-%d')

    # if the database is non-empty, build an empty DataFrame header from an existing document
    www = collection.find_one({'stock':stock,'ktype':'D'})
    if www != None:
        result = dict()
        for w in www.keys():
            if (w=='_id'):
                continue
            result[w] = []
        header = pd.DataFrame(result)
        df = header
        current_time = end_time
    else:
        df = ts.get_hist_data(stock,end,end)
        current_time = end_time-datetime.timedelta(days=1)

    # walk backwards through the requested range in 20-day windows and fetch whatever is not cached yet
    while current_time >= start_time:
        prev_time = current_time - datetime.timedelta(days=19)
        if mn_time == None or (not (prev_time.strftime('%Y-%m-%d') >= mn_time
            and current_time.strftime('%Y-%m-%d') <= mx_time)):
            dff = ts.get_hist_data(stock,
                    prev_time.strftime('%Y-%m-%d'),
                    current_time.strftime('%Y-%m-%d'))
            df = pd.concat([df, dff])  # accumulate the newly fetched bars
        current_time = current_time - datetime.timedelta(days=20)

    # persist any bars that are not in the cache yet, tagged with stock code, date and ktype
    store_data = json.loads(df.to_json(orient='records'))
    for i in range(0,len(store_data)):
        www = collection.find_one({'stock':stock,'date':df.index[i],'ktype':'D'})
        if www != None:
            continue
        store_data[i]['stock'] = stock
        store_data[i]['date'] = df.index[i]
        store_data[i]['ktype'] = 'D'
        collection.insert_one(store_data[i])

    # use any cached document as the column template, then collect the requested date range
    www = collection.find_one()
    result = dict()
    for w in www.keys():
        if (w=='_id'):
            continue
        result[w] = []
    for item in collection.find({'stock':stock,'ktype':'D','date':{'$lte':end,'$gte':start}}):
        for w in item.keys():
            if (w=='_id'):
                continue
            result[w].append(item[w])
    result = pd.DataFrame(result)
    return result



def get_hist_data_sp(stock,date,ktype='D'):
    '''Cached single-day bar: fetch from `date` onwards once, then serve from MongoDB.'''
    logger = logging.getLogger(__name__)
    logger.debug("CALL get_hist_data_sp <%s,%s,%s>",stock,date,ktype)
    collection = db.hist_data
    ww = collection.find_one({'stock':stock,'date':date,'ktype':ktype})
    if ww is None:
        logger.debug("ReFetch... <%s,%s>",stock,date)
        df = ts.get_hist_data(stock,start = date,ktype=ktype)
        store_data = json.loads(df.to_json(orient='records'))
        for i in range(0,len(store_data)):
            www = collection.find_one({'stock':stock,'date':df.index[i],'ktype':ktype})
            if (www!=None):
                continue
            store_data[i]['stock'] = stock
            store_data[i]['date'] = df.index[i]
            store_data[i]['ktype'] = ktype
            collection.insert_one(store_data[i])
    else:
        logger.debug("Use cache..")
    ww = collection.find_one({'stock':stock,'date':date,'ktype':ktype})
    if ww == None:
        # no bar for that exact date (e.g. not a trading day): fall back to any document as a column template
        ww = collection.find_one()
    result = dict()
    for w in ww.keys():
        if (w=='_id'):
            continue
        result[w] = []
    for item in collection.find({'stock':stock,'date':date,'ktype':ktype}):
        for w in item.keys():
            if (w=='_id'):
                continue
            result[w].append(item[w])
    result = pd.DataFrame(result)
    return result

if __name__ == '__main__':
    #clear_database()
    stock = '002082'
    date = '2016-10-30'
    result = get_hist_data_day(stock,'2016-11-01','2017-01-01')