Spaces:
Paused
Paused
import spaces | |
import gradio as gr | |
import numpy as np | |
import os | |
import torch | |
import random | |
import subprocess | |
# Install flash-attn at start-up. FLASH_ATTENTION_SKIP_CUDA_BUILD is passed to
# the installer through the environment.
subprocess.run(
    "pip install flash-attn --no-build-isolation",
    # Extend the inherited environment instead of replacing it: a bare dict
    # passed as ``env`` would drop PATH, HOME, proxy settings, etc., and the
    # shell may then fail to locate ``pip`` at all.
    env={**os.environ, "FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
    shell=True,
)
from accelerate import infer_auto_device_map, load_checkpoint_and_dispatch, init_empty_weights | |
from PIL import Image | |
import uuid | |
from data.data_utils import add_special_tokens, pil_img2rgb | |
from data.transforms import ImageTransform | |
from inferencer import InterleaveInferencer | |
from modeling.autoencoder import load_ae | |
from modeling.bagel.qwen2_navit import NaiveCache | |
from modeling.bagel import ( | |
BagelConfig, Bagel, Qwen2Config, Qwen2ForCausalLM, | |
SiglipVisionConfig, SiglipVisionModel | |
) | |
from modeling.qwen2 import Qwen2Tokenizer | |
from huggingface_hub import snapshot_download | |
# Local checkpoint location and the Hub repository it is fetched from.
save_dir = "./model"
repo_id = "ByteDance-Seed/BAGEL-7B-MoT"
cache_dir = f"{save_dir}/cache"

# The presence of ema.safetensors is used as the marker that the snapshot
# has already been downloaded.
if os.path.exists(os.path.join(save_dir, "ema.safetensors")):
    print(f"Model found at {save_dir}")
else:
    print(f"Downloading model from {repo_id} to {save_dir}")
    snapshot_download(
        repo_id=repo_id,
        local_dir=save_dir,
        cache_dir=cache_dir,
        local_dir_use_symlinks=False,
        resume_download=True,
        allow_patterns=["*.json", "*.safetensors", "*.bin", "*.py", "*.md", "*.txt"],
    )
# Directory the configs and weights are read from (same location the
# download step writes to).
model_path = "./model"

# Language-model config: loaded from JSON, then overridden in place.
llm_config_file = os.path.join(model_path, "llm_config.json")
llm_config = Qwen2Config.from_json_file(llm_config_file)
llm_config.qk_norm = True
llm_config.tie_word_embeddings = False
llm_config.layer_module = "Qwen2MoTDecoderLayer"

# Vision-tower config: loaded from JSON, rope disabled and one hidden layer
# fewer than the JSON specifies.
vit_config_file = os.path.join(model_path, "vit_config.json")
vit_config = SiglipVisionConfig.from_json_file(vit_config_file)
vit_config.rope = False
vit_config.num_hidden_layers = vit_config.num_hidden_layers - 1

# Autoencoder: load_ae returns both the model and its config.
ae_weights_file = os.path.join(model_path, "ae.safetensors")
vae_model, vae_config = load_ae(local_path=ae_weights_file)

# Top-level BAGEL config tying the three sub-configs together.
bagel_config_kwargs = {
    "visual_gen": True,
    "visual_und": True,
    "llm_config": llm_config,
    "vit_config": vit_config,
    "vae_config": vae_config,
    "vit_max_num_patch_per_side": 70,
    "connector_act": 'gelu_pytorch_tanh',
    "latent_patch_size": 2,
    "max_latent_size": 64,
}
config = BagelConfig(**bagel_config_kwargs)
# Build the model structure inside accelerate's init_empty_weights context;
# the actual weights are loaded afterwards by load_checkpoint_and_dispatch.
with init_empty_weights():
    language_model = Qwen2ForCausalLM(llm_config)
    vit_model = SiglipVisionModel(vit_config)
    model = Bagel(language_model, vit_model, config)
    # Swap the ViT patch embedding from conv2d to linear (meta=True is passed
    # because this runs inside the empty-weights context).
    model.vit_model.vision_model.embeddings.convert_conv2d_to_linear(vit_config, meta=True)

# Tokenizer plus the extra special tokens; new_token_ids is handed to the
# inferencer later. The third return value is unused here.
tokenizer = Qwen2Tokenizer.from_pretrained(model_path)
tokenizer, new_token_ids, _ = add_special_tokens(tokenizer)

# Image preprocessing for the VAE and ViT paths.
# NOTE(review): the three positional arguments look like (max size, min size,
# stride/patch size) -- confirm against data.transforms.ImageTransform.
vae_transform = ImageTransform(1024, 512, 16)
vit_transform = ImageTransform(980, 224, 14)
# Let accelerate propose a placement, allowing up to 80GiB on every visible
# GPU and never splitting a Bagel or Qwen2MoTDecoderLayer module.
device_map = infer_auto_device_map(
    model,
    max_memory={gpu_index: "80GiB" for gpu_index in range(torch.cuda.device_count())},
    no_split_module_classes=["Bagel", "Qwen2MoTDecoderLayer"],
)

# Modules that are forced onto one common device below.
same_device_modules = [
    'language_model.model.embed_tokens',
    'time_embedder',
    'latent_pos_embed',
    'vae2llm',
    'llm2vae',
    'connector',
    'vit_pos_embed'
]

if torch.cuda.device_count() == 1:
    # Single GPU: pin every listed module (present in the map or not) to the
    # device of the first one, defaulting to cuda:0.
    first_device = device_map.get(same_device_modules[0], "cuda:0")
    device_map.update(dict.fromkeys(same_device_modules, first_device))
else:
    # Otherwise: take the device of the first listed module that actually
    # appears in the map, and move only the already-mapped modules onto it.
    first_assigned_device = next(
        (device_map[name] for name in same_device_modules if name in device_map),
        None,
    )
    if first_assigned_device is not None:
        mapped_modules = [name for name in same_device_modules if name in device_map]
        for k_module in mapped_modules:
            device_map[k_module] = first_assigned_device
# Load the EMA checkpoint into the empty model according to device_map,
# in bfloat16, and switch the result to eval mode.
model = load_checkpoint_and_dispatch(
    model,
    checkpoint=os.path.join(model_path, "ema.safetensors"),
    device_map=device_map,
    offload_buffers=True,
    dtype=torch.bfloat16,
    force_hooks=True,
).eval()

# Single shared inferencer used by all three backend functions below.
inferencer = InterleaveInferencer(
    model=model,
    vae_model=vae_model,
    tokenizer=tokenizer,
    vae_transform=vae_transform,
    vit_transform=vit_transform,
    new_token_ids=new_token_ids,
)
def set_seed(seed):
    """Seed Python, NumPy and PyTorch RNGs when ``seed`` is a positive number.

    A ``None`` or non-positive seed leaves all RNG state untouched. When a
    seed is applied, cuDNN is also switched to deterministic mode with
    benchmarking disabled.

    Returns:
        The ``seed`` argument, unchanged.
    """
    if seed is None or not seed > 0:
        return seed

    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
    return seed
# --- Backend Functions (Adapted from original app.py) --- | |
def call_text_to_image(prompt, show_thinking, cfg_text_scale, cfg_interval,
                       timestep_shift, num_timesteps, cfg_renorm_min, cfg_renorm_type,
                       max_think_token_n, do_sample, text_temperature, seed, image_ratio):
    """Generate an image from ``prompt`` with the shared inferencer.

    The thinking-related knobs (``max_think_token_n``, ``do_sample``,
    ``text_temperature``) are only honoured when ``show_thinking`` is truthy;
    otherwise fixed values (1024, False, 0.3) are sent instead.

    Returns:
        ``(image, text)`` taken from the inferencer result; either may be
        ``None`` when the key is absent. The text is the thinking output.
    """
    set_seed(seed)

    # Unknown ratios (including "1:1") fall back to a square canvas.
    # NOTE(review): tuples look like (height, width) -- confirm with inferencer.
    ratio_to_shape = {
        "4:3": (768, 1024),
        "3:4": (1024, 768),
        "16:9": (576, 1024),
        "9:16": (1024, 576),
    }
    image_shapes = ratio_to_shape.get(image_ratio, (1024, 1024))

    if show_thinking:
        think_budget, sampling, temperature = max_think_token_n, do_sample, text_temperature
    else:
        think_budget, sampling, temperature = 1024, False, 0.3

    inference_hyper = {
        "max_think_token_n": think_budget,
        "do_sample": sampling,
        "text_temperature": temperature,
        "cfg_text_scale": cfg_text_scale,
        "cfg_interval": [cfg_interval, 1.0],
        "timestep_shift": timestep_shift,
        "num_timesteps": num_timesteps,
        "cfg_renorm_min": cfg_renorm_min,
        "cfg_renorm_type": cfg_renorm_type,
        "image_shapes": image_shapes,
    }
    result = inferencer(text=prompt, think=show_thinking, **inference_hyper)
    generated_image = result.get("image", None)
    thinking = result.get("text", None)
    return generated_image, thinking
def call_image_understanding(image, prompt, show_thinking, do_sample, text_temperature, max_new_tokens, seed):
    """Answer ``prompt`` about ``image`` using the shared inferencer.

    ``image`` may be a NumPy array (converted to a PIL image) or something
    ``pil_img2rgb`` accepts. ``max_new_tokens`` is forwarded to the inferencer
    as ``max_think_token_n``.

    Returns:
        ``(text, None)``. When ``image`` is ``None`` the text is the message
        "Please upload an image.".
    """
    set_seed(seed)

    if image is None:
        return "Please upload an image.", None

    pil_image = Image.fromarray(image) if isinstance(image, np.ndarray) else image
    rgb_image = pil_img2rgb(pil_image)

    result = inferencer(
        image=rgb_image,
        text=prompt,
        think=show_thinking,
        understanding_output=True,
        do_sample=do_sample,
        text_temperature=text_temperature,
        max_think_token_n=max_new_tokens,
    )
    # The answer text is the main output; there is no separate second value.
    answer = result.get("text", None)
    return answer, None
def call_edit_image(image, prompt, show_thinking, cfg_text_scale, cfg_img_scale, cfg_interval,
                    timestep_shift, num_timesteps, cfg_renorm_min, cfg_renorm_type,
                    max_think_token_n, do_sample, text_temperature, seed):
    """Edit ``image`` according to ``prompt`` with the shared inferencer.

    ``image`` may be a NumPy array (converted to a PIL image) or something
    ``pil_img2rgb`` accepts. The thinking-related knobs
    (``max_think_token_n``, ``do_sample``, ``text_temperature``) are only
    honoured when ``show_thinking`` is truthy; otherwise fixed values
    (1024, False, 0.3) are sent instead.

    Returns:
        ``(image, text)`` taken from the inferencer result; either may be
        ``None`` when the key is absent. When no input image is given the
        result is ``(None, "Please upload an image.")`` -- the same
        two-element shape as the normal path, so callers can always unpack
        two values.
    """
    # Validate before seeding: there is nothing to generate without an image,
    # so global RNG state is left alone in that case.
    if image is None:
        return None, "Please upload an image."

    set_seed(seed)

    if isinstance(image, np.ndarray):
        image = Image.fromarray(image)
    image = pil_img2rgb(image)

    inference_hyper = dict(
        max_think_token_n=max_think_token_n if show_thinking else 1024,
        do_sample=do_sample if show_thinking else False,
        text_temperature=text_temperature if show_thinking else 0.3,
        cfg_text_scale=cfg_text_scale,
        cfg_img_scale=cfg_img_scale,
        cfg_interval=[cfg_interval, 1.0],
        timestep_shift=timestep_shift,
        num_timesteps=num_timesteps,
        cfg_renorm_min=cfg_renorm_min,
        cfg_renorm_type=cfg_renorm_type,
    )
    result = inferencer(image=image, text=prompt, think=show_thinking, **inference_hyper)
    return result.get("image", None), result.get("text", None)  # text is thinking
# --- Gradio UI --- | |
# Assistant greeting placed at the start of every conversation history and
# used as the chatbot's initial value. Callers take a shallow ``.copy()``, so
# the nested "content" list is shared between copies and must not be mutated
# in place.
DEFAULT_WELCOME_MESSAGE = {
    "role": "assistant",
    "content": [
        {"type": "text", "content": "Hello! I am BAGEL, your multimodal assistant. How can I help you today? Select a mode and enter your prompt."}
    ],
    "key": "welcome"
}
class GradioApp:
    """Chat-style Gradio front end for the three BAGEL operations.

    The instance keeps every conversation in memory:

    * ``current_conversation_id`` -- key of the active conversation or None.
    * ``conversation_contexts`` -- ``{id: {"history": [...], "settings": {...}}}``.
    * ``conversations_list`` -- ``[{"label": ..., "key": id}, ...]`` for the sidebar.

    NOTE(review): this state lives on one instance, so it appears to be shared
    by every browser session connected to the app -- confirm whether per-user
    state (e.g. ``gr.State``) is wanted.
    NOTE(review): history entries use a nested ``{"type", "content"}`` part
    list; verify this matches the message schema of the installed
    ``gr.Chatbot`` version.
    """

    # Modes whose backend call consumes the uploaded image.
    IMAGE_MODES = ("Image Edit", "Image Understanding")

    def __init__(self):
        self.current_conversation_id = None
        self.conversation_contexts = {}
        self.conversations_list = []  # For the sidebar

    def _get_current_history(self):
        """Return the active conversation's history list, or ``[]`` if none."""
        if self.current_conversation_id and self.current_conversation_id in self.conversation_contexts:
            return self.conversation_contexts[self.current_conversation_id]["history"]
        return []

    def _get_current_settings(self):
        """Return the active conversation's stored settings, or ``{}`` if none."""
        if self.current_conversation_id and self.current_conversation_id in self.conversation_contexts:
            return self.conversation_contexts[self.current_conversation_id].get("settings", {})
        return {}

    def _update_conversation_list_ui(self):
        """Build the update that refreshes the sidebar radio choices/selection."""
        choices = [(c['label'], c['key']) for c in self.conversations_list]
        return gr.update(choices=choices, value=self.current_conversation_id)

    @staticmethod
    def _load_input_image(image_input):
        """Normalise the uploaded image into something the backends accept.

        The upload component is declared with ``type="pil"``, so the value is
        normally already an image object and is returned as-is. A path string
        or a file-like object exposing a string ``name`` is opened with PIL.
        ``None`` stays ``None``.
        """
        if image_input is None:
            return None
        path = image_input if isinstance(image_input, str) else getattr(image_input, "name", None)
        if isinstance(path, str):
            return Image.open(path)
        return image_input

    @staticmethod
    def _split_thinking(output_text):
        """Split understanding output into ``(thinking_text, answer_text)``.

        Crude marker-based parsing: everything after the first "Thinking:" is
        the thinking part (cut at "\\nAnswer:" when present); the answer is the
        stripped text before the marker plus the "\\nAnswer:" section, if any.
        Without a "Thinking:" marker the result is ``(None, output_text)``.
        """
        before, marker, after = output_text.partition("Thinking:")
        if not marker:
            return None, output_text
        if "\nAnswer:" in after:
            thinking_text = "Thinking:" + after.split("\nAnswer:")[0]
        else:
            thinking_text = after
        answer_text = before.strip()
        if "\nAnswer:" in output_text:
            answer_text += "\nAnswer:" + output_text.split("\nAnswer:")[1]
        return thinking_text, answer_text

    def add_message(self, text_input, image_input, mode,
                    # TTI params
                    tti_show_thinking, tti_cfg_text_scale, tti_cfg_interval, tti_timestep_shift, tti_num_timesteps, tti_cfg_renorm_min, tti_cfg_renorm_type, tti_max_think_token_n, tti_do_sample, tti_text_temperature, tti_seed, tti_image_ratio,
                    # Edit params
                    edit_show_thinking, edit_cfg_text_scale, edit_cfg_img_scale, edit_cfg_interval, edit_timestep_shift, edit_num_timesteps, edit_cfg_renorm_min, edit_cfg_renorm_type, edit_max_think_token_n, edit_do_sample, edit_text_temperature, edit_seed,
                    # Understand params
                    und_show_thinking, und_do_sample, und_text_temperature, und_max_new_tokens, und_seed
                    ):
        """Handle one submitted turn; generator used as a Gradio event handler.

        Yields ``(history, text_input_update, image_input_update)`` tuples:
        first with a "Processing..." placeholder, then with the final
        assistant message. The parameter order must match the ``inputs`` list
        wired up in ``build_ui``.

        Raises:
            gr.Error: when the backend call fails (after the error text has
                been written into the history and yielded).
        """
        has_image = image_input is not None
        if not text_input and not (mode in self.IMAGE_MODES and has_image):
            gr.Warning("Please enter a prompt or upload an image for Edit/Understanding modes.")
            # This function is a generator, so outputs must be *yielded*;
            # a plain ``return <tuple>`` would never reach Gradio.
            yield self._get_current_history(), gr.update(value=None), gr.update(value=None)  # chatbot, text_input, image_input
            return

        if not self.current_conversation_id:
            # Create a new chat if none exists, labelled by the prompt start.
            self.new_chat_session(text_input[:30] if text_input else "New Chat")
        history = self._get_current_history()

        # Store the settings of this turn on the conversation (overwrites the
        # previous turn's settings).
        current_turn_settings = {
            "mode": mode,
            # Uploaded PIL images have no ``name``; record the source path
            # when the object exposes one, otherwise None.
            "image_input_path": getattr(image_input, "filename", None) or getattr(image_input, "name", None),
            # TTI
            "tti_show_thinking": tti_show_thinking, "tti_cfg_text_scale": tti_cfg_text_scale,
            "tti_cfg_interval": tti_cfg_interval, "tti_timestep_shift": tti_timestep_shift,
            "tti_num_timesteps": tti_num_timesteps, "tti_cfg_renorm_min": tti_cfg_renorm_min,
            "tti_cfg_renorm_type": tti_cfg_renorm_type, "tti_max_think_token_n": tti_max_think_token_n,
            "tti_do_sample": tti_do_sample, "tti_text_temperature": tti_text_temperature,
            "tti_seed": tti_seed, "tti_image_ratio": tti_image_ratio,
            # Edit
            "edit_show_thinking": edit_show_thinking, "edit_cfg_text_scale": edit_cfg_text_scale,
            "edit_cfg_img_scale": edit_cfg_img_scale, "edit_cfg_interval": edit_cfg_interval,
            "edit_timestep_shift": edit_timestep_shift, "edit_num_timesteps": edit_num_timesteps,
            "edit_cfg_renorm_min": edit_cfg_renorm_min, "edit_cfg_renorm_type": edit_cfg_renorm_type,
            "edit_max_think_token_n": edit_max_think_token_n, "edit_do_sample": edit_do_sample,
            "edit_text_temperature": edit_text_temperature, "edit_seed": edit_seed,
            # Understand
            "und_show_thinking": und_show_thinking, "und_do_sample": und_do_sample,
            "und_text_temperature": und_text_temperature, "und_max_new_tokens": und_max_new_tokens,
            "und_seed": und_seed,
        }
        self.conversation_contexts[self.current_conversation_id]["settings"] = current_turn_settings

        user_message_content = []
        if text_input:
            user_message_content.append({"type": "text", "content": text_input})
        if has_image and mode in self.IMAGE_MODES:
            user_message_content.append({"type": "image", "content": image_input})
        if not user_message_content:
            user_message_content.append({"type": "text", "content": "(No text prompt provided for image operation)"})

        history.append({"role": "user", "content": user_message_content, "key": str(uuid.uuid4())})
        history.append({"role": "assistant", "content": [{"type": "text", "content": "Processing..."}], "key": str(uuid.uuid4()), "loading": True})
        yield history, gr.update(value=None), gr.update(value=None)  # chatbot, text_input, image_input (clear inputs)

        # Call backend
        try:
            output_image = None
            output_text = None
            thinking_text = None
            pil_image_input = self._load_input_image(image_input)

            if mode == "Text to Image":
                output_image, thinking_text = call_text_to_image(
                    text_input, tti_show_thinking, tti_cfg_text_scale, tti_cfg_interval,
                    tti_timestep_shift, tti_num_timesteps, tti_cfg_renorm_min, tti_cfg_renorm_type,
                    tti_max_think_token_n, tti_do_sample, tti_text_temperature, tti_seed, tti_image_ratio)
            elif mode == "Image Edit":
                if pil_image_input is None:
                    output_text = "Error: Image required for Image Edit mode."
                else:
                    output_image, thinking_text = call_edit_image(
                        pil_image_input, text_input, edit_show_thinking, edit_cfg_text_scale,
                        edit_cfg_img_scale, edit_cfg_interval, edit_timestep_shift, edit_num_timesteps,
                        edit_cfg_renorm_min, edit_cfg_renorm_type, edit_max_think_token_n,
                        edit_do_sample, edit_text_temperature, edit_seed)
            elif mode == "Image Understanding":
                if pil_image_input is None:
                    output_text = "Error: Image required for Image Understanding mode."
                else:
                    output_text, _ = call_image_understanding(
                        pil_image_input, text_input, und_show_thinking, und_do_sample,
                        und_text_temperature, und_max_new_tokens, und_seed)
                    # The understanding backend returns a single text; pull a
                    # "Thinking:" section out of it when thinking was requested.
                    if und_show_thinking and output_text:
                        thinking_text, output_text = self._split_thinking(output_text)

            bot_response_content = []
            if thinking_text:
                bot_response_content.append({"type": "text", "content": f"**Thinking Process:**\n{thinking_text}"})
            if output_text:
                bot_response_content.append({"type": "text", "content": output_text})
            if output_image is not None:
                bot_response_content.append({"type": "image", "content": output_image})
            if not bot_response_content:
                bot_response_content.append({"type": "text", "content": "(No output generated)"})

            history[-1]["content"] = bot_response_content
            history[-1]["loading"] = False
        except Exception as e:
            print(f"Error during processing: {e}")
            history[-1]["content"] = [{"type": "text", "content": f"Error: {str(e)}"}]
            history[-1]["loading"] = False
            # Push the error message into the chat before raising, otherwise
            # the UI would stay on the "Processing..." placeholder.
            yield history, gr.update(value=None), gr.update(value=None)
            raise gr.Error(f"Processing Error: {str(e)}") from e

        yield history, gr.update(value=None), gr.update(value=None)

    def new_chat_session(self, label="New Chat"):
        """Create a conversation, make it current, and list it first.

        Duplicate labels are allowed. An empty ``label`` falls back to
        ``"Chat <n>"``.

        Returns:
            ``(history, conversation_selector_update)``.
        """
        session_id = str(uuid.uuid4())
        self.current_conversation_id = session_id
        self.conversation_contexts[session_id] = {
            "history": [DEFAULT_WELCOME_MESSAGE.copy()],
            "settings": {},
        }
        new_conv_entry = {"label": label if label else f"Chat {len(self.conversations_list) + 1}", "key": session_id}
        self.conversations_list.insert(0, new_conv_entry)  # Add to top
        return self._get_current_history(), self._update_conversation_list_ui()

    def change_chat_session(self, session_id):
        """Switch to ``session_id`` if it is known; return the current history.

        An unknown or empty id leaves the active conversation unchanged.
        Stored settings are not pushed back into the UI controls.
        """
        if session_id and session_id in self.conversation_contexts:
            self.current_conversation_id = session_id
        return self._get_current_history()

    def clear_history(self):
        """Reset the active conversation to just the welcome message.

        Returns:
            ``(history, text_input_update, image_input_update)``; both input
            updates clear their component. With no active conversation the
            history is ``[]``.
        """
        if self.current_conversation_id:
            self.conversation_contexts[self.current_conversation_id]["history"] = [DEFAULT_WELCOME_MESSAGE.copy()]
            return self._get_current_history(), gr.update(value=None), gr.update(value=None)
        return [], gr.update(value=None), gr.update(value=None)

    def build_ui(self):
        """Assemble the Blocks layout, wire the event handlers, return the app."""
        with gr.Blocks(theme=gr.themes.Soft()) as demo:
            gr.Markdown("""
<div>
<img src="https://lf3-static.bytednsdoc.com/obj/eden-cn/nuhojubrps/banner.png" alt="BAGEL" width="380"/>
<h1>Unified BAGEL Chat Interface</h1>
</div>
""")
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Conversations")
                    conversation_selector = gr.Radio(
                        label="Select Chat",
                        choices=[],
                        type="value"
                    )
                    new_chat_btn = gr.Button("➕ New Chat")

                    gr.Markdown("### Operation Mode")
                    mode_selector = gr.Radio(
                        label="Select Mode",
                        choices=["Text to Image", "Image Edit", "Image Understanding"],
                        value="Text to Image",
                        interactive=True
                    )

                    # --- Hyperparameter accordions; visibility follows mode_selector ---
                    with gr.Accordion("Text to Image Settings", open=True, visible=True) as tti_accordion:
                        tti_show_thinking_cb = gr.Checkbox(label="Show Thinking Process", value=False, interactive=True)
                        tti_seed_slider = gr.Slider(minimum=0, maximum=1000000, value=0, step=1, label="Seed (0 for random)", interactive=True)
                        tti_image_ratio_dd = gr.Dropdown(choices=["1:1", "4:3", "3:4", "16:9", "9:16"], value="1:1", label="Image Ratio", interactive=True)
                        tti_cfg_text_scale_slider = gr.Slider(minimum=1.0, maximum=8.0, value=4.0, step=0.1, label="CFG Text Scale", interactive=True)
                        tti_cfg_interval_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.4, step=0.1, label="CFG Interval Start", interactive=True)
                        tti_cfg_renorm_type_dd = gr.Dropdown(choices=["global", "local", "text_channel"], value="global", label="CFG Renorm Type", interactive=True)
                        tti_cfg_renorm_min_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.0, step=0.1, label="CFG Renorm Min", interactive=True)
                        tti_num_timesteps_slider = gr.Slider(minimum=10, maximum=100, value=50, step=5, label="Timesteps", interactive=True)
                        tti_timestep_shift_slider = gr.Slider(minimum=1.0, maximum=5.0, value=3.0, step=0.5, label="Timestep Shift", interactive=True)
                        with gr.Group(visible=False) as tti_thinking_params_group:
                            tti_do_sample_cb = gr.Checkbox(label="Sampling (for thinking)", value=False, interactive=True)
                            tti_max_think_token_slider = gr.Slider(minimum=64, maximum=4096, value=1024, step=64, label="Max Think Tokens", interactive=True)
                            tti_text_temp_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.3, step=0.1, label="Temperature (for thinking)", interactive=True)
                        tti_show_thinking_cb.change(lambda x: gr.update(visible=x), inputs=[tti_show_thinking_cb], outputs=[tti_thinking_params_group])

                    with gr.Accordion("Image Edit Settings", open=False, visible=False) as edit_accordion:
                        edit_show_thinking_cb = gr.Checkbox(label="Show Thinking Process", value=False, interactive=True)
                        edit_seed_slider = gr.Slider(minimum=0, maximum=1000000, value=0, step=1, label="Seed (0 for random)", interactive=True)
                        edit_cfg_text_scale_slider = gr.Slider(1.0, 8.0, value=4.0, step=0.1, label="CFG Text Scale", interactive=True)
                        edit_cfg_img_scale_slider = gr.Slider(1.0, 4.0, value=2.0, step=0.1, label="CFG Image Scale", interactive=True)
                        edit_cfg_interval_slider = gr.Slider(0.0, 1.0, value=0.0, step=0.1, label="CFG Interval Start", interactive=True)
                        edit_cfg_renorm_type_dd = gr.Dropdown(["global", "local", "text_channel"], value="text_channel", label="CFG Renorm Type", interactive=True)
                        edit_cfg_renorm_min_slider = gr.Slider(0.0, 1.0, value=0.0, step=0.1, label="CFG Renorm Min", interactive=True)
                        edit_num_timesteps_slider = gr.Slider(10, 100, value=50, step=5, label="Timesteps", interactive=True)
                        edit_timestep_shift_slider = gr.Slider(1.0, 10.0, value=3.0, step=0.5, label="Timestep Shift", interactive=True)
                        with gr.Group(visible=False) as edit_thinking_params_group:
                            edit_do_sample_cb = gr.Checkbox(label="Sampling (for thinking)", value=False, interactive=True)
                            edit_max_think_token_slider = gr.Slider(64, 4096, value=1024, step=64, label="Max Think Tokens", interactive=True)
                            edit_text_temp_slider = gr.Slider(0.1, 1.0, value=0.3, step=0.1, label="Temperature (for thinking)", interactive=True)
                        edit_show_thinking_cb.change(lambda x: gr.update(visible=x), inputs=[edit_show_thinking_cb], outputs=[edit_thinking_params_group])

                    with gr.Accordion("Image Understanding Settings", open=False, visible=False) as und_accordion:
                        und_show_thinking_cb = gr.Checkbox(label="Show Thinking Process (if applicable)", value=False, interactive=True)
                        und_seed_slider = gr.Slider(minimum=0, maximum=1000000, value=0, step=1, label="Seed (0 for random)", interactive=True)
                        und_do_sample_cb = gr.Checkbox(label="Sampling", value=False, interactive=True)
                        und_text_temp_slider = gr.Slider(0.1, 1.0, value=0.7, step=0.1, label="Temperature", interactive=True)
                        und_max_new_tokens_slider = gr.Slider(32, 2048, value=512, step=32, label="Max New Tokens", interactive=True)

                    # Show only the accordion that belongs to the selected mode.
                    def update_accordion_visibility(mode):
                        return (
                            gr.update(visible=mode == "Text to Image"),
                            gr.update(visible=mode == "Image Edit"),
                            gr.update(visible=mode == "Image Understanding")
                        )
                    mode_selector.change(update_accordion_visibility, inputs=[mode_selector], outputs=[tti_accordion, edit_accordion, und_accordion])

                with gr.Column(scale=3):
                    chatbot_ui = gr.Chatbot(label="BAGEL Chat", value=[DEFAULT_WELCOME_MESSAGE.copy()], bubble_full_width=False, height=600)
                    with gr.Row():
                        image_upload_ui = gr.Image(type="pil", label="Upload Image (for Edit/Understand)", sources=['upload'], visible=False, interactive=True)
                    with gr.Row():
                        text_input_ui = gr.Textbox(label="Enter your prompt here...", lines=3, scale=7, interactive=True)
                        submit_btn = gr.Button("Send", variant="primary", scale=1)
                        clear_btn = gr.Button("Clear Chat", scale=1)

            # The image upload is only shown for the modes that use it.
            def update_image_upload_visibility(mode):
                return gr.update(visible=mode in ["Image Edit", "Image Understanding"])
            mode_selector.change(update_image_upload_visibility, inputs=[mode_selector], outputs=[image_upload_ui])

            # Initial state setup: every page load starts a "Welcome Chat".
            demo.load(lambda: self.new_chat_session("Welcome Chat"), outputs=[chatbot_ui, conversation_selector])

            # Event handlers
            new_chat_btn.click(
                self.new_chat_session,
                inputs=None,
                outputs=[chatbot_ui, conversation_selector]
            )
            conversation_selector.change(
                self.change_chat_session,
                inputs=[conversation_selector],
                outputs=[chatbot_ui]
            )

            # One shared inputs/outputs definition for both ways of sending a
            # message; the order must match the add_message signature.
            message_inputs = [
                text_input_ui, image_upload_ui, mode_selector,
                # TTI
                tti_show_thinking_cb, tti_cfg_text_scale_slider, tti_cfg_interval_slider, tti_timestep_shift_slider, tti_num_timesteps_slider, tti_cfg_renorm_min_slider, tti_cfg_renorm_type_dd, tti_max_think_token_slider, tti_do_sample_cb, tti_text_temp_slider, tti_seed_slider, tti_image_ratio_dd,
                # Edit
                edit_show_thinking_cb, edit_cfg_text_scale_slider, edit_cfg_img_scale_slider, edit_cfg_interval_slider, edit_timestep_shift_slider, edit_num_timesteps_slider, edit_cfg_renorm_min_slider, edit_cfg_renorm_type_dd, edit_max_think_token_slider, edit_do_sample_cb, edit_text_temp_slider, edit_seed_slider,
                # Understand
                und_show_thinking_cb, und_do_sample_cb, und_text_temp_slider, und_max_new_tokens_slider, und_seed_slider
            ]
            message_outputs = [chatbot_ui, text_input_ui, image_upload_ui]
            submit_btn.click(self.add_message, inputs=message_inputs, outputs=message_outputs)
            text_input_ui.submit(self.add_message, inputs=message_inputs, outputs=message_outputs)

            clear_btn.click(self.clear_history, inputs=None, outputs=[chatbot_ui, text_input_ui, image_upload_ui])
        return demo
# Main execution | |
if __name__ == "__main__":
    # Build the UI, enable request queuing, then start the server with a
    # public share link and debug output enabled.
    app_instance = GradioApp()
    demo_ui = app_instance.build_ui()
    queued_ui = demo_ui.queue()
    queued_ui.launch(share=True, debug=True)  # Set share=True if you need a public link