|
import torch |
|
import torchaudio |
|
import gradio as gr |
|
import time |
|
import numpy as np |
|
import scipy.io.wavfile |
|
from omegaconf import OmegaConf |
|
|
|
|
|
# Run everything on the CPU in single precision.
device = torch.device("cpu")
torch_dtype = torch.float32

# Cap PyTorch's intra-op parallelism at four threads.
torch.set_num_threads(4)

# Load the English Silero STT model together with its decoder and the
# bundle of helper utilities through torch.hub.
model, decoder, utils = torch.hub.load(
    repo_or_dir="snakers4/silero-models",
    model="silero_stt",
    language="en",
    device=device,
    trust_repo=True,
)
read_batch, split_into_batches, read_audio, prepare_model_input = utils
|
|
|
|
|
def stream_transcribe(stream, new_chunk):
    """Transcribe the audio accumulated so far in a streaming session.

    The incoming chunk is down-mixed to mono, peak-normalized, resampled to
    16 kHz and appended to ``stream``; the whole accumulated buffer is then
    run through the model again.

    Args:
        stream: NumPy array holding the 16 kHz audio gathered by earlier
            calls, or ``None`` on the first call.
        new_chunk: ``(sample_rate, samples)`` pair for the newest chunk;
            2-D ``samples`` are averaged over axis 1 (assumed to be the
            channel axis).

    Returns:
        ``(stream, text, latency)``: the updated buffer, the transcription of
        the whole buffer, and the processing time formatted as
        ``"<seconds> sec"``. On failure, the buffer as it stood when the
        error occurred, the error message, and the literal ``"Error"``.
    """
    start_time = time.time()
    try:
        sr, y = new_chunk

        # Down-mix multi-channel audio by averaging across axis 1.
        if y.ndim > 1:
            y = y.mean(axis=1)

        y = y.astype(np.float32)
        # Peak-normalize, but leave silent or empty chunks untouched: dividing
        # by a zero peak would fill the chunk with NaNs, and since the chunk
        # is appended to the accumulated buffer those NaNs would corrupt
        # every later transcription of that buffer.
        peak = float(np.max(np.abs(y))) if y.size else 0.0
        if peak > 0.0:
            y = y / peak

        y_resampled = torchaudio.functional.resample(
            torch.from_numpy(y), orig_freq=sr, new_freq=16000
        ).numpy()

        if stream is None:
            stream = y_resampled
        else:
            stream = np.concatenate([stream, y_resampled])

        input_tensor = torch.from_numpy(stream).unsqueeze(0)
        input_tensor = prepare_model_input(input_tensor, device=device)

        # Inference only, so skip autograd bookkeeping.
        with torch.no_grad():
            transcription = model(input_tensor)
        text = decoder(transcription[0].cpu())

        latency = time.time() - start_time
        return stream, text, f"{latency:.2f} sec"
    except Exception as e:
        # UI callback boundary: report the failure in the interface instead
        # of letting the exception escape.
        print(f"Error: {e}")
        return stream, str(e), "Error"
|
|
|
|
|
def transcribe(inputs, previous_transcription):
    """Transcribe an audio clip and append the text to the running transcript.

    The clip is down-mixed to mono, converted to float32, peak-normalized and
    resampled to 16 kHz before being passed to the model, matching the
    preprocessing done by ``stream_transcribe``.

    Args:
        inputs: ``(sample_rate, samples)`` pair; 2-D ``samples`` are averaged
            over axis 1 (assumed to be the channel axis).
        previous_transcription: text produced so far; the new text is
            appended to it.

    Returns:
        ``(transcription, latency)``: the extended transcript and the
        processing time formatted as ``"<seconds> sec"``. On failure, the
        unchanged ``previous_transcription`` and the literal ``"Error"``.
    """
    start_time = time.time()
    try:
        sample_rate, audio_data = inputs

        # Resampling works along the last axis, so a multi-channel clip must
        # be collapsed to a single channel first.
        if audio_data.ndim > 1:
            audio_data = audio_data.mean(axis=1)

        # Integer PCM must become a floating-point waveform before it is
        # resampled and fed to the model.
        audio_data = audio_data.astype(np.float32)
        # Peak-normalize; skip silent or empty clips to avoid dividing by 0.
        peak = float(np.max(np.abs(audio_data))) if audio_data.size else 0.0
        if peak > 0.0:
            audio_data = audio_data / peak

        resampled_audio = torchaudio.functional.resample(
            torch.from_numpy(audio_data), orig_freq=sample_rate, new_freq=16000
        )

        input_tensor = prepare_model_input(resampled_audio.unsqueeze(0), device=device)

        # Inference only, so skip autograd bookkeeping.
        with torch.no_grad():
            transcription = model(input_tensor)
        text = decoder(transcription[0].cpu())

        previous_transcription += text
        latency = time.time() - start_time
        return previous_transcription, f"{latency:.2f} sec"
    except Exception as e:
        # UI callback boundary: keep the existing transcript and flag the
        # failure in the latency field.
        print(f"Error: {e}")
        return previous_transcription, "Error"
|
|
|
|
|
def clear() -> str:
    """Return an empty string, used to blank a transcription textbox."""
    return ""
|
|
|
|
|
# Live microphone page: streams audio chunks into stream_transcribe.
with gr.Blocks() as microphone:
    # NOTE(review): the trailing emoji was mis-encoded in the source and has
    # been restored as the studio-microphone emoji; confirm against the
    # original file.
    gr.Markdown("# Silero STT - Real-Time Transcription (Optimized CPU) 🎙️")
    gr.Markdown("Using `Silero STT` for lightweight, accurate speech-to-text.")

    with gr.Row():
        input_audio_microphone = gr.Audio(sources=["microphone"], type="numpy", streaming=True)
        output = gr.Textbox(label="Live Transcription", value="")
        latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0")

    with gr.Row():
        clear_button = gr.Button("Clear Output")

    # Holds the audio buffer that stream_transcribe returns between calls.
    state = gr.State()
    input_audio_microphone.stream(
        stream_transcribe,
        [state, input_audio_microphone],
        [state, output, latency_textbox],
        time_limit=30,
        stream_every=1,
    )
    # Reset the accumulated audio together with the textbox; otherwise the
    # next chunk re-transcribes the whole old buffer and the cleared text
    # reappears.
    clear_button.click(lambda: (None, ""), outputs=[state, output])
|
|
|
|
|
# File upload page: transcribes a whole uploaded clip on demand.
with gr.Blocks() as file:
    # NOTE(review): the trailing emoji was mis-encoded in the source and has
    # been restored as the musical-note emoji; confirm against the original
    # file.
    gr.Markdown("# Upload Audio File for Transcription 🎵")
    gr.Markdown("Using `Silero STT` for offline, high-accuracy transcription.")

    with gr.Row():
        input_audio = gr.Audio(sources=["upload"], type="numpy")
        output = gr.Textbox(label="Transcription", value="")
        latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0")

    with gr.Row():
        submit_button = gr.Button("Submit")
        clear_button = gr.Button("Clear Output")

    # The current textbox content is passed in so that each submission
    # appends to the text already shown.
    submit_button.click(
        transcribe,
        inputs=[input_audio, output],
        outputs=[output, latency_textbox],
    )
    clear_button.click(clear, outputs=[output])
|
|
|
|
|
# Top-level app: the two pages shown as tabs under the Ocean theme.
with gr.Blocks(theme=gr.themes.Ocean()) as demo:
    gr.TabbedInterface(
        interface_list=[microphone, file],
        tab_names=["Microphone", "Upload Audio"],
    )


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|