mhy的量化笔记 之 基于LSTM的股市趋势预测

思路

经过一段时间对LSTM的学习,大概了解了LSTM的运作规律。因此尝试运用LSTM预测股市走势。

参考:文章A  文章B

这篇文章是对于该方法的一种初步尝试,然而问题显而易见。

  1. 该算法将指数数据未经任何处理就丢到了LSTM里面,这是“不可能”学习出任何知识的。由于神经网络类机器学习算法是基于线性组合以及非线性函数,因此能够学习出“较大”,“正”,“上升”这一类描述,但无法学习出“区间最大值”,“日线高点出现”,“指数线交叉”。所以我们需要做的应该是将这类信息人工处理后丢入LSTM!

尝试1:上证指数K线4值

直接输入上证K线的open,close,high,low,预测1/5/10日收盘价涨跌.

正确率最后能够到达65%左右。

奇怪的是用GRU和LSTM效果没有显著的差别,感觉远期的信息并没有被学习进去……

全部都学成跌了,当然没有差别咯……算了一下,MCC相关系数仅有0.02,等于随机预测.

由于之前使用的是涨跌幅的log的平方和作为loss函数,可能确实在精度上也会有问题。

是不是将label设为涨跌幅log平方的符号函数会更好一些呢?

最后,模型准确率平均在51%左右,且在训练过程中有时候会跳到49%,相关性也会在有些时候呈现负相关,具体来说就是没有训练处结果。

尝试2:加入更多数据

测试数据

随机选择股票

000416 000996 600096 600291 600695 000416 000996 600096 600291 600695 000951 300143  600155 600354 600785

收集2010-2017的日线数据进行分为训练集与测试集。

指标选择

由最初的四个数据拓展到更多的指标

  • open 开盘价
  • close 收盘价
  • high 最高价
  • low 最低价
  • volume 成交量
  • sh 上证指数
  • sz  深证成指
  • hs300 沪深300
  • cyb 创业板指数
  • ma_5 5日均线
  • ma_10 10日均线
  • ma_20 20日均线
  • ma_60 60日均线
  • macd_dif macd短线
  • macd_ema macd长线
  • macd_macd macd柱状线
  • rsi rsi指数
  • adx adx指数
  • cci cci指数

将指数分组进行归一化,例如将ochl和均线值统一进行归一化,指数独自归一化。

分类函数

之前是将误差平方和作为loss函数,后来发现这在二分类问题中非常的不优美。

对于典型的二分类问题

loss =- (X * log(X’) + (1-X)*log(1-X’))

作为误差函数会更加容易收敛。

分组训练

之前的分组训练是直接取相邻区间,仔细想想确实bug非常严重,本身分batch就是为了防止出现边界数据,导致将梯度影响错误的方向。这里我们改成使用随机batch_size个数据使得训练表现好了一些

优化指标

之前指标发现实际上大盘数据对于一只股票并没有太大的用处,因为我不知道这个股票的类型,因此考虑增加:

  • is_sh 是否为沪市
  • is_sz 是否为深市
  • is_cyz 是否为创业板

至此,已经可以将模型准确率提高至53%

修改分类函数

将之前的二分类改为三分类,其中表示随机波动的分类0产生的所有结果不计入loss函数。准确率提高至55%,不过这个55%的含义与之前不同,这里计算的只是非零分类的准确率。

 

Tushare数据自动缓存

tushare是一个财经数据获取库,内置若干财经数据的获取爬虫,将各种数据的爬取封装在了一起。但是tushare有一个很大的缺点,就是其抓取的数据无法缓存在本地。故考虑实现一组自己的api用以缓存tushare数据。

import tushare as ts
industry_info = ts.get_industry_classified()
profit_info = ts.get_profit_data(2017,1)

这是python中获取tushare数据的典型例子,我们考虑实现一个简单的装饰函数,将函数调用以及调用的参数进行记忆化!

ts.get_industry_classified = data_cacher(ts.get_industry_classified)
ts.get_profit_data = data_cacher(ts.get_profit_data)

industry_info = ts.get_industry_classified()
profit_info = ts.get_profit_data(2017,1)

这里面data_cacher函数包装后的ts.api增加了将返回值保存为以“函数名”、“参数”命名的文件中,供下次调用直接使用。

最后,如何实现data_cacher函数呢?直接上代码:

import attr
import pandas as pd

def data_cacher(func):
    def inner_function(*args,**kwargs):
        path = 'data/'+func.__name__
        for w in args:
            path += '&'+str(w)
        for w in kwargs:
            path += '&'+w+'='+str(kwargs[w])
        try:
            data = pd.DataFrame().from_csv(path)
            print("Use cache :",path)
        except FileNotFoundError:
            if 'fast_mode' in kwargs.keys() and kwargs['fast_mode'] == True:
                return pd.DataFrame()
            elif 'fast_mode' in kwargs.keys():
                del kwargs['fast_mode']
            print("Online => ",path)
            data = func(*args,**kwargs)
            #print(data,type(data),len(data))
            if len(data) == 0:
                print("Empty...")
                return data
            data.to_csv(path)
            data = pd.DataFrame().from_csv(path)
        return data
    return inner_function

 

 

RNN学习笔记 之 利用tensorflow-lstm实现sin预测

RNN用于处理不定长的序列数据,而简谐波预测就是一个很好的例子,显然,对于机器学习,sin(x)、cos(x)、a*sin(x) + b*cos(x)甚至x*sin(x)都没有任何难度上的差别。

那么我们就用RNN来预测吧!

(CNMB……由于一个偶然的错误,我发现这个问题直接线性规划类型的神经网络就可以很精确的计算出来!!!而且LSTM根本比不上!!!)

考虑

sin(a(k-1)+b)、sin(ak+b) 和sin(a(k+1)+b)的关系

sin(a(k-1)+b) = sin(ak+b-a) = sin(ak+b)cos(a) – cos(ak+b)sin(a)

sin(a(k+1)+b) = sin(ak+b+a) = sin(ak+b)cos(a) + cos(ak+b)sin(a)

sin(a(k+1)+b) = -sin(a(k-1)+b) + 2sin(ak+b)cos(a)

其中由于a我取的是整数,所以非常容易估计,于是实际上sin数列的每一项可以由前两项线性表示出来……

即使a不是整数,也可通过三角带换,得到近似于线性表达的公式

但是现在就出现这样一个问题。

既然是如此简单的问题,LSTM的表现为啥这么渣呢???

原因可能如下:

  1. LSTM变量太多了,训练会出现梯度消失的问题
  2. 使用LSTM,倒数第二项作用于倒数第一项,中间有若干非线性层,导致本来非常简单的线性组合变得非常难以模拟了!
  3. 写错了……

脑残的把最后一层加了一个sigmoid函数,然而并不知道sigmoid的返回值是一个正数,并不包含sin(x)的值域区间

import tensorflow as tf
import numpy as np
import talib
import random
from matplotlib import pyplot

max_length = 20
total_branches = 10010
batch_size = 31
hidden_size = 16
start_position = 10
test_batch_index = 0
num_input_classes = 1
difficulity = 10
total_layers = 1

test_x = None
test_y = None
test_seqlen = None

def gen_sine_wave(length):
    L = random.randint(1,difficulity)
    R = random.randint(1,difficulity)
    if L>R:
        L,R = R,L
    x = np.linspace(L,R,length+1)
    y = np.sin(x)
    return y[:-1],y[-1]

def generate_batches():
    global test_x,test_y,test_seqlen
    test_x = []
    test_y = []
    test_seqlen = []
    test_batch_len = 0
    for i in range(0,total_branches):
        length = random.randint(start_position,max_length)
        x,y = gen_sine_wave(length)
        x = np.append(np.array([t for t in x]),np.array([0 for i in range(max_length-length)]))
        x = np.reshape(x,[max_length,num_input_classes])
        test_seqlen.append(length)
        test_x.append(x)
        test_y.append(y)

def next_batch(total_branches = total_branches):
    global test_batch_index,test_x,test_y,test_seqlen
    if test_batch_index == total_branches:
        test_batch_index = 0
    xs = test_x[test_batch_index:min(test_batch_index+batch_size,total_branches)]
    ys = test_y[test_batch_index:min(test_batch_index+batch_size,total_branches)]
    seqlens = test_seqlen[test_batch_index:min(test_batch_index+batch_size,total_branches)]
    test_batch_index = min(test_batch_index + batch_size,total_branches)
    return xs,ys,seqlens


if __name__ == '__main__':
    generate_batches()
    #sess = tf.InteractiveSession()
    input_data = tf.placeholder(tf.float32,[None,max_length,num_input_classes])
    input_seqlen = tf.placeholder(tf.int32,[None])
    input_labels = tf.placeholder(tf.float32,[None])
    keep_prob = tf.placeholder(tf.float32)
    current_batch_size = tf.shape(input_data)[0]

    with tf.name_scope('lstm_layer'):
        method = 'lstm'
        data = input_data
        print(data.shape)#10*20*1
        if method == 'lstm':
            def create_lstm():
                lstm_cell = tf.contrib.rnn.BasicLSTMCell(hidden_size,state_is_tuple = True)
                #lstm_cell = tf.nn.rnn_cell.DropoutWrapper(lstm_cell, output_keep_prob=keep_prob,state_keep_prob=keep_prob)
                return lstm_cell
            lstm_cell = create_lstm()
            #lstm_cell = tf.nn.rnn_cell.MultiRNNCell([create_lstm() for i in range(total_layers)], state_is_tuple=True)
            #state = lstm_cell.zero_state(current_batch_size,tf.float32)
            data = tf.unstack(data,axis=1)
            outputs,state = tf.nn.static_rnn(lstm_cell,data,dtype=tf.float32,sequence_length=input_seqlen)
            outputs = tf.stack(outputs,axis=1)
            #outputs =  tf.transpose(outputs,[1,0,2])
        else:
            gru_cell = tf.contrib.rnn.GRUCell(hidden_size)
            outputs,state = tf.nn.dynamic_rnn(gru_cell,data,dtype=tf.float32,sequence_length=input_seqlen)

        print(outputs.shape)
        output = tf.gather_nd(outputs, tf.stack([tf.range(current_batch_size), input_seqlen-1], axis=1))

    with tf.name_scope('final_layer'):
        final_w = tf.Variable(tf.truncated_normal([hidden_size,1],.1))
        tf.summary.histogram('weight',final_w)
        final_b = tf.Variable(tf.constant(.1,shape=[1]))
        tf.summary.histogram('bias',final_b)

    with tf.name_scope('output_layer'):
        result = tf.tanh(tf.matmul(output,final_w) + final_b)
        print(result.shape)
        result = tf.reshape(result,[-1])
        loss = tf.reduce_mean(tf.losses.mean_squared_error(labels = input_labels,predictions=result))
        tf.summary.scalar('loss',loss)

    optimizer = tf.train.AdamOptimizer().minimize(loss)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        merged = tf.summary.merge_all()
        writer = tf.summary.FileWriter("logs/",sess.graph)

        steps = 0
        while True:
            batch_x , batch_y, batch_seqlen = next_batch()
            sess.run(optimizer, feed_dict={input_data:batch_x,
                input_labels:batch_y,
                input_seqlen:batch_seqlen,
                keep_prob:.95})
            if steps%50 == 0:
                print(steps,sess.run(loss,feed_dict={input_data:batch_x,input_labels:batch_y,input_seqlen:batch_seqlen,keep_prob:1}))
                print("PREDICT",sess.run(result,feed_dict={input_data:batch_x,input_labels:batch_y,input_seqlen:batch_seqlen,keep_prob:1}))
                print("LABEL",sess.run(input_labels,feed_dict={input_data:batch_x,input_labels:batch_y,input_seqlen:batch_seqlen,keep_prob:1}))
                merged_result = sess.run(merged, feed_dict={input_data:batch_x,
                    input_labels: batch_y,
                    input_seqlen: batch_seqlen,
                    keep_prob:1})
                writer.add_summary(merged_result,steps)
            steps += 1
            if steps%1000 == 0:
                batch_x , batch_y, batch_seqlen = next_batch()
                current_result = sess.run(result,feed_dict={input_data:batch_x,input_seqlen:batch_seqlen,keep_prob:1})
                X = batch_x[0][:batch_seqlen[0]]
                Y = current_result[0]
                Z = batch_y[0]
                plt = pyplot.figure()
                pyplot.plot(np.linspace(1,len(X),len(X)),X)
                pyplot.plot(len(X)+1,Y,'r+')
                pyplot.plot(len(X)+1,Z,'r^')
                pyplot.show()