# app.py
from flask import Flask, request, render_template, send_file
from werkzeug.utils import secure_filename
from parser import parse_python_code
import os
import sys
import json
import io
import subprocess # To call process_hf_dataset.py
from database import init_chromadb, store_program, query_programs, load_chromadb_from_hf, DB_NAME, create_collection, save_chromadb_to_hf, HF_DATASET_NAME
from datasets import Dataset
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
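# Note: the dataset reset/process actions below push to the Hugging Face Hub
# and therefore assume an HF_KEY token is available in the environment.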
# User-configurable variables
UPLOAD_DIR = "./uploads" # Directory for uploads
os.makedirs(UPLOAD_DIR, exist_ok=True) # Create at import time so uploads work under any server, not just "python app.py"
app = Flask(__name__)
def reconstruct_code(parts):
"""Reconstruct the original code from parsed parts."""
sorted_parts = sorted(parts, key=lambda p: p['location'][0])
return ''.join(part['source'] for part in sorted_parts)
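# The helpers below factor out logic that was duplicated across the routes in
# index(); they are a light refactor that only uses calls already made
# elsewhere in this file.
def parse_and_store(code_input, filename):
    """Parse code, store it in ChromaDB, and log the resulting entry count."""
    parts, sequence = parse_python_code(code_input)
    client = init_chromadb()
    vectors = [part['vector'] for part in parts]
    store_program(client, code_input, sequence, vectors, DB_NAME)
    logger.info(f"Stored code: {filename}")
    # Verify storage
    collection = create_collection(client, DB_NAME)
    count = collection.count()
    logger.info(f"ChromaDB now contains {count} entries")
    return parts
def reset_chroma_collection(client):
    """Delete and recreate the ChromaDB collection, returning the fresh, empty collection."""
    try:
        client.delete_collection(DB_NAME)
        logger.info(f"Deleted ChromaDB collection: {DB_NAME}")
    except Exception as e:
        logger.warning(f"Failed to delete collection {DB_NAME}: {e}")
    collection = client.create_collection(DB_NAME)
    logger.info(f"Created fresh ChromaDB collection: {DB_NAME}")
    # Verify collection
    if collection is None or not hasattr(collection, 'add'):
        raise ValueError("ChromaDB collection creation failed")
    logger.info("Verified ChromaDB collection is valid")
    count = collection.count()
    logger.info(f"ChromaDB now contains {count} entries after reset (should be 0)")
    return collection
def push_empty_hf_dataset():
    """Replace the Hugging Face dataset with an empty one matching the expected schema."""
    empty_data = {
        "code": [],
        "sequence": [],
        "vectors": [],
        "description_tokens": [],
        "program_vectors": []
    }
    empty_dataset = Dataset.from_dict(empty_data)
    empty_dataset.push_to_hub(HF_DATASET_NAME, token=os.getenv("HF_KEY"))
    logger.info(f"Replaced Hugging Face dataset {HF_DATASET_NAME} with empty dataset")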
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
parts = None
filename = 'unnamed.py'
code_input = None
query_results = None
# Handle file upload or pasted code (parsing)
if 'file' in request.files and request.files['file'].filename:
file = request.files['file']
if not file.filename.endswith('.py'):
return 'Invalid file type. Please upload a Python file.', 400
            filename = secure_filename(file.filename) # Sanitize to prevent path traversal
            file_path = os.path.join(UPLOAD_DIR, filename)
            file.save(file_path)
            with open(file_path, 'r', encoding='utf-8') as f:
                code_input = f.read()
            try:
                parts = parse_and_store(code_input, filename)
            except Exception as e:
                logger.error(f"Error storing code {filename}: {e}")
                return f"Error storing code: {e}", 500
elif 'code' in request.form and request.form['code'].strip():
code_input = request.form['code']
filename = request.form.get('filename', 'unnamed.py') or 'unnamed.py'
if not filename.endswith('.py'):
filename += '.py'
            try:
                parts = parse_and_store(code_input, filename)
            except Exception as e:
                logger.error(f"Error storing code {filename}: {e}")
                return f"Error storing code: {e}", 500
elif 'query_ops' in request.form and request.form['query_ops'].strip():
# Handle query for operations (category sequence)
operations = [op.strip() for op in request.form['query_ops'].split(',')]
try:
client = load_chromadb_from_hf()
query_results = query_programs(client, operations, DB_NAME)
logger.info(f"Queried operations: {operations}")
# Verify query results
logger.info(f"Found {len(query_results)} matching programs in ChromaDB")
return render_template(
'results_partial.html',
parts=None,
filename=filename,
reconstructed_code=None,
code_input=None,
query_results=query_results
)
except Exception as e:
logger.error(f"Error querying operations: {e}")
return f"Error querying operations: {e}", 500
elif 'semantic_query' in request.form and request.form['semantic_query'].strip():
# Handle semantic query (natural language description)
semantic_query = request.form['semantic_query']
try:
client = load_chromadb_from_hf()
query_results = query_programs(client, None, DB_NAME, semantic_query=semantic_query)
logger.info(f"Queried semantically: {semantic_query}")
# Verify query results
logger.info(f"Found {len(query_results)} matching programs in ChromaDB")
return render_template(
'results_partial.html',
parts=None,
filename=filename,
reconstructed_code=None,
code_input=None,
query_results=query_results
)
except Exception as e:
logger.error(f"Error querying semantically: {e}")
return f"Error querying semantically: {e}", 500
elif 'process_hf' in request.form:
# Trigger processing of Hugging Face dataset with fresh database
try:
                # Reset ChromaDB collection
                client = init_chromadb()
                collection = reset_chroma_collection(client)
                # Reset Hugging Face dataset (replace with empty dataset)
                try:
                    push_empty_hf_dataset()
                except Exception as e:
                    logger.error(f"Error replacing Hugging Face dataset: {e}")
                    raise
# Process dataset
                result = subprocess.run([sys.executable, 'process_hf_dataset.py'], check=True, capture_output=True, text=True, cwd=os.path.dirname(os.path.abspath(__file__)))
logger.info(f"Process Hugging Face dataset output: {result.stdout}")
if result.stderr:
logger.error(f"Process Hugging Face dataset errors: {result.stderr}")
# Verify database population
collection = create_collection(client, DB_NAME)
count = collection.count()
logger.info(f"ChromaDB now contains {count} entries after processing")
return render_template(
'results_partial.html',
parts=None,
filename="Hugging Face Dataset Processed",
reconstructed_code=None,
code_input=None,
query_results=None,
message="Hugging Face dataset processed and stored successfully with fresh database and empty dataset."
)
except subprocess.CalledProcessError as e:
logger.error(f"Error processing Hugging Face dataset: {e.stderr}")
return f"Error processing Hugging Face dataset: {e.stderr}", 500
except Exception as e:
logger.error(f"Unexpected error processing Hugging Face dataset: {e}")
return f"Unexpected error processing Hugging Face dataset: {e}", 500
elif 'load_dataset' in request.form:
# Trigger loading of Hugging Face dataset without resetting
try:
# Check if collection exists, get or create if needed
client = init_chromadb()
collection = client.get_or_create_collection(DB_NAME)
logger.info(f"Using existing or new ChromaDB collection: {DB_NAME}")
# Verify collection
if collection is None or not hasattr(collection, 'add'):
raise ValueError("ChromaDB collection access failed")
logger.info("Verified ChromaDB collection is valid")
# Verify collection state
count = collection.count()
logger.info(f"ChromaDB contains {count} entries before loading")
# Process dataset
                result = subprocess.run([sys.executable, 'process_hf_dataset.py'], check=True, capture_output=True, text=True, cwd=os.path.dirname(os.path.abspath(__file__)))
logger.info(f"Load Hugging Face dataset output: {result.stdout}")
if result.stderr:
logger.error(f"Load Hugging Face dataset errors: {result.stderr}")
# Verify database population
collection = create_collection(client, DB_NAME)
count = collection.count()
logger.info(f"ChromaDB now contains {count} entries after loading")
return render_template(
'results_partial.html',
parts=None,
filename="Hugging Face Dataset Loaded",
reconstructed_code=None,
code_input=None,
query_results=None,
message="Hugging Face dataset loaded and stored successfully."
)
except subprocess.CalledProcessError as e:
logger.error(f"Error loading Hugging Face dataset: {e.stderr}")
return f"Error loading Hugging Face dataset: {e.stderr}", 500
except Exception as e:
logger.error(f"Unexpected error loading Hugging Face dataset: {e}")
return f"Unexpected error loading Hugging Face dataset: {e}", 500
elif 'reset_db' in request.form:
# Reset ChromaDB collection and Hugging Face dataset (no repopulation with samples)
try:
                client = init_chromadb()
                collection = reset_chroma_collection(client)
                # Reset Hugging Face dataset (replace with empty dataset)
                try:
                    push_empty_hf_dataset()
                except Exception as e:
                    logger.error(f"Error replacing Hugging Face dataset: {e}")
                    raise
return render_template(
'results_partial.html',
parts=None,
filename="Database Reset",
reconstructed_code=None,
code_input=None,
query_results=None,
message="Database and Hugging Face dataset reset successfully."
)
except Exception as e:
logger.error(f"Error resetting database: {e}")
return f"Error resetting database: {e}", 500
if parts:
indexed_parts = [{'index': i + 1, **part} for i, part in enumerate(parts)]
reconstructed_code = reconstruct_code(indexed_parts)
return render_template(
'results_partial.html',
parts=indexed_parts,
filename=filename,
reconstructed_code=reconstructed_code,
code_input=code_input,
query_results=None
)
return 'No file, code, or query provided', 400
# Initial page load (start empty, no default population)
logger.info("Application started, database empty until triggered by buttons")
return render_template('index.html', parts=None, filename=None, reconstructed_code=None, code_input=None, query_results=None)
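# The /export_json route below expects a JSON body shaped like
# {"parts": [{"vector": [...], "source": "...", "category": "..."}, ...]},
# i.e. the part dictionaries rendered by the parsing branches above.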
@app.route('/export_json', methods=['POST'])
def export_json():
    data = request.get_json(silent=True) or {} # Tolerate a missing or invalid JSON body
    parts = data.get('parts', [])
    export_data = [
        {
            'vector': part['vector'],
            'source': part['source'],
            'description': generate_description_tokens([part['category']], [part['vector']])
        }
        for part in parts
    ]
json_str = json.dumps(export_data, indent=2)
buffer = io.BytesIO(json_str.encode('utf-8'))
buffer.seek(0)
return send_file(
buffer,
as_attachment=True,
download_name='code_vectors.json',
mimetype='application/json'
)
def generate_description_tokens(sequence, vectors):
"""Generate semantic description tokens for a program based on its sequence and vectors."""
tokens = []
category_descriptions = {
'import': 'imports module',
'function': 'defines function',
'assigned_variable': 'assigns variable',
'input_variable': 'input parameter',
'returned_variable': 'returns value',
'if': 'conditional statement',
'return': 'returns result',
'try': 'try block',
'except': 'exception handler',
'expression': 'expression statement',
'spacer': 'empty line or comment'
}
for cat, vec in zip(sequence, vectors):
if cat in category_descriptions:
tokens.append(f"{category_descriptions[cat]}:{cat}")
# Add vector-derived features (e.g., level, span) as tokens
tokens.append(f"level:{vec[1]}")
tokens.append(f"span:{vec[3]:.2f}")
return " ".join(tokens)
if __name__ == '__main__':
    os.makedirs(UPLOAD_DIR, exist_ok=True)
app.run(host="0.0.0.0", port=7860) # Bind to all interfaces for Hugging Face Spaces