import os
import torch
import json
import numpy as np
from torch import nn
from torch.nn import functional as F
import sentencepiece as spm
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE


# Tokenizer wrapper class
class SentencePieceTokenizerWrapper:
    def __init__(self, sp_model_path):
        self.sp_model = spm.SentencePieceProcessor()
        self.sp_model.Load(sp_model_path)
        self.vocab_size = self.sp_model.GetPieceSize()

        # Special token IDs from tokenizer training
        self.pad_token_id = 0
        self.bos_token_id = 1
        self.eos_token_id = 2
        self.unk_token_id = 3

        # Special token strings (standard SentencePiece-style markers)
        self.pad_token = "<pad>"
        self.bos_token = "<s>"
        self.eos_token = "</s>"
        self.unk_token = "<unk>"
        self.mask_token = "<mask>"

    def __call__(self, text, padding=False, truncation=False, max_length=None, return_tensors=None):
        # Handle both string and list inputs
        if isinstance(text, str):
            # Encode a single string
            ids = self.sp_model.EncodeAsIds(text)

            # Handle truncation
            if truncation and max_length and len(ids) > max_length:
                ids = ids[:max_length]

            attention_mask = [1] * len(ids)

            # Handle padding
            if padding and max_length:
                padding_length = max(0, max_length - len(ids))
                ids = ids + [self.pad_token_id] * padding_length
                attention_mask = attention_mask + [0] * padding_length

            result = {
                'input_ids': ids,
                'attention_mask': attention_mask
            }

            # Convert to tensors if requested
            if return_tensors == 'pt':
                result = {k: torch.tensor([v]) for k, v in result.items()}

            return result

        # Process a batch of texts
        batch_encoded = [self.sp_model.EncodeAsIds(t) for t in text]

        # Apply truncation if needed
        if truncation and max_length:
            batch_encoded = [ids[:max_length] for ids in batch_encoded]

        # Create attention masks
        batch_attention_mask = [[1] * len(ids) for ids in batch_encoded]

        # Apply padding if needed
        if padding:
            if max_length:
                max_len = max_length
            else:
                max_len = max(len(ids) for ids in batch_encoded)

            # Pad sequences to max_len
            batch_encoded = [ids + [self.pad_token_id] * (max_len - len(ids)) for ids in batch_encoded]
            batch_attention_mask = [mask + [0] * (max_len - len(mask)) for mask in batch_attention_mask]

        result = {
            'input_ids': batch_encoded,
            'attention_mask': batch_attention_mask
        }

        # Convert to tensors if requested
        if return_tensors == 'pt':
            result = {k: torch.tensor(v) for k, v in result.items()}

        return result
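
# A minimal sketch (never called) of exercising the wrapper above on its own.
# The "tokenizer.model" path is an assumption for illustration; point it at a
# trained SentencePiece model file before running this helper.
def _demo_tokenizer(sp_model_path="tokenizer.model"):
    tok = SentencePieceTokenizerWrapper(sp_model_path)
    batch = tok(
        ["नमस्ते दुनिया", "यह एक छोटा उदाहरण है।"],
        padding=True,
        truncation=True,
        max_length=16,
        return_tensors="pt",
    )
    # Padded ids and the matching attention mask share the same [2, 16] shape
    print(batch["input_ids"].shape, batch["attention_mask"].shape)
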
# Model architecture components
class MultiHeadAttention(nn.Module):
    """Multi-headed attention mechanism"""

    def __init__(self, config):
        super().__init__()
        self.num_attention_heads = config["num_attention_heads"]
        self.attention_head_size = config["hidden_size"] // config["num_attention_heads"]
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        # Query, Key, Value projections
        self.query = nn.Linear(config["hidden_size"], self.all_head_size)
        self.key = nn.Linear(config["hidden_size"], self.all_head_size)
        self.value = nn.Linear(config["hidden_size"], self.all_head_size)

        # Output projection
        self.output = nn.Sequential(
            nn.Linear(self.all_head_size, config["hidden_size"]),
            nn.Dropout(config["attention_probs_dropout_prob"])
        )

        # Simplified relative position bias
        self.max_position_embeddings = config["max_position_embeddings"]
        self.relative_attention_bias = nn.Embedding(
            2 * config["max_position_embeddings"] - 1,
            config["num_attention_heads"]
        )

    def transpose_for_scores(self, x):
        new_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask=None):
        batch_size, seq_length = hidden_states.size()[:2]

        # Project inputs to queries, keys, and values
        query_layer = self.transpose_for_scores(self.query(hidden_states))
        key_layer = self.transpose_for_scores(self.key(hidden_states))
        value_layer = self.transpose_for_scores(self.value(hidden_states))

        # Take the dot product between query and key to get the raw attention scores
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))

        # Generate relative position matrix
        position_ids = torch.arange(seq_length, dtype=torch.long, device=hidden_states.device)
        relative_position = position_ids.unsqueeze(1) - position_ids.unsqueeze(0)  # [seq_len, seq_len]

        # Shift values to be >= 0
        relative_position = relative_position + self.max_position_embeddings - 1

        # Ensure indices are within bounds
        relative_position = torch.clamp(relative_position, 0, 2 * self.max_position_embeddings - 2)

        # Get relative position embeddings [seq_len, seq_len, num_heads]
        rel_attn_bias = self.relative_attention_bias(relative_position)

        # Reshape to [1, num_heads, seq_len, seq_len] so it broadcasts over the batch
        rel_attn_bias = rel_attn_bias.permute(2, 0, 1).unsqueeze(0)

        # Add to attention scores - dimensions now match
        attention_scores = attention_scores + rel_attn_bias

        # Scale attention scores
        attention_scores = attention_scores / (self.attention_head_size ** 0.5)

        # Apply attention mask
        if attention_mask is not None:
            attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities
        attention_probs = F.softmax(attention_scores, dim=-1)

        # Apply dropout
        attention_probs = F.dropout(attention_probs, p=0.1, training=self.training)

        # Apply attention to values
        context_layer = torch.matmul(attention_probs, value_layer)

        # Reshape back to [batch_size, seq_length, hidden_size]
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*new_shape)

        # Final output projection
        output = self.output(context_layer)

        return output


class EnhancedTransformerLayer(nn.Module):
    """Advanced transformer layer with pre-layer norm and enhanced attention"""

    def __init__(self, config):
        super().__init__()
        self.attention_pre_norm = nn.LayerNorm(config["hidden_size"], eps=config["layer_norm_eps"])
        self.attention = MultiHeadAttention(config)
        self.ffn_pre_norm = nn.LayerNorm(config["hidden_size"], eps=config["layer_norm_eps"])

        # Feed-forward network
        self.ffn = nn.Sequential(
            nn.Linear(config["hidden_size"], config["intermediate_size"]),
            nn.GELU(),
            nn.Dropout(config["hidden_dropout_prob"]),
            nn.Linear(config["intermediate_size"], config["hidden_size"]),
            nn.Dropout(config["hidden_dropout_prob"])
        )

    def forward(self, hidden_states, attention_mask=None):
        # Pre-layer norm for attention
        attn_norm_hidden = self.attention_pre_norm(hidden_states)

        # Self-attention
        attention_output = self.attention(attn_norm_hidden, attention_mask)

        # Residual connection
        hidden_states = hidden_states + attention_output

        # Pre-layer norm for feed-forward
        ffn_norm_hidden = self.ffn_pre_norm(hidden_states)

        # Feed-forward
        ffn_output = self.ffn(ffn_norm_hidden)

        # Residual connection
        hidden_states = hidden_states + ffn_output

        return hidden_states


class AdvancedTransformerModel(nn.Module):
    """Advanced Transformer model for inference"""

    def __init__(self, config):
        super().__init__()
        self.config = config

        # Embeddings
        self.word_embeddings = nn.Embedding(
            config["vocab_size"],
            config["hidden_size"],
            padding_idx=config["pad_token_id"]
        )
        # Position embeddings
        self.position_embeddings = nn.Embedding(config["max_position_embeddings"], config["hidden_size"])

        # Embedding dropout
        self.embedding_dropout = nn.Dropout(config["hidden_dropout_prob"])

        # Transformer layers
        self.layers = nn.ModuleList([
            EnhancedTransformerLayer(config) for _ in range(config["num_hidden_layers"])
        ])

        # Final layer norm
        self.final_layer_norm = nn.LayerNorm(config["hidden_size"], eps=config["layer_norm_eps"])

    def forward(self, input_ids, attention_mask=None):
        input_shape = input_ids.size()
        batch_size, seq_length = input_shape

        # Get position ids
        position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand(batch_size, -1)

        # Get embeddings
        word_embeds = self.word_embeddings(input_ids)
        position_embeds = self.position_embeddings(position_ids)

        # Sum embeddings
        embeddings = word_embeds + position_embeds

        # Apply dropout
        embeddings = self.embedding_dropout(embeddings)

        # Default attention mask
        if attention_mask is None:
            attention_mask = torch.ones(input_shape, device=input_ids.device)

        # Extended attention mask for transformer layers (1 for tokens to attend to, 0 for masked tokens)
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
        extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

        # Apply transformer layers
        hidden_states = embeddings
        for layer in self.layers:
            hidden_states = layer(hidden_states, extended_attention_mask)

        # Final layer norm
        hidden_states = self.final_layer_norm(hidden_states)

        return hidden_states


class AdvancedPooling(nn.Module):
    """Advanced pooling module supporting multiple pooling strategies"""

    def __init__(self, config):
        super().__init__()
        self.pooling_mode = config["pooling_mode"]  # 'mean', 'max', 'cls', 'attention', 'weighted'
        self.hidden_size = config["hidden_size"]

        # For attention pooling
        if self.pooling_mode == 'attention':
            self.attention_weights = nn.Linear(config["hidden_size"], 1)
        # For weighted pooling
        elif self.pooling_mode == 'weighted':
            self.weight_layer = nn.Linear(config["hidden_size"], 1)

    def forward(self, token_embeddings, attention_mask=None):
        if attention_mask is None:
            attention_mask = torch.ones_like(token_embeddings[:, :, 0])

        mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()

        if self.pooling_mode == 'cls':
            # Use [CLS] token (first token)
            pooled = token_embeddings[:, 0]
        elif self.pooling_mode == 'max':
            # Max pooling
            token_embeddings = token_embeddings.clone()
            # Set padding tokens to large negative value to exclude them from max
            token_embeddings[mask_expanded == 0] = -1e9
            pooled = torch.max(token_embeddings, dim=1)[0]
        elif self.pooling_mode == 'attention':
            # Attention pooling
            weights = self.attention_weights(token_embeddings).squeeze(-1)
            # Mask out padding tokens
            weights = weights.masked_fill(attention_mask == 0, -1e9)
            weights = F.softmax(weights, dim=1).unsqueeze(-1)
            pooled = torch.sum(token_embeddings * weights, dim=1)
        elif self.pooling_mode == 'weighted':
            # Weighted average pooling
            weights = torch.sigmoid(self.weight_layer(token_embeddings)).squeeze(-1)
            # Apply mask
            weights = weights * attention_mask
            # Normalize weights
            sum_weights = torch.sum(weights, dim=1, keepdim=True)
            sum_weights = torch.clamp(sum_weights, min=1e-9)
            weights = weights / sum_weights
            # Apply weights
            pooled = torch.sum(token_embeddings * weights.unsqueeze(-1), dim=1)
        else:
            # Default to mean pooling
            sum_embeddings = torch.sum(token_embeddings * mask_expanded, dim=1)
            sum_mask = torch.clamp(mask_expanded.sum(1), min=1e-9)
            pooled = sum_embeddings / sum_mask

        # L2 normalize
        pooled = F.normalize(pooled, p=2, dim=1)

        return pooled


class SentenceEmbeddingModel(nn.Module):
    """Complete sentence embedding model for inference"""

    def __init__(self, config):
        super(SentenceEmbeddingModel, self).__init__()
        self.config = config

        # Create transformer model
        self.transformer = AdvancedTransformerModel(config)

        # Create pooling module
        self.pooling = AdvancedPooling(config)

        # Build projection module if needed
        if "projection_dim" in config and config["projection_dim"] > 0:
            self.use_projection = True
            self.projection = nn.Sequential(
                nn.Linear(config["hidden_size"], config["hidden_size"]),
                nn.GELU(),
                nn.Linear(config["hidden_size"], config["projection_dim"]),
                nn.LayerNorm(config["projection_dim"], eps=config["layer_norm_eps"])
            )
        else:
            self.use_projection = False

    def forward(self, input_ids, attention_mask=None):
        # Get token embeddings from transformer
        token_embeddings = self.transformer(input_ids, attention_mask)

        # Pool token embeddings
        pooled_output = self.pooling(token_embeddings, attention_mask)

        # Apply projection if enabled
        if self.use_projection:
            pooled_output = self.projection(pooled_output)
            pooled_output = F.normalize(pooled_output, p=2, dim=1)

        return pooled_output
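
# A minimal sketch (never called) of wiring the classes above together directly.
# The configuration values below are made-up illustrations of the keys the model
# expects; in real use they come from the checkpoint's config.json and the trained
# weights are loaded by HindiEmbedder below.
def _demo_untrained_forward():
    demo_config = {
        "vocab_size": 32000,
        "hidden_size": 64,
        "num_hidden_layers": 2,
        "num_attention_heads": 4,
        "intermediate_size": 128,
        "max_position_embeddings": 128,
        "hidden_dropout_prob": 0.1,
        "attention_probs_dropout_prob": 0.1,
        "layer_norm_eps": 1e-12,
        "pad_token_id": 0,
        "pooling_mode": "mean",
        # "projection_dim" is optional; omitting it disables the projection head
    }
    model = SentenceEmbeddingModel(demo_config)
    model.eval()

    # Random token ids stand in for tokenizer output
    input_ids = torch.randint(0, demo_config["vocab_size"], (2, 16))
    attention_mask = torch.ones_like(input_ids)

    with torch.no_grad():
        embeddings = model(input_ids, attention_mask)
    print(tuple(embeddings.shape))  # (2, 64): one L2-normalized vector per input
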
class HindiEmbedder:
    def __init__(self, model_path="/home/ubuntu/output/hindi-embeddings-custom-tokenizer/final"):
        """Initialize the Hindi sentence embedder.

        Args:
            model_path: Path to the model directory
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Load tokenizer - look for it in the model directory
        tokenizer_path = os.path.join(model_path, "tokenizer.model")
        if not os.path.exists(tokenizer_path):
            raise FileNotFoundError(f"Could not find tokenizer at {tokenizer_path}")

        self.tokenizer = SentencePieceTokenizerWrapper(tokenizer_path)
        print(f"Loaded tokenizer from {tokenizer_path} with vocabulary size: {self.tokenizer.vocab_size}")

        # Load model config
        config_path = os.path.join(model_path, "config.json")
        with open(config_path, "r") as f:
            self.config = json.load(f)
        print(f"Loaded model config with hidden_size={self.config['hidden_size']}")

        # Load model
        model_pt_path = os.path.join(model_path, "embedding_model.pt")
        try:
            # Support both PyTorch 2.6+ and older versions
            try:
                checkpoint = torch.load(model_pt_path, map_location=self.device, weights_only=False)
                print("Loaded model using PyTorch 2.6+ style loading")
            except TypeError:
                checkpoint = torch.load(model_pt_path, map_location=self.device)
                print("Loaded model using older PyTorch style loading")

            # Create model
            self.model = SentenceEmbeddingModel(self.config)

            # Load state dict
            if "model_state_dict" in checkpoint:
                state_dict = checkpoint["model_state_dict"]
            else:
                state_dict = checkpoint

            missing_keys, unexpected_keys = self.model.load_state_dict(state_dict, strict=False)
            print(f"Loaded model with {len(missing_keys)} missing keys and {len(unexpected_keys)} unexpected keys")

            # Move to device
            self.model.to(self.device)
            self.model.eval()
            print("Model loaded successfully and placed in evaluation mode")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise RuntimeError(f"Failed to load the model: {e}")

    def encode(self, sentences, batch_size=32, normalize=True):
        """Encode sentences to embeddings.

        Args:
            sentences: A string or list of strings to encode
            batch_size: Batch size for encoding
            normalize: Whether to normalize the embeddings

        Returns:
            Numpy array of embeddings
        """
        # Handle single sentence
        if isinstance(sentences, str):
            sentences = [sentences]

        all_embeddings = []

        # Process in batches
        with torch.no_grad():
            for i in range(0, len(sentences), batch_size):
                batch = sentences[i:i + batch_size]

                # Tokenize
                inputs = self.tokenizer(
                    batch,
                    padding=True,
                    truncation=True,
                    max_length=self.config.get("max_position_embeddings", 128),
                    return_tensors="pt"
                )

                # Move to device
                input_ids = inputs["input_ids"].to(self.device)
                attention_mask = inputs["attention_mask"].to(self.device)

                # Get embeddings
                embeddings = self.model(input_ids, attention_mask)

                # Move to CPU and convert to numpy
                all_embeddings.append(embeddings.cpu().numpy())

        # Concatenate all embeddings
        all_embeddings = np.vstack(all_embeddings)

        # Normalize if requested
        if normalize:
            all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1, keepdims=True)

        return all_embeddings

    def compute_similarity(self, texts1, texts2=None):
        """Compute cosine similarity between texts.

        Args:
            texts1: First set of texts
            texts2: Second set of texts. If None, compute the similarity matrix within texts1.

        Returns:
            Similarity scores
        """
        # Convert single strings to lists for consistent handling
        if isinstance(texts1, str):
            texts1 = [texts1]
        if texts2 is not None and isinstance(texts2, str):
            texts2 = [texts2]

        embeddings1 = self.encode(texts1)

        if texts2 is None:
            # Compute similarity matrix within texts1
            return cosine_similarity(embeddings1)

        # Compute similarity between texts1 and texts2
        embeddings2 = self.encode(texts2)

        if len(texts1) == len(texts2):
            # Compute pairwise similarity when the number of texts matches
            similarities = np.array([
                cosine_similarity([e1], [e2])[0][0]
                for e1, e2 in zip(embeddings1, embeddings2)
            ])
            # If there's just one pair, return a scalar
            if len(similarities) == 1:
                return similarities[0]
            return similarities

        # Return full similarity matrix
        return cosine_similarity(embeddings1, embeddings2)
    def search(self, query, documents, top_k=5):
        """Search for similar documents to a query.

        Args:
            query: The query text
            documents: List of documents to search
            top_k: Number of top results to return

        Returns:
            List of dictionaries with document and score
        """
        # Get embeddings
        query_embedding = self.encode([query])[0]
        document_embeddings = self.encode(documents)

        # Compute similarities (embeddings are L2-normalized, so the dot product is cosine similarity)
        similarities = np.dot(document_embeddings, query_embedding)

        # Get top indices
        top_indices = np.argsort(similarities)[-top_k:][::-1]

        # Return results
        results = []
        for idx in top_indices:
            results.append({
                "document": documents[idx],
                "score": float(similarities[idx])
            })

        return results

    def evaluate_similarity_samples(self):
        """Evaluate the model on some standard similarity examples for Hindi."""
        test_pairs = [
            (
                "मुझे हिंदी में पढ़ना बहुत पसंद है।",
                "मैं हिंदी किताबें बहुत पसंद करता हूँ।"
            ),
            (
                "आज मौसम बहुत अच्छा है।",
                "आज बारिश हो रही है।"
            ),
            (
                "भारत एक विशाल देश है।",
                "भारत में कई भाषाएँ बोली जाती हैं।"
            ),
            (
                "कंप्यूटर विज्ञान एक रोचक विषय है।",
                "मैं कंप्यूटर साइंस का छात्र हूँ।"
            ),
            (
                "मैं रोज सुबह योग करता हूँ।",
                "स्वस्थ रहने के लिए व्यायाम जरूरी है।"
            ),
            # Contrasting pairs to test discrimination
            (
                "मुझे हिंदी में पढ़ना बहुत पसंद है।",
                "क्रिकेट भारत में सबसे लोकप्रिय खेल है।"
            ),
            (
                "आज मौसम बहुत अच्छा है।",
                "भारतीय व्यंजन दुनिया भर में मशहूर हैं।"
            ),
            (
                "कंप्यूटर विज्ञान एक रोचक विषय है।",
                "हिमालय दुनिया का सबसे ऊंचा पर्वत है।"
            )
        ]

        print("Evaluating model on standard similarity samples:")
        for i, (text1, text2) in enumerate(test_pairs):
            # compute_similarity returns a scalar for a single pair of texts
            similarity = self.compute_similarity(text1, text2)
            print(f"\nPair {i+1}:")
            print(f"  Sentence 1: {text1}")
            print(f"  Sentence 2: {text2}")
            print(f"  Similarity: {similarity:.4f}")

        return
    def visualize_embeddings(self, sentences, labels=None, output_path="hindi_embeddings_visualization.png"):
        """Create a t-SNE visualization of the embeddings.

        Args:
            sentences: List of sentences to visualize
            labels: Optional list of labels for the points
            output_path: Path to save the visualization

        Returns:
            Path to the saved visualization
        """
        # Encode sentences
        embeddings = self.encode(sentences)

        # Apply t-SNE
        tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(embeddings) - 1))
        reduced_embeddings = tsne.fit_transform(embeddings)

        # Create plot
        plt.figure(figsize=(12, 10))

        # Plot points
        scatter = plt.scatter(
            reduced_embeddings[:, 0],
            reduced_embeddings[:, 1],
            c=range(len(reduced_embeddings)),
            cmap='viridis',
            alpha=0.8,
            s=100
        )

        # Add labels if provided
        if labels:
            for i, label in enumerate(labels):
                plt.annotate(
                    label,
                    (reduced_embeddings[i, 0], reduced_embeddings[i, 1]),
                    fontsize=10,
                    alpha=0.7
                )

        plt.title("t-SNE Visualization of Hindi Sentence Embeddings", fontsize=16)
        plt.xlabel("Dimension 1", fontsize=12)
        plt.ylabel("Dimension 2", fontsize=12)
        plt.colorbar(scatter, label="Sentence Index")
        plt.grid(alpha=0.3)

        # Save the figure
        plt.tight_layout()
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()

        print(f"Visualization saved to {output_path}")
        return output_path


def main():
    # Create embedder
    embedder = HindiEmbedder()

    # Run sample evaluation
    embedder.evaluate_similarity_samples()

    # Example of semantic search
    print("\nSemantic Search Example:")
    query = "भारत की संस्कृति"
    documents = [
        "भारतीय संस्कृति दुनिया की सबसे प्राचीन संस्कृतियों में से एक है।",
        "भारत की आबादी 1.3 अरब से अधिक है।",
        "हिमालय पर्वत श्रृंखला भारत के उत्तर में स्थित है।",
        "भारतीय व्यंजन में मसालों का प्रयोग किया जाता है।",
        "भारत में 22 आधिकारिक भाषाएँ हैं।",
        "संस्कृति लोगों के रहन-सहन का तरीका है।",
        "भारत के विभिन्न राज्यों की अपनी अलग संस्कृति है।",
        "रामायण और महाभारत भारतीय संस्कृति के महत्वपूर्ण हिस्से हैं।",
    ]

    results = embedder.search(query, documents)
    print(f"Query: {query}")
    print("Top results:")
    for i, result in enumerate(results):
        print(f"{i+1}. Score: {result['score']:.4f}")
        print(f"   {result['document']}")

    # Create visualization example
    print("\nCreating embedding visualization...")
    visualization_sentences = [
        "मुझे हिंदी में पढ़ना बहुत पसंद है।",
        "मैं हिंदी किताबें बहुत पसंद करता हूँ।",
        "आज मौसम बहुत अच्छा है।",
        "आज बारिश हो रही है।",
        "भारत एक विशाल देश है।",
        "भारत में कई भाषाएँ बोली जाती हैं।",
        "कंप्यूटर विज्ञान एक रोचक विषय है।",
        "मैं कंप्यूटर साइंस का छात्र हूँ।",
        "क्रिकेट भारत में सबसे लोकप्रिय खेल है।",
        "भारतीय व्यंजन दुनिया भर में मशहूर हैं।",
        "हिमालय दुनिया का सबसे ऊंचा पर्वत है।",
        "गंगा भारत की सबसे पवित्र नदी है।",
        "दिल्ली भारत की राजधानी है।",
        "मुंबई भारत का आर्थिक केंद्र है।",
        "तमिल, तेलुगु, कन्नड़ और मलयालम दक्षिण भारत की प्रमुख भाषाएँ हैं।"
    ]
    labels = ["पढ़ना", "किताबें", "मौसम", "बारिश", "भारत", "भाषाएँ", "कंप्यूटर", "छात्र",
              "क्रिकेट", "व्यंजन", "हिमालय", "गंगा", "दिल्ली", "मुंबई", "भाषाएँ"]

    embedder.visualize_embeddings(visualization_sentences, labels)


if __name__ == "__main__":
    main()