# NOTE(review): the name imported on the next line is rebound by the
# `MistralBiModel` class defined further down in this module — confirm whether
# this import is still needed.
from bidirectional_mistral import MistralBiModel
from transformers import MistralPreTrainedModel
import torch
import numpy as np
from typing import List, Optional, Tuple, Union
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.modeling_outputs import SequenceClassifierOutputWithPast
from transformers.modeling_attn_mask_utils import AttentionMaskConverter
from transformers import (
    MistralModel,
    MistralPreTrainedModel,
    MistralForCausalLM,
    MistralConfig,
)
from transformers.modeling_outputs import BaseModelOutputWithPast
from transformers.cache_utils import Cache, DynamicCache
from transformers.models.mistral.modeling_mistral import (
    MistralDecoderLayer,
    MistralRMSNorm,
    MistralAttention,
    MistralFlashAttention2,
    MistralSdpaAttention,
    MistralMLP,
)
from torch import nn
from transformers.utils import logging

# Module-level logger; `MistralBiModel.forward` calls `logger.warning_once`.
logger = logging.get_logger(__name__)


def _prepare_4d_causal_attention_mask(
    attention_mask: Optional[torch.Tensor],
    input_shape: Union[torch.Size, Tuple, List],
    inputs_embeds: torch.Tensor,
    past_key_values_length: int,
    sliding_window: Optional[int] = None,
):
    """
    Creates a 4D mask of shape `(batch_size, 1, query_length, key_value_length)` from a 2D mask of shape
    `(batch_size, key_value_length)`.

    The mask converter is built with `is_causal=False` (it is `is_causal=True` in the original implementation
    this function is adapted from).

    Args:
        attention_mask (`torch.Tensor` or `None`):
            A 2D attention mask of shape `(batch_size, key_value_length)`, or an already-expanded 4D mask.
        input_shape (`tuple(int)` or `list(int)` or `torch.Size`):
            The input shape should be a tuple that defines `(batch_size, query_length)`.
        inputs_embeds (`torch.Tensor`):
            The embedded inputs as a torch Tensor. Only its dtype and device are used here.
        past_key_values_length (`int`):
            The length of the key value cache.
        sliding_window (`int`, *optional*):
            If the model uses windowed attention, a sliding window should be passed.

    Returns:
        The 4D attention mask produced by the converter, or, when a 4D mask was supplied, that mask inverted and
        filled with the minimum value of `inputs_embeds.dtype` at masked positions.

    Raises:
        ValueError: If a 4D `attention_mask` does not have shape
            `(batch_size, 1, query_length, key_value_length)`.
    """
    attn_mask_converter = AttentionMaskConverter(
        is_causal=False, sliding_window=sliding_window
    )  # is_causal=True in original implementation

    key_value_length = input_shape[-1] + past_key_values_length

    # 4d mask is passed through the layers
    if attention_mask is not None and len(attention_mask.shape) == 2:
        attention_mask = attn_mask_converter.to_4d(
            attention_mask,
            input_shape[-1],
            key_value_length=key_value_length,
            dtype=inputs_embeds.dtype,
        )
    elif attention_mask is not None and len(attention_mask.shape) == 4:
        expected_shape = (input_shape[0], 1, input_shape[1], key_value_length)
        if tuple(attention_mask.shape) != expected_shape:
            raise ValueError(
                f"Incorrect 4D attention_mask shape: {tuple(attention_mask.shape)}; expected: {expected_shape}."
            )
        # if the 4D mask has correct shape - invert it and fill with negative infinity.
        # Cast to the embedding dtype first (as the SDPA variant below does) so that
        # integer/bool masks work and the result matches the dtype of the activations.
        inverted_mask = 1.0 - attention_mask.to(inputs_embeds.dtype)
        attention_mask = inverted_mask.masked_fill(
            inverted_mask.to(torch.bool), torch.finfo(inputs_embeds.dtype).min
        )
    else:
        attention_mask = attn_mask_converter.to_causal_4d(
            input_shape[0],
            input_shape[-1],
            key_value_length,
            dtype=inputs_embeds.dtype,
            device=inputs_embeds.device,
        )

    return attention_mask


# Adapted from _prepare_4d_causal_attention_mask
def _prepare_4d_causal_attention_mask_for_sdpa(
    attention_mask: Optional[torch.Tensor],
    input_shape: Union[torch.Size, Tuple, List],
    inputs_embeds: torch.Tensor,
    past_key_values_length: int,
    sliding_window: Optional[int] = None,
):
    """
    Prepares the correct `attn_mask` argument to be used by `torch.nn.functional.scaled_dot_product_attention`.

    In case no token is masked in the `attention_mask` argument, we simply set it to `None` for the cases
    `query_length == 1` and `key_value_length == query_length`, and rely instead on SDPA `is_causal` argument to use
    causal/non-causal masks, allowing to dispatch to the flash attention kernel (that can otherwise not be used if a
    custom `attn_mask` is passed).

    Args:
        attention_mask (`torch.Tensor` or `None`):
            A 2D attention mask, an already-expanded 4D mask, or `None`.
        input_shape (`tuple(int)` or `list(int)` or `torch.Size`):
            `(batch_size, query_length)`.
        inputs_embeds (`torch.Tensor`):
            The embedded inputs; its dtype and device are used, and it is inspected to detect FX tracing.
        past_key_values_length (`int`):
            The length of the key value cache.
        sliding_window (`int`, *optional*):
            Forwarded to the `AttentionMaskConverter`.

    Returns:
        `None` when the mask can be dropped, otherwise a 4D mask tensor.

    Raises:
        ValueError: If a 4D `attention_mask` has an unexpected shape, or if the function is being traced and no
            `attention_mask` was provided.
    """
    attn_mask_converter = AttentionMaskConverter(
        is_causal=False, sliding_window=sliding_window
    )  # is_causal=True in original implementation

    key_value_length = input_shape[-1] + past_key_values_length
    batch_size, query_length = input_shape

    # torch.jit.trace, symbolic_trace and torchdynamo with fullgraph=True are unable to capture the controlflow `is_causal=attention_mask is None and q_len > 1`
    # used as an SDPA argument. We keep compatibility with these tracing tools by always using SDPA's `attn_mask` argument in case we are tracing.
    # TODO: For dynamo, rather use a check on fullgraph=True once this is
    # possible (https://github.com/pytorch/pytorch/pull/120400).
    is_tracing = (
        torch.jit.is_tracing()
        or isinstance(inputs_embeds, torch.fx.Proxy)
        or (hasattr(torch, "_dynamo") and torch._dynamo.is_compiling())
    )

    if attention_mask is not None:
        # 4d mask is passed through
        if len(attention_mask.shape) == 4:
            expected_shape = (input_shape[0], 1, input_shape[1], key_value_length)
            if tuple(attention_mask.shape) != expected_shape:
                raise ValueError(
                    f"Incorrect 4D attention_mask shape: {tuple(attention_mask.shape)}; expected: {expected_shape}."
                )
            # if the 4D mask has correct shape - invert it and fill with negative infinity
            inverted_mask = 1.0 - attention_mask.to(inputs_embeds.dtype)
            attention_mask = inverted_mask.masked_fill(
                inverted_mask.to(torch.bool), torch.finfo(inputs_embeds.dtype).min
            )
            return attention_mask

        elif not is_tracing and torch.all(attention_mask == 1):
            if query_length == 1:
                # For query_length == 1, causal attention and bi-directional attention are the same.
                attention_mask = None
            elif key_value_length == query_length:
                attention_mask = None
            else:
                # Unfortunately, for query_length > 1 and key_value_length != query_length, we cannot generally ignore the attention mask, as SDPA causal mask generation
                # may be wrong. We will set `is_causal=False` in SDPA and rely on Transformers attention_mask instead, hence not setting it to None here.
                # Reference: https://github.com/pytorch/pytorch/issues/108108
                pass
    elif query_length > 1 and key_value_length != query_length:
        # See the comment above (https://github.com/pytorch/pytorch/issues/108108).
        # Ugly: we set it to True here to dispatch in the following controlflow to `to_causal_4d`.
        attention_mask = True
    elif is_tracing:
        raise ValueError(
            'Attention using SDPA can not be traced with torch.jit.trace when no attention_mask is provided. To solve this issue, please either load your model with the argument `attn_implementation="eager"` or pass an attention_mask input when tracing the model.'
        )

    if attention_mask is None:
        expanded_4d_mask = None
    elif attention_mask is True:
        expanded_4d_mask = attn_mask_converter.to_causal_4d(
            input_shape[0],
            input_shape[-1],
            key_value_length,
            dtype=inputs_embeds.dtype,
            device=inputs_embeds.device,
        )
    else:
        expanded_4d_mask = attn_mask_converter.to_4d(
            attention_mask,
            input_shape[-1],
            dtype=inputs_embeds.dtype,
            key_value_length=key_value_length,
        )

        # Attend to all tokens in masked rows from the causal_mask, for example the relevant first rows when
        # using left padding. This is required by F.scaled_dot_product_attention memory-efficient attention path.
        # Details: https://github.com/pytorch/pytorch/issues/110213
        # (Kept inside this branch: `expanded_4d_mask` may be `None` in the first branch above.)
        if not is_tracing and expanded_4d_mask.device.type == "cuda":
            expanded_4d_mask = AttentionMaskConverter._unmask_unattended(
                expanded_4d_mask, min_dtype=torch.finfo(inputs_embeds.dtype).min
            )

    return expanded_4d_mask


class ModifiedMistralAttention(MistralAttention):
    """`MistralAttention` with `is_causal` forced to `False` after construction."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_causal = False


class ModifiedMistralFlashAttention2(MistralFlashAttention2):
    """`MistralFlashAttention2` with `is_causal` forced to `False` after construction."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_causal = False


class ModifiedMistralSdpaAttention(MistralSdpaAttention):
    """`MistralSdpaAttention` with `is_causal` forced to `False` after construction."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_causal = False


# Maps `config._attn_implementation` to the non-causal attention class to instantiate.
MISTRAL_ATTENTION_CLASSES = {
    "eager": ModifiedMistralAttention,
    "flash_attention_2": ModifiedMistralFlashAttention2,
    "sdpa": ModifiedMistralSdpaAttention,
}


class ModifiedMistralDecoderLayer(MistralDecoderLayer):
    """Mistral decoder layer whose self-attention comes from `MISTRAL_ATTENTION_CLASSES`."""

    def __init__(self, config: MistralConfig, layer_idx: int):
        # Deliberately skips `MistralDecoderLayer.__init__` and builds the sub-modules
        # here so that the modified attention class is used.
        nn.Module.__init__(self)
        self.hidden_size = config.hidden_size

        self.self_attn = MISTRAL_ATTENTION_CLASSES[config._attn_implementation](
            config, layer_idx
        )

        self.mlp = MistralMLP(config)
        self.input_layernorm = MistralRMSNorm(
            config.hidden_size, eps=config.rms_norm_eps
        )
        self.post_attention_layernorm = MistralRMSNorm(
            config.hidden_size, eps=config.rms_norm_eps
        )


class MistralBiModel(MistralModel):
    """`MistralModel` variant built from `ModifiedMistralDecoderLayer` and non-causal attention masks."""

    def __init__(self, config: MistralConfig):
        # Deliberately skips `MistralModel.__init__` so the layer stack can be replaced.
        MistralPreTrainedModel.__init__(self, config)
        self.padding_idx = config.pad_token_id
        self.vocab_size = config.vocab_size

        self.embed_tokens = nn.Embedding(
            config.vocab_size, config.hidden_size, self.padding_idx
        )
        self.layers = nn.ModuleList(
            [
                ModifiedMistralDecoderLayer(config, layer_idx)
                for layer_idx in range(config.num_hidden_layers)
            ]
        )
        self._attn_implementation = config._attn_implementation
        self.norm = MistralRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

        self.gradient_checkpointing = False
        # Initialize weights and apply final processing
        self.post_init()

    # Copied from forward() in transformers.models.mistral.modeling_mistral.MistralModel
    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutputWithPast]:
        """
        Run the decoder stack using the bidirectional mask helpers defined in this module.

        Exactly one of `input_ids` / `inputs_embeds` must be given. Flags left as `None` fall back to the
        corresponding values on `self.config`.

        Returns:
            A `BaseModelOutputWithPast` when `return_dict` is truthy; otherwise a tuple of the non-`None` values
            among `(hidden_states, next_cache, all_hidden_states, all_self_attns)`.

        Raises:
            ValueError: If both or neither of `input_ids` / `inputs_embeds` are provided, or if flash attention
                is used with caching and the attention mask indicates right padding.
        """
        output_attentions = (
            output_attentions
            if output_attentions is not None
            else self.config.output_attentions
        )
        output_hidden_states = (
            output_hidden_states
            if output_hidden_states is not None
            else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache

        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        # retrieve input_ids and inputs_embeds
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError(
                "You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time"
            )
        elif input_ids is not None:
            batch_size, seq_length = input_ids.shape
        elif inputs_embeds is not None:
            batch_size, seq_length, _ = inputs_embeds.shape
        else:
            raise ValueError(
                "You have to specify either decoder_input_ids or decoder_inputs_embeds"
            )

        if self.gradient_checkpointing and self.training:
            if use_cache:
                logger.warning_once(
                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
                )
                use_cache = False

        past_key_values_length = 0
        # Only meaningful when `use_cache` is set; initialised so the name is always bound.
        use_legacy_cache = False

        if use_cache:
            use_legacy_cache = not isinstance(past_key_values, Cache)
            if use_legacy_cache:
                past_key_values = DynamicCache.from_legacy_cache(past_key_values)
            past_key_values_length = past_key_values.get_usable_length(seq_length)

        if position_ids is None:
            device = input_ids.device if input_ids is not None else inputs_embeds.device
            position_ids = torch.arange(
                past_key_values_length,
                seq_length + past_key_values_length,
                dtype=torch.long,
                device=device,
            )
            position_ids = position_ids.unsqueeze(0).view(-1, seq_length)
        else:
            position_ids = position_ids.view(-1, seq_length).long()

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        if (
            attention_mask is not None
            and self._attn_implementation == "flash_attention_2"
            and use_cache
        ):
            is_padding_right = attention_mask[:, -1].sum().item() != batch_size
            if is_padding_right:
                raise ValueError(
                    "You are attempting to perform batched generation with padding_side='right'"
                    " this may lead to unexpected behaviour for Flash Attention version of Mistral. Make sure to "
                    " call `tokenizer.padding_side = 'left'` before tokenizing the input. "
                )

        if self._attn_implementation == "flash_attention_2":
            # 2d mask is passed through the layers
            attention_mask = (
                attention_mask
                if (attention_mask is not None and 0 in attention_mask)
                else None
            )
        elif self._attn_implementation == "sdpa" and not output_attentions:
            # The original implementation is by-passed, see attn_mask_utils.py
            attention_mask = _prepare_4d_causal_attention_mask_for_sdpa(
                attention_mask,
                (batch_size, seq_length),
                inputs_embeds,
                past_key_values_length,
            )
        else:
            # 4d mask is passed through the layers
            attention_mask = _prepare_4d_causal_attention_mask(
                attention_mask,
                (batch_size, seq_length),
                inputs_embeds,
                past_key_values_length,
                sliding_window=self.config.sliding_window,
            )

        hidden_states = inputs_embeds

        # decoder layers
        all_hidden_states = () if output_hidden_states else None
        all_self_attns = () if output_attentions else None
        next_decoder_cache = None

        for decoder_layer in self.layers:
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            if self.gradient_checkpointing and self.training:
                layer_outputs = self._gradient_checkpointing_func(
                    decoder_layer.__call__,
                    hidden_states,
                    attention_mask,
                    position_ids,
                    past_key_values,
                    output_attentions,
                    use_cache,
                )
            else:
                layer_outputs = decoder_layer(
                    hidden_states,
                    attention_mask=attention_mask,
                    position_ids=position_ids,
                    past_key_value=past_key_values,
                    output_attentions=output_attentions,
                    use_cache=use_cache,
                )

            hidden_states = layer_outputs[0]

            if use_cache:
                # The cache sits after the attention weights when those are returned.
                next_decoder_cache = layer_outputs[2 if output_attentions else 1]

            if output_attentions:
                all_self_attns += (layer_outputs[1],)

        hidden_states = self.norm(hidden_states)

        # add hidden states from the last decoder layer
        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        next_cache = None
        if use_cache:
            next_cache = (
                next_decoder_cache.to_legacy_cache()
                if use_legacy_cache
                else next_decoder_cache
            )

        if not return_dict:
            return tuple(
                v
                for v in [hidden_states, next_cache, all_hidden_states, all_self_attns]
                if v is not None
            )
        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=next_cache,
            hidden_states=all_hidden_states,
            attentions=all_self_attns,
        )


class MistralBiForMNTP(MistralForCausalLM):
    """`MistralForCausalLM` whose backbone (`self.model`) is a `MistralBiModel`."""

    def __init__(self, config):
        # Deliberately skips `MistralForCausalLM.__init__` so the backbone can be replaced.
        MistralPreTrainedModel.__init__(self, config)
        self.model = MistralBiModel(config)
        self.vocab_size = config.vocab_size
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # Initialize weights and apply final processing
        self.post_init()


class MistralForSequenceClassification(MistralPreTrainedModel):
    """Sequence classifier: a `MistralBiModel` backbone plus a bias-free linear `score` head."""

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.model = MistralBiModel(config)
        self.score = nn.Linear(config.hidden_size, self.num_labels, bias=False)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        transformer_outputs = self.model(
            input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # Pool by taking the hidden state at sequence position 0 of the backbone output.
        pooled_output = transformer_outputs[0][:, 0]
        logits = self.score(pooled_output)

        loss = None
        if labels is not None:
            # Infer the problem type once and store it on the config for later calls.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # NOTE(review): the backbone's tuple output is None-filtered and starts with the
            # last hidden state, so `[2:]` skips whatever sits at index 1 (cache or hidden
            # states, depending on the flags). Confirm whether `[1:]` was intended; left
            # unchanged here to preserve the existing tuple layout for callers.
            output = (logits,) + transformer_outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
        )