|
|
|
import os
|
|
import json
|
|
import time
|
|
import re
|
|
|
|
def create_response(success, message="", data=None, status_code=200):
    """Build a standardized API response.

    Args:
        success: Value stored under the ``"success"`` key.
        message: Optional message; included only when truthy.
        data: Optional payload; included whenever it is not ``None`` (so
            falsy payloads such as ``[]`` or ``0`` are kept).
        status_code: Status code returned alongside the body.

    Returns:
        A ``(body, status_code)`` tuple where ``body`` is a dict.
    """
    body = {
        "success": success,
        # Optional keys are merged in only when they carry a value.
        **({"message": message} if message else {}),
        **({} if data is None else {"data": data}),
    }
    return body, status_code
|
|
|
|
def _write_agent_config(agent_path, agent_config):
    """Persist *agent_config* as JSON at *agent_path* without truncating it on failure.

    The JSON is first written to a sibling ``.tmp`` file and then moved over
    the target with ``os.replace``, so an error while serializing leaves the
    existing config file untouched.
    """
    tmp_path = f"{agent_path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(agent_config, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, agent_path)
    except Exception:
        # Best-effort removal of the partial temp file; the original error is
        # the one the caller needs to see, so it is re-raised unchanged.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def validate_agent_access(agent_id, token=None):
    """Validate access to an agent stored as ``agents/<agent_id>.json``.

    Args:
        agent_id: Identifier used to build the config file name.
        token: Optional distribution token. When falsy, only the existence
            and readability of the agent config is checked.

    Returns:
        A ``(ok, payload)`` tuple. On success ``payload`` is the loaded agent
        config dict; on failure it is an error message string.

    Side effects:
        When a non-expired token matches, the matching distribution's
        ``usage_count``, the agent's ``stats.usage_count`` and
        ``stats.last_used`` are updated and written back to the config file.
    """
    filename = f"{agent_id}.json"
    # Refuse ids containing path separators so a caller cannot read or
    # rewrite JSON files outside the ``agents`` directory (e.g. "../x").
    if os.path.basename(filename) != filename or "\\" in filename:
        return False, "Agent不存在"

    agent_path = os.path.join('agents', filename)

    try:
        # Open directly instead of checking os.path.exists first: this avoids
        # the window where the file disappears between the check and the open.
        try:
            with open(agent_path, 'r', encoding='utf-8') as f:
                agent_config = json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            return False, "Agent不存在"

        if not token:
            return True, agent_config

        if "distributions" in agent_config:
            # A JSON null for "distributions" is treated as "no distributions".
            for dist in agent_config["distributions"] or []:
                if dist.get("token") != token:
                    continue

                # A missing, null or non-positive expiry means "never expires".
                expires_at = dist.get("expires_at") or 0
                if 0 < expires_at < time.time():
                    return False, "访问令牌已过期"

                dist["usage_count"] = dist.get("usage_count", 0) + 1

                stats = agent_config.setdefault("stats", {})
                stats["usage_count"] = stats.get("usage_count", 0) + 1
                stats["last_used"] = int(time.time())

                _write_agent_config(agent_path, agent_config)
                return True, agent_config

        return False, "访问令牌无效"

    except Exception as e:
        # Boundary handler: callers expect a (False, message) tuple rather
        # than an exception for malformed configs or I/O failures.
        return False, f"验证过程出错: {str(e)}"
|
|
|
|
# Matches the first fenced block: an opening ``` with an optional Python tag
# (python / python3 / py, any case), optional trailing spaces, a line break,
# the lazily captured body, and a closing ``` on its own line. Both LF and
# CRLF line endings are accepted.
_CODE_FENCE_RE = re.compile(
    r'```(?:python3?|py)?[ \t]*\r?\n([\s\S]*?)\r?\n```',
    re.IGNORECASE,
)


def extract_code_from_text(text):
    """Extract the first fenced code block from *text*.

    Args:
        text: Text that may contain a Markdown-style fenced code block,
            either untagged or tagged as Python.

    Returns:
        The contents of the first matching fenced block (without the fence
        lines). If no block is found, or *text* is empty/``None``, *text* is
        returned unchanged.
    """
    if not text:
        return text

    # A single pattern suffices: the language tag is optional, so untagged
    # fences are covered without a second search.
    match = _CODE_FENCE_RE.search(text)
    if match:
        return match.group(1)

    return text