import streamlit as st
import os
import pytesseract
from PIL import Image
import time

from utils import extract_frames_interval, extract_frames_pyscenedetect

st.title('🖼️ Step 3: Video Processing (Frame Extraction & OCR)')

# Check that a processed video is available in the session state
if ('video_path' not in st.session_state
        or not st.session_state['video_path']
        or not os.path.exists(st.session_state['video_path'])):
    st.warning('Video file not found. Please go back to the **📤 Upload** page and process a video first.')
    st.stop()

video_path = st.session_state['video_path']
st.write(f'Video file to process: `{os.path.basename(video_path)}`')

# ==================================================================

col_method, col_config = st.columns(2)

# --- Method ---
with col_method.container(border=True):
    extraction_method = st.radio(
        'Extraction method:',
        ('interval', 'video2slides', 'pyscenedetect'),
        index=0,
        horizontal=True,
    )
    ocr_lang = st.text_input('OCR language(s) (e.g. `rus`, `rus+eng`):', value='rus')

# --- Configuration ---
with col_config.expander(f'**`{extraction_method}` METHOD CONFIG**', expanded=True):
    match extraction_method:
        case 'interval':
            extraction_interval = st.number_input(
                'Frame extraction interval:',
                min_value=1, max_value=25, step=1, format='%d', value=5,
                help='Extract frames every `x` seconds',
            )
        case 'video2slides':
            st.caption('`video2slides` has no configurable options yet.')
        case 'pyscenedetect':
            extraction_threshold = st.number_input(
                'Frame extraction threshold:',
                min_value=0.1, max_value=10.0, step=0.1, format='%f', value=1.5,
            )

# --- Semantic Segmentation Placeholder ---
# st.markdown('---')

# --- Tesseract Configuration (optional but recommended) ---
# Uncomment and set the path if tesseract is not in your PATH
# pytesseract.pytesseract.tesseract_cmd = r'/path/to/your/tesseract'
# Example: '/usr/bin/tesseract' or 'C:\Program Files\Tesseract-OCR\tesseract.exe'
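# A minimal sketch of auto-detecting the tesseract binary instead of hardcoding
# the path above. This is an optional convenience, not part of the original flow:
# it assumes the binary is named 'tesseract' and is discoverable via PATH.
import shutil

_tesseract_path = shutil.which('tesseract')
if _tesseract_path:
    pytesseract.pytesseract.tesseract_cmd = _tesseract_path  # point pytesseract at the found binary
else:
    st.caption('⚠️ `tesseract` binary not found on PATH; OCR will fail until it is installed or configured above.')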
# --- Frame Extraction and OCR ---
st.subheader('OCR')

_, col_button_extract, _ = st.columns([2, 1, 2])
if col_button_extract.button('Extract Frames', type='primary', use_container_width=True):
    # st.session_state['ocr_text'] = None  # clear previous results
    st.session_state['frames_paths'] = []
    # all_ocr_results = []
    col_info, col_complete, col_next = st.columns(3)

    match extraction_method:
        case 'interval':
            with st.spinner(f'Extracting frames every {extraction_interval} seconds (using interval method)..'):
                start_time = time.time()
                frames_dir, frame_paths = extract_frames_interval(video_path, 'frames_interval', interval_sec=extraction_interval)
                extract_time = time.time() - start_time
            if frames_dir and frame_paths:
                st.session_state['frames_dir'] = frames_dir
                st.session_state['frames_paths'] = frame_paths  # store paths
                col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.')
            else:
                col_info.error('Failed to extract frames')
                st.stop()
        case 'video2slides':
            pass  # not implemented yet
        case 'pyscenedetect':
            with st.spinner(f'Extracting frames with `threshold={extraction_threshold}` (using pyscenedetect method)..'):
                start_time = time.time()
                frames_dir, frame_paths = extract_frames_pyscenedetect(video_path, 'frames_pyscenedetect', threshold=extraction_threshold)
                extract_time = time.time() - start_time
            if frames_dir and frame_paths:
                st.session_state['frames_dir'] = frames_dir
                st.session_state['frames_paths'] = frame_paths  # store paths
                col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.')
            else:
                col_info.error('Failed to extract frames')
                st.stop()

    if st.session_state['frames_paths']:
        total_frames = len(st.session_state['frames_paths'])
        # col_info.write(f'Performing OCR on {total_frames} frames..')
        # ocr_progress = st.progress(0)
        start_ocr_time = time.time()
        extracted_texts = []
        processed_count = 0

        # Use columns to display some example frames
        max_display_frames = 6
        display_cols = st.columns(min(max_display_frames, total_frames) if total_frames > 0 else 1)
        display_idx = 0

        # Process frames one by one
        for frame_path in st.session_state['frames_paths']:
            img = Image.open(frame_path)

            # Extract the timestamp from the filename (assuming the format
            # frame_XXXXXX.png, where XXXXXX is the offset in seconds)
            try:
                secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
                timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
            except (IndexError, ValueError):
                timestamp = 'N/A'  # fallback if the filename cannot be parsed
            extracted_texts.append({'timestamp': timestamp, 'image': img})

            # Display a few example frames
            if display_idx < max_display_frames and display_idx < len(display_cols):
                with display_cols[display_idx]:
                    st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
                display_idx += 1

            processed_count += 1
            # ocr_progress.progress(processed_count / total_frames)

        # # Process frames one by one, running OCR on each
        # for frame_path in st.session_state['frames_paths']:
        #     try:
        #         img = Image.open(frame_path)
        #
        #         # --- Potential Preprocessing/Filtering ---
        #         # Add logic here if needed (see the `_preprocess_for_ocr` sketch below):
        #         # - Detect if a frame likely contains text (e.g., check contrast, edges)
        #         # - If segmentation were implemented, crop to slide regions here
        #
        #         # --- Perform OCR ---
        #         text = pytesseract.image_to_string(img, lang=ocr_lang)
        #
        #         # --- Basic Text Cleaning/Filtering ---
        #         cleaned_text = text.strip()
        #         if cleaned_text and len(cleaned_text) > 10:  # filter very short/noisy results
        #             # Extract timestamp from filename (assuming format frame_XXXXXX.png)
        #             try:
        #                 secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
        #                 timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
        #             except (IndexError, ValueError):
        #                 timestamp = 'N/A'  # fallback if the filename cannot be parsed
        #             extracted_texts.append({'timestamp': timestamp, 'text': cleaned_text})
        #
        #             # Display some examples
        #             if display_idx < max_display_frames and display_idx < len(display_cols):
        #                 with display_cols[display_idx]:
        #                     st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
        #                     st.text(f'OCR:\n{cleaned_text[:100]}..')  # show a snippet
        #                 display_idx += 1
        #
        #         processed_count += 1
        #         # ocr_progress.progress(processed_count / total_frames)
        #     except Exception as ocr_err:
        #         col_info.warning(f'Could not perform OCR on {os.path.basename(frame_path)}: {ocr_err}')
        #         processed_count += 1  # still count as processed
        #         # ocr_progress.progress(processed_count / total_frames)
        #
        # ocr_time = time.time() - start_ocr_time
        # col_complete.success(f'OCR processing finished in {ocr_time:.2f}s.')
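# A minimal sketch of the preprocessing step hinted at in the commented-out OCR
# loop above: grayscale plus a hard threshold, which often helps tesseract on
# slide-style frames. The helper name and the threshold of 160 are illustrative
# assumptions, not part of the original pipeline.
from PIL import ImageOps

def _preprocess_for_ocr(img: Image.Image, threshold: int = 160) -> Image.Image:
    """Convert a frame to high-contrast black and white before OCR."""
    gray = ImageOps.grayscale(img)  # drop colour information
    return gray.point(lambda p: 255 if p > threshold else 0)  # binarize

# Usage inside the OCR loop would look like:
#   text = pytesseract.image_to_string(_preprocess_for_ocr(img), lang=ocr_lang)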
# --- Aggregate and Deduplicate OCR Text ---
# Simple approach: combine unique text blocks, skipping consecutive duplicates.
# A proper similarity check would need fuzzy matching, e.g. via `thefuzz`
# (see the `_is_near_duplicate` sketch at the end of this file).
# final_ocr_text = ''
# seen_texts = set()
# last_text = ''
# min_similarity_threshold = 0.8
#
# for item in extracted_texts:
#     current_text_block = item['text'].strip()
#
#     # Basic check: only add if significantly different from the last block.
#     # A more robust check would involve sequence matching or fuzzy matching.
#     is_duplicate = False
#     if last_text:
#         # Simple heuristic: exact match, or near-identical length and start
#         if (current_text_block == last_text
#                 or (abs(len(current_text_block) - len(last_text)) < 10
#                     and current_text_block.startswith(last_text[:20]))):
#             is_duplicate = True  # likely a duplicate from consecutive frames
#
#     if current_text_block and not is_duplicate:  # only add non-empty, non-duplicate text
#         final_ocr_text += f"\n\n--- Text from frame around {item['timestamp']} ---\n"
#         final_ocr_text += current_text_block
#         last_text = current_text_block  # remember the last block added
#
# st.session_state['ocr_text'] = final_ocr_text.strip()
# if st.session_state['ocr_text']:
#     col_complete.info('OCR processing complete.')
#     col_next.page_link('ui_summarize.py', label='Next Step: **📝 Summarize**', icon='➡️')
# else:
#     col_complete.warning('No significant text found via OCR')

# --- Display OCR Results ---
# st.subheader('Aggregated OCR Text')
# if 'ocr_text' in st.session_state and st.session_state['ocr_text']:
#     st.text_area('Extracted Text from Frames', st.session_state['ocr_text'], height=400)
# else:
#     st.info('OCR has not been run or no text was detected')

# st.divider()
# st.subheader('Semantic Segmentation')
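# A minimal sketch of the fuzzy duplicate check mentioned above. The original
# comment suggests a library like `thefuzz`; the stdlib `difflib` gives a
# comparable similarity ratio without an extra dependency. The helper name and
# the 0.8 default are illustrative assumptions.
from difflib import SequenceMatcher

def _is_near_duplicate(current: str, previous: str, threshold: float = 0.8) -> bool:
    """Return True if two OCR text blocks are similar enough to count as duplicates."""
    if not previous:
        return False
    return SequenceMatcher(None, current, previous).ratio() >= threshold

# Usage inside the commented-out aggregation loop would look like:
#   is_duplicate = _is_near_duplicate(current_text_block, last_text, min_similarity_threshold)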