# utils/helpers.py import os import json import time import re def create_response(success, message="", data=None, status_code=200): """创建标准化的API响应""" response = {"success": success} if message: response["message"] = message if data is not None: response["data"] = data return response, status_code def validate_agent_access(agent_id, token=None): """验证Agent访问权限""" agent_path = os.path.join('agents', f"{agent_id}.json") if not os.path.exists(agent_path): return False, "Agent不存在" try: with open(agent_path, 'r', encoding='utf-8') as f: agent_config = json.load(f) # 如果没有提供令牌,则允许访问 if not token: return True, agent_config # 验证令牌 if "distributions" in agent_config: for dist in agent_config["distributions"]: if dist.get("token") == token: # 检查是否过期 if dist.get("expires_at", 0) > 0 and dist.get("expires_at", 0) < time.time(): return False, "访问令牌已过期" # 更新使用计数 dist["usage_count"] = dist.get("usage_count", 0) + 1 # 更新Agent使用统计 if "stats" not in agent_config: agent_config["stats"] = {} agent_config["stats"]["usage_count"] = agent_config["stats"].get("usage_count", 0) + 1 agent_config["stats"]["last_used"] = int(time.time()) # 保存更新后的配置 with open(agent_path, 'w', encoding='utf-8') as f: json.dump(agent_config, f, ensure_ascii=False, indent=2) return True, agent_config return False, "访问令牌无效" except Exception as e: return False, f"验证过程出错: {str(e)}" def extract_code_from_text(text): """从文本中提取代码块""" # 尝试找到Python代码块 pattern = r'```(?:python)?\n([\s\S]*?)\n```' match = re.search(pattern, text) if match: return match.group(1) # 如果没有找到带有语言标记的代码块,尝试无语言标记的 pattern = r'```\n([\s\S]*?)\n```' match = re.search(pattern, text) if match: return match.group(1) # 如果没有代码块标记,返回原始文本 return text