# Spaces: Sleeping  (page-status banner captured along with the source; not part of the script)
import streamlit as st | |
import os | |
import time | |
from yt_dlp import YoutubeDL | |
import ffmpeg | |
import tempfile | |
import pytesseract | |
from PIL import Image | |
from utils import (save_uploaded_file, extract_audio, | |
download_youtube, get_session_dir, | |
cleanup_session_files, get_session_id, | |
get_temp_dir, get_features, proc_raw_audio, | |
extract_frames_interval, extract_frames_pyscenedetect) | |
st.title('📥 Загрузка видео')

# Session-state defaults. `setdefault` only fills in keys that are missing, so
# values produced by earlier runs of this script are preserved across reruns.
defaults = {
    'uploaded_file': None,
    'video_path': None,
    'audio_path': None,
    'ocr_text': None,
    'transcript': None,
    'summary': None,
    'main_topic': None,
    'input_method': 'Файл',
    'input_title': None,
    # This script stores the source title under `video_input_title`; declare it
    # so the key exists before the first upload/download.
    # NOTE(review): `input_title` is never written in this file — confirm
    # whether other pages still read it or whether it can be retired.
    'video_input_title': None,
    'video_input_path': None,
    'video_url': None,
    'secret_api': None,
    'secret_prompt': None,
    # 'audio_wav': None,
    # 'audio_file': None,
}
for key, value in defaults.items():
    st.session_state.setdefault(key, value)
# --- Option to clear previous session --- | |
# st.sidebar.write('Current Session ID:') | |
# st.sidebar.write(f'`{get_session_id()}`') # session ID for debugging | |
if st.sidebar.button('Очистить сессию'):
    # Remove the session's files first: the session ID has to be read while
    # the session state still exists.
    cleanup_session_files(get_session_id())
    # Snapshot the keys so the state is not mutated while being iterated.
    for state_key in tuple(st.session_state.keys()):
        del st.session_state[state_key]
    # Re-execute the script so the page reflects the emptied state.
    st.rerun()
# Header row: a narrow column for the source selector, a wide one for the topic.
col_input_method, col_main_topic = st.columns([2, 6])

# --- Video source selection ---
# The widget is bound to the 'input_method' session-state key.
input_method = col_input_method.radio(
    label='Выберите способ загрузки:',
    options=('Файл', 'Ссылка YouTube'),
    horizontal=True,
    key='input_method',
)

# --- Main Topic ---
# The field is pre-filled with the stored topic and its current content is
# written back to the session state.
topic_value = col_main_topic.text_input(
    '(опционально) Укажите тему лекции:',
    st.session_state.main_topic,
)
st.session_state.main_topic = topic_value
# col_url, col_start_from = st.columns([5, 2]) | |
# video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url']) | |
# start_from = col_start_from.number_input( | |
# 'Start From:', | |
# min_value=0.0, step=0.5, format='%f', value=example_youtube['start'], | |
# help='Time shift from the beginning (in seconds)' | |
# ) | |
# if video_url: | |
# st.session_state.video_url = video_url | |
# st.session_state.video_input_path = '' # clear path if URL is used | |
# Per-run inputs; filled in by whichever source branch is active below.
video_path = None
uploaded_file = None
video_url = None
# Default time offset (seconds). The YouTube branch overwrites it from its
# widget; defining it here keeps extract_videofile()/extract_youtube(), which
# read `start_from` as a global, usable in file mode as well.
start_from = 0.0

if input_method == 'Файл':
    uploaded_file = st.file_uploader(
        'Выберите видеофайл',
        type=['mp4', 'avi', 'mkv', 'mov']
    )
    if uploaded_file:
        # Only the second column is used (for error reporting); the first is
        # kept so the page layout stays the same.
        _, col_ready = st.columns(2)

        # Store a copy of the upload in the shared temp location. The session
        # ID in the name avoids clashes between simultaneous users.
        temp_dir = get_temp_dir()
        # The filename comes from the client: keep only its final component so
        # it cannot point outside `temp_dir`.
        safe_name = os.path.basename(uploaded_file.name)
        target_path = os.path.join(temp_dir, f'upload_{get_session_id()}_{safe_name}')
        try:
            with open(target_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())
        except Exception as e:
            col_ready.error(f'Error saving uploaded file: {e}')
            st.session_state.video_input_path = ''
        else:
            st.session_state.video_input_path = target_path
            st.session_state.video_input_title = uploaded_file.name
            st.session_state.video_url = ''  # clear URL if file is uploaded
            # Results derived from a previous video no longer apply.
            st.session_state.transcript = None
            st.session_state.summary = None
elif input_method == 'Ссылка YouTube':
    # Pre-filled example so the page is usable without typing a URL.
    example_youtube = {
        'title': 'Общественное движение',
        'url': 'https://www.youtube.com/watch?v=c3bhkrKF6F4',
        'start': 0.0
    }
    col_url, col_start_from = st.columns([5, 2])
    video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url'])
    start_from = col_start_from.number_input(
        'Начать с сек.:',
        min_value=0.0, step=0.5, format='%f', value=example_youtube['start'],
        help='Сдвиг по времени, с которого начинается лекция'
    )
    if video_url:
        st.session_state.video_url = video_url
        st.session_state.video_input_path = ''  # clear path if URL is used
def ui_processed_sound(audio_wav, audio_np):
    '''Render an audio player for `audio_wav` and run `get_features` on `audio_np`.

    The extracted features are neither displayed nor returned.
    '''
    st.audio(audio_wav)
    _ = get_features(audio_np)
def extract_videofile(video_file):
    '''Extract the audio track of an uploaded video as WAV bytes.

    The content of `video_file` (a readable binary file-like object) is
    spooled to a temporary file, which ffmpeg then decodes starting at the
    module-level `start_from` offset. The temporary file is removed once
    ffmpeg has finished.

    Returns the bytes ffmpeg wrote to stdout.
    Raises RuntimeError if ffmpeg returned anything on the captured stderr.
    '''
    # Close (and thereby flush) the temp file before ffmpeg opens it by name;
    # delete=False keeps it on disk after the `with` block exits.
    with tempfile.NamedTemporaryFile(delete=False) as tfile:
        tfile.write(video_file.read())

    try:
        audio_data, err = (
            ffmpeg
            .input(tfile.name, ss=start_from)
            .output('pipe:', format='wav')
            .run(capture_stdout=True)
        )
    finally:
        # The temp file is an implementation detail; never leave it behind.
        os.remove(tfile.name)

    # NOTE(review): stderr is not captured by the call above, so `err` is
    # presumably always None here — confirm against the ffmpeg-python docs.
    if err:
        raise RuntimeError(f'Failed to load audio: {err.decode()}')
    return audio_data
def extract_youtube(raw_url):
    '''Decode the media at `raw_url` with ffmpeg and return its audio as WAV bytes.

    Decoding starts at the module-level `start_from` offset. Raises
    RuntimeError if ffmpeg returned anything on the captured stderr.
    '''
    # Build the ffmpeg pipeline step by step: input -> WAV on stdout -> flags.
    source = ffmpeg.input(raw_url, ss=start_from)
    wav_to_stdout = source.output('pipe:', format='wav')
    command = wav_to_stdout.global_args('-nostdin', '-threads', '0')

    audio_data, err = command.run(capture_stdout=True)
    if err:
        raise RuntimeError(f'Failed to load audio: {err.decode()}')
    return audio_data
# --- Processing Button ---
# NOTE(review): the nesting below was reconstructed from a listing that had
# lost its indentation — confirm block boundaries against the original file.


def _frame_timestamp(frame_path):
    '''Return the `HH:MM:SS` label encoded in a frame filename, or 'N/A'.

    The filename is expected to look like `frame_XXXXXX.png`; the number
    between the first underscore and the following dot is read as seconds.
    '''
    try:
        secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
        return time.strftime('%H:%M:%S', time.gmtime(secs))
    except (IndexError, ValueError, OverflowError, OSError):
        return 'N/A'  # fallback if the filename cannot be parsed


_, col_button_process, _ = st.columns([2, 1, 2])
has_source = bool(st.session_state.video_input_path
                  or st.session_state.video_url
                  or uploaded_file)
if col_button_process.button('Подготовить видео',
                             type='primary',
                             use_container_width=True,
                             disabled=not has_source):
    # Reset the results of any previous run so stale paths are never reused
    # (and so 'frames_paths' always exists when it is read further down).
    st.session_state['video_path'] = None
    st.session_state['audio_path'] = None
    st.session_state['frames_paths'] = []
    video = None  # what st.video() previews: the uploaded file or the URL

    col_info, col_complete, col_next = st.columns(3)

    with st.spinner('Обрабатываем видео..'):
        if st.session_state['input_method'] == 'Файл' and uploaded_file:
            st.session_state.uploaded_file = uploaded_file
            video = uploaded_file
            saved_path = save_uploaded_file(uploaded_file)
            if saved_path:
                st.session_state['video_path'] = saved_path
                col_info.success(f'Видео временно сохранено в: `{os.path.basename(saved_path)}`')
            else:
                col_info.error('Failed to save uploaded file')
        elif st.session_state['input_method'] == 'Ссылка YouTube' and video_url:
            # Fetch metadata only; the download itself happens below.
            try:
                with YoutubeDL({'format': 'best+bestaudio'}) as ydl:
                    info = ydl.extract_info(video_url, download=False)
            except Exception as e:
                st.error(e)
            else:
                st.write(
                    "<small><div style='float: center; text-align: center'>"
                    f"**Title:** [{info['title']}]({video_url})"
                    f"**Duration:** {info['duration']} sec.</div></small>",
                    unsafe_allow_html=True)
                video = video_url
                st.session_state.video_input_title = info['title']

                session_dir = get_session_dir()
                os.makedirs(session_dir, exist_ok=True)
                downloaded_path = download_youtube(video_url, session_dir)
                if downloaded_path and os.path.exists(downloaded_path):
                    st.session_state['video_path'] = downloaded_path
                    col_info.success(f'YouTube видео скачано в: {os.path.basename(downloaded_path)}')
                else:
                    col_info.error('Failed to download YouTube video')
        else:
            st.warning('Пожалуйста, загрузите видеофайл или предоставьте ссылку YouTube')
            st.stop()

        #
        # ----------------------------------- AUDIO EXTRACTION
        #
        if st.session_state.video_path:
            start = time.time()
            audio_path = extract_audio(st.session_state['video_path'], audio_format='mp3')
            end = time.time()
            if audio_path and os.path.exists(audio_path):
                col_info.success(f'Аудиодорожка сохранена в: {os.path.basename(audio_path)} (за {end - start:.2f}сек)')
                st.session_state['audio_path'] = audio_path
            else:
                col_info.error('Failed to extract audio from the video')
                st.warning('Proceeding without audio. STT step will be skipped')
                st.session_state['audio_path'] = None  # explicitly set to None

    #
    # ----------------------------------- FRAMES EXTRACTION
    #
    if st.session_state.video_path:
        extraction_threshold = 1.5
        with st.spinner('Выделяем информативные кадры..'):
            start_time = time.time()
            frames_dir, frame_paths = extract_frames_pyscenedetect(
                st.session_state.video_path,
                'frames_pyscenedetect',
                threshold=extraction_threshold)
            extract_time = time.time() - start_time
        if frames_dir and frame_paths:
            st.session_state['frames_dir'] = frames_dir
            st.session_state['frames_paths'] = frame_paths  # store paths
            col_info.success(f'Выделено {len(frame_paths)} кадров (заняло {int(extract_time)} сек)')
        else:
            col_info.error('Failed to extract frames')
            st.stop()

    # Preview the first few extracted frames, one per column. Only the frames
    # that are shown get opened, and each image file is closed right after
    # it has been handed to Streamlit.
    max_display_frames = 6
    preview_paths = st.session_state['frames_paths'][:max_display_frames]
    if preview_paths:
        display_cols = st.columns(len(preview_paths))
        for display_col, frame_path in zip(display_cols, preview_paths):
            timestamp = _frame_timestamp(frame_path)
            with Image.open(frame_path) as img:
                display_col.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)

    if st.session_state.video_path:
        col_complete.info('Предобработка видео завершена.')
        col_next.page_link('ui_create_summary.py', label='Можно переходить в ✨ **Кабинет методиста**', icon='➡️')

        # Display video
        _, col_preview, _ = st.columns([1, 3, 1])
        with col_preview.container(border=True):
            st.video(video)
        st.columns([3, 2, 3])[1].page_link('ui_create_summary.py',
                                           label='Перейти в ✨ **Кабинет методиста**', icon='➡️',
                                           use_container_width=True)