본문 바로가기

Python/개발

KoGPT2로 생성형 챗봇을 만들고 서버에서 API로 활용해보자

내가 개발을 해오면서 하지 못해 아쉬웠던 것이 몇 개 있었는데 그 중 하나가 챗봇이다. 그래서 원래 프로젝트 기획에 없던 챗봇 기능을 오롯이 나의 욕망으로 넣게 되었다. 여러분도 팀플을 할 때 쾌락주의 개발자의 면모를 마구 보여준다면 없던 기획도 추가할 수 있다.(물론 구현은 본인이)

 

SKT-AI/KoGPT2: https://sktelecom.github.io/project/kogpt2/

 

KoGPT2

Korean GPT(Generative Pre-trained Transformer) 2

sktelecom.github.io

챗봇 구현에는 SKT 오픈 소스의 KoGPT2 모델을 사용했다. 이 녀석은 무려 문장을 생성한다. 챗봇 중에선 선택지와 대답이 둘 다 고정되어 있거나(예: 카카오톡 챗봇) 입력을 여러 유형으로 분류해서 유형 별 고정된 대답을 뱉는 녀석도 있는데, 이건 대답을 생성하는 일명 진짜진짜 챗봇인 것이다. 

 

일단 결과부터 보자. 서버는 FastAPI로 구현했다. 참고로 FastAPI는 주소 뒤에 /docs를 붙이면 swagger를 사용할 수 있지만 보기 쉽게 URL로 요청해보았다.

 

썩 나쁘지 않은 대답이 왔다.

 

server.py

API 호출 코드를 보면 입력을 바로 처리하는 게 아니라 쪼개놨다. 챗봇이 한 요청에 대한 응답을 동기적으로 생성하는 데 걸리는 시간 동안 다른 요청이 대기하는 것을 방지하기 위해서다.(하지만 추가적인 처리를 안 해놔서 결국 모든 요청을 꾸역꾸역 실행한다...)

from fastapi import FastAPI
import text_chatbot
import asyncio

app = FastAPI()

async def async_chat(input: str):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, text_chatbot.chat, input)
    return result
    
@app.get("/api/ai/chatbot")
async def chat_response(input: str):
    result = await async_chat(input)
    return result

 

text_chatbot.py

챗봇이 응답을 생성하는 부분이다. 여기서 학습을 통해 얻어야 하는 것은 kogpt2_model_state_dict.pt 라는 학습된 AI 모델 객체의 state_dict를 추출해 저장한 파일이다.

import torch
from transformers import PreTrainedTokenizerFast, GPT2LMHeadModel

print("호출: test.py")

Q_TKN = "<usr>"
A_TKN = "<sys>"
BOS = '</s>'
EOS = '</s>'
MASK = '<unused0>'
SENT = '<unused1>'
PAD = '<pad>'
sent = '0'

koGPT2_TOKENIZER = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2",
            bos_token=BOS, eos_token=EOS, unk_token='<unk>',
            pad_token=PAD, mask_token=MASK) 

model = GPT2LMHeadModel.from_pretrained('skt/kogpt2-base-v2')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #cuda사용가능하면 or cpu
model.load_state_dict(torch.load("kogpt2_model_state_dict.pt",map_location=device),strict = False)

def chat(input: str):
    q=input.strip()
    a=""
    while 1:
        input_ids = torch.LongTensor(koGPT2_TOKENIZER.encode(Q_TKN + q + SENT + sent + A_TKN + a)).unsqueeze(dim=0)
        pred = model(input_ids)
        pred = pred.logits
        gen = koGPT2_TOKENIZER.convert_ids_to_tokens(torch.argmax(pred, dim=-1).squeeze().numpy().tolist())[-1]
        if gen == EOS:
            break
        a += gen.replace("▁", " ")
    return a.strip()

 

 

여기서부터 학습 과정이다. 나는 GPU가 사용 가능한 주피터 노트북 환경에서 수행했다.

 

참고: https://wikidocs.net/157001

 

9. koGPT2 챗봇 만들기

언어 모델 (Language Model)이란 문장 혹은 단어에 확률을 할당하여 컴퓨터가 처리할 수 있도록 하는 모델입니다. 한발 나아가 언어 모델링 (Language Modeli…

wikidocs.net

사용 데이터셋: https://github.com/songys/Chatbot_data

 

GitHub - songys/Chatbot_data: Chatbot_data_for_Korean

Chatbot_data_for_Korean. Contribute to songys/Chatbot_data development by creating an account on GitHub.

github.com

챗봇 자체는 프로젝트 주 기능이 아니기에 학습에 데이터를 많이 사용하진 않았다. 

 

학습시작: 라이브러리 import

import numpy as np
import pandas as pd
import torch
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.core.lightning import LightningModule
from torch.utils.data import DataLoader, Dataset
from transformers.optimization import AdamW, get_cosine_schedule_with_warmup
from transformers import PreTrainedTokenizerFast, GPT2LMHeadModel
import re

 

Tokenizer용 스페셜 토큰 정의

Q_TKN = "<usr>"
A_TKN = "<sys>"
BOS = '</s>'
EOS = '</s>'
MASK = '<unused0>'
SENT = '<unused1>'
PAD = '<pad>'

 

사전 학습된 KoGPT2 모델과 Tokenizer 가져오기

koGPT2_TOKENIZER = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2",
            bos_token=BOS, eos_token=EOS, unk_token='<unk>',
            pad_token=PAD, mask_token=MASK) 
model = GPT2LMHeadModel.from_pretrained('skt/kogpt2-base-v2')

 

학습용 데이터 가져오기

import urllib.request

urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv",
    filename="ChatBotData.csv",
)
Chatbot_Data = pd.read_csv("ChatBotData.csv")
Chatbot_Data.head()

데이터는 이렇게 생겼어요.

 

데이터로 Dataset을 만드는 부분

# 챗봇 데이터를 처리하는 클래스를 만든다.
class ChatbotDataset(Dataset):
    def __init__(self, chats, max_len=40):  # 데이터셋의 전처리를 해주는 부분
        self._data = chats
        self.max_len = max_len
        self.q_token = Q_TKN
        self.a_token = A_TKN
        self.sent_token = SENT
        self.eos = EOS
        self.mask = MASK
        self.tokenizer = koGPT2_TOKENIZER

    def __len__(self):  # chatbotdata 의 길이를 리턴한다.
        return len(self._data)

    def __getitem__(self, idx):  # 로드한 챗봇 데이터를 차례차례 DataLoader로 넘겨주는 메서드
        turn = self._data.iloc[idx]
        q = turn["Q"]  # 질문을 가져온다.
        q = re.sub(r"([?.!,])", r" ", q)  # 구둣점들을 제거한다.

        a = turn["A"]  # 답변을 가져온다.
        a = re.sub(r"([?.!,])", r" ", a)  # 구둣점들을 제거한다.

        q_toked = self.tokenizer.tokenize(self.q_token + q + self.sent_token)
        q_len = len(q_toked)

        a_toked = self.tokenizer.tokenize(self.a_token + a + self.eos)
        a_len = len(a_toked)

        #질문의 길이가 최대길이보다 크면
        if q_len > self.max_len:
            a_len = self.max_len - q_len        #답변의 길이를 최대길이 - 질문길이
            if a_len <= 0:       #질문의 길이가 너무 길어 질문만으로 최대 길이를 초과 한다면
                q_toked = q_toked[-(int(self.max_len / 2)) :]   #질문길이를 최대길이의 반으로 
                q_len = len(q_toked)
                a_len = self.max_len - q_len              #답변의 길이를 최대길이 - 질문길이
            a_toked = a_toked[:a_len]
            a_len = len(a_toked)

        #질문의 길이 + 답변의 길이가 최대길이보다 크면
        if q_len + a_len > self.max_len:
            a_len = self.max_len - q_len        #답변의 길이를 최대길이 - 질문길이
            if a_len <= 0:       #질문의 길이가 너무 길어 질문만으로 최대 길이를 초과 한다면
                q_toked = q_toked[-(int(self.max_len / 2)) :]   #질문길이를 최대길이의 반으로 
                q_len = len(q_toked)
                a_len = self.max_len - q_len              #답변의 길이를 최대길이 - 질문길이
            a_toked = a_toked[:a_len]
            a_len = len(a_toked)

        # 답변 labels = [mask, mask, ...., mask, ..., <bos>,..답변.. <eos>, <pad>....]
        labels = [self.mask,] * q_len + a_toked[1:]

        # mask = 질문길이 0 + 답변길이 1 + 나머지 0
        mask = [0] * q_len + [1] * a_len + [0] * (self.max_len - q_len - a_len)
        # 답변 labels을 index 로 만든다.
        labels_ids = self.tokenizer.convert_tokens_to_ids(labels)
        # 최대길이만큼 PADDING
        while len(labels_ids) < self.max_len:
            labels_ids += [self.tokenizer.pad_token_id]

        # 질문 + 답변을 index 로 만든다.    
        token_ids = self.tokenizer.convert_tokens_to_ids(q_toked + a_toked)
        # 최대길이만큼 PADDING
        while len(token_ids) < self.max_len:
            token_ids += [self.tokenizer.pad_token_id]

        #질문+답변, 마스크, 답변
        return (token_ids, np.array(mask), labels_ids)

 

배치 데이터를 만들기 위한 함수

def collate_batch(batch):
    data = [item[0] for item in batch]
    mask = [item[1] for item in batch]
    label = [item[2] for item in batch]
    return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)

 

Dataset을 만들고 Dataloader를 선언

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
train_set = ChatbotDataset(Chatbot_Data, max_len=40)
#윈도우 환경에서 num_workers 는 무조건 0으로 지정, 리눅스에서는 2
train_dataloader = DataLoader(train_set, batch_size=32, num_workers=0, shuffle=True, collate_fn=collate_batch,)

 

추가학습을 위해 .train()을 사용

model.to(device)
model.train()

 

학습용 하이퍼 파라매터 선언

learning_rate = 3e-5
criterion = torch.nn.CrossEntropyLoss(reduction="none")
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

epoch_num = 10
Sneg = -1e18

 

학습 시작: 여기서 문제가 있었다. dup cpu and gpu error라고 내가 명명한(원래 더 김) 어떤 데이터는 GPU에 있고 다른 자원은 CPU에 있어서 생기는 에러였다. 중간에 데이터를 GPU로 옮겨주는 코드를 추가하여 해결

print ("start")
for epoch in range(epoch_num):
    print("epoch=",epoch)
    for batch_idx, samples in enumerate(train_dataloader):
        optimizer.zero_grad()
        token_ids, mask, label = samples
        
        token_ids = token_ids.to(device)  # 데이터를 모델이 있는 디바이스로 이동
        mask = mask.to(device)
        label = label.to(device)
        
        out = model(token_ids)
        out = out.logits      #Returns a new tensor with the logit of the elements of input
        mask_3d = mask.unsqueeze(dim=2).repeat_interleave(repeats=out.shape[2], dim=2)
        mask_out = torch.where(mask_3d == 1, out, Sneg * torch.ones_like(out))
        loss = criterion(mask_out.transpose(2, 1), label)
        # 평균 loss 만들기 avg_loss[0] / avg_loss[1] <- loss 정규화
        avg_loss = loss.sum() / mask.sum()
        avg_loss.backward()
        # 학습 끝
        optimizer.step()
#         print(avg_loss)
print ("end")

 

테스트 전에 CPU로 모델을 옮겨 줬다. 프로젝트에서 추가기능에 GPU 서버를 제공할 재력은 없다.

model.eval()
model = model.to("cpu")

 

테스트

with torch.no_grad():
    while 1:
        q = input("user > ").strip()
        if q == "quit":
            break
        a = ""
        while 1:
            input_ids = torch.LongTensor(koGPT2_TOKENIZER.encode(Q_TKN + q + SENT + '0' + A_TKN + a)).unsqueeze(dim=0)
            pred = model(input_ids)
            pred = pred.logits
            gen = koGPT2_TOKENIZER.convert_ids_to_tokens(torch.argmax(pred, dim=-1).squeeze().numpy().tolist())[-1]
            if gen == EOS:
                break
            a += gen.replace("▁", " ")
        print("Chatbot > {}".format(a.strip()))

그건 실로 벅찬 감격이었다.고마워요 본드. 덕분에 마음이 아주 편해졌어요.

 

챗봇을 API로 사용하기 위해 모델 객체의 state_dict를 추출하여 저장

torch.save(model.state_dict(),'kogpt2_model_state_dict.pt')  # 모델 객체의 state_dict 저장

 

이렇게 얻은 파일을 맨 위에서 text_chatbot.py파일과 동일한 위치에 넣어주면 챗봇 API 구현 끝!

 

 

프로젝트에선 이렇게 활용됐습니다.

'Python > 개발' 카테고리의 다른 글

SCE-TTS 활용기(3) - 서버 구축  (0) 2024.06.04
SCE-TTS 활용기(2) - 데이터 학습  (0) 2024.06.04
SCE-TTS 활용기(1) - 음성 데이터 생성  (0) 2024.06.04