# app.py
from flask import Flask, request, render_template, jsonify, send_file
from parser import parse_python_code
import os
import json
import io
import subprocess  # To call process_hf_dataset.py
from database import init_chromadb, store_program, query_programs, load_chromadb_from_hf, DB_NAME, create_collection, save_chromadb_to_hf, HF_DATASET_NAME
from datasets import Dataset
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# User-configurable variables
UPLOAD_DIR = "./uploads"  # Directory for uploads

app = Flask(__name__)
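# NOTE (assumption): the dataset-reset handlers below call
# Dataset.push_to_hub(HF_DATASET_NAME, token=os.getenv("HF_KEY")), so an
# HF_KEY environment variable holding a Hugging Face write token must be
# set in the Space, e.g.:
#   export HF_KEY=hf_xxxxxxxxxxxxxxxx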
def reconstruct_code(parts):
    """Reconstruct the original code from parsed parts."""
    sorted_parts = sorted(parts, key=lambda p: p['location'][0])
    return ''.join(part['source'] for part in sorted_parts)
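# Sketch of the assumed shape of `parts` as returned by parse_python_code
# (field names taken from how they are used below; values are hypothetical):
#   [{'location': (1, 1), 'source': 'import os\n',
#     'category': 'import', 'vector': [0, 1, 0, 0.05]}, ...]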
@app.route('/', methods=['GET', 'POST'])
def index():
    """Main page: parse uploaded or pasted code, run queries, or manage the
    database, depending on which form field is submitted."""
    if request.method == 'POST':
        parts = None
        filename = 'unnamed.py'
        code_input = None
        query_results = None

        # Handle file upload or pasted code (parsing)
        if 'file' in request.files and request.files['file'].filename:
            file = request.files['file']
            if not file.filename.endswith('.py'):
                return 'Invalid file type. Please upload a Python file.', 400
            filename = file.filename
            file_path = os.path.join(UPLOAD_DIR, filename)
            file.save(file_path)
            with open(file_path, 'r') as f:
                code_input = f.read()
            try:
                parts, sequence = parse_python_code(code_input)
                client = init_chromadb()
                vectors = [part['vector'] for part in parts]
                store_program(client, code_input, sequence, vectors, DB_NAME)
                logger.info(f"Stored code: {filename}")
                # Verify storage
                collection = create_collection(client, DB_NAME)
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries")
            except Exception as e:
                logger.error(f"Error storing code {filename}: {e}")
                return f"Error storing code: {e}", 500
        elif 'code' in request.form and request.form['code'].strip():
            code_input = request.form['code']
            filename = request.form.get('filename', 'unnamed.py') or 'unnamed.py'
            if not filename.endswith('.py'):
                filename += '.py'
            try:
                parts, sequence = parse_python_code(code_input)
                client = init_chromadb()
                vectors = [part['vector'] for part in parts]
                store_program(client, code_input, sequence, vectors, DB_NAME)
                logger.info(f"Stored code: {filename}")
                # Verify storage
                collection = create_collection(client, DB_NAME)
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries")
            except Exception as e:
                logger.error(f"Error storing code {filename}: {e}")
                return f"Error storing code: {e}", 500
        elif 'query_ops' in request.form and request.form['query_ops'].strip():
            # Handle query for operations (category sequence), e.g. "import,function,return"
            operations = [op.strip() for op in request.form['query_ops'].split(',')]
            try:
                client = load_chromadb_from_hf()
                query_results = query_programs(client, operations, DB_NAME)
                logger.info(f"Queried operations: {operations}")
                # Verify query results
                logger.info(f"Found {len(query_results)} matching programs in ChromaDB")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename=filename,
                    reconstructed_code=None,
                    code_input=None,
                    query_results=query_results
                )
            except Exception as e:
                logger.error(f"Error querying operations: {e}")
                return f"Error querying operations: {e}", 500
        elif 'semantic_query' in request.form and request.form['semantic_query'].strip():
            # Handle semantic query (natural language description)
            semantic_query = request.form['semantic_query']
            try:
                client = load_chromadb_from_hf()
                query_results = query_programs(client, None, DB_NAME, semantic_query=semantic_query)
                logger.info(f"Queried semantically: {semantic_query}")
                # Verify query results
                logger.info(f"Found {len(query_results)} matching programs in ChromaDB")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename=filename,
                    reconstructed_code=None,
                    code_input=None,
                    query_results=query_results
                )
            except Exception as e:
                logger.error(f"Error querying semantically: {e}")
                return f"Error querying semantically: {e}", 500
        elif 'process_hf' in request.form:
            # Trigger processing of Hugging Face dataset with fresh database
            try:
                # Reset ChromaDB collection
                client = init_chromadb()
                try:
                    client.delete_collection(DB_NAME)
                    logger.info(f"Deleted ChromaDB collection: {DB_NAME}")
                except Exception as e:
                    logger.warning(f"Failed to delete collection {DB_NAME}: {e}")
                collection = client.create_collection(DB_NAME)
                logger.info(f"Created fresh ChromaDB collection: {DB_NAME}")
                # Verify collection
                if collection is None or not hasattr(collection, 'add'):
                    raise ValueError("ChromaDB collection creation failed")
                logger.info("Verified ChromaDB collection is valid")
                # Verify collection is empty
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after reset (should be 0)")
                # Reset Hugging Face dataset (replace with empty dataset)
                try:
                    empty_data = {
                        "code": [],
                        "sequence": [],
                        "vectors": [],
                        "description_tokens": [],
                        "program_vectors": []
                    }
                    empty_dataset = Dataset.from_dict(empty_data)
                    empty_dataset.push_to_hub(HF_DATASET_NAME, token=os.getenv("HF_KEY"))
                    logger.info(f"Replaced Hugging Face dataset {HF_DATASET_NAME} with empty dataset")
                except Exception as e:
                    logger.error(f"Error replacing Hugging Face dataset: {e}")
                    raise
                # Process dataset (absolute path so cwd is valid even when launched as "python app.py")
                result = subprocess.run(
                    ['python', 'process_hf_dataset.py'],
                    check=True, capture_output=True, text=True,
                    cwd=os.path.dirname(os.path.abspath(__file__))
                )
                logger.info(f"Process Hugging Face dataset output: {result.stdout}")
                if result.stderr:
                    logger.error(f"Process Hugging Face dataset errors: {result.stderr}")
                # Verify database population
                collection = create_collection(client, DB_NAME)
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after processing")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename="Hugging Face Dataset Processed",
                    reconstructed_code=None,
                    code_input=None,
                    query_results=None,
                    message="Hugging Face dataset processed and stored successfully with fresh database and empty dataset."
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"Error processing Hugging Face dataset: {e.stderr}")
                return f"Error processing Hugging Face dataset: {e.stderr}", 500
            except Exception as e:
                logger.error(f"Unexpected error processing Hugging Face dataset: {e}")
                return f"Unexpected error processing Hugging Face dataset: {e}", 500
        elif 'load_dataset' in request.form:
            # Trigger loading of Hugging Face dataset without resetting
            try:
                # Check if collection exists; get or create if needed
                client = init_chromadb()
                collection = client.get_or_create_collection(DB_NAME)
                logger.info(f"Using existing or new ChromaDB collection: {DB_NAME}")
                # Verify collection
                if collection is None or not hasattr(collection, 'add'):
                    raise ValueError("ChromaDB collection access failed")
                logger.info("Verified ChromaDB collection is valid")
                # Verify collection state
                count = collection.count()
                logger.info(f"ChromaDB contains {count} entries before loading")
                # Process dataset (absolute path so cwd is valid even when launched as "python app.py")
                result = subprocess.run(
                    ['python', 'process_hf_dataset.py'],
                    check=True, capture_output=True, text=True,
                    cwd=os.path.dirname(os.path.abspath(__file__))
                )
                logger.info(f"Load Hugging Face dataset output: {result.stdout}")
                if result.stderr:
                    logger.error(f"Load Hugging Face dataset errors: {result.stderr}")
                # Verify database population
                collection = create_collection(client, DB_NAME)
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after loading")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename="Hugging Face Dataset Loaded",
                    reconstructed_code=None,
                    code_input=None,
                    query_results=None,
                    message="Hugging Face dataset loaded and stored successfully."
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"Error loading Hugging Face dataset: {e.stderr}")
                return f"Error loading Hugging Face dataset: {e.stderr}", 500
            except Exception as e:
                logger.error(f"Unexpected error loading Hugging Face dataset: {e}")
                return f"Unexpected error loading Hugging Face dataset: {e}", 500
        elif 'reset_db' in request.form:
            # Reset ChromaDB collection and Hugging Face dataset (no repopulation with samples)
            try:
                client = init_chromadb()
                try:
                    client.delete_collection(DB_NAME)
                    logger.info(f"Deleted ChromaDB collection: {DB_NAME}")
                except Exception as e:
                    logger.warning(f"Failed to delete collection {DB_NAME}: {e}")
                collection = client.create_collection(DB_NAME)
                logger.info(f"Created fresh ChromaDB collection: {DB_NAME}")
                # Verify collection creation
                if collection is None or not hasattr(collection, 'add'):
                    raise ValueError("ChromaDB collection creation failed")
                logger.info("Verified ChromaDB collection is valid")
                # Verify collection is empty
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after reset (should be 0)")
                # Reset Hugging Face dataset (replace with empty dataset)
                try:
                    empty_data = {
                        "code": [],
                        "sequence": [],
                        "vectors": [],
                        "description_tokens": [],
                        "program_vectors": []
                    }
                    empty_dataset = Dataset.from_dict(empty_data)
                    empty_dataset.push_to_hub(HF_DATASET_NAME, token=os.getenv("HF_KEY"))
                    logger.info(f"Replaced Hugging Face dataset {HF_DATASET_NAME} with empty dataset")
                except Exception as e:
                    logger.error(f"Error replacing Hugging Face dataset: {e}")
                    raise
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename="Database Reset",
                    reconstructed_code=None,
                    code_input=None,
                    query_results=None,
                    message="Database and Hugging Face dataset reset successfully."
                )
            except Exception as e:
                logger.error(f"Error resetting database: {e}")
                return f"Error resetting database: {e}", 500
        if parts:
            indexed_parts = [{'index': i + 1, **part} for i, part in enumerate(parts)]
            reconstructed_code = reconstruct_code(indexed_parts)
            return render_template(
                'results_partial.html',
                parts=indexed_parts,
                filename=filename,
                reconstructed_code=reconstructed_code,
                code_input=code_input,
                query_results=None
            )
        return 'No file, code, or query provided', 400

    # Initial page load (start empty, no default population)
    logger.info("Application started, database empty until triggered by buttons")
    return render_template('index.html', parts=None, filename=None, reconstructed_code=None, code_input=None, query_results=None)
@app.route('/export_json', methods=['POST'])  # assumed route path
def export_json():
    parts = request.json.get('parts', [])
    export_data = [
        {
            'vector': part['vector'],
            'source': part['source'],
            'description': generate_description_tokens([part['category']], [part['vector']])
        }
        for part in parts
    ]
    json_str = json.dumps(export_data, indent=2)
    buffer = io.BytesIO(json_str.encode('utf-8'))
    buffer.seek(0)
    return send_file(
        buffer,
        as_attachment=True,
        download_name='code_vectors.json',
        mimetype='application/json'
    )
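# Example call (hypothetical part values; assumes the app listens on port 7860
# and the assumed /export_json route above):
#   curl -X POST http://localhost:7860/export_json \
#        -H 'Content-Type: application/json' \
#        -d '{"parts": [{"vector": [0, 1, 0, 0.05], "source": "import os\n", "category": "import"}]}' \
#        -o code_vectors.json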
def generate_description_tokens(sequence, vectors):
    """Generate semantic description tokens for a program based on its sequence and vectors."""
    tokens = []
    category_descriptions = {
        'import': 'imports module',
        'function': 'defines function',
        'assigned_variable': 'assigns variable',
        'input_variable': 'input parameter',
        'returned_variable': 'returns value',
        'if': 'conditional statement',
        'return': 'returns result',
        'try': 'try block',
        'except': 'exception handler',
        'expression': 'expression statement',
        'spacer': 'empty line or comment'
    }
    for cat, vec in zip(sequence, vectors):
        if cat in category_descriptions:
            tokens.append(f"{category_descriptions[cat]}:{cat}")
        # Add vector-derived features (e.g., level, span) as tokens
        tokens.append(f"level:{vec[1]}")
        tokens.append(f"span:{vec[3]:.2f}")
    return " ".join(tokens)
if __name__ == '__main__':
    if not os.path.exists(UPLOAD_DIR):
        os.makedirs(UPLOAD_DIR)
    app.run(host="0.0.0.0", port=7860)  # Bind to all interfaces for Hugging Face Spaces
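# Local run (sketch): `python app.py`, then open http://localhost:7860.
# 7860 is the port Hugging Face Spaces expects a web app to listen on.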