# app.py
from flask import Flask, request, render_template, send_file
from werkzeug.utils import secure_filename
from parser import parse_python_code
import os
import json
import io
import subprocess  # To call process_hf_dataset.py
from database import (
    init_chromadb, store_program, query_programs, load_chromadb_from_hf,
    create_collection, DB_NAME, HF_DATASET_NAME,
)
from datasets import Dataset
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# User-configurable variables
UPLOAD_DIR = "./uploads"  # Directory for uploads

# Create the upload directory at import time so it also exists when the app runs
# under a WSGI server rather than via `python app.py`.
os.makedirs(UPLOAD_DIR, exist_ok=True)

app = Flask(__name__)


def reconstruct_code(parts):
    """Reconstruct the original code from parsed parts."""
    sorted_parts = sorted(parts, key=lambda p: p['location'][0])
    return ''.join(part['source'] for part in sorted_parts)


def parse_and_store(code_input, filename):
    """Parse code, store it in ChromaDB, and return the parsed parts.

    Shared by the file-upload and pasted-code branches of index().
    """
    parts, sequence = parse_python_code(code_input)
    client = init_chromadb()
    vectors = [part['vector'] for part in parts]
    store_program(client, code_input, sequence, vectors, DB_NAME)
    logger.info(f"Stored code: {filename}")
    # Verify storage
    collection = create_collection(client, DB_NAME)
    logger.info(f"ChromaDB now contains {collection.count()} entries")
    return parts


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'POST':
        parts = None
        filename = 'unnamed.py'
        code_input = None
        query_results = None

        # Handle file upload or pasted code (parsing)
        if 'file' in request.files and request.files['file'].filename:
            file = request.files['file']
            if not file.filename.endswith('.py'):
                return 'Invalid file type. Please upload a Python file.', 400
            # secure_filename guards against path traversal in user-supplied names
            filename = secure_filename(file.filename) or 'unnamed.py'
            file_path = os.path.join(UPLOAD_DIR, filename)
            file.save(file_path)
            with open(file_path, 'r') as f:
                code_input = f.read()
            try:
                parts = parse_and_store(code_input, filename)
            except Exception as e:
                logger.error(f"Error storing code {filename}: {e}")
                return f"Error storing code: {e}", 500
        elif 'code' in request.form and request.form['code'].strip():
            code_input = request.form['code']
            filename = request.form.get('filename', 'unnamed.py') or 'unnamed.py'
            if not filename.endswith('.py'):
                filename += '.py'
            try:
                parts = parse_and_store(code_input, filename)
            except Exception as e:
                logger.error(f"Error storing code {filename}: {e}")
                return f"Error storing code: {e}", 500
        elif 'query_ops' in request.form and request.form['query_ops'].strip():
            # Handle query for operations (category sequence)
            operations = [op.strip() for op in request.form['query_ops'].split(',')]
            try:
                client = load_chromadb_from_hf()
                query_results = query_programs(client, operations, DB_NAME)
                logger.info(f"Queried operations: {operations}")
                # Verify query results
                logger.info(f"Found {len(query_results)} matching programs in ChromaDB")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename=filename,
                    reconstructed_code=None,
                    code_input=None,
                    query_results=query_results
                )
            except Exception as e:
                logger.error(f"Error querying operations: {e}")
                return f"Error querying operations: {e}", 500
        elif 'semantic_query' in request.form and request.form['semantic_query'].strip():
            # Handle semantic query (natural language description)
            semantic_query = request.form['semantic_query']
            try:
                client = load_chromadb_from_hf()
                query_results = query_programs(client, None, DB_NAME, semantic_query=semantic_query)
                logger.info(f"Queried semantically: {semantic_query}")
                # Verify query results
                logger.info(f"Found {len(query_results)} matching programs in ChromaDB")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename=filename,
                    reconstructed_code=None,
                    code_input=None,
                    query_results=query_results
                )
            except Exception as e:
                logger.error(f"Error querying semantically: {e}")
                return f"Error querying semantically: {e}", 500
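
        # --- Dataset maintenance branches ---
        # 'process_hf' and 'load_dataset' both shell out to process_hf_dataset.py; the
        # difference is that 'process_hf' first wipes the ChromaDB collection and pushes
        # an empty dataset to the Hub, while 'load_dataset' keeps existing entries.
        # 'reset_db' clears both stores without repopulating them.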
        elif 'process_hf' in request.form:
            # Trigger processing of the Hugging Face dataset with a fresh database
            try:
                # Reset ChromaDB collection
                client = init_chromadb()
                try:
                    client.delete_collection(DB_NAME)
                    logger.info(f"Deleted ChromaDB collection: {DB_NAME}")
                except Exception as e:
                    logger.warning(f"Failed to delete collection {DB_NAME}: {e}")
                collection = client.create_collection(DB_NAME)
                logger.info(f"Created fresh ChromaDB collection: {DB_NAME}")
                # Verify collection
                if collection is None or not hasattr(collection, 'add'):
                    raise ValueError("ChromaDB collection creation failed")
                logger.info("Verified ChromaDB collection is valid")
                # Verify collection is empty
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after reset (should be 0)")
                # Reset Hugging Face dataset (replace with empty dataset)
                try:
                    empty_data = {
                        "code": [],
                        "sequence": [],
                        "vectors": [],
                        "description_tokens": [],
                        "program_vectors": []
                    }
                    empty_dataset = Dataset.from_dict(empty_data)
                    empty_dataset.push_to_hub(HF_DATASET_NAME, token=os.getenv("HF_KEY"))
                    logger.info(f"Replaced Hugging Face dataset {HF_DATASET_NAME} with empty dataset")
                except Exception as e:
                    logger.error(f"Error replacing Hugging Face dataset: {e}")
                    raise
                # Process dataset
                result = subprocess.run(
                    ['python', 'process_hf_dataset.py'],
                    check=True, capture_output=True, text=True,
                    # abspath: os.path.dirname(__file__) can be '' when the app is
                    # launched from its own directory, which subprocess rejects as cwd
                    cwd=os.path.dirname(os.path.abspath(__file__))
                )
                logger.info(f"Process Hugging Face dataset output: {result.stdout}")
                if result.stderr:
                    logger.error(f"Process Hugging Face dataset errors: {result.stderr}")
                # Verify database population
                collection = create_collection(client, DB_NAME)
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after processing")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename="Hugging Face Dataset Processed",
                    reconstructed_code=None,
                    code_input=None,
                    query_results=None,
                    message="Hugging Face dataset processed and stored successfully with fresh database and empty dataset."
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"Error processing Hugging Face dataset: {e.stderr}")
                return f"Error processing Hugging Face dataset: {e.stderr}", 500
            except Exception as e:
                logger.error(f"Unexpected error processing Hugging Face dataset: {e}")
                return f"Unexpected error processing Hugging Face dataset: {e}", 500
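
        # The Hugging Face dataset schema mirrors the empty_data dict used in the
        # reset branches: one row per stored program, with columns code, sequence,
        # vectors, description_tokens, and program_vectors (inferred from this file).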
        elif 'load_dataset' in request.form:
            # Trigger loading of the Hugging Face dataset without resetting
            try:
                # Check if the collection exists; get or create as needed
                client = init_chromadb()
                collection = client.get_or_create_collection(DB_NAME)
                logger.info(f"Using existing or new ChromaDB collection: {DB_NAME}")
                # Verify collection
                if collection is None or not hasattr(collection, 'add'):
                    raise ValueError("ChromaDB collection access failed")
                logger.info("Verified ChromaDB collection is valid")
                # Verify collection state
                count = collection.count()
                logger.info(f"ChromaDB contains {count} entries before loading")
                # Process dataset
                result = subprocess.run(
                    ['python', 'process_hf_dataset.py'],
                    check=True, capture_output=True, text=True,
                    cwd=os.path.dirname(os.path.abspath(__file__))
                )
                logger.info(f"Load Hugging Face dataset output: {result.stdout}")
                if result.stderr:
                    logger.error(f"Load Hugging Face dataset errors: {result.stderr}")
                # Verify database population
                collection = create_collection(client, DB_NAME)
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after loading")
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename="Hugging Face Dataset Loaded",
                    reconstructed_code=None,
                    code_input=None,
                    query_results=None,
                    message="Hugging Face dataset loaded and stored successfully."
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"Error loading Hugging Face dataset: {e.stderr}")
                return f"Error loading Hugging Face dataset: {e.stderr}", 500
            except Exception as e:
                logger.error(f"Unexpected error loading Hugging Face dataset: {e}")
                return f"Unexpected error loading Hugging Face dataset: {e}", 500
        elif 'reset_db' in request.form:
            # Reset ChromaDB collection and Hugging Face dataset (no repopulation with samples)
            try:
                client = init_chromadb()
                try:
                    client.delete_collection(DB_NAME)
                    logger.info(f"Deleted ChromaDB collection: {DB_NAME}")
                except Exception as e:
                    logger.warning(f"Failed to delete collection {DB_NAME}: {e}")
                collection = client.create_collection(DB_NAME)
                logger.info(f"Created fresh ChromaDB collection: {DB_NAME}")
                # Verify collection creation
                if collection is None or not hasattr(collection, 'add'):
                    raise ValueError("ChromaDB collection creation failed")
                logger.info("Verified ChromaDB collection is valid")
                # Verify collection is empty
                count = collection.count()
                logger.info(f"ChromaDB now contains {count} entries after reset (should be 0)")
                # Reset Hugging Face dataset (replace with empty dataset)
                try:
                    empty_data = {
                        "code": [],
                        "sequence": [],
                        "vectors": [],
                        "description_tokens": [],
                        "program_vectors": []
                    }
                    empty_dataset = Dataset.from_dict(empty_data)
                    empty_dataset.push_to_hub(HF_DATASET_NAME, token=os.getenv("HF_KEY"))
                    logger.info(f"Replaced Hugging Face dataset {HF_DATASET_NAME} with empty dataset")
                except Exception as e:
                    logger.error(f"Error replacing Hugging Face dataset: {e}")
                    raise
                return render_template(
                    'results_partial.html',
                    parts=None,
                    filename="Database Reset",
                    reconstructed_code=None,
                    code_input=None,
                    query_results=None,
                    message="Database and Hugging Face dataset reset successfully."
                )
            except Exception as e:
                logger.error(f"Error resetting database: {e}")
                return f"Error resetting database: {e}", 500
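
        # Fall through: only the file-upload and pasted-code branches set `parts`;
        # the query and maintenance branches above return their own responses directly.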
        if parts:
            indexed_parts = [{'index': i + 1, **part} for i, part in enumerate(parts)]
            reconstructed_code = reconstruct_code(indexed_parts)
            return render_template(
                'results_partial.html',
                parts=indexed_parts,
                filename=filename,
                reconstructed_code=reconstructed_code,
                code_input=code_input,
                query_results=None
            )
        return 'No file, code, or query provided', 400

    # Initial page load (start empty, no default population)
    logger.info("Application started, database empty until triggered by buttons")
    return render_template('index.html', parts=None, filename=None,
                           reconstructed_code=None, code_input=None, query_results=None)


@app.route('/export_json', methods=['POST'])
def export_json():
    parts = request.json.get('parts', [])
    export_data = [
        {
            'vector': part['vector'],
            'source': part['source'],
            'description': generate_description_tokens([part['category']], [part['vector']]),
        }
        for part in parts
    ]
    json_str = json.dumps(export_data, indent=2)
    buffer = io.BytesIO(json_str.encode('utf-8'))
    buffer.seek(0)
    return send_file(
        buffer,
        as_attachment=True,
        download_name='code_vectors.json',
        mimetype='application/json'
    )


def generate_description_tokens(sequence, vectors):
    """Generate semantic description tokens for a program based on its sequence and vectors."""
    tokens = []
    category_descriptions = {
        'import': 'imports module',
        'function': 'defines function',
        'assigned_variable': 'assigns variable',
        'input_variable': 'input parameter',
        'returned_variable': 'returns value',
        'if': 'conditional statement',
        'return': 'returns result',
        'try': 'try block',
        'except': 'exception handler',
        'expression': 'expression statement',
        'spacer': 'empty line or comment'
    }
    for cat, vec in zip(sequence, vectors):
        if cat in category_descriptions:
            tokens.append(f"{category_descriptions[cat]}:{cat}")
        # Add vector-derived features (e.g., level, span) as tokens
        tokens.append(f"level:{vec[1]}")
        tokens.append(f"span:{vec[3]:.2f}")
    return " ".join(tokens)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)  # Bind to all interfaces for Hugging Face Spaces
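
# Example requests (a sketch, not part of the app: field names match the form
# handling in index(), and host/port assume the app.run() defaults above):
#   curl -F "file=@example.py" http://localhost:7860/
#   curl -F "code=print('hi')" -F "filename=hello.py" http://localhost:7860/
#   curl -F "query_ops=import,function,return" http://localhost:7860/
#   curl -F "semantic_query=parse a file and store vectors" http://localhost:7860/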