[Keras]transformerを用いた予測モデル
はじめに
transformerを用いたモデルが使えるんじゃないかと思ってやってみましたが,あまりうまくいかなかったためそのコードを挙げておきます.
import os
import sys
import random
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from glob import glob
import tensorflow as tf
import tensorflow.keras.layers as L
import tensorflow.keras.models as M
import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
! pip install --quiet tensorflow_addons
import tensorflow_addons as tfa
# Google Drive project root for this competition (Colab mount).
BASE_DIR = Path('/content/drive/MyDrive/competition/probspace-kabushiki')
# Raw competition data lives under input/.
DATA_DIR = BASE_DIR / 'input'
# Experiment number is read interactively so each run writes to its own
# exp<N> directory.  NOTE(review): input() blocks when run outside a notebook.
OUTPUT_DIR = BASE_DIR / ('exp' + input())
def get_trainingdata(train: pd.DataFrame, window=5, min_index=100):
    """Build sliding-window training samples and the final test window.

    Parameters
    ----------
    train : pd.DataFrame
        Rows are time steps, columns are series (one per company).
    window : int
        Number of past time steps fed to the model per sample.
    min_index : int
        First row index eligible as a prediction target.

    Returns
    -------
    tuple of np.ndarray
        output_X    : (n_samples, n_series, window) input windows,
        output_y    : (n_samples, n_series, 1) next-step targets,
        output_test : (1, n_series, window) the last window of the data,
                      used at inference time to predict the step after
                      the series ends.
    """
    train_array = train.values
    output_X = []
    output_y = []
    for i in range(min_index, len(train_array)):
        output_X.append(train_array[i - window:i])
        output_y.append(train_array[i:i + 1])
    # Reorder to (sample, series, time) so each series is a feature row.
    output_X = np.array(output_X).transpose(0, 2, 1)
    output_y = np.array(output_y).transpose(0, 2, 1)
    # Bug fix: the original appended the identical final window once per loop
    # iteration, duplicating it n_samples times. Build it once instead; only
    # element [0] was ever consumed downstream, so callers still work.
    output_test = train_array[len(train_array) - window:][None, ...].transpose(0, 2, 1)
    return output_X, output_y, output_test
def RMSE(y_true, y_pred):
    """Root-mean-squared error between *y_true* and *y_pred*."""
    squared_error = np.square(y_true - y_pred)
    return np.sqrt(squared_error.mean())
class TransformerBlock(L.Layer):
    """Post-norm transformer encoder block: self-attention + feed-forward,
    each wrapped with dropout, a residual connection and LayerNormalization.

    Parameters
    ----------
    embed_dim : int
        Width of the attention key dim and of the FFN output projection.
    num_heads : int
        Number of attention heads.
    ff_dim : int
        Hidden width of the feed-forward sub-layer (GELU activation).
    rate : float
        Dropout rate applied after each sub-layer (0.0 = no dropout).
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.0, **kwargs):
        # **kwargs lets Keras pass layer-level options (name, dtype, ...)
        # through to the base class.
        super().__init__(**kwargs)
        # Keep ctor args so get_config() can serialize the layer.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = L.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [L.Dense(ff_dim, activation="gelu"), L.Dense(embed_dim),]
        )
        self.layernorm1 = L.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = L.LayerNormalization(epsilon=1e-6)
        self.dropout1 = L.Dropout(rate)
        self.dropout2 = L.Dropout(rate)

    def call(self, inputs, training=None):
        # Bug fix: `training` was a required positional argument; Keras may
        # invoke call(inputs) without it. Defaulting to None restores the
        # standard Keras contract while remaining backward-compatible.
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Make the layer serializable (model save/load, cloning)."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config
def custom_loss(y_true, x_value, x_ratio, lagx):
    """RMSE of a 50/50 blend of the two model heads.

    The prediction is half the direct value head (x_value) and half the
    ratio head applied to the lagged input (x_ratio * lagx).
    """
    blended = x_value * 0.5 + (x_ratio * lagx) * 0.5
    return K.sqrt(K.mean(K.square(blended - y_true)))
def build_model(input_shape):
    """Assemble the two-headed transformer regressor.

    input_shape is (n_series, window) for the main input; the label and
    lag inputs are (n_series, 1). The model emits a direct value head and
    a ratio head; their 50/50 blend is scored against the label input via
    add_loss, so compile() takes no loss argument.
    """
    inputs_x = tf.keras.layers.Input(shape=input_shape, name="inputs_x")
    inputs_y = tf.keras.layers.Input(shape=(input_shape[0], 1), name="inputs_y")
    inputs_lagx = tf.keras.layers.Input(shape=(input_shape[0], 1), name="inputs_lagx")

    # Shared trunk: project windows to 128-d, one transformer block, then 64-d.
    hidden = L.Dense(128, activation='selu')(inputs_x)
    hidden = TransformerBlock(128, 2, 128)(hidden)
    hidden = L.Dense(64, activation='selu')(hidden)

    # Two linear heads: an absolute value and a multiplier on the lag feature.
    x_value = L.Dense(1, activation='linear')(hidden)
    x_ratio = L.Dense(1, activation='linear')(hidden)

    model = M.Model(inputs=[inputs_x, inputs_y, inputs_lagx], outputs=[x_value, x_ratio])
    model.add_loss(custom_loss(inputs_y, x_value, x_ratio, inputs_lagx))
    model.compile(optimizer=tfa.optimizers.AdamW(learning_rate=1e-3, weight_decay=2e-5))
    return model
def get_dataset(X, y=None, batch_size=32, dataset=''):
    """Wrap arrays in a tf.data pipeline for the requested split.

    dataset='train': shuffled (X, y) pairs batched by batch_size.
    dataset='valid': (X, y) pairs with batch size 1.
    dataset='test' : X only, batch size 1.
    Any other value raises NotImplementedError.
    """
    autotune = tf.data.experimental.AUTOTUNE
    if dataset == 'train':
        pipeline = tf.data.Dataset.from_tensor_slices((X, y))
        # Buffer far larger than the dataset => a full uniform shuffle.
        pipeline = pipeline.shuffle(10**8)
        return pipeline.batch(batch_size).prefetch(autotune)
    if dataset == 'valid':
        pipeline = tf.data.Dataset.from_tensor_slices((X, y))
        return pipeline.batch(1).prefetch(autotune)
    if dataset == 'test':
        pipeline = tf.data.Dataset.from_tensor_slices(X)
        return pipeline.batch(1).prefetch(autotune)
    raise NotImplementedError
def training_nn(train_X, train_y, spilit_index=-1, batch_size=32):
    """Train the model, keeping the last |spilit_index| windows for validation.

    Trains with early stopping and LR reduction, checkpoints the best
    weights to OUTPUT_DIR/model.hdf5, then reloads them into a fresh graph
    to score and return the out-of-fold predictions.

    Parameters
    ----------
    train_X : np.ndarray, (n_samples, n_series, window)
    train_y : np.ndarray, (n_samples, n_series, 1)
    spilit_index : int
        Split point (negative index): samples before it train, from it on
        validate. (Name kept for caller compatibility despite the typo.)
    batch_size : int

    Returns
    -------
    list of np.ndarray
        [value_head, ratio_head] predictions on the validation set.
    """
    x_train, y_train = np.copy(train_X[:spilit_index]), np.copy(train_y[:spilit_index])
    x_valid, y_valid = np.copy(train_X[spilit_index:]), np.copy(train_y[spilit_index:])
    # The last column of each window (the most recent observation) doubles
    # as the lag feature for the ratio head.
    train_inputs = {"inputs_x": x_train, "inputs_y": y_train, "inputs_lagx": x_train[:, :, -1:]}
    valid_inputs = {"inputs_x": x_valid, "inputs_y": y_valid, "inputs_lagx": x_valid[:, :, -1:]}
    tr_dataset = get_dataset(X=train_inputs, y=y_train, batch_size=batch_size, dataset="train")
    va_dataset = get_dataset(X=valid_inputs, y=y_valid, batch_size=batch_size, dataset="valid")
    early_stop = EarlyStopping(
        monitor='val_loss',
        min_delta=0.0,
        patience=16,
        mode="min"
    )
    checkpoint = ModelCheckpoint(
        str(OUTPUT_DIR / 'model.hdf5'),
        # Bug fix: keyword was misspelled "moniter", so the intended
        # monitor setting was never applied.
        monitor="val_loss",
        verbose=0,
        save_best_only=True,
        save_weights_only=True,
        mode="min"
    )
    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=8,
        min_lr=1e-6,
        verbose=0
    )
    # Generalized: derive the series count from the data instead of the
    # hard-coded 3278, so other datasets / column subsets work too.
    input_shape = (x_train.shape[1], x_train.shape[2])
    model = build_model(input_shape)
    # summary() prints itself and returns None; wrapping it in display()
    # only rendered a stray "None".
    model.summary()
    model.fit(
        tr_dataset,
        validation_data=va_dataset,
        epochs=1000,  # early stopping terminates long before this
        callbacks=[early_stop, checkpoint, reduce_lr],
    )
    # Rebuild a fresh graph and load the best checkpoint before scoring.
    K.clear_session()
    model = build_model(input_shape)
    model.load_weights(str(OUTPUT_DIR / 'model.hdf5'))
    oof_pred = model.predict(va_dataset)
    # Blend the two heads exactly as the training loss does.
    score = RMSE(y_valid, oof_pred[0] * 0.5 + x_valid[:, :, -1:] * oof_pred[1] * 0.5)
    print(round(score, 5))
    return oof_pred
def inference_nn():
    """Predict one step past the end of the series and write the submission.

    Reads module-level globals: test_X (final window per series), train
    (for column ids), DATA_DIR and OUTPUT_DIR. Loads the best checkpoint
    produced by training_nn, blends the two heads 50/50, inverts the log1p
    transform, and writes OUTPUT_DIR/submission_exp003.csv.

    Returns
    -------
    list of np.ndarray
        Raw [value_head, ratio_head] model predictions.
    """
    sub = pd.read_csv(DATA_DIR / 'submission_template.csv')
    # The label input is unused at inference time; feed the lag column so
    # every graph input is bound.
    test_inputs = {"inputs_x": test_X, "inputs_y": test_X[:, :, -1:], "inputs_lagx": test_X[:, :, -1:]}
    tr_dataset = get_dataset(X=test_inputs, y=None, dataset="test")
    K.clear_session()
    # Generalized: derive the series count from the data instead of the
    # hard-coded 3278, matching training_nn.
    model = build_model((test_X.shape[1], test_X.shape[2]))
    model.load_weights(str(OUTPUT_DIR / 'model.hdf5'))
    predi = model.predict(tr_dataset)
    # expm1 inverts the log1p applied to the training data at load time.
    tmp_pred = pd.DataFrame({
        'id': list(train.columns),
        'y': np.expm1(predi[0][0, :, 0] * 0.5 + test_X[0, :, -1] * predi[1][0, :, 0] * 0.5)
    })
    sub = sub.drop(columns='y').merge(tmp_pred, on='id', how='left')
    sub.to_csv(OUTPUT_DIR / 'submission_exp003.csv', index=False)
    return predi
# --- script body: load data, preprocess, train, predict ----------------------
train = pd.read_csv(DATA_DIR / 'train_data.csv')
company = pd.read_csv(DATA_DIR / 'company_list.csv')
sub = pd.read_csv(DATA_DIR / 'submission_template.csv')
# Drop the final row (presumably the prediction target period — confirm
# against the competition data spec) and index by date.
train = train.iloc[:len(train)-1].set_index('Date')
# Log-transform values; inference_nn inverts this with expm1.
train[train.columns] = np.log1p(train.values)
# Each training sample sees the previous 100 time steps of every series.
train_X, train_y, test_X = get_trainingdata(train, window=100, min_index=100)
# spilit_index=-1: hold out only the most recent window for validation.
oof_pred = training_nn(train_X, train_y, spilit_index=-1, batch_size=32)
test_pred = inference_nn()