import torch
import torch.nn as nn
from transformers import PreTrainedModel, PretrainedConfig
from typing import Optional, Tuple


class ConvaiCausalLMConfig(PretrainedConfig):
    model_type = "convaicausallm"

    def __init__(
        self,
        vocab_size=16000,
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=16,
        num_key_value_heads=4,
        intermediate_size=3072,
        hidden_act="silu",
        max_position_embeddings=512,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.intermediate_size = intermediate_size
        self.hidden_act = hidden_act
        self.max_position_embeddings = max_position_embeddings


class GroupedQueryAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.num_kv_heads = config.num_key_value_heads
        self.head_dim = config.hidden_size // config.num_attention_heads

        # For MQA/GQA support
        self.num_key_value_groups = self.num_heads // self.num_kv_heads

        self.q_proj = nn.Linear(config.hidden_size, self.num_heads * self.head_dim)
        self.k_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim)
        self.v_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim)
        self.o_proj = nn.Linear(config.hidden_size, config.hidden_size)

        # Create causal mask for attention
        max_positions = config.max_position_embeddings
        self.register_buffer(
            "causal_mask",
            torch.triu(torch.ones(max_positions, max_positions) * -1e9, diagonal=1)
        )

    def forward(self, hidden_states, attention_mask=None):
        batch_size, seq_len, _ = hidden_states.size()

        # Project queries, keys, values
        q = self.q_proj(hidden_states)
        k = self.k_proj(hidden_states)
        v = self.v_proj(hidden_states)

        # Reshape for attention computation
        q = q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)     # [b, n_heads, seq, head_dim]
        k = k.view(batch_size, seq_len, self.num_kv_heads, self.head_dim).transpose(1, 2)  # [b, n_kv_heads, seq, head_dim]
        v = v.view(batch_size, seq_len, self.num_kv_heads, self.head_dim).transpose(1, 2)  # [b, n_kv_heads, seq, head_dim]

        # Handle Multi-Query Attention / Grouped-Query Attention
        if self.num_key_value_groups > 1:
            # Repeat k, v for each query in the group
            k = k.repeat_interleave(self.num_key_value_groups, dim=1)  # [b, n_heads, seq, head_dim]
            v = v.repeat_interleave(self.num_key_value_groups, dim=1)  # [b, n_heads, seq, head_dim]

        # Compute attention scores: [batch, n_heads, seq_len, seq_len]
        attn_scores = torch.matmul(q, k.transpose(-1, -2)) / (self.head_dim ** 0.5)

        # Apply causal mask - only attend to previous tokens
        causal_mask = self.causal_mask[:seq_len, :seq_len]
        attn_scores = attn_scores + causal_mask

        # Apply attention mask if provided
        if attention_mask is not None:
            # attention_mask: [batch, 1, 1, seq_len]
            attn_scores = attn_scores + attention_mask

        # Normalize the attention scores to probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Apply attention to values
        context = torch.matmul(attn_probs, v)  # [b, n_heads, seq, head_dim]

        # Reshape back to [batch_size, seq_length, hidden_size]
        context = context.transpose(1, 2).contiguous()
        context = context.view(batch_size, seq_len, -1)

        # Final projection
        output = self.o_proj(context)
        return output


class ConvaiCausalLM(PreTrainedModel):
    config_class = ConvaiCausalLMConfig

    def __init__(self, config):
        super().__init__(config)
        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([
            nn.ModuleDict({
                "self_attn": GroupedQueryAttention(config),
                "mlp": nn.Sequential(
                    nn.Linear(config.hidden_size, config.intermediate_size),
                    nn.SiLU(),
                    nn.Linear(config.intermediate_size, config.hidden_size)
                ),
                "input_layernorm": nn.LayerNorm(config.hidden_size),
                "post_attention_layernorm": nn.LayerNorm(config.hidden_size)
            })
            for _ in range(config.num_hidden_layers)
        ])
        self.norm = nn.LayerNorm(config.hidden_size)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def _prepare_attention_mask(self, attention_mask, input_shape, device):
        # Prepare masks for attention
        if attention_mask is None:
            attention_mask = torch.ones(input_shape, device=device)

        # Make broadcastable shape: [batch, 1, 1, seq_len]
        extended_mask = attention_mask.unsqueeze(1).unsqueeze(2)

        # Convert to additive mask (0 for valid, -10000 for masked)
        extended_mask = (1.0 - extended_mask) * -10000.0
        return extended_mask

    def forward(self, input_ids, attention_mask=None):
        batch_size, seq_len = input_ids.shape
        device = input_ids.device

        # Prepare attention mask
        if attention_mask is not None:
            attention_mask = self._prepare_attention_mask(
                attention_mask, (batch_size, seq_len), device
            )

        # Get embeddings
        hidden_states = self.embed_tokens(input_ids)

        # Apply each layer
        for layer in self.layers:
            residual = hidden_states

            # First norm and attention
            hidden_states = layer["input_layernorm"](hidden_states)
            hidden_states = layer["self_attn"](hidden_states, attention_mask)
            hidden_states = residual + hidden_states

            # Second norm and MLP
            residual = hidden_states
            hidden_states = layer["post_attention_layernorm"](hidden_states)
            hidden_states = layer["mlp"](hidden_states)
            hidden_states = residual + hidden_states

        # Final norm
        hidden_states = self.norm(hidden_states)

        # Compute logits
        logits = self.lm_head(hidden_states)
        return logits
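

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): builds the
# default config, instantiates the model, and runs one forward pass on random
# token ids. The batch size, sequence length, and tensor values below are
# assumptions chosen only to demonstrate the expected input/output shapes.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    config = ConvaiCausalLMConfig()
    model = ConvaiCausalLM(config)
    model.eval()

    batch_size, seq_len = 2, 16
    input_ids = torch.randint(0, config.vocab_size, (batch_size, seq_len))
    attention_mask = torch.ones_like(input_ids)

    with torch.no_grad():
        logits = model(input_ids, attention_mask=attention_mask)

    # Expected shape: [batch_size, seq_len, vocab_size] -> (2, 16, 16000)
    print(logits.shape)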