from transformers import PretrainedConfig, PreTrainedModel, AutoModel, AutoConfig |
import torch |
import os |
import numpy as np |
import json |
import onnxruntime as ort |
from huggingface_hub import snapshot_download |
class IndicASRConfig(PretrainedConfig):
    """Configuration for the Indic ASR model.

    Every constructor argument is stored as an attribute of the same name so
    that the model can read it from ``self.config`` and so that it is kept
    with the rest of the configuration.

    Args:
        ts_folder: Directory that contains the ``assets`` sub-folder the model
            loads its components, vocabulary and language masks from.
        BLANK_ID: Token id treated as the blank symbol during decoding.
        RNNT_MAX_SYMBOLS: Maximum number of symbols emitted per encoder frame
            in RNNT decoding; ``None`` disables the limit.
        PRED_RNN_LAYERS: First dimension of the zero-initialised RNNT
            decoder state.
        PRED_RNN_HIDDEN_DIM: Last dimension of the zero-initialised RNNT
            decoder state.
        SOS: Token id used to start an RNNT hypothesis.
        **kwargs: Forwarded to ``PretrainedConfig``.
    """

    model_type = "iasr"

    def __init__(self, ts_folder: str = "path", BLANK_ID: int = 256, RNNT_MAX_SYMBOLS: int = 10,
                 PRED_RNN_LAYERS: int = 2, PRED_RNN_HIDDEN_DIM: int = 640, SOS: int = 256, **kwargs):
        super().__init__(**kwargs)
        self.ts_folder = ts_folder
        # These four were previously accepted but silently dropped, although
        # the model reads each of them from ``self.config`` while decoding.
        self.BLANK_ID = BLANK_ID
        self.RNNT_MAX_SYMBOLS = RNNT_MAX_SYMBOLS
        self.PRED_RNN_LAYERS = PRED_RNN_LAYERS
        self.PRED_RNN_HIDDEN_DIM = PRED_RNN_HIDDEN_DIM
        self.SOS = SOS
class IndicASRModel(PreTrainedModel):
    """Speech recogniser built from a TorchScript preprocessor and ONNX components.

    All components are loaded from ``<config.ts_folder>/assets``. Audio is
    encoded once and then decoded either with the CTC head or with a greedy
    RNNT search; the language code selects the vocabulary, the CTC language
    mask and the RNNT ``joint_post_net_<lang>`` component.
    """

    config_class = IndicASRConfig

    def __init__(self, config):
        """Load the preprocessor, the ONNX sessions, the vocabulary and the masks.

        Args:
            config: An ``IndicASRConfig`` whose ``ts_folder`` points at the
                directory containing the ``assets`` sub-folder.
        """
        super().__init__(config)
        languages = ['as', 'bn', 'brx', 'doi', 'gu', 'hi', 'kn', 'kok', 'ks', 'mai', 'ml',
                     'mni', 'mr', 'ne', 'or', 'pa', 'sa', 'sat', 'sd', 'ta', 'te', 'ur']
        names = ['encoder', 'ctc_decoder', 'rnnt_decoder', 'joint_enc', 'joint_pred', 'joint_pre_net']
        names += [f'joint_post_net_{code}' for code in languages]

        use_cuda = torch.cuda.is_available()
        self.d = torch.device('cuda' if use_cuda else 'cpu')
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if use_cuda else ['CPUExecutionProvider']

        self.models = {}
        self.models['preprocessor'] = torch.jit.load(f'{config.ts_folder}/assets/preprocessor.ts', map_location=self.d)
        for n in names:
            component_name = f'{config.ts_folder}/assets/{n}.onnx'
            # Check the component file itself: checking only the folder made
            # the fallback branch unreachable, because a missing folder
            # already fails when the preprocessor is loaded above.
            if os.path.exists(component_name):
                self.models[n] = ort.InferenceSession(component_name, providers=providers)
            else:
                self.models[n] = None
                print('Failed to load', component_name)

        # Explicit UTF-8 so the non-ASCII vocabulary does not depend on the
        # platform's default encoding.
        with open(f'{config.ts_folder}/assets/vocab.json', encoding='utf-8') as reader:
            self.vocab = json.load(reader)
        with open(f'{config.ts_folder}/assets/language_masks.json', encoding='utf-8') as reader:
            self.language_masks = json.load(reader)

    def _run_component(self, name, output_names, inputs):
        """Run the ONNX session stored under ``name`` and return its outputs.

        Raises:
            KeyError: If ``name`` is not a known component.
            RuntimeError: If the component file was missing at load time.
        """
        session = self.models[name]
        if session is None:
            raise RuntimeError(f"Model component '{name}' was not loaded")
        return session.run(output_names, inputs)

    def forward(self, wav, lang, decoding='ctc'):
        """Transcribe ``wav`` in language ``lang``.

        Args:
            wav: Audio tensor passed to the preprocessor; its last dimension
                is used as the signal length (assumes a single utterance,
                e.g. shape ``(1, num_samples)`` - TODO confirm).
            lang: Language code used to index the vocabulary and masks.
            decoding: ``'ctc'`` or ``'rnnt'``.

        Returns:
            The decoded text.

        Raises:
            ValueError: If ``decoding`` is neither ``'ctc'`` nor ``'rnnt'``.
        """
        # Validate first so that a typo neither wastes an encoder pass nor
        # results in a silent ``None``.
        if decoding not in ('ctc', 'rnnt'):
            raise ValueError(f"Unsupported decoding {decoding!r}; expected 'ctc' or 'rnnt'")
        encoder_outputs, encoded_lengths = self.encode(wav)
        if decoding == 'ctc':
            return self._ctc_decode(encoder_outputs, encoded_lengths, lang)
        return self._rnnt_decode(encoder_outputs, encoded_lengths, lang)

    def encode(self, wav):
        """Run the preprocessor and the encoder.

        Returns:
            The encoder's ``outputs`` and ``encoded_lengths`` as returned by
            ONNX Runtime.
        """
        length = torch.tensor([wav.shape[-1]]).to(self.d)
        audio_signal, length = self.models['preprocessor'](input_signal=wav.to(self.d), length=length)
        outputs, encoded_lengths = self._run_component(
            'encoder',
            ['outputs', 'encoded_lengths'],
            {'audio_signal': audio_signal.cpu().numpy(), 'length': length.cpu().numpy()},
        )
        return outputs, encoded_lengths

    def _ctc_decode(self, encoder_outputs, encoded_lengths, lang):
        """Greedy CTC decoding of the first item in the batch.

        The CTC log-probabilities are restricted to the entries selected by
        ``language_masks[lang]`` and re-normalised; the best index per frame
        is taken, consecutive repeats are merged and blanks are dropped.
        ``encoded_lengths`` is accepted for symmetry with ``_rnnt_decode`` but
        is not used.
        """
        logprobs = self._run_component('ctc_decoder', ['logprobs'], {'encoder_output': encoder_outputs})[0]
        logprobs = torch.from_numpy(logprobs[:, :, self.language_masks[lang]]).log_softmax(dim=-1)
        indices = torch.argmax(logprobs[0], dim=-1)
        collapsed_indices = torch.unique_consecutive(indices, dim=-1).tolist()
        blank = self.config.BLANK_ID
        tokens = self.vocab[lang]
        return ''.join([tokens[x] for x in collapsed_indices if x != blank]).replace('▁', ' ').strip()

    def _rnnt_decode(self, encoder_outputs, encoded_lengths, lang):
        """Greedy RNNT decoding for a single utterance.

        For every encoder frame the prediction network is stepped until it
        emits the blank id or ``RNNT_MAX_SYMBOLS`` symbols have been produced
        for that frame. The decoder state only advances when a non-blank
        symbol is accepted. ``encoded_lengths`` is not used; every frame of
        the projected encoder output is visited.
        """
        cfg = self.config
        max_symbols = cfg.RNNT_MAX_SYMBOLS
        post_net = f'joint_post_net_{lang}'
        tokens = self.vocab[lang]

        joint_enc = self._run_component('joint_enc', ['output'], {'input': encoder_outputs.transpose(0, 2, 1)})[0]
        joint_enc = torch.from_numpy(joint_enc)

        hyp = [cfg.SOS]
        state_shape = (cfg.PRED_RNN_LAYERS, 1, cfg.PRED_RNN_HIDDEN_DIM)
        prev_dec_state = (np.zeros(state_shape, dtype=np.float32), np.zeros(state_shape, dtype=np.float32))
        # Loop-invariant input of the prediction network.
        target_length = np.array([1], dtype=np.int32)

        for t in range(joint_enc.size(1)):
            f = joint_enc[:, t, :].unsqueeze(1)
            symbols_added = 0
            while max_symbols is None or symbols_added < max_symbols:
                g, _, dec_state_0, dec_state_1 = self._run_component(
                    'rnnt_decoder',
                    ['outputs', 'prednet_lengths', 'states', '162'],
                    {'targets': np.array([[hyp[-1]]], dtype=np.int32),
                     'target_length': target_length,
                     'states.1': prev_dec_state[0],
                     'onnx::Slice_3': prev_dec_state[1]},
                )
                g = self._run_component('joint_pred', ['output'], {'input': g.transpose(0, 2, 1)})[0]
                # Convert explicitly rather than relying on implicit
                # torch + numpy arithmetic.
                joint_out = (f + torch.from_numpy(g)).numpy()
                joint_out = self._run_component('joint_pre_net', ['output'], {'input': joint_out})[0]
                logits = self._run_component(post_net, ['output'], {'input': joint_out})[0]
                log_probs = torch.from_numpy(logits).log_softmax(dim=-1)
                pred_token = log_probs.argmax(dim=-1).item()
                if pred_token == cfg.BLANK_ID:
                    # Blank: move on to the next encoder frame.
                    break
                hyp.append(pred_token)
                prev_dec_state = (dec_state_0, dec_state_1)
                symbols_added += 1

        return ''.join([tokens[x] for x in hyp if x != cfg.SOS]).replace('▁', ' ').strip()

    @classmethod
    def from_pretrained(cls,
                        pretrained_model_name_or_path,
                        *,
                        force_download=False,
                        resume_download=None,
                        proxies=None,
                        token=None,
                        cache_dir=None,
                        local_files_only=False,
                        revision=None, **kwargs):
        """Build the model from a local directory or a Hugging Face Hub repository.

        Args:
            pretrained_model_name_or_path: Path of an existing local directory
                (used as-is) or a Hub repository id to download.
            force_download, proxies, token, cache_dir, local_files_only,
            revision: Forwarded to ``snapshot_download`` when downloading.
            resume_download: Accepted for signature compatibility; not used.
            **kwargs: Forwarded to ``IndicASRConfig``.

        Returns:
            A model instance whose config points ``ts_folder`` at the
            resolved directory.
        """
        if os.path.isdir(pretrained_model_name_or_path):
            loc = str(pretrained_model_name_or_path)
        else:
            download_kwargs = {
                'revision': revision,
                'cache_dir': cache_dir,
                'force_download': force_download,
                'local_files_only': local_files_only,
                'token': token,
            }
            # Only pass ``proxies`` when the caller supplied it, so the
            # default call never depends on that optional parameter.
            if proxies is not None:
                download_kwargs['proxies'] = proxies
            loc = snapshot_download(repo_id=pretrained_model_name_or_path, **download_kwargs)
        return cls(IndicASRConfig(ts_folder=loc, **kwargs))
if __name__ == '__main__':
    # Runs only when this file is executed as a script, not when it is
    # imported: make the custom classes known to the transformers Auto* API.
    from transformers import AutoModel, AutoConfig

    # Associate the "iasr" model type with its configuration class ...
    AutoConfig.register("iasr", IndicASRConfig)
    # ... and that configuration class with the model implementation.
    AutoModel.register(IndicASRConfig, IndicASRModel)