import streamlit as st
import os
import tempfile
import gdown
import uuid
import tomllib
from pathlib import Path
# import ffmpeg
from moviepy.video.io.VideoFileClip import VideoFileClip
import cv2
import numpy as np
from io import BytesIO
from pydub import AudioSegment
from pydub.silence import detect_leading_silence
import librosa
import librosa.display as lbd
import matplotlib.pyplot as plt

# Process-wide scratch directory, one per interpreter run; session sub-dirs live here.
TEMP_DIR = tempfile.mkdtemp()
CONFIG_FILE = 'app_config.toml'


def load_config():
    """Load configuration from app_config.toml.

    Returns:
        dict: the parsed TOML, or built-in defaults when the file is missing.

    Raises:
        Exception: any parse/IO error other than a missing file is re-raised.
    """
    try:
        with open(CONFIG_FILE, 'rb') as f:
            return tomllib.load(f)
    except FileNotFoundError:
        print(f'Error: {CONFIG_FILE} not found. Using default settings.')
        # Default fallback config mirroring the expected file layout.
        return {
            "paths": {"output_dir": "output", "temp_dir": "temp_processing"},
            "models": {"whisper_model": "base.en", "ocr_languages": ["en"],
                       "summarization_model": "google/pegasus-xsum"},
            "settings": {"frame_extraction_interval_seconds": 10,
                         "max_summary_length": 500,
                         "min_summary_length": 100},
        }
    except Exception as e:
        print(f'Error loading config: {e}')
        raise  # Re-raise after printing


CONFIG = load_config()


def ensure_dir(directory_path):
    """Creates a directory (and parents) if it doesn't exist."""
    Path(directory_path).mkdir(parents=True, exist_ok=True)


def _download_secret_text(file_id, **gdown_kwargs):
    """Download a small text file from Google Drive and return its content.

    The payload goes through a named temp file which is always removed,
    even if the download or read fails.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        # Close our handle first so gdown can (re)open the path — required on
        # Windows, and guarantees we read what gdown actually wrote.
        tmp.close()
        gdown.download(id=file_id, output=tmp.name, quiet=True, **gdown_kwargs)
        with open(tmp.name, 'rb') as f:
            return f.read().decode('utf-8')
    finally:
        try:
            os.remove(tmp.name)
        except OSError:
            pass  # best-effort cleanup of the scratch file


def get_secret_api():
    """Fetches the secret API key text from the configured Google Drive file."""
    return _download_secret_text(CONFIG['links']['secret_api_id'],
                                 fuzzy=True, use_cookies=True)


def get_secret_prompt():
    """Fetches the secret prompt text from the configured Google Drive file."""
    return _download_secret_text(CONFIG['links']['secret_prompt_id'])


def save_uploaded_file(uploaded_file):
    """Saves an uploaded file to a per-session temporary directory.

    Returns:
        str | None: path of the saved file, or None when nothing was uploaded.
    """
    if uploaded_file is None:
        return None
    # Generate a unique sub-directory for this upload —
    # simple way to group files per session/upload.
    session_id = get_session_id()
    upload_dir = os.path.join(TEMP_DIR, session_id)
    os.makedirs(upload_dir, exist_ok=True)
    file_path = os.path.join(upload_dir, uploaded_file.name)
    with open(file_path, 'wb') as f:
        f.write(uploaded_file.getbuffer())
    print(f'File saved to: {file_path}')  # debugging
    return file_path


def get_session_id():
    """Generates or retrieves a unique session ID (kept in Streamlit state)."""
    if 'session_id' not in st.session_state:
        st.session_state['session_id'] = str(uuid.uuid4())[:8]
    return st.session_state['session_id']


def get_session_dir():
    """Gets the temporary directory path for the current session."""
    return os.path.join(TEMP_DIR, get_session_id())


def get_temp_dir():
    """Creates and returns the path to a temporary directory for processing."""
    temp_dir = Path(CONFIG['paths']['temp_dir'])
    ensure_dir(temp_dir)
    # Consider using unique subdirs per run if needed:
    # processing_subdir = tempfile.mkdtemp(dir=temp_dir)
    return str(temp_dir)  # Return as string for wider compatibility


def extract_audio(video_path, audio_format='wav'):
    """Extracts the audio track from a video using moviepy.

    The audio file is written next to the video (assumed to be in the
    session directory) as <basename>_audio.<format>.

    Returns:
        str | None: path of the audio file, or None when there is no audio
        track or extraction fails.
    """
    # Pre-bind names so the except-branch never hits a NameError when the
    # failure happens before these are assigned (bug in the original).
    audio_path = None
    video_clip = None
    audio_clip = None
    try:
        session_dir = os.path.dirname(video_path)  # assumes video is in session dir
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_path = os.path.join(session_dir, f"{base_name}_audio.{audio_format}")
        if os.path.exists(audio_path):
            print(f"Audio file already exists: {audio_path}")
            return audio_path
        print(f"Extracting audio from {video_path} to {audio_path}...")
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        if audio_clip is None:
            print("No audio track found in the video.")
            video_clip.close()
            return None
        # WAV (PCM) is often better for speech-to-text than lossy formats.
        audio_clip.write_audiofile(audio_path,
                                   codec='pcm_s16le' if audio_format == 'wav' else 'mp3')
        audio_clip.close()
        video_clip.close()
        print("Audio extraction complete.")
        return audio_path
    except Exception as e:
        print(f"Error extracting audio: {e}")
        if audio_clip is not None:
            audio_clip.close()
        if video_clip is not None:
            video_clip.close()
        # Attempt to remove partial file if creation failed mid-way.
        if audio_path and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as rm_e:
                print(f"Could not remove partial audio file {audio_path}: {rm_e}")
        return None


from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector


def extract_frames_pyscenedetect(video_path, output_dir, threshold=2.0):
    """Extracts the first frame of every detected scene using PySceneDetect.

    Args:
        video_path: source video file.
        output_dir: directory to write JPEG frames into (created if missing).
        threshold: ContentDetector sensitivity; lower detects more cuts.

    Returns:
        (output_dir, list[str]) of written frame paths, or None when the
        video cannot be re-opened for frame extraction.
    """
    os.makedirs(output_dir, exist_ok=True)  # ensure the output dir exists

    # Detect scene boundaries.
    video = open_video(video_path)
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))
    scene_manager.detect_scenes(video)
    print(scene_manager.get_scene_list())
    scene_list = scene_manager.get_scene_list()
    print(f'Обнаружено {len(scene_list)} смен сцен.')

    # Save the scene-switch frames.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f'Error: Could not open video file {video_path}')
        return None
    extracted_frame_paths = []
    for i, (start_time, _) in enumerate(scene_list):
        frame_num = start_time.get_frames()
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        success, frame = cap.read()
        if success:
            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.jpg'  # naming by seconds
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, frame)
            print(f'[*] Сохранён кадр {frame_num} в {frame_path}')
            extracted_frame_paths.append(frame_path)
        else:
            print(f'[!] Ошибка при чтении кадра {frame_num}')
    cap.release()
    # FIX: this summary was unreachable in the original (placed after a return).
    print(f'Extracted {len(extracted_frame_paths)} frames to {output_dir}.')
    return output_dir, extracted_frame_paths


def extract_frames_interval(video_path, output_dir, interval_sec=5):
    """Extracts frames from video at specified intervals using OpenCV.

    Returns:
        (output_dir, list[str]) of written frame paths; (None, []) on failure.
    """
    cap = None
    try:
        os.makedirs(output_dir, exist_ok=True)  # ensure the output dir exists
        print(f'Extracting frames from {video_path} every {interval_sec}s..')
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f'Error: Could not open video file {video_path}')
            # FIX: was `return None`; keep the (dir, list) contract consistent.
            return None, []
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            print('Warning: Could not get FPS, defaulting to 30.')
            fps = 30  # provide a default if FPS is not available
        frame_interval = max(1, int(fps * interval_sec))

        extracted_frame_paths = []

        def save_current(frame):
            # Name the frame by its timestamp in whole seconds.
            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.png'  # naming by seconds
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, frame)
            extracted_frame_paths.append(frame_path)

        # Grab every frame (cheap — no decode), decode only every Nth via
        # retrieve(). FIX: the original grab/retrieve accounting drifted by
        # one frame per cycle; counting grabbed frames keeps spacing exact.
        frame_idx = 0
        while cap.grab():
            if frame_idx % frame_interval == 0:
                success, frame = cap.retrieve()
                if success:
                    save_current(frame)
                else:
                    print(f'Warning: Failed to retrieve frame after grab at frame count {frame_idx}')
            frame_idx += 1

        cap.release()
        print(f'Extracted {len(extracted_frame_paths)} frames to {output_dir}.')
        return output_dir, extracted_frame_paths
    except Exception as e:
        print(f'Error extracting frames: {e}')
        if cap is not None and cap.isOpened():
            cap.release()
        return None, []


# --- Other potential helpers: yt-dlp download, file cleanup etc. ---

def download_youtube(url, output_dir):
    """Downloads a YouTube video using yt-dlp.

    Returns:
        str | None: path of the downloaded file, or None on failure.
    """
    import yt_dlp
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'noplaylist': True,  # download only single video if URL is part of playlist
        'progress_hooks': [lambda d: print(d['status'])],  # basic progress
    }
    try:
        print(f'Attempting to download YouTube video: {url}')
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            # Resolve the actual filename yt-dlp wrote.
            filename = ydl.prepare_filename(info)
        # FIX: the original logged the literal "(unknown)" instead of the path.
        print(f"YouTube video downloaded to: {filename}")
        return filename
    except Exception as e:
        print(f"Error downloading YouTube video: {e}")
        return None


def cleanup_session_files(session_id):
    """Removes the temporary directory for a given session (best-effort)."""
    session_dir = os.path.join(TEMP_DIR, session_id)
    if os.path.exists(session_dir):
        import shutil
        try:
            shutil.rmtree(session_dir)
            print(f"Cleaned up temporary files for session: {session_id}")
        except Exception as e:
            print(f"Error cleaning up session files {session_dir}: {e}")


###
###=== Audio Loading and Processing
###

SAMPLE_RATE = 22050
DURATION = 5
n_mfcc = 13      # number of MFCCs to extract from each sample
n_mels = 128
n_fft = 2048
hop_length = 512
delta_width = 9  # MFCC Delta parameter


def trim_silence(sound, s_thresh=-28.0):
    '''Trims silent chunks from beginning and end of the sound'''
    duration = len(sound)
    start_trim = detect_leading_silence(sound, s_thresh)
    end_trim = detect_leading_silence(sound.reverse(), s_thresh)
    # If the whole clip registered as silent, keep it intact (slice with None)
    # rather than slicing it down to nothing.
    start = start_trim if start_trim != duration else None
    end = duration - end_trim if end_trim != duration else None
    return sound[start:end]


def normalize_volume(sound, target_dBFS=-20.0):
    '''Normalizes sound and shifts to specified loudness'''
    sound = sound.normalize()
    difference = target_dBFS - sound.dBFS
    return sound.apply_gain(difference)


def proc_raw_audio(audio_data, from_start=0, duration=None, before_end=0):
    '''Processes raw audio data and returns (wav_bytes, numpy_signal).

    from_start / duration / before_end are offsets in seconds used to cut
    the clip after normalization and silence trimming.
    '''
    # Instantiate pydub AudioSegment object from raw audio
    audioObj = AudioSegment.from_file(BytesIO(audio_data))
    # Convert to mono mode with the desired sample rate
    audioObj = audioObj.set_frame_rate(SAMPLE_RATE).set_channels(1)
    # Normalize audio volume
    audioObj = normalize_volume(audioObj)
    # Trim by removing silence from beginning and end of the sound
    audioObj = trim_silence(audioObj)
    # Cut to the desired duration (pydub slices are in milliseconds)
    start = from_start * 1000
    if duration:
        end = start + duration * 1000
    else:
        end = len(audioObj) - before_end * 1000
    audioObj = audioObj[start:end]
    # Convert AudioSegment to wav format instance
    buf = BytesIO()
    audioObj.export(buf, format='wav')
    audio_wav = buf.getvalue()
    # Convert the AudioSegment to signal in form of numpy.array
    audio_np = np.array(audioObj.get_array_of_samples(), dtype='float')
    return audio_wav, audio_np

###==============================================


def obtain_features(y, sr=22050, duration=5, delta_width=9):
    '''Extracts MFCCs from the signal; returns np.ndarray [shape=(n_mfcc, t)].

    duration and delta_width are kept for interface compatibility and are
    currently unused.
    '''
    # FIX: librosa >= 0.10 requires y and sr as keyword arguments.
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc, n_mels=n_mels,
                                n_fft=n_fft, hop_length=hop_length)
    return mfcc


def create_features_array(mfcc):
    '''Creates a wholistic array of interleaved means and variances of features'''
    make_meanvar = lambda mean, var: [item for mv in zip(mean, var) for item in mv]
    mean_var_ops = [
        (mfcc.mean(axis=1), mfcc.var(axis=1)),
    ]
    mfcc_meanvars = sum([make_meanvar(mean, var) for mean, var in mean_var_ops], [])
    features_array = [mfcc_meanvars]
    return features_array


def get_features(y, duration=5, sr=SAMPLE_RATE):
    '''Plots wave/mel diagnostics into Streamlit and returns the padded
    mel spectrogram, shaped (1, n_mels, time) for model input.'''
    fig, axes = plt.subplots(1, 2, figsize=(24, 2))
    # WAVE PLOT
    axes[0].set_title(f'Wave Plot for audio sample at {sr} hz')
    axes[0].set_facecolor('#B4E8CF')
    lbd.waveshow(y, sr=sr, color='#4300FF', ax=axes[0])
    # MELSPEC
    melspec = librosa.feature.melspectrogram(y=y, sr=sr)
    melspec = librosa.power_to_db(np.abs(melspec), ref=np.max)
    axes[1].set_title(f'Mel Spectogram | shape: {melspec.shape}')
    lbd.specshow(melspec, cmap='viridis', y_axis='mel', x_axis='time', ax=axes[1])
    st.pyplot(fig)
    plt.close(fig)  # FIX: release the figure — avoids a per-call memory leak

    # Pad the time axis out to 216 frames so the model input width is fixed.
    pad_signal = lambda s, v: np.pad(
        s, [(0, 0), (0, max(0, 216 - s.shape[1]))], constant_values=v
    )
    melspec = pad_signal(melspec, melspec.min())
    melspec = melspec.reshape(1, *melspec.shape)
    return melspec