import torch
import torchaudio
import gradio as gr
import time
import numpy as np
from omegaconf import OmegaConf  # ✅ Fix: omegaconf is required by the Silero hub code; importing it here fails fast if it is missing

# ✅ 1️⃣ Configure Silero STT for CPU
device = torch.device("cpu")
torch_dtype = torch.float32

# ✅ 2️⃣ Load Silero Model & Decoder with `trust_repo=True`
torch.set_num_threads(4)
model, decoder, utils = torch.hub.load(
    repo_or_dir="snakers4/silero-models",
    model="silero_stt",
    language="en",
    device=device,
    trust_repo=True,  # ✅ Fix: avoids the untrusted-repo prompt
)
(read_batch, split_into_batches, read_audio, prepare_model_input) = utils


# ✅ 3️⃣ Real-Time Streaming Transcription (Microphone)
def stream_transcribe(stream, new_chunk):
    start_time = time.time()
    try:
        sr, y = new_chunk

        # ✅ Convert stereo to mono and normalize to float32 in [-1, 1]
        if y.ndim > 1:
            y = y.mean(axis=1)
        y = y.astype(np.float32)
        peak = np.max(np.abs(y))
        if peak > 0:
            y /= peak

        # ✅ Resample audio to 16 kHz using torchaudio
        y_tensor = torch.tensor(y)
        y_resampled = torchaudio.functional.resample(y_tensor, orig_freq=sr, new_freq=16000).numpy()

        # ✅ Append the new chunk to the running stream
        if stream is not None:
            stream = np.concatenate([stream, y_resampled])
        else:
            stream = y_resampled

        # ✅ Prepare model input (batch of one waveform)
        input_tensor = torch.from_numpy(stream).unsqueeze(0)
        input_tensor = prepare_model_input(input_tensor, device=device)

        # ✅ Run transcription on the full accumulated stream
        transcription = model(input_tensor)
        text = decoder(transcription[0].cpu())

        latency = time.time() - start_time
        return stream, text, f"{latency:.2f} sec"
    except Exception as e:
        print(f"Error: {e}")
        return stream, str(e), "Error"


# ✅ 4️⃣ Transcription for File Upload
def transcribe(inputs, previous_transcription):
    start_time = time.time()
    try:
        sample_rate, audio_data = inputs

        # ✅ Convert file input to mono float32 in [-1, 1] (uploads usually arrive as int16)
        if audio_data.ndim > 1:
            audio_data = audio_data.mean(axis=1)
        audio_data = audio_data.astype(np.float32)
        peak = np.max(np.abs(audio_data))
        if peak > 0:
            audio_data /= peak

        # ✅ Resample to 16 kHz using torchaudio
        audio_tensor = torch.tensor(audio_data)
        resampled_audio = torchaudio.functional.resample(audio_tensor, orig_freq=sample_rate, new_freq=16000).numpy()

        # ✅ Prepare model input
        input_tensor = torch.from_numpy(resampled_audio).unsqueeze(0)
        input_tensor = prepare_model_input(input_tensor, device=device)

        # ✅ Run transcription
        transcription = model(input_tensor)
        text = decoder(transcription[0].cpu())

        previous_transcription += text
        latency = time.time() - start_time
        return previous_transcription, f"{latency:.2f} sec"
    except Exception as e:
        print(f"Error: {e}")
        return previous_transcription, "Error"


# ✅ 5️⃣ Clear Function
def clear():
    return ""


# ✅ 6️⃣ Gradio Interface (Microphone Streaming)
with gr.Blocks() as microphone:
    gr.Markdown("# Silero STT - Real-Time Transcription (Optimized CPU) 🎙️")
    gr.Markdown("Using `Silero STT` for lightweight, accurate speech-to-text.")
    with gr.Row():
        input_audio_microphone = gr.Audio(sources=["microphone"], type="numpy", streaming=True)
        output = gr.Textbox(label="Live Transcription", value="")
        latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0")
    with gr.Row():
        clear_button = gr.Button("Clear Output")
    state = gr.State()
    input_audio_microphone.stream(
        stream_transcribe,
        [state, input_audio_microphone],
        [state, output, latency_textbox],
        time_limit=30,
        stream_every=1,
    )
    clear_button.click(clear, outputs=[output])

# ✅ 7️⃣ Gradio Interface (File Upload)
with gr.Blocks() as file:
    gr.Markdown("# Upload Audio File for Transcription 🎵")
    gr.Markdown("Using `Silero STT` for offline, high-accuracy transcription.")
    with gr.Row():
        input_audio = gr.Audio(sources=["upload"], type="numpy")
        output = gr.Textbox(label="Transcription", value="")
        latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0")
    with gr.Row():
        submit_button = gr.Button("Submit")
        clear_button = gr.Button("Clear Output")
    submit_button.click(transcribe, [input_audio, output], [output, latency_textbox])
    clear_button.click(clear, outputs=[output])

# ✅ 8️⃣ Final Gradio App
with gr.Blocks(theme=gr.themes.Ocean()) as demo:
    gr.TabbedInterface([microphone, file], ["Microphone", "Upload Audio"])

# ✅ 9️⃣ Run Gradio Locally
if __name__ == "__main__":
    demo.launch()