import os | |
import streamlit as st | |
import speech_recognition as sr | |
from gtts import gTTS | |
import google.generativeai as genai | |
import base64 | |
from transformers import pipeline | |
import asyncio | |
# Ensure event loop exists (Fix for Streamlit async issue) | |
try: | |
asyncio.get_running_loop() | |
except RuntimeError: | |
asyncio.set_event_loop(asyncio.new_event_loop()) | |
# Configure Generative AI (Ensure to use a secure way to handle API keys) | |
GOOGLE_API_KEY = "------------------------------------------------" | |
genai.configure(api_key=GOOGLE_API_KEY) | |
# Initialize recognizer | |
recognizer = sr.Recognizer() | |
# Emotion Detection Model | |
emotion_model = pipeline("text-classification", model="bhadresh-savani/distilbert-base-uncased-emotion") | |
def detect_emotion(text): | |
"""Detects emotion from text.""" | |
try: | |
return emotion_model(text)[0]['label'] | |
except Exception as e: | |
return f"Error detecting emotion: {str(e)}" | |
def listen_to_customer(): | |
"""Captures voice input and converts it to text.""" | |
with sr.Microphone() as source: | |
st.write("Listening...") | |
audio = recognizer.listen(source) | |
try: | |
return recognizer.recognize_google(audio) | |
except (sr.UnknownValueError, sr.RequestError): | |
return None | |
def process_text(customer_input): | |
"""Processes customer input using Generative AI.""" | |
try: | |
model = genai.GenerativeModel('gemini-1.5-flash') | |
response = model.generate_content(customer_input) | |
return response.text | |
except Exception as e: | |
return f"Error in AI response: {str(e)}" | |
def text_to_speech(text, voice_option, language): | |
"""Converts AI response text to speech.""" | |
try: | |
lang_code = {"English": "en", "Spanish": "es", "French": "fr", "Hindi": "hi"}.get(language, "en") | |
tts = gTTS(text=text, lang=lang_code, tld='com' if voice_option == "Male" else '') | |
file_path = "response.mp3" | | | |
return file_path | |
except Exception as e: | |
st.error(f"Text-to-Speech Error: {str(e)}") | |
return None | |
def autoplay_audio(file_path): | |
"""Autoplays generated speech audio in Streamlit.""" | |
try: | |
with open(file_path, "rb") as f: | |
data = | |
b64 = base64.b64encode(data).decode() | |
st.markdown(f""" | |
<audio controls autoplay> | |
<source src="data:audio/mp3;base64,{b64}" type="audio/mp3"> | |
</audio> | |
""", unsafe_allow_html=True) | |
except Exception as e: | |
st.error(f"Error playing audio: {str(e)}") | |
def main(): | |
st.title("Vocacity AI Voice Agent ποΈ") | |
st.sidebar.header("Settings") | |
language = st.sidebar.selectbox("Choose Language:", ["English", "Spanish", "French", "Hindi"]) | |
voice_option = st.sidebar.selectbox("Choose AI Voice:", ["Male", "Female"]) | |
clear_chat = st.sidebar.button("ποΈ Clear Chat") | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
user_text_input = st.text_input("Type your query here:", "") | |
if st.button("ποΈ Speak"): | |
customer_input = listen_to_customer() | |
else: | |
customer_input = user_text_input.strip() if user_text_input else None | |
if customer_input: | |
emotion = detect_emotion(customer_input) | |
ai_response = process_text(customer_input) | |
st.session_state.chat_history.append((customer_input, ai_response)) | |
st.write(f"**AI Response:** {ai_response} (Emotion: {emotion})") | |
audio_file = text_to_speech(ai_response, voice_option, language) | |
if audio_file: | |
autoplay_audio(audio_file) | |
os.remove(audio_file) | |
st.write("### Chat History") | |
for user, ai in st.session_state.chat_history[-5:]: | |
st.write(f"π€ {user}") | |
st.write(f"π€ {ai}") | |
if clear_chat: | |
st.session_state.chat_history = [] | |
st.experimental_rerun() | |
if __name__ == "__main__": | |
main() | |