from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.init as init

from transformers.models.qwen2.modeling_qwen2 import *
from transformers.modeling_outputs import ModelOutput


@dataclass
class CausalLMOutputWithPastAndScore(ModelOutput):
    """
    Base class for causal language model (or autoregressive) outputs, extended with pooled regression scores.

    Args:
        loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
            Language modeling loss (for next-token prediction).
        logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
            Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
        past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
            Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
            `(batch_size, num_heads, sequence_length, embed_size_per_head)`.

            Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
            `past_key_values` input) to speed up sequential decoding.
        hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
            Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
            one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.

            Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
        attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
            Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
            sequence_length)`.

            Attention weights after the attention softmax, used to compute the weighted average in the self-attention
            heads.
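        scores (`torch.FloatTensor`, *optional*):
            Pooled output of the token-level regression head (`lm_regression_head`), taken at the last non-padding
            token of each sequence, of shape `(batch_size, 1)`.
        experts_scores (`torch.FloatTensor`, *optional*):
            Pooled multi-dimension scores assembled by `Expert_Head` (nine calibrated sub-scores plus their
            aggregates and an overall score), of shape `(batch_size, 12)` with the head sizes used in this file.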
    """

    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    scores: torch.FloatTensor = None
    experts_scores: torch.FloatTensor = None
    past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
    hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
    attentions: Optional[Tuple[torch.FloatTensor, ...]] = None


def fixed_cross_entropy(source, target, num_items_in_batch: Optional[int] = None, ignore_index: int = -100, **kwargs):
    # When the number of valid target tokens in the batch is known, sum the per-token losses
    # and normalize by that count; otherwise fall back to a plain mean reduction.
    reduction = "sum" if num_items_in_batch is not None else "mean"
    loss = nn.functional.cross_entropy(source, target, ignore_index=ignore_index, reduction=reduction)
    if reduction == "sum":
        loss = loss / num_items_in_batch
    return loss


def ForCausalLMLoss(
    logits, labels, vocab_size: int, num_items_in_batch: Optional[int] = None, ignore_index: int = -100, **kwargs
):
    # Upcast to float for a numerically stable cross-entropy.
    logits = logits.float()

    # Shift so that tokens < n predict token n.
    shift_logits = logits[..., :-1, :].contiguous()
    shift_labels = labels[..., 1:].contiguous()

    # Flatten the tokens.
    shift_logits = shift_logits.view(-1, vocab_size)
    shift_labels = shift_labels.view(-1)

    # Ensure labels live on the same device as the logits (model parallelism).
    shift_labels = shift_labels.to(shift_logits.device)
    loss = fixed_cross_entropy(shift_logits, shift_labels, num_items_in_batch, ignore_index, **kwargs)
    return loss


def ForMseloss(logits, labels):
    # Mean-squared-error loss between predicted scores and regression targets.
    logits = logits.contiguous()
    labels = labels.contiguous().to(device=logits.device, dtype=logits.dtype)
    return nn.functional.mse_loss(logits, labels)


def ForMaeloss(logits, labels):
    # Mean-absolute-error (L1) loss between predicted scores and regression targets.
    logits = logits.contiguous()
    labels = labels.contiguous().to(device=logits.device, dtype=logits.dtype)
    return nn.functional.l1_loss(logits, labels)


class Expert_Head(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        # Projects the hidden state to nine raw sub-scores.
        self.expert_head1 = nn.Linear(hidden_size, 9)
        # Eleven per-value calibration layers: indices 0-8 rescale the nine sub-scores,
        # indices 9 and 10 rescale the two aggregated scores below.
        self.linears = nn.ModuleList([nn.Linear(1, 1) for _ in range(11)])
        # Aggregates the first five sub-scores into one score.
        self.expert_head2 = nn.Sequential(nn.ReLU(), nn.Linear(5, 1))
        # Aggregates sub-scores 5-7 into one score.
        self.expert_head3 = nn.Sequential(nn.ReLU(), nn.Linear(3, 1))
        # Combines the two aggregates and the last sub-score into an overall expert score.
        self.expert_head4 = nn.Sequential(nn.ReLU(), nn.Linear(3, 1))
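
    # `forward` reads the nine sub-scores at the last non-padding position of each sequence,
    # recalibrates them with `linears[0..8]`, aggregates them with `expert_head2`/`expert_head3`
    # (recalibrated by `linears[9]`/`linears[10]`), and combines the two aggregates with the last
    # sub-score via `expert_head4` into an overall expert score. When `is_expert[0] == 0`, the
    # aggregation runs under `torch.no_grad()`; in that mode the loss in
    # `Qwen2ForCausalLM_score.forward` uses the pooled regression score rather than these scores.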
    def forward(self, hidden_states, batch_size, sequence_lengths, is_expert):
        # Per-token sub-scores, pooled at the position given by `sequence_lengths`.
        scores2 = self.expert_head1(hidden_states)
        pooled_scores2_temp = scores2[
            torch.arange(batch_size, device=scores2.device), sequence_lengths.to(device=scores2.device)
        ]

        # Recalibrate each of the nine sub-scores with its own Linear(1, 1).
        pooled_scores2 = torch.zeros_like(pooled_scores2_temp)
        for i in range(9):
            pooled_scores2[:, i] = self.linears[i](pooled_scores2_temp[:, i].unsqueeze(-1)).squeeze(-1)

        # When is_expert[0] == 0, run the aggregation heads without gradient tracking;
        # otherwise keep the surrounding autograd mode unchanged.
        no_grad_for_expert = is_expert is not None and is_expert[0] == 0
        with torch.set_grad_enabled(torch.is_grad_enabled() and not no_grad_for_expert):
            pooled_scores3_temp = self.expert_head2(pooled_scores2[:, :5])
            pooled_scores3 = self.linears[9](pooled_scores3_temp)
            pooled_scores4_temp = self.expert_head3(pooled_scores2[:, 5:-1])
            pooled_scores4 = self.linears[10](pooled_scores4_temp)

            # Overall expert score from the two aggregates and the last sub-score.
            expert_scores = self.expert_head4(
                torch.cat([pooled_scores3, pooled_scores4, pooled_scores2[:, -1].unsqueeze(1)], dim=1)
            )

            # Final layout: 5 sub-scores, their aggregate, remaining 4 sub-scores,
            # aggregate of sub-scores 5-7, overall expert score -> 12 values per sequence.
            pooled_expert_scores = torch.cat(
                [pooled_scores2[:, :5], pooled_scores3, pooled_scores2[:, 5:], pooled_scores4, expert_scores], dim=1
            )

        return pooled_expert_scores


class Qwen2ForCausalLM_score(Qwen2ForCausalLM):
    _tied_weights_keys = ["lm_head.weight", "regression_head.weight"]

    def __init__(self, config):
        super().__init__(config)
        # Token-level regression head producing one scalar score per position.
        self.lm_regression_head = nn.Linear(config.hidden_size, 1)
        # Multi-dimension expert scoring head (see Expert_Head above).
        self.expert_head = Expert_Head(config.hidden_size)

    @add_start_docstrings_to_model_forward(QWEN2_INPUTS_DOCSTRING)
    @replace_return_docstrings(output_type=CausalLMOutputWithPastAndScore, config_class="Qwen2Config")
    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        num_logits_to_keep: int = 0,
        scores_labels: Optional[torch.LongTensor] = None,
        is_expert: Optional[torch.BoolTensor] = None,
        **loss_kwargs,
    ) -> Union[Tuple, CausalLMOutputWithPastAndScore]:
        r"""
        Args:
            labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
                Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
                config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
                (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.

            num_logits_to_keep (`int`, *optional*):
                Calculate logits for the last `num_logits_to_keep` tokens. If `0`, calculate logits for all
                `input_ids` (special case). Only last token logits are needed for generation, and calculating them
                only for that token can save memory, which becomes pretty significant for long sequences or large
                vocabulary size.
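
            scores_labels (`torch.Tensor`, *optional*):
                Regression targets for the score heads. With `is_expert[0] == 0`, only the last column is used as the
                target for the pooled regression score; with `is_expert[0] == 1`, the full tensor is used as the
                target for the pooled expert scores.
            is_expert (`torch.Tensor`, *optional*):
                Flag tensor whose first element selects the scoring loss: `0` trains the pooled regression score,
                `1` trains the expert scores; if omitted, only the language modeling loss is computed.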

        Returns:

        Example:

        ```python
        >>> from transformers import AutoTokenizer, Qwen2ForCausalLM

        >>> model = Qwen2ForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS)
        >>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER)

        >>> prompt = "Hey, are you conscious? Can you talk to me?"
        >>> inputs = tokenizer(prompt, return_tensors="pt")

        >>> # Generate
        >>> generate_ids = model.generate(inputs.input_ids, max_length=30)
        >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        "Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you."
        ```"""

        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Run the Qwen2 backbone.
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            cache_position=cache_position,
        )

        hidden_states = outputs[0]

        # Language modeling logits; with num_logits_to_keep == 0 the slice covers every position.
        logits = self.lm_head(hidden_states[:, -num_logits_to_keep:, :])

        # Per-token regression scores, pooled further below.
        scores = self.lm_regression_head(hidden_states)

        if input_ids is not None:
            batch_size = input_ids.shape[0]
        else:
            batch_size = inputs_embeds.shape[0]

        # Pool at the last non-padding token of each sequence (same strategy as the
        # Hugging Face sequence-classification heads).
        if self.config.pad_token_id is None and batch_size != 1:
            raise ValueError("Cannot handle batch sizes > 1 if no padding token is defined.")
        if self.config.pad_token_id is None:
            sequence_lengths = torch.tensor(-1, device=scores.device).int()
        else:
            if input_ids is not None:
                # Index of the last non-padding token in each row.
                sequence_lengths = torch.eq(input_ids, self.config.pad_token_id).int().argmax(-1) - 1
                sequence_lengths = sequence_lengths % input_ids.shape[-1]
                sequence_lengths = sequence_lengths.to(scores.device)
            else:
                sequence_lengths = torch.tensor(-1, device=scores.device).int()
        pooled_scores = scores[torch.arange(batch_size, device=scores.device), sequence_lengths]

        pooled_expert_scores = self.expert_head(hidden_states, batch_size, sequence_lengths, is_expert)
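
        # Loss selection:
        #   * is_expert[0] == 0: LM loss + MSE between the pooled regression score and the last
        #     column of `scores_labels` (single overall target).
        #   * is_expert[0] == 1: LM loss + MSE between the pooled expert scores and the full
        #     `scores_labels` tensor.
        #   * otherwise: language modeling loss only.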
        loss = None
        if labels is not None:
            if scores_labels is not None and is_expert is not None and is_expert[0] == 0:
                loss = ForCausalLMLoss(logits, labels, self.vocab_size, **loss_kwargs) + ForMseloss(
                    pooled_scores, scores_labels[:, -1].unsqueeze(1)
                )
            elif scores_labels is not None and is_expert is not None and is_expert[0] == 1:
                loss = ForCausalLMLoss(logits, labels, self.vocab_size, **loss_kwargs) + ForMseloss(
                    pooled_expert_scores, scores_labels
                )
            else:
                loss = ForCausalLMLoss(logits, labels, self.vocab_size, **loss_kwargs)

        if not return_dict:
            # Legacy tuple output: note that the pooled score tensors are not included here.
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return CausalLMOutputWithPastAndScore(
            loss=loss,
            logits=logits,
            scores=pooled_scores,
            experts_scores=pooled_expert_scores,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
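

# Minimal usage sketch, not part of the model definition. The checkpoint name and the label
# values below are assumptions chosen only to illustrate the extra `scores_labels` and
# `is_expert` inputs of this subclass; any Qwen2 checkpoint path can be substituted, and the
# extra score heads are randomly initialized when loading a plain language-model checkpoint.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    checkpoint = "Qwen/Qwen2-0.5B"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = Qwen2ForCausalLM_score.from_pretrained(checkpoint)

    inputs = tokenizer("Hey, are you conscious? Can you talk to me?", return_tensors="pt")
    outputs = model(
        **inputs,
        labels=inputs.input_ids,
        # Twelve regression targets, matching the width of the pooled expert scores.
        scores_labels=torch.full((1, 12), 3.0),
        # is_expert[0] == 1 trains against the full expert-score vector.
        is_expert=torch.tensor([1]),
    )
    print(outputs.loss, outputs.scores.shape, outputs.experts_scores.shape)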