# Streamlit page "Step 1": pick a video (file upload or YouTube URL), store it
# for the current session and extract its audio track for the later steps.
import os
import tempfile
import time

import ffmpeg
import streamlit as st
from yt_dlp import YoutubeDL

from utils import (
    save_uploaded_file, extract_audio, download_youtube,
    get_session_dir, cleanup_session_files, get_session_id,
    get_temp_dir, get_features, proc_raw_audio,
)


def _format_duration(seconds):
    '''Return a compact "1h 2m 3s" string for a duration given in seconds.

    ``None`` (yt-dlp reports it for some streams) is treated as 0 and
    fractional seconds are truncated.
    '''
    total = int(seconds or 0)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    parts = []
    if hours:
        parts.append(f'{hours}h')
    if minutes:
        parts.append(f'{minutes}m')
    if secs or not parts:
        parts.append(f'{secs}s')
    return ' '.join(parts)


st.title('📥📄 Step 1: Upload Video & Preprocess')

# Initialize session state defaults
defaults = {
    'uploaded_file': None,
    'video_path': None,
    'audio_path': None,
    'ocr_text': None,
    'transcript': None,
    'summary': None,
    'main_topic': None,
    'input_method': 'Upload',
    'input_title': None,
    # Written below for both input methods, so it gets a default as well.
    'video_input_title': None,
    'video_input_path': None,
    'video_url': None,
    # 'audio_wav': None,
    # 'audio_file': None,
}
for key, value in defaults.items():
    st.session_state.setdefault(key, value)

# --- Option to clear previous session ---
st.sidebar.write('Current Session ID:')
st.sidebar.write(f'`{get_session_id()}`')  # session ID for debugging
if st.sidebar.button('Start New Session'):
    session_id = get_session_id()  # get current ID before clearing
    cleanup_session_files(session_id)
    for key in list(st.session_state.keys()):
        del st.session_state[key]  # clear all session state
    st.rerun()  # rerun the script to reflect cleared state

# --- Main Topic ---
st.session_state.main_topic = st.text_input(
    '(optional) Provide video topic:', st.session_state.main_topic
)

# --- Video source selection ---
input_method = st.radio(
    'Select Input Method:',
    ('Upload', 'YouTube'),
    key='input_method',
    horizontal=True
)

video_path = None
uploaded_file = None
video_url = None
# Default time shift; only the YouTube branch lets the user change it, but the
# extract_* helpers below read it regardless of the selected input method.
start_from = 0.0

if input_method == 'Upload':
    uploaded_file = st.file_uploader(
        'Choose a video file', type=['mp4', 'avi', 'mkv', 'mov']
    )
    if uploaded_file:
        col_info, col_ready = st.columns(2)
        # Display basic file info
        col_info.info('**[ File Details ]** '
            + f'name: `{uploaded_file.name}` | '
            + f'type: `{uploaded_file.type}` | '
            + f'size: `{uploaded_file.size / (1024 * 1024):.2f} MB`')

        # Save uploaded file temporarily for the Prefect flow
        temp_dir = get_temp_dir()  # use a shared temp location
        # The file name comes from the client: keep only its last component so
        # it cannot point outside temp_dir.
        safe_name = os.path.basename(uploaded_file.name)
        # Use a unique name to avoid conflicts if multiple users run simultaneously
        target_path = os.path.join(temp_dir, f'upload_{get_session_id()}_{safe_name}')

        # Streamlit reruns this script on every widget interaction; do not
        # rewrite the video or drop downstream results when this exact upload
        # is already on disk.
        already_saved = (
            st.session_state.video_input_path == target_path
            and os.path.isfile(target_path)
            and os.path.getsize(target_path) == uploaded_file.size
        )
        if already_saved:
            st.session_state.video_input_title = uploaded_file.name
            st.session_state.video_url = ''  # clear URL if file is uploaded
            col_ready.info('Ready for processing.')
        else:
            try:
                with open(target_path, 'wb') as f:
                    f.write(uploaded_file.getbuffer())
            except OSError as e:
                col_ready.error(f'Error saving uploaded file: {e}')
                st.session_state.video_input_path = ''
            else:
                st.session_state.video_input_path = target_path
                st.session_state.video_input_title = uploaded_file.name
                st.session_state.video_url = ''  # clear URL if file is uploaded
                # A new input invalidates results derived from the previous one
                st.session_state.transcript = None
                st.session_state.summary = None
                col_ready.info('Ready for processing.')

elif input_method == 'YouTube':
    #-- Obtain audio from YouTube video
    example_youtube = {
        'title': 'Общественное движение',
        'url': 'https://www.youtube.com/watch?v=c3bhkrKF6F4',
        'start': 0.0
    }
    col_url, col_start_from = st.columns([5, 2])
    video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url'])
    start_from = col_start_from.number_input(
        'Start From:',
        min_value=0.0, step=0.5, format='%f', value=example_youtube['start'],
        help='Time shift from the beginning (in seconds)'
    )
    if video_url:
        st.session_state.video_url = video_url
        st.session_state.video_input_path = ''  # clear path if URL is used


@st.cache_resource
def ui_processed_sound(audio_wav, audio_np):
    '''UI to show sound processing results'''
    st.audio(audio_wav)
    # Result is currently unused; the call is kept as in the original flow.
    features = get_features(audio_np)


@st.cache_resource
def extract_videofile(video_file, start=None):
    '''Decode the audio track of an uploaded video into WAV bytes via ffmpeg.

    Args:
        video_file: file-like object with ``read()`` holding the video.
        start: seek offset in seconds. ``None`` falls back to the page-level
            ``start_from``; pass it explicitly so the offset takes part in
            the cache key.

    Returns:
        The bytes ffmpeg wrote to stdout (WAV container).

    Raises:
        RuntimeError: if ffmpeg returned data on the error channel.
    '''
    seek = start_from if start is None else start

    # ffmpeg needs a real path. Close the handle before ffmpeg opens the file
    # so the data is flushed (and readable by another process on Windows).
    with tempfile.NamedTemporaryFile(delete=False) as tfile:
        tfile.write(video_file.read())

    try:
        #-- Get audio
        # NOTE: raw 16 kHz mono PCM alternative:
        # .output('pipe:', format='s16le', ac=1, acodec='pcm_s16le', ar=16000)
        audio_data, err = (
            ffmpeg
            .input(tfile.name, ss=seek)
            .output('pipe:', format='wav')
            .run(capture_stdout=True)
        )
    finally:
        # delete=False means nobody else removes the temporary copy
        os.remove(tfile.name)

    if err:
        raise RuntimeError(f'Failed to load audio: {err.decode()}')
    return audio_data


@st.cache_resource
def extract_youtube(raw_url, start=None):
    '''Decode the audio of a remote media URL into WAV bytes via ffmpeg.

    Args:
        raw_url: URL passed to ffmpeg as its input.
        start: seek offset in seconds. ``None`` falls back to the page-level
            ``start_from``; pass it explicitly so the offset takes part in
            the cache key.

    Returns:
        The bytes ffmpeg wrote to stdout (WAV container).

    Raises:
        RuntimeError: if ffmpeg returned data on the error channel.
    '''
    seek = start_from if start is None else start

    #-- Get audio
    audio_data, err = (
        ffmpeg
        .input(raw_url, ss=seek)
        .output('pipe:', format='wav')
        .global_args('-nostdin', '-threads', '0')
        .run(capture_stdout=True)
    )
    if err:
        raise RuntimeError(f'Failed to load audio: {err.decode()}')
    return audio_data


# --- Processing Button ---
_, col_button_process, _ = st.columns([2, 1, 2])
has_input = bool(st.session_state.video_input_path or st.session_state.video_url)

if col_button_process.button(
    'Process video', type='primary', use_container_width=True,
    disabled=not has_input
):
    # Clear previous paths if reprocessing
    st.session_state['video_path'] = None
    st.session_state['audio_path'] = None
    video = None  # what the preview player shows; set by the branch that succeeds

    col_info, col_complete, col_next = st.columns(3)

    with st.spinner('Processing video input..'):
        if st.session_state['input_method'] == 'Upload' and uploaded_file:
            st.session_state.uploaded_file = uploaded_file
            video = uploaded_file
            # audio_data = extract_videofile(uploaded_file)
            saved_path = save_uploaded_file(uploaded_file)
            if saved_path:
                st.session_state['video_path'] = saved_path
                col_info.success(f'Video saved temporarily to: {os.path.basename(saved_path)}')
            else:
                col_info.error('Failed to save uploaded file')

        elif st.session_state['input_method'] == 'YouTube' and video_url:
            try:
                # Metadata only; the actual download happens in download_youtube
                with YoutubeDL({'format': 'best+bestaudio'}) as ydl:
                    info = ydl.extract_info(video_url, download=False)
            except Exception as e:
                st.error(e)
            else:
                # 'duration' may be missing/None or a float in yt-dlp metadata
                duration = int(info.get('duration') or 0)
                time_str = _format_duration(duration)
                # The title is untrusted text, so HTML rendering is not enabled
                st.write(
                    f"**Title:** [{info['title']}]({video_url}) "
                    f"**Duration:** {time_str} ({duration} sec.)"
                )
                video = video_url
                # audio_data = extract_youtube(info['url'])
                st.session_state.video_input_title = info['title']

                session_dir = get_session_dir()
                os.makedirs(session_dir, exist_ok=True)
                downloaded_path = download_youtube(video_url, session_dir)
                if downloaded_path and os.path.exists(downloaded_path):
                    st.session_state['video_path'] = downloaded_path
                    col_info.success(f'YouTube video downloaded: {os.path.basename(downloaded_path)}')
                else:
                    col_info.error('Failed to download YouTube video')

        else:
            st.warning('Please upload a file or provide a YouTube URL')
            st.stop()

        # --- Basic Preprocessing: Audio Extraction ---
        if st.session_state['video_path']:
            start = time.time()
            # Ensure utils.extract_audio uses the correct path
            audio_path = extract_audio(st.session_state['video_path'], audio_format='mp3')
            end = time.time()

            if audio_path and os.path.exists(audio_path):
                col_info.success(f'Audio extracted to: {os.path.basename(audio_path)} (took {end - start:.2f}s)')
                st.session_state['audio_path'] = audio_path
            else:
                col_info.error('Failed to extract audio from the video')
                st.warning('Proceeding without audio. STT step will be skipped')
                st.session_state['audio_path'] = None  # explicitly set to None

    if st.session_state['video_path']:
        col_complete.info('Preprocessing complete')
        col_next.page_link('ui_transcribe.py', label='Next Step: 🎙️ **Transcribe**', icon='➡️')

        # Display video
        if video is not None:
            with st.container(height=300, border=False):
                _, col_preview_description, _ = st.columns([1, 3, 1])
                col_preview_description.subheader('Preview Video')
                _, col_video, _ = st.columns([1, 3, 1])
                col_video.video(video)

        # audio_data = audio_path
        # audio_wav, audio_np = proc_raw_audio(audio_data)
        # st.session_state.audio_wav = audio_wav
        # st.session_state.audio_np = audio_np
        # # st.session_state.video = video.read()
        # ui_processed_sound(audio_wav, audio_np)

# # Display current status
# st.subheader("Current Status:")
# if st.session_state.get('video_path'):
#     st.success(f"✅ Video Loaded: {os.path.basename(st.session_state['video_path'])}")
# else:
#     st.warning("⏳ Video not yet loaded or processed.")
# if st.session_state.get('audio_path'):
#     st.success(f"✅ Audio Extracted: {os.path.basename(st.session_state['audio_path'])}")
# elif st.session_state.get('video_path'):  # only show warning if video was loaded but audio failed
#     st.warning("⚠️ Audio extraction failed or video has no audio track.")