import os
import git
from pathlib import Path
from openai import OpenAI
from anthropic import Anthropic
from dotenv import load_dotenv
from pydantic_model import ImpactAnalysis
import tiktoken
import json
from typing import List, Tuple, Dict

# Load environment variables
load_dotenv()

# Initialize API clients
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
anthropic_client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

def clone_repository(repo_url, temp_dir):
    """Clone a git repository to a temporary directory."""
    try:
        git.Repo.clone_from(repo_url, temp_dir)
        return True, None
    except Exception as e:
        return False, str(e)

def read_code_files(directory):
    """Read all code files from the directory."""
    code_files = []
    code_extensions = {'.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.cpp', '.c', '.cs', '.go', '.rb', '.php', '.cls', '.object', '.page'}
    warnings = []
    
    for root, _, files in os.walk(directory):
        for file in files:
            if Path(file).suffix in code_extensions:
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                        relative_path = os.path.relpath(file_path, directory)
                        code_files.append({
                            'path': relative_path,
                            'content': content
                        })
                except Exception as e:
                    warnings.append(f"Could not read file {file_path}: {str(e)}")
    
    return code_files, warnings

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count the number of tokens in a text string."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def chunk_files(code_files: List[Dict[str, str]], model: str = "gpt-4", max_tokens: int = 120000) -> List[List[Dict[str, str]]]:
    """Split files into chunks that fit within the context window."""
    chunks = []
    current_chunk = []
    current_tokens = 0
    
    for file in code_files:
        file_content = f"File: {file['path']}\nContent:\n{file['content']}\n"
        file_tokens = count_tokens(file_content, model)
        
        # If a single file is larger than max_tokens, skip it
        if file_tokens > max_tokens:
            print(f"Warning: File {file['path']} is too large ({file_tokens} tokens) and will be skipped")
            continue
            
        # If adding this file would exceed max_tokens, start a new chunk
        if current_tokens + file_tokens > max_tokens:
            if current_chunk:  # Only add non-empty chunks
                chunks.append(current_chunk)
            current_chunk = [file]
            current_tokens = file_tokens
        else:
            current_chunk.append(file)
            current_tokens += file_tokens
    
    # Add the last chunk if it's not empty
    if current_chunk:
        chunks.append(current_chunk)
    
    return chunks

def analyze_code_chunk(chunk: List[Dict[str, str]], prompt: str, model: str) -> Tuple[str, str]:
    """Analyze a chunk of code files."""
    try:
        # Prepare the context from the chunk
        context = "Here are the relevant code files:\n\n"
        for file in chunk:
            context += f"File: {file['path']}\n```\n{file['content']}\n```\n"
        
        if model == "gpt-4":
            json_schema = ImpactAnalysis.model_json_schema()
            messages = [
                {"role": "system", "content": "You are a code analysis expert. Analyze the provided code based on the user's prompt."},
                {"role": "user", "content": f"Please check the impact of performing the below code/configuration changes on the above codebase. Provide only the summary of the impact in a table with aggregate analysis that outputs a JSON object with the following schema : {json_schema} . Pls note :  Do not add the characters ``` json anywhere in the response. Do not respond with messages like 'Here is the response in the required JSON format:'.\n\nCode or configuration changes: {prompt}\n\n{context}"}
            ]
            
            response = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                temperature=0.7,
                max_tokens=2000
            )
            return response.choices[0].message.content, ""
        else:
            # Keep original Claude implementation
            system_message = "You are a code analysis expert. Analyze the provided code based on the user's prompt."
            user_message = f"Please check the impact of performing the below code/configuration changes on the above codebase. Provide only the summary of the impact in a table with aggregate analysis that includes 1) List of files impacted. 2) No of files impacted 3) Impactd etail on each file impacted . Surface a 'Severity Level' at the top of table with possible values: Low, Medium, High based on the 'Number of impacted files' impacted. E.g. if 'Number of impacted files' > 0 but < 3 then LOW, if 'Number of impacted files' > 3 but < 8 then MEDIUM, if 'Number of impacted files' > 8 then HIGH.\n\nCode or configuration changes: {prompt}\n\n{context}"
            
            response = anthropic_client.messages.create(
                model="claude-3-7-sonnet-20250219",
                max_tokens=2000,
                temperature=0.7,
                system=system_message,
                messages=[{"role": "user", "content": user_message}]
            )
            return response.content[0].text, ""
    except Exception as e:
        return "", str(e)

def analyze_code(code_files: List[Dict[str, str]], prompt: str, model: str) -> Tuple[str, str]:
    """Analyze code files with chunking to handle large codebases."""
    try:
        # Split files into chunks
        chunks = chunk_files(code_files, model)
        
        if not chunks:
            return "", "No valid files to analyze"
        
        # Analyze each chunk
        all_analyses = []
        for i, chunk in enumerate(chunks):
            analysis, error = analyze_code_chunk(chunk, prompt, model)
            if error:
                return "", f"Error analyzing chunk {i+1}: {error}"
            if analysis:
                all_analyses.append(analysis)
        
        if not all_analyses:
            return "", "No analysis results generated"
        
        # Combine results from all chunks
        combined_analysis = {
            "severity_level": "LOW",  # Default to lowest severity
            "number_of_files_impacted": 0,
            "files_impacted": []
        }
        
        # Merge results from all chunks
        for analysis in all_analyses:
            try:
                chunk_data = json.loads(analysis)
                combined_analysis["number_of_files_impacted"] += chunk_data.get("number_of_files_impacted", 0)
                combined_analysis["files_impacted"].extend(chunk_data.get("files_impacted", []))
                
                # Update severity level based on the highest severity found
                severity_map = {"LOW": 1, "MEDIUM": 2, "HIGH": 3}
                current_severity = severity_map.get(combined_analysis["severity_level"], 0)
                chunk_severity = severity_map.get(chunk_data.get("severity_level", "LOW"), 0)
                if chunk_severity > current_severity:
                    combined_analysis["severity_level"] = chunk_data["severity_level"]
            except json.JSONDecodeError:
                continue
        
        return json.dumps(combined_analysis), ""
        
    except Exception as e:
        return "", str(e)

def check_api_keys():
    """Check which model providers have an API key configured."""
    openai_key = bool(os.getenv("OPENAI_API_KEY"))
    anthropic_key = bool(os.getenv("ANTHROPIC_API_KEY"))
    return {
        "gpt-4": openai_key,
        "claude-sonnet": anthropic_key
    }