본문 바로가기
AI/Pytorch

[Pytorch] Seq2Seq Transformer 실습

by 까다로운오리 2024. 2. 4.

※ 해당 실습 이론은 https://nusnxxy.tistory.com/71 에 포스팅 되어있습니다.

 

 

위키북스 파이토치 트랜스포머를 활용한 자연어 처리와 컴퓨터비전 심층학습 서포터즈가 되어 교재 7장 실습 포스팅을 진행하게 되었습니다. 이번 포스팅에선 교재에 나와있는 실습 중, 파이토치에서 제공하는 트랜스포머 모델을 활용해 영어-독일어 번역 모델 실습을 진행하도록 하겠습니다...!

 

 

코드가 길기 때문에 위키북스 Git으로 먼저 들어가서 코드를 참조하셔도 좋습니다.

git link : https://github.com/wikibook/pytorchtrf

 

 

 

 

 

학습에 사용되는 데이터세트는 Multi30k 입니다. https://github.com/multi30k/dataset

 

GitHub - multi30k/dataset: Multi30k Dataset

Multi30k Dataset. Contribute to multi30k/dataset development by creating an account on GitHub.

github.com

 

Muti30K는 영어-독일어 Dataset으로 토치 데이터와 토치 텍스트 라이브러리로 해당 데이터 셋을 다운받을 수 있습니다.

토치 데이터 라이브러리는 대규모 데이터셋을 쉽게 불러오고 변환, 배치 하는 간단 유연한 API를 제공합니다.

portalocker 라이브러리는 Python에서 file lock을 관리하기 위한 라이브러리로 여러 프로세스 간 동시에 파일을 수정하거나 읽는 것을 방지합니다.

 

 

코드를 작성하기 전 필요한 라이브러리부터 다운받도록 합니다~^^*

pip install torchdata torchtext portalocker

 

 

Code 1 : Init

from torchtext.datasets import Multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator


def generate_tokens(text_iter, language):
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}
    for text in text_iter:
        yield token_transform[language](text[language_index[language]])


SRC_LANGUAGE = "de"
TGT_LANGUAGE = "en"
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ["<unk>", "<pad>", "<bos>", "<eos>"]

token_transform = {
    SRC_LANGUAGE: get_tokenizer("spacy", language="de_core_news_sm"),
    TGT_LANGUAGE: get_tokenizer("spacy", language="en_core_web_sm"),
}
print("Token Transform:")
print(token_transform)

vocab_transform = {}
for language in [SRC_LANGUAGE, TGT_LANGUAGE]:
    train_iter = Multi30k(split="train", language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    vocab_transform[language] = build_vocab_from_iterator(
        generate_tokens(train_iter, language),
        min_freq=1,
        specials=special_symbols,
        special_first=True,
    )

for language in [SRC_LANGUAGE, TGT_LANGUAGE]:
    vocab_transform[language].set_default_index(UNK_IDX)

print("Vocab Transform:")
print(vocab_transform)

 

1. 독일어와 영어에 대해 각각 토크나이저와 어휘 사전을 생성합니다.

2. Get_tokenizer 함수는 사용자가 지정한 토크나이저를 가져오는 함수로 spaCy 라이브러리로 사전 학습된 모델을 가져와 이 값을 token_transform 변수에 저장합니다.

3. vocab_transform 변수는 토큰을 인덱스로 변환시키는 함수를 저장합니다.

4. Multi30k 함수를 통해 튜플 형식으로 데이터를 불러와 train_iter변수에 저장합니다.

5. generate_tokens을 사용해 생성된 토큰으로 vocab를 만드는 build_vocab_from_iterator 함수와를 사용하여 언어별 어휘 사전을 생성합니다. min_freq는 단어들의 최소 빈도수, specials는 트랜스포머에활용하는 특수토큰을 지정하며, specials_first는  매개변수가 참인 경우 특수 토큰을 단어 집합의 맨 앞에 추가합니다.

6. set_default_index 는 인덱스의 기본 값을 설정하여 어휘 사전에 없는 토큰인 <unk>의 인덱스를 할당합니다.

 

 

Result 1 : Init

code1 result

Token Transform:
{'de': functools.partial(<function _spacy_tokenize at 0x7907affa8a60>, spacy=<spacy.lang.de.German object at 0x79071fc766e0>), 'en': functools.partial(<function _spacy_tokenize at 0x7907affa8a60>, spacy=<spacy.lang.en.English object at 0x79071b833c70>)}
Vocab Transform:
{'de': Vocab(), 'en': Vocab()}

 

 

+) 이 셀을 돌릴 때

AttributeError: 'NoneType' object has no attribute 'Lock' This exception is thrown by __iter__of_MemoryCellIterDataPipe(remember_elements=1000, source_datapipe=_ChildDataPipe)

에러가 발생했는데, 커널을 다시 실행하니 해결되었습니다..!

 

 

 

Code 2 : Create Seq2seq Transformer

import math
import torch
from torch import nn


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[: x.size(0)]
        return self.dropout(x)


class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, emb_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)


class Seq2SeqTransformer(nn.Module):
    def __init__(
        self,
        num_encoder_layers,
        num_decoder_layers,
        emb_size,
        max_len,
        nhead,
        src_vocab_size,
        tgt_vocab_size,
        dim_feedforward,
        dropout=0.1,
    ):
        super().__init__()
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(
            d_model=emb_size, max_len=max_len, dropout=dropout
        )
        self.transformer = nn.Transformer(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.generator = nn.Linear(emb_size, tgt_vocab_size)

    def forward(
        self,
        src,
        trg,
        src_mask,
        tgt_mask,
        src_padding_mask,
        tgt_padding_mask,
        memory_key_padding_mask,
    ):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(
            src=src_emb,
            tgt=tgt_emb,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=None,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask
        )
        return self.generator(outs)

    def encode(self, src, src_mask):
        return self.transformer.encoder(
            self.positional_encoding(self.src_tok_emb(src)), src_mask
        )

    def decode(self, tgt, memory, tgt_mask):
        return self.transformer.decoder(
            self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask
        )

 

1. PositionalEncoding : 단어 위치를 encoding하는 class

2. TokenEmbedding : 단어를 token화 시키는 class

3. Seq2SeqTransformer : src와 input을 입력 임베딩으로 변환하는 class

 

각 class 에 대한 이론은 https://nusnxxy.tistory.com/71 에 포스팅 되어있습니다.

 

그럼 위에서 만든 Seq2SeqTransformer 모델은 어떠한 구조로 되어있는 지 코드를 통해 살펴보겠습니다.

from torch import optim


BATCH_SIZE = 128
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

model = Seq2SeqTransformer(
    num_encoder_layers=3,
    num_decoder_layers=3,
    emb_size=512,
    max_len=512,
    nhead=8,
    src_vocab_size=len(vocab_transform[SRC_LANGUAGE]),
    tgt_vocab_size=len(vocab_transform[TGT_LANGUAGE]),
    dim_feedforward=512,
).to(DEVICE)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX).to(DEVICE)
optimizer = optim.Adam(model.parameters())

for main_name, main_module in model.named_children():
    print(main_name)
    for sub_name, sub_module in main_module.named_children():
        print("└", sub_name)
        for ssub_name, ssub_module in sub_module.named_children():
            print("│  └", ssub_name)
            for sssub_name, sssub_module in ssub_module.named_children():
                print("│  │  └", sssub_name)

Seq2SeqTransformer&nbsp; &nbsp;모델 구조

Seq2SeqTransformer 는 입력 임베딩(src_tok_emb, tgt_tok_emb), 위치 인코딩(positional_encoding), 트랜스포머 블록(transformer), 생성기(generator)로 구성됩니다. 트랜스포머 인코더 디코더 블록은 3개의 레이어로 구성되었음을 코드를 통해 다시한번 확인 할 수 있습니다.

 

 

Code 3 : Data preprocessing

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence


def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input
    return func

def input_transform(token_ids):
    return torch.cat(
        (torch.tensor([BOS_IDX]), torch.tensor(token_ids), torch.tensor([EOS_IDX]))
    )

def collator(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
        tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))

    src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
    tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
    return src_batch, tgt_batch


text_transform = {}
for language in [SRC_LANGUAGE, TGT_LANGUAGE]:
    text_transform[language] = sequential_transforms(
        token_transform[language], vocab_transform[language], input_transform
    )

data_iter = Multi30k(split="valid", language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
dataloader = DataLoader(data_iter, batch_size=BATCH_SIZE, collate_fn=collator)
source_tensor, target_tensor = next(iter(dataloader))

print("(source, target):")
print(next(iter(data_iter)))

print("source_batch:", source_tensor.shape)
print(source_tensor)

print("target_batch:", target_tensor.shape)
print(target_tensor)

 

1. token_transform을 이용해 문장을 토큰화 합니다.

2. vocab_transform을 적용해 각 토큰을 인덱스화 합니다.

3. input_transfor 함수로 인덱스화된 토큰에 문장의 시작 ( [BOS_IDX] ) 과 끝 ( [EOS_IDX] )을 알리는 특수 토큰을 할당합니다.

4. data_iter에 데이터 세트를 불러와 DataLoader를 통해 데이터를 배치사이즈 단위로 불러올 수 있게 합니다.

Result 3 : Data preprocessing

Code 3 Result

1. source와 target 문장

2. batch별로 묶여있는 sorce (input)

3. batch별로 묶여있는 target (output)

 

 

 

Code 4 : Create mask

def generate_square_subsequent_mask(s):
    mask = (torch.triu(torch.ones((s, s), device=DEVICE)) == 1).transpose(0, 1)
    mask = (
        mask.float()
        .masked_fill(mask == 0, float("-inf"))
        .masked_fill(mask == 1, float(0.0))
    )
    return mask


def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask


target_input = target_tensor[:-1, :]
target_out = target_tensor[1:, :]

source_mask, target_mask, source_padding_mask, target_padding_mask = create_mask(
    source_tensor, target_input
)

print("source_mask:", source_mask.shape)
print(source_mask)
print("target_mask:", target_mask.shape)
print(target_mask)
print("source_padding_mask:", source_padding_mask.shape)
print(source_padding_mask)
print("target_padding_mask:", target_padding_mask.shape)
print(target_padding_mask)

 

다음은 어텐션 마스크를 생성하는 코드입니다.

 

 

1. target_input과 target_output은 토큰 순서를 한 칸 shift하여 이전 토큰들이 주어졌을 때 다음 토큰을 예측하게 합니다.

2. 이후 마스크를 생성합니다. 

3. generae_squre_subsequent_mask는 마스크를 생성하는 함수로 입력으로 정수 s를 받아 sxs크기의 마스크를 생성합니다. 

4. tensor에서 0인 값은 -inf로, 1인 값은 0으로 채워 어텐션 연산을 수행합니다. -inf 값은 셀프 어텐션 계산 과정에서 어텐션 스코어가 0에 수렴하게끔 하며, 0으로 설정된 값은 셀프 어텐션에 참조되는 시퀀스를 가리킵니다.

 

 

Result 4 : Create mask

result4-1
result4-2

1.source mask는 셀프 어텐션 과정에서 참조되는 소스 데이터의 시퀀스 범위를 나타냅니다. False는 셀프 어텐션에 참조되는 토큰을 가리키며, True는 어텐션에서 제외되는 토큰을 가리킵니다.

2. target_mask는 [query_seq_len, key_seq_len]로 구성되며 i번째 쿼리 벡터는 i+1이상의 키 벡터에 대해 어텐션 연산을 수행할 수 없습니다.

3. source_padding_pask와 target_padding_mask는 배치 데이터에서 텍스트토큰이 존재하는지 여부를 나타내는 값입니다. False는 해당 토큰 인덱스가 존재하고, True인 경우 해당 토큰 인덱스가 패딩 토큰으로 채워져있음을 나타냅니다. 

 

 

Code 5 : Model Train

def run(model, optimizer, criterion, split):
    model.train() if split == "train" else model.eval()
    data_iter = Multi30k(split=split, language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    dataloader = DataLoader(data_iter, batch_size=BATCH_SIZE, collate_fn=collator)

    losses = 0
    for source_batch, target_batch in dataloader:
        source_batch = source_batch.to(DEVICE)
        target_batch = target_batch.to(DEVICE)

        target_input = target_batch[:-1, :]
        target_output = target_batch[1:, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
            source_batch, target_input
        )

        logits = model(
            src=source_batch,
            trg=target_input,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            src_padding_mask=src_padding_mask,
            tgt_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=src_padding_mask,
        )

        optimizer.zero_grad()
        loss = criterion(logits.reshape(-1, logits.shape[-1]), target_output.reshape(-1))
        if split == "train":
            loss.backward()
            optimizer.step()
        losses += loss.item()

    return losses / len(list(dataloader))


for epoch in range(5):
    train_loss = run(model, optimizer, criterion, "train")
    val_loss = run(model, optimizer, criterion, "valid")
    print(f"Epoch: {epoch+1}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}")

 

총 5번의 학습을 수행하여 학습 과정을 출력합니다.

 

Result 5 : Model Train

Result 5 Colab 환경 (GPU)
Result 5 개인 pc환경 (CPU)

Colab을 통해 모델 학습을 gpu로 했을 때는 Val loss가 떨어지는 것을 확인할 수 있었지만, 제 개인 PC(CPU)에서 모델 학습을 시켰을 때는 이상하게도 Val loss가 점점 증가했습니다.(CPU로 돌릴 때는 총 학습 시간이 약 4시간 정도 걸렸습니다.. gpu를 사용합시다..~) 물론 새로운 모델을 생성하고 학습을 진행할 때 마다 학습은 다르게 진행되지만, val loss가 증가했던 이유는 무엇이었을까요..? 

 

 

 

Code 6 : Test

def greedy_decode(model, source_tensor, source_mask, max_len, start_symbol):
    source_tensor = source_tensor.to(DEVICE)
    source_mask = source_mask.to(DEVICE)

    memory = model.encode(source_tensor, source_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    for i in range(max_len - 1):
        memory = memory.to(DEVICE)
        target_mask = generate_square_subsequent_mask(ys.size(0))
        target_mask = target_mask.type(torch.bool).to(DEVICE)

        out = model.decode(ys, memory, target_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat(
            [ys, torch.ones(1, 1).type_as(source_tensor.data).fill_(next_word)], dim=0
        )
        if next_word == EOS_IDX:
            break
    return ys


def translate(model, source_sentence):
    model.eval()
    source_tensor = text_transform[SRC_LANGUAGE](source_sentence).view(-1, 1)
    num_tokens = source_tensor.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    tgt_tokens = greedy_decode(
        model, source_tensor, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX
    ).flatten()
    output = vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))[1:-1]
    return " ".join(output)


output_oov = translate(model, "Eine Gruppe von Menschen steht vor einem Iglu .")
output = translate(model, "Eine Gruppe von Menschen steht vor einem Gebäude .")
print(output_oov)
print(output)

 

마지막으로 테스트 코드입니다. 모델 번역 방식은 그리드 디코딩을 통해 디코더가 생성한 확률 분포에서 가장 높은 확률은 갖는 단어를 선택하여 디코딩을 진행합니다.

 

"Eine Gruppe von Menschen steht vor einem Iglu ." 와 "Eine Gruppe von Menschen steht vor einem Gebäude ."를 학습된 Seq2Seq Transformer 모델에게 번역해달라고 부탁하면 어떻게 번역해줄까요? 

 

먼저 Google 번역기를 통해 위의 두 문장을 번역해 보았습니다.

번역 1

 

 

번역 2

두 문장 다 건물 앞에 무리의 사람들이 서있다는 것을 뜻하는 문장인 것 같습니다.

 

Result 6 : Test

Result 6 Colab ( GPU )
Result 6 개인 PC환경( CPU )

먼저 colab 환경의 결과를 보면 .. 음.. 학습이 잘 됐다고 해야 할까요? 우선 val loss가 점차 줄어든 것을 봤으니 epoch를 키웠으면 더 잘 예측했을 것으로 보여집니다. 그와 반대로 val loss가 점점 커졌던 cpu환경은 단어 조차 유추하지 못하였습니다.

 

 

 

저의 첫 Transformer 모델,

위키북스 '파이토치 트랜스포머를 활용한 자연어 처리와 컴퓨터비전 심층학습 ' 을 통해 수월히 만들 수 있었습니다.

다음은 이 포스팅에서 자세하게 다루지 못했던, 라이브러리 기능 및 함수들에 대해 포스팅 하도록 하겠습니다.

 

 

 

 

 

 

책 구매는 아래 링크에서 구매하실 수 있습니다.

 

Yes24: https://www.yes24.com/Product/Goods/122753048

교보문고 : https://product.kyobobook.co.kr/detail/S000209621433

알라딘 : http://aladin.kr/p/l4arb