import streamlit as st
from streamlit_extras.stylable_container import stylable_container
import os
import time
import pathlib
from datetime import timedelta
import requests
os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'
import whisper # openai-whisper
import torch # check for GPU availability
# from models.loader import load_model_sst
from transcriber import Transcription
import matplotlib.colors as mcolors
######
# import gdown
# import tempfile
from utils import load_config, get_secret_api
# if not st.session_state.secret_api:
# Refresh the base URL of the remote inference API on every script run.
# The URL is downloaded as the body of a text file and later used as
# f'{secret_api}/transcribe'.
with st.spinner('Обновляем доступ по API..'):
    api_file_id = '11sWWmdEPLG1hB3BAYPtFDjLgI8yqNF-k'
    api_url = f'https://drive.google.com/uc?export=download&id={api_file_id}'
    try:
        # Bounded wait: without a timeout a stalled connection would block
        # the page indefinitely.
        response = requests.get(api_url, timeout=30)
    except requests.RequestException:
        # Best effort: keep whatever endpoint an earlier run stored.
        response = None
    if (
        response is not None
        and response.status_code == 200
        and 'Google Drive - Quota exceeded' not in response.text
    ):
        # Surrounding whitespace (e.g. a trailing newline in the file) would
        # corrupt every URL built from this value.
        st.session_state.secret_api = response.text.strip()
# Page-wide settings.
# Phrase that is filtered out of the collected transcript text further down.
trash_str = 'Субтитры создавал DimaTorzok'
model_option = 'whisper'
whisper_model_option = 'turbo'
# When True, gaps of 3 s or more between words are marked in the transcript view.
pauses = False

# The audio track is produced on the upload page; without it this page has
# nothing to work on, so tell the user and halt the script run.
has_audio = (
    'audio_path' in st.session_state
    and st.session_state['audio_path']
    and os.path.exists(st.session_state['audio_path'])
)
if not has_audio:
    st.warning('Audio file not found. Please go back to the "**📤 Upload**" page and process a video first.')
    st.stop()

# Playback position handed to the video player; defaults to the beginning.
if 'start_time' not in st.session_state:
    st.session_state['start_time'] = 0
##
## --- Transcription ---
##
# --- Transcription ---
# Layout: a centred action button, plus two columns that receive the
# "transcription done" / "summarization done" status messages.
_, col_button_trancribe, _ = st.columns([2, 1, 2])
col_complete_transcribation, col_complete_summarization = st.columns(2)

if col_button_trancribe.button('Сделать конспект', type='primary', use_container_width=True):
    # A new run invalidates whatever the previous run produced.
    st.session_state.transcript = None
    st.session_state['summary'] = None
    try:
        with st.spinner('Транскрибируем аудио..'):
            secret_api = (st.session_state.get('secret_api') or '').strip()
            if not secret_api:
                # Clearer than the AttributeError the missing key would raise.
                raise RuntimeError('the remote API address is not available')

            start = time.time()
            with open(st.session_state.audio_path, 'rb') as f:
                response = requests.post(
                    f'{secret_api}/transcribe',
                    params={'model': whisper_model_option},
                    files={'file': f},
                )
            # Surface HTTP errors here rather than as a confusing JSON or
            # KeyError a few lines below.
            response.raise_for_status()
            response = response.json()

            # Build the wrapper completely before publishing it: storing the
            # raw dict first would leave an object without `.output` in the
            # session if constructing Transcription failed.
            transcript = Transcription(st.session_state.audio_path)
            transcript.output = response['output']
            st.session_state.transcript = transcript

            transcribe_time = time.time() - start
            col_complete_transcribation.success(
                f'Транскрибация завершена! (заняло: {int(transcribe_time)} сек)'
            )
    except Exception as e:
        # Page-level boundary: report the failure to the user instead of crashing.
        st.error(f'An error related to the remote API! The error: {e}')
if 'transcript' in st.session_state and st.session_state['transcript']:

    @st.fragment
    def player_(output):
        """Render the video player next to a clickable list of transcript segments.

        Also rebuilds ``st.session_state['transcript_segments']``, the
        timestamped transcript text (one segment per line) with ``trash_str``
        removed.

        Args:
            output: Transcription result; every item of ``output['segments']``
                must provide ``start``, ``end`` and ``text``.
        """

        def format_time(seconds):
            # H:MM:SS with the fractional part dropped.
            return str(timedelta(seconds=int(seconds)))

        with st.expander('**ВИДЕО ПЛЕЕР**', expanded=True):
            col_video, col_segments = st.columns(2)
            col_video.video(st.session_state.video_path, start_time=st.session_state.start_time)

            segment_lines = []
            with col_segments.container(height=400, border=False):
                # Style the timecode buttons as links.
                with stylable_container(
                    key='link_buttons',
                    css_styles='''
                        button {
                            background: none!important;
                            border: none;
                            padding: 0!important;
                            font-family: arial, sans-serif;
                            color: #069;
                            cursor: pointer;
                        }
                        ''',
                ):
                    for i, segment in enumerate(output['segments']):
                        start = format_time(segment['start'])
                        end = format_time(segment['end'])
                        text = segment['text'].strip()
                        col_timecode, col_text = st.columns([1, 5], vertical_alignment='center')
                        # Explicit key: two short segments can share the same
                        # whole-second label, which would otherwise produce
                        # duplicate widget IDs.
                        if col_timecode.button(
                            f':violet-badge[**{start} – {end}**]',
                            key=f'segment_timecode_{i}',
                            use_container_width=True,
                        ):
                            st.session_state['start_time'] = int(segment['start'])
                            # The player above was already drawn with the old
                            # position; redraw the fragment so the click takes
                            # effect immediately instead of one click later.
                            st.rerun(scope='fragment')
                        segment_lines.append(f'[**{start} – {end}**] {text}')
                        col_text.text(f'{text}')

            # str.replace returns a new string; the result has to be stored
            # for the junk phrase to actually disappear.
            st.session_state['transcript_segments'] = '\n'.join(segment_lines).replace(trash_str, '')

    # --- Display Transcript ---
    prev_word_end = -1
    text = ''
    html_text = ''
    output = st.session_state.transcript.output
    save_dir = str(pathlib.Path(__file__).parent.absolute()) + '/transcripts/'

    # Word statistics; `avg_confidence_score` holds the SUM of the word
    # probabilities (it is not divided by `amount_words` here).
    word_probabilities = [w['probability'] for segment in output['segments'] for w in segment['words']]
    amount_words = len(word_probabilities)
    avg_confidence_score = sum(word_probabilities)

    # Colour map for word probabilities: dark red (0) -> orange -> green (1).
    colors = [(0.6, 0, 0), (1, 0.7, 0), (0, 0.6, 0)]
    cmap = mcolors.LinearSegmentedColormap.from_list('my_colormap', colors)

    player_(output)

    @st.fragment
    def trancr_(output, prev_word_end, html_text, text):
        """Render the word-level transcript, optionally coloured by probability.

        Args:
            output: Transcription result; every segment must provide ``words``
                with ``word``, ``start``, ``end`` and ``probability``.
            prev_word_end: End time of the previous word, or -1 when there is none.
            html_text: HTML rendered before the transcript itself.
            text: Unused; kept so existing calls stay valid.
        """
        import html  # function scope: only needed for escaping in this fragment

        with st.expander('**ТРАНСКРИПЦИЯ**', expanded=False):
            color_coding = st.checkbox(
                'кодировать цветом',
                value=True,
                help='Цветное кодирование слов в зависимости от вероятности правильного распознавания: от зелёного (хорошо) до красного (плохо)'
            )
            html_parts = [html_text]
            with st.container(height=300, border=False):
                for segment in output['segments']:
                    for w in segment['words']:
                        # Mark pauses in speech of 3 s or longer.
                        if pauses and prev_word_end != -1 and w['start'] - prev_word_end >= 3:
                            pause_int = int(w['start'] - prev_word_end)
                            html_parts.append(f'{"." * pause_int}{{{pause_int}sec}}')
                        prev_word_end = w['end']

                        if color_coding:
                            rgb_color = tuple(round(x * 255) for x in cmap(w['probability'])[:3])
                        else:
                            rgb_color = (0, 0, 0)
                        # NOTE(review): the markup around the word was missing in
                        # the reviewed copy, leaving rgb_color unused; restored as
                        # a coloured <span> -- confirm against the intended styling.
                        # The word is escaped because the text comes from a remote
                        # service and is rendered with unsafe_allow_html.
                        html_parts.append(
                            f"<span style='color:rgb{rgb_color}'>{html.escape(w['word'])}</span>"
                        )
                        # Start a new paragraph after sentence punctuation, unless
                        # the token contains digits (e.g. "3.14").
                        if any(c in w['word'] for c in '!?.') and not any(c.isdigit() for c in w['word']):
                            # NOTE(review): literal reconstructed from a broken
                            # line; presumably an HTML line break -- confirm.
                            html_parts.append('<br><br>')
                st.markdown(''.join(html_parts), unsafe_allow_html=True)

    trancr_(output, prev_word_end, html_text, text)
#
#
#
# ------------------------------------------------------
#
#
#
#
if 'transcript' in st.session_state and st.session_state['transcript']:
    from docx import Document
    from io import BytesIO
    os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'
    from utils import get_secret_prompt
    import requests

    # .get(): the key may not have been initialised anywhere yet, and attribute
    # access on a missing session key raises.
    if not st.session_state.get('secret_prompt'):
        st.session_state.secret_prompt = get_secret_prompt()
    prompt_file_id = '1s5r_DuxaEoMk-D5-53FVhTMeHGVtoeV7'

    # Generate a summary only when there is none yet for the current transcript.
    if not st.session_state.get('summary'):
        st.session_state['edit_mode'] = False
        st.session_state.edited_summary = ''
        default_prompt = '''Ты - ассистент, который создает конспекты лекций на основе предоставленного текста. Этот текст состоит из двух частей:
1. Транскрибация аудиодорожки алекции,
2. Изображение выделенных из видео ключевых кадров, с полезной информацией.
Сделай детальный конспект по тому, что описывается в видео. Для иллюстрации сравнений и сопоставлений используй markdown-таблицы. Ответ предоставь в формате markdown.
'''
        gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, используя markdown-таблицы.'
        if st.session_state.get('main_topic'):
            gluing_prompt += f' Озаглавь конспект основной темой лекции: {st.session_state.main_topic}'

        # --- Summarization Configuration ---
        # NOTE(review): these names are not used by the YandexGPT call below,
        # and extracted frames are not sent to the model at the moment.
        summarizer_options = ['gemma3:4b',
                              'gemma3:12b',
                              'granite3.2-vision',
                              'mistral-small3.1',
                              'llama3.2-vision',
                              ]
        selected_model = 'gemma3:12b'

        # --- Generate Summary ---
        def describe_video(model, frames_dir, describe_prompt):
            """Invoke ``model`` with ``describe_prompt`` and every file in ``frames_dir`` bound as images."""
            images = [os.path.join(frames_dir, file) for file in os.listdir(frames_dir)]
            model_with_images = model.bind(images=images)
            return model_with_images.invoke(describe_prompt)

        def load_prompt():
            """Return the summarization prompt.

            Sources are tried in order: the remote prompt file, the local
            ``secret_prompt.txt``, and finally ``default_prompt``.
            """
            prompt_url = f'https://drive.google.com/uc?export=download&id={prompt_file_id}'
            try:
                response = requests.get(prompt_url, timeout=30)
            except requests.RequestException:
                # Network trouble is not fatal: fall through to the local copies.
                response = None
            if (
                response is not None
                and response.status_code == 200
                and 'Google Drive - Quota exceeded' not in response.text
                and response.text
            ):
                return response.text
            try:
                with open('secret_prompt.txt', 'r', encoding='utf-8') as file:
                    describe_prompt = file.read()
            except (OSError, UnicodeDecodeError):
                return default_prompt
            return describe_prompt or default_prompt

        secret_prompt = load_prompt()
        describe_prompt = secret_prompt
        prompt = describe_prompt + gluing_prompt + st.session_state.get('transcript_segments', '')

        with st.spinner('Суммаризируем текст и картинки..'):
            start = time.time()
            # NOTE(security): these credentials are committed to source control.
            # They can now be overridden through the environment; the literals
            # are kept only as a fallback so the deployed app keeps working --
            # rotate the token and remove them.
            YC_FOLDER_ID = os.environ.get('YC_FOLDER_ID', 'b1gsck9ro4og9ek02u98')
            YC_TOKEN = os.environ.get('YC_TOKEN', 'AQVN0h88bXiRWETk0b3mimKS7j_309gKCa22gcvf')

            # Pre-bind the result so a failed request cannot leave `answer`
            # undefined for the code after the try block.
            answer = None
            try:
                from yandex_cloud_ml_sdk import YCloudML
                sdk = YCloudML(
                    folder_id=YC_FOLDER_ID,
                    auth=YC_TOKEN,
                )
                model = sdk.models.completions(model_name="yandexgpt", model_version="rc")
                model = model.configure(temperature=0.2, max_tokens=20000)
                result = model.run(prompt)
                answer = result.alternatives[0].text
            except Exception as e:
                print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Ошибка при взаимодействии с YandexGPT API (ML SDK): {e}")
                st.error(f'An error related to the YandexGPT API! The error: {e}')

            if answer:
                st.session_state['summary'] = answer
                summarization_time = time.time() - start
                col_complete_summarization.success(f'Суммаризация завершена! (заняло: {int(summarization_time)} сек)')
# --- Display and Refine Summary ---
@st.fragment
def summary_editor():
    """Show the summary as a read-only preview or as an editable text area.

    A button below the summary switches between the two modes. While editing,
    the text area content is written back to ``st.session_state.summary`` and
    ``st.session_state.edited_summary``.
    """
    # Make sure the session keys used below exist.
    for state_key, initial in (('edit_mode', False), ('summary', "")):
        if state_key not in st.session_state:
            st.session_state[state_key] = initial

    # NOTE(review): nesting reconstructed from a copy without indentation; the
    # toggle button is assumed to sit below the fixed-height box -- confirm.
    with st.container(height=600, border=False):
        summary_container = st.empty()
    markdown_button_container = st.container()

    editing = st.session_state.edit_mode
    if not editing:
        # Preview mode.
        summary_container.info(st.session_state.summary)
    else:
        # Edit mode: keep both session copies in sync with the widget.
        current_text = summary_container.text_area(
            'Редактировать Markdown:',
            value=st.session_state.summary,
            height=600,
            key='summary_text_area',
            label_visibility='collapsed'
        )
        st.session_state.summary = current_text
        st.session_state.edited_summary = current_text

    # Mode toggle button.
    with markdown_button_container:
        if st.session_state.edit_mode:
            label = "👁️ Просмотр"
        else:
            label = "✏️ Редактировать"
        if st.button(label, use_container_width=True, key='toggle_button'):
            st.session_state.edit_mode = not st.session_state.edit_mode
            st.rerun(scope='fragment')
# if 'summary' in st.session_state and st.session_state['summary']:
# st.markdown("Конспект", unsafe_allow_html=True)  # NOTE: the HTML markup around the heading was lost
# with st.container(height=500, border=True):
# summary_container = st.empty()
# # if st.session_state.edited_summary:
# # st.session_state.summary = st.session_state.edited_summary
# # st.session_state.edited_summary = st.session_state.summary
# # st.info(st.session_state.edited_summary[:100])
# st.info(st.session_state.edit_mode)
# if st.session_state.edit_mode:
# # st.session_state.summary = st.session_state.edited_summary
# if st.session_state.edited_summary != st.session_state.summary:
# # st.session_state.edited_summary = edited_summary
# st.session_state.summary = st.session_state.edited_summary
# st.session_state.edited_summary = ''
# # st.session_state.summary = 'F$F$F$F$F'
# # Визуализация: переключение между редактированием и превью
# if st.session_state.edit_mode:
# # st.session_state.edited_summary = st.session_state.summary
# # -------------- EDITING
# # if edited_summary:
# # st.session_state.summary = edited_summary
# # edited_summary = st.session_state.summary
# # Режим редактирования
# edited_summary = summary_container.text_area(
# 'Редактировать Markdown:',
# value=st.session_state.summary,
# height=500
# )
# # st.session_state.summary = st.session_state.edited_summary
# if edited_summary != st.session_state.summary:
# # st.session_state.summary = edited_summary
# st.session_state.edited_summary = edited_summary
# # st.session_state.summary = 'F$F$F$F$F'
# else:
# # st.session_state.edited_summary = st.session_state.summary
# # -------------- PREVIEW
# # if edited_summary:
# # st.session_state.summary = edited_summary
# # edited_summary = edited_summary or st.session_state.summary
# summary_container.info(st.session_state.summary)#, unsafe_allow_html=True)
# def switch_mode():
# # st.write(edited_summary)
# # st.session_state.summary = st.session_state.edited_summary
# # st.session_state.summary = '!!!'
# # st.session_state.summary =
# # if edited_summary:
# # st.session_state.summary = edited_summary
# # if st.session_state.summary = st.session_state.summary if
# # st.session_state.summary = st.session_state.summary or edited_summary
# st.session_state.edit_mode = not st.session_state.edit_mode
# # button_container = st.container()
# # Кнопка переключения режима
# with st.container():
# st.button('✏️ Редактировать' if not st.session_state.edit_mode else '👁️ Просмотр',
# on_click=switch_mode,
# use_container_width=True)
# --- Export Options ---
@st.fragment
def downloader():
    """Render the export panel: Markdown and Word downloads plus a disabled PDF button.

    The exported text is the edited summary when there is one, otherwise the
    generated summary. The chosen text is also stored in
    ``st.session_state['final_notes']``.
    """
    from io import BytesIO  # local imports keep this fragment self-contained

    with st.expander('**📥 СКАЧАТЬ**', expanded=True):
        col_export_md, col_export_docx, col_export_pdf = st.columns(3)

        # `edited_summary` stays empty until the user opens the editor, so fall
        # back to the generated summary instead of exporting an empty file.
        final_notes_md = (
            st.session_state.get('edited_summary')
            or st.session_state.get('summary')
            or ''
        )
        st.session_state['final_notes'] = final_notes_md

        # 1. Markdown (.md) export
        col_export_md.download_button(
            label="📥 Markdown (.md)",
            data=final_notes_md,
            file_name="lecture_notes.md",
            mime="text/markdown",
            use_container_width=True,
        )

        # 2. Word (.docx) export
        try:
            from docx import Document

            doc = Document()
            # Very simple Markdown handling: one Word paragraph per
            # blank-line-separated chunk, with '*', '_' and '#' removed.
            # A real Markdown -> DOCX conversion needs a proper parser.
            for para in final_notes_md.split('\n\n'):
                if para.strip():  # skip empty paragraphs
                    cleaned_para = para.replace('*', '').replace('_', '').replace('#', '').strip()
                    doc.add_paragraph(cleaned_para)

            # Serialize the document into an in-memory buffer.
            buffer = BytesIO()
            doc.save(buffer)
            buffer.seek(0)
            col_export_docx.download_button(
                label='📥 Word (.docx)',
                data=buffer,
                file_name='lecture_notes.docx',
                mime='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
                use_container_width=True
            )
        except Exception as docx_e:
            st.error(f'Failed to generate .docx file: {docx_e}')

        # 3. PDF (.pdf) export -- not implemented yet, so the button is a
        # disabled placeholder. It is given empty data so it neither depends on
        # the DOCX buffer (undefined when the step above fails) nor offers DOCX
        # bytes under a .pdf name.
        col_export_pdf.download_button(
            label='📥 PDF (.pdf)',
            data=b'',
            file_name="lecture_notes.pdf",
            use_container_width=True,
            disabled=True
        )
# Show the editor and the export panel only once a non-empty summary exists.
if st.session_state.get('summary'):
    summary_editor()
    downloader()
# except Exception as e:
# st.error(f'An error occurred during transcription: {e}')