import streamlit as st import os import time from yt_dlp import YoutubeDL import ffmpeg import tempfile import pytesseract from PIL import Image from utils import (save_uploaded_file, extract_audio, download_youtube, get_session_dir, cleanup_session_files, get_session_id, get_temp_dir, get_features, proc_raw_audio, extract_frames_interval, extract_frames_pyscenedetect) st.title('📥 Загрузка видео') # Initialize session state defaults defaults = { 'uploaded_file': None, 'video_path': None, 'audio_path': None, 'ocr_text': None, 'transcript': None, 'summary': None, 'main_topic': None, 'input_method': 'Файл', 'input_title': None, 'video_input_path': None, 'video_url': None, 'secret_api': None, 'secret_prompt': None, # 'audio_wav': None, # 'audio_file': None, } for key, value in defaults.items(): st.session_state.setdefault(key, value) # --- Option to clear previous session --- # st.sidebar.write('Current Session ID:') # st.sidebar.write(f'`{get_session_id()}`') # session ID for debugging if st.sidebar.button('Очистить сессию'): session_id = get_session_id() # get current ID before clearing cleanup_session_files(session_id) for key in list(st.session_state.keys()): del st.session_state[key] # clear all session state st.rerun() # rerun the script to reflect cleared state col_input_method, col_main_topic = st.columns([2, 6]) # --- Video source selection --- input_method = col_input_method.radio( 'Выберите способ загрузки:', ('Файл', 'Ссылка YouTube'), key='input_method', horizontal=True ) # --- Main Topic --- st.session_state.main_topic = col_main_topic.text_input('(опционально) Укажите тему лекции:', st.session_state.main_topic) # col_url, col_start_from = st.columns([5, 2]) # video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url']) # start_from = col_start_from.number_input( # 'Start From:', # min_value=0.0, step=0.5, format='%f', value=example_youtube['start'], # help='Time shift from the beginning (in seconds)' # ) # if video_url: # st.session_state.video_url = video_url # st.session_state.video_input_path = '' # clear path if URL is used video_path = None uploaded_file = None video_url = None if input_method == 'Файл': uploaded_file = st.file_uploader( 'Выберите видеофайл', type=['mp4', 'avi', 'mkv', 'mov'] ) if uploaded_file: col_info, col_ready = st.columns(2) # Display basic file info # col_info.badge(f'файл: `{uploaded_file.name}` | ' + # f'тип: `{uploaded_file.type}` | ' + # f'размер: `{uploaded_file.size / (1024 * 1024):.2f} MB`') # Save uploaded file temporarily for the Prefect flow temp_dir = get_temp_dir() # use a shared temp location # Use a unique name to avoid conflicts if multiple users run simultaneously target_path = os.path.join(temp_dir, f'upload_{get_session_id()}_{uploaded_file.name}') try: with open(target_path, 'wb') as f: f.write(uploaded_file.getbuffer()) st.session_state.video_input_path = target_path st.session_state.video_input_title = uploaded_file.name st.session_state.video_url = '' # clear URL if file is uploaded st.session_state.transcript = None st.session_state.summary = None # col_info.info('Готово к предобработке.') except Exception as e: col_ready.error(f'Error saving uploaded file: {e}') st.session_state.video_input_path = '' elif input_method == 'Ссылка YouTube': #-- Obtain audio from YouTube video example_youtube = { 'title': 'Общественное движение', 'url': 'https://www.youtube.com/watch?v=c3bhkrKF6F4', 'start': 0.0 } col_url, col_start_from = st.columns([5, 2]) video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url']) start_from = col_start_from.number_input( 'Начать с сек.:', min_value=0.0, step=0.5, format='%f', value=example_youtube['start'], help='Сдвиг по времени, с которого начинается лекция' ) if video_url: st.session_state.video_url = video_url st.session_state.video_input_path = '' # clear path if URL is used @st.cache_resource def ui_processed_sound(audio_wav, audio_np): '''UI to show sound processing results''' st.audio(audio_wav) features = get_features(audio_np) @st.cache_resource def extract_videofile(video_file): # video_buffer = BytesIO(video_file.read()) # audio_data = VideoFileClip(video_buffer.name).audio # raw_source = StringIO(video_file.getvalue().decode('utf-8')) # raw_source = video_file.getvalue().decode('utf-8') # raw_source = video_file.read() # raw_source = BytesIO(video_file.getvalue()) #-- Get video # out, err = ( # ffmpeg # .input(video_file, ss=start_from) # .output('temp.mp4', vcodec='copy') # .overwrite_output() # .run() # ) # st.video('temp.mp4') # video = VideoFileClip(video_file) # audio = video.audio # audio.write_audiofile('output_audio.mp3') tfile = tempfile.NamedTemporaryFile(delete=False) tfile.write(video_file.read()) #-- Get audio # SAMPLE_RATE = 16000 audio_data, err = ( ffmpeg .input(tfile.name, ss=start_from) .output('pipe:', format='wav')#, acodec='pcm_s16le') # .output('pipe:', format='s16le', ac=1, acodec='pcm_s16le', ar=SAMPLE_RATE) # .global_args('-nostdin', '-threads', '0') .run(capture_stdout=True) ) if err: raise RuntimeError(f'Failed to load audio: {err.decode()}') return audio_data @st.cache_resource def extract_youtube(raw_url): #-- Get video # out, err = ( # ffmpeg # .input(raw_url, ss=start_from) # .output('temp.mp4', vcodec='copy') # .overwrite_output() # .run() # ) # st.video('temp.mp4') #-- Get audio # SAMPLE_RATE = 16000 audio_data, err = ( ffmpeg .input(raw_url, ss=start_from) .output('pipe:', format='wav')#, acodec='pcm_s16le') # .output('pipe:', format='s16le', ac=1, acodec='pcm_s16le', ar=SAMPLE_RATE) .global_args('-nostdin', '-threads', '0') .run(capture_stdout=True) ) if err: raise RuntimeError(f'Failed to load audio: {err.decode()}') return audio_data # --- Processing Button --- _, col_button_process, _ = st.columns([2, 1, 2]) if col_button_process.button('Подготовить видео', type='primary', use_container_width=True, disabled=not (st.session_state.video_input_path or st.session_state.video_url or uploaded_file) ): # Clear previous paths if reprocessing st.session_state['video_path'] = None st.session_state['audio_path'] = None col_info, col_complete, col_next = st.columns(3) with st.spinner('Обрабатываем видео..'): if st.session_state['input_method'] == 'Файл' and uploaded_file: st.session_state.uploaded_file = uploaded_file video = uploaded_file # audio_data = extract_videofile(uploaded_file) saved_path = save_uploaded_file(uploaded_file) if saved_path: st.session_state['video_path'] = saved_path col_info.success(f'Видео временно сохранено в: `{os.path.basename(saved_path)}`') else: col_info.error('Failed to save uploaded file') elif st.session_state['input_method'] == 'Ссылка YouTube' and video_url: try: with YoutubeDL({'format': 'best+bestaudio'}) as ydl: info = ydl.extract_info(video_url, download=False) except Exception as e: st.error(e) else: d = info['duration'] h, m, s = d // 3600, (d % 3600) // 60, d % 60 time_str = [] if h: time_str.append(f'{h}h') if m: time_str.append(f'{m}m') if s or not time_str: time_str.append(f'{s}s') time_str = ' '.join(time_str) st.write(f"
\ **Title:** [{info['title']}]({video_url})\ **Duration:** {info['duration']} sec.
", unsafe_allow_html=True) video = video_url # audio_data = extract_youtube(info['url']) st.session_state.video_input_title = info['title'] session_dir = get_session_dir() os.makedirs(session_dir, exist_ok=True) downloaded_path = download_youtube(video_url, session_dir) if downloaded_path and os.path.exists(downloaded_path): st.session_state['video_path'] = downloaded_path col_info.success(f'YouTube видео скачано в: {os.path.basename(downloaded_path)}') else: col_info.error('Failed to download YouTube video') else: st.warning('Пожалуйста, загрузите видеофайл или предоставьте ссылку YouTube') st.stop() # # ----------------------------------- AUDIO EXTRACTION # if st.session_state.video_path: # st.write('Extracting audio..') start = time.time() # Ensure utils.extract_audio uses the correct path audio_path = extract_audio(st.session_state['video_path'], audio_format='mp3') # audio_path = extract_audio(st.session_state['video_path']) end = time.time() if audio_path and os.path.exists(audio_path): col_info.success(f'Аудиодорожка сохранена в: {os.path.basename(audio_path)} (за {end - start:.2f}сек)') st.session_state['audio_path'] = audio_path else: col_info.error('Failed to extract audio from the video') st.warning('Proceeding without audio. STT step will be skipped') st.session_state['audio_path'] = None # explicitly set to None # # ----------------------------------- FRAMES EXTRACTION # if st.session_state.video_path: extraction_method = 'pyscenedetect' extraction_threshold = 1.5 ocr_lang = 'rus' st.session_state['frames_paths'] = [] with st.spinner(f'Выделяем информативные кадры..'): start_time = time.time() frames_dir, frame_paths = extract_frames_pyscenedetect(st.session_state.video_path, 'frames_pyscenedetect', threshold=extraction_threshold) extract_time = time.time() - start_time if frames_dir and frame_paths: st.session_state['frames_dir'] = frames_dir st.session_state['frames_paths'] = frame_paths # store paths col_info.success(f'Выделено {len(frame_paths)} кадров (заняло {int(extract_time)} сек)') else: col_info.error('Failed to extract frames') st.stop() if st.session_state['frames_paths']: total_frames = len(st.session_state['frames_paths']) # col_info.write(f'Performing OCR on {total_frames} frames..') # ocr_progress = st.progress(0) start_ocr_time = time.time() extracted_texts = [] processed_count = 0 # Use columns to display some example frames max_display_frames = 6 display_cols = st.columns(min(max_display_frames, total_frames) if total_frames > 0 else 1) display_idx = 0 # Process frames in batches or one by one for i, frame_path in enumerate(st.session_state['frames_paths']): img = Image.open(frame_path) # Extract timestamp from filename (assuming format frame_XXXXXX.png) try: secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0]) timestamp = time.strftime('%H:%M:%S', time.gmtime(secs)) extracted_texts.append({'timestamp': timestamp, 'image': img}) except: extracted_texts.append({'timestamp': 'N/A', 'image': img}) # fallback if filename parse fails # Display some examples if display_idx < max_display_frames and display_idx < len(display_cols): with display_cols[display_idx]: st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True) display_idx += 1 processed_count += 1 if st.session_state.video_path: col_complete.info('Предобработка видео завершена.') col_next.page_link('ui_create_summary.py', label='Можно переходить в ✨ **Кабинет методиста**', icon='➡️') # Display video _, col_preview, _ = st.columns([1, 3, 1]) with col_preview.container(border=True): # col_preview_description.subheader('Превью') # _, col_video, _ = st.columns([1, 3, 1]) # col_video.video(video) st.video(video) st.columns([3, 2, 3])[1].page_link('ui_create_summary.py', label='Перейти в ✨ **Кабинет методиста**', icon='➡️', use_container_width=True)