|
import torch |
|
import torchaudio |
|
import gradio as gr |
|
import time |
|
import numpy as np |
|
import scipy.io.wavfile |
|
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline |
|
|
|
|
|
device = "cpu" |
|
torch_dtype = torch.float32 |
|
MODEL_NAME = "openai/whisper-small" |
|
|
|
|
|
model = AutoModelForSpeechSeq2Seq.from_pretrained( |
|
MODEL_NAME, torch_dtype=torch_dtype, use_safetensors=True |
|
) |
|
model.to(device) |
|
|
|
|
|
model = torch.compile(model) |
|
|
|
|
|
processor = AutoProcessor.from_pretrained(MODEL_NAME) |
|
processor.feature_extractor.sampling_rate = 16000 |
|
|
|
pipe = pipeline( |
|
task="automatic-speech-recognition", |
|
model=model, |
|
tokenizer=processor.tokenizer, |
|
feature_extractor=processor.feature_extractor, |
|
chunk_length_s=5, |
|
torch_dtype=torch_dtype, |
|
device=device, |
|
generate_kwargs={"num_beams": 5, "language": "en"}, |
|
) |
|
|
|
|
|
def stream_transcribe(stream, new_chunk): |
|
start_time = time.time() |
|
try: |
|
sr, y = new_chunk |
|
|
|
|
|
if y.ndim > 1: |
|
y = y.mean(axis=1) |
|
|
|
y = y.astype(np.float32) |
|
y /= np.max(np.abs(y)) |
|
|
|
|
|
y_tensor = torch.tensor(y) |
|
y_resampled = torchaudio.functional.resample(y_tensor, orig_freq=sr, new_freq=16000).numpy() |
|
|
|
|
|
if stream is not None: |
|
stream = np.concatenate([stream, y_resampled]) |
|
else: |
|
stream = y_resampled |
|
|
|
|
|
transcription = pipe({"sampling_rate": 16000, "raw": stream})["text"] |
|
latency = time.time() - start_time |
|
|
|
return stream, transcription, f"{latency:.2f} sec" |
|
|
|
except Exception as e: |
|
print(f"Error: {e}") |
|
return stream, str(e), "Error" |
|
|
|
|
|
def transcribe(inputs, previous_transcription): |
|
start_time = time.time() |
|
try: |
|
|
|
sample_rate, audio_data = inputs |
|
|
|
|
|
audio_tensor = torch.tensor(audio_data) |
|
resampled_audio = torchaudio.functional.resample(audio_tensor, orig_freq=sample_rate, new_freq=16000).numpy() |
|
|
|
transcription = pipe({"sampling_rate": 16000, "raw": resampled_audio})["text"] |
|
|
|
previous_transcription += transcription |
|
latency = time.time() - start_time |
|
|
|
return previous_transcription, f"{latency:.2f} sec" |
|
|
|
except Exception as e: |
|
print(f"Error: {e}") |
|
return previous_transcription, "Error" |
|
|
|
|
|
def clear(): |
|
return "" |
|
|
|
|
|
with gr.Blocks() as microphone: |
|
gr.Markdown(f"# Whisper Small - Real-Time Transcription (Optimized CPU) ποΈ") |
|
gr.Markdown(f"Using [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) for ultra-fast speech-to-text with better accuracy.") |
|
|
|
with gr.Row(): |
|
input_audio_microphone = gr.Audio(sources=["microphone"], type="numpy", streaming=True) |
|
output = gr.Textbox(label="Live Transcription", value="") |
|
latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0") |
|
|
|
with gr.Row(): |
|
clear_button = gr.Button("Clear Output") |
|
|
|
state = gr.State() |
|
input_audio_microphone.stream( |
|
stream_transcribe, [state, input_audio_microphone], |
|
[state, output, latency_textbox], time_limit=30, stream_every=1 |
|
) |
|
clear_button.click(clear, outputs=[output]) |
|
|
|
|
|
with gr.Blocks() as file: |
|
gr.Markdown(f"# Upload Audio File for Transcription π΅") |
|
gr.Markdown(f"Using [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) for better transcription accuracy.") |
|
|
|
with gr.Row(): |
|
input_audio = gr.Audio(sources=["upload"], type="numpy") |
|
output = gr.Textbox(label="Transcription", value="") |
|
latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0") |
|
|
|
with gr.Row(): |
|
submit_button = gr.Button("Submit") |
|
clear_button = gr.Button("Clear Output") |
|
|
|
submit_button.click(transcribe, [input_audio, output], [output, latency_textbox]) |
|
clear_button.click(clear, outputs=[output]) |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Ocean()) as demo: |
|
gr.TabbedInterface([microphone, file], ["Microphone", "Upload Audio"]) |
|
|
|
|
|
if __name__ == "__main__": |
|
demo.launch() |
|
|