import os import streamlit as st import json import sys import time from pathlib import Path import tempfile import io from pdf2image import convert_from_bytes from PIL import Image, ImageEnhance, ImageFilter, UnidentifiedImageError import PIL import cv2 import numpy as np # Import the StructuredOCR class and config from the local files from structured_ocr import StructuredOCR from config import MISTRAL_API_KEY # Import UI layout if available try: from ui.layout import tool_container UI_LAYOUT_AVAILABLE = True except ImportError: UI_LAYOUT_AVAILABLE = False # Set page configuration st.set_page_config( page_title="Historical OCR", page_icon="📜", layout="wide", initial_sidebar_state="expanded" ) # Enable caching for expensive operations @st.cache_data(ttl=3600, show_spinner=False) def convert_pdf_to_images(pdf_bytes, dpi=150): """Convert PDF bytes to a list of images with caching""" try: return convert_from_bytes(pdf_bytes, dpi=dpi) except Exception as e: st.error(f"Error converting PDF: {str(e)}") return [] def safe_open_image(image_bytes): """Safe wrapper for PIL.Image.open with robust error handling""" try: return Image.open(io.BytesIO(image_bytes)) except Exception: # Return None if image can't be opened return None @st.cache_data(ttl=3600, show_spinner=False) def preprocess_image(image_bytes, preprocessing_options): """Preprocess image with selected options""" try: # Attempt to open the image safely image = safe_open_image(image_bytes) # If image could not be opened, return the original bytes if image is None: return image_bytes # Ensure image is in RGB mode for OpenCV processing if image.mode not in ['RGB', 'RGBA']: image = image.convert('RGB') elif image.mode == 'RGBA': # Handle RGBA images by removing transparency background = Image.new('RGB', image.size, (255, 255, 255)) background.paste(image, mask=image.split()[3]) # 3 is the alpha channel image = background # Handle image rotation based on user selection rotation_option = preprocessing_options.get("rotation", "None") if rotation_option != "None": if rotation_option == "Rotate 90° clockwise": image = image.transpose(Image.ROTATE_270) elif rotation_option == "Rotate 90° counterclockwise": image = image.transpose(Image.ROTATE_90) elif rotation_option == "Rotate 180°": image = image.transpose(Image.ROTATE_180) elif rotation_option == "Auto-detect": # Auto-detect orientation width, height = image.size # If image is in landscape and likely a document (typically portrait is better for OCR) if width > height and (width / height) > 1.5: image = image.transpose(Image.ROTATE_90) # Convert to numpy array for OpenCV processing try: img_array = np.array(image) except Exception: # Return the original image as JPEG if we can't convert to array byte_io = io.BytesIO() image.save(byte_io, format='JPEG') byte_io.seek(0) return byte_io.getvalue() # Apply preprocessing based on selected options try: if preprocessing_options.get("grayscale", False): img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) if preprocessing_options.get("contrast", 0) != 0: contrast_factor = 1 + (preprocessing_options.get("contrast", 0) / 10) image = Image.fromarray(img_array) enhancer = ImageEnhance.Contrast(image) image = enhancer.enhance(contrast_factor) img_array = np.array(image) if preprocessing_options.get("denoise", False): # Ensure the image is in the correct format for denoising (CV_8UC3) if len(img_array.shape) != 3 or img_array.shape[2] != 3: # Convert to RGB if it's not already a 3-channel color image if len(img_array.shape) == 2: # Grayscale img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 10, 10, 7, 21) if preprocessing_options.get("threshold", False): # Convert to grayscale if not already if len(img_array.shape) == 3: gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) else: gray = img_array # Apply adaptive threshold binary = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2) # Convert back to RGB img_array = cv2.cvtColor(binary, cv2.COLOR_GRAY2RGB) except Exception: # Return the original image if preprocessing fails byte_io = io.BytesIO() image.save(byte_io, format='JPEG') byte_io.seek(0) return byte_io.getvalue() # Convert back to PIL Image try: processed_image = Image.fromarray(img_array) # Convert to bytes byte_io = io.BytesIO() processed_image.save(byte_io, format='JPEG') # Use JPEG for better compatibility byte_io.seek(0) return byte_io.getvalue() except Exception: # Final fallback - return original bytes return image_bytes except Exception: # Return original image bytes as fallback return image_bytes # Define functions def process_file(uploaded_file, use_vision=True, preprocessing_options=None): """Process the uploaded file and return the OCR results Args: uploaded_file: The uploaded file to process use_vision: Whether to use vision model preprocessing_options: Dictionary of preprocessing options """ if preprocessing_options is None: preprocessing_options = {} # Show progress indicator progress_bar = st.progress(0) status_text = st.empty() status_text.text("Preparing file for processing...") # Save the uploaded file to a temporary file with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp: tmp.write(uploaded_file.getvalue()) temp_path = tmp.name try: # Check if API key is available if not MISTRAL_API_KEY: # Return dummy data if no API key progress_bar.progress(100) status_text.empty() # Show a clear message about the missing API key st.error("🔑 **Missing API Key**: Cannot process document without a valid Mistral AI API key.") st.info(""" **How to add your API key:** For Hugging Face Spaces: 1. Go to your Space settings 2. Add a secret named `MISTRAL_API_KEY` with your API key value For local development: 1. Add to your shell: `export MISTRAL_API_KEY=your_key_here` 2. Or create a `.env` file with `MISTRAL_API_KEY=your_key_here` """) return { "file_name": uploaded_file.name, "topics": ["API Key Required"], "languages": ["English"], "ocr_contents": { "title": "Missing Mistral API Key", "content": "To process real documents, please set the MISTRAL_API_KEY environment variable as described above." } } # Update progress progress_bar.progress(20) status_text.text("Initializing OCR processor...") # Initialize OCR processor with explicit API key try: # Make sure the API key is properly formatted api_key = MISTRAL_API_KEY.strip() processor = StructuredOCR(api_key=api_key) except Exception as e: st.error(f"Error initializing OCR processor: {str(e)}") return { "file_name": uploaded_file.name, "error": "API authentication failed", "ocr_contents": { "error": "Could not authenticate with Mistral API. Please check your API key." } } # Determine file type from extension file_ext = Path(uploaded_file.name).suffix.lower() file_type = "pdf" if file_ext == ".pdf" else "image" # Store original filename in session state for preservation st.session_state.original_filename = uploaded_file.name # Apply preprocessing if needed if any(preprocessing_options.values()) and file_type == "image": status_text.text("Applying image preprocessing...") try: processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options) # Save processed image to temp file but preserve original filename for results original_ext = Path(uploaded_file.name).suffix.lower() # Use original extension when possible for better format recognition if original_ext in ['.jpg', '.jpeg', '.png']: suffix = original_ext else: suffix = '.jpg' # Default fallback to JPEG with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as proc_tmp: proc_tmp.write(processed_bytes) temp_path = proc_tmp.name except Exception as e: st.warning(f"Image preprocessing failed: {str(e)}. Proceeding with original image.") # If preprocessing fails, use original file # This ensures the OCR process continues even if preprocessing has issues # Get file size in MB file_size_mb = os.path.getsize(temp_path) / (1024 * 1024) # Check if file exceeds size limits (10 MB for API processing) # This is a lower limit than the UI file size to ensure API requests don't fail if file_size_mb > 10: st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size for API processing is 10MB.") return { "file_name": uploaded_file.name, "topics": ["Document"], "languages": ["English"], "confidence_score": 0.0, "error": f"File size {file_size_mb:.2f} MB exceeds API limit of 10 MB", "ocr_contents": { "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds API limit of 10 MB", "partial_text": "Document could not be processed due to API limitations. Try reducing the file size or resolution." } } # Update progress progress_bar.progress(40) status_text.text("Processing document with OCR...") # Process the file with file size information for automatic page limiting # Make sure we're using the latest mistral-ocr model # See https://docs.mistral.ai/capabilities/document/ for more info result = processor.process_file(temp_path, file_type=file_type, use_vision=use_vision, file_size_mb=file_size_mb) # Complete progress progress_bar.progress(100) status_text.empty() # Preserve original filename in results if hasattr(st.session_state, 'original_filename'): result['file_name'] = st.session_state.original_filename # Clear the stored filename for next run del st.session_state.original_filename return result except Exception as e: progress_bar.progress(100) status_text.empty() st.error(f"Error during processing: {str(e)}") raise finally: # Clean up the temporary file if os.path.exists(temp_path): os.unlink(temp_path) # Initialize session state for storing results if 'previous_results' not in st.session_state: st.session_state.previous_results = [] if 'current_result' not in st.session_state: st.session_state.current_result = None # App title and description st.title("Historical Document OCR") st.write("Process historical documents and images with AI-powered OCR.") # Check if API key is available if not MISTRAL_API_KEY: st.warning("⚠️ **No Mistral API key found.** Please set the MISTRAL_API_KEY environment variable.") st.info("For Hugging Face Spaces, add it as a secret. For local development, export it in your shell or add it to a .env file.") # Create main layout with tabs main_tab1, main_tab2, main_tab3 = st.tabs(["Document Processing", "Previous Results", "About"]) # Sidebar with options with st.sidebar: st.header("Options") # Model options st.subheader("Model Settings") use_vision = st.checkbox("Use Vision Model", value=True, help="For image files, use the vision model for improved analysis") # Image preprocessing options st.subheader("Image Preprocessing") with st.expander("Preprocessing Options"): preprocessing_options = {} preprocessing_options["grayscale"] = st.checkbox("Convert to Grayscale", help="Convert image to grayscale before OCR") preprocessing_options["threshold"] = st.checkbox("Apply Thresholding", help="Apply adaptive thresholding to enhance text") preprocessing_options["denoise"] = st.checkbox("Denoise Image", help="Remove noise from the image") preprocessing_options["contrast"] = st.slider("Adjust Contrast", -5, 5, 0, help="Adjust image contrast (-5 to +5)") # Add rotation options rotation_options = ["None", "Rotate 90° clockwise", "Rotate 90° counterclockwise", "Rotate 180°", "Auto-detect"] preprocessing_options["rotation"] = st.selectbox("Image Orientation", rotation_options, index=0, help="Rotate image to correct orientation") # PDF options st.subheader("PDF Options") with st.expander("PDF Settings"): pdf_dpi = st.slider("PDF Resolution (DPI)", 72, 300, 150, help="Higher DPI gives better quality but slower processing") max_pages = st.number_input("Maximum Pages", 1, 20, 5, help="Limit number of pages to process") # Previous Results tab with main_tab2: if not st.session_state.previous_results: st.info("No previous documents have been processed yet. Process a document to see results here.") else: st.subheader("Previously Processed Documents") # Display previous results in a selectable list previous_files = [f"{i+1}. {result.get('file_name', 'Document')}" for i, result in enumerate(st.session_state.previous_results)] selected_index = st.selectbox("Select a previous document:", options=range(len(previous_files)), format_func=lambda i: previous_files[i]) selected_result = st.session_state.previous_results[selected_index] # Display selected result in tabs has_images = selected_result.get('has_images', False) if has_images: prev_tabs = st.tabs(["Document Info", "Content", "With Images"]) else: prev_tabs = st.tabs(["Document Info", "Content"]) # Document Info tab with prev_tabs[0]: st.write(f"**File:** {selected_result.get('file_name', 'Document')}") # Remove confidence score from display # Show languages if available if 'languages' in selected_result and selected_result['languages']: languages = [lang for lang in selected_result['languages'] if lang is not None] if languages: st.write(f"**Languages:** {', '.join(languages)}") # Show topics if available if 'topics' in selected_result and selected_result['topics']: st.write(f"**Topics:** {', '.join(selected_result['topics'])}") # Show any limited pages info if 'limited_pages' in selected_result: st.info(f"Processed {selected_result['limited_pages']['processed']} of {selected_result['limited_pages']['total']} pages") # Content tab with prev_tabs[1]: if 'ocr_contents' in selected_result: st.markdown("## Document Contents") if isinstance(selected_result['ocr_contents'], dict): for section, content in selected_result['ocr_contents'].items(): if not content: continue section_title = section.replace('_', ' ').title() # Special handling for title and subtitle if section.lower() == 'title': st.markdown(f"# {content}") elif section.lower() == 'subtitle': st.markdown(f"*{content}*") else: st.markdown(f"### {section_title}") # Handle different content types if isinstance(content, str): st.markdown(content) elif isinstance(content, list): for item in content: if isinstance(item, str): st.markdown(f"* {item}") else: st.json(item) elif isinstance(content, dict): for k, v in content.items(): st.markdown(f"**{k}:** {v}") else: st.warning("No content available for this document.") # Images tab if available if has_images and len(prev_tabs) > 2: with prev_tabs[2]: try: # Import function from ocr_utils import create_html_with_images if 'pages_data' in selected_result: # Generate HTML with images html_with_images = create_html_with_images(selected_result) # Display HTML content st.components.v1.html(html_with_images, height=600, scrolling=True) # Download button with unique key to prevent resets st.download_button( label="Download with Images (HTML)", data=html_with_images, file_name=f"{selected_result.get('file_name', 'document')}_with_images.html", mime="text/html", key=f"prev_download_{hash(selected_result.get('file_name', 'doc'))}_{selected_index}" ) else: st.warning("No image data available for this document.") except Exception as e: st.error(f"Could not display document with images: {str(e)}") # About tab content with main_tab3: st.markdown(""" ### About This Application This app uses Mistral AI's Document OCR to extract text and images from historical documents with enhanced formatting. It can process: - Image files (jpg, png, etc.) - PDF documents (multi-page support) The extracted content is processed into structured data based on the document type, combining: - Text extraction with `mistral-ocr-latest` - Analysis with language models - Layout preservation with images - Enhanced typography for historical documents View results in three formats: - **Structured View**: Beautifully formatted HTML with proper document structure - **Raw JSON**: Complete data structure for developers - **With Images**: Document with embedded images preserving original layout **History Feature:** - All processed documents are saved in the session history - Access previous documents in the "Previous Results" tab - No need to reprocess the same document multiple times """) # Main tab content with main_tab1: # Create a more compact layout using custom CSS st.markdown('
{p.strip()}
\n' elif isinstance(content, list): # Properly format lists with better handling for dict items html_content += '