# NOTE: "Spaces: Sleeping" — Hugging Face Spaces page-header residue from copy-paste; kept as a comment so the file parses.
import os
import time

import pytesseract
import streamlit as st
from PIL import Image

from utils import extract_frames_interval, extract_frames_pyscenedetect
st.title('🖼️ Step 3: Video Processing (Frame Extraction & OCR)')

# Guard clause: this page requires a video saved by the Upload page.
# Stop rendering the rest of the page if the stored path is missing,
# empty, or points at a file that no longer exists on disk.
if ('video_path' not in st.session_state or
        not st.session_state['video_path'] or
        not os.path.exists(st.session_state['video_path'])):
    st.warning('Video file not found. Please go back to the **📤 Upload** page and process a video first.')
    st.stop()

video_path = st.session_state['video_path']
st.write(f'Video file to process: `{os.path.basename(video_path)}`')
# | |
# ================================================================== | |
# | |
col_method, col_config = st.columns(2) | |
# --- Method --- | |
# with col_model.expander('**MODEL**', expanded=True): | |
with col_method.container(border=True): | |
# extraction_method = st.selectbox( | |
# 'Extraction method:', | |
# ('interval', 'video2slides', 'pyscenedetect'), | |
# index=0 | |
# ) | |
extraction_method = st.radio( | |
'Extraction method:', | |
('interval', 'video2slides', 'pyscenedetect'), | |
index=0, | |
horizontal=True, | |
) | |
# col_config_frame_interval, col_config_ocr_lang = st.columns(2) | |
# frame_interval = col_config_frame_interval.slider('Extract frames every `X` seconds:', min_value=1, max_value=60, value=5, step=1) | |
# ocr_lang = col_config_ocr_lang.text_input('OCR Language(s) (e.g. `rus`, `rus+eng`):', value='rus') | |
ocr_lang = st.text_input('OCR Language(s) (e.g. `rus`, `rus+eng`):', value='rus') | |
# --- Configuration --- | |
with col_config.expander(f'**`{extraction_method}` METHOD CONFIG**', expanded=True): | |
match extraction_method: | |
case 'interval': | |
extraction_interval = st.number_input( | |
'Frames extraction interval:', | |
min_value=0, max_value=25, step=1, format='%i', value=5, | |
help='Extract frames every `x` seconds' | |
) | |
case 'video2slides': | |
print('video2slides') | |
case 'pyscenedetect': | |
extraction_threshold = st.number_input( | |
'Frames extraction threshold:', | |
min_value=0.1, max_value=10.0, step=0.1, format='%f', value=1.5, | |
) | |
# --- Semantic Segmentation Placeholder ---
# st.markdown("---")

# --- Tesseract Configuration (optional but recommended) ---
# Uncomment and set the path if tesseract is not in your PATH:
# pytesseract.pytesseract.tesseract_cmd = r'/path/to/your/tesseract'  # e.g. '/usr/bin/tesseract' or 'C:\Program Files\Tesseract-OCR\tesseract.exe'

# --- Frame Extraction and OCR ---
# st.subheader('OCR')
_, col_button_extract, _ = st.columns([2, 1, 2]) | |
if col_button_extract.button('Extract Frames', type='primary', use_container_width=True): | |
# st.session_state['ocr_text'] = None # clear previous results | |
st.session_state['frames_paths'] = [] | |
# all_ocr_results = [] | |
col_info, col_complete, col_next = st.columns(3) | |
match extraction_method: | |
case 'interval': | |
with st.spinner(f'Extracting frames every {extraction_interval} seconds (using interval method)..'): | |
start_time = time.time() | |
frames_dir, frame_paths = extract_frames_interval(video_path, 'frames_pyscenedetect', interval_sec=extraction_interval) | |
extract_time = time.time() - start_time | |
if frames_dir and frame_paths: | |
st.session_state['frames_dir'] = frames_dir | |
st.session_state['frames_paths'] = frame_paths # store paths | |
col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.') | |
else: | |
col_info.error('Failed to extract frames') | |
st.stop() | |
case 'video2slides': | |
pass | |
case 'pyscenedetect': | |
with st.spinner(f'Extracting frames with `threshold={extraction_threshold}` (using pyscenedetect method)..'): | |
start_time = time.time() | |
frames_dir, frame_paths = extract_frames_pyscenedetect(video_path, 'frames_pyscenedetect', threshold=extraction_threshold) | |
extract_time = time.time() - start_time | |
if frames_dir and frame_paths: | |
st.session_state['frames_dir'] = frames_dir | |
st.session_state['frames_paths'] = frame_paths # store paths | |
col_info.success(f'Extracted {len(frame_paths)} frames in {extract_time:.2f}s.') | |
else: | |
col_info.error('Failed to extract frames') | |
st.stop() | |
if st.session_state['frames_paths']:
    total_frames = len(st.session_state['frames_paths'])
    start_ocr_time = time.time()
    extracted_texts = []
    processed_count = 0

    # Show up to `max_display_frames` example frames in a single row of columns.
    max_display_frames = 6
    display_cols = st.columns(min(max_display_frames, total_frames) if total_frames > 0 else 1)
    display_idx = 0

    for frame_path in st.session_state['frames_paths']:
        img = Image.open(frame_path)
        # Derive a HH:MM:SS timestamp from the filename
        # (expected format: frame_XXXXXX.png, XXXXXX = offset in seconds).
        try:
            secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
            timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
        except (IndexError, ValueError):
            # Bugfix: previously a bare `except:` appended a fallback dict but
            # left `timestamp` unbound, so the caption f-string below raised
            # NameError (or reused a stale value) when filename parsing failed.
            timestamp = 'N/A'
        extracted_texts.append({'timestamp': timestamp, 'image': img})

        # Display the first few frames as examples.
        if display_idx < max_display_frames and display_idx < len(display_cols):
            with display_cols[display_idx]:
                st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
            display_idx += 1
        processed_count += 1
    # # --- Frame-by-frame OCR (disabled) ---
    # for i, frame_path in enumerate(st.session_state['frames_paths']):
    #     try:
    #         img = Image.open(frame_path)
    #         # --- Potential Preprocessing/Filtering ---
    #         # Add logic here if needed:
    #         # - Detect if frame likely contains text (e.g., check contrast, edges)
    #         # - If segmentation was implemented, crop to slide regions here
    #         # --- Perform OCR ---
    #         text = pytesseract.image_to_string(img, lang=ocr_lang)
    #         # --- Basic Text Cleaning/Filtering ---
    #         cleaned_text = text.strip()
    #         if cleaned_text and len(cleaned_text) > 10:  # filter very short/noisy results
    #             # Extract timestamp from filename (assuming format frame_XXXXXX.png)
    #             try:
    #                 secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
    #                 timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
    #                 extracted_texts.append({'timestamp': timestamp, 'text': cleaned_text})
    #             except:
    #                 extracted_texts.append({'timestamp': 'N/A', 'text': cleaned_text})  # fallback if filename parse fails
    #             # Display some examples
    #             if display_idx < max_display_frames and display_idx < len(display_cols):
    #                 with display_cols[display_idx]:
    #                     st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)
    #                     st.text(f'OCR:\n{cleaned_text[:100]}..')  # show snippet
    #                 display_idx += 1
    #         processed_count += 1
    #         ocr_progress.progress(processed_count / total_frames)
    #     except Exception as ocr_err:
    #         col_info.warning(f'Could not perform OCR on {os.path.basename(frame_path)}: {ocr_err}')
    #         processed_count += 1  # still count as processed
    #         ocr_progress.progress(processed_count / total_frames)
    # ocr_time = time.time() - start_ocr_time
    # col_complete.success(f'OCR processing finished in {ocr_time:.2f}s.')

    # # --- Aggregate and Deduplicate OCR Text (disabled) ---
    # # Simple approach: combine unique text blocks
    # final_ocr_text = ""
    # seen_texts = set()
    # last_text = ""
    # min_similarity_threshold = 0.8  # requires a library like `thefuzz` or similar for a proper check
    # # Basic check: avoid exact consecutive duplicates
    # for item in extracted_texts:
    #     current_text_block = item['text'].strip()
    #     # Only add if significantly different from the last block;
    #     # a more robust check would involve sequence matching or fuzzy matching.
    #     is_duplicate = False
    #     if last_text:
    #         # Simple check: exact match or near-exact length/content start?
    #         if (current_text_block == last_text or
    #             (abs(len(current_text_block) - len(last_text)) < 10 and
    #              current_text_block.startswith(last_text[:20]))
    #         ):
    #             is_duplicate = True  # likely a duplicate from consecutive frames
    #     if current_text_block and not is_duplicate:  # only add non-empty, non-duplicate text
    #         final_ocr_text += f"\n\n--- Text from frame around {item['timestamp']} ---\n"
    #         final_ocr_text += current_text_block
    #         last_text = current_text_block  # update last text added
    # st.session_state['ocr_text'] = final_ocr_text.strip()
    # if st.session_state['ocr_text']:
    #     col_complete.info('OCR processing complete.')
    #     col_next.page_link('ui_summarize.py', label='Next Step: **📝 Summarize**', icon='➡️')
    # else:
    #     col_complete.warning('No significant text found via OCR')

# # --- Display OCR Results (disabled) ---
# st.subheader('Aggregated OCR Text')
# if 'ocr_text' in st.session_state and st.session_state['ocr_text']:
#     st.text_area("Extracted Text from Frames", st.session_state['ocr_text'], height=400)
# else:
#     st.info('OCR has not been run or no text was detected')
# st.divider()
# st.subheader('Semantic Segmentation')