import cv2 import numpy as np import base64 import os import tempfile from openai import OpenAI from PIL import Image import io import re import time class VideoAnomalyDetector: def __init__(self, api_key, model="gpt-4o"): """ Initialize the VideoAnomalyDetector with OpenAI API key. Args: api_key (str): OpenAI API key for accessing GPT-4o model model (str): Model to use for analysis ("gpt-4o" or "gpt-4o-mini") """ self.client = OpenAI(api_key=api_key) self.model = model def extract_frames(self, video_path, skip_frames): """ Extract frames from a video file, skipping the specified number of frames. Args: video_path (str): Path to the video file skip_frames (int): Number of frames to skip between captures Returns: list: List of extracted frames as numpy arrays """ frames = [] # Use the default backend for video files # DirectShow can cause issues with some video files cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Could not open video file: {video_path}") # Don't set MJPG format for video files as it can interfere with proper decoding # cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M','J','P','G')) frame_count = 0 while True: ret, frame = cap.read() if not ret: break if frame_count % (skip_frames + 1) == 0: # Convert from BGR to RGB rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frames.append(rgb_frame) frame_count += 1 cap.release() return frames def process_live_stream(self, stream_source, skip_frames, prompt, analysis_depth="granular", max_frames=None, callback=None, time_interval=None): """ Process frames from a live video stream. Args: stream_source: Stream source (0 for webcam, URL for IP camera or RTSP stream) skip_frames (int): Number of frames to skip between captures prompt (str): Prompt describing what anomaly to look for analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis max_frames (int, optional): Maximum number of frames to process (None for unlimited) callback (function, optional): Callback function to report progress time_interval (int, optional): If set, capture one frame every X seconds instead of using skip_frames Returns: list or dict: List of analysis results for each processed frame (granular) or dict with cumulative analysis (cumulative) """ # Open the video stream with appropriate backend # Only use DirectShow for local webcams (0 or 1) on Windows if os.name == 'nt' and (stream_source == 0 or stream_source == 1): # This is a local webcam on Windows, use DirectShow cap = cv2.VideoCapture(stream_source, cv2.CAP_DSHOW) # For webcams, MJPG format can be more stable cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M','J','P','G')) else: # For IP cameras, RTSP streams, or non-Windows systems, use default backend cap = cv2.VideoCapture(stream_source) if not cap.isOpened(): raise ValueError(f"Could not open video stream: {stream_source}") frames = [] frame_count = 0 processed_count = 0 last_capture_time = time.time() try: while True: ret, frame = cap.read() if not ret: break current_time = time.time() # Determine if we should capture this frame should_capture = False if time_interval is not None: # Time-based interval mode if current_time - last_capture_time >= time_interval: should_capture = True last_capture_time = current_time else: # Frame-skip mode if frame_count % (skip_frames + 1) == 0: should_capture = True if should_capture: # Convert from BGR to RGB rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frames.append(rgb_frame) processed_count += 1 # Process the frame immediately in time interval mode if time_interval is not None: # Process the frame immediately result = self.analyze_frame(rgb_frame, prompt) # Make sure the frame is included in the result if "frame" not in result: result["frame"] = rgb_frame # If we have a callback, call it with -1 for total to indicate continuous mode if callback: callback(processed_count, -1) # In time interval mode, we yield results one by one if analysis_depth == "granular": yield result else: # For cumulative analysis, we need to keep all frames # We'll handle this outside the loop pass else: # Update progress if callback is provided (frame count mode) if callback and max_frames: callback(processed_count, max_frames) # Break if we've reached the maximum number of frames if max_frames and processed_count >= max_frames: break frame_count += 1 finally: cap.release() # If we're in time interval mode with cumulative analysis, we don't return here # as we're yielding results above if time_interval is not None and analysis_depth == "cumulative": # This is a special case - we need to periodically do cumulative analysis # For simplicity, we'll just return the current cumulative analysis result = self.analyze_frames_cumulatively(frames, prompt, callback) yield result return # Process the collected frames for non-time interval mode if time_interval is None: if analysis_depth == "cumulative": return self.analyze_frames_cumulatively(frames, prompt, callback) else: # granular (default) results = [] for i, frame in enumerate(frames): if callback: callback(i, len(frames)) result = self.analyze_frame(frame, prompt) results.append(result) return results def encode_image_to_base64(self, image_array): """ Convert a numpy array image to base64 encoded string. Args: image_array (numpy.ndarray): Image as numpy array Returns: str: Base64 encoded image string """ # Convert numpy array to PIL Image pil_image = Image.fromarray(image_array) # Create a bytes buffer buffer = io.BytesIO() # Save the image to the buffer in PNG format pil_image.save(buffer, format="PNG") # Get the bytes from the buffer img_bytes = buffer.getvalue() # Encode the bytes to base64 base64_encoded = base64.b64encode(img_bytes).decode('utf-8') return base64_encoded def analyze_frame(self, frame, prompt): """ Analyze a frame using the selected OpenAI model. Args: frame (numpy.ndarray): Frame to analyze prompt (str): Prompt describing what anomaly to look for Returns: dict: Analysis result from the model """ base64_image = self.encode_image_to_base64(frame) # Enhanced prompt to get structured information about anomalies enhanced_prompt = f""" {prompt} After your analysis, please include a structured assessment at the end of your response in this exact format: ANOMALY_DETECTED: [Yes/No] ANOMALY_TYPE: [Human/Non-human/None] CONFIDENCE: [0-100] For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly, otherwise "No". For ANOMALY_TYPE, if an anomaly is detected, classify it as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None". For CONFIDENCE, provide a number from 0 to 100 indicating your confidence level in the assessment. """ try: response = self.client.chat.completions.create( model=self.model, messages=[ { "role": "user", "content": [ {"type": "text", "text": enhanced_prompt}, { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{base64_image}" } } ] } ], max_tokens=1000 ) # Extract the response text response_text = response.choices[0].message.content # Extract anomaly detection information using regex anomaly_detected = False anomaly_type = "None" confidence = 0 # Look for the structured format anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE) if anomaly_match and anomaly_match.group(1).lower() == 'yes': anomaly_detected = True confidence = 90 # Default high confidence when anomaly is detected # If anomaly detected, look for the type type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE) if type_match: anomaly_type = type_match.group(1) # Look for confidence information conf_match = re.search(r'CONFIDENCE:\s*(\d+)', response_text, re.IGNORECASE) if conf_match: try: confidence = int(conf_match.group(1)) except: pass # Keep default confidence if parsing fails return { "text": response_text, "analysis": response_text, # Add analysis field as an alias for text "frame": frame, "anomaly_detected": anomaly_detected, "anomaly_type": anomaly_type, "confidence": confidence, # Add confidence field "timestamp": time.time() # Add timestamp for live stream analysis } except Exception as e: return { "error": str(e), "frame": frame, "anomaly_detected": False, "anomaly_type": "None", "confidence": 0, # Add confidence field "timestamp": time.time() # Add timestamp for live stream analysis } def analyze_frames_cumulatively(self, frames, prompt, callback=None): """ Analyze all frames together and provide a cumulative analysis. Args: frames (list): List of frames to analyze prompt (str): Prompt describing what anomaly to look for callback (function, optional): Callback function to report progress Returns: dict: Cumulative analysis result """ # First, analyze each frame individually to identify potential anomalies individual_results = [] for i, frame in enumerate(frames): if callback: callback(i, len(frames) * 2) # First half of progress for individual analysis result = self.analyze_frame(frame, f"{prompt} Provide a brief analysis of this frame only.") individual_results.append(result) # Identify frames with potential anomalies anomaly_frames = [] anomaly_descriptions = [] anomaly_types = [] for i, result in enumerate(individual_results): if "error" not in result and result["anomaly_detected"]: anomaly_frames.append(result["frame"]) anomaly_descriptions.append(f"Frame {i+1}: {result['text']}") anomaly_types.append(result["anomaly_type"]) # Limit to 3 anomaly frames if len(anomaly_frames) >= 3: break # If no anomalies were detected, use the first, middle, and last frames if not anomaly_frames and len(frames) > 0: if len(frames) == 1: anomaly_frames = [frames[0]] elif len(frames) == 2: anomaly_frames = [frames[0], frames[1]] else: anomaly_frames = [ frames[0], frames[len(frames) // 2], frames[-1] ] # Limit to max 3 frames anomaly_frames = anomaly_frames[:3] # Create a cumulative analysis prompt with the anomaly descriptions cumulative_prompt = f""" {prompt} Based on the analysis of all frames, provide a comprehensive summary of any anomalies detected in the video. Focus on patterns or recurring issues. Here are some notable observations from individual frames: {chr(10).join(anomaly_descriptions[:5])} After your analysis, please include a structured assessment at the end of your response in this exact format: ANOMALY_DETECTED: [Yes/No] ANOMALY_TYPE: [Human/Non-human/None] For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly across the video, otherwise "No". For ANOMALY_TYPE, if an anomaly is detected, classify the predominant type as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None". """ try: if callback: callback(len(frames), len(frames) * 2) # Second half of progress for cumulative analysis # Encode the selected frames base64_images = [self.encode_image_to_base64(frame) for frame in anomaly_frames] # Create the content for the API call content = [{"type": "text", "text": cumulative_prompt}] # Add the images for base64_image in base64_images: content.append({ "type": "image_url", "image_url": { "url": f"data:image/png;base64,{base64_image}" } }) response = self.client.chat.completions.create( model=self.model, messages=[ { "role": "user", "content": content } ], max_tokens=1500 ) # Extract the response text response_text = response.choices[0].message.content # Extract anomaly detection information using regex anomaly_detected = False anomaly_type = "None" # Look for the structured format anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE) if anomaly_match and anomaly_match.group(1).lower() == 'yes': anomaly_detected = True # If anomaly detected, look for the type type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE) if type_match: anomaly_type = type_match.group(1) return { "text": response_text, "frames": anomaly_frames, "anomaly_detected": anomaly_detected, "anomaly_type": anomaly_type, "timestamp": time.time() # Add timestamp for live stream analysis } except Exception as e: return { "error": str(e), "frames": anomaly_frames, "anomaly_detected": False, "anomaly_type": "None", "timestamp": time.time() # Add timestamp for live stream analysis } def process_video(self, video_path, skip_frames, prompt, analysis_depth="granular", callback=None): """ Process a video file, extracting frames and analyzing them for anomalies. Args: video_path (str): Path to the video file skip_frames (int): Number of frames to skip between captures prompt (str): Prompt describing what anomaly to look for analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis callback (function, optional): Callback function to report progress Returns: list or dict: List of analysis results for each processed frame (granular) or dict with cumulative analysis (cumulative) """ frames = self.extract_frames(video_path, skip_frames) if analysis_depth == "cumulative": return self.analyze_frames_cumulatively(frames, prompt, callback) else: # granular (default) results = [] for i, frame in enumerate(frames): if callback: if analysis_depth == "cumulative": callback(i, len(frames) / 2) else: callback(i, len(frames)) result = self.analyze_frame(frame, prompt) results.append(result) return results