from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool, VisitWebpageTool
import datetime
import requests
import pytz
import yaml
import os
from datasets import Dataset
from huggingface_hub import HfApi, InferenceClient
from openai import OpenAI
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI

# Define the Perplexity system prompt
Perplex_Assistant_Prompt = """You are a helpful AI assistant that searches the web for accurate information."""

# Expose the API key under the environment variable name expected by HfApiModel
os.environ["HUGGINGFACE_API_TOKEN"] = os.getenv("HUGGINGFACE_API_KEY", "")

# Initialize search tools with fallback capability
try:
    # Try DuckDuckGo first (default)
    print("Initializing DuckDuckGo search tool...")
    ddg_search_tool = DuckDuckGoSearchTool(max_results=10)

    # Test the tool with a simple query
    test_result = ddg_search_tool("test query")
    print("DuckDuckGo search tool initialized successfully.")

    # Use DuckDuckGo as the primary search tool
    primary_search_tool = ddg_search_tool
    search_tool_name = "DuckDuckGo"
except Exception as e:
    print(f"Error initializing DuckDuckGo search tool: {str(e)}")
    print("Falling back to Google search tool...")

    try:
        # Import GoogleSearchTool only if needed
        from smolagents import GoogleSearchTool

        google_search_tool = GoogleSearchTool()

        # Test the Google search tool
        test_result = google_search_tool("test query")
        print("Google search tool initialized successfully.")

        # Use Google as the fallback search tool
        primary_search_tool = google_search_tool
        search_tool_name = "Google"
    except Exception as google_error:
        print(f"Error initializing Google search tool: {str(google_error)}")
        print("WARNING: No working search tool available. Agent functionality will be limited.")

        # Create a minimal replacement that returns an explanatory message.
        # Decorating it with @tool keeps the agent's tools list type-consistent
        # (CodeAgent expects Tool instances, not bare functions).
        @tool
        def search_fallback(query: str) -> str:
            """Placeholder search tool used when no real search backend is available.

            Args:
                query: The search query that could not be executed.
            """
            return f"Search functionality unavailable. Both DuckDuckGo and Google search tools failed to initialize. Query was: {query}"

        primary_search_tool = search_fallback
        search_tool_name = "Unavailable"

# Initialize the VisitWebpageTool
visit_webpage_tool = VisitWebpageTool()


#@weave.op()
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
    """Enhanced Perplexity API call with explicit model tracking."""
    client = OpenAI(api_key=os.getenv("PERPLEXITY_API_KEY"), base_url="https://api.perplexity.ai")

    system_message = Perplex_Assistant_Prompt
    if assistant_meta:
        system_message += f"\n\n{system_messages}"

    # Minimal parameters for Perplexity
    return client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
        stream=False,
    ).choices[0].message.content


@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
    """A tool that accesses Perplexity Sonar to search the web when the answer requires or would benefit from a real-world web reference.

    Args:
        arg1: The user prompt to search for.
        arg2: Details on the desired web search results, appended to the system message for the Sonar web search.
    """
    try:
        # Pass assistant_meta=True so arg2 is actually appended to the system message
        sonar_response = tracked_perplexity_call(arg1, arg2, assistant_meta=True)
        return sonar_response
    except Exception as e:
        return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"
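
# Example of calling the Sonar tool directly, outside the agent loop
# (illustrative only; a real call requires PERPLEXITY_API_KEY to be set):
#
#   print(Sonar_Web_Search_Tool(
#       "What is the latest stable Python release?",
#       "Prefer official python.org sources and include the release date.",
#   ))
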
""" # Replace JSON literals with Python equivalents prepared_text = text.replace("true", "True").replace("false", "False").replace("null", "None") try: import ast return ast.literal_eval(prepared_text) except (SyntaxError, ValueError) as e: raise ValueError(f"Failed to parse JSON: {str(e)}") def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str: """Creates and pushes a dataset to Hugging Face with the conversation history. Args: dataset_name: Name for the dataset (will be prefixed with username) conversation_data: String representing the conversation data. Can be: - JSON array of objects (each object becomes a row) - Pipe-separated values (first row as headers, subsequent rows as values) - Plain text (stored in a single 'text' column) Returns: URL of the created dataset or error message along with the log output. """ log_text = "" try: # Required imports import pandas as pd from datasets import Dataset, DatasetDict from huggingface_hub import HfApi # Get API key api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY") if not api_key: return "Error: No Hugging Face API key found in environment variables" # Set fixed username username = "Misfits-and-Machines" safe_dataset_name = dataset_name.replace(" ", "_").lower() repo_id = f"{username}/{safe_dataset_name}" log_text += f"Creating dataset: {repo_id}\n" # Ensure repository exists hf_api = HfApi(token=api_key) try: if not hf_api.repo_exists(repo_id=repo_id, repo_type="dataset"): hf_api.create_repo(repo_id=repo_id, repo_type="dataset") log_text += f"Created repository: {repo_id}\n" else: log_text += f"Repository already exists: {repo_id}\n" except Exception as e: log_text += f"Note when checking/creating repository: {str(e)}\n" # Process input data created_ds = None try: # Try parsing as JSON using the safer parse_json function try: json_data = parse_json(conversation_data) # Process based on data structure if isinstance(json_data, list) and all(isinstance(item, dict) for item in json_data): log_text += f"Processing JSON array with {len(json_data)} items\n" # Create a dataset with columns for all keys in the first item # This ensures the dataset structure is consistent first_item = json_data[0] columns = list(first_item.keys()) log_text += f"Detected columns: {columns}\n" # Initialize data dictionary with empty lists for each column data_dict = {col: [] for col in columns} # Process each item for item in json_data: for col in columns: # Get the value for this column, or empty string if missing value = item.get(col, "") data_dict[col].append(value) # Debug output to verify data structure for col in columns: log_text += f"Column '{col}' has {len(data_dict[col])} entries\n" # Create dataset from dictionary ds = Dataset.from_dict(data_dict) log_text += f"Created dataset with {len(ds)} rows\n" created_ds = DatasetDict({"train": ds}) elif isinstance(json_data, dict): log_text += "Processing single JSON object\n" # For a single object, create a dataset with one row data_dict = {k: [v] for k, v in json_data.items()} ds = Dataset.from_dict(data_dict) created_ds = DatasetDict({"train": ds}) else: raise ValueError("JSON not recognized as array or single object") except Exception as json_error: log_text += f"Not processing as JSON: {str(json_error)}\n" raise json_error # Propagate to next handler except Exception: # Try pipe-separated format lines = conversation_data.strip().split('\n') if '|' in conversation_data and len(lines) > 1: log_text += "Processing as pipe-separated data\n" headers = [h.strip() for h in 
def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
    """Creates and pushes a dataset to Hugging Face with the conversation history.

    Args:
        dataset_name: Name for the dataset (will be prefixed with username)
        conversation_data: String representing the conversation data. Can be:
            - JSON array of objects (each object becomes a row)
            - Pipe-separated values (first row as headers, subsequent rows as values)
            - Plain text (stored in a single 'text' column)

    Returns:
        URL of the created dataset or error message along with the log output.
    """
    log_text = ""
    try:
        # Required imports
        import pandas as pd
        from datasets import Dataset, DatasetDict
        from huggingface_hub import HfApi

        # Get API key
        api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
        if not api_key:
            return "Error: No Hugging Face API key found in environment variables"

        # Set fixed username
        username = "Misfits-and-Machines"
        safe_dataset_name = dataset_name.replace(" ", "_").lower()
        repo_id = f"{username}/{safe_dataset_name}"
        log_text += f"Creating dataset: {repo_id}\n"

        # Ensure repository exists
        hf_api = HfApi(token=api_key)
        try:
            if not hf_api.repo_exists(repo_id=repo_id, repo_type="dataset"):
                hf_api.create_repo(repo_id=repo_id, repo_type="dataset")
                log_text += f"Created repository: {repo_id}\n"
            else:
                log_text += f"Repository already exists: {repo_id}\n"
        except Exception as e:
            log_text += f"Note when checking/creating repository: {str(e)}\n"

        # Process input data
        created_ds = None
        try:
            # Try parsing as JSON using the safer parse_json function
            try:
                json_data = parse_json(conversation_data)

                # Process based on data structure
                if isinstance(json_data, list) and all(isinstance(item, dict) for item in json_data):
                    log_text += f"Processing JSON array with {len(json_data)} items\n"

                    # Create a dataset with columns for all keys in the first item.
                    # This ensures the dataset structure is consistent.
                    first_item = json_data[0]
                    columns = list(first_item.keys())
                    log_text += f"Detected columns: {columns}\n"

                    # Initialize data dictionary with empty lists for each column
                    data_dict = {col: [] for col in columns}

                    # Process each item
                    for item in json_data:
                        for col in columns:
                            # Get the value for this column, or empty string if missing
                            value = item.get(col, "")
                            data_dict[col].append(value)

                    # Debug output to verify data structure
                    for col in columns:
                        log_text += f"Column '{col}' has {len(data_dict[col])} entries\n"

                    # Create dataset from dictionary
                    ds = Dataset.from_dict(data_dict)
                    log_text += f"Created dataset with {len(ds)} rows\n"
                    created_ds = DatasetDict({"train": ds})
                elif isinstance(json_data, dict):
                    log_text += "Processing single JSON object\n"
                    # For a single object, create a dataset with one row
                    data_dict = {k: [v] for k, v in json_data.items()}
                    ds = Dataset.from_dict(data_dict)
                    created_ds = DatasetDict({"train": ds})
                else:
                    raise ValueError("JSON not recognized as array or single object")
            except Exception as json_error:
                log_text += f"Not processing as JSON: {str(json_error)}\n"
                raise json_error  # Propagate to next handler
        except Exception:
            # Try pipe-separated format
            lines = conversation_data.strip().split('\n')
            if '|' in conversation_data and len(lines) > 1:
                log_text += "Processing as pipe-separated data\n"
                headers = [h.strip() for h in lines[0].split('|')]
                log_text += f"Detected headers: {headers}\n"

                # Initialize data dictionary
                data_dict = {header: [] for header in headers}

                # Process each data row
                for i, line in enumerate(lines[1:], 1):
                    if not line.strip():
                        continue
                    values = [val.strip() for val in line.split('|')]
                    if len(values) == len(headers):
                        for j, header in enumerate(headers):
                            data_dict[header].append(values[j])
                    else:
                        log_text += f"Warning: Skipping row {i} (column count mismatch)\n"

                # Create dataset from dictionary
                if all(len(values) > 0 for values in data_dict.values()):
                    ds = Dataset.from_dict(data_dict)
                    log_text += f"Created dataset with {len(ds)} rows\n"
                    created_ds = DatasetDict({"train": ds})
                else:
                    log_text += "No valid rows found in pipe-separated data\n"
                    created_ds = DatasetDict({"train": Dataset.from_dict({"text": [conversation_data]})})
            else:
                # Fallback for plain text
                log_text += "Processing as plain text\n"
                created_ds = DatasetDict({"train": Dataset.from_dict({"text": [conversation_data]})})

        # Push using the DatasetDict push_to_hub method
        log_text += f"Pushing dataset to {repo_id}\n"
        created_ds.push_to_hub(
            repo_id=repo_id,
            token=api_key,
            commit_message=f"Upload dataset: {dataset_name}"
        )

        dataset_url = f"https://huggingface.co/datasets/{repo_id}"
        log_text += f"Dataset successfully pushed to: {dataset_url}\n"
        return f"Successfully created dataset at {dataset_url}\nLogs:\n{log_text}"
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        log_text += f"Dataset creation error: {str(e)}\n{error_trace}\n"
        return f"Error creating dataset: {str(e)}\nLogs:\n{log_text}"


@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
    """A tool that creates and pushes a dataset to Hugging Face.

    Args:
        dataset_name: Name for the dataset (will be prefixed with 'Misfits-and-Machines/')
        conversation_data: Data content to save in the dataset. Formats supported:
            1. JSON array of objects – Each object becomes a row (keys as columns).
               Example: [{"name": "Product A", "brand": "Company X"}, {"name": "Product B", "brand": "Company Y"}]
            2. Pipe-separated values – First row as headers, remaining rows as values.
               Example: "name | brand\nProduct A | Company X\nProduct B | Company Y"
            3. Plain text – Stored in a single 'text' column.

    Returns:
        A link to the created dataset on the Hugging Face Hub or an error message, along with log details.
    """
    try:
        log_text = f"Creating dataset '{dataset_name}' with {len(conversation_data)} characters of data\n"
        log_text += f"Dataset will be created at Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}\n"

        # Delegate the actual work to Dataset_Creator_Function
        result = Dataset_Creator_Function(dataset_name, conversation_data)
        log_text += f"Dataset creation result: {result}\n"
        return log_text
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        return f"Error using Dataset Creator tool: {str(e)}\n{error_trace}"
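
# Illustrative calls covering the three supported input formats (kept as
# comments; real calls push data to the Hub and need a valid HF API key):
#
#   Dataset_Creator_Tool("demo products", '[{"name": "Product A", "brand": "Company X"}]')
#   Dataset_Creator_Tool("demo products", "name | brand\nProduct A | Company X")
#   Dataset_Creator_Tool("demo notes", "Free-form conversation text goes here.")
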
def verify_dataset_exists(repo_id: str) -> dict:
    """Verify that a dataset exists and is valid on the Hugging Face Hub.

    Args:
        repo_id: Full repository ID in the format "username/dataset_name"

    Returns:
        Dict with an "exists" boolean and a "message" string
    """
    try:
        # Check if the dataset exists using the datasets-server API
        api_url = f"https://datasets-server.huggingface.co/is-valid?dataset={repo_id}"
        response = requests.get(api_url, timeout=10)

        # Parse the response
        if response.status_code == 200:
            data = response.json()
            # If either of these is True, the dataset exists in some form
            if data.get("viewer", False) or data.get("preview", False):
                return {"exists": True, "message": "Dataset is valid and accessible"}
            else:
                return {"exists": False, "message": "Dataset exists but may not be fully processed yet"}
        else:
            return {"exists": False, "message": f"API returned status code {response.status_code}"}
    except Exception as e:
        return {"exists": False, "message": f"Error verifying dataset: {str(e)}"}


@tool
def Check_Dataset_Validity(dataset_name: str) -> str:
    """A tool that checks if a dataset exists and is valid on Hugging Face.

    Args:
        dataset_name: Name of the dataset to check (with or without organization prefix)

    Returns:
        Status message about the dataset validity
    """
    try:
        # Ensure the dataset name has the organization prefix
        if "/" not in dataset_name:
            dataset_name = f"Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}"

        # Check dataset validity
        result = verify_dataset_exists(dataset_name)
        if result["exists"]:
            return f"Dataset '{dataset_name}' exists and is valid. You can access it at https://huggingface.co/datasets/{dataset_name}"
        else:
            return f"Dataset '{dataset_name}' could not be verified: {result['message']}. It may still be processing or may not exist."
    except Exception as e:
        return f"Error checking dataset validity: {str(e)}"
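
# For reference, a successful /is-valid response is a small JSON object with
# boolean capability flags, roughly of the form (exact keys may vary by API
# version):
#
#   {"viewer": true, "preview": true}
#
# which is why verify_dataset_exists treats either flag as evidence that the
# dataset exists.
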
""" try: # Create timezone object tz = pytz.timezone(timezone) # Get current time in that timezone local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S") return f"The current local time in {timezone} is: {local_time}" except Exception as e: return f"Error fetching time for timezone '{timezone}': {str(e)}" final_answer = FinalAnswerTool() # Create Perplexity R1 model implementation directly without referencing an undefined variable # Import necessary modules (already imported above) # from huggingface_hub import InferenceClient # Create a new model implementation that uses the larger context window model through InferenceClient class PerplexityR1Model: def __init__(self, temperature=0.5, max_tokens=1500): """Initialize Perplexity R1-1776 model with 128K context window.""" self.temperature = temperature self.max_tokens = max_tokens self.model_name = "perplexity-ai/r1-1776" self.provider = "fireworks-ai" self.last_input_token_count = 0 self.last_output_token_count = 0 # Added attribute for output tokens # Get the API key self.api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY") if not self.api_key: raise ValueError("No Hugging Face API key found in environment variables") # Create the inference client self.client = InferenceClient(provider=self.provider, api_key=self.api_key) print("Initialized Perplexity R1-1776 model with 128K context window") def __call__(self, prompt): """Call the model with the prompt.""" # Determine message format and count tokens if isinstance(prompt, list): # Convert each message's content to a string to avoid nested lists combined_prompt = " ".join(str(msg.get("content", "")) for msg in prompt) self.last_input_token_count = len(combined_prompt.split()) messages = prompt # Already in message format elif isinstance(prompt, str): self.last_input_token_count = len(prompt.split()) messages = [{"role": "user", "content": prompt}] else: prompt_str = str(prompt) self.last_input_token_count = len(prompt_str.split()) messages = [{"role": "user", "content": prompt_str}] print(f"Sending approximately {self.last_input_token_count} tokens to Perplexity R1-1776") try: completion = self.client.chat.completions.create( model=self.model_name, messages=messages, temperature=self.temperature, max_tokens=self.max_tokens ) output = completion.choices[0].message.content self.last_output_token_count = len(output.split()) return output except Exception as e: print(f"Error calling Perplexity R1-1776: {str(e)}") # For context length errors, try simple truncation if "context length" in str(e).lower() or "token limit" in str(e).lower(): print("Context length error with R1-1776 - truncating prompt and retrying") if isinstance(prompt, str): truncated_prompt = prompt[-80000:] if len(prompt) > 80000 else prompt messages = [{"role": "user", "content": truncated_prompt}] else: combined_prompt = " ".join(str(msg.get("content", "")) for msg in prompt) truncated_prompt = combined_prompt[-80000:] if len(combined_prompt) > 80000 else combined_prompt messages = [{"role": "user", "content": truncated_prompt}] try: completion = self.client.chat.completions.create( model=self.model_name, messages=messages, temperature=self.temperature, max_tokens=self.max_tokens ) output = completion.choices[0].message.content self.last_output_token_count = len(output.split()) return output except Exception as retry_error: print(f"Error on retry: {str(retry_error)}") return f"ERROR: Model call failed even with reduced context. Please try a shorter query." 
            else:
                return f"ERROR: {str(e)}"


# Initialize our model with Perplexity R1-1776
model = PerplexityR1Model(temperature=0.5, max_tokens=1500)

# Import tool from Hub - do this before using the tool in the agent
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)

# Load prompt templates before using them in the agent
with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)

# Initialize the agent with all required components already defined
agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        Sonar_Web_Search_Tool,
        primary_search_tool,
        get_current_time_in_timezone,
        image_generation_tool,
        Dataset_Creator_Tool,
        Check_Dataset_Validity,
        visit_webpage_tool,
    ],
    max_steps=12,
    verbosity_level=1,
    grammar=None,
    planning_interval=2,
    name="Research Assistant",
    description="""An AI assistant that can search the web, create datasets, and answer questions.
Using the Perplexity R1-1776 model with a 128K token context window.""",
    prompt_templates=prompt_templates
)

# Informative messages about the model and the active search tool
print("Using Perplexity R1-1776 model with 128K token context window")
print(f"Agent initialized with {search_tool_name} as primary search tool")
print(f"Available tools: final_answer, Sonar_Web_Search_Tool, {search_tool_name}, get_current_time_in_timezone, image_generation_tool, Dataset_Creator_Tool, Check_Dataset_Validity, visit_webpage_tool")

# When running queries that might exceed token limits, consider:
# 1. Breaking tasks into smaller sub-tasks
# 2. Limiting the amount of data returned by search tools
# 3. Using the planning_interval to enable more effective reasoning

# A TypeError can occur inside Gradio_UI.py when token counts are None; the
# proper fix is to modify that file. For now, we launch the UI directly and
# print repair instructions if the error shows up.
try:
    GradioUI(agent).launch()
except TypeError as e:
    if "unsupported operand type(s) for +=" in str(e):
        print("Error: Token counting issue in Gradio UI")
        print("To fix, edit Gradio_UI.py and change:")
        print("total_input_tokens += agent.model.last_input_token_count")
        print("To:")
        print("total_input_tokens += (agent.model.last_input_token_count or 0)")
    else:
        raise
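
# Sketch of the defensive accumulation pattern described above for
# Gradio_UI.py (a hedged illustration; the variable names in that file may
# differ, but the attribute names match this file's model wrapper):
#
#   total_input_tokens += (agent.model.last_input_token_count or 0)
#   total_output_tokens += (agent.model.last_output_token_count or 0)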