Spaces:
Sleeping
Sleeping
import streamlit as st | |
from streamlit_extras.stylable_container import stylable_container | |
import os | |
import time | |
import pathlib | |
from datetime import timedelta | |
import requests | |
os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false' | |
import whisper # openai-whisper | |
import torch # check for GPU availability | |
# from models.loader import load_model_sst | |
from transcriber import Transcription | |
import matplotlib.colors as mcolors | |
###### | |
# import gdown | |
# import tempfile | |
from utils import load_config, get_secret_api | |
# Refresh the base address of the remote inference API. The address is
# published as a small text file on Google Drive and is re-read on every
# script run.
with st.spinner('Обновляем доступ по API..'):
    api_file_id = '11sWWmdEPLG1hB3BAYPtFDjLgI8yqNF-k'
    api_url = f'https://drive.google.com/uc?export=download&id={api_file_id}'
    try:
        # Bounded wait: without a timeout a stalled connection blocks the page.
        response = requests.get(api_url, timeout=30)
    except requests.RequestException as api_error:
        # Keep whatever address is already stored and let the page render.
        response = None
        st.warning(f'Could not refresh the API address: {api_error}')
    if (
        response is not None
        and response.status_code == 200
        and 'Google Drive - Quota exceeded' not in response.text
    ):
        # Strip surrounding whitespace so the value is usable as a URL prefix.
        st.session_state.secret_api = response.text.strip()
# Marker string that the segment view removes from the transcript text
# (presumably a spurious ASR artifact - confirm).
trash_str = 'Субтитры создавал DimaTorzok'

# This page needs the audio file produced on the upload step; stop early
# when it is missing or no longer on disk.
current_audio = st.session_state.get('audio_path')
if not (current_audio and os.path.exists(current_audio)):
    st.warning('Audio file not found. Please go back to the "**📤 Upload**" page and process a video first.')
    st.stop()

# Playback position used by the video player; starts at the beginning.
st.session_state.setdefault('start_time', 0)

#
# ==================================================================
#
# Fixed transcription settings (no UI controls are exposed for them).
model_option = 'whisper'
whisper_model_option = 'turbo'
pauses = False  # when True, long pauses between words are marked in the transcript
##
## --- Transcription ---
##
# Layout: a centred action button and two slots for completion messages.
_, col_button_trancribe, _ = st.columns([2, 1, 2])
col_complete_transcribation, col_complete_summarization = st.columns(2)

if col_button_trancribe.button('Сделать конспект', type='primary', use_container_width=True):
    # Drop the results of a previous run so stale data is never displayed.
    st.session_state.transcript = None
    st.session_state['summary'] = None
    try:
        api_base = st.session_state.get('secret_api')
        if not api_base:
            # Fail with a readable message instead of an AttributeError.
            raise RuntimeError('the API address is not available')

        with st.spinner('Транскрибируем аудио..'):
            start = time.time()
            with open(st.session_state.audio_path, 'rb') as f:
                response = requests.post(
                    f'{api_base}/transcribe',
                    params={'model': whisper_model_option},
                    files={'file': f}
                )
            # Surface HTTP errors directly rather than as a JSON/KeyError later.
            response.raise_for_status()
            response = response.json()

            # Publish the transcript only once it is fully populated.
            transcript = Transcription(st.session_state.audio_path)
            transcript.output = response['output']
            st.session_state.transcript = transcript
            transcribe_time = time.time() - start
        col_complete_transcribation.success(f'Транскрибация завершена! (заняло: {int(transcribe_time)} сек)')
    except Exception as e:
        # Top-level UI boundary: report any failure to the user.
        st.error(f'An error related to the remote API! The error: {e}')
if 'transcript' in st.session_state and st.session_state['transcript']:

    def player_(output):
        """Render the video player next to a clickable list of segments.

        Every segment is shown as a time-code button plus its text. Clicking
        a button stores the segment start in ``st.session_state['start_time']``.
        As a side effect the time-coded plain-text transcript is rebuilt in
        ``st.session_state['transcript_segments']`` with ``trash_str`` removed.

        Args:
            output: Transcription result; ``output['segments']`` items must
                provide ``start``, ``end`` and ``text``.
        """
        def format_time(seconds):
            # Whole seconds rendered as H:MM:SS.
            return str(timedelta(seconds=int(seconds)))

        def seek_to(timecode):
            # Button callbacks run before the script reruns, so the video
            # widget below is rendered with the new start time right away.
            st.session_state['start_time'] = timecode

        # --- Video Player ---
        with st.expander('**ВИДЕО ПЛЕЕР**', expanded=True):
            col_video, col_segments = st.columns(2)
            col_video.video(st.session_state.video_path, start_time=st.session_state.start_time)

            # --- Display Segments with timestamps ---
            segment_lines = []
            with col_segments.container(height=400, border=False):
                # Style buttons as links
                with stylable_container(
                    key='link_buttons',
                    css_styles='''
                        button {
                            background: none!important;
                            border: none;
                            padding: 0!important;
                            font-family: arial, sans-serif;
                            color: #069;
                            cursor: pointer;
                        }
                    ''',
                ):
                    for i, segment in enumerate(output['segments']):
                        start = format_time(segment['start'])
                        end = format_time(segment['end'])
                        segment_text = segment['text'].strip()
                        col_timecode, col_text = st.columns([1, 5], vertical_alignment='center')
                        col_timecode.button(
                            f':violet-badge[**{start} – {end}**]',
                            # Explicit key: two short segments can format to
                            # the same label, which would clash otherwise.
                            key=f'segment_button_{i}',
                            use_container_width=True,
                            on_click=seek_to,
                            args=(start,),
                        )
                        segment_lines.append(f'[**{start} – {end}**] {segment_text}')
                        col_text.text(f'{segment_text}')

            # str.replace returns a new string, so the result must be stored.
            st.session_state['transcript_segments'] = ''.join(segment_lines).replace(trash_str, '')

    # --- Display Transcript ---
    prev_word_end = -1
    text = ''
    html_text = ''
    output = st.session_state.transcript.output

    # Running sum of word probabilities and the word count; divide the former
    # by the latter to obtain the mean confidence.
    avg_confidence_score = 0
    amount_words = 0
    save_dir = str(pathlib.Path(__file__).parent.absolute()) + '/transcripts/'
    for idx, segment in enumerate(output['segments']):
        for w in segment['words']:
            amount_words += 1
            avg_confidence_score += w['probability']

    # Define the color map: dark red -> orange -> dark green.
    colors = [(0.6, 0, 0), (1, 0.7, 0), (0, 0.6, 0)]
    cmap = mcolors.LinearSegmentedColormap.from_list('my_colormap', colors)

    player_(output)

    def trancr_(output, prev_word_end, html_text, text):
        """Render the full transcript, optionally colour-coded by confidence.

        Args:
            output: Transcription result; ``output['segments'][i]['words']``
                items must provide ``word``, ``start``, ``end`` and
                ``probability``.
            prev_word_end: End time of the word preceding the first one, or
                -1 when there is none.
            html_text: HTML prefix that the rendered transcript is appended to.
            text: Kept for interface compatibility; the plain-text variant was
                never returned or displayed, so it is no longer built.
        """
        import html  # local import: only needed for escaping below

        with st.expander('**ТРАНСКРИПЦИЯ**', expanded=False):
            color_coding = st.checkbox(
                'кодировать цветом',
                value=True,
                help='Цветное кодирование слов в зависимости от вероятности правильного распознавания: от зелёного (хорошо) до красного (плохо)'
            )
            # https://docs.streamlit.io/develop/api-reference/layout/st.container
            with st.container(height=300, border=False):
                html_parts = [html_text]
                for segment in output['segments']:
                    for w in segment['words']:
                        # check for pauses in speech longer than 3s
                        if pauses and prev_word_end != -1 and w['start'] - prev_word_end >= 3:
                            pause_int = int(w['start'] - prev_word_end)
                            html_parts.append(f'{"." * pause_int}{{{pause_int}sec}}')
                        prev_word_end = w['end']

                        if color_coding:
                            rgba_color = cmap(w['probability'])
                            rgb_color = tuple(round(x * 255) for x in rgba_color[:3])
                        else:
                            rgb_color = (0, 0, 0)

                        # The markup is rendered with unsafe_allow_html, so the
                        # recognised text must not be able to inject tags.
                        safe_word = html.escape(w['word'], quote=False)
                        html_parts.append(f"<span style='color:rgb{rgb_color}'>{safe_word}</span>")

                        # insert line break if there is a punctuation mark
                        if any(c in w['word'] for c in '!?.') and not any(c.isdigit() for c in w['word']):
                            html_parts.append('<br><br>')
                st.markdown(''.join(html_parts), unsafe_allow_html=True)

    trancr_(output, prev_word_end, html_text, text)
# | |
# | |
# | |
# ------------------------------------------------------ | |
# | |
# | |
# | |
# | |
if 'transcript' in st.session_state and st.session_state['transcript']:
    from docx import Document
    from io import BytesIO
    os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'
    from utils import get_secret_prompt
    import requests

    # .get() tolerates session keys that were never initialised.
    if not st.session_state.get('secret_prompt'):
        st.session_state.secret_prompt = get_secret_prompt()
    prompt_file_id = '1s5r_DuxaEoMk-D5-53FVhTMeHGVtoeV7'

    if not st.session_state.get('summary'):
        # A new summary starts in preview mode with no pending edits.
        st.session_state['edit_mode'] = False
        st.session_state.edited_summary = ''

        # Fallback system prompt used when neither the Drive copy nor the
        # local file is available.
        default_prompt = (
            'Ты - ассистент, который создает конспекты лекций на основе предоставленного текста. Этот текст состоит из двух частей:\n'
            '1. Транскрибация аудиодорожки алекции,\n'
            '2. Изображение выделенных из видео ключевых кадров, с полезной информацией.\n'
            'Сделай детальный конспект по тому, что описывается в видео. Для иллюстрации сравнений и сопоставлений используй markdown-таблицы. Ответ предоставь в формате markdown.\n'
        )
        gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, используя markdown-таблицы.'
        if st.session_state.get('main_topic'):
            gluing_prompt += f' Озаглавь конспект основной темой лекции: {st.session_state.main_topic}'

        # Key frames extracted from the video; empty when no frame directory
        # is available.
        frames_dir = st.session_state.get('frames_dir')
        frames_paths = []
        if frames_dir and os.path.isdir(frames_dir):
            frames_paths = [os.path.join(frames_dir, f)
                            for f in os.listdir(frames_dir)
                            if f.endswith('.jpg')
                            and os.path.isfile(os.path.join(frames_dir, f))]

        # --- Summarization Configuration ---
        summarizer_options = ['gemma3:4b',
                              'gemma3:12b',
                              'granite3.2-vision',
                              'mistral-small3.1',
                              'llama3.2-vision',
                              ]
        selected_model = 'gemma3:12b'

        # --- Generate Summary ---
        def describe_video(model, frames_dir, describe_prompt):
            """Invoke ``model`` on ``describe_prompt`` with all files of ``frames_dir`` bound as images.

            Args:
                model: Object exposing ``bind(images=...)`` whose result
                    exposes ``invoke(prompt)``.
                frames_dir: Directory whose entries are passed as image paths.
                describe_prompt: Prompt text handed to ``invoke``.

            Returns:
                Whatever ``invoke`` returns.
            """
            images = [os.path.join(frames_dir, file) for file in os.listdir(frames_dir)]
            model_with_images = model.bind(images=images)
            return model_with_images.invoke(describe_prompt)

        def load_prompt():
            """Return the system prompt: Drive copy, else local file, else default."""
            describe_prompt = None
            prompt_url = f'https://drive.google.com/uc?export=download&id={prompt_file_id}'
            try:
                # Bounded wait; a network failure falls through to the
                # local fallbacks instead of crashing the page.
                response = requests.get(prompt_url, timeout=30)
            except requests.RequestException:
                response = None
            if (
                response is not None
                and response.status_code == 200
                and 'Google Drive - Quota exceeded' not in response.text
            ):
                describe_prompt = response.text
            if not describe_prompt:
                try:
                    with open('secret_prompt.txt', 'r', encoding='utf-8') as file:
                        describe_prompt = file.read()
                except (OSError, UnicodeError):
                    # Missing or unreadable file: use the built-in prompt.
                    describe_prompt = default_prompt
            return describe_prompt

        secret_prompt = load_prompt()
        describe_prompt = secret_prompt
        prompt = describe_prompt + gluing_prompt + st.session_state.transcript_segments

        with st.spinner('Суммаризируем текст и картинки..'):
            start = time.time()
            # NOTE(review): these credentials are committed in source and must
            # be treated as leaked - rotate them and supply the values via the
            # environment. The literals remain only as a compatibility fallback.
            YC_FOLDER_ID = os.environ.get('YC_FOLDER_ID', 'b1gsck9ro4og9ek02u98')
            YC_TOKEN = os.environ.get('YC_TOKEN', 'AQVN0h88bXiRWETk0b3mimKS7j_309gKCa22gcvf')

            # Stays None when the request fails, so no unbound name is read.
            answer = None
            try:
                from yandex_cloud_ml_sdk import YCloudML
                sdk = YCloudML(
                    folder_id=YC_FOLDER_ID,
                    auth=YC_TOKEN,
                )
                model = sdk.models.completions(model_name="yandexgpt", model_version="rc")  # the model can be swapped here
                model = model.configure(temperature=0.2, max_tokens=20000)
                result = model.run(prompt)
                answer = result.alternatives[0].text
            except Exception as e:
                print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Ошибка при взаимодействии с YandexGPT API (ML SDK): {e}")
                st.error(f'Failed to generate the summary: {e}')

            if answer:
                st.session_state['summary'] = answer
                summarization_time = time.time() - start
                col_complete_summarization.success(f'Суммаризация завершена! (заняло: {int(summarization_time)} сек)')
# --- Display and Refine Summary --- | |
def summary_editor():
    """Show the summary with a toggle between preview and Markdown editing.

    Reads and writes ``st.session_state.summary``; while editing, the current
    text is mirrored into ``st.session_state.edited_summary``. The mode is
    kept in ``st.session_state.edit_mode``.
    """
    if 'edit_mode' not in st.session_state:
        st.session_state.edit_mode = False
    if 'summary' not in st.session_state:
        st.session_state.summary = ""

    with st.container(height=600, border=False):
        summary_container = st.empty()
        markdown_button_container = st.container()

        # Main field
        if st.session_state.edit_mode:
            edited_summary = summary_container.text_area(
                'Редактировать Markdown:',
                value=st.session_state.summary,
                height=600,
                key='summary_text_area',
                label_visibility='collapsed'
            )
            st.session_state.summary = edited_summary
            st.session_state.edited_summary = edited_summary
        else:
            summary_container.info(st.session_state.summary)

        # Mode toggle button
        with markdown_button_container:
            label = "✏️ Редактировать" if not st.session_state.edit_mode else "👁️ Просмотр"
            if st.button(label, use_container_width=True, key='toggle_button'):
                st.session_state.edit_mode = not st.session_state.edit_mode
                # This function is not an @st.fragment, so a fragment-scoped
                # rerun would raise; a full rerun also refreshes the
                # download section with the latest text.
                st.rerun()
# if 'summary' in st.session_state and st.session_state['summary']: | |
# st.markdown("<h2 style='text-align: center; color: black;'>Конспект</h2>", unsafe_allow_html=True) | |
# with st.container(height=500, border=True): | |
# summary_container = st.empty() | |
# # if st.session_state.edited_summary: | |
# # st.session_state.summary = st.session_state.edited_summary | |
# # st.session_state.edited_summary = st.session_state.summary | |
# # st.info(st.session_state.edited_summary[:100]) | |
# st.info(st.session_state.edit_mode) | |
# if st.session_state.edit_mode: | |
# # st.session_state.summary = st.session_state.edited_summary | |
# if st.session_state.edited_summary != st.session_state.summary: | |
# # st.session_state.edited_summary = edited_summary | |
# st.session_state.summary = st.session_state.edited_summary | |
# st.session_state.edited_summary = '' | |
# # st.session_state.summary = 'F$F$F$F$F' | |
# # Визуализация: переключение между редактированием и превью | |
# if st.session_state.edit_mode: | |
# # st.session_state.edited_summary = st.session_state.summary | |
# # -------------- EDITING | |
# # if edited_summary: | |
# # st.session_state.summary = edited_summary | |
# # edited_summary = st.session_state.summary | |
# # Режим редактирования | |
# edited_summary = summary_container.text_area( | |
# 'Редактировать Markdown:', | |
# value=st.session_state.summary, | |
# height=500 | |
# ) | |
# # st.session_state.summary = st.session_state.edited_summary | |
# if edited_summary != st.session_state.summary: | |
# # st.session_state.summary = edited_summary | |
# st.session_state.edited_summary = edited_summary | |
# # st.session_state.summary = 'F$F$F$F$F' | |
# else: | |
# # st.session_state.edited_summary = st.session_state.summary | |
# # -------------- PREVIEW | |
# # if edited_summary: | |
# # st.session_state.summary = edited_summary | |
# # edited_summary = edited_summary or st.session_state.summary | |
# summary_container.info(st.session_state.summary)#, unsafe_allow_html=True) | |
# def switch_mode(): | |
# # st.write(edited_summary) | |
# # st.session_state.summary = st.session_state.edited_summary | |
# # st.session_state.summary = '!!!' | |
# # st.session_state.summary = | |
# # if edited_summary: | |
# # st.session_state.summary = edited_summary | |
# # if st.session_state.summary = st.session_state.summary if | |
# # st.session_state.summary = st.session_state.summary or edited_summary | |
# st.session_state.edit_mode = not st.session_state.edit_mode | |
# # button_container = st.container() | |
# # Кнопка переключения режима | |
# with st.container(): | |
# st.button('✏️ Редактировать' if not st.session_state.edit_mode else '👁️ Просмотр', | |
# on_click=switch_mode, | |
# use_container_width=True) | |
# --- Export Options --- | |
def downloader():
    """Render download buttons for the notes (Markdown, Word, PDF placeholder).

    The exported text is the edited summary when there is one, otherwise the
    generated summary. It is also stored in ``st.session_state['final_notes']``.
    """
    with st.expander('**📥 СКАЧАТЬ**', expanded=True):
        col_export_md, col_export_docx, col_export_pdf = st.columns(3)

        # edited_summary stays empty until the user enters edit mode, so fall
        # back to the generated summary instead of exporting nothing.
        final_notes_md = (
            st.session_state.get('edited_summary')
            or st.session_state.get('summary')
            or ''
        )
        st.session_state['final_notes'] = final_notes_md

        # 1. Markdown (.md) export
        col_export_md.download_button(
            label="📥 Markdown (.md)",
            data=final_notes_md,
            file_name="lecture_notes.md",
            mime="text/markdown",
            use_container_width=True,
        )

        # 2. Word (.docx) export
        # Created up front so the PDF placeholder below never sees an
        # unbound name when the .docx generation fails.
        buffer = BytesIO()
        try:
            doc = Document()
            # Very simple Markdown conversion: one paragraph per blank-line
            # separated chunk with the emphasis/heading markers removed.
            # A full Markdown -> docx conversion would need a real parser.
            strip_markup = str.maketrans('', '', '*_#')
            for para in final_notes_md.split('\n\n'):
                if para.strip():  # avoid empty paragraphs
                    doc.add_paragraph(para.translate(strip_markup).strip())
            # Save docx to the in-memory buffer
            doc.save(buffer)
            buffer.seek(0)
            col_export_docx.download_button(
                label='📥 Word (.docx)',
                data=buffer,
                file_name='lecture_notes.docx',
                mime='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                use_container_width=True
            )
        except Exception as docx_e:
            st.error(f'Failed to generate .docx file: {docx_e}')

        # 3. PDF (.pdf) export - disabled placeholder, no PDF is generated yet
        try:
            col_export_pdf.download_button(
                label='📥 PDF (.pdf)',
                data=buffer,
                file_name="lecture_notes.pdf",
                use_container_width=True,
                disabled=True
            )
        except Exception as pdf_e:
            st.error(f'Failed to generate .pdf file: {pdf_e}')
# 3. PDF Export (Requires extra libraries/setup - Placeholder) | |
# st.markdown("---") | |
# st.write("**PDF Export:**") | |
# try: | |
# from mdpdf.cli import mdpdf | |
# pdf_buffer = BytesIO() | |
# # This often requires command-line execution or careful API usage | |
# # Simplified placeholder - actual implementation may vary: | |
# # mdpdf(pdf_buffer, md=final_notes_md, ...) # Fictional direct API call | |
# st.info("PDF generation via libraries like mdpdf/WeasyPrint requires setup.") | |
# except ImportError: | |
# st.warning("`mdpdf` library not installed. PDF export unavailable.") | |
# except Exception as pdf_e: | |
# st.error(f"Failed to generate PDF (requires setup): {pdf_e}") | |
# Show the editor and the export buttons only once a summary exists.
if st.session_state.get('summary'):
    summary_editor()
    downloader()
# except Exception as e: | |
# st.error(f'An error occurred during transcription: {e}') | |