Face_Swap / app.py
Arrcttacsrks's picture
Update app.py
0cffa18 verified
raw
history blame
15 kB
# -*- coding:UTF-8 -*-
#!/usr/bin/env python
import numpy as np
import gradio as gr
import roop.globals
from roop.core import (
start,
decode_execution_providers,
suggest_max_memory,
suggest_execution_threads,
)
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path
import os
from PIL import Image
from datetime import datetime
from huggingface_hub import HfApi, login
from datasets import load_dataset, Dataset
import json
import shutil
from dotenv import load_dotenv
import cv2
from insightface.app import FaceAnalysis
# Load environment variables via python-dotenv before any code below reads
# configuration through os.getenv.
load_dotenv()
# Cosine-similarity helper used to compare the "similarity score" of two faces
def cosine_similarity(a, b):
    """Return the cosine similarity of two vectors.

    Args:
        a: First vector (any array-like of numbers).
        b: Second vector, same length as ``a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``. If either vector has zero norm the
        similarity is undefined and ``0.0`` is returned instead of dividing
        by zero.
    """
    # Cast to float64 first: integer inputs (e.g. uint8 pixel data) would
    # otherwise overflow inside np.dot and yield a meaningless result.
    vec_a = np.asarray(a, dtype=np.float64)
    vec_b = np.asarray(b, dtype=np.float64)

    # Guard the degenerate case explicitly rather than adding an epsilon to
    # the denominator, which would bias every result (identical vectors would
    # never score exactly 1.0).
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if denominator == 0.0:
        return 0.0
    return np.dot(vec_a, vec_b) / denominator
# Original FaceIntegrDataset class (used by the image swap; the video path does not need it)
class FaceIntegrDataset:
    """Stage face-swap files locally and upload them to a Hugging Face dataset.

    Files are staged under ``temp_dataset/<YYYY-MM-DD>/`` and uploaded to the
    dataset repository under a folder named after the same date.
    """

    # Environment variables checked, in order, for the Hub access token.
    # "hf_token" is the name this class has always read, so it keeps priority;
    # "HF_TOKEN" is the name the error message refers to and is accepted as a
    # fallback so the message and the lookup agree.
    TOKEN_ENV_VARS = ("hf_token", "HF_TOKEN")

    def __init__(self, repo_id="Arrcttacsrks/face_integrData"):
        """Log in to the Hub and make sure the local staging directory exists.

        Args:
            repo_id: Dataset repository that uploads are sent to.

        Raises:
            ValueError: If no access token is set in the environment.
        """
        self.token = self._resolve_token()
        if not self.token:
            raise ValueError("HF_TOKEN environment variable is not set")
        self.repo_id = repo_id
        self.api = HfApi()
        login(self.token)
        self.temp_dir = "temp_dataset"
        os.makedirs(self.temp_dir, exist_ok=True)

    @staticmethod
    def _resolve_token():
        """Return the first non-empty token found in TOKEN_ENV_VARS, else None."""
        for name in FaceIntegrDataset.TOKEN_ENV_VARS:
            value = os.getenv(name)
            if value:
                return value
        return None

    def create_date_folder(self):
        """Create (if needed) today's staging folder.

        Returns:
            A ``(folder_path, current_date)`` tuple where ``current_date`` is
            formatted ``YYYY-MM-DD`` and ``folder_path`` is that folder inside
            ``self.temp_dir``.
        """
        current_date = datetime.now().strftime("%Y-%m-%d")
        folder_path = os.path.join(self.temp_dir, current_date)
        os.makedirs(folder_path, exist_ok=True)
        return folder_path, current_date

    def save_metadata(self, source_path, target_path, output_path, timestamp):
        """Build the metadata record for one swap.

        Despite the name, nothing is written to disk here; the caller is
        responsible for serialising the returned dictionary.

        Returns:
            A dict with the given timestamp, the three image paths and the
            current time under ``"date_created"``.
        """
        metadata = {
            "timestamp": timestamp,
            "source_image": source_path,
            "target_image": target_path,
            "output_image": output_path,
            "date_created": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return metadata

    def upload_to_hf(self, local_folder, date_folder):
        """Upload ``local_folder`` to the dataset repo under ``date_folder``.

        The upload is best-effort: any failure is printed and reported through
        the return value instead of being raised.

        Returns:
            True on success, False if the upload raised.
        """
        try:
            self.api.upload_folder(
                folder_path=local_folder,
                repo_id=self.repo_id,
                repo_type="dataset",
                path_in_repo=date_folder
            )
            return True
        except Exception as e:
            print(f"Error uploading to Hugging Face: {str(e)}")
            return False
# Original swap_face function, for swapping faces in still images
def swap_face(source_file, target_file, doFaceEnhancer):
    """Run roop on a source/target image pair and return the result.

    The two inputs are written as JPEG files into today's staging folder,
    roop is configured through ``roop.globals`` and started, the staged files
    plus a metadata JSON are uploaded to the Hugging Face dataset, and the
    staging folder is removed again.

    Args:
        source_file: Source image as an array accepted by ``Image.fromarray``.
        target_file: Target image as an array accepted by ``Image.fromarray``.
        doFaceEnhancer: When true, the "face_enhancer" processor runs after
            "face_swapper".

    Returns:
        The output image as a numpy array, or None if a frame processor's
        ``pre_check`` fails or roop produced no output file.

    Raises:
        gr.Error: On any failure, including missing inputs.
    """
    folder_path = None
    try:
        # Validate first so a missing input does not trigger a Hub login or
        # leave files behind. The ValueError is turned into gr.Error below.
        if source_file is None or target_file is None:
            raise ValueError("Source and target images are required")

        dataset_handler = FaceIntegrDataset()
        folder_path, date_folder = dataset_handler.create_date_folder()
        timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y")
        source_name = f"source_{timestamp}.jpg"
        target_name = f"target_{timestamp}.jpg"
        output_name = f"OutputImage{timestamp}.jpg"
        source_path = os.path.join(folder_path, source_name)
        target_path = os.path.join(folder_path, target_name)
        output_path = os.path.join(folder_path, output_name)

        Image.fromarray(source_file).save(source_path)
        Image.fromarray(target_file).save(target_path)
        print("source_path: ", source_path)
        print("target_path: ", target_path)

        roop.globals.source_path = source_path
        roop.globals.target_path = target_path
        # Remember the normalised path: that is where roop is told to write,
        # so it is also the path that must be checked afterwards.
        result_path = normalize_output_path(
            roop.globals.source_path,
            roop.globals.target_path,
            output_path
        )
        roop.globals.output_path = result_path
        if doFaceEnhancer:
            roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
        else:
            roop.globals.frame_processors = ["face_swapper"]
        roop.globals.headless = True
        roop.globals.keep_fps = True
        roop.globals.keep_audio = True
        roop.globals.keep_frames = False
        roop.globals.many_faces = False
        roop.globals.video_encoder = "libx264"
        roop.globals.video_quality = 18
        roop.globals.max_memory = suggest_max_memory()
        roop.globals.execution_providers = decode_execution_providers(["cuda"])
        roop.globals.execution_threads = suggest_execution_threads()
        print(
            "start process",
            roop.globals.source_path,
            roop.globals.target_path,
            roop.globals.output_path,
        )

        for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
            if not frame_processor.pre_check():
                # The finally clause removes the staged user images.
                return None
        start()

        metadata = dataset_handler.save_metadata(
            source_name,
            target_name,
            output_name,
            timestamp
        )
        metadata_path = os.path.join(folder_path, f"metadata_{timestamp}.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=4)

        upload_success = dataset_handler.upload_to_hf(folder_path, date_folder)
        if upload_success:
            print(f"Successfully uploaded files to dataset {dataset_handler.repo_id}")
        else:
            print("Failed to upload files to Hugging Face dataset")

        if not os.path.exists(result_path):
            print("Output image not found")
            return None
        # Load the pixels and close the file before the folder is deleted.
        with Image.open(result_path) as output_image:
            return np.array(output_image)
    except Exception as e:
        print(f"Error in face swap process: {str(e)}")
        raise gr.Error(f"Face swap failed: {str(e)}") from e
    finally:
        # Single cleanup point for every exit path.
        # NOTE(review): this removes the whole per-date folder, exactly as the
        # original did; it is shared by every request made on the same day.
        if folder_path and os.path.exists(folder_path):
            shutil.rmtree(folder_path, ignore_errors=True)
# NEW FUNCTION: swap the face in a single video frame by reusing roop's algorithm
def swap_face_frame(frame_bgr, replacement_face_rgb, doFaceEnhancer):
    """Put the replacement face onto one video frame using roop.

    Args:
        frame_bgr: The video frame in BGR channel order (as read by OpenCV).
        replacement_face_rgb: Image containing the face to put on the frame,
            in RGB channel order.
        doFaceEnhancer: When true, the "face_enhancer" processor runs after
            "face_swapper".

    Returns:
        The processed frame as an RGB numpy array. If roop produced no output
        file, the unmodified frame (converted to RGB) is returned.
    """
    # PIL works with RGB, so convert the OpenCV frame before saving it.
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    temp_dir = "temp_faceswap_frame"
    os.makedirs(temp_dir, exist_ok=True)
    try:
        timestamp = datetime.now().strftime("%S-%M-%H-%d-%m-%Y")
        source_path = os.path.join(temp_dir, f"source_{timestamp}.jpg")
        target_path = os.path.join(temp_dir, f"target_{timestamp}.jpg")
        output_path = os.path.join(temp_dir, f"OutputImage_{timestamp}.jpg")

        # roop takes the face from the source and applies it to the target.
        # The replacement face is therefore the source and the video frame is
        # the target; the original code had the two swapped, which produced
        # the replacement picture (at its own size) instead of the frame.
        Image.fromarray(replacement_face_rgb).save(source_path)
        Image.fromarray(frame_rgb).save(target_path)

        roop.globals.source_path = source_path
        roop.globals.target_path = target_path
        result_path = normalize_output_path(source_path, target_path, output_path)
        roop.globals.output_path = result_path
        if doFaceEnhancer:
            roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
        else:
            roop.globals.frame_processors = ["face_swapper"]
        roop.globals.headless = True
        roop.globals.keep_fps = True
        roop.globals.keep_audio = True
        roop.globals.keep_frames = False
        roop.globals.many_faces = False
        roop.globals.video_encoder = "libx264"
        roop.globals.video_quality = 18
        roop.globals.max_memory = suggest_max_memory()
        roop.globals.execution_providers = decode_execution_providers(["cuda"])
        roop.globals.execution_threads = suggest_execution_threads()
        start()

        if os.path.exists(result_path):
            # Read the pixels and close the file before the directory is
            # removed; force three RGB channels for the caller's conversion.
            with Image.open(result_path) as result_image:
                swapped_img = np.array(result_image.convert("RGB"))
        else:
            swapped_img = frame_rgb
    finally:
        # Remove the scratch files even when roop raises.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return swapped_img
# NEW FUNCTION: swap faces in a video frame by frame, using insightface to compare faces
def swap_face_video(reference_face, replacement_face, video_input, similarity_threshold, doFaceEnhancer):
    """Swap a face in a video, frame by frame, only where a reference face appears.

    A frame is processed when at least one face detected in it has a cosine
    similarity to the reference face's embedding at or above the threshold;
    all other frames are copied through unchanged.

    Args:
        reference_face: Reference image (RGB) identifying the face to lock on.
        replacement_face: Image (RGB) containing the face to swap in.
        video_input: Path of the input video file.
        similarity_threshold: Threshold (0.0 - 1.0) for the similarity score.
        doFaceEnhancer: Whether to apply the quality-enhancement processor.

    Returns:
        Path of the written output video ("temp_faceswap_video.mp4").

    Raises:
        gr.Error: If an input is missing, no face is found in the reference
            image, the video cannot be opened or written, or processing fails.
    """
    cap = None
    out = None
    try:
        if reference_face is None or replacement_face is None or not video_input:
            raise gr.Error("Cần có ảnh tham chiếu, ảnh ghép và video đầu vào!")

        # Prepare insightface. Only ctx_id is passed: the former `nms`
        # keyword is not accepted by current insightface releases.
        fa = FaceAnalysis()
        fa.prepare(ctx_id=0)

        # insightface works on BGR images (OpenCV convention), so convert the
        # RGB reference image before extracting its embedding.
        reference_bgr = cv2.cvtColor(reference_face, cv2.COLOR_RGB2BGR)
        ref_detections = fa.get(reference_bgr)
        if not ref_detections:
            raise gr.Error("Không phát hiện khuôn mặt trong ảnh tham chiếu!")
        ref_embedding = ref_detections[0].embedding

        # Open the input video.
        cap = cv2.VideoCapture(video_input)
        if not cap.isOpened():
            raise gr.Error("Không mở được video đầu vào!")
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some containers report 0 (or NaN); a writer opened with that
            # rate yields an unplayable file, so fall back to a sane default.
            fps = 25.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        output_video_path = "temp_faceswap_video.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise gr.Error("Không tạo được video đầu ra!")

        frame_index = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Frames from OpenCV are already BGR, which is what insightface
            # expects, so they are passed as-is.
            detections = fa.get(frame)
            swap_this_frame = any(
                cosine_similarity(det.embedding, ref_embedding) >= similarity_threshold
                for det in detections
            )

            if swap_this_frame:
                # Put the face from replacement_face onto this frame.
                swapped_frame_rgb = swap_face_frame(frame, replacement_face, doFaceEnhancer)
                # Convert back to BGR for the video writer.
                swapped_frame = cv2.cvtColor(swapped_frame_rgb, cv2.COLOR_RGB2BGR)
                # The writer silently drops frames whose size differs from
                # the one it was opened with.
                if swapped_frame.shape[:2] != (height, width):
                    swapped_frame = cv2.resize(swapped_frame, (width, height))
            else:
                swapped_frame = frame

            out.write(swapped_frame)
            frame_index += 1
            print(f"Đã xử lý frame {frame_index}")

        return output_video_path
    except gr.Error as e:
        # Already a user-facing error: log it and pass it on without wrapping
        # it in a second "Face swap video failed" message.
        print(f"Lỗi khi xử lý video: {str(e)}")
        raise
    except Exception as e:
        print(f"Lỗi khi xử lý video: {str(e)}")
        raise gr.Error(f"Face swap video failed: {str(e)}") from e
    finally:
        # Release the reader/writer on every path so the output file is
        # finalised and no handles leak when an error occurs mid-video.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
# Gradio interface, extended with tabs to switch between image and video
def create_interface():
    """Build the Gradio Blocks app with an image tab and a video tab.

    The "FaceSwap Image" tab wires three inputs (source image, target image,
    enhancer checkbox) to ``swap_face``; the "FaceSwap Video" tab wires five
    inputs (reference image, replacement image, video, similarity threshold,
    enhancer checkbox) to ``swap_face_video``.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from the statement order; confirm which components
    # belong inside each Row against the running app.
    custom_css = """
.container {
max-width: 1200px;
margin: auto;
padding: 20px;
}
.output-image {
min-height: 400px;
border: 1px solid #ccc;
border-radius: 8px;
padding: 10px;
}
"""
    title = "Face - Integrator"
    description = r"""
Upload source and target images to perform face swap.
"""
    article = r"""
<div style="text-align: center; max-width: 650px; margin: 40px auto;">
<p>
This tool performs face swapping with optional enhancement.
</p>
</div>
"""
    with gr.Blocks(title=title, css=custom_css) as app:
        gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>")
        gr.Markdown(description)
        with gr.Tabs():
            with gr.TabItem("FaceSwap Image"):
                with gr.Row():
                    with gr.Column(scale=1):
                        source_image = gr.Image(
                            label="Source Image",
                            type="numpy",
                            sources=["upload"]
                        )
                    with gr.Column(scale=1):
                        target_image = gr.Image(
                            label="Target Image",
                            type="numpy",
                            sources=["upload"]
                        )
                    with gr.Column(scale=1):
                        output_image = gr.Image(
                            label="Output Image",
                            type="numpy",
                            interactive=False,
                            elem_classes="output-image"
                        )
                with gr.Row():
                    enhance_checkbox = gr.Checkbox(
                        label="Apply the algorithm?",
                        info="Image Quality Improvement",
                        value=False
                    )
                with gr.Row():
                    process_btn = gr.Button(
                        "Process Face Swap",
                        variant="primary",
                        size="lg"
                    )
                process_btn.click(
                    fn=swap_face,
                    inputs=[source_image, target_image, enhance_checkbox],
                    outputs=output_image,
                    api_name="swap_face"
                )
            with gr.TabItem("FaceSwap Video"):
                gr.Markdown("<h2 style='text-align:center;'>FaceSwap Video</h2>")
                with gr.Row():
                    ref_image = gr.Image(
                        label="Ảnh mặt tham chiếu (khóa khuôn mặt)",
                        type="numpy",
                        sources=["upload"]
                    )
                    swap_image = gr.Image(
                        label="Ảnh mặt ghép",
                        type="numpy",
                        sources=["upload"]
                    )
                    # gr.Video has no `type` parameter (it always hands the
                    # callback a file path); passing one raises TypeError.
                    video_input = gr.Video(
                        label="Video đầu vào"
                    )
                similarity_threshold = gr.Slider(
                    minimum=0.0,
                    maximum=1.0,
                    step=0.01,
                    value=0.7,
                    label="Tỉ lệ tương đồng"
                )
                enhance_checkbox_video = gr.Checkbox(
                    label="Áp dụng cải thiện chất lượng ảnh",
                    info="Tùy chọn cải thiện",
                    value=False
                )
                process_video_btn = gr.Button(
                    "Xử lý FaceSwap Video",
                    variant="primary",
                    size="lg"
                )
                video_output = gr.Video(
                    label="Video kết quả"
                )
                process_video_btn.click(
                    fn=swap_face_video,
                    inputs=[ref_image, swap_image, video_input, similarity_threshold, enhance_checkbox_video],
                    outputs=video_output,
                    api_name="swap_face_video"
                )
        gr.Markdown(article)
    return app
def main():
    """Build the Gradio interface and launch it with ``share=False``."""
    create_interface().launch(share=False)


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    main()