Oregin
研究論文の国際学会採択予測で、件名と概要を利用した簡単なBERTのサンプルコードです。まだ、BERTについて勉強し始めたばかりなので、精度や見栄えはイマイチですが、なんとか動くものを作れました。
ご参考までご活用ください。
※Google Colab(GPU利用)で実行可能です。
LB= 0.7387 でした。
ディレクトリ構成
!pip install transformers
# 各種ライブラリのインポート
import os
import pandas as pd
import itertools
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from transformers import BertModel, BertTokenizer
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
# 各種パスの設定
######################################################################
base_path = 'XXXXXXXXXX' # ベースとなるパスを指定してください。#######
######################################################################
os.makedirs(os.path.join(base_path,'model'), exist_ok=True) # 学習済みモデルの保存するディレクトリを作成
os.makedirs(os.path.join(base_path,'output'), exist_ok=True) # 提出用ファイルを出力するディレクトリを作成
train_data_path = os.path.join(base_path,'data/train_data.csv') # 訓練データのパスを指定
test_data_path = os.path.join(base_path,'data/test_data.csv') # テストデータのパスを指定
submit_data_path = os.path.join(base_path,'data/submission.csv') # 提出用サンプルfileのパスを指定
model_file_path = os.path.join(base_path,'model/best_model.pt') # 学習済みモデルのパスを指定
output_file_path = os.path.join(base_path,'output/001_submission.csv') # 提出用ファイルのパスを指定
# 各種データの読込
df = pd.read_csv(train_data_path) # 訓練データの読込
test_df = pd.read_csv(test_data_path) # テストデータの読込
test_df['y'] = 0 # テストデータのYの値を初期化
submit_df = pd.read_csv(submit_data_path) # 提出用散布リファイルの読込
# BERTを使用してトークナイズするためのクラスを定義します。
class BERTTokenize(nn.Module):
def __init__(self):
super(BERTTokenize, self).__init__()
self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
def forward(self, text):
return self.tokenizer.encode_plus(
text,
max_length=512,
add_special_tokens=True,
return_token_type_ids=False,
padding='max_length',
return_attention_mask=True,
return_tensors='pt',
truncation=True
)
# データセットを作成するクラスを定義します。
class PaperDataset(Dataset):
def __init__(self, dataframe, tokenizer):
self.data = dataframe
self.tokenizer = tokenizer
def __len__(self):
return len(self.data)
def __getitem__(self, index):
title = self.data.iloc[index]['title']
abstract = self.data.iloc[index]['abstract']
text = title + ' ' + abstract
label = self.data.iloc[index]['y']
inputs = self.tokenizer(text)
return {
'input_ids': inputs['input_ids'].flatten(),
'attention_mask': inputs['attention_mask'].flatten(),
'targets': torch.tensor(label, dtype=torch.float)
}
# datasetで定義されたバッチ形式の__getitem__を処理して、オリジナルのtorch.Tensorにする関数
def collate_fn(batch):
input_ids = pad_sequence([torch.tensor(item["input_ids"]) for item in batch], batch_first=True)
attention_mask = pad_sequence([torch.tensor(item["attention_mask"]) for item in batch], batch_first=True)
labels = torch.tensor([item['targets'] for item in batch])
return {"input_ids": input_ids, "attention_mask": attention_mask, 'targets': labels}
# BERTのモデルのクラスを定義します。
class BERTModel(nn.Module):
def __init__(self):
super(BERTModel, self).__init__()
self.bert = BertModel.from_pretrained('bert-base-uncased')
self.dropout = nn.Dropout(0.3)
self.fc = nn.Linear(768, 1)
self.sig = nn.Sigmoid()
def forward(self, input_ids, attention_mask):
outputs = self.bert(input_ids, attention_mask=attention_mask,token_type_ids=None)
pooled_output = outputs[1]
dropout_output = self.dropout(pooled_output)
output = self.fc(dropout_output)
return self.sig(output)
#トレーニングとバリデーションのためのデータローダーを作成します。
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
tokenizer = BERTTokenize()
train_dataset = PaperDataset(train_df, tokenizer)
val_dataset = PaperDataset(val_df, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=8, collate_fn=collate_fn)
Downloading (…)solve/main/vocab.txt: 0%| | 0.00/232k [00:00<?, ?B/s]
Downloading (…)okenizer_config.json: 0%| | 0.00/28.0 [00:00<?, ?B/s]
Downloading (…)lve/main/config.json: 0%| | 0.00/570 [00:00<?, ?B/s]
# 損失関数とオプティマイザーを定義して、モデルを作成します。
model = BERTModel()
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# 学習の関数の定義
def train(model, dataloader, optimizer, criterion):
model.train()
train_loss = 0
for batch in dataloader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
targets = batch['targets'].view(-1, 1).to(device)
optimizer.zero_grad()
outputs = model(input_ids, attention_mask)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
train_loss += loss.item()
return train_loss / len(dataloader)
# 検証の関数の定義
def validate(model, dataloader, criterion):
model.eval()
val_loss = 0
with torch.no_grad():
for batch in dataloader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
targets = batch['targets'].view(-1, 1).to(device)
outputs = model(input_ids, attention_mask)
loss = criterion(outputs, targets)
val_loss += loss.item()
return val_loss / len(dataloader)
# 学習・検証の実行
best_val_loss = float('inf')
for epoch in tqdm(range(10)):
train_loss = train(model, train_loader, optimizer, criterion)
val_loss = validate(model, val_loader, criterion)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), model_file_path) # ベストなモデルを保存する
print(f'Epoch {epoch+1} - train loss: {train_loss:.3f}, val loss: {val_loss:.3f}')
# モデルの読み込み
model.load_state_dict(torch.load(model_file_path))
# テストデータのデータローダ―の作成
tokenizer = BERTTokenize()
test_dataset = PaperDataset(test_df, tokenizer)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False,collate_fn=collate_fn)
# 予測する関数の定義
def predict(model, dataloader):
outputs = []
with torch.no_grad():
for batch in dataloader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
# モデルによる予測
output = model(input_ids, attention_mask=attention_mask)
outputs.extend(list(itertools.chain.from_iterable(output.tolist())))
return outputs
#予測の実行
outputs = predict(model, test_loader)
#予測結果を提出形式に合わせて出力
submit_df['y'] = pd.Series(outputs)
submit_df['y'].mask(submit_df['y'] > 0.5,1,inplace=True)
submit_df['y'].mask(submit_df['y'] < 1,0,inplace=True)
submit_df['y'] = submit_df['y'].astype('int')
submit_df.to_csv(output_file_path,index = False)