# chatbot/app.py
# import parsing  # uncomment to download the data from the website and parse it
from string import punctuation
from tqdm.auto import tqdm, trange
import torch
from transformers import AutoTokenizer, AutoModel
import datasets
import pandas as pd
import nltk
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import wordnet # for lemmatization
from nltk import pos_tag # for parts of speech
nltk.download('omw-1.4') #this is for the .apply() function to work
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('stopwords')
import numpy as np
import os
import re #regular expressions
import time
from sklearn.feature_extraction.text import CountVectorizer # for bag of words (bow)
from sklearn.feature_extraction.text import TfidfVectorizer #for tfidf
from sklearn.metrics import pairwise_distances # cosine similarity
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec, KeyedVectors
import gensim.downloader as api
import gradio as gr
# Take Rachel as main character
df = pd.read_csv("rachel_friends.csv") # read the database into a data frame
#-------------------------------------TF-IDF------------------------------------------#
# Define function for text normalization
def text_normalization(text):
    text = str(text).lower()  # convert to all lower-case letters
    spl_char_text = re.sub(r'[^a-z]', ' ', text)  # remove any special characters, including numbers
    tokens = nltk.word_tokenize(spl_char_text)  # tokenize words
    lema = wordnet.WordNetLemmatizer()  # initialize the lemmatizer
    tags_list = pos_tag(tokens, tagset=None)  # parts of speech
    lema_words = []
    for token, pos_token in tags_list:
        if pos_token.startswith('V'):  # if the tag from tags_list is a verb, assign 'v' to its pos_val
            pos_val = 'v'
        elif pos_token.startswith('J'):  # adjective
            pos_val = 'a'
        elif pos_token.startswith('R'):  # adverb
            pos_val = 'r'
        else:  # otherwise treat it as a noun
            pos_val = 'n'
        lema_token = lema.lemmatize(token, pos_val)  # perform lemmatization
        lema_words.append(lema_token)  # add the lemmatized word to our list
    return " ".join(lema_words)  # return our list as a human-readable sentence
# Preprocess data and insert it into the dataframe
question_normalized = df['question'].apply(text_normalization)
df.insert(2, 'Normalized question', question_normalized, True)
# Build the stop-word list (overwritten with an empty list below, so no words are actually removed)
stop = stopwords.words('english')  # include stop words
stop = []  # exclude stop-word filtering: keep every word
# Define function to delete stop words from a sentence
def removeStopWords(text):
    Q = []
    s = text.split()  # cut the sentence into an array of words
    q = ''
    for w in s:  # for every word in the given sentence, skip it if it is a stop word
        if w in stop:
            continue
        else:  # otherwise append it to our array
            Q.append(w)
    q = " ".join(Q)  # join the non-stop words back into a sentence
    return q
# Preprocess data and insert it into the dataframe
question_norm_and_stop = df['Normalized question'].apply(removeStopWords)
df.insert(3, 'Normalized and StopWords question', question_norm_and_stop, True)
tfidf = TfidfVectorizer(ngram_range=(1, 3), max_features=5024)  # initialize tf-idf
x_tfidf = tfidf.fit_transform(df['Normalized and StopWords question']).toarray()  # roughly speaking, this converts sentences to vectors
features_tfidf = tfidf.get_feature_names_out()  # get all the normalized words (feature names)
df_tfidf = pd.DataFrame(x_tfidf, columns=features_tfidf)  # dataframe with the tf-idf weight of each word per question
# bot TF-IDF algorithm without context
def chat_tfidf(question):
    tidy_question = text_normalization(removeStopWords(question))  # clean & lemmatize the question
    tf = tfidf.transform([tidy_question]).toarray()  # convert the question into a vector
    cos = 1 - pairwise_distances(df_tfidf, tf, metric='cosine')  # calculate the cosine similarity
    index_value = cos.argmax()  # find the index of the maximum cosine value
    answer = df['answer'].loc[index_value]
    return answer
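# Example usage (hypothetical query, illustration only): the reply is the stored
# answer whose question vector has the highest cosine similarity, e.g.
#   chat_tfidf("How are you doing?")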
# bot TF-IDF algorithm with context
def chat_tfidf_context(question, history):
    # keep at most the two most recent turns, in accordance with the bot's memory;
    # sizing the matrix from the trimmed history avoids diluting the mean with zero rows
    history = history[-2:]
    if len(history) > 1:
        memory_weights = np.array([0.1, 0.3, 1.0])
    else:
        memory_weights = np.array([0.3, 1.0])
    history_sentence = np.zeros(shape=(len(history) + 1, 5024))
    for ind, h in enumerate(history):
        # normalize a question from the context
        tidy_question = text_normalization(removeStopWords(h[0]))
        # vectorize it with tf-idf
        tf = tfidf.transform([tidy_question]).toarray()
        # store the weighted tf-idf vector in the history matrix
        history_sentence[ind] = tf * memory_weights[ind]
    tidy_question = text_normalization(removeStopWords(question))
    tf = tfidf.transform([tidy_question]).toarray()
    history_sentence[-1] = tf
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)
    cos = 1 - pairwise_distances(df_tfidf, history_sentence, metric='cosine')
    index_value = cos.argmax()
    answer = df['answer'].loc[index_value]
    return answer
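# How the context weighting works (a sketch of the math above): with history
# turns h0, h1 and current question q, the query vector is the mean of the
# weighted tf-idf vectors,
#   combined = (0.1 * v(h0) + 0.3 * v(h1) + 1.0 * v(q)) / 3
# so the most recent utterance dominates retrieval; with a single history turn
# the weights are 0.3 and 1.0 over two rows instead.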
#-------------------------------------W2V------------------------------------------#
punkt = list(punctuation) + ["`", "``", "''", "'"]
def tokenize(sent: str) -> str:
    tokens = nltk.word_tokenize(sent.lower())  # tokenize words
    return ' '.join([word for word in tokens if word not in stop and word not in punkt])
questions_preprocessed = []
for question in df["question"].tolist() + df["answer"].tolist():
    questions_preprocessed.append(tokenize(question))
questions_w2v = [sent.split(" ") for sent in questions_preprocessed]
w2v = KeyedVectors.load('w2v.bin')
unknown_vector = np.random.uniform(low=-0.2, high=0.2, size=(25,))
# define function to build a sentence vector with w2v
def w2v_get_vector_for_sentence(sentence):
    sent = nltk.word_tokenize(sentence.lower())
    sent = [word for word in sent if word not in punkt]
    sentence_vector = []
    if len(sent) == 0:
        sentence_vector.append(unknown_vector)
    else:
        for word in sent:
            if word in w2v.key_to_index:
                sentence_vector.append(w2v[word])
            else:
                sentence_vector.append(unknown_vector)
    return np.array(sentence_vector).mean(axis=0)
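# Sketch of the idea: a sentence embedding is the mean of its 25-dimensional
# word2vec word vectors; out-of-vocabulary words fall back to a fixed random
# vector, e.g. (hypothetical call)
#   w2v_get_vector_for_sentence("coffee at central perk").shape  # -> (25,)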
# create the base matrix of question vectors for w2v
base = np.zeros(shape=(len(df.question), 25))
for ind, sentence in enumerate(df['question']):
    base[ind] = w2v_get_vector_for_sentence(sentence)
# bot W2V algorithm without context
def chat_word2vec(question):
    question = [w2v_get_vector_for_sentence(question)]
    cos = 1 - pairwise_distances(base, question, metric='cosine')  # calculate the cosine similarity
    index_value = cos.argmax()  # find the index of the maximum cosine value
    answer = df['answer'].loc[index_value]
    return answer
# bot W2V algorithm with context
def chat_word2vec_context(question, history):
    # keep at most the two most recent turns, in accordance with the bot's memory;
    # sizing the matrix from the trimmed history avoids diluting the mean with zero rows
    history = history[-2:]
    if len(history) > 1:
        memory_weights = np.array([0.1, 0.3, 1.0])
    else:
        memory_weights = np.array([0.3, 1.0])
    history_sentence = np.zeros(shape=(len(history) + 1, 25))
    for ind, h in enumerate(history):
        sentence = w2v_get_vector_for_sentence(h[0])
        history_sentence[ind] = sentence * memory_weights[ind]
    question = w2v_get_vector_for_sentence(question)
    history_sentence[-1] = question
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)
    cos = 1 - pairwise_distances(base, history_sentence, metric='cosine')
    index_value = cos.argmax()
    answer = df['answer'].loc[index_value]
    return answer
#-------------------------------------BERT------------------------------------------#
# Use a plain DistilBERT model as the retrieval encoder
model_name = "distilbert/distilbert-base-uncased"
device = "cpu"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
class BERTSearchEngine:
    def __init__(self, model, tokenizer, text_database):
        self.raw_processed_data = [self.preprocess(sample, tokenizer) for sample in text_database]
        self.base = []
        self.retriever = None
        self.inverted_index = {}
        self._init_retriever(model, tokenizer, text_database)
        self._init_inverted_index(text_database)

    @staticmethod
    def preprocess(sentence: str, tokenizer):
        return tokenizer(sentence, padding=True, truncation=True, return_tensors='pt')

    def _embed_bert_cls(self, tokenized_text: dict[str, torch.Tensor]) -> np.array:
        with torch.no_grad():
            model_output = self.retriever(**{k: v.to(self.retriever.device) for k, v in tokenized_text.items()})
            embeddings = model_output.last_hidden_state[:, 0, :]  # CLS token
            embeddings = torch.nn.functional.normalize(embeddings)
        return embeddings[0].cpu().numpy()

    def _init_retriever(self, model, tokenizer, text_database):
        self.retriever = model
        self.tokenizer = tokenizer
        # precomputed question embeddings; recompute with:
        # np.array([self._embed_bert_cls(self.preprocess(text, tokenizer)) for text in tqdm(text_database)])
        self.base = np.load("bert_base.npy")

    def retrieve(self, query: str) -> np.array:
        return self._embed_bert_cls(self.preprocess(query, self.tokenizer))

    def retrieve_documents(self, query: str, top_k=3) -> list[int]:
        query_vector = self.retrieve(query)
        cosine_similarities = cosine_similarity([query_vector], self.base).flatten()
        relevant_indices = np.argsort(cosine_similarities, axis=0)[::-1][:top_k]
        return relevant_indices.tolist()

    def _init_inverted_index(self, text_database: list[str]):
        self.inverted_index = dict(enumerate(text_database))

    def display_relevant_docs(self, query, top_k=3) -> list[str]:
        docs_indexes = self.retrieve_documents(query, top_k=top_k)
        return [self.inverted_index[ind] for ind in docs_indexes]

    def find_answer(self, query: str) -> int:
        query_vector = self.retrieve(query)
        cosine_similarities = cosine_similarity([query_vector], self.base).flatten()
        relevant_indice = np.argmax(cosine_similarities, axis=0)
        return relevant_indice
simple_search_engine = BERTSearchEngine(model, tokenizer, df["question"])
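# Usage sketch (illustrative queries, not part of the app flow): the engine
# embeds a query via its CLS token and ranks the precomputed question
# embeddings by cosine similarity, e.g.
#   simple_search_engine.display_relevant_docs("Where were you?", top_k=3)  # 3 closest questions
#   simple_search_engine.find_answer("Where were you?")  # index of the single best match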
# bot BERT algorithm without context
def chat_bert(question):
    ind = simple_search_engine.find_answer(question)
    answer = df['answer'].iloc[ind]
    return answer
# bot BERT algorithm with context
def chat_bert_context(question, history):
    # keep at most the two most recent turns, in accordance with the bot's memory;
    # sizing the matrix from the trimmed history avoids diluting the mean with zero rows
    history = history[-2:]
    if len(history) > 1:
        memory_weights = np.array([0.1, 0.3, 1.0])
    else:
        memory_weights = np.array([0.3, 1.0])
    history_sentence = np.zeros(shape=(len(history) + 1, 768))
    for ind, h in enumerate(history):
        sentence = simple_search_engine.retrieve(h[0])  # embed the user turn of the (user, bot) pair
        history_sentence[ind] = sentence * memory_weights[ind]
    question = simple_search_engine.retrieve(question)
    history_sentence[-1] = question
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)
    cosine_similarities = cosine_similarity(history_sentence, simple_search_engine.base).flatten()
    relevant_indice = np.argmax(cosine_similarities, axis=0)
    answer = df['answer'].loc[relevant_indice]
    return answer
#-------------------------------------Bi-BERT-Encoder------------------------------------------#
MAX_LENGTH = 128
inverted_answer = dict(enumerate(df.answer.tolist()))
# Define function for mean-pooling
def mean_pool(token_embeds: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    in_mask = attention_mask.unsqueeze(-1).expand(token_embeds.size()).float()
    pool = torch.sum(token_embeds * in_mask, 1) / torch.clamp(in_mask.sum(1), min=1e-9)
    return pool
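# What mean_pool computes: a masked average over the token axis,
#   pool[b] = sum_t(token_embeds[b, t] * mask[b, t]) / max(sum_t(mask[b, t]), 1e-9)
# so padding positions contribute nothing to the sentence embedding.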
# Define function to tokenize the sentences and encode them
def encode(input_texts: list[str], tokenizer: AutoTokenizer, model: AutoModel,
           device: str = "cpu") -> torch.Tensor:
    model.eval()
    tokenized_texts = tokenizer(input_texts, max_length=MAX_LENGTH,
                                padding='max_length', truncation=True, return_tensors="pt")
    token_embeds = model(tokenized_texts["input_ids"].to(device),
                         tokenized_texts["attention_mask"].to(device)).last_hidden_state
    pooled_embeds = mean_pool(token_embeds, tokenized_texts["attention_mask"].to(device))
    return pooled_embeds
# Define architecture for bi-bert-encoder
class Sbert(torch.nn.Module):
    def __init__(self, max_length: int = MAX_LENGTH):
        super().__init__()
        self.max_length = max_length
        self.bert_model = AutoModel.from_pretrained('distilbert-base-uncased')
        self.bert_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size * 3, 1)

    def forward(self, data: datasets.arrow_dataset.Dataset) -> torch.Tensor:
        question_input_ids = data["question_input_ids"].to(device)
        question_attention_mask = data["question_attention_mask"].to(device)
        answer_input_ids = data["answer_input_ids"].to(device)
        answer_attention_mask = data["answer_attention_mask"].to(device)
        out_question = self.bert_model(question_input_ids, question_attention_mask)
        out_answer = self.bert_model(answer_input_ids, answer_attention_mask)
        question_embeds = out_question.last_hidden_state
        answer_embeds = out_answer.last_hidden_state
        pooled_question_embeds = mean_pool(question_embeds, question_attention_mask)
        pooled_answer_embeds = mean_pool(answer_embeds, answer_attention_mask)
        embeds = torch.cat([pooled_question_embeds, pooled_answer_embeds,
                            torch.abs(pooled_question_embeds - pooled_answer_embeds)],
                           dim=-1)
        return self.linear(embeds)
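# Design note: this follows the standard SBERT bi-encoder setup; question and
# answer are encoded independently, and the training head scores the
# concatenation [u, v, |u - v|]. At inference only the shared encoder is used,
# so candidate embeddings can be precomputed and compared by cosine similarity.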
# Initialize the model
model_bi_encoder = Sbert().to(device)
# Load fine-tuned weights from the training step; from_pretrained returns a new
# model rather than loading in place, so the result must be assigned
model_bi_encoder.bert_model = AutoModel.from_pretrained("models/friends_bi_encoder")
# Load precomputed question embeddings
question_embeds = np.load("bi_bert_question.npy")
def chat_bi_bert(question, history):
    question_vec = encode([question], model_bi_encoder.bert_tokenizer, model_bi_encoder.bert_model, device).squeeze().cpu().detach().numpy()
    cosine_similarities = cosine_similarity([question_vec], question_embeds).flatten()
    top_indice = np.argmax(cosine_similarities, axis=0)
    answer = inverted_answer[top_indice]
    return answer
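# Example usage (hypothetical query, illustration only): the bi-encoder embeds
# the new question and returns the answer paired with the most similar
# precomputed question embedding, e.g.
#   chat_bi_bert("Do you want some coffee?", history=[])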
#-------------------------------------Bi+Cross-BERT-Encoder------------------------------------------#
# Define class for CrossEncoderBert
class CrossEncoderBert(torch.nn.Module):
    def __init__(self, max_length: int = MAX_LENGTH):
        super().__init__()
        self.max_length = max_length
        self.bert_model = AutoModel.from_pretrained('distilbert-base-uncased')
        self.bert_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.last_hidden_state[:, 0]  # use the CLS token's output
        return self.linear(pooled_output)
model_cross_encoder = CrossEncoderBert().to(device)
# As above, from_pretrained returns a new model, so assign the result
model_cross_encoder.bert_model = AutoModel.from_pretrained("models/friends_cross_encoder")
def chat_cross_bert(question, history):
    # Stage 1: the bi-encoder retrieves the top-5 candidate answers
    question_encoded = encode([question], model_bi_encoder.bert_tokenizer, model_bi_encoder.bert_model, device).squeeze().cpu().detach().numpy()
    cosine_similarities = cosine_similarity([question_encoded], question_embeds).flatten()
    topk_indices = np.argsort(cosine_similarities, axis=0)[::-1][:5].tolist()
    corpus = [inverted_answer[ind] for ind in topk_indices]
    queries = [question] * len(corpus)
    # Stage 2: the fine-tuned cross-encoder rescores each (question, candidate) pair
    tokenized_texts = model_cross_encoder.bert_tokenizer(
        queries, corpus, max_length=MAX_LENGTH, padding=True, truncation=True, return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        ce_scores = model_cross_encoder(tokenized_texts['input_ids'], tokenized_texts['attention_mask']).squeeze(-1)
        ce_scores = torch.sigmoid(ce_scores)  # sigmoid is monotonic, so it does not change the argmax
    scores = ce_scores.cpu().numpy()
    ix = np.argmax(scores)
    return corpus[ix]
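# Design note: chat_cross_bert implements the usual retrieve-and-rerank pattern:
# the bi-encoder is cheap because candidate embeddings are precomputed, while the
# cross-encoder reads each (question, candidate) pair jointly and is more
# accurate, so it rescores only the top-5 shortlist instead of the whole corpus.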
# gradio part
def echo(message, history, model):
    if model == "TF-IDF":
        answer = chat_tfidf_context(message, history)
        return answer
    elif model == "W2V":
        answer = chat_word2vec_context(message, history)
        return answer
    elif model == "BERT":
        answer = chat_bert_context(message, history)
        return answer
    elif model == "Bi-BERT-Encoder":
        answer = chat_bi_bert(message, history)
        return answer
    elif model == "Bi+Cross-BERT-Encoder":
        answer = chat_cross_bert(message, history)
        return answer
title = "Chatbot who speaks like Rachel from Friends"
description = "Have a dialog with Rachel Green, one of the main characters of Friends"
model = gr.Dropdown(["TF-IDF", "W2V", "BERT", "Bi-BERT-Encoder", "Bi+Cross-BERT-Encoder"], label="Retrieval model", info="What model do you want to use?", value="TF-IDF")
with gr.Blocks() as demo:
    gr.ChatInterface(
        fn=echo,
        title=title,
        description=description,
        additional_inputs=[model],
        retry_btn=None,
        undo_btn=None,
        clear_btn=None,
    )
demo.launch(debug=False, share=True)