# vocacity / app.py — Streamlit AI voice-agent app
# (Hugging Face Space header residue: "Update app.py", commit fa18c1a verified, by Manojkumarpandi)
import os
import streamlit as st
import speech_recognition as sr
from gtts import gTTS
import google.generativeai as genai
import base64
from transformers import pipeline
import asyncio
# Ensure an asyncio event loop exists in this thread (fix for Streamlit,
# which runs app code in a worker thread that may not have a loop yet).
try:
    asyncio.get_running_loop()
except RuntimeError:
    asyncio.set_event_loop(asyncio.new_event_loop())

# Configure Generative AI. Read the key from the environment instead of
# hard-coding a secret in source (keys in code leak via version control).
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
genai.configure(api_key=GOOGLE_API_KEY)

# Shared speech recognizer used by listen_to_customer().
recognizer = sr.Recognizer()

# Emotion detection model: DistilBERT fine-tuned for emotion classification.
emotion_model = pipeline("text-classification", model="bhadresh-savani/distilbert-base-uncased-emotion")
def detect_emotion(text):
    """Classify the dominant emotion expressed in *text*.

    Returns the top label predicted by the emotion model, or an error
    string when classification fails.
    """
    try:
        prediction = emotion_model(text)
        return prediction[0]['label']
    except Exception as exc:
        return f"Error detecting emotion: {str(exc)}"
def listen_to_customer():
    """Record one utterance from the default microphone and transcribe it.

    Returns the recognized text, or None when the audio could not be
    understood or the recognition service was unreachable.
    """
    with sr.Microphone() as source:
        st.write("Listening...")
        captured = recognizer.listen(source)
        try:
            return recognizer.recognize_google(captured)
        except (sr.UnknownValueError, sr.RequestError):
            return None
def process_text(customer_input):
    """Generate an AI reply to *customer_input* using Gemini.

    Returns the model's text response, or an error string on failure.
    """
    try:
        gemini = genai.GenerativeModel('gemini-1.5-flash')
        result = gemini.generate_content(customer_input)
        return result.text
    except Exception as exc:
        return f"Error in AI response: {str(exc)}"
def text_to_speech(text, voice_option, language):
    """Synthesize *text* into an MP3 file with gTTS.

    *language* selects the TTS language (defaults to English for unknown
    values). *voice_option* switches the gTTS tld — NOTE(review): gTTS has
    no true male/female voices; the tld only changes the accent.
    Returns the saved file path, or None on failure (error shown in UI).
    """
    codes = {"English": "en", "Spanish": "es", "French": "fr", "Hindi": "hi"}
    try:
        accent = 'com' if voice_option == "Male" else 'co.uk'
        speech = gTTS(text=text, lang=codes.get(language, "en"), tld=accent)
        out_path = "response.mp3"
        speech.save(out_path)
        return out_path
    except Exception as exc:
        st.error(f"Text-to-Speech Error: {str(exc)}")
        return None
def autoplay_audio(file_path):
    """Embed the MP3 at *file_path* as a base64 autoplaying <audio> tag."""
    try:
        with open(file_path, "rb") as audio_file:
            encoded = base64.b64encode(audio_file.read()).decode()
        st.markdown(f"""
        <audio controls autoplay>
        <source src="data:audio/mp3;base64,{encoded}" type="audio/mp3">
        </audio>
        """, unsafe_allow_html=True)
    except Exception as exc:
        st.error(f"Error playing audio: {str(exc)}")
def main():
    """Streamlit entry point: render the UI, route user input (voice or
    text) through emotion detection + Gemini, and play back the reply."""
    st.title("Vocacity AI Voice Agent 🎙️")
    st.sidebar.header("Settings")
    language = st.sidebar.selectbox("Choose Language:", ["English", "Spanish", "French", "Hindi"])
    voice_option = st.sidebar.selectbox("Choose AI Voice:", ["Male", "Female"])
    clear_chat = st.sidebar.button("🗑️ Clear Chat")

    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []

    # Handle the clear action before rendering so stale history never shows.
    # st.experimental_rerun was deprecated and removed; st.rerun replaces it.
    if clear_chat:
        st.session_state.chat_history = []
        st.rerun()

    user_text_input = st.text_input("Type your query here:", "")

    if st.button("🎙️ Speak"):
        customer_input = listen_to_customer()
    else:
        typed = user_text_input.strip()
        # Streamlit reruns the whole script on any widget interaction, and the
        # text box keeps its value — without this guard the same typed query
        # is re-sent to the API and re-appended to history on every rerun.
        if typed and typed != st.session_state.get("last_typed_input"):
            st.session_state.last_typed_input = typed
            customer_input = typed
        else:
            customer_input = None

    if customer_input:
        emotion = detect_emotion(customer_input)
        ai_response = process_text(customer_input)
        st.session_state.chat_history.append((customer_input, ai_response))
        st.write(f"**AI Response:** {ai_response} (Emotion: {emotion})")
        audio_file = text_to_speech(ai_response, voice_option, language)
        if audio_file:
            autoplay_audio(audio_file)
            # Safe to delete: the audio was base64-inlined into the page.
            os.remove(audio_file)

    # Show the five most recent exchanges.
    st.write("### Chat History")
    for user_msg, ai_msg in st.session_state.chat_history[-5:]:
        st.write(f"👤 {user_msg}")
        st.write(f"🤖 {ai_msg}")
# Run the app when executed directly (e.g. `streamlit run app.py`).
if __name__ == "__main__":
    main()