Spaces:
Running
Running
import gradio as gr | |
import torch | |
import numpy as np | |
from transformers import pipeline, RobertaForSequenceClassification, RobertaTokenizer | |
from motif_tagging import detect_motifs | |
import re | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
from datetime import datetime | |
from transformers import pipeline as hf_pipeline # prevent name collision with gradio pipeline | |
def get_emotion_profile(text):
    """Score *text* with the module-level emotion classifier.

    Returns:
        A dict mapping each lower-cased emotion label to its score,
        rounded to three decimal places.
    """
    raw = emotion_pipeline(text)
    # The classifier may wrap the per-label entries in an outer list;
    # unwrap that layer so we always iterate over the entries themselves.
    is_nested = isinstance(raw, list) and isinstance(raw[0], list)
    entries = raw[0] if is_nested else raw

    profile = {}
    for entry in entries:
        profile[entry['label'].lower()] = round(entry['score'], 3)
    return profile
# Emotion model (no retraining needed).
# Built once at import time; get_emotion_profile() calls it for every message.
emotion_pipeline = hf_pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    top_k=None,  # request scores for all labels, not just the best one
    truncation=True
)
# --- Timeline Visualization Function ---
def generate_abuse_score_chart(dates, scores, labels):
    """Render the per-message abuse scores as a line chart.

    Args:
        dates: One entry per message. When every entry parses as a
            ``YYYY-MM-DD`` date the x-axis uses the real dates; otherwise
            the messages are plotted by position ("Message 1", ...).
        scores: Abuse scores in percent, one per message.
        labels: Top pattern label per message. Accepted for interface
            compatibility; the chart does not draw it.

    Returns:
        A PIL image decoded from the rendered PNG.
    """
    # Let strptime be the single source of truth for "is this a date".
    # A regex pre-check would accept strings such as "2024-13-40" or
    # "2024-01-01x" and then crash in strptime.
    try:
        parsed_x = [datetime.strptime(d.strip(), "%Y-%m-%d") for d in dates]
    except (AttributeError, TypeError, ValueError):
        parsed_x = list(range(1, len(dates) + 1))
        x_labels = [f"Message {i}" for i in parsed_x]
    else:
        x_labels = [d.strftime("%Y-%m-%d") for d in parsed_x]

    fig, ax = plt.subplots(figsize=(8, 3))
    try:
        ax.plot(parsed_x, scores, marker='o', linestyle='-', color='darkred', linewidth=2)
        # Annotate each point with its integer percentage just above the marker.
        for x, y in zip(parsed_x, scores):
            ax.text(x, y + 2, f"{int(y)}%", ha='center', fontsize=8, color='black')
        ax.set_xticks(parsed_x)
        ax.set_xticklabels(x_labels)
        ax.set_xlabel("")  # No axis label
        ax.set_ylabel("Abuse Score (%)")
        ax.set_ylim(0, 105)  # headroom so the label above a 100% point stays visible
        ax.grid(True)
        fig.tight_layout()
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # pyplot keeps every figure alive until it is closed explicitly;
        # without this each request would leak one figure.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)
# --- Abuse Model ---
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Sequence-classification model and matching tokenizer, loaded once at import
# time and used by analyze_single_message().
model_name = "SamanthaStorm/tether-multilabel-v3"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
# Pattern names, paired positionally with the model's output scores
# (analyze_single_message zips LABELS with the score vector), so the order
# here must stay in step with the model's label order.
LABELS = [
    "blame shifting", "contradictory statements", "control", "dismissiveness",
    "gaslighting", "guilt tripping", "insults", "obscure language",
    "projection", "recovery phase", "threat"
]
# Per-label score a pattern must exceed to count as detected.
# analyze_single_message raises each value by 0.05 when the message's
# sentiment is classed as "supportive".
THRESHOLDS = {
    "blame shifting": 0.28, "contradictory statements": 0.27, "control": 0.08, "dismissiveness": 0.32,
    "gaslighting": 0.27, "guilt tripping": 0.31, "insults": 0.10, "obscure language": 0.55,
    "projection": 0.09, "recovery phase": 0.33, "threat": 0.15
}
# Relative weight of each pattern in the weighted average computed by
# compute_abuse_score. Labels missing here ("obscure language") fall back to
# 1.0 via PATTERN_WEIGHTS.get(label, 1.0).
PATTERN_WEIGHTS = {
    "gaslighting": 1.5,
    "control": 1.2,
    "dismissiveness": 0.7,
    "blame shifting": 0.8,
    "guilt tripping": 1.2,
    "insults": 1.4,
    "projection": 1.2,
    "recovery phase": 1.1,
    "contradictory statements": 0.75,
    "threat": 1.6  # New: raised weight for threat
}
# Report text for each risk stage number returned by get_risk_stage().
# NOTE(review): the leading glyphs and the dash in stage 3 look mis-encoded
# in this copy of the file (probably emoji / an em dash) — confirm against
# the original and re-save as UTF-8.
RISK_STAGE_LABELS = {
    1: "π Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.",
    2: "π₯ Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.",
    3: "π§οΈ Risk Stage: Reconciliation\nThis message reflects a reset attemptβapologies or emotional repair without accountability.",
    4: "πΈ Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it."
}
# Safety checklist: (question text, weight). The weights of the ticked
# questions are summed into the escalation score; the weights total 29, and
# analyze_composite treats >= 16 as "High" and >= 8 as "Moderate".
# The order must match the order of the checkbox inputs built from this list.
ESCALATION_QUESTIONS = [
    ("Partner has access to firearms or weapons", 4),
    ("Partner threatened to kill you", 3),
    ("Partner threatened you with a weapon", 3),
    ("Partner has ever choked you, even if you considered it consensual at the time", 4),
    ("Partner injured or threatened your pet(s)", 3),
    ("Partner has broken your things, punched or kicked walls, or thrown things ", 2),
    ("Partner forced or coerced you into unwanted sexual acts", 3),
    ("Partner threatened to take away your children", 2),
    ("Violence has increased in frequency or severity", 3),
    ("Partner monitors your calls/GPS/social media", 2)
]
# Pattern labels that count toward the DARVO score; calculate_darvo_score
# tests lower-cased detected labels for membership in this set.
DARVO_PATTERNS = {
    "blame shifting", "projection", "dismissiveness", "guilt tripping", "contradictory statements"
}
# Reference phrases used by calculate_darvo_score for loose substring
# matching against detected motifs.
# The apostrophes had been corrupted by an encoding round-trip, which made
# most phrases unmatchable; they are restored to U+2019 here.
# NOTE: a few phrases appear twice. They are kept because the list length is
# the denominator of the motif score, so removing them would shift scores.
DARVO_MOTIFS = [
    "I never said that.", "You’re imagining things.", "That never happened.",
    "You’re making a big deal out of nothing.", "It was just a joke.", "You’re too sensitive.",
    "I don’t know what you’re talking about.", "You’re overreacting.", "I didn’t mean it that way.",
    "You’re twisting my words.", "You’re remembering it wrong.", "You’re always looking for something to complain about.",
    "You’re just trying to start a fight.", "I was only trying to help.", "You’re making things up.",
    "You’re blowing this out of proportion.", "You’re being paranoid.", "You’re too emotional.",
    "You’re always so dramatic.", "You’re just trying to make me look bad.",
    "You’re crazy.", "You’re the one with the problem.", "You’re always so negative.",
    "You’re just trying to control me.", "You’re the abusive one.", "You’re trying to ruin my life.",
    "You’re just jealous.", "You’re the one who needs help.", "You’re always playing the victim.",
    "You’re the one causing all the problems.", "You’re just trying to make me feel guilty.",
    "You’re the one who can’t let go of the past.", "You’re the one who’s always angry.",
    "You’re the one who’s always complaining.", "You’re the one who’s always starting arguments.",
    "You’re the one who’s always making things worse.", "You’re the one who’s always making me feel bad.",
    "You’re the one who’s always making me look like the bad guy.",
    "You’re the one who’s always making me feel like a failure.",
    "You’re the one who’s always making me feel like I’m not good enough.",
    "I can’t believe you’re doing this to me.", "You’re hurting me.",
    "You’re making me feel like a terrible person.", "You’re always blaming me for everything.",
    "You’re the one who’s abusive.", "You’re the one who’s controlling.", "You’re the one who’s manipulative.",
    "You’re the one who’s toxic.", "You’re the one who’s gaslighting me.",
    "You’re the one who’s always putting me down.", "You’re the one who’s always making me feel bad.",
    "You’re the one who’s always making me feel like I’m not good enough.",
    "You’re the one who’s always making me feel like I’m the problem.",
    "You’re the one who’s always making me feel like I’m the bad guy.",
    "You’re the one who’s always making me feel like I’m the villain.",
    "You’re the one who’s always making me feel like I’m the one who needs to change.",
    "You’re the one who’s always making me feel like I’m the one who’s wrong.",
    "You’re the one who’s always making me feel like I’m the one who’s crazy.",
    "You’re the one who’s always making me feel like I’m the one who’s abusive.",
    "You’re the one who’s always making me feel like I’m the one who’s toxic.",
]
def get_emotional_tone_tag(emotions, sentiment, patterns, abuse_score):
    """Return the first tone tag whose rule matches, or None.

    Args:
        emotions: Mapping of emotion name to score; missing names count as 0.
        sentiment: "undermining" or "supportive".
        patterns: Detected abuse-pattern labels.
        abuse_score: Abuse score in percent (only consulted by rule 1).

    Rules are tried in a fixed order and the first hit wins, so earlier rules
    shadow later ones. In particular rule 7 ("toxic resignation") can never
    fire: whenever it holds, rule 3 ("cold invalidation") holds as well.
    """
    def level(name):
        return emotions.get(name, 0)

    def has_any(*names):
        return any(name in patterns for name in names)

    sadness, joy, neutral = level("sadness"), level("joy"), level("neutral")
    disgust, anger, fear = level("disgust"), level("anger"), level("fear")
    undermining = sentiment == "undermining"

    # 1. Performative Regret
    if (sadness > 0.4
            and has_any("blame shifting", "guilt tripping", "recovery phase")
            and (undermining or abuse_score > 40)):
        return "performative regret"

    # 2. Coercive Warmth
    if (joy > 0.3 or sadness > 0.4) and has_any("control", "gaslighting") and undermining:
        return "coercive warmth"

    # 3. Cold Invalidation
    if ((neutral + disgust) > 0.5
            and has_any("dismissiveness", "projection", "obscure language")
            and undermining):
        return "cold invalidation"

    # 4. Genuine Vulnerability: no pattern other than "recovery phase" may be
    # present (an empty pattern list also qualifies).
    if ((sadness + fear) > 0.5
            and sentiment == "supportive"
            and all(p in ("recovery phase",) for p in patterns)):
        return "genuine vulnerability"

    # 5. Emotional Threat
    if ((anger + disgust) > 0.5
            and has_any("control", "threat", "insults", "dismissiveness")
            and undermining):
        return "emotional threat"

    # 6. Weaponized Sadness
    if sadness > 0.6 and has_any("guilt tripping", "projection") and undermining:
        return "weaponized sadness"

    # 7. Toxic Resignation
    if neutral > 0.5 and has_any("dismissiveness", "obscure language") and undermining:
        return "toxic resignation"

    return None
# Each pattern is an opening phrase followed, within 15 characters, by a
# phrase that reverses it. Apostrophes accept both the typewriter (') and the
# typographic (’) form: the previous literals contained mis-encoded
# characters and therefore never matched text such as "I'm sorry, but you".
_CONTRADICTION_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\b(i love you).{0,15}(i hate you|you ruin everything)",
        r"\b(i['’]m sorry).{0,15}(but you|if you hadn['’]t)",
        r"\b(i['’]m trying).{0,15}(you never|why do you)",
        r"\b(do what you want).{0,15}(you['’]ll regret it|i always give everything)",
        r"\b(i don['’]t care).{0,15}(you never think of me)",
        r"\b(i guess i['’]m just).{0,15}(the bad guy|worthless|never enough)",
    )
)


def detect_contradiction(message):
    """Return True if *message* contains a known self-contradicting phrase pair.

    Matching is case-insensitive. The gap between the two phrases may not
    span a newline, because ``.`` does not match one.
    """
    return any(pattern.search(message) for pattern in _CONTRADICTION_PATTERNS)
def calculate_darvo_score(patterns, sentiment_before, sentiment_after, motifs_found, contradiction_flag=False):
    """Combine four signals into a DARVO score in [0, 1], rounded to 3 places.

    Args:
        patterns: Detected pattern labels; those in DARVO_PATTERNS are counted.
        sentiment_before: Baseline sentiment score.
        sentiment_after: Current sentiment score; only a rise over the
            baseline contributes.
        motifs_found: Motif phrases detected in the message.
        contradiction_flag: Whether a contradiction was detected.

    The pattern term uses the raw hit count (not a ratio), so several DARVO
    patterns can saturate the score on their own; the total is clipped at 1.0.
    """
    # Detected patterns that belong to the DARVO family.
    darvo_pattern_count = len([p for p in patterns if p.lower() in DARVO_PATTERNS])

    # A drop in sentiment contributes nothing.
    shift = max(0.0, sentiment_after - sentiment_before)

    # A motif counts when it contains, or is contained in, any reference phrase.
    reference_phrases = [phrase.lower() for phrase in DARVO_MOTIFS]
    matched_motifs = 0
    for motif in motifs_found:
        needle = motif.lower()
        if any(phrase in needle or needle in phrase for phrase in reference_phrases):
            matched_motifs += 1
    motif_ratio = matched_motifs / max(len(DARVO_MOTIFS), 1)

    # Contradiction is a binary signal.
    contradiction = 1.0 if contradiction_flag else 0.0

    combined = (
        0.3 * darvo_pattern_count +
        0.3 * shift +
        0.25 * motif_ratio +
        0.15 * contradiction
    )
    return round(min(combined, 1.0), 3)
# Keywords are anchored at a word start so they no longer fire inside
# unrelated words ("begun", "harmed", "alarmed", "downloaded"). Suffixes stay
# open so inflections such as "guns", "shooting" or "exploded" still match.
# "stab" additionally lists its inflections explicitly, otherwise the open
# suffix would match "stable" / "stability".
_WEAPON_LANGUAGE_RE = re.compile(
    r"\b(?:"
    r"knife|knives|stab(?:s|bed|bing)?\b|cut you|cutting|"
    r"gun|shoot|rifle|firearm|pistol|"
    r"bomb|blow up|grenade|explode|"
    r"weapon|armed|loaded|kill you|take you out"
    r")",
    re.IGNORECASE,
)


def detect_weapon_language(text):
    """Return True if *text* mentions a weapon or a weapon-related threat.

    Matching is case-insensitive and keyword based; it is a coarse signal,
    not a classifier.
    """
    return bool(_WEAPON_LANGUAGE_RE.search(text))
def get_risk_stage(patterns, sentiment):
    """Map detected patterns and sentiment to a risk stage number (1-4).

    Checks run in priority order:
        2 - "threat" or "insults" present
        3 - "recovery phase" present
        1 - "control" or "guilt tripping" present
        4 - supportive sentiment with "projection" or "dismissiveness"
        1 - anything else
    """
    def present(*names):
        return any(name in patterns for name in names)

    if present("threat", "insults"):
        return 2
    if present("recovery phase"):
        return 3
    if present("control", "guilt tripping"):
        return 1
    minimizing = present("projection", "dismissiveness")
    return 4 if sentiment == "supportive" and minimizing else 1
def generate_risk_snippet(abuse_score, top_label, escalation_score, stage):
    """Build the risk-level paragraph of the report.

    Args:
        abuse_score: Composite abuse score in percent.
        top_label: "<pattern> <sep> <score>" string, a bare label, or None.
        escalation_score: Checklist score (0 when the checklist is unknown).
        stage: Risk stage number; stage 2 lowers the bar for "moderate".

    Returns:
        Markdown-flavoured text with the risk level, a short explanation of
        why the pattern may have been flagged, and the detected pattern.
    """
    # Risk level: the first matching tier wins.
    if abuse_score >= 85 or escalation_score >= 16:
        risk_level = "high"
    elif abuse_score >= 60 or escalation_score >= 8 or (stage == 2 and abuse_score >= 40):
        # The stage-2 clause is the extra rule for the escalation stage.
        risk_level = "moderate"
    else:
        risk_level = "low"

    # Split "<pattern> <sep> <score>" when the separator is present.
    pattern_label, pattern_score = "Unknown", ""
    if isinstance(top_label, str) and " β " in top_label:
        pattern_label, pattern_score = top_label.split(" β ")
    elif top_label is not None:
        pattern_label = str(top_label)

    WHY_FLAGGED = {
        "control": "This message may reflect efforts to restrict someoneβs autonomy, even if it's framed as concern or care.",
        "gaslighting": "This message could be manipulating someone into questioning their perception or feelings.",
        "dismissiveness": "This message may include belittling, invalidating, or ignoring the other personβs experience.",
        "insults": "Direct insults often appear in escalating abusive dynamics and can erode emotional safety.",
        "threat": "This message includes threatening language, which is a strong predictor of harm.",
        "blame shifting": "This message may redirect responsibility to avoid accountability, especially during conflict.",
        "guilt tripping": "This message may induce guilt in order to control or manipulate behavior.",
        "recovery phase": "This message may be part of a tension-reset cycle, appearing kind but avoiding change.",
        "projection": "This message may involve attributing the abuserβs own behaviors to the victim.",
        "default": "This message contains language patterns that may affect safety, clarity, or emotional autonomy."
    }
    explanation = WHY_FLAGGED.get(pattern_label.lower(), WHY_FLAGGED["default"])

    level_sentence = {
        "high": "The language may reflect patterns of emotional control, even when expressed in soft or caring terms.\n",
        "moderate": "There are signs of emotional pressure or indirect control that may escalate if repeated.\n",
        "low": "The message does not strongly indicate abuse, but it's important to monitor for patterns.\n",
    }[risk_level]

    parts = [
        f"\n\nπ Risk Level: {risk_level.capitalize()}\n",
        f"This message shows strong indicators of **{pattern_label}**. ",
        level_sentence,
        f"\nπ‘ *Why this might be flagged:*\n{explanation}\n",
        f"\nDetected Pattern: **{pattern_label} ({pattern_score})**\n",
        "π§ You can review the pattern in context. This tool highlights possible dynamicsβnot judgments.",
    ]
    return "".join(parts)
def compute_abuse_score(matched_scores, sentiment):
    """Turn the above-threshold pattern scores into a 0-100 abuse score.

    Args:
        matched_scores: Iterable of ``(label, score, weight)`` tuples for the
            patterns that passed their threshold.
        sentiment: "undermining" or "supportive".

    Returns:
        0 when nothing matched, otherwise a score capped at 100.
    """
    if not matched_scores:
        return 0

    # Weighted average of the matched patterns, as a percentage.
    weighted_total = sum(score * weight for _, score, weight in matched_scores)
    weight_sum = sum(weight for _, _, weight in matched_scores)
    base_score = (weighted_total / weight_sum) * 100

    # Boost for pattern count: 1.25x for 2, 1.5x for 3+.
    # The multiplier is capped at 1.5 as documented; without the cap it kept
    # growing (1.75x, 2x, ...) with every additional pattern.
    pattern_count = len(matched_scores)
    scale = min(1.0 + 0.25 * max(0, pattern_count - 1), 1.5)
    scaled_score = base_score * scale

    # Minimum score once a given pattern is present at all.
    FLOORS = {
        "threat": 70,
        "control": 40,
        "gaslighting": 30,
        "insults": 25
    }
    floor = max(FLOORS.get(label, 0) for label, _, _ in matched_scores)
    adjusted_score = max(scaled_score, floor)

    # Sentiment tweak: nudge mid/low scores up when the tone is undermining.
    if sentiment == "undermining" and adjusted_score < 50:
        adjusted_score += 10

    return min(adjusted_score, 100)
def analyze_single_message(text, thresholds):
    """Run the full analysis for one message.

    Args:
        text: The message to analyze.
        thresholds: Mapping of pattern label to detection threshold.

    Returns:
        A 6-tuple ``(abuse_score, threshold_labels, top_patterns,
        {"label": sentiment}, stage, darvo_score)`` where ``threshold_labels``
        are the labels that passed their adjusted threshold (in LABELS order)
        and ``top_patterns`` are the two highest-scoring ``(label, score)``
        pairs regardless of threshold.

    Side effects:
        Prints debug information, including the message text, to stdout.
    """
    motif_hits, matched_phrases = detect_motifs(text)
    # Get emotion profile
    emotion_profile = get_emotion_profile(text)
    # "Sentiment score" here is simply anger + disgust from the emotion model.
    sentiment_score = emotion_profile.get("anger", 0) + emotion_profile.get("disgust", 0)
    # Get model scores first so they can be used in the neutral override
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)
    # One sigmoid score per label; zipped with LABELS below.
    scores = torch.sigmoid(outputs.logits.squeeze(0)).numpy()
    # Sentiment override if neutral masks abuse: a very neutral tone is still
    # "undermining" when control, threat or blame shifting passes its
    # (unadjusted) threshold.
    if emotion_profile.get("neutral", 0) > 0.85 and any(
        scores[label_idx] > thresholds[LABELS[label_idx]]
        for label_idx in [LABELS.index(l) for l in ["control", "threat", "blame shifting"]]
    ):
        sentiment = "undermining"
    else:
        sentiment = "undermining" if sentiment_score > 0.25 else "supportive"
    weapon_flag = detect_weapon_language(text)
    # Supportive messages need 0.05 more on every label before a pattern counts.
    adjusted_thresholds = {
        k: v + 0.05 if sentiment == "supportive" else v
        for k, v in thresholds.items()
    }
    contradiction_flag = detect_contradiction(text)
    threshold_labels = [
        label for label, score in zip(LABELS, scores)
        if score > adjusted_thresholds[label]
    ]
    motifs = [phrase for _, phrase in matched_phrases]
    # There is no earlier message to compare against, so the baseline is 0.0.
    darvo_score = calculate_darvo_score(
        threshold_labels,
        sentiment_before=0.0,
        sentiment_after=sentiment_score,
        motifs_found=motifs,
        contradiction_flag=contradiction_flag
    )
    # Two highest raw scores, whether or not they passed a threshold.
    top_patterns = sorted(
        [(label, score) for label, score in zip(LABELS, scores)],
        key=lambda x: x[1],
        reverse=True
    )[:2]
    # Same filter as threshold_labels, but carrying score and weight.
    matched_scores = [
        (label, score, PATTERN_WEIGHTS.get(label, 1.0))
        for label, score in zip(LABELS, scores)
        if score > adjusted_thresholds[label]
    ]
    abuse_score_raw = compute_abuse_score(matched_scores, sentiment)
    abuse_score = abuse_score_raw
    stage = get_risk_stage(threshold_labels, sentiment) if threshold_labels else 1
    # Weapon language forces at least the escalation stage and adds 25 points.
    if weapon_flag and stage < 2:
        stage = 2
    if weapon_flag:
        abuse_score_raw = min(abuse_score_raw + 25, 100)
    # Only messages with a detected threat or control pattern may reach 100;
    # everything else is capped at 95.
    # NOTE(review): indentation was lost in this copy of the file; this line
    # is assumed to run unconditionally rather than inside `if weapon_flag:`
    # — confirm against the original.
    abuse_score = min(abuse_score_raw, 100 if "threat" in threshold_labels or "control" in threshold_labels else 95)
    # Get tone tag
    tone_tag = get_emotional_tone_tag(emotion_profile, sentiment, threshold_labels, abuse_score)
    print(f"Emotional Tone Tag: {tone_tag}")
    # Debug logs
    print("Emotion Profile:")
    for emotion, score in emotion_profile.items():
        print(f" {emotion.capitalize():10}: {score}")
    print("\n--- Debug Info ---")
    print(f"Text: {text}")
    print(f"Sentiment (via emotion): {sentiment} (score: {round(sentiment_score, 3)})")
    print("Abuse Pattern Scores:")
    for label, score in zip(LABELS, scores):
        passed = "β " if score > adjusted_thresholds[label] else "β"
        print(f" {label:25} β {score:.3f} {passed}")
    print(f"Matched for score: {[(l, round(s, 3)) for l, s, _ in matched_scores]}")
    print(f"Abuse Score Raw: {round(abuse_score_raw, 1)}")
    print(f"Motifs: {motifs}")
    print(f"Contradiction: {contradiction_flag}")
    print("------------------\n")
    return abuse_score, threshold_labels, top_patterns, {"label": sentiment}, stage, darvo_score
def analyze_composite(msg1, date1, msg2, date2, msg3, date3, *answers_and_none):
    """Analyze up to three messages plus the safety checklist and build the report.

    Args:
        msg1, msg2, msg3: Message texts; blank or missing ones are skipped.
        date1, date2, date3: Optional date strings paired with the messages.
        *answers_and_none: One boolean per entry of ESCALATION_QUESTIONS,
            followed by the value of the "None of the above" checkbox.

    Returns:
        ``(report_text, timeline_image)``. When no message was entered the
        report is a short prompt and the image is ``None``, so the caller
        always receives two values.
    """
    messages = [msg1, msg2, msg3]
    dates = [date1, date2, date3]
    # `m and ...` guards against None as well as empty/whitespace-only text.
    active = [(m, d) for m, d in zip(messages, dates) if m and m.strip()]
    if not active:
        # The interface declares two outputs, so return a value for each.
        return "Please enter at least one message.", None

    answers = answers_and_none[:-1]
    none_selected_checked = answers_and_none[-1]
    # The checklist counts as skipped only when nothing at all was ticked.
    # Ticking "None of the above" is a completed checklist that scores 0.
    checklist_skipped = not any(answers) and not none_selected_checked
    if checklist_skipped:
        escalation_score = None
        risk_level = "unknown"
    else:
        escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers) if a)
        risk_level = (
            "High" if escalation_score >= 16 else
            "Moderate" if escalation_score >= 8 else
            "Low"
        )

    abuse_scores, top_labels, top_scores = [], [], []
    stages, darvo_scores, pattern_labels, dates_used = [], [], [], []
    for message, date in active:
        abuse_score, threshold_labels, top_patterns, _sentiment, stage, darvo_score = (
            analyze_single_message(message, THRESHOLDS.copy())
        )
        best_label, best_score = top_patterns[0]
        abuse_scores.append(abuse_score)
        # NOTE(review): the label is the first above-threshold pattern in
        # LABELS order while the percentage is the highest raw score, so the
        # two may describe different patterns.
        top_labels.append(threshold_labels[0] if threshold_labels else best_label)
        top_scores.append(best_score)
        stages.append(stage)
        darvo_scores.append(darvo_score)
        pattern_labels.append(best_label)  # top label for each message
        dates_used.append((date or "").strip() or "Undated")

    composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))
    # The headline pattern is taken from the first message.
    top_label = f"{top_labels[0]} β {int(round(top_scores[0] * 100))}%"
    most_common_stage = max(set(stages), key=stages.count)
    stage_text = RISK_STAGE_LABELS[most_common_stage]

    avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
    darvo_blurb = ""
    if avg_darvo > 0.25:
        level = "moderate" if avg_darvo < 0.65 else "high"
        darvo_blurb = f"\n\nπ **DARVO Score: {avg_darvo}** β This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."

    out = f"Abuse Intensity: {composite_abuse}%\n"
    out += "π This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"

    # Built now, appended at the very end of the report.
    if escalation_score is None:
        escalation_text = "π Escalation Potential: Unknown (Checklist not completed)\n"
        escalation_text += "β οΈ *This section was not completed. Escalation potential is unknown.*\n"
    else:
        escalation_text = f"𧨠**Escalation Potential: {risk_level} ({escalation_score}/{sum(w for _, w in ESCALATION_QUESTIONS)})**\n"
        escalation_text += "This score comes directly from the safety checklist and functions as a standalone escalation risk score.\n"
        escalation_text += "It indicates how many serious risk factors are present based on your answers to the safety checklist.\n"

    out += generate_risk_snippet(
        composite_abuse,
        top_label,
        escalation_score if escalation_score is not None else 0,
        most_common_stage,
    )
    out += f"\n\n{stage_text}"
    out += darvo_blurb
    print(f"DEBUG: avg_darvo = {avg_darvo}")

    timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels)
    out += "\n\n" + escalation_text
    return out, timeline_image
# --- Gradio UI ---
# Three (message, optional date) textbox pairs.
message_date_pairs = [
    (
        gr.Textbox(label=f"Message {i+1}"),
        gr.Textbox(label=f"Date {i+1} (optional)", placeholder="YYYY-MM-DD")
    )
    for i in range(3)
]
# Flattened to [msg1, date1, msg2, date2, msg3, date3], matching the leading
# positional parameters of analyze_composite.
textbox_inputs = [item for pair in message_date_pairs for item in pair]
# One checkbox per checklist question, in ESCALATION_QUESTIONS order, followed
# by the "None of the above" box; analyze_composite receives these through
# *answers_and_none and expects the "none" box last.
quiz_boxes = [gr.Checkbox(label=q) for q, _ in ESCALATION_QUESTIONS]
none_box = gr.Checkbox(label="None of the above")
iface = gr.Interface(
    fn=analyze_composite,
    inputs=textbox_inputs + quiz_boxes + [none_box],
    # Two outputs: the text report and the chart image.
    # NOTE(review): the image label says "Risk Stage Timeline" but the chart
    # plots abuse scores — confirm the intended wording.
    outputs=[
        gr.Textbox(label="Results"),
        gr.Image(label="Risk Stage Timeline", type="pil")
    ],
    title="Abuse Pattern Detector + Escalation Quiz",
    allow_flagging="manual"
)
if __name__ == "__main__":
    iface.launch()