# process_hf_dataset.py
from datasets import load_dataset, Dataset
import re
import ast
from parser import parse_python_code, create_vector
from database import init_chromadb, store_program, DB_NAME, HF_DATASET_NAME, create_collection
import chromadb
import os
from dotenv import load_dotenv
from transformers import AutoTokenizer, AutoModel
import torch
from tqdm import tqdm  # For progress bar
import time
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Cache the CodeBERT model globally to avoid repeated loading and reduce UI freezing
model_name = "microsoft/codebert-base"
tokenizer = None
model = None
device = None

def load_codebert_model(use_gpu=False):
    """Load and cache the CodeBERT model, handling GPU/CPU options."""
    global tokenizer, model, device
    if tokenizer is None or model is None:
        try:
            device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModel.from_pretrained(model_name).to(device)
            logger.info(f"CodeBERT model loaded on {device}")
        except Exception as e:
            logger.error(f"Error loading CodeBERT model: {e}")
            raise
    return tokenizer, model, device

def rename_variables(code, variable_prefixes=None):
    """Rename variables in Python code to align with vector categories (input_variable, assigned_variable, returned_variable)."""
    if variable_prefixes is None:
        variable_prefixes = {
            'input': 'input_variable',
            'assigned': 'assigned_variable',
            'returned': 'returned_variable'
        }
    # Simple variable name detection and renaming
    pattern = r'\b[a-zA-Z_]\w*\b'  # Match variable names (simple heuristic)
    variables = set()
    code_lines = code.split('\n')
    # Find all variable names (simplified approach, could improve with AST)
    for line in code_lines:
        matches = re.findall(pattern, line)
        for match in matches:
            if match not in ['def', 'if', 'else', 'for', 'while', 'return', 'import', 'print', 'eval', 'str', 'int']:  # Exclude keywords
                variables.add(match)
    # Sort variables alphabetically (first-appearance ordering would require an AST)
    sorted_vars = sorted(list(variables))
    var_map = {}
    var_count = {'input_variable': 1, 'assigned_variable': 1, 'returned_variable': 1}
    # Assign variables based on context (simplified heuristic)
    for var in sorted_vars:
        # Determine variable role based on context
        is_input = any(var in line and 'def' in line for line in code_lines)  # Check if in function definition (input parameter)
        is_returned = any('return' in line and var in line for line in code_lines)  # Check if used in return statement
        is_assigned = any('=' in line and var in line.split('=')[0].strip() for line in code_lines)  # Check if assigned
        if is_input:
            role = 'input_variable'
        elif is_returned:
            role = 'returned_variable'
        elif is_assigned:
            role = 'assigned_variable'
        else:
            role = 'assigned_variable'  # Default to assigned if unclear
        new_name = f"{role}{var_count[role]}"
        var_map[var] = new_name
        var_count[role] += 1
    # Replace variables in code
    new_code = code
    for old_var, new_var in var_map.items():
        new_code = re.sub(r'\b' + old_var + r'\b', new_var, new_code)
    return new_code, var_map
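# Hedged example of the heuristic above (not executed anywhere in this module):
# for a snippet such as
#     def add(a, b):
#         result = a + b
#         return result
# rename_variables() would return roughly
#     def input_variable2(input_variable1, input_variable3):
#         returned_variable1 = input_variable1 + input_variable3
#         return returned_variable1
# Note that the regex also captures the function name ("add" appears on the
# "def" line), so it gets renamed as an input variable; that is a known
# limitation of this non-AST approach.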
def generate_description_tokens(sequence, vectors, var_map=None):
    """Generate semantic description tokens for a program, including variable roles."""
    tokens = []
    category_descriptions = {
        'import': 'imports module',
        'function': 'defines function',
        'assigned_variable': 'assigns variable',
        'input_variable': 'input parameter',
        'returned_variable': 'returns value',
        'if': 'conditional statement',
        'return': 'returns result',
        'try': 'try block',
        'except': 'exception handler',
        'expression': 'expression statement',
        'spacer': 'empty line or comment'
    }
    for cat, vec in zip(sequence, vectors):
        if cat in category_descriptions:
            tokens.append(f"{category_descriptions[cat]}:{cat}")
            # Add vector-derived features (e.g., level, span) as tokens
            tokens.append(f"level:{vec[1]}")
            tokens.append(f"span:{vec[3]:.2f}")
    # Add variable role tokens if var_map exists
    if var_map:
        for old_var, new_var in var_map.items():
            role = new_var.split('variable')[0] + 'variable'  # Extract role (e.g., 'input_variable')
            tokens.append(f"variable:{old_var}={new_var}:{role}")
    return tokens

def generate_semantic_vector(description, total_lines=100, use_gpu=False):
    """Generate a 6D semantic vector for a textual description using CodeBERT, projecting to 6D."""
    global tokenizer, model, device
    if tokenizer is None or model is None:
        tokenizer, model, device = load_codebert_model(use_gpu)
    # Tokenize and encode the description
    inputs = tokenizer(description, return_tensors="pt", padding=True, truncation=True, max_length=512)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    # Generate embeddings
    with torch.no_grad():
        outputs = model(**inputs)
        # Use mean pooling of the last hidden states
        vector = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy().tolist()
    # Truncate or project to 6D (simplified projection: take first 6 dimensions)
    if len(vector) < 6:
        vector.extend([0] * (6 - len(vector)))
    elif len(vector) > 6:
        vector = vector[:6]  # Truncate to 6D
    # Ensure vector isn't all zeros or defaults
    if all(v == 0 for v in vector):
        logger.warning(f"Default vector detected for description: {description}")
        # Fallback: Use heuristic if CodeBERT fails to generate meaningful embeddings
        category_map = {
            'import': 1, 'function': 2, 'assign': 17, 'input': 18,
            'return': 19, 'if': 5, 'try': 8, 'except': 14
        }
        tokens = description.lower().split()
        vector = [0] * 6
        for token in tokens:
            for cat, cat_id in category_map.items():
                if cat in token:
                    vector[0] = cat_id  # category_id
                    vector[1] = 1  # level
                    vector[2] = 0.5  # center_pos
                    vector[3] = 0.1  # span
                    vector[4] = 1  # parent_depth
                    vector[5] = cat_id / len(category_map)  # parent_weight
                    break
    logger.debug(f"Generated semantic vector for '{description}': {vector}")
    return vector
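# Hedged example: if the CodeBERT embedding ever comes back all zeros, the
# fallback heuristic above kicks in. For a description like
# "Defines function to add two numbers", the token "function" matches
# category_map['function'] = 2, so the fallback vector would be
#     [2, 1, 0.5, 0.1, 1, 0.25]
#     (category_id, level, center_pos, span, parent_depth, parent_weight = 2/8).
# On the normal path the vector is simply the first six of CodeBERT's 768
# mean-pooled dimensions, which is a lossy stand-in for a learned projection.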
def process_hf_dataset(batch_size=100, use_gpu=False):
    """Process the Hugging Face dataset in batches and store programs in ChromaDB, aligning with vector categories."""
    # Load the dataset
    try:
        dataset = load_dataset("iamtarun/python_code_instructions_18k_alpaca", split="train")
        dataset_list = list(dataset)
        logger.info(f"Loaded dataset with {len(dataset_list)} entries")
    except Exception as e:
        logger.error(f"Error loading dataset: {e}")
        raise

    # Initialize ChromaDB client
    client = init_chromadb()

    # Do not clear or populate with defaults here; let the UI buttons handle that
    try:
        collection = client.get_or_create_collection(DB_NAME)
        logger.info(f"Using existing or new ChromaDB collection: {DB_NAME}, contains {collection.count()} entries")
        # Verify collection is valid
        if collection is None or not hasattr(collection, 'add'):
            raise ValueError("ChromaDB collection access failed")
        logger.info("Verified ChromaDB collection is valid")
    except Exception as e:
        logger.error(f"Error accessing ChromaDB collection: {e}")
        raise

    # Process in batches with progress bar
    total_entries = len(dataset_list)
    for i in tqdm(range(0, total_entries, batch_size), desc="Processing Hugging Face Dataset"):
        batch = dataset_list[i:i + batch_size]
        batch_programs = []
        batch_ids = []
        batch_documents = []
        batch_metadatas = []
        batch_embeddings = []

        for entry in batch:
            try:
                instruction = entry['instruction']
                output = entry['output']

                # Rename variables to align with vector categories
                processed_code, var_map = rename_variables(output)

                # Parse the code to get parts and sequence, generating our 6D vectors
                parts, sequence = parse_python_code(processed_code)
                program_vectors = [part['vector'] for part in parts]  # Use parser's 6D vectors for program structure

                # Generate description tokens including variable roles
                description_tokens = f"task:{instruction.replace(' ', '_')}"
                description_tokens_list = generate_description_tokens(sequence, program_vectors, var_map)
                description_tokens += " " + " ".join(description_tokens_list)

                # Generate a 6D semantic vector for the instruction
                semantic_vector = generate_semantic_vector(instruction, use_gpu=use_gpu)

                # Store program data
                # Note: built-in hash() is salted per interpreter session, so these IDs are not stable across runs
                program_id = str(hash(processed_code))
                batch_ids.append(program_id)
                batch_documents.append(processed_code)
                batch_metadatas.append({"sequence": ",".join(sequence), "description_tokens": description_tokens, "program_vectors": str(program_vectors)})
                batch_embeddings.append(semantic_vector)
                logger.debug(f"Processed entry: {program_id}, Vector: {semantic_vector}")
            except Exception as e:
                logger.error(f"Error processing entry in batch starting at {i}: {e}")
                continue  # Skip failed entries but continue processing

        # Batch add to ChromaDB
        try:
            collection.add(
                documents=batch_documents,
                metadatas=batch_metadatas,
                ids=batch_ids,
                embeddings=batch_embeddings
            )
            logger.info(f"Added batch {i//batch_size + 1} to ChromaDB with {len(batch_ids)} entries")
            # Verify addition
            count = collection.count()
            logger.info(f"ChromaDB now contains {count} entries after adding batch")
        except Exception as e:
            logger.error(f"Error adding batch to ChromaDB: {e}")
            raise

    # Save to Hugging Face Dataset
    save_chromadb_to_hf()

def save_chromadb_to_hf(dataset_name=HF_DATASET_NAME, token=os.getenv("HF_KEY")):
    """Save ChromaDB data to a Hugging Face Dataset, with error handling and logging."""
    try:
        client = init_chromadb()
        collection = client.get_collection(DB_NAME)

        # Fetch all data from ChromaDB
        results = collection.get(include=["documents", "metadatas", "embeddings"])
        data = {
            "code": results["documents"],
            "sequence": [meta["sequence"] for meta in results["metadatas"]],
            "vectors": results["embeddings"],  # Semantic 6D vectors
            "description_tokens": [meta.get('description_tokens', '') for meta in results["metadatas"]],
            # Structural vectors were stored as strings; literal_eval is a safer parse than eval
            "program_vectors": [ast.literal_eval(meta.get('program_vectors', '[]')) for meta in results["metadatas"]]
        }

        # Create a Hugging Face Dataset
        dataset = Dataset.from_dict(data)
        logger.info(f"Created Hugging Face Dataset with {len(data['code'])} entries")

        # Push to Hugging Face Hub (push_to_hub creates the repo if needed and overwrites its contents)
        dataset.push_to_hub(dataset_name, token=token)
        logger.info(f"Dataset pushed to Hugging Face Hub as {dataset_name}, overwriting existing dataset")

        # Verify push (optional, could check dataset on Hub)
        logger.info(f"Verified Hugging Face dataset push with {len(dataset)} entries")
    except Exception as e:
        logger.error(f"Error pushing dataset to Hugging Face Hub: {e}")
        raise

if __name__ == "__main__":
    process_hf_dataset(batch_size=100, use_gpu=False)
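# Hedged usage sketch (not part of the pipeline above, and assuming the
# init_chromadb()/DB_NAME helpers behave as imported here): once the dataset
# has been processed, stored programs can be retrieved by semantic similarity,
# e.g.:
#
#     client = init_chromadb()
#     collection = client.get_collection(DB_NAME)
#     query_vector = generate_semantic_vector("Reverse a string")
#     results = collection.query(query_embeddings=[query_vector], n_results=3)
#     for doc in results["documents"][0]:
#         print(doc)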