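# Assumed Space dependencies (a requirements.txt along these lines; not shown here):
#   gradio, nest-asyncio, llama-parse,
#   llama-index, llama-index-llms-mistralai, llama-index-embeddings-mistralai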
import os

# Load secrets from the Hugging Face Spaces environment
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY")

# Verify that both keys are available
if MISTRAL_API_KEY is None or LLAMA_CLOUD_API_KEY is None:
    print("ERROR: Missing API keys. Please set them in Hugging Face Secrets.")

# Allow nested event loops (needed in notebooks and some Spaces runtimes)
import nest_asyncio
nest_asyncio.apply()
# Model setup: Mistral embeddings and the Mistral LLM
from llama_index.embeddings.mistralai import MistralAIEmbedding
from llama_index.core import Settings

Settings.embed_model = MistralAIEmbedding(model_name="mistral-embed", api_key=MISTRAL_API_KEY)

from llama_index.core import VectorStoreIndex
from llama_parse import LlamaParse
from llama_index.llms.mistralai import MistralAI

llm = MistralAI(model="mistral-large-latest", api_key=MISTRAL_API_KEY)
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)


class QueryEvent(Event):
    query: str


from llama_index.core import StorageContext, load_index_from_storage
import hashlib

class RAGWorkflow(Workflow):
    storage_dir = "./storage"
    hash_file = "./last_resume_hash.txt"
    llm: MistralAI
    query_engine = None  # query engine built in set_up()

    def compute_file_hash(self, file_path):
        """Compute the SHA-256 hash of a file from its path."""
        hasher = hashlib.sha256()
        with open(file_path, "rb") as f:  # Read the file in binary mode
            while chunk := f.read(8192):
                hasher.update(chunk)
        return hasher.hexdigest()

    def get_last_stored_hash(self):
        """Retrieve the last stored resume hash, if available."""
        if os.path.exists(self.hash_file):
            with open(self.hash_file, "r") as f:
                return f.read().strip()
        return None

    def update_stored_hash(self, new_hash):
        """Update the stored resume hash after processing a new file."""
        with open(self.hash_file, "w") as f:
            f.write(new_hash)

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        if not ev.resume_file:
            raise ValueError("No resume file provided")

        # Extract the file path from whatever Gradio passed in
        if isinstance(ev.resume_file, str) and os.path.exists(ev.resume_file):
            file_path = ev.resume_file
        elif hasattr(ev.resume_file, "name"):  # e.g. a Gradio NamedString or tempfile wrapper
            file_path = ev.resume_file.name
        else:
            raise ValueError("Invalid file format received!")

        print(f"Resume file path: {file_path}")

        self.llm = MistralAI(model="mistral-large-latest", api_key=MISTRAL_API_KEY)

        # Hash the uploaded resume so an unchanged file reuses the persisted index
        new_resume_hash = self.compute_file_hash(file_path)
        last_stored_hash = self.get_last_stored_hash()

        if os.path.exists(self.storage_dir) and last_stored_hash == new_resume_hash:
            # Resume hasn't changed; load the existing index
            storage_context = StorageContext.from_defaults(persist_dir=self.storage_dir)
            index = load_index_from_storage(storage_context)
        else:
            # Resume is new; parse it and rebuild the index
            documents = LlamaParse(
                api_key=LLAMA_CLOUD_API_KEY,
                result_type="markdown",
                content_guideline_instruction="Extract structured bullet points from the resume.",
            ).load_data(file_path, extra_info={"file_name": os.path.basename(file_path)})
            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=Settings.embed_model,  # Mistral embeddings configured above
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

            # Remember the hash of the resume we just indexed
            self.update_stored_hash(new_resume_hash)

        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)
        return QueryEvent(query=ev.query)

    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        response = self.query_engine.query(f"This is a question about the resume: {ev.query}")
        return StopEvent(result=response.response)
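
# Hedged sketch (assumption, not part of the app flow): the same workflow can be driven
# directly without the Gradio UI below; run_workflow_once and its arguments are illustrative.
async def run_workflow_once(resume_path: str, question: str) -> str:
    wf = RAGWorkflow(timeout=120, verbose=False)
    return await wf.run(resume_file=resume_path, query=question)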

import gradio as gr

# Create the workflow once and reuse it for every request
w = RAGWorkflow(timeout=120, verbose=False)

async def process_resume(file, query):
    """Handle a Gradio file upload and query processing (async)."""
    if file is None:
        return "Please upload a resume."
    if not query:
        return "Please enter a question."

    try:
        # Gradio passes either a file path string or an object with a .name attribute
        file_path = file if isinstance(file, str) else file.name

        # Debugging information
        print(f"File uploaded: {file_path}")
        print(f"File size: {os.path.getsize(file_path)} bytes")

        # Run the RAG workflow with the file path (not a BytesIO object)
        result = await w.run(resume_file=file_path, query=query)

        print("Result:", result)  # Debug output
        return result if result else "No relevant information found."
    except Exception as e:
        print("Error:", str(e))
        return f"Error occurred: {str(e)}"

# Clear all inputs and the output
def clear_inputs():
    return None, "", ""


# Create the Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("# RAGResume")
    gr.Markdown("**Upload a resume and ask questions about it!**")
    gr.Markdown(
        """
1. Upload a resume in PDF format.
2. Enter a question about the resume (example: where does the applicant currently work?).
3. Click on the "Submit" button to get the response.
4. Click on the "Clear" button to reset the inputs.
"""
    )

    with gr.Row():
        file_input = gr.File(label="Upload Resume (PDF)")
        query_input = gr.Textbox(label="Enter your question")

    output = gr.Textbox(label="Response")

    with gr.Row():
        submit_btn = gr.Button("Submit")
        clear_btn = gr.Button("Clear")

    submit_btn.click(process_resume, inputs=[file_input, query_input], outputs=output)
    clear_btn.click(clear_inputs, outputs=[file_input, query_input, output])

# Launch Gradio; queue() enables async event handlers such as process_resume
def run_demo():
    demo.queue()
    demo.launch(share=True)  # share=True exposes a public link when run outside Spaces


if __name__ == "__main__":
    run_demo()