Spaces:
Running
Running
from smolagents import Tool | |
from typing import Any, Optional | |
class SimpleTool(Tool): | |
name = "analyze_content" | |
description = "Enhanced web content analyzer with multiple analysis modes." | |
inputs = {"input_text":{"type":"string","description":"URL or direct text to analyze."},"mode":{"type":"string","nullable":True,"description":"Analysis mode ('analyze', 'summarize', 'sentiment', 'topics')."}} | |
output_type = "string" | |
def forward(self, input_text: str, mode: str = "analyze") -> str: | |
"""Enhanced web content analyzer with multiple analysis modes. | |
Args: | |
input_text: URL or direct text to analyze. | |
mode: Analysis mode ('analyze', 'summarize', 'sentiment', 'topics'). | |
Returns: | |
str: JSON-formatted analysis results | |
""" | |
import requests | |
from bs4 import BeautifulSoup | |
import re | |
from transformers import pipeline | |
import json | |
try: | |
# Setup request headers | |
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} | |
# Process input | |
if input_text.startswith(('http://', 'https://')): | |
response = requests.get(input_text, headers=headers, timeout=10) | |
soup = BeautifulSoup(response.text, 'html.parser') | |
# Clean page content | |
for tag in soup(['script', 'style', 'meta']): | |
tag.decompose() | |
title = soup.title.string if soup.title else "No title found" | |
content = soup.get_text() | |
else: | |
title = "Text Analysis" | |
content = input_text | |
# Clean text | |
clean_text = re.sub(r'\s+', ' ', content).strip() | |
if len(clean_text) < 100: | |
return json.dumps({ | |
"status": "error", | |
"message": "Content too short for analysis (minimum 100 characters)" | |
}) | |
# Initialize models | |
summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
classifier = pipeline("text-classification", | |
model="nlptown/bert-base-multilingual-uncased-sentiment") | |
# Basic stats | |
stats = { | |
"title": title, | |
"characters": len(clean_text), | |
"words": len(clean_text.split()), | |
"paragraphs": len([p for p in clean_text.split("\n") if p.strip()]), | |
"reading_time": f"{len(clean_text.split()) // 200} minutes" | |
} | |
result = {"status": "success", "stats": stats} | |
# Mode-specific processing | |
if mode == "analyze": | |
# Get summary | |
summary = summarizer(clean_text[:1024], max_length=100, min_length=30)[0]['summary_text'] | |
# Get overall sentiment | |
sentiment = classifier(clean_text[:512])[0] | |
score = int(sentiment['label'][0]) | |
sentiment_text = ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"][score-1] | |
result.update({ | |
"summary": summary, | |
"sentiment": { | |
"overall": sentiment_text, | |
"score": score, | |
"confidence": f"{score/5*100:.1f}%" | |
} | |
}) | |
elif mode == "sentiment": | |
# Analyze paragraphs | |
paragraphs = [p for p in clean_text.split("\n") if len(p.strip()) > 50] | |
sentiments = [] | |
for i, para in enumerate(paragraphs[:5]): | |
sent = classifier(para[:512])[0] | |
score = int(sent['label'][0]) | |
sentiments.append({ | |
"section": i + 1, | |
"text": para[:100] + "...", | |
"sentiment": ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"][score-1], | |
"score": score | |
}) | |
result.update({ | |
"sentiment_analysis": { | |
"sections": sentiments, | |
"total_sections": len(sentiments) | |
} | |
}) | |
elif mode == "summarize": | |
# Process in chunks | |
chunks = [clean_text[i:i+1024] for i in range(0, min(len(clean_text), 3072), 1024)] | |
summaries = [] | |
for chunk in chunks: | |
if len(chunk) > 100: | |
summary = summarizer(chunk, max_length=100, min_length=30)[0]['summary_text'] | |
summaries.append(summary) | |
result.update({ | |
"summaries": summaries, | |
"chunks_analyzed": len(summaries) | |
}) | |
elif mode == "topics": | |
# Basic topic categorization | |
categories = { | |
"Technology": r"tech|software|hardware|digital|computer|AI|data", | |
"Business": r"business|market|finance|economy|industry", | |
"Science": r"science|research|study|discovery", | |
"Health": r"health|medical|medicine|wellness", | |
"General": r"news|world|people|life" | |
} | |
topic_scores = {} | |
for topic, pattern in categories.items(): | |
matches = len(re.findall(pattern, clean_text.lower())) | |
topic_scores[topic] = matches | |
result.update({ | |
"topic_analysis": { | |
"detected_topics": topic_scores, | |
"primary_topic": max(topic_scores.items(), key=lambda x: x[1])[0] | |
} | |
}) | |
return json.dumps(result, indent=2) | |
except requests.exceptions.RequestException as e: | |
return json.dumps({ | |
"status": "error", | |
"message": f"Failed to fetch content: {str(e)}", | |
"type": "request_error" | |
}) | |
except Exception as e: | |
return json.dumps({ | |
"status": "error", | |
"message": f"Analysis failed: {str(e)}", | |
"type": "general_error" | |
}) |