|
|
|
"""Retrieval-based chatbot that answers with Rachel Green's lines from Friends.

Five interchangeable retrieval backends (TF-IDF, Word2Vec, BERT CLS
embeddings, a bi-encoder, and a bi-encoder + cross-encoder reranker) are
exposed through a Gradio chat UI.
"""

import re
from string import punctuation

import numpy as np
import pandas as pd

import nltk
from nltk import pos_tag
from nltk.corpus import stopwords
from nltk.stem import wordnet

import torch
from transformers import AutoTokenizer, AutoModel
import datasets

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import pairwise_distances
from sklearn.metrics.pairwise import cosine_similarity

from gensim.models import KeyedVectors

import gradio as gr

# NLTK resources needed for tokenization, POS tagging and lemmatization.
nltk.download('omw-1.4')
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('stopwords')
|
|
|
|
|
|
|
|
|
|
|
df = pd.read_csv("rachel_friends.csv") |
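# Assumption: the CSV holds dialog pairs from the Friends scripts, with a
# 'question' column (a line addressed to Rachel) and an 'answer' column (her
# reply); every retrieval backend below indexes 'question' and returns 'answer'.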
|
|
|
|
|
|
|
|
|
|
|
def text_normalization(text):
    """Lowercase, strip non-letters, then lemmatize each token using its POS tag."""
    text = str(text).lower()
    spl_char_text = re.sub(r'[^a-z]', ' ', text)
    tokens = nltk.word_tokenize(spl_char_text)
    lema = wordnet.WordNetLemmatizer()
    tags_list = pos_tag(tokens, tagset=None)
    lema_words = []
    for token, pos_token in tags_list:
        # Map Penn Treebank tags to the WordNet POS codes the lemmatizer expects.
        if pos_token.startswith('V'):
            pos_val = 'v'  # verb
        elif pos_token.startswith('J'):
            pos_val = 'a'  # adjective
        elif pos_token.startswith('R'):
            pos_val = 'r'  # adverb
        else:
            pos_val = 'n'  # default to noun
        lema_words.append(lema.lemmatize(token, pos_val))
    return " ".join(lema_words)
|
|
|
|
|
question_normalized = df['question'].apply(text_normalization) |
|
df.insert(2, 'Normalized question', question_normalized, True) |
|
|
|
|
|
stop = stopwords.words('english')
# NOTE: the stopword list is immediately emptied, so removeStopWords is a
# no-op in practice; presumably deliberate, since short chat queries lose too
# much signal when stopwords are stripped. Drop the line below to re-enable it.
stop = []


def removeStopWords(text):
    """Return `text` with every word found in `stop` removed."""
    return " ".join(w for w in text.split() if w not in stop)
|
|
|
|
|
question_norm_and_stop = df['Normalized question'].apply(removeStopWords)
df.insert(3, 'Normalized and StopWords question', question_norm_and_stop, True)

# Fit a word 1- to 3-gram TF-IDF vectorizer over the normalized questions;
# df_tfidf holds one 5024-dimensional row per question in the dataset.
tfidf = TfidfVectorizer(ngram_range=(1, 3), max_features=5024)
x_tfidf = tfidf.fit_transform(df['Normalized and StopWords question']).toarray()
features_tfidf = tfidf.get_feature_names_out()
df_tfidf = pd.DataFrame(x_tfidf, columns=features_tfidf)
|
|
|
|
|
def chat_tfidf(question):
    """Answer with the reply whose question is closest to `question` in TF-IDF space."""
    tidy_question = text_normalization(removeStopWords(question))
    tf = tfidf.transform([tidy_question]).toarray()
    cos = 1 - pairwise_distances(df_tfidf, tf, metric='cosine')
    index_value = cos.argmax()
    return df['answer'].loc[index_value]
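# Minimal usage sketch (the reply depends on the loaded dataset):
#   chat_tfidf("How are you doing?")  ->  Rachel's closest scripted reply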
|
|
|
|
|
def chat_tfidf_context(question, history):
    """TF-IDF retrieval over a weighted average of the current question and
    up to two previous user turns (older turns contribute less)."""
    # Keep at most the two most recent exchanges from the Gradio history.
    # (Truncating before sizing the matrix avoids all-zero rows that would
    # dilute the mean when the history is longer than two turns.)
    history = history[-2:]
    if len(history) == 2:
        memory_weights = np.array([0.1, 0.3, 1.0])
    else:
        memory_weights = np.array([0.3, 1.0])

    history_sentence = np.zeros(shape=(len(history) + 1, 5024))

    for ind, h in enumerate(history):
        # h is a (user_message, bot_reply) pair; only the user side is embedded.
        tidy_question = text_normalization(removeStopWords(h[0]))
        tf = tfidf.transform([tidy_question]).toarray()
        history_sentence[ind] = tf * memory_weights[ind]

    tidy_question = text_normalization(removeStopWords(question))
    tf = tfidf.transform([tidy_question]).toarray()
    history_sentence[-1] = tf
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)

    cos = 1 - pairwise_distances(df_tfidf, history_sentence, metric='cosine')
    index_value = cos.argmax()
    return df['answer'].loc[index_value]
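# Design note: the 0.1/0.3/1.0 weights let the current question dominate
# retrieval while earlier turns only nudge it; the Word2Vec and BERT context
# variants below reuse the same scheme.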
|
|
|
# Punctuation tokens to drop, including the quote variants NLTK's tokenizer emits.
punkt = list(punctuation) + ["`", "``", "''", "'"]


def tokenize(sent: str) -> str:
    tokens = nltk.word_tokenize(sent.lower())
    return ' '.join(word for word in tokens if word not in stop and word not in punkt)


# Answers are included too, presumably so the Word2Vec vocabulary covers both
# sides of the dialog.
questions_preprocessed = []
for question in df["question"].tolist() + df["answer"].tolist():
    questions_preprocessed.append(tokenize(question))

questions_w2v = [sent.split(" ") for sent in questions_preprocessed]
|
|
|
# Pre-trained 25-dimensional Word2Vec vectors; all out-of-vocabulary words
# share one fixed random vector (seed NumPy for reproducible answers).
w2v = KeyedVectors.load('w2v.bin')
unknown_vector = np.random.uniform(low=-0.2, high=0.2, size=(25,))
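# `questions_w2v` above is not used at inference time; it presumably fed the
# offline training of w2v.bin, along these (hypothetical) lines:
#   from gensim.models import Word2Vec
#   w2v_model = Word2Vec(sentences=questions_w2v, vector_size=25, min_count=1)
#   w2v_model.wv.save('w2v.bin')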
|
|
|
|
|
def w2v_get_vector_for_sentence(sentence):
    """Embed a sentence as the mean of its 25-dim word vectors.

    Unknown words, and sentences that are empty after cleaning, fall back to
    `unknown_vector`.
    """
    sent = nltk.word_tokenize(sentence.lower())
    sent = [word for word in sent if word not in punkt]
    sentence_vector = []
    if len(sent) == 0:
        sentence_vector.append(unknown_vector)
    else:
        for word in sent:
            if word in w2v.key_to_index:
                sentence_vector.append(w2v[word])
            else:
                sentence_vector.append(unknown_vector)
    return np.array(sentence_vector).mean(axis=0)
|
|
|
|
|
# Precompute one 25-dim embedding per question; this is the retrieval index.
base = np.zeros(shape=(len(df.question), 25))
for ind, sentence in enumerate(df['question']):
    base[ind] = w2v_get_vector_for_sentence(sentence)
|
|
|
|
|
def chat_word2vec(question):
    question = [w2v_get_vector_for_sentence(question)]
    cos = 1 - pairwise_distances(base, question, metric='cosine')
    index_value = cos.argmax()
    return df['answer'].loc[index_value]
|
|
|
|
|
def chat_word2vec_context(question, history):
    """Word2Vec retrieval over a weighted average of the current question and
    up to two previous user turns."""
    history = history[-2:]
    if len(history) == 2:
        memory_weights = np.array([0.1, 0.3, 1.0])
    else:
        memory_weights = np.array([0.3, 1.0])

    history_sentence = np.zeros(shape=(len(history) + 1, 25))

    for ind, h in enumerate(history):
        sentence = w2v_get_vector_for_sentence(h[0])
        history_sentence[ind] = sentence * memory_weights[ind]

    question = w2v_get_vector_for_sentence(question)
    history_sentence[-1] = question
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)

    cos = 1 - pairwise_distances(base, history_sentence, metric='cosine')
    index_value = cos.argmax()
    return df['answer'].loc[index_value]
|
|
|
|
|
|
|
|
|
# Base DistilBERT checkpoint; it must match the checkpoint used to precompute
# the bert_base.npy embeddings loaded below.
model_name = "distilbert/distilbert-base-uncased"
device = "cpu"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
|
|
|
class BERTSearchEngine:
    """Nearest-neighbour search over normalized [CLS] embeddings of the questions."""

    def __init__(self, model, tokenizer, text_database):
        # Tokenized copy of the corpus; not used at query time.
        self.raw_processed_data = [self.preprocess(sample, tokenizer) for sample in text_database]
        self.base = []
        self.retriever = None
        self.inverted_index = {}
        self._init_retriever(model, tokenizer, text_database)
        self._init_inverted_index(text_database)

    @staticmethod
    def preprocess(sentence: str, tokenizer):
        return tokenizer(sentence, padding=True, truncation=True, return_tensors='pt')

    def _embed_bert_cls(self, tokenized_text: dict[str, torch.Tensor]) -> np.ndarray:
        # Take the [CLS] token of the last hidden layer, L2-normalized, so
        # cosine similarity reduces to a dot product.
        with torch.no_grad():
            model_output = self.retriever(**{k: v.to(self.retriever.device) for k, v in tokenized_text.items()})
            embeddings = model_output.last_hidden_state[:, 0, :]
            embeddings = torch.nn.functional.normalize(embeddings)
        return embeddings[0].cpu().numpy()

    def _init_retriever(self, model, tokenizer, text_database):
        self.retriever = model
        self.tokenizer = tokenizer
        # Question embeddings are precomputed offline and shipped with the app.
        self.base = np.load("bert_base.npy")

    def retrieve(self, query: str) -> np.ndarray:
        return self._embed_bert_cls(self.preprocess(query, self.tokenizer))

    def retrieve_documents(self, query: str, top_k=3) -> list[int]:
        query_vector = self.retrieve(query)
        cosine_similarities = cosine_similarity([query_vector], self.base).flatten()
        relevant_indices = np.argsort(cosine_similarities, axis=0)[::-1][:top_k]
        return relevant_indices.tolist()

    def _init_inverted_index(self, text_database: list[str]):
        self.inverted_index = dict(enumerate(text_database))

    def display_relevant_docs(self, query, top_k=3) -> list[str]:
        docs_indexes = self.retrieve_documents(query, top_k=top_k)
        return [self.inverted_index[ind] for ind in docs_indexes]

    def find_answer(self, query: str) -> int:
        query_vector = self.retrieve(query)
        cosine_similarities = cosine_similarity([query_vector], self.base).flatten()
        return np.argmax(cosine_similarities, axis=0)
|
|
|
simple_search_engine = BERTSearchEngine(model, tokenizer, df["question"]) |
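# If bert_base.npy ever needs regenerating, a sketch (assuming the same
# checkpoint and the row order of rachel_friends.csv):
#   embeddings = np.stack([simple_search_engine.retrieve(q) for q in df["question"]])
#   np.save("bert_base.npy", embeddings)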
|
|
|
|
|
|
|
def chat_bert(question):
    ind = simple_search_engine.find_answer(question)
    return df['answer'].iloc[ind]
|
|
|
|
|
def chat_bert_context(question, history):
    """BERT retrieval over a weighted average of the current question and
    up to two previous user turns (768-dim DistilBERT embeddings)."""
    history = history[-2:]
    if len(history) == 2:
        memory_weights = np.array([0.1, 0.3, 1.0])
    else:
        memory_weights = np.array([0.3, 1.0])

    history_sentence = np.zeros(shape=(len(history) + 1, 768))

    for ind, h in enumerate(history):
        # Embed only the user side of the (user_message, bot_reply) pair.
        sentence = simple_search_engine.retrieve(h[0])
        history_sentence[ind] = sentence * memory_weights[ind]

    question = simple_search_engine.retrieve(question)
    history_sentence[-1] = question
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)

    cosine_similarities = cosine_similarity(history_sentence, simple_search_engine.base).flatten()
    relevant_indice = np.argmax(cosine_similarities, axis=0)
    return df['answer'].iloc[relevant_indice]
|
|
|
|
|
MAX_LENGTH = 128
# Maps row index -> answer text for the bi-encoder and cross-encoder paths.
inverted_answer = dict(enumerate(df.answer.tolist()))


def mean_pool(token_embeds: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    """Average token embeddings over the sequence, ignoring padded positions."""
    in_mask = attention_mask.unsqueeze(-1).expand(token_embeds.size()).float()
    pool = torch.sum(token_embeds * in_mask, 1) / torch.clamp(in_mask.sum(1), min=1e-9)
    return pool
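# Shapes: token_embeds is (batch, seq_len, hidden) and attention_mask is
# (batch, seq_len); the result is one (batch, hidden) vector per input text.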
|
|
|
|
|
def encode(input_texts: list[str], tokenizer: AutoTokenizer, model: AutoModel,
           device: str = "cpu") -> torch.Tensor:
    """Mean-pooled sentence embeddings for a batch of texts."""
    model.eval()
    tokenized_texts = tokenizer(input_texts, max_length=MAX_LENGTH,
                                padding='max_length', truncation=True, return_tensors="pt")
    token_embeds = model(tokenized_texts["input_ids"].to(device),
                         tokenized_texts["attention_mask"].to(device)).last_hidden_state
    pooled_embeds = mean_pool(token_embeds, tokenized_texts["attention_mask"].to(device))
    return pooled_embeds
|
|
|
|
|
class Sbert(torch.nn.Module):
    """SBERT-style bi-encoder: a shared DistilBERT encodes question and answer
    separately; a linear head scores the pair during training."""

    def __init__(self, max_length: int = 128):
        super().__init__()
        self.max_length = max_length
        self.bert_model = AutoModel.from_pretrained('distilbert-base-uncased')
        self.bert_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size * 3, 1)

    def forward(self, data: datasets.arrow_dataset.Dataset) -> torch.Tensor:
        question_input_ids = data["question_input_ids"].to(device)
        question_attention_mask = data["question_attention_mask"].to(device)
        answer_input_ids = data["answer_input_ids"].to(device)
        answer_attention_mask = data["answer_attention_mask"].to(device)

        out_question = self.bert_model(question_input_ids, question_attention_mask)
        out_answer = self.bert_model(answer_input_ids, answer_attention_mask)
        question_embeds = out_question.last_hidden_state
        answer_embeds = out_answer.last_hidden_state

        pooled_question_embeds = mean_pool(question_embeds, question_attention_mask)
        pooled_answer_embeds = mean_pool(answer_embeds, answer_attention_mask)

        # Classic SBERT feature vector: [u; v; |u - v|].
        embeds = torch.cat([pooled_question_embeds, pooled_answer_embeds,
                            torch.abs(pooled_question_embeds - pooled_answer_embeds)],
                           dim=-1)
        return self.linear(embeds)
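# At inference time only the shared encoder is used (through `encode` above);
# the linear head matters only when training the question-answer classifier.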
|
|
|
|
|
model_bi_encoder = Sbert().to(device)
# `from_pretrained` is a classmethod that returns a new model instead of
# loading weights in place, so the fine-tuned encoder must be reassigned.
model_bi_encoder.bert_model = AutoModel.from_pretrained("models/friends_bi_encoder").to(device)

# Precomputed bi-encoder embeddings of every question in the dataset.
question_embeds = np.load("bi_bert_question.npy")
|
|
|
def chat_bi_bert(question, history):
    question = encode(question, model_bi_encoder.bert_tokenizer,
                      model_bi_encoder.bert_model, device).squeeze().cpu().detach().numpy()
    cosine_similarities = cosine_similarity([question], question_embeds).flatten()
    top_indice = np.argmax(cosine_similarities, axis=0)
    return inverted_answer[top_indice]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CrossEncoderBert(torch.nn.Module):
    """Cross-encoder reranker: question and candidate answer are fed through
    DistilBERT together and scored from the [CLS] representation."""

    def __init__(self, max_length: int = MAX_LENGTH):
        super().__init__()
        self.max_length = max_length
        self.bert_model = AutoModel.from_pretrained('distilbert-base-uncased')
        self.bert_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.last_hidden_state[:, 0]  # [CLS] token
        return self.linear(pooled_output)


model_cross_encoder = CrossEncoderBert().to(device)
# Same classmethod pitfall as with the bi-encoder: reassign, don't call in place.
model_cross_encoder.bert_model = AutoModel.from_pretrained("models/friends_cross_encoder").to(device)
|
|
|
def chat_cross_bert(question, history):
    """Retrieve-then-rerank: the bi-encoder shortlists 5 candidate answers,
    then the cross-encoder picks the best of them."""
    question_encoded = encode(question, model_bi_encoder.bert_tokenizer,
                              model_bi_encoder.bert_model, device).squeeze().cpu().detach().numpy()
    cosine_similarities = cosine_similarity([question_encoded], question_embeds).flatten()
    topk_indices = np.argsort(cosine_similarities, axis=0)[::-1][:5].tolist()
    corpus = [inverted_answer[ind] for ind in topk_indices]

    # Score every (question, candidate) pair jointly with the cross-encoder.
    queries = [question] * len(corpus)
    tokenized_texts = model_cross_encoder.bert_tokenizer(
        queries, corpus, max_length=MAX_LENGTH, padding=True, truncation=True, return_tensors="pt"
    ).to(device)

    with torch.no_grad():
        ce_scores = model_cross_encoder(tokenized_texts['input_ids'], tokenized_texts['attention_mask']).squeeze(-1)
        ce_scores = torch.sigmoid(ce_scores)

    scores = ce_scores.cpu().numpy()
    return corpus[np.argmax(scores)]
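# Design note: the bi-encoder keeps candidate search cheap (one cosine
# similarity per stored question), while the cross-encoder, too expensive to
# run over the whole corpus, reranks just the five shortlisted answers.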
|
|
|
|
|
def echo(message, history, model):
    """Dispatch the user message to the retrieval backend chosen in the UI."""
    if model == "TF-IDF":
        return chat_tfidf_context(message, history)
    elif model == "W2V":
        return chat_word2vec_context(message, history)
    elif model == "BERT":
        return chat_bert_context(message, history)
    elif model == "Bi-BERT-Encoder":
        return chat_bi_bert(message, history)
    elif model == "Bi+Cross-BERT-Encoder":
        return chat_cross_bert(message, history)
|
|
|
|
|
|
|
|
|
title = "Chatbot that speaks like Rachel from Friends"
description = "Have a conversation with Rachel Green from Friends, powered by the retrieval model of your choice."
|
|
|
|
|
model = gr.Dropdown(
    ["TF-IDF", "W2V", "BERT", "Bi-BERT-Encoder", "Bi+Cross-BERT-Encoder"],
    label="Retrieval model",
    info="Which model do you want to use?",
    value="TF-IDF",
)

with gr.Blocks() as demo:
    gr.ChatInterface(
        fn=echo,
        title=title,
        description=description,
        additional_inputs=[model],
        retry_btn=None,
        undo_btn=None,
        clear_btn=None,
    )

demo.launch(debug=False, share=True)
|
|