# HumanAesExpert-1B / modeling_qwen.py (HumanBeauty)
from transformers.models.qwen2.modeling_qwen2 import *  # Qwen2ForCausalLM, QWEN2_INPUTS_DOCSTRING, docstring helpers
from transformers.modeling_outputs import ModelOutput
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
import contextlib
import torch
import torch.nn as nn
@dataclass
class CausalLMOutputWithPastAndScore(ModelOutput):
"""
Base class for causal language model (or autoregressive) outputs.
Args:
loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
Language modeling loss (for next-token prediction).
        logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
            Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
        scores (`torch.FloatTensor` of shape `(batch_size, 1)`):
            Overall score from the regression head, pooled at the last non-padding token.
        experts_scores (`torch.FloatTensor` of shape `(batch_size, 12)`):
            Per-dimension scores from the expert head: 9 sub-dimension scores interleaved with 2 aggregated
            scores and 1 overall expert score.
past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`)
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
`past_key_values` input) to speed up sequential decoding.
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`.
Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
heads.
"""
loss: Optional[torch.FloatTensor] = None
logits: torch.FloatTensor = None
scores: torch.FloatTensor = None
experts_scores: torch.FloatTensor = None
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
attentions: Optional[Tuple[torch.FloatTensor, ...]] = None
def fixed_cross_entropy(source, target, num_items_in_batch: Optional[int] = None, ignore_index: int = -100, **kwargs):
    # Sum-reduce when the global token count is known (e.g. gradient accumulation),
    # then normalize by it; otherwise fall back to a per-batch mean.
    reduction = "sum" if num_items_in_batch is not None else "mean"
    loss = nn.functional.cross_entropy(source, target, ignore_index=ignore_index, reduction=reduction)
    if reduction == "sum":
        loss = loss / num_items_in_batch
    return loss
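# Usage sketch (hypothetical numbers): with gradient accumulation, a trainer can pass
# the total number of label tokens so every micro-batch is weighted evenly:
#   loss = fixed_cross_entropy(flat_logits, flat_labels, num_items_in_batch=4096)
# Without num_items_in_batch the call reduces to a plain mean-reduced cross entropy.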
def ForCausalLMLoss(
    logits, labels, vocab_size: int, num_items_in_batch: Optional[int] = None, ignore_index: int = -100, **kwargs
):
# Upcast to float if we need to compute the loss to avoid potential precision issues
logits = logits.float()
# Shift so that tokens < n predict n
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# Flatten the tokens
shift_logits = shift_logits.view(-1, vocab_size)
shift_labels = shift_labels.view(-1)
# Enable model parallelism
shift_labels = shift_labels.to(shift_logits.device)
loss = fixed_cross_entropy(shift_logits, shift_labels, num_items_in_batch, ignore_index, **kwargs)
return loss
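# Shift example: for labels [t0, t1, t2], logits at positions 0 and 1 are scored against
# t1 and t2 respectively, so each position predicts the next token.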
def ForMseloss(logits, labels):
    logits = logits.contiguous()
    labels = labels.contiguous().to(device=logits.device, dtype=logits.dtype)
    return nn.functional.mse_loss(logits, labels)

def ForMaeloss(logits, labels):
    logits = logits.contiguous()
    labels = labels.contiguous().to(device=logits.device, dtype=logits.dtype)
    return nn.functional.l1_loss(logits, labels)
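# Note: ForMaeloss mirrors ForMseloss with an L1 objective. It is not called by
# Qwen2ForCausalLM_score.forward below and appears to be kept for experimentation.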
class Expert_Head(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        # Maps each hidden state to 9 sub-dimension scores.
        self.expert_head1 = nn.Linear(hidden_size, 9)
        # Scalar calibration layers: indices 0-8 for the sub-dimension scores,
        # 9 and 10 for the two aggregated scores produced below.
        self.linears = nn.ModuleList([nn.Linear(1, 1) for _ in range(11)])
        # Aggregates sub-dimensions 0-4 into one score.
        self.expert_head2 = nn.Sequential(nn.ReLU(), nn.Linear(5, 1))
        # Aggregates sub-dimensions 5-7 into one score.
        self.expert_head3 = nn.Sequential(nn.ReLU(), nn.Linear(3, 1))
        # Combines the two aggregates and sub-dimension 8 into the overall expert score.
        self.expert_head4 = nn.Sequential(nn.ReLU(), nn.Linear(3, 1))
    def forward(self, hidden_states, batch_size, sequence_lengths, is_expert):
        # Score every token, then pool at the last non-padding token of each sequence.
        scores2 = self.expert_head1(hidden_states)
        pooled_scores2_temp = scores2[torch.arange(batch_size, device=scores2.device), sequence_lengths.to(device=scores2.device)]
        pooled_scores2 = torch.zeros_like(pooled_scores2_temp)
        for i in range(9):
            # nn.Linear(1, 1) expects a trailing feature dimension, so lift each
            # per-sample scalar to shape (batch, 1) before calibrating it.
            pooled_scores2[:, i] = self.linears[i](pooled_scores2_temp[:, i].unsqueeze(-1)).squeeze(-1)
        # For non-expert samples (is_expert[0] == 0) the aggregation heads run without
        # gradient tracking; the computation is otherwise identical in both cases.
        maybe_no_grad = torch.no_grad() if (is_expert is not None and is_expert[0] == 0) else contextlib.nullcontext()
        with maybe_no_grad:
            pooled_scores3 = self.linears[9](self.expert_head2(pooled_scores2[:, :5]))
            pooled_scores4 = self.linears[10](self.expert_head3(pooled_scores2[:, 5:-1]))
            expert_scores = self.expert_head4(torch.cat([pooled_scores3, pooled_scores4, pooled_scores2[:, -1].unsqueeze(1)], dim=1))
            pooled_expert_scores = torch.cat([pooled_scores2[:, :5], pooled_scores3, pooled_scores2[:, 5:], pooled_scores4, expert_scores], dim=1)
        return pooled_expert_scores
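# Minimal shape sketch for Expert_Head (hidden_size=896 is an assumed example value):
#   head = Expert_Head(896)
#   h = torch.randn(2, 16, 896)                        # (batch, seq, hidden)
#   lengths = torch.tensor([15, 15])                   # last non-padding token index
#   out = head(h, batch_size=2, sequence_lengths=lengths, is_expert=torch.tensor([1]))
#   # out.shape == (2, 12): 9 sub-dimension scores, 2 aggregates, 1 overall expert score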
class Qwen2ForCausalLM_score(Qwen2ForCausalLM):
    _tied_weights_keys = ["lm_head.weight", "lm_regression_head.weight"]

    def __init__(self, config):
        super().__init__(config)
        # Scalar regression head for the overall score, plus the multi-dimension expert head.
        self.lm_regression_head = nn.Linear(config.hidden_size, 1)
        self.expert_head = Expert_Head(config.hidden_size)
@add_start_docstrings_to_model_forward(QWEN2_INPUTS_DOCSTRING)
@replace_return_docstrings(output_type=CausalLMOutputWithPastAndScore, config_class="Qwen2Config")
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
num_logits_to_keep: int = 0,
scores_labels: Optional[torch.LongTensor] = None,
is_expert: Optional[torch.BoolTensor] = None,
**loss_kwargs,
) -> Union[Tuple, CausalLMOutputWithPastAndScore]:
r"""
Args:
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
num_logits_to_keep (`int`, *optional*):
Calculate logits for the last `num_logits_to_keep` tokens. If `0`, calculate logits for all
`input_ids` (special case). Only last token logits are needed for generation, and calculating them only for that
token can save memory, which becomes pretty significant for long sequences or large vocabulary size.
Returns:
Example:
```python
>>> from transformers import AutoTokenizer, Qwen2ForCausalLM
>>> model = Qwen2ForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS)
>>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER)
>>> prompt = "Hey, are you conscious? Can you talk to me?"
>>> inputs = tokenizer(prompt, return_tensors="pt")
>>> # Generate
>>> generate_ids = model.generate(inputs.input_ids, max_length=30)
>>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
"Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you."
```"""
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn)
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
cache_position=cache_position,
)
hidden_states = outputs[0]
# Only compute necessary logits, and do not upcast them to float if we are not computing the loss
logits = self.lm_head(hidden_states[:, -num_logits_to_keep:, :])
        # Per-token scalar scores from the regression head; pooled at the last non-padding token below.
        scores = self.lm_regression_head(hidden_states)
if input_ids is not None:
batch_size = input_ids.shape[0]
else:
batch_size = inputs_embeds.shape[0]
        if self.config.pad_token_id is None and batch_size != 1:
            raise ValueError("Cannot handle batch sizes > 1 if no padding token is defined.")
        if self.config.pad_token_id is None or input_ids is None:
            sequence_lengths = torch.tensor(-1, device=scores.device).int()
        else:
            # if no pad token found, use modulo instead of reverse indexing for ONNX compatibility
            sequence_lengths = torch.eq(input_ids, self.config.pad_token_id).int().argmax(-1) - 1
            sequence_lengths = sequence_lengths % input_ids.shape[-1]
            sequence_lengths = sequence_lengths.to(scores.device)
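        # Worked example (hypothetical values): with pad_token_id = 0 and
        # input_ids = [[5, 6, 7, 0, 0]], eq(...).argmax(-1) = 3, so sequence_lengths = 2,
        # the last real token. With no padding present, argmax(-1) = 0 and
        # (0 - 1) % 5 = 4 correctly selects the final position.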
pooled_scores = scores[torch.arange(batch_size, device=scores.device), sequence_lengths]
pooled_expert_scores = self.expert_head(hidden_states, batch_size, sequence_lengths, is_expert)
        loss = None
        if labels is not None:
            if scores_labels is not None and is_expert is not None and is_expert[0] == 0:
                # Non-expert sample: LM loss plus MSE on the overall score only.
                loss = ForCausalLMLoss(logits, labels, self.vocab_size, **loss_kwargs) + ForMseloss(pooled_scores, scores_labels[:, -1].unsqueeze(1))
            elif scores_labels is not None and is_expert is not None and is_expert[0] == 1:
                # Expert-annotated sample: LM loss plus MSE on all pooled expert scores.
                loss = ForCausalLMLoss(logits, labels, self.vocab_size, **loss_kwargs) + ForMseloss(pooled_expert_scores, scores_labels)
            else:
                # Text-only sample: standard next-token prediction loss.
                loss = ForCausalLMLoss(logits, labels, self.vocab_size, **loss_kwargs)
        if not return_dict:
            # The tuple path mirrors the upstream Qwen2 layout; pooled scores are only
            # exposed through CausalLMOutputWithPastAndScore below.
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output
return CausalLMOutputWithPastAndScore(
loss=loss,
logits=logits,
scores=pooled_scores,
experts_scores=pooled_expert_scores,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
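# End-to-end usage sketch. The repo id, auto-class mapping, and prompt are assumptions;
# this file is normally loaded through trust_remote_code rather than imported directly:
#   from transformers import AutoTokenizer, AutoModelForCausalLM
#   tok = AutoTokenizer.from_pretrained("HumanBeauty/HumanAesExpert-1B", trust_remote_code=True)
#   model = AutoModelForCausalLM.from_pretrained("HumanBeauty/HumanAesExpert-1B", trust_remote_code=True)
#   inputs = tok("Describe the aesthetic quality of this portrait.", return_tensors="pt")
#   out = model(**inputs)
#   out.scores          # (batch, 1) pooled overall score
#   out.experts_scores  # (batch, 12) pooled expert scores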