# --- Imports ---
import os
import gradio as gr
import pickle  # Keep for loading the custom image model pkl
import torch
import numpy as np
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    pipeline,
    AutoFeatureExtractor  # Needed for custom ViT model
)
from huggingface_hub import login, HfFolder  # Added HfFolder for token check
from PIL import Image
import requests
from io import BytesIO
import torchvision.transforms as transforms
import traceback

# --- Hugging Face Token Handling (Using Secrets) ---
# Load token from environment variable if available (recommended for Spaces)
HF_TOKEN = os.environ.get("HF_TOKEN")

# Attempt login using the token from secrets
logged_in = False
if HF_TOKEN:
    try:
        login(token=HF_TOKEN)
        logged_in = True
        print("Successfully logged in to Hugging Face Hub using token from environment variable.")
    except Exception as e:
        print(f"Hugging Face Hub login using provided token failed: {e}")
        print("Proceeding without explicit login. Private models may fail.")
else:
    # Check if already logged in via CLI/notebook login
    if HfFolder.get_token():
        print("Already logged in to Hugging Face Hub (found existing token).")
        logged_in = True
        HF_TOKEN = HfFolder.get_token()  # Use existing token if needed later
    else:
        print("HF_TOKEN secret not set. Proceeding without login. Public models should still work.")
        print("If you need to use private models, add HF_TOKEN as a secret to this Space.")

# --- CombinedAnalyzer Class Definition ---
# (Keep this class exactly as you provided it)
class CombinedAnalyzer:
    """
    A class to encapsulate sentiment analysis and AI text detection
    pipelines for reviews.
    """
    def __init__(self,
                 sentiment_model_name="distilbert-base-uncased-finetuned-sst-2-english",
                 detector_model_name="Hello-SimpleAI/chatgpt-detector-roberta",
                 auth_token=None):
        print(f"Initializing CombinedAnalyzer with Sentiment: '{sentiment_model_name}' and Detector: '{detector_model_name}'...")
        # Use pipeline's device handling convention (-1 for CPU, >=0 for GPU)
        self.device = 0 if torch.cuda.is_available() else -1
        self.sentiment_model_name = sentiment_model_name
        self.detector_model_name = detector_model_name
        self.sentiment_pipeline = None
        self.detector_pipeline = None

        # --- Load pipelines INSIDE __init__ ---
        try:
            print(f" -> Loading sentiment pipeline: {self.sentiment_model_name}")
            self.sentiment_pipeline = pipeline(
                "sentiment-analysis",
                model=self.sentiment_model_name,
                device=self.device,
                token=auth_token if auth_token else None
            )
            print(" -> Sentiment pipeline loaded.")
        except Exception as e:
            print(f"ERROR loading sentiment pipeline '{self.sentiment_model_name}': {e}")

        try:
            print(f" -> Loading AI text detector pipeline: {self.detector_model_name}")
            self.detector_pipeline = pipeline(
                "text-classification",
                model=self.detector_model_name,
                device=self.device,
                token=auth_token if auth_token else None
            )
            print(" -> AI text detector pipeline loaded.")
        except Exception as e:
            print(f"ERROR loading AI text detector pipeline '{self.detector_model_name}': {e}")

        print("CombinedAnalyzer initialization complete.")

    def analyze(self, text):
        """Analyzes text for sentiment and AI generation likelihood."""
        if not isinstance(text, str) or not text.strip():
            return {
                "sentiment_label": "N/A", "sentiment_score": 0,
                "authenticity_label": "N/A", "authenticity_score": 0,
                "error": "Input text cannot be empty."
            }
        results = {}

        # 1. Sentiment Analysis
        if self.sentiment_pipeline and callable(self.sentiment_pipeline):
            try:
                sentiment_result = self.sentiment_pipeline(text)[0]
                results['sentiment_label'] = sentiment_result['label']
                results['sentiment_score'] = round(sentiment_result['score'] * 100, 2)
            except Exception as e:
                print(f"Sentiment Analysis Error: {e}")
                results['sentiment_label'] = "Error"
                results['sentiment_score'] = 0
                results['error'] = results.get('error', '') + f" Sentiment Error: {e};"
        else:
            results['sentiment_label'] = "Model N/A"
            results['sentiment_score'] = 0

        # 2. AI Text Detection (Authenticity)
        if self.detector_pipeline and callable(self.detector_pipeline):
            try:
                detector_result = self.detector_pipeline(text)[0]
                auth_label_raw = detector_result['label']
                auth_score = round(detector_result['score'] * 100, 2)
                if auth_label_raw.lower() in ['chatgpt', 'ai', 'generated', 'label_1', 'fake']:
                    auth_label_display = "Likely AI-Generated"
                elif auth_label_raw.lower() in ['human', 'real', 'label_0']:
                    auth_label_display = "Likely Human-Written"
                else:
                    auth_label_display = f"Label: {auth_label_raw}"  # Fallback
                results['authenticity_label'] = auth_label_display
                # Keep score as model's confidence in the label
                results['authenticity_score'] = auth_score
            except Exception as e:
                print(f"AI Text Detection Error: {e}")
                results['authenticity_label'] = "Error"
                results['authenticity_score'] = 0
                results['error'] = results.get('error', '') + f" Authenticity Error: {e};"
        else:
            results['authenticity_label'] = "Model N/A"
            results['authenticity_score'] = 0

        return results
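
# A minimal usage sketch for CombinedAnalyzer, assuming both default public
# models load; the review text and the scores shown are illustrative only:
#
#     analyzer = CombinedAnalyzer()
#     result = analyzer.analyze("Great product, arrived quickly and works as described.")
#     # e.g. {'sentiment_label': 'POSITIVE', 'sentiment_score': 99.9,
#     #       'authenticity_label': 'Likely Human-Written', 'authenticity_score': 97.3}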

# --- Define the Main Multi-Detection System ---
class MultiDetectionSystem:
    """
    Encapsulates models for fake news, AI image, and review analysis.
    Handles loading, preprocessing, and inference for HF Spaces.
    """
    def __init__(self, auth_token=None):
        print("\nLoading MultiDetectionSystem models. This may take a few minutes...")
        self.auth_token = auth_token
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        # Pipeline device uses -1 for CPU, >=0 for GPU index
        self.device_pipeline_arg = 0 if torch.cuda.is_available() else -1
        print(f"Using device (torch models): {self.device}")
        print(f"Using device (pipelines): {self.device_pipeline_arg}")

        # --- Fake News Detection ---
        self.fake_news_model_name = "MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli"
        self.fake_news_tokenizer = None
        self.fake_news_model = None
        try:
            print(f" -> Loading fake news tokenizer: {self.fake_news_model_name}")
            self.fake_news_tokenizer = AutoTokenizer.from_pretrained(
                self.fake_news_model_name,
                token=self.auth_token  # Pass token if available
            )
            print(f" -> Loading fake news model: {self.fake_news_model_name}")
            self.fake_news_model = AutoModelForSequenceClassification.from_pretrained(
                self.fake_news_model_name,
                token=self.auth_token  # Pass token if available
            ).to(self.device)
            self.fake_news_model.eval()
            print(" -> Fake news model loaded.")
        except Exception as e:
            print(f"ERROR loading fake news model '{self.fake_news_model_name}': {e}")
            self.fake_news_tokenizer = None
            self.fake_news_model = None
        # --- End of Fake News Section ---

        # --- AI Image Detection (Custom PKL Model) ---
        # IMPORTANT: Place 'finetune_vit_model.pkl' in the root of your HF Space repo,
        # or change this path if you place it in a subdirectory (e.g., "models/finetune_vit_model.pkl").
        self.image_model_path = "finetune_vit_model.pkl"  # <<<--- ADJUSTED PATH
        # IMPORTANT: Ensure this matches the BASE model you fine-tuned
        self.image_feature_extractor_name = "google/vit-base-patch16-224-in21k"  # <<<--- VERIFY THIS NAME
        self.image_classifier = None
        self.image_feature_extractor = None
        try:
            # 1. Load the feature extractor
            print(f" -> Loading image feature extractor: {self.image_feature_extractor_name}")
            self.image_feature_extractor = AutoFeatureExtractor.from_pretrained(
                self.image_feature_extractor_name,
                token=self.auth_token  # Pass token if available
            )
            print(" -> Image feature extractor loaded.")

            # 2. Load CUSTOM model from PKL (relative path)
            print(f" -> Loading CUSTOM AI image model from PKL: {self.image_model_path}")
            if not os.path.exists(self.image_model_path):
                # Provide a more specific error for Spaces deployment
                raise FileNotFoundError(
                    f"PKL file not found at '{self.image_model_path}'. "
                    f"Make sure '{os.path.basename(self.image_model_path)}' is uploaded to the root of this Space repository "
                    f"and Git LFS is tracking it if it's large."
                )
            with open(self.image_model_path, 'rb') as f:
                # Load assuming the necessary classes are defined or imported
                self.image_classifier = pickle.load(f)
            print(" -> Custom AI image model loaded successfully from PKL.")
            if not isinstance(self.image_classifier, torch.nn.Module):
                print(f"Warning: Loaded object from PKL is type {type(self.image_classifier)}, not torch.nn.Module.")

            # 3. Prepare the model
            self.image_classifier = self.image_classifier.to(self.device)
            self.image_classifier.eval()
            print(f" -> Custom AI image model moved to {self.device} and set to eval mode.")
        except FileNotFoundError as e:
            print(f"FATAL ERROR: {e}. AI Image detection will not work.")
            self.image_classifier = None
            self.image_feature_extractor = None
        except (pickle.UnpicklingError, ImportError) as e:
            print(f"FATAL ERROR unpickling model from '{self.image_model_path}': {e}")
            print("Ensure the environment has all necessary libraries and class definitions required by the PKL file.")
            traceback.print_exc()
            self.image_classifier = None
            self.image_feature_extractor = None
        except Exception as e:
            print(f"ERROR loading image feature extractor or custom model: {e}")
            traceback.print_exc()
            self.image_classifier = None
            self.image_feature_extractor = None
        # --- End of AI Image Detection Section ---

        # --- Review Analysis (using CombinedAnalyzer) ---
        # Pass the pipeline device argument and token
        self.review_analyzer = CombinedAnalyzer(auth_token=self.auth_token)
        # Override device for CombinedAnalyzer pipelines if needed (optional)
        # self.review_analyzer.device = self.device_pipeline_arg
        # self.review_analyzer.sentiment_pipeline.device = torch.device(f'cuda:{self.device_pipeline_arg}') if self.device_pipeline_arg >= 0 else torch.device('cpu')
        # self.review_analyzer.detector_pipeline.device = torch.device(f'cuda:{self.device_pipeline_arg}') if self.device_pipeline_arg >= 0 else torch.device('cpu')

        print("\nMultiDetectionSystem initialization complete!")

    # --- detect_fake_news method ---
    # (Keep this method exactly as you provided it)
    def detect_fake_news(self, text):
        """Detects likelihood of text being fake news."""
        if not self.fake_news_tokenizer or not self.fake_news_model:
            return {"real": 0, "fake": 0, "conclusion": "Fake News Model N/A"}
        if not text or not isinstance(text, str) or not text.strip():
            return {"real": 0, "fake": 0, "conclusion": "Please provide text"}
        try:
            inputs = self.fake_news_tokenizer(text, truncation=True, return_tensors="pt", max_length=512).to(self.device)
            with torch.no_grad():
                outputs = self.fake_news_model(**inputs)
            scores = torch.softmax(outputs.logits.cpu(), dim=1)[0].tolist()
            # NLI model mapping: 0: contradiction (Fake), 1: neutral, 2: entailment (Real)
            fake_score = scores[0]
            real_score = scores[2]
            total_relevant_score = fake_score + real_score
            if total_relevant_score > 1e-6:
                display_real = (real_score / total_relevant_score) * 100
                display_fake = (fake_score / total_relevant_score) * 100
            else:
                display_real, display_fake = 0, 0
            if display_fake > display_real:
                conclusion = "Likely FAKE news"
            elif display_real > display_fake:
                conclusion = "Likely REAL news"
            else:
                conclusion = "UNCERTAIN (Scores are equal or very low)"
            return {"real": round(display_real, 2), "fake": round(display_fake, 2), "conclusion": conclusion}
        except Exception as e:
            print(f"Error during fake news detection: {e}")
            traceback.print_exc()
            return {"real": 0, "fake": 0, "conclusion": "Detection Error"}
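
    # Worked example of the renormalization above (illustrative numbers): if the
    # softmax over the NLI logits is [0.60, 0.30, 0.10] for [contradiction,
    # neutral, entailment], the neutral mass is dropped, so fake = 0.60,
    # real = 0.10, total_relevant_score = 0.70, and the displayed split is
    # fake = 0.60 / 0.70 = 85.71%, real = 0.10 / 0.70 = 14.29%.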

    # --- detect_ai_image method ---
    # (Keep this method exactly as you provided it, ensuring Label Mapping is correct)
    def detect_ai_image(self, image):
        """Detects likelihood of an image being AI-generated using the custom model."""
        if not self.image_classifier or not self.image_feature_extractor:
            return {"human-generated": 0, "ai-generated": 0, "conclusion": "Image Model/Extractor N/A"}
        if image is None:
            return {"human-generated": 0, "ai-generated": 0, "conclusion": "Please provide an image"}
        try:
            if not isinstance(image, Image.Image):
                try:
                    image = Image.fromarray(np.uint8(image)).convert('RGB')
                except Exception as e:
                    print(f"Image conversion error: Input type was {type(image)}. Error: {e}")
                    return {"human-generated": 0, "ai-generated": 0, "conclusion": "Invalid image format"}
            if image.mode != 'RGB':
                image = image.convert('RGB')

            inputs = self.image_feature_extractor(images=image, return_tensors="pt")
            pixel_values = inputs['pixel_values'].to(self.device)
            with torch.no_grad():
                outputs = self.image_classifier(pixel_values=pixel_values)

            if not hasattr(outputs, 'logits'):
                # Check if it's a direct tensor output (less common from HF models but possible)
                if isinstance(outputs, torch.Tensor):
                    logits = outputs
                else:
                    print(f"Error: Model output (type: {type(outputs)}) has no 'logits' and isn't a tensor.")
                    return {"human-generated": 0, "ai-generated": 0, "conclusion": "Model Output Error (Format)"}
            else:
                logits = outputs.logits

            probabilities = torch.softmax(logits, dim=-1)[0].cpu().tolist()

            # !!! --- CRITICAL: Verify Label Mapping --- !!!
            # These indices MUST match how your custom model was trained and saved.
            # If your model outputs [prob_human, prob_ai]:
            human_prob_index = 0  # <<<--- ADJUST IF NEEDED
            ai_prob_index = 1     # <<<--- ADJUST IF NEEDED
            # If your model outputs [prob_ai, prob_human]:
            # human_prob_index = 1
            # ai_prob_index = 0
            # !!! --- --- --- --- --- --- --- --- --- --- --- !!!
            print(f"Using label indices -> Human: {human_prob_index}, AI: {ai_prob_index}")  # Log the indices being used

            num_classes = len(probabilities)
            if not (0 <= human_prob_index < num_classes and 0 <= ai_prob_index < num_classes):
                print(f"ERROR: Invalid probability indices ({human_prob_index}, {ai_prob_index}) for {num_classes} output classes.")
                return {"human-generated": 0, "ai-generated": 0, "conclusion": "Model Output Error (Index)"}
            if human_prob_index == ai_prob_index:
                print(f"ERROR: Human and AI probability indices cannot be the same ({human_prob_index}).")
                return {"human-generated": 0, "ai-generated": 0, "conclusion": "Configuration Error (Index)"}

            human_score = probabilities[human_prob_index]
            ai_score = probabilities[ai_prob_index]
            print(f"Raw probabilities: {probabilities}")
            print(f" -> Human Score (idx {human_prob_index}): {human_score:.4f}, AI Score (idx {ai_prob_index}): {ai_score:.4f}")

            display_human = round(human_score * 100, 2)
            display_ai = round(ai_score * 100, 2)
            confidence_threshold = 50.0
            if display_ai > display_human and display_ai >= confidence_threshold:
                conclusion = "Likely AI-GENERATED image"
            elif display_human > display_ai and display_human >= confidence_threshold:
                conclusion = "Likely HUMAN-CREATED image"
            else:
                conclusion = "UNCERTAIN origin"
            return {"human-generated": display_human, "ai-generated": display_ai, "conclusion": conclusion}
        except Exception as e:
            print(f"Error during AI image detection: {e}")
            traceback.print_exc()
            return {"human-generated": 0, "ai-generated": 0, "conclusion": "Detection Error"}
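
    # Hedged sanity check for the hard-coded indices above: if the pickled
    # classifier kept a Hugging Face-style config, its id2label mapping can
    # confirm the index order (the mapping shown here is hypothetical):
    #
    #     cfg = getattr(self.image_classifier, "config", None)
    #     if cfg is not None and hasattr(cfg, "id2label"):
    #         print(cfg.id2label)  # e.g. {0: 'human', 1: 'ai'}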

    # --- analyze_review method ---
    # (Keep this method exactly as you provided it)
    def analyze_review(self, review_text):
        """Analyzes a review text using the CombinedAnalyzer."""
        if not self.review_analyzer:
            print("Error: Review Analyzer was not initialized.")
            return {"sentiment_label": "System Error", "sentiment_score": 0,
                    "authenticity_label": "System Error", "authenticity_score": 0,
                    "error": "Review Analyzer N/A"}
        if not review_text or not isinstance(review_text, str) or not review_text.strip():
            return {"sentiment_label": "N/A", "sentiment_score": 0,
                    "authenticity_label": "N/A", "authenticity_score": 0,
                    "error": "Please provide review text"}
        try:
            analysis_result = self.review_analyzer.analyze(review_text)
            return analysis_result
        except Exception as e:
            print(f"Error during review analysis delegation: {e}")
            traceback.print_exc()
            return {"sentiment_label": "Error", "sentiment_score": 0,
                    "authenticity_label": "Error", "authenticity_score": 0,
                    "error": "Analysis Error"}

    # --- analyze_all method ---
    # (Keep this method exactly as you provided it)
    def analyze_all(self, news_text, image, review_text):
        """Runs all relevant analyses based on the provided inputs."""
        news_text_to_analyze = news_text if news_text and isinstance(news_text, str) and news_text.strip() else ""
        review_text_to_analyze = review_text if review_text and isinstance(review_text, str) and review_text.strip() else ""
        image_to_analyze = image

        fake_news_result = (self.detect_fake_news(news_text_to_analyze)
                            if news_text_to_analyze
                            else {"real": 0, "fake": 0, "conclusion": "No text provided"})
        ai_image_result = (self.detect_ai_image(image_to_analyze)
                           if image_to_analyze is not None
                           else {"human-generated": 0, "ai-generated": 0, "conclusion": "No image provided"})
        review_result = (self.analyze_review(review_text_to_analyze)
                         if review_text_to_analyze
                         else {"sentiment_label": "N/A", "sentiment_score": 0,
                               "authenticity_label": "N/A", "authenticity_score": 0,
                               "error": "No text provided"})

        return {
            "fake_news_analysis": fake_news_result,
            "ai_image_analysis": ai_image_result,
            "review_analysis": review_result
        }


# --- Gradio Interface Creation ---
# (Keep this function exactly as you provided it, including format_results_html)
def create_interface(system_instance):
    """Creates the Gradio interface using the loaded MultiDetectionSystem."""
    if system_instance is None:
        with gr.Blocks(theme=gr.themes.Soft()) as interface:
            gr.Markdown("# Error: Multi-Detection System Failed to Initialize")
            gr.Markdown("""
            The application cannot start because the underlying AI models could not be loaded or initialized.
            Please check the Space logs for specific errors:
            * **PKL File:** Ensure `finetune_vit_model.pkl` is uploaded to the Space repository (root directory by default) and tracked with Git LFS if large.
            * **Feature Extractor:** Verify `image_feature_extractor_name` in the code matches the base model used for fine-tuning the PKL.
            * **Model Names:** Double-check all Hugging Face model names (`fake_news_model_name`, etc.).
            * **HF Token:** Ensure the `HF_TOKEN` secret is set correctly if using private models.
            * **Dependencies:** Check `requirements.txt` and potential conflicts.
            * **Pickle Compatibility:** The PKL file might require specific library versions or class definitions present in the environment.
            """)
        return interface

    # Helper function to format the analysis results into HTML for display
    def format_results_html(results_dict):
        # (This function remains the same as before)
        if not results_dict:
            return '<p style="color: red;">An unexpected error occurred: No results dictionary received.</p>'

        news = results_dict.get("fake_news_analysis", {})
        img = results_dict.get("ai_image_analysis", {})
        review = results_dict.get("review_analysis", {})

        news_conclusion = news.get("conclusion", "N/A")
        img_conclusion = img.get("conclusion", "N/A")
        sentiment_text = f"Sentiment: {review.get('sentiment_label', 'N/A')} ({review.get('sentiment_score', 0)}%)"
        authenticity_text = f"Authenticity: {review.get('authenticity_label', 'N/A')} ({review.get('authenticity_score', 0)}%)"
        review_error = review.get("error", "")

        # (Basic inline markup; adjust the styling as desired.)
        html = f"<div><h3>Fake News Analysis</h3><p><b>Conclusion:</b> {news_conclusion}</p>"
        html += f"<h3>AI Image Analysis</h3><p><b>Conclusion:</b> {img_conclusion}</p>"
        html += f"<h3>Review Analysis</h3><p>{sentiment_text}</p><p>{authenticity_text}</p>"
        if review_error:
            html += f"<p><i>Analysis Note: {review_error}</i></p>"
        html += "</div>"
        return html
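
    # Usage sketch for the helper above (hedged): it consumes the dict returned
    # by MultiDetectionSystem.analyze_all; `results_html` names a hypothetical
    # gr.HTML output component:
    #
    #     raw = system_instance.analyze_all(news_text, image, review_text)
    #     results_html.value = format_results_html(raw)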