import torch
import torch.nn as nn
from typing import Optional

from transformers import PreTrainedModel, PretrainedConfig


class ConvaiCausalLMConfig(PretrainedConfig):
    """Configuration for ConvaiCausalLM, a small decoder-only language model
    with grouped-query attention."""

    model_type = "convaicausallm"

    def __init__(
        self,
        vocab_size=16000,
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=16,
        num_key_value_heads=4,
        intermediate_size=3072,
        hidden_act="silu",
        max_position_embeddings=512,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.intermediate_size = intermediate_size
        self.hidden_act = hidden_act
        self.max_position_embeddings = max_position_embeddings


class GroupedQueryAttention(nn.Module):
    """Multi-head self-attention in which groups of query heads share a single
    key/value head, shrinking the K/V projections."""

    def __init__(self, config):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.num_kv_heads = config.num_key_value_heads
        self.head_dim = config.hidden_size // config.num_attention_heads

        assert self.hidden_size % self.num_heads == 0, "hidden_size must be divisible by num_attention_heads"
        assert self.num_heads % self.num_kv_heads == 0, "num_attention_heads must be divisible by num_key_value_heads"

        # Number of query heads that share each key/value head.
        self.num_key_value_groups = self.num_heads // self.num_kv_heads

        self.q_proj = nn.Linear(config.hidden_size, self.num_heads * self.head_dim)
        self.k_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim)
        self.v_proj = nn.Linear(config.hidden_size, self.num_kv_heads * self.head_dim)
        self.o_proj = nn.Linear(config.hidden_size, config.hidden_size)

        # Additive causal mask: -1e9 above the main diagonal, 0 on and below it.
        max_positions = config.max_position_embeddings
        self.register_buffer(
            "causal_mask",
            torch.triu(torch.full((max_positions, max_positions), -1e9), diagonal=1),
        )

    def forward(self, hidden_states, attention_mask: Optional[torch.Tensor] = None):
        batch_size, seq_len, _ = hidden_states.size()

        q = self.q_proj(hidden_states)
        k = self.k_proj(hidden_states)
        v = self.v_proj(hidden_states)

        # (batch, seq, n_heads * head_dim) -> (batch, n_heads, seq, head_dim)
        q = q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.num_kv_heads, self.head_dim).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.num_kv_heads, self.head_dim).transpose(1, 2)

        # Expand K/V so every query head has a matching key/value head.
        if self.num_key_value_groups > 1:
            k = k.repeat_interleave(self.num_key_value_groups, dim=1)
            v = v.repeat_interleave(self.num_key_value_groups, dim=1)

        # Scaled dot-product attention scores: (batch, n_heads, seq, seq).
        attn_scores = torch.matmul(q, k.transpose(-1, -2)) / (self.head_dim ** 0.5)

        # Block attention to future positions.
        causal_mask = self.causal_mask[:seq_len, :seq_len]
        attn_scores = attn_scores + causal_mask

        # Additive padding mask prepared by the caller (0 = keep, -10000 = masked).
        if attention_mask is not None:
            attn_scores = attn_scores + attention_mask

        attn_probs = torch.softmax(attn_scores, dim=-1)

        context = torch.matmul(attn_probs, v)

        # (batch, n_heads, seq, head_dim) -> (batch, seq, hidden_size)
        context = context.transpose(1, 2).contiguous()
        context = context.view(batch_size, seq_len, -1)

        output = self.o_proj(context)

        return output
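

# Shape sketch for GroupedQueryAttention with the default config above
# (illustrative numbers only: hidden_size=768, 16 query heads, 4 key/value
# heads, so head_dim = 768 // 16 = 48):
#
#   q_proj: 768 -> 16 * 48 = 768  => q is (batch, 16, seq, 48)
#   k_proj: 768 ->  4 * 48 = 192  => k is (batch,  4, seq, 48)
#   v_proj: 768 ->  4 * 48 = 192  => v is (batch,  4, seq, 48)
#
# repeat_interleave(4, dim=1) then expands k and v to (batch, 16, seq, 48), so
# each group of 4 query heads shares one key/value head while the K/V
# projections stay 4x smaller than in standard multi-head attention.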


class ConvaiCausalLM(PreTrainedModel):
    config_class = ConvaiCausalLMConfig

    def __init__(self, config):
        super().__init__(config)
        # Token embeddings only: the model adds no positional embeddings, and
        # max_position_embeddings just sizes the causal-mask buffer above.
        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([
            nn.ModuleDict({
                "self_attn": GroupedQueryAttention(config),
                # Feed-forward block; SiLU is hard-coded, matching the default hidden_act.
                "mlp": nn.Sequential(
                    nn.Linear(config.hidden_size, config.intermediate_size),
                    nn.SiLU(),
                    nn.Linear(config.intermediate_size, config.hidden_size),
                ),
                "input_layernorm": nn.LayerNorm(config.hidden_size),
                "post_attention_layernorm": nn.LayerNorm(config.hidden_size),
            }) for _ in range(config.num_hidden_layers)
        ])
        self.norm = nn.LayerNorm(config.hidden_size)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def _prepare_attention_mask(self, attention_mask, input_shape, device):
        # Default to attending to every position when no mask is supplied.
        if attention_mask is None:
            attention_mask = torch.ones(input_shape, device=device)

        # (batch, seq) -> (batch, 1, 1, seq) so it broadcasts over heads and query positions.
        extended_mask = attention_mask.unsqueeze(1).unsqueeze(2)

        # Convert 1/0 keep/pad flags into additive biases: 0 for keep, -10000 for pad.
        extended_mask = (1.0 - extended_mask) * -10000.0

        return extended_mask

    def forward(self, input_ids, attention_mask: Optional[torch.Tensor] = None):
        batch_size, seq_len = input_ids.shape
        device = input_ids.device

        # Turn the (batch, seq) padding mask into an additive bias for the attention scores.
        if attention_mask is not None:
            attention_mask = self._prepare_attention_mask(
                attention_mask, (batch_size, seq_len), device
            )

        hidden_states = self.embed_tokens(input_ids)

        for layer in self.layers:
            # Self-attention sub-block: pre-norm, attention, residual connection.
            residual = hidden_states
            hidden_states = layer["input_layernorm"](hidden_states)
            hidden_states = layer["self_attn"](hidden_states, attention_mask)
            hidden_states = residual + hidden_states

            # Feed-forward sub-block: pre-norm, MLP, residual connection.
            residual = hidden_states
            hidden_states = layer["post_attention_layernorm"](hidden_states)
            hidden_states = layer["mlp"](hidden_states)
            hidden_states = residual + hidden_states

        hidden_states = self.norm(hidden_states)

        logits = self.lm_head(hidden_states)

        return logits
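

if __name__ == "__main__":
    # Minimal usage sketch: the tiny two-layer config, random token ids, and
    # all-ones attention mask below are illustrative values only, not a
    # trained checkpoint.
    config = ConvaiCausalLMConfig(num_hidden_layers=2)
    model = ConvaiCausalLM(config)
    model.eval()

    input_ids = torch.randint(0, config.vocab_size, (2, 16))
    attention_mask = torch.ones_like(input_ids)

    with torch.no_grad():
        logits = model(input_ids, attention_mask)

    # Expected shape: (batch, seq_len, vocab_size) == (2, 16, 16000).
    print(logits.shape)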