米国株式市場 将来株価予測

テクニカル分析を駆使して成長銘柄を予言しよう!

賞金: 100,000 参加ユーザー数: 253 3年弱前に終了

[Keras]transformerを用いた予測モデル

はじめに

transformerを用いたモデルが使えるんじゃないかと思ってやってみましたが,あまりうまくいかなかったためそのコードを挙げておきます.

import os
import sys
import random
import pickle 

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from glob import glob

import tensorflow as tf
import tensorflow.keras.layers as L
import tensorflow.keras.models as M
import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

! pip install --quiet tensorflow_addons
import tensorflow_addons as tfa


BASE_DIR = Path('/content/drive/MyDrive/competition/probspace-kabushiki')
DATA_DIR = BASE_DIR / 'input'
OUTPUT_DIR = BASE_DIR / ('exp' + input()) # 任意のディレクトリ


def get_trainingdata(train: pd.DataFrame, window=5, min_index=100):
    output_X = []
    output_y = []
    output_test = []
    train_array = train.values

    for i in range(min_index, len(train_array)):
        output_X.append(train_array[i-window:i])
        output_y.append(train_array[i:i+1])
    output_test.append(train_array[len(train_array)-window:len(train_array)])

    output_X = np.array(output_X).transpose(0, 2, 1)
    output_y = np.array(output_y).transpose(0, 2, 1)
    output_test = np.array(output_test).transpose(0, 2, 1)
    return output_X, output_y, output_test


def RMSE(y_true, y_pred):
    return np.sqrt(np.mean(np.square(y_true - y_pred)))


class TransformerBlock(L.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.0):
        super(TransformerBlock, self).__init__()
        self.att = L.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [L.Dense(ff_dim, activation="gelu"), L.Dense(embed_dim),]
        )
        self.layernorm1 = L.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = L.LayerNormalization(epsilon=1e-6)
        self.dropout1 = L.Dropout(rate)
        self.dropout2 = L.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


def custom_loss(y_true, x_value, x_ratio, lagx):
    y_pred = (x_ratio * lagx) * 0.5 + x_value * 0.5
    score = K.sqrt(K.mean(K.square(y_pred - y_true))) 
    return score


def build_model(input_shape):
    inputs_x = tf.keras.layers.Input(shape=input_shape, name="inputs_x")
    inputs_y = tf.keras.layers.Input(shape=(input_shape[0], 1), name="inputs_y")
    inputs_lagx = tf.keras.layers.Input(shape=(input_shape[0], 1), name="inputs_lagx")

    x = L.Dense(128, activation='selu')(inputs_x)
    x = TransformerBlock(128, 2, 128)(x)
    x = L.Dense(64, activation='selu')(x)
    x_value = L.Dense(1, activation='linear')(x)
    x_ratio = L.Dense(1, activation='linear')(x)

    model = M.Model(inputs=[inputs_x, inputs_y, inputs_lagx], outputs=[x_value, x_ratio])

    model.add_loss(custom_loss(inputs_y, x_value, x_ratio, inputs_lagx))
    optimizer = tfa.optimizers.AdamW(learning_rate=1e-3, weight_decay=2e-5)
    model.compile(optimizer=optimizer)
    return model


def get_dataset(X, y=None, batch_size=32, dataset=''):
    if dataset=='train':
        train_dataset = (
            tf.data.Dataset
            .from_tensor_slices((X, y))
            .shuffle(10**8)
            .batch(batch_size)
            .prefetch(tf.data.experimental.AUTOTUNE)
        )
        return train_dataset

    elif dataset=='valid':
        valid_dataset = (
            tf.data.Dataset
            .from_tensor_slices((X, y))
            .batch(1)
            .prefetch(tf.data.experimental.AUTOTUNE)
        )
        return valid_dataset

    elif dataset=='test':
        test_dataset = (
            tf.data.Dataset
            .from_tensor_slices(X)
            .batch(1)
            .prefetch(tf.data.experimental.AUTOTUNE)
        )
        return test_dataset

    else:
        raise NotImplementedError


def training_nn(train_X, train_y, spilit_index=-1, batch_size=32):
    x_train, y_train = np.copy(train_X[:spilit_index]), np.copy(train_y[:spilit_index])
    x_valid, y_valid = np.copy(train_X[spilit_index:]), np.copy(train_y[spilit_index:])

    train_inputs = {"inputs_x": x_train, "inputs_y": y_train, "inputs_lagx": x_train[:, :, -1:]}
    valid_inputs = {"inputs_x": x_valid, "inputs_y": y_valid, "inputs_lagx": x_valid[:, :, -1:]}

    tr_dataset = get_dataset(X=train_inputs, y=y_train, batch_size=batch_size, dataset="train")
    va_dataset = get_dataset(X=valid_inputs, y=y_valid, batch_size=batch_size, dataset="valid")

    early_stop = EarlyStopping(
        monitor='val_loss',
        min_delta=0.0,
        patience=16, 
        mode="min"
    )

    checkpoint = ModelCheckpoint(
        str(OUTPUT_DIR / 'model.hdf5'), 
        moniter="val_loss", 
        verbose=0, 
        save_best_only=True, 
        save_weights_only=True,
        mode="min"
    )

    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=8,
        min_lr=1e-6,
        verbose=0
    )

    model = build_model((3278, x_train.shape[2]))
    display(model.summary())

    model.fit(
        tr_dataset,
        validation_data=va_dataset, 
        epochs=1000,
        callbacks=[early_stop, checkpoint, reduce_lr],
    )

    K.clear_session()
    model = build_model((3278, x_train.shape[2]))
    model.load_weights(str(OUTPUT_DIR / 'model.hdf5'))
    oof_pred = model.predict(va_dataset)
    score = RMSE(y_valid, oof_pred[0] * 0.5 + x_valid[:, :, -1:] * oof_pred[1] * 0.5)
    print(round(score, 5))
    return oof_pred


def inference_nn():
    sub = pd.read_csv(DATA_DIR / 'submission_template.csv')

    test_inputs = {"inputs_x": test_X, "inputs_y": test_X[:, :, -1:], "inputs_lagx": test_X[:, :, -1:]}
    tr_dataset = get_dataset(X=test_inputs, y=None, dataset="test")

    K.clear_session()
    model = build_model((3278, test_X.shape[2]))
    model.load_weights(str(OUTPUT_DIR / 'model.hdf5'))
    predi = model.predict(tr_dataset)
    tmp_pred = pd.DataFrame({
        'id': list(train.columns),
        'y': np.expm1(predi[0][0, :, 0] * 0.5 + test_X[0, :, -1] * predi[1][0, :, 0] * 0.5)
    })
    sub = sub.drop(columns='y').merge(tmp_pred, on='id', how='left')
    sub.to_csv(OUTPUT_DIR / 'submission_exp003.csv', index=False)
    return predi



train = pd.read_csv(DATA_DIR / 'train_data.csv')
company = pd.read_csv(DATA_DIR / 'company_list.csv')
sub = pd.read_csv(DATA_DIR / 'submission_template.csv')

train = train.iloc[:len(train)-1].set_index('Date')
train[train.columns] = np.log1p(train.values)
train_X, train_y, test_X = get_trainingdata(train, window=100, min_index=100)
oof_pred = training_nn(train_X, train_y, spilit_index=-1, batch_size=32)

test_pred = inference_nn()

Favicon
new user
コメントするには 新規登録 もしくは ログイン が必要です。