shaheerawan3's picture
Update app.py
6667bef verified
raw
history blame
20 kB
import streamlit as st
from pathlib import Path
import torch
from transformers import (
GPT2LMHeadModel,
GPT2Tokenizer,
pipeline,
AutoModelForCausalLM,
AutoTokenizer
)
from moviepy.editor import (
VideoFileClip,
TextClip,
CompositeVideoClip,
AudioFileClip,
ColorClip,
vfx,
concatenate_videoclips
)
import requests
import json
from typing import Dict, List, Optional, Union
import tempfile
import os
from dotenv import load_dotenv
import time
from datetime import datetime
import nltk
from tqdm import tqdm
import pyttsx3
from gtts import gTTS
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import io
import random
# Set timeout for model downloads
import socket
socket.setdefaulttimeout(30) # 30 second timeout
# Configure transformers to use smaller models and cache
from transformers import logging
logging.set_verbosity_error() # Reduce logging noise
os.environ['TOKENIZERS_PARALLELISM'] = 'true'
# Download NLTK data at startup
try:
nltk.data.find('tokenizers/punkt')
nltk.data.find('taggers/averaged_perceptron_tagger')
except LookupError:
nltk.download('punkt', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
# Load environment variables
load_dotenv()
def create_progress_tracker():
"""Create a progress tracking system"""
progress = st.progress(0)
status = st.empty()
def update(percentage: int, message: str):
progress.progress(percentage)
status.text(message)
return update
# Initialize models at module level for caching
@st.cache_resource(show_spinner=False)
def load_models():
"""Load all AI models with better error handling and timeouts"""
models = {}
try:
# GPT-2 (smaller version)
models['gpt2_model'] = GPT2LMHeadModel.from_pretrained('gpt2',
low_cpu_mem_usage=True,
torch_dtype=torch.float32)
models['gpt2_tokenizer'] = GPT2Tokenizer.from_pretrained('gpt2')
# Use smaller BLOOM model
models['bloom_model'] = AutoModelForCausalLM.from_pretrained("bigscience/bloom-560m",
low_cpu_mem_usage=True,
torch_dtype=torch.float32)
models['bloom_tokenizer'] = AutoTokenizer.from_pretrained("bigscience/bloom-560m")
# Use smaller sentiment model
models['sentiment_pipeline'] = pipeline('sentiment-analysis',
model='distilbert-base-uncased-finetuned-sst-2-english',
device=-1) # Force CPU usage
return models
except Exception as e:
st.error(f"Error loading models: {str(e)}")
return None
class ContentStyle:
def __init__(
self,
font_size: int = 40,
font_color: str = "#FFFFFF",
background_effect: str = "None",
transition_effect: str = "fade",
text_animation: str = "slide",
theme: str = "modern",
layout: str = "centered"
):
self.font_size = font_size
self.font_color = font_color
self.background_effect = background_effect
self.transition_effect = transition_effect
self.text_animation = text_animation
self.theme = theme
self.layout = layout
class AIContentEngine:
def __init__(self, update_progress):
self.update_progress = update_progress
# Use only GPT-2 for faster generation
self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
self.model = GPT2LMHeadModel.from_pretrained('gpt2')
# Move model to CPU explicitly
self.model = self.model.to('cpu')
# Set model to evaluation mode
self.model.eval()
def generate_with_timeout(self, prompt: str, max_length: int = 100, timeout: int = 30) -> str:
"""Generate content with timeout"""
try:
# Set up inputs
inputs = self.tokenizer(prompt, return_tensors='pt', truncation=True, max_length=max_length)
# Generate with basic parameters for speed
with torch.no_grad(): # Disable gradient calculation
outputs = self.model.generate(
inputs['input_ids'],
max_length=max_length,
num_return_sequences=1,
temperature=0.7,
top_k=40,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
attention_mask=inputs['attention_mask']
)
return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
except Exception as e:
return f"{prompt} [Error: {str(e)}]"
def initialize_models(self):
"""Initialize models with retries"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
self.update_progress(5 + retry_count * 5, f"Loading AI models (attempt {retry_count + 1})...")
self.models = load_models()
if self.models is not None:
return
retry_count += 1
time.sleep(2) # Wait before retrying
except Exception as e:
retry_count += 1
if retry_count == max_retries:
raise Exception(f"Failed to load models after {max_retries} attempts: {str(e)}")
time.sleep(2)
def generate_dynamic_prompt(self, preferences: Dict) -> str:
"""Generate context-aware prompt based on detailed user preferences"""
prompt_template = f"""
Create {preferences['style']} content about {preferences['topic']}
with a {preferences['tone']} tone.
Target audience: {preferences['audience']}.
Purpose: {preferences['purpose']}.
Key message: {preferences['message']}.
Content format: {preferences.get('format', 'General')}.
Include specific examples and actionable insights.
Make it engaging and memorable.
"""
return prompt_template.strip()
def analyze_content_quality(self, content: str) -> float:
"""Advanced content quality analysis with multiple metrics"""
score = 0.0
# Sentiment analysis
sentiment = self.sentiment_pipeline(content)[0]
score += sentiment['score'] if sentiment['label'] == 'POSITIVE' else 0
# Length analysis
words = content.split()
word_count = len(words)
if 20 <= word_count <= 50:
score += 0.3
# Complexity and readability
tokens = nltk.word_tokenize(content)
pos_tags = nltk.pos_tag(tokens)
# Vocabulary diversity
unique_words = len(set(words)) / len(words)
score += unique_words * 0.3
# Sentence structure variety
sentences = nltk.sent_tokenize(content)
avg_sentence_length = np.mean([len(nltk.word_tokenize(sent)) for sent in sentences])
if 10 <= avg_sentence_length <= 20:
score += 0.2
return score
def generate_content_package(self, preferences: Dict) -> Dict[str, str]:
"""Generate content package with faster processing"""
content_package = {}
try:
# Simplified prompt templates for faster generation
templates = {
'main_content': f"Write a short post about {preferences['topic']} that is {preferences['tone']}.",
'quote': f"A short quote about {preferences['topic']}:",
'tips': f"Three quick tips about {preferences['topic']}:",
'call_to_action': f"Call to action for {preferences['topic']}:",
'hashtags': f"Trending hashtags for {preferences['topic']}:"
}
# Generate content with progress updates
for i, (key, prompt) in enumerate(templates.items()):
progress = 30 + (i * 10) # Progress from 30% to 80%
self.update_progress(progress, f"Generating {key.replace('_', ' ')}...")
# Generate with timeout
content = self.generate_with_timeout(prompt,
max_length=100 if key != 'main_content' else 200)
content_package[key] = content
# Add small delay between generations
time.sleep(0.1)
return content_package
except Exception as e:
st.error(f"Error in content generation: {str(e)}")
return content_package if content_package else None
def generate_with_models(self, prompt: str, max_length: int = 100) -> str:
"""Generate content with fallback options"""
try:
# Try GPT-2 first
if 'gpt2_model' in self.models:
inputs = self.models['gpt2_tokenizer'](prompt, return_tensors='pt', truncation=True)
outputs = self.models['gpt2_model'].generate(
inputs['input_ids'],
max_length=max_length,
num_return_sequences=1,
temperature=0.7,
top_k=50,
top_p=0.95,
do_sample=True
)
return self.models['gpt2_tokenizer'].decode(outputs[0], skip_special_tokens=True)
# Fallback to BLOOM if GPT-2 fails
elif 'bloom_model' in self.models:
inputs = self.models['bloom_tokenizer'](prompt, return_tensors="pt")
outputs = self.models['bloom_model'].generate(
inputs['input_ids'],
max_length=max_length,
num_return_sequences=1,
temperature=0.7,
do_sample=True
)
return self.models['bloom_tokenizer'].decode(outputs[0], skip_special_tokens=True)
else:
raise Exception("No language models available")
except Exception as e:
st.warning(f"Error in content generation: {str(e)}")
# Return a basic response if all else fails
return prompt + " [Content generation failed, please try again]"
class VideoGenerator:
def __init__(self, update_progress):
self.update_progress = update_progress
self.temp_dir = tempfile.mkdtemp()
self.pixabay_api_key = os.getenv("PIXABAY_API_KEY")
def fetch_background_video(self, query: str) -> Optional[str]:
"""Fetch background video from Pixabay with error handling"""
try:
url = f"https://pixabay.com/api/videos/?key={self.pixabay_api_key}&q={query}"
response = requests.get(url)
data = response.json()
if data.get("hits"):
video = random.choice(data["hits"])
video_url = video["videos"]["medium"]["url"]
video_path = os.path.join(self.temp_dir, "background.mp4")
response = requests.get(video_url)
with open(video_path, "wb") as f:
f.write(response.content)
return video_path
return None
except Exception as e:
st.error(f"Error fetching video: {str(e)}")
return None
def generate_voiceover(self, text: str, voice_type: str = "gtts") -> Optional[str]:
"""Generate voiceover with multiple options and error handling"""
try:
audio_path = os.path.join(self.temp_dir, "voiceover.mp3")
if voice_type == "gtts":
tts = gTTS(text=text, lang='en', tld='com')
tts.save(audio_path)
else: # pyttsx3
engine = pyttsx3.init()
engine.save_to_file(text, audio_path)
engine.runAndWait()
return audio_path
except Exception as e:
st.error(f"Error generating voiceover: {str(e)}")
return None
def create_video(self, content: Dict[str, str], preferences: Dict, style: ContentStyle) -> Optional[str]:
"""Create video with simpler processing"""
try:
# Basic video creation
self.update_progress(85, "Creating video...")
# Create a colored background instead of downloading video
background = ColorClip((1080, 1920), color=(0, 0, 128)) # Navy blue background
background = background.set_duration(15)
# Create text clips
text_clips = []
# Main content text
main_text = TextClip(
txt=content['main_content'][:200], # Limit text length
fontsize=40,
color='white',
size=(1000, None),
method='label'
).set_duration(15)
# Position text in center
main_text = main_text.set_position('center')
text_clips.append(main_text)
# Compose final video
final = CompositeVideoClip([background] + text_clips)
# Save video
self.update_progress(95, "Saving video...")
output_path = os.path.join(self.temp_dir, f"output_{int(time.time())}.mp4")
final.write_videofile(output_path, fps=24, codec='libx264', audio=False)
return output_path
except Exception as e:
st.error(f"Error creating video: {str(e)}")
return None
def main():
st.set_page_config(page_title="Professional Content Generator", layout="wide")
st.title("🎬 AI Content Generator")
# Advanced User Preferences
with st.form("content_preferences"):
col1, col2, col3 = st.columns(3)
with col1:
topic = st.text_input("Main Topic", help="What's your content about?")
style = st.selectbox(
"Content Style",
["Motivational", "Educational", "Business", "Personal Development",
"Technical", "Creative", "Storytelling", "News", "Tutorial"]
)
tone = st.selectbox(
"Tone",
["Inspiring", "Professional", "Casual", "Intense", "Friendly",
"Authoritative", "Empathetic", "Humorous", "Serious"]
)
with col2:
audience = st.selectbox(
"Target Audience",
["Entrepreneurs", "Students", "Professionals", "General Public",
"Technical", "Creative", "Business Leaders", "Educators"]
)
purpose = st.selectbox(
"Content Purpose",
["Inspire", "Educate", "Motivate", "Transform", "Inform",
"Entertain", "Persuade", "Guide", "Analyze"]
)
message = st.text_input("Key Message", help="Core message to convey")
with col3:
mood = st.selectbox(
"Visual Mood",
["Energetic", "Calm", "Professional", "Creative", "Modern",
"Traditional", "Minimalist", "Bold", "Subtle"]
)
voice_type = st.selectbox(
"Voice Type",
["Natural", "Professional", "Friendly", "Authoritative",
"Casual", "Energetic", "Calm", "Dynamic"]
)
# Advanced options in expandable section
with st.expander("Advanced Options"):
col4, col5 = st.columns(2)
with col4:
font_size = st.slider("Font Size", 30, 70, 40)
font_color = st.color_picker("Font Color", "#FFFFFF")
text_animation = st.selectbox(
"Text Animation",
["Fade", "Slide", "Static", "Bounce", "Zoom"]
)
with col5:
background_effect = st.selectbox(
"Background Effect",
["None", "Zoom", "Blur", "Bright", "Contrast"]
)
transition_effect = st.selectbox(
"Transition Effect",
["None", "Fade", "Slide", "Dissolve", "Wipe"]
)
theme = st.selectbox(
"Visual Theme",
["Modern", "Classic", "Minimal", "Bold", "Corporate", "Creative"]
)
layout = st.selectbox(
"Content Layout",
["Centered", "Split", "Grid", "Dynamic", "Minimal"]
)
submit_button = st.form_submit_button("Generate Content")
if submit_button:
if not topic or not message:
st.error("Please fill in at least the topic and key message fields.")
return
# Initialize style configuration
style_config = ContentStyle(
font_size=font_size,
font_color=font_color,
background_effect=background_effect,
transition_effect=transition_effect,
text_animation=text_animation,
theme=theme,
layout=layout
)
# Collect preferences before trying to use them
preferences = {
'topic': topic,
'style': style,
'tone': tone,
'audience': audience,
'purpose': purpose,
'message': message,
'mood': mood,
'voice_type': voice_type
}
try:
# Create progress tracker
update_progress = create_progress_tracker()
# Initialize engines with progress tracker
update_progress(5, "Initializing AI engine...")
ai_engine = AIContentEngine(update_progress)
update_progress(15, "Initializing video generator...")
video_gen = VideoGenerator(update_progress)
# Generate content
update_progress(25, "Starting content generation...")
content_package = ai_engine.generate_content_package(preferences)
# Create video
update_progress(65, "Starting video creation...")
video_path = video_gen.create_video(content_package, preferences, style_config)
# Display results
update_progress(100, "Complete!")
if video_path and os.path.exists(video_path):
try:
video_file = open(video_path, 'rb')
video_bytes = video_file.read()
st.video(video_bytes)
video_file.close()
# Download button
st.download_button(
label="Download Video",
data=video_bytes,
file_name=f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4",
mime="video/mp4"
)
finally:
if 'video_file' in locals():
video_file.close()
if os.path.exists(video_path):
os.remove(video_path)
except Exception as e:
st.error(f"An error occurred: {str(e)}")
if __name__ == "__main__":
main()