# Hugging Face Spaces page header captured with this file: "Spaces: Sleeping"
# -*- coding: utf-8 -*-
"""app

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1GzjDFYPEtsFsBFnhi3x3B0vWyCE-Dtpb
"""
import os
import re
import tempfile
import time
import uuid  # For generating unique filenames

# chardet must be listed in requirements.txt
import chardet
import docx  # python-docx for Word files
import fitz  # PyMuPDF for PDFs
import gradio as gr
import groq
import gtts  # Google Text-to-Speech library
import numpy as np
# Import paths updated to address LangChain deprecation warnings:
from langchain.docstore.document import Document
from langchain.schema import HumanMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_groq import ChatGroq
from pptx import Presentation  # python-pptx for PowerPoint files
from transformers import pipeline
# Speech-to-text pipeline built on the `openai/whisper-base.en` checkpoint.
# NOTE(review): nothing else in this file references `transcriber` — confirm it is
# still needed, since it is constructed at import time.
transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base.en")
# Read the Groq API key from the GROQ_API_KEY environment variable (None when unset)
# and keep it on the `groq` module so it can be handed to ChatGroq below.
groq.api_key = os.getenv("GROQ_API_KEY")
# Chat model used for both quiz generation and question answering.
chat_model = ChatGroq(model_name="deepseek-r1-distill-qwen-32b", api_key=groq.api_key)
# Embedding model (constructed with its default settings) and the Chroma vector
# store that uses it to index uploaded document chunks.
embedding_model = HuggingFaceEmbeddings()
vectorstore = Chroma(embedding_function=embedding_model)
# Module-level conversation log: rag_query_handler appends "User: ..." and
# "AI: ..." strings here and reads the most recent entries back into its prompt.
chat_memory = []
# Instruction prompt prepended to the document text when asking the chat model
# for a quiz. The literal is sent to the model verbatim, so it must not contain
# anything that is not meant as an instruction (stray table-cell residue such as
# " | |" at line ends has been removed).
quiz_prompt = """
You are an AI assistant specialized in education and assessment creation. Given an uploaded document or text, generate a quiz with a mix of multiple-choice questions (MCQs) and fill-in-the-blank questions. The quiz should be directly based on the key concepts, facts, and details from the provided material.
Remove all unnecessary formatting generated by the LLM, including <think> tags, asterisks, markdown formatting, and any bold or italic text, as well as **, ###, ##, and # tags.
Please generate 20 Questions.
For each question:
- Provide 4 answer choices (for MCQs), with only one correct answer.
- Ensure fill-in-the-blank questions focus on key terms, phrases, or concepts from the document.
- Include an answer key for all questions.
- Ensure questions vary in difficulty and encourage comprehension rather than memorization.
- Additionally, implement an instant feedback mechanism:
- When a user selects an answer, indicate whether it is correct or incorrect.
- If incorrect, provide a brief explanation from the document to guide learning.
- Ensure responses are concise and educational to enhance understanding.
Output Example:
1. Fill in the blank: The LLM Agent framework has a central decision-making unit called the _______________________.
Answer: Agent Core
Feedback: The Agent Core is the central component of the LLM Agent framework, responsible for managing goals, tool instructions, planning modules, memory integration, and agent persona.
2. What is the main limitation of LLM-based applications?
a) Limited token capacity
b) Lack of domain expertise
c) Prone to hallucination
d) All of the above
Answer: d) All of the above
Feedback: LLM-based applications have several limitations, including limited token capacity, lack of domain expertise, and being prone to hallucination, among others.
"""
def clean_response(response):
    """Strip reasoning tags and markdown decoration from a model reply.

    Removes, in order: ``<think>...</think>`` sections (including multi-line
    ones), every ``*``, ``[`` and ``]`` character, literal backslash-``n``
    sequences, markdown heading markers at the start of a line, any remaining
    backslashes, and ``---`` rules. Surrounding whitespace is trimmed.

    Args:
        response: Raw reply text. ``None`` or an empty string yields ``""``.

    Returns:
        The cleaned text.
    """
    if not response:
        return ""
    cleaned_text = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL)
    # Emphasis markers, square brackets and literal "\n" escape sequences.
    cleaned_text = re.sub(r"[*\[\]]|\\n", "", cleaned_text)
    # Heading markers: two or more hashes are dropped unconditionally; a single
    # hash only when followed by whitespace, so text such as "#tag" survives.
    cleaned_text = re.sub(r"^(?:##+\s*|#\s+)", "", cleaned_text, flags=re.MULTILINE)
    cleaned_text = cleaned_text.replace("\\", "")
    cleaned_text = cleaned_text.replace("---", "")
    return cleaned_text.strip()
def generate_quiz(content):
    """Ask the chat model for a quiz based on ``content``.

    The module-level ``quiz_prompt`` is prepended to the document text, the
    combined prompt is sent as a single human message, and the reply is passed
    through ``clean_response`` before being returned.

    Args:
        content: Document text the quiz should be based on.

    Returns:
        The cleaned quiz text produced by the model.
    """
    prompt = f"{quiz_prompt}\n\nDocument content:\n{content}"
    # `invoke` replaces calling the model object directly, which LangChain
    # has deprecated.
    response = chat_model.invoke([HumanMessage(content=prompt)])
    return clean_response(response.content)
def retrieve_documents(query, k=3):
    """Return the text of the stored chunks most similar to ``query``.

    Args:
        query: Search text handed to the vector store's similarity search.
        k: Maximum number of chunks to request. Defaults to 3, the value that
            used to be hard-coded.

    Returns:
        A list holding the ``page_content`` of each hit, in the order the
        vector store returned them.
    """
    hits = vectorstore.similarity_search(query, k=k)
    return [doc.page_content for doc in hits]
def check_vectorstore():
    """Return the text of up to three stored chunks matching the probe query "test".

    Used as a quick sanity check that the vector store holds content; it runs
    the same top-3 similarity lookup as ``retrieve_documents``.
    """
    return retrieve_documents("test")
def rag_query_handler(user_input):
    """Answer a question using retrieved document chunks and recent chat history.

    Steps: look up related chunks via ``retrieve_documents``, build a prompt
    from a fixed system instruction, the recent entries of ``chat_memory``, the
    question and the retrieved context, call the chat model, clean the reply,
    record the exchange in ``chat_memory`` and synthesise speech for the reply.

    Args:
        user_input: The question typed by the user.

    Returns:
        A 2-tuple ``(exchanges, audio_file)``. ``exchanges`` is a one-element
        list holding ``(user_input, reply)``; ``audio_file`` is whatever
        ``speech_playback`` returned. On any failure (including a blank
        question) the result is ``([("Error", message)], None)``.
    """
    try:
        if not user_input or not user_input.strip():
            return [("Error", "Please enter a question.")], None

        relevant_docs = retrieve_documents(user_input)
        context = "\n".join(relevant_docs) if relevant_docs else "No relevant documents found."

        system_prompt = "You are a helpful AI assistant. Answer questions accurately and concisely."
        # The last 10 log entries, i.e. up to five user/AI exchanges.
        conversation_history = "\n".join(chat_memory[-10:])
        prompt = f"{system_prompt}\n\nConversation History:\n{conversation_history}\n\nUser Input: {user_input}\n\nContext:\n{context}"

        # `invoke` replaces calling the model object directly, which LangChain
        # has deprecated.
        response = chat_model.invoke([HumanMessage(content=prompt)])
        cleaned_response_text = clean_response(response.content)

        chat_memory.append(f"User: {user_input}")
        chat_memory.append(f"AI: {cleaned_response_text}")
        # Only the newest 10 entries are ever read back, so discard older ones
        # in place instead of letting the log grow for the life of the process.
        del chat_memory[:-10]

        audio_file = speech_playback(cleaned_response_text)
        return [(user_input, cleaned_response_text)], audio_file
    except Exception as e:
        # UI boundary: report the failure to the user instead of raising.
        return [("Error", str(e))], None
def speech_playback(text, lang='zh-CN'):
    """Synthesise ``text`` to an MP3 file with gTTS and return the file path.

    Args:
        text: Text to speak. Empty, whitespace-only or ``None`` input produces
            no file.
        lang: gTTS language code. Defaults to ``'zh-CN'``, the value that used
            to be hard-coded.

    Returns:
        Path of the generated MP3 (a uniquely named file in the system
        temporary directory), or ``None`` when there is nothing to speak or
        synthesis fails. Failures are printed, never raised.
    """
    try:
        if not text or not text.strip():
            return None
        # A fresh UUID per call keeps successive responses from overwriting
        # each other; the temp directory keeps them out of the app's working
        # directory.
        audio_file = os.path.join(
            tempfile.gettempdir(), f"output_audio_{uuid.uuid4()}.mp3"
        )
        gtts.gTTS(text, lang=lang).save(audio_file)
        return audio_file
    except Exception as e:
        print(f"Error in speech_playback: {e}")
        return None
def detect_encoding(file_path):
    """Guess the text encoding of a file from its first 4096 bytes.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The encoding name reported by ``chardet``, or ``"utf-8"`` when the
        file cannot be read, detection fails, no encoding is reported, or the
        sample looks like plain ASCII.
    """
    try:
        with open(file_path, "rb") as f:
            raw_data = f.read(4096)
        encoding = chardet.detect(raw_data)["encoding"]
    except Exception:
        # Best effort: any read or detection problem falls back to UTF-8.
        return "utf-8"
    # Only a prefix was sampled, so an "ascii" verdict says nothing about the
    # rest of the file. UTF-8 decodes every ASCII file identically and also
    # copes with non-ASCII bytes further in.
    if not encoding or encoding.lower() == "ascii":
        return "utf-8"
    return encoding
def extract_text_from_pdf(pdf_path):
    """Extract the plain text of every page of a PDF.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        The page texts joined with newlines; ``"No extractable text found."``
        when the result is empty or whitespace; or a message starting with
        ``"Error extracting text from PDF:"`` when anything fails.
    """
    try:
        # The context manager closes the document (and its file handle) even
        # when extraction of a page raises.
        with fitz.open(pdf_path) as doc:
            text = "\n".join(page.get_text("text") for page in doc)
        return text if text.strip() else "No extractable text found."
    except Exception as e:
        return f"Error extracting text from PDF: {str(e)}"
def extract_text_from_docx(docx_path):
    """Collect the paragraph text of a Word (.docx) file.

    Returns the paragraphs joined by newlines, the placeholder
    "No extractable text found." when that text is blank, or an
    "Error extracting text from Word document: ..." message on failure.
    """
    try:
        paragraphs = docx.Document(docx_path).paragraphs
        joined = "\n".join(paragraph.text for paragraph in paragraphs)
    except Exception as e:
        return f"Error extracting text from Word document: {str(e)}"
    if joined.strip():
        return joined
    return "No extractable text found."
def extract_text_from_pptx(pptx_path):
    """Collect the text of every text-bearing shape in a PowerPoint (.pptx) file.

    Shapes are visited slide by slide; each shape exposing a ``text``
    attribute contributes its text followed by a newline. Returns that
    combined text, the placeholder "No extractable text found." when it is
    blank, or an "Error extracting text from PowerPoint: ..." message on
    failure.
    """
    try:
        deck = Presentation(pptx_path)
        pieces = [
            shape.text + "\n"
            for slide in deck.slides
            for shape in slide.shapes
            if hasattr(shape, "text")
        ]
        combined = "".join(pieces)
    except Exception as e:
        return f"Error extracting text from PowerPoint: {str(e)}"
    return combined if combined.strip() else "No extractable text found."
def process_document(file):
    """Extract text from an uploaded file, index it, and generate a quiz.

    The file type is chosen from the extension: ``.pdf``, ``.docx`` and
    ``.pptx`` use the dedicated extractors; image extensions are rejected;
    anything else is read as text using the detected encoding. The text is
    split into 500-character chunks (50 overlap), added to the module-level
    vector store, and passed to ``generate_quiz``.

    Args:
        file: The upload as delivered by the Gradio file component — either a
            path (``str`` / ``os.PathLike``) or an object exposing the path as
            ``.name``. ``None`` means nothing was uploaded.

    Returns:
        A status string: the success message with the quiz and a vector-store
        sample, or a message starting with ``"Error"`` describing the problem.
        Exceptions are reported in the string, never raised.
    """
    if file is None:
        return "Error: No file was uploaded."
    try:
        # Accept both a plain path and a tempfile-style wrapper with `.name`.
        if isinstance(file, (str, os.PathLike)):
            file_path = os.fspath(file)
        else:
            file_path = file.name
        file_extension = os.path.splitext(file_path)[-1].lower()

        if file_extension in (".png", ".jpg", ".jpeg"):
            return "Error: Images cannot be processed for text extraction."

        if file_extension == ".pdf":
            content = extract_text_from_pdf(file_path)
        elif file_extension == ".docx":
            content = extract_text_from_docx(file_path)
        elif file_extension == ".pptx":
            content = extract_text_from_pptx(file_path)
        else:
            encoding = detect_encoding(file_path)
            with open(file_path, "r", encoding=encoding, errors="replace") as f:
                content = f.read()

        # The extractors signal problems through their return string; such
        # strings must not be indexed or quizzed as if they were document text.
        if not content.strip() or content == "No extractable text found.":
            return "Error processing document: No extractable text found."
        if content.startswith("Error extracting text"):
            return f"Error processing document: {content}"

        # Split content into chunks for vector store indexing.
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        documents = [Document(page_content=chunk) for chunk in text_splitter.split_text(content)]
        vectorstore.add_documents(documents)

        vectorstore_content = check_vectorstore()
        quiz = generate_quiz(content)
        return f"Document processed successfully (File Type: {file_extension}). Quiz generated:\n{quiz}\n\nVectorstore Content:\n{vectorstore_content}"
    except Exception as e:
        # UI boundary: surface the failure as text instead of raising.
        return f"Error processing document: {str(e)}"
def chatbot_interface():
    """Build the two-tab Gradio UI and launch it.

    Tab 1 uploads a document and shows the result of ``process_document``.
    Tab 2 sends a question to ``rag_query_handler`` and shows the reply as text
    together with the generated speech audio.
    """

    def _answer_question(question):
        # rag_query_handler returns a list of (prompt, reply) pairs. Handing
        # that list straight to a Textbox would display its Python repr, so it
        # is flattened to readable text here.
        exchanges, audio_path = rag_query_handler(question)
        transcript = "\n\n".join(f"{prompt}\n{reply}" for prompt, reply in exchanges)
        return transcript, audio_path

    with gr.Blocks() as demo:
        with gr.Tab("Upload Document [Test] (Vector + RAG)"):
            with gr.Column():
                file_input = gr.File(label="Upload Document")
                submit_button = gr.Button("Submit")
                result_output = gr.Textbox(label="Processed Output", interactive=False)
        with gr.Tab("Chat with AI or Query"):
            with gr.Column():
                user_input = gr.Textbox(label="Ask a Question")
                chat_button = gr.Button("Ask")
                chat_output = gr.Textbox(
                    label="Chat Response",
                    interactive=False,
                    elem_id="chat_output",
                    lines=10,
                    max_lines=20,
                    placeholder="Your response will appear here...",
                )
                audio_output = gr.Audio(label="Output Speech")
        submit_button.click(process_document, inputs=file_input, outputs=result_output)
        chat_button.click(_answer_question, inputs=user_input, outputs=[chat_output, audio_output])
    demo.launch()
# Build and launch the UI as soon as this module is executed (the call is
# unconditional; there is no `if __name__ == "__main__"` guard).
chatbot_interface()