conspectum / ui_upload.py
macsunmood's picture
update app
6300cca
raw
history blame contribute delete
14.6 kB
import streamlit as st
import os
import time
from yt_dlp import YoutubeDL
import ffmpeg
import tempfile
import pytesseract
from PIL import Image
from utils import (save_uploaded_file, extract_audio,
download_youtube, get_session_dir,
cleanup_session_files, get_session_id,
get_temp_dir, get_features, proc_raw_audio,
extract_frames_interval, extract_frames_pyscenedetect)
st.title('📥 Загрузка видео')

# Session-state keys this page expects, with their initial values.
# Every key starts as None except the input-method selector.
defaults = {
    name: None
    for name in (
        'uploaded_file',
        'video_path',
        'audio_path',
        'ocr_text',
        'transcript',
        'summary',
        'main_topic',
        'input_method',
        'input_title',
        'video_input_path',
        'video_url',
        'secret_api',
        'secret_prompt',
        # 'audio_wav',
        # 'audio_file',
    )
}
defaults['input_method'] = 'Файл'

# Seed only the keys that are missing; values already present in the
# session are left untouched.
for state_key, initial_value in defaults.items():
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# --- Sidebar control: wipe the files and state of the current session ---
# (debug aid) st.sidebar.write(f'`{get_session_id()}`') shows the session ID
clear_requested = st.sidebar.button('Очистить сессию')
if clear_requested:
    # Look the session ID up first, before any state is removed.
    cleanup_session_files(get_session_id())

    # Snapshot the keys so entries can be deleted while walking them.
    for stale_key in tuple(st.session_state.keys()):
        del st.session_state[stale_key]

    # Re-execute the script so the page reflects the emptied state.
    st.rerun()
col_input_method, col_main_topic = st.columns([2, 6])

# --- Video source selection ---
input_method = col_input_method.radio(
    'Выберите способ загрузки:',
    ('Файл', 'Ссылка YouTube'),
    key='input_method',
    horizontal=True
)

# --- Main Topic ---
st.session_state.main_topic = col_main_topic.text_input(
    '(опционально) Укажите тему лекции:',
    st.session_state.main_topic
)

# Per-run inputs; each stays None unless the matching branch below fills it.
video_path = None
uploaded_file = None
video_url = None

if input_method == 'Файл':
    uploaded_file = st.file_uploader(
        'Выберите видеофайл',
        type=['mp4', 'avi', 'mkv', 'mov']
    )

    if uploaded_file:
        col_info, col_ready = st.columns(2)

        # Stage the upload in the shared temp location.
        temp_dir = get_temp_dir()

        # The file name is supplied by the client, so reduce it to a bare
        # base name before joining: directory components (either separator
        # style) must not be able to steer the write outside temp_dir.
        safe_name = os.path.basename(uploaded_file.name.replace('\\', '/')) or 'video'

        # The session ID prefix avoids clashes when several users upload
        # files with the same name.
        target_path = os.path.join(temp_dir, f'upload_{get_session_id()}_{safe_name}')

        # NOTE(review): this branch runs on every script rerun while a file
        # is selected, so the file is rewritten and transcript/summary are
        # reset each time - confirm that this is intended.
        try:
            with open(target_path, 'wb') as f:
                f.write(uploaded_file.getbuffer())
            st.session_state.video_input_path = target_path
            st.session_state.video_input_title = uploaded_file.name
            st.session_state.video_url = ''  # clear URL if file is uploaded
            # A new upload invalidates results derived from the previous video.
            st.session_state.transcript = None
            st.session_state.summary = None
        except Exception as e:
            col_ready.error(f'Error saving uploaded file: {e}')
            st.session_state.video_input_path = ''

elif input_method == 'Ссылка YouTube':
    # Pre-filled example so the page is usable without typing a URL.
    example_youtube = {
        'title': 'Общественное движение',
        'url': 'https://www.youtube.com/watch?v=c3bhkrKF6F4',
        'start': 0.0
    }

    col_url, col_start_from = st.columns([5, 2])

    video_url = col_url.text_input('Enter YouTube video URL:', example_youtube['url'])
    # Offset in seconds; read as a module-level value by the ffmpeg helpers.
    start_from = col_start_from.number_input(
        'Начать с сек.:',
        min_value=0.0, step=0.5, format='%f', value=example_youtube['start'],
        help='Сдвиг по времени, с которого начинается лекция'
    )

    if video_url:
        st.session_state.video_url = video_url
        st.session_state.video_input_path = ''  # clear path if URL is used
@st.cache_resource
def ui_processed_sound(audio_wav, audio_np):
    '''Show an audio player for `audio_wav` and run `get_features` on `audio_np`.

    The value produced by `get_features` is not used and nothing is
    returned.
    '''
    st.audio(audio_wav)
    get_features(audio_np)  # result is discarded
@st.cache_resource
def extract_videofile(video_file, start_from=None):
    '''Extract the audio track of an uploaded video as WAV bytes.

    Args:
        video_file: File-like object whose `read()` returns the video bytes.
        start_from: Offset in seconds passed to ffmpeg as `ss`. When omitted,
            the module-level `start_from` value is used if it exists
            (the previous behaviour), otherwise 0.0.

    Returns:
        The bytes ffmpeg wrote to stdout (WAV container).

    Raises:
        RuntimeError: If ffmpeg exits with an error; the message carries
            ffmpeg's stderr output.
    '''
    if start_from is None:
        # The module-level value only exists when the YouTube input branch
        # has run, so fall back to the start of the file instead of failing
        # with NameError.
        start_from = globals().get('start_from', 0.0)

    # ffmpeg needs a real path. Write the upload to a temp file and close it
    # before ffmpeg runs so every byte is flushed to disk.
    with tempfile.NamedTemporaryFile(delete=False) as tfile:
        tfile.write(video_file.read())
        temp_path = tfile.name

    try:
        # stderr is captured so a failure can be reported with ffmpeg's own
        # diagnostics; ffmpeg-python signals failure by raising ffmpeg.Error.
        audio_data, _ = (
            ffmpeg
            .input(temp_path, ss=start_from)
            .output('pipe:', format='wav')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        details = (e.stderr or b'').decode(errors='replace')
        raise RuntimeError(f'Failed to load audio: {details}') from e
    finally:
        # delete=False leaves cleanup to us.
        os.remove(temp_path)

    return audio_data
@st.cache_resource
def extract_youtube(raw_url, start_from=None):
    '''Extract audio from a media URL as WAV bytes using ffmpeg.

    Args:
        raw_url: Input handed to ffmpeg as-is (presumably a direct stream
            URL such as the one yt-dlp reports - verify against the caller).
        start_from: Offset in seconds passed to ffmpeg as `ss`. When omitted,
            the module-level `start_from` value is used if it exists
            (the previous behaviour), otherwise 0.0.

    Returns:
        The bytes ffmpeg wrote to stdout (WAV container).

    Raises:
        RuntimeError: If ffmpeg exits with an error; the message carries
            ffmpeg's stderr output.
    '''
    if start_from is None:
        # Avoid a NameError when the module-level widget value is absent.
        start_from = globals().get('start_from', 0.0)

    try:
        # stderr is captured so a failure can be reported with ffmpeg's own
        # diagnostics; ffmpeg-python signals failure by raising ffmpeg.Error.
        audio_data, _ = (
            ffmpeg
            .input(raw_url, ss=start_from)
            .output('pipe:', format='wav')
            .global_args('-nostdin', '-threads', '0')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        details = (e.stderr or b'').decode(errors='replace')
        raise RuntimeError(f'Failed to load audio: {details}') from e

    return audio_data
# --- Processing Button ---
_, col_button_process, _ = st.columns([2, 1, 2])

if col_button_process.button(
    'Подготовить видео',
    type='primary',
    use_container_width=True,
    disabled=not (st.session_state.video_input_path or st.session_state.video_url or uploaded_file)
):
    # Clear previous paths if reprocessing
    st.session_state['video_path'] = None
    st.session_state['audio_path'] = None

    col_info, col_complete, col_next = st.columns(3)

    with st.spinner('Обрабатываем видео..'):
        #
        # ----------------------------------- VIDEO ACQUISITION
        #
        if st.session_state['input_method'] == 'Файл' and uploaded_file:
            st.session_state.uploaded_file = uploaded_file
            video = uploaded_file  # source handed to st.video for the preview

            saved_path = save_uploaded_file(uploaded_file)
            if saved_path:
                st.session_state['video_path'] = saved_path
                col_info.success(f'Видео временно сохранено в: `{os.path.basename(saved_path)}`')
            else:
                col_info.error('Failed to save uploaded file')

        elif st.session_state['input_method'] == 'Ссылка YouTube' and video_url:
            try:
                # Metadata only; the actual download happens further down.
                with YoutubeDL({'format': 'best+bestaudio'}) as ydl:
                    info = ydl.extract_info(video_url, download=False)
            except Exception as e:
                # Report the failure. `video_path` stays None, so every later
                # step is skipped instead of touching an undefined `info`.
                st.error(e)
            else:
                st.write(
                    "<small><div style='float: center; text-align: center'>"
                    f"**Title:** [{info['title']}]({video_url})"
                    f"**Duration:** {info['duration']} sec.</div></small>",
                    unsafe_allow_html=True)

                video = video_url  # source handed to st.video for the preview
                st.session_state.video_input_title = info['title']

                session_dir = get_session_dir()
                os.makedirs(session_dir, exist_ok=True)
                downloaded_path = download_youtube(video_url, session_dir)
                if downloaded_path and os.path.exists(downloaded_path):
                    st.session_state['video_path'] = downloaded_path
                    col_info.success(f'YouTube видео скачано в: {os.path.basename(downloaded_path)}')
                else:
                    col_info.error('Failed to download YouTube video')

        else:
            st.warning('Пожалуйста, загрузите видеофайл или предоставьте ссылку YouTube')
            st.stop()

        #
        # ----------------------------------- AUDIO EXTRACTION
        #
        if st.session_state.video_path:
            start = time.time()
            audio_path = extract_audio(st.session_state['video_path'], audio_format='mp3')
            end = time.time()

            if audio_path and os.path.exists(audio_path):
                col_info.success(f'Аудиодорожка сохранена в: {os.path.basename(audio_path)} (за {end - start:.2f}сек)')
                st.session_state['audio_path'] = audio_path
            else:
                col_info.error('Failed to extract audio from the video')
                st.warning('Proceeding without audio. STT step will be skipped')
                st.session_state['audio_path'] = None  # explicitly set to None

    #
    # ----------------------------------- FRAMES EXTRACTION
    #
    if st.session_state.video_path:
        extraction_threshold = 1.5
        st.session_state['frames_paths'] = []

        with st.spinner('Выделяем информативные кадры..'):
            start_time = time.time()
            frames_dir, frame_paths = extract_frames_pyscenedetect(
                st.session_state.video_path,
                'frames_pyscenedetect',
                threshold=extraction_threshold)
            extract_time = time.time() - start_time

            if frames_dir and frame_paths:
                st.session_state['frames_dir'] = frames_dir
                st.session_state['frames_paths'] = frame_paths  # store paths
                col_info.success(f'Выделено {len(frame_paths)} кадров (заняло {int(extract_time)} сек)')
            else:
                col_info.error('Failed to extract frames')
                st.stop()

        # Preview a handful of the extracted frames, one per column.
        max_display_frames = 6
        preview_paths = list(st.session_state['frames_paths'])[:max_display_frames]
        if preview_paths:
            display_cols = st.columns(len(preview_paths))
            for display_col, frame_path in zip(display_cols, preview_paths):
                # The caption time is parsed from the file name, assuming the
                # form frame_<seconds>.<ext>; anything else is shown as N/A.
                try:
                    secs = int(os.path.basename(frame_path).split('_')[1].split('.')[0])
                    timestamp = time.strftime('%H:%M:%S', time.gmtime(secs))
                except (IndexError, ValueError, OverflowError, OSError):
                    timestamp = 'N/A'

                # Only previewed frames are opened, and each file handle is
                # released as soon as the image has been rendered.
                with Image.open(frame_path) as img:
                    with display_col:
                        st.image(img, caption=f'Frame (t={timestamp})', use_container_width=True)

    if st.session_state.video_path:
        col_complete.info('Предобработка видео завершена.')
        col_next.page_link('ui_create_summary.py', label='Можно переходить в ✨ **Кабинет методиста**', icon='➡️')

        # Display video
        _, col_preview, _ = st.columns([1, 3, 1])
        with col_preview.container(border=True):
            st.video(video)
# Link to the summary page, placed in the middle of three columns.
_, col_goto_summary, _ = st.columns([3, 2, 3])
col_goto_summary.page_link(
    'ui_create_summary.py',
    label='Перейти в ✨ **Кабинет методиста**',
    icon='➡️',
    use_container_width=True,
)