# Streamlit page for Step 4: build lecture notes from the transcript and the key frames
# extracted in the previous steps.
import streamlit as st
import os
import requests
from transformers import pipeline  # used only by the commented-out local summarization fallback below
import time
from docx import Document
from io import BytesIO

os.environ['STREAMLIT_SERVER_ENABLE_FILE_WATCHER'] = 'false'
import torch  # used only by the commented-out local summarization fallback below
from langchain_ollama.llms import OllamaLLM
# from utils import cleanup_session_files, get_session_id  # for cleanup button
from utils import get_secret_api, get_secret_prompt

st.session_state.secret_api = get_secret_api()
# st.session_state.secret_prompt = get_secret_prompt()

prompt_file_id = '1s5r_DuxaEoMk-D5-53FVhTMeHGVtoeV7'

# Default system prompt (Russian): asks the model to produce detailed lecture notes
# from the transcript plus the key frames, using markdown tables for comparisons.
default_prompt = '''Ты - ассистент, который создает конспекты лекций на основе предоставленного текста.
Этот текст состоит из двух частей:
1. Транскрибация аудиодорожки видеолекции,
2. Изображение выделенных из видео ключевых кадров, с полезной информацией.
Сделай детальный конспект по тому, что описывается в видео.
Для иллюстрации сравнений и сопоставлений используй markdown-таблицы.
Ответ предоставь в формате markdown.
'''

# Glue prompt (Russian): introduces the time-coded transcript and asks for a summary
# of it together with the images, using markdown tables.
# gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, а для иллюстрации сравнений и сопоставлений используй markdown-таблицы:'
gluing_prompt = 'Вот упомянутый транскрибированный текст с таймкодами, суммаризируй его вместе с изображениями, используя markdown-таблицы.'
if st.session_state.get('main_topic'):
    gluing_prompt += f' Основная тема лекции: {st.session_state.main_topic}'

# Collect the extracted key-frame images (if the frames step has been completed).
frames_paths = []
if st.session_state.get('frames_dir') and os.path.isdir(st.session_state.frames_dir):
    frames_paths = [os.path.join(st.session_state.frames_dir, f)
                    for f in os.listdir(st.session_state.frames_dir)
                    if f.endswith('.jpg') and os.path.isfile(os.path.join(st.session_state.frames_dir, f))]

# import base64
# # Load and encode JPEG images to base64
# frames = []
# # st.success(os.listdir(st.session_state.frames_dir))
# # st.success([os.path.isfile(f) for f in os.listdir(st.session_state.frames_dir)])  # if f.endswith('.jpg') and os.path.isfile(f)])
# for image_path in frames_paths:
#     # st.write(image_path)
#     with open(os.path.join(st.session_state.frames_dir, image_path), 'rb') as image_file:
#         # Read the image and encode it to base64
#         encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
#         frames.append(encoded_string)
# # st.success(frames)

st.title('📝 Step 4: Lecture Summarization')

# Check whether the transcript and the extracted frames are available
transcript_available = 'transcript' in st.session_state and st.session_state['transcript']
frames_available = 'frames_dir' in st.session_state and st.session_state['frames_dir']

if not transcript_available and not frames_available:
    st.warning("No text content (Transcript or OCR) found. Please complete previous steps first.")
    st.stop()
# st.info("This step combines the generated transcript and OCR text (if available) and creates a summary.")

# --- Combine Sources ---
st.subheader('Sources')
# combined_text = ""
source_info = []
transcript_text = ''       # defaults so the checks below do not fail when a source is missing
transcript_segments = ''

col_source_transcript, col_source_frames = st.columns(2)

if transcript_available:
    col_source_transcript.success('✅ Transcript found')
    # st.success(len(st.session_state.transcript.__dict__['output']))
    # st.success(st.session_state.transcript.__dict__['output'][0]['text'])
    # combined_text += '--- Transcript ---\n' + st.session_state.transcript['output'][0]['text'] + '\n\n'
    # st.success(st.session_state.transcript.output[0]['text'])
    transcript_text = st.session_state.transcript.output['text']
    transcript_segments = st.session_state.transcript_segments
    # combined_text += '--- Transcript ---\n\n' + transcript_text + '\n\n'
    # st.write(combined_text)
    source_info.append('Transcript')
    with col_source_transcript.expander('Show transcript'):
        st.text_area('Transcript', transcript_text, height=200, key='sum_transcript_disp')
else:
    col_source_transcript.warning('Transcript not available.')

if frames_available:
    col_source_frames.success('✅ Extracted frames found')
    # combined_text += "--- OCR results ---\n" + st.session_state['frames_dir']
    source_info.append('Frames dir')
    # with st.expander('Extracted frames directory'):
    #     st.text_area('Extracted frames directory', st.session_state['frames_dir'], height=200, key="sum_ocr_disp")
    with col_source_frames.expander('Show frames'):
        st.text_input('Extracted frames directory', st.session_state['frames_dir'])
else:
    # st.warning('OCR Text not available.')
    col_source_frames.warning('Extracted frames not available.')

# combined_text = combined_text.strip()
# if not combined_text:
#     st.error("Combined text is empty. Cannot proceed.")
if not transcript_text:
    st.error('Transcript text is empty. Cannot proceed.')
    st.stop()
# --- Summarization Configuration ---
st.subheader('Summarization Settings')

# Consider different models/pipelines
summarizer_options = ['gemma3:4b',
                      'gemma3:12b',
                      'granite3.2-vision',
                      # 'phi4',
                      'mistral-small3.1',
                      'llama3.2-vision',
                      # 'YandexGPT',
                      # 't5-base',
                      # 't5-large',
                      # 'facebook/mbart-large-50',
                      # 'facebook/bart-large-cnn',
                      # 'google/pegasus-xsum',
                      ]
selected_model = st.selectbox('Select Summarization Model:', summarizer_options, index=1)

# # Dynamic length based on input size (example logic)
# # input_length = len(combined_text.split())
# input_length = len(transcript_text.split())  # approx word count
# default_min = max(50, input_length // 10)   # suggest min length ~10% of input
# default_max = max(150, input_length // 3)   # suggest max length ~30% of input
# min_length = st.slider("Minimum Summary Length (tokens):", min_value=30, max_value=max(500, default_max + 100), value=default_min)
# max_length = st.slider("Maximum Summary Length (tokens):", min_value=50, max_value=max(1000, default_max + 200), value=default_max)
# if min_length >= max_length:
#     st.warning("Minimum length should be less than maximum length.")
#     # Adjust max_length automatically or prevent proceeding
#     max_length = min_length + 50  # simple adjustment


# --- Generate Summary ---
def describe_video(model, frames_dir, describe_prompt):
    """Local alternative (unused here): bind the extracted frame images to the model and run the prompt."""
    images = [os.path.join(frames_dir, file) for file in os.listdir(frames_dir)]
    model_with_images = model.bind(images=images)
    return model_with_images.invoke(describe_prompt)


def load_prompt():
    """Fetch the summarization prompt from Google Drive, falling back to a local file, then to default_prompt."""
    describe_prompt = None
    prompt_url = f'https://drive.google.com/uc?export=download&id={prompt_file_id}'
    try:
        response = requests.get(prompt_url, timeout=10)
        if response.status_code == 200 and 'Google Drive - Quota exceeded' not in response.text:
            describe_prompt = response.text
    except requests.RequestException:
        pass
    if not describe_prompt:
        try:
            with open('ideal_prompt.txt', 'r', encoding='utf-8') as file:
                describe_prompt = file.read()
        except OSError:
            describe_prompt = default_prompt
    return describe_prompt


secret_prompt = load_prompt()

with st.expander('**Prompt**', expanded=True):
    # col_1, col_2 = st.columns(2)
    describe_prompt = st.text_area(label='Prompt', height=300, value=secret_prompt)

_, col_button_summary, _ = st.columns([2, 1, 2])
if col_button_summary.button('Generate Summary', type='primary', use_container_width=True):
    st.session_state['summary'] = None  # clear previous summary
    st.session_state['edit_mode'] = False
    with st.spinner(f'Performing summarization with `{selected_model}` model..'):
        # Local Ollama alternative (kept for reference):
        # st.session_state.summary = describe_video(model=OllamaLLM(model=selected_model),
        #                                           frames_dir=st.session_state.frames_dir,
        #                                           describe_prompt=describe_prompt + gluing_prompt + transcript_text)
        # Send the prompt plus the frame images to the backend summarization API.
        # A more defensive variant of this call is sketched in post_summarize_request() below.
        response = requests.post(
            f'{st.session_state.secret_api}/summarize',
            # data={'frames': frames},
            params={'model': selected_model,
                    'prompt': describe_prompt + gluing_prompt + transcript_segments},
            files=[('frames', open(path, 'rb')) for path in frames_paths]  # file handles are left to the GC
        )
        st.write(response)
        result = response.json()
        st.badge(f'inference_time: {result["inference_time"]} | used model: {result["model_name"]}')
        # st.write(result['form'])
        st.session_state['summary'] = result['summary']
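# --- Optional (hedged sketch, not wired into the UI above) ---
# A more defensive variant of the inline requests.post() call in the button handler:
# it closes every uploaded frame file via ExitStack, applies a timeout, and surfaces
# HTTP errors before decoding JSON. The endpoint path ('/summarize') and the response
# fields ('summary', 'inference_time', 'model_name') come from this page's own call;
# the helper name, parameter names, and the timeout value are assumptions, not a
# documented backend API.
import contextlib


def post_summarize_request(api_base, model, prompt, frame_paths, timeout=600):
    """Send frames + prompt to the backend /summarize endpoint and return the parsed JSON."""
    with contextlib.ExitStack() as stack:
        files = [('frames', stack.enter_context(open(path, 'rb'))) for path in frame_paths]
        resp = requests.post(
            f'{api_base}/summarize',
            params={'model': model, 'prompt': prompt},
            files=files,
            timeout=timeout,
        )
    resp.raise_for_status()  # fail loudly on 4xx/5xx instead of parsing an error body
    return resp.json()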
# Legacy local-pipeline fallback (kept for reference; relies on `combined_text`,
# `min_length` and `max_length` from the commented-out blocks above):
# if combined_text:
#     with st.spinner(f"Summarizing text using {selected_model}.. This may take a while (up to 2x)"):
#         try:
#             start_time = time.time()
#             # Load the pipeline - specify device if possible
#             device = 0 if torch.cuda.is_available() else -1  # device=0 for first GPU, -1 for CPU
#             summarizer = pipeline("summarization", model=selected_model, device=device)
#             # Handle potential long input (simplistic chunking if needed, better models handle longer inputs)
#             # Basic check: Transformers often have input limits (e.g., 1024 tokens for BART).
#             # A more robust solution involves chunking, summarizing chunks, and combining summaries.
#             # For this example, we'll try summarizing directly, but add a warning.
#             max_model_input_length = getattr(summarizer.model.config, 'max_position_embeddings', 1024)  # model's max length
#             if len(summarizer.tokenizer.encode(combined_text)) > max_model_input_length:
#                 st.warning(f'Input text might be too long for {selected_model} (max ~{max_model_input_length} tokens). ' +
#                            f'Consider using models designed for longer text or implementing chunking.')
#                 # Simple truncation (not ideal):
#                 # truncated_text = summarizer.tokenizer.decode(summarizer.tokenizer.encode(combined_text, max_length=max_model_input_length, truncation=True))
#                 # summary_result = summarizer(truncated_text, max_length=max_length, min_length=min_length, do_sample=False)
#             # Attempt summarization (may error if too long and not handled)
#             summary_result = summarizer(combined_text, max_length=max_length, min_length=min_length, do_sample=False)
#             st.session_state['summary'] = summary_result[0]['summary_text']
#             end_time = time.time()
#             st.success(f"Summary generated in {end_time - start_time:.2f} seconds.")
#         except Exception as e:
#             st.error(f"Error during summarization: {e}")
#             st.error("This could be due to model loading issues, insufficient memory, or input text length.")
#             if 'summarizer' in locals():
#                 del summarizer  # try to free memory
#                 if device == 0:
#                     torch.cuda.empty_cache()
# else:
#     st.error("No text available to summarize.")


# --- Display and Refine Summary ---
# st.subheader('Summary')
if 'summary' in st.session_state and st.session_state['summary']:
    # Earlier editing UI (kept for reference):
    # with st.container(height=600, border=True):
    #     summary_container = st.empty()
    #     edited_summary = st.session_state['summary']
    #     # summary_container.markdown(st.session_state['summary'])
    #     summary_container.markdown(edited_summary, unsafe_allow_html=True)
    # _, col_button_render, _ = st.columns([2, 1, 2])
    # # Use st.text_area for editing
    # edited_summary = st.text_area(
    #     'Edit the summary here (Markdown format supported):',
    #     value=st.session_state['summary'],
    #     height=400,
    #     key='summary_edit_area'
    # )
    # if col_button_render.button('Render Markdown', type='secondary', use_container_width=True):
    #     with st.spinner('Generating Markdown preview..'):
    #         # st.markdown(edited_summary, unsafe_allow_html=True)
    #         summary_container.markdown(edited_summary, unsafe_allow_html=True)
    #         # st.session_state['summary'] = edited_summary  # update summary
    # # else:
    # #     st.markdown('', unsafe_allow_html=True)

    # Initialize edit/preview state
    if 'edit_mode' not in st.session_state:
        st.session_state.edit_mode = False

    with st.container(height=500, border=True):
        summary_container = st.empty()
        edited_summary = st.session_state.summary

        # Switch between editing and preview rendering
        if st.session_state.edit_mode:
            # Edit mode
            edited_summary = summary_container.text_area(
                'Edit Markdown:',
                value=st.session_state.summary,
                height=500
            )
            st.session_state.summary = edited_summary
        else:
            # Preview mode
            summary_container.markdown(st.session_state.summary, unsafe_allow_html=True)

    def switch_mode():
        st.session_state.edit_mode = not st.session_state.edit_mode

    # Mode toggle button
    st.button('✏️ Edit' if not st.session_state.edit_mode else '👁️ Preview',
              on_click=switch_mode,
              use_container_width=True)

    # --- Export Options ---
    st.subheader('📥 Export Notes (Download)')
    col_export_md, col_export_docx, col_export_pdf = st.columns(3)

    st.session_state['final_notes'] = edited_summary  # store edited version
    final_notes_md = st.session_state.get('final_notes', '')
    # st.info(final_notes_md)

    # 1. Markdown (.md) export
    col_export_md.download_button(
        label='📥 Markdown (.md)',
        data=final_notes_md,
        file_name='lecture_notes.md',
        mime='text/markdown',
        use_container_width=True,
    )

    # 2. Word (.docx) export
    try:
        doc = Document()
        doc.add_heading('Lecture Notes Summary', 0)
        # Very basic Markdown conversion (assumes plain paragraphs).
        # For full Markdown -> .docx, a library like pandoc (external) or proper parsing
        # is needed; see the pandoc-based sketch at the end of this file.
        paragraphs = final_notes_md.split('\n\n')  # split by double newline
        for para in paragraphs:
            if para.strip():  # avoid empty paragraphs
                # Crude handling of Markdown emphasis; a proper Markdown parser would be better here
                cleaned_para = para.replace('*', '').replace('_', '').replace('#', '').strip()
                doc.add_paragraph(cleaned_para)

        # Save the .docx to a BytesIO buffer
        buffer = BytesIO()
        doc.save(buffer)
        buffer.seek(0)

        col_export_docx.download_button(
            label='📥 Word (.docx)',
            data=buffer,
            file_name='lecture_notes.docx',
            mime='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            use_container_width=True
        )
    except Exception as docx_e:
        st.error(f'Failed to generate .docx file: {docx_e}')

    # 3. PDF (.pdf) export: disabled placeholder, real PDF generation is not implemented yet
    try:
        col_export_pdf.download_button(
            label='📥 PDF (.pdf)',
            data=buffer,
            file_name='lecture_notes.pdf',
            use_container_width=True,
            disabled=True
        )
    except Exception as pdf_e:
        st.error(f'Failed to generate .pdf file: {pdf_e}')

    # PDF export via extra libraries (requires setup; placeholder kept for reference)
    # st.markdown("---")
    # st.write("**PDF Export:**")
    # try:
    #     from mdpdf.cli import mdpdf
    #     pdf_buffer = BytesIO()
    #     # This often requires command-line execution or careful API usage
    #     # Simplified placeholder - actual implementation may vary:
    #     # mdpdf(pdf_buffer, md=final_notes_md, ...)  # fictional direct API call
    #     st.info("PDF generation via libraries like mdpdf/WeasyPrint requires setup.")
    # except ImportError:
    #     st.warning("`mdpdf` library not installed. PDF export unavailable.")
    # except Exception as pdf_e:
    #     st.error(f"Failed to generate PDF (requires setup): {pdf_e}")

else:
    st.info('Summary has not been generated or is empty.')

# --- Optional: Cleanup Button ---
# st.sidebar.markdown("---")
# if st.sidebar.button("End Session & Clean Up Files"):
#     session_id = get_session_id()
#     cleanup_session_files(session_id)
#     # Clear relevant session state keys
#     keys_to_clear = ['video_path', 'audio_path', 'frames_dir', 'transcript', 'summary',
#                      'final_notes', 'extracted_frames', 'session_id']
#     for key in keys_to_clear:
#         if key in st.session_state:
#             del st.session_state[key]
#     st.success("Temporary files cleaned and session data cleared.")
#     st.info("You can now start a new session from the 'Main' page.")
#     # Consider navigating back to Main page or just showing message
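
# --- Optional (hedged sketch): full Markdown -> .docx / .pdf conversion ---
# The character-stripping fallback in the .docx export above loses headings and tables.
# One possible replacement, assuming the external `pandoc` binary and the `pypandoc`
# package are installed (plus a LaTeX engine for PDF output). This helper is an
# illustration only and is not called anywhere in this page.
def convert_markdown(md_text, fmt):
    """Convert Markdown text to 'docx' or 'pdf' bytes via pandoc (sketch only)."""
    import tempfile
    import pypandoc  # assumption: package installed and pandoc available on PATH

    with tempfile.NamedTemporaryFile(suffix=f'.{fmt}', delete=False) as tmp:
        out_path = tmp.name
    try:
        # pandoc cannot stream binary formats to stdout, so write to a temp file and read it back
        pypandoc.convert_text(md_text, fmt, format='md', outputfile=out_path)
        with open(out_path, 'rb') as f:
            return f.read()
    finally:
        os.remove(out_path)

# Possible usage in the export section above (hypothetical):
#     data=convert_markdown(final_notes_md, 'docx')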