import spaces import gradio as gr import numpy as np import os import torch import random import subprocess subprocess.run( "pip install flash-attn --no-build-isolation", env={"FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"}, shell=True, ) from accelerate import infer_auto_device_map, load_checkpoint_and_dispatch, init_empty_weights from PIL import Image import uuid from data.data_utils import add_special_tokens, pil_img2rgb from data.transforms import ImageTransform from inferencer import InterleaveInferencer from modeling.autoencoder import load_ae from modeling.bagel.qwen2_navit import NaiveCache from modeling.bagel import ( BagelConfig, Bagel, Qwen2Config, Qwen2ForCausalLM, SiglipVisionConfig, SiglipVisionModel ) from modeling.qwen2 import Qwen2Tokenizer from huggingface_hub import snapshot_download save_dir = "./model" repo_id = "ByteDance-Seed/BAGEL-7B-MoT" cache_dir = save_dir + "/cache" if not os.path.exists(os.path.join(save_dir, "ema.safetensors")): print(f"Downloading model from {repo_id} to {save_dir}") snapshot_download(cache_dir=cache_dir, local_dir=save_dir, repo_id=repo_id, local_dir_use_symlinks=False, resume_download=True, allow_patterns=["*.json", "*.safetensors", "*.bin", "*.py", "*.md", "*.txt"], ) else: print(f"Model found at {save_dir}") model_path = "./model" llm_config = Qwen2Config.from_json_file(os.path.join(model_path, "llm_config.json")) llm_config.qk_norm = True llm_config.tie_word_embeddings = False llm_config.layer_module = "Qwen2MoTDecoderLayer" vit_config = SiglipVisionConfig.from_json_file(os.path.join(model_path, "vit_config.json")) vit_config.rope = False vit_config.num_hidden_layers -= 1 vae_model, vae_config = load_ae(local_path=os.path.join(model_path, "ae.safetensors")) config = BagelConfig( visual_gen=True, visual_und=True, llm_config=llm_config, vit_config=vit_config, vae_config=vae_config, vit_max_num_patch_per_side=70, connector_act='gelu_pytorch_tanh', latent_patch_size=2, max_latent_size=64, ) with init_empty_weights(): language_model = Qwen2ForCausalLM(llm_config) vit_model = SiglipVisionModel(vit_config) model = Bagel(language_model, vit_model, config) model.vit_model.vision_model.embeddings.convert_conv2d_to_linear(vit_config, meta=True) tokenizer = Qwen2Tokenizer.from_pretrained(model_path) tokenizer, new_token_ids, _ = add_special_tokens(tokenizer) vae_transform = ImageTransform(1024, 512, 16) vit_transform = ImageTransform(980, 224, 14) device_map = infer_auto_device_map( model, max_memory={i: "80GiB" for i in range(torch.cuda.device_count())}, no_split_module_classes=["Bagel", "Qwen2MoTDecoderLayer"], ) same_device_modules = [ 'language_model.model.embed_tokens', 'time_embedder', 'latent_pos_embed', 'vae2llm', 'llm2vae', 'connector', 'vit_pos_embed' ] if torch.cuda.device_count() == 1: first_device = device_map.get(same_device_modules[0], "cuda:0") for k in same_device_modules: device_map[k] = first_device else: # Ensure all same_device_modules are on the same device if they exist in device_map # Find the device for the first module in the list that is actually in the device_map first_assigned_device = None for k_module in same_device_modules: if k_module in device_map: first_assigned_device = device_map[k_module] break if first_assigned_device is not None: for k_module in same_device_modules: if k_module in device_map: # Only assign if the module is part of the device_map device_map[k_module] = first_assigned_device model = load_checkpoint_and_dispatch( model, checkpoint=os.path.join(model_path, "ema.safetensors"), device_map=device_map, offload_buffers=True, dtype=torch.bfloat16, force_hooks=True, ).eval() inferencer = InterleaveInferencer( model=model, vae_model=vae_model, tokenizer=tokenizer, vae_transform=vae_transform, vit_transform=vit_transform, new_token_ids=new_token_ids, ) def set_seed(seed): if seed is not None and seed > 0: random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False return seed # --- Backend Functions (Adapted from original app.py) --- @spaces.GPU(duration=90) def call_text_to_image(prompt, show_thinking, cfg_text_scale, cfg_interval, timestep_shift, num_timesteps, cfg_renorm_min, cfg_renorm_type, max_think_token_n, do_sample, text_temperature, seed, image_ratio): set_seed(seed) image_shapes = (1024, 1024) if image_ratio == "4:3": image_shapes = (768, 1024) elif image_ratio == "3:4": image_shapes = (1024, 768) elif image_ratio == "16:9": image_shapes = (576, 1024) elif image_ratio == "9:16": image_shapes = (1024, 576) inference_hyper = dict( max_think_token_n=max_think_token_n if show_thinking else 1024, do_sample=do_sample if show_thinking else False, text_temperature=text_temperature if show_thinking else 0.3, cfg_text_scale=cfg_text_scale, cfg_interval=[cfg_interval, 1.0], timestep_shift=timestep_shift, num_timesteps=num_timesteps, cfg_renorm_min=cfg_renorm_min, cfg_renorm_type=cfg_renorm_type, image_shapes=image_shapes, ) result = inferencer(text=prompt, think=show_thinking, **inference_hyper) return result.get("image", None), result.get("text", None) # text is thinking @spaces.GPU(duration=90) def call_image_understanding(image, prompt, show_thinking, do_sample, text_temperature, max_new_tokens, seed): set_seed(seed) if image is None: return "Please upload an image.", None if isinstance(image, np.ndarray): image = Image.fromarray(image) image = pil_img2rgb(image) inference_hyper = dict( do_sample=do_sample, text_temperature=text_temperature, max_think_token_n=max_new_tokens, ) result = inferencer(image=image, text=prompt, think=show_thinking, understanding_output=True, **inference_hyper) return result.get("text", None), None # Main output is text, thinking is part of it if show_thinking=True @spaces.GPU(duration=90) def call_edit_image(image, prompt, show_thinking, cfg_text_scale, cfg_img_scale, cfg_interval, timestep_shift, num_timesteps, cfg_renorm_min, cfg_renorm_type, max_think_token_n, do_sample, text_temperature, seed): set_seed(seed) if image is None: return "Please upload an image.", None, None if isinstance(image, np.ndarray): image = Image.fromarray(image) image = pil_img2rgb(image) inference_hyper = dict( max_think_token_n=max_think_token_n if show_thinking else 1024, do_sample=do_sample if show_thinking else False, text_temperature=text_temperature if show_thinking else 0.3, cfg_text_scale=cfg_text_scale, cfg_img_scale=cfg_img_scale, cfg_interval=[cfg_interval, 1.0], timestep_shift=timestep_shift, num_timesteps=num_timesteps, cfg_renorm_min=cfg_renorm_min, cfg_renorm_type=cfg_renorm_type, ) result = inferencer(image=image, text=prompt, think=show_thinking, **inference_hyper) return result.get("image", None), result.get("text", None) # text is thinking # --- Gradio UI --- DEFAULT_WELCOME_MESSAGE = { "role": "assistant", "content": [ {"type": "text", "content": "Hello! I am BAGEL, your multimodal assistant. How can I help you today? Select a mode and enter your prompt."} ], "key": "welcome" } class GradioApp: def __init__(self): self.current_conversation_id = None self.conversation_contexts = {} self.conversations_list = [] # For the sidebar def _get_current_history(self): if self.current_conversation_id and self.current_conversation_id in self.conversation_contexts: return self.conversation_contexts[self.current_conversation_id]["history"] return [] def _get_current_settings(self): if self.current_conversation_id and self.current_conversation_id in self.conversation_contexts: return self.conversation_contexts[self.current_conversation_id].get("settings", {}) return {} def _update_conversation_list_ui(self): return gr.update(choices=[(c['label'], c['key']) for c in self.conversations_list], value=self.current_conversation_id) def add_message(self, text_input, image_input, mode, # TTI params tti_show_thinking, tti_cfg_text_scale, tti_cfg_interval, tti_timestep_shift, tti_num_timesteps, tti_cfg_renorm_min, tti_cfg_renorm_type, tti_max_think_token_n, tti_do_sample, tti_text_temperature, tti_seed, tti_image_ratio, # Edit params edit_show_thinking, edit_cfg_text_scale, edit_cfg_img_scale, edit_cfg_interval, edit_timestep_shift, edit_num_timesteps, edit_cfg_renorm_min, edit_cfg_renorm_type, edit_max_think_token_n, edit_do_sample, edit_text_temperature, edit_seed, # Understand params und_show_thinking, und_do_sample, und_text_temperature, und_max_new_tokens, und_seed ): if not text_input and not (mode in ["Image Edit", "Image Understanding"] and image_input): gr.Warning("Please enter a prompt or upload an image for Edit/Understanding modes.") # Need to yield original state for all outputs if we return early # This part is tricky with dynamic outputs, might need a dummy update for all # For simplicity, let's assume user always provides some input # A better way is to disable submit button if input is invalid return self._get_current_history(), gr.update(value=None), gr.update(value=None) # chatbot, text_input, image_input if not self.current_conversation_id: self.new_chat_session(text_input[:30] if text_input else "New Chat") # Create a new chat if none exists history = self._get_current_history() # Store settings for this turn # This is simplified; best-gradio-ui.py stores settings per conversation current_turn_settings = { "mode": mode, "image_input_path": image_input.name if image_input else None, # Store path if image is uploaded # TTI "tti_show_thinking": tti_show_thinking, "tti_cfg_text_scale": tti_cfg_text_scale, "tti_cfg_interval": tti_cfg_interval, "tti_timestep_shift": tti_timestep_shift, "tti_num_timesteps": tti_num_timesteps, "tti_cfg_renorm_min": tti_cfg_renorm_min, "tti_cfg_renorm_type": tti_cfg_renorm_type, "tti_max_think_token_n": tti_max_think_token_n, "tti_do_sample": tti_do_sample, "tti_text_temperature": tti_text_temperature, "tti_seed": tti_seed, "tti_image_ratio": tti_image_ratio, # Edit "edit_show_thinking": edit_show_thinking, "edit_cfg_text_scale": edit_cfg_text_scale, "edit_cfg_img_scale": edit_cfg_img_scale, "edit_cfg_interval": edit_cfg_interval, "edit_timestep_shift": edit_timestep_shift, "edit_num_timesteps": edit_num_timesteps, "edit_cfg_renorm_min": edit_cfg_renorm_min, "edit_cfg_renorm_type": edit_cfg_renorm_type, "edit_max_think_token_n": edit_max_think_token_n, "edit_do_sample": edit_do_sample, "edit_text_temperature": edit_text_temperature, "edit_seed": edit_seed, # Understand "und_show_thinking": und_show_thinking, "und_do_sample": und_do_sample, "und_text_temperature": und_text_temperature, "und_max_new_tokens": und_max_new_tokens, "und_seed": und_seed } self.conversation_contexts[self.current_conversation_id]["settings"] = current_turn_settings user_message_content = [] if text_input: user_message_content.append({"type": "text", "content": text_input}) if image_input and mode in ["Image Edit", "Image Understanding"]: # Gradio chatbot can display images directly if they are file paths or PIL Images # For simplicity, let's assume image_input is a PIL Image or path that gr.Image can handle user_message_content.append({"type": "image", "content": image_input}) if not user_message_content: user_message_content.append({"type": "text", "content": "(No text prompt provided for image operation)"}) history.append({"role": "user", "content": user_message_content, "key": str(uuid.uuid4())}) history.append({"role": "assistant", "content": [{"type": "text", "content": "Processing..."}], "key": str(uuid.uuid4()), "loading": True}) yield history, gr.update(value=None), gr.update(value=None) # chatbot, text_input, image_input (clear inputs) # Call backend try: output_image = None output_text = None thinking_text = None pil_image_input = Image.open(image_input.name) if image_input else None if mode == "Text to Image": output_image, thinking_text = call_text_to_image(text_input, tti_show_thinking, tti_cfg_text_scale, tti_cfg_interval, tti_timestep_shift, tti_num_timesteps, tti_cfg_renorm_min, tti_cfg_renorm_type, tti_max_think_token_n, tti_do_sample, tti_text_temperature, tti_seed, tti_image_ratio) elif mode == "Image Edit": if not pil_image_input: output_text = "Error: Image required for Image Edit mode." else: output_image, thinking_text = call_edit_image(pil_image_input, text_input, edit_show_thinking, edit_cfg_text_scale, edit_cfg_img_scale, edit_cfg_interval, edit_timestep_shift, edit_num_timesteps, edit_cfg_renorm_min, edit_cfg_renorm_type, edit_max_think_token_n, edit_do_sample, edit_text_temperature, edit_seed) elif mode == "Image Understanding": if not pil_image_input: output_text = "Error: Image required for Image Understanding mode." else: output_text, _ = call_image_understanding(pil_image_input, text_input, und_show_thinking, und_do_sample, und_text_temperature, und_max_new_tokens, und_seed) # For VLM, the main output is text, thinking might be part of it or not separately returned # depending on `inferencer`'s behavior with `understanding_output=True` if und_show_thinking and output_text and "Thinking:" in output_text: # crude check parts = output_text.split("Thinking:", 1) if len(parts) > 1: thinking_text = "Thinking:" + parts[1].split("\nAnswer:")[0] if "\nAnswer:" in parts[1] else parts[1] output_text = parts[0].strip() + ("\nAnswer:" + output_text.split("\nAnswer:")[1] if "\nAnswer:" in output_text else "") else: thinking_text = None # Or handle as part of main output_text bot_response_content = [] if thinking_text: bot_response_content.append({"type": "text", "content": f"**Thinking Process:**\n{thinking_text}"}) if output_text: bot_response_content.append({"type": "text", "content": output_text}) if output_image: bot_response_content.append({"type": "image", "content": output_image}) if not bot_response_content: bot_response_content.append({"type": "text", "content": "(No output generated)"}) history[-1]["content"] = bot_response_content history[-1]["loading"] = False except Exception as e: print(f"Error during processing: {e}") history[-1]["content"] = [{"type": "text", "content": f"Error: {str(e)}"}] history[-1]["loading"] = False raise gr.Error(f"Processing Error: {str(e)}") yield history, gr.update(value=None), gr.update(value=None) def new_chat_session(self, label="New Chat"): session_id = str(uuid.uuid4()) self.current_conversation_id = session_id self.conversation_contexts[session_id] = { "history": [DEFAULT_WELCOME_MESSAGE.copy()], "settings": {} # Initialize with default settings if any } # Ensure label is unique if needed, or just use the provided one # For simplicity, we allow duplicate labels for now. new_conv_entry = {"label": label if label else f"Chat {len(self.conversations_list) + 1}", "key": session_id} self.conversations_list.insert(0, new_conv_entry) # Add to top return self._get_current_history(), self._update_conversation_list_ui() def change_chat_session(self, session_id): if session_id and session_id in self.conversation_contexts: self.current_conversation_id = session_id # Potentially update hyperparameter UI elements based on loaded session_settings # For now, just load history return self._get_current_history() return self._get_current_history() # No change or invalid ID def clear_history(self): if self.current_conversation_id: self.conversation_contexts[self.current_conversation_id]["history"] = [DEFAULT_WELCOME_MESSAGE.copy()] # Also clear current inputs if desired return self._get_current_history(), gr.update(value=None), gr.update(value=None) return [], gr.update(value=None), gr.update(value=None) def build_ui(self): with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("""