import os import torch import pandas as pd import logging import faiss import numpy as np import time import gensim import random import multiprocessing from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel from datasets import load_dataset from huggingface_hub import login, hf_hub_download, HfApi, create_repo from keybert import KeyBERT from sentence_transformers import SentenceTransformer from joblib import Parallel, delayed from tqdm import tqdm from fastapi.middleware.cors import CORSMiddleware import tempfile import re import sys import asyncio import gc from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # ✅ 로그 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ✅ 스레드 풀 최적화 (작업자 수 감소로 오버헤드 감소) thread_pool = ThreadPoolExecutor(max_workers=min(32, os.cpu_count() * 2)) # ✅ 메모리 관리 전역 변수 last_gc_time = time.time() request_count = 0 CLEANUP_INTERVAL = 100 # 100 요청마다 메모리 정리 startup_semaphore = asyncio.Semaphore(1) # 한 번에 1개의 워커만 초기화 가능 # ✅ FastAPI 인스턴스 생성 app = FastAPI(title="🚀 KeyBERT + Word2Vec 기반 FAISS 검색 API", version="1.2") # 여기서 CORS 미들웨어 등록 app.add_middleware( CORSMiddleware, allow_origins=["https://dev.kobay.co.kr"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ✅ GPU 사용 여부 확인 device = "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"🚀 실행 디바이스: {device.upper()}") # ✅ Hugging Face 로그인 HF_API_TOKEN = os.getenv("HF_API_TOKEN") if multiprocessing.current_process().name == "MainProcess": if HF_API_TOKEN and HF_API_TOKEN.startswith("hf_"): logger.info("🔑 Hugging Face API 로그인 중 (MainProcess)...") login(token=HF_API_TOKEN) else: logger.warning("⚠️ HF_API_TOKEN이 없거나 잘못된 형식입니다.") # ✅ 모델 변수 초기화만 (실제 로드는 나중에) word2vec_model = None kw_model = None embedding_model = None # ✅ 지연 로딩 구현 - 모델 로드 함수 async def load_models(): """모든 필요한 모델을 로드하는 함수 (지연 로딩)""" global word2vec_model, kw_model, embedding_model # 이미 로드되었는지 확인 (중복 로드 방지) if word2vec_model is not None and embedding_model is not None: return True worker_id = os.getenv("WORKER_ID", multiprocessing.current_process().name) logger.info(f"🔄 워커 {worker_id}: 모델 로드 시작...") try: # 1. Word2Vec 모델 로드 if word2vec_model is None: MODEL_REPO = "aikobay/item-model" model_path = hf_hub_download(repo_id=MODEL_REPO, filename="item_vectors.bin", repo_type="dataset", token=HF_API_TOKEN) word2vec_model = gensim.models.KeyedVectors.load_word2vec_format(model_path, binary=True) logger.info(f"✅ 워커 {worker_id}: Word2Vec 모델 로드 완료! 단어 수: {len(word2vec_model.key_to_index)}") # 2. KeyBERT 모델 로드 if kw_model is None: kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2") logger.info(f"✅ 워커 {worker_id}: KeyBERT 모델 로드 완료!") # 3. 한국어 특화 임베딩 모델 로드 if embedding_model is None: try: embedding_model = SentenceTransformer("jhgan/ko-sroberta-multitask") logger.info(f"✅ 워커 {worker_id}: 한국어 특화 임베딩 모델 로드 완료!") except Exception as e: logger.warning(f"⚠️ 워커 {worker_id}: 한국어 특화 모델 로드 실패, 기본 모델 사용: {e}") embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") logger.info(f"✅ 워커 {worker_id}: 기본 임베딩 모델 로드 완료!") if device == "cuda": try: # 첫 번째 워커나 짝수 번호의 워커만 GPU 사용 if worker_id == "MainProcess" or worker_id.endswith("0") or worker_id.endswith("1") or worker_id.endswith("2"): embedding_model.to(device) embedding_model.eval() # 평가 모드로 설정 logger.info(f"✅ 워커 {worker_id}: GPU에 임베딩 모델 로드 완료!") else: logger.info(f"⚠️ 워커 {worker_id}: 메모리 효율화를 위해 CPU 모드 사용") except Exception as e: logger.error(f"❌ 워커 {worker_id}: GPU 모델 초기화 오류: {e}") # 가비지 컬렉션 수행 await cleanup_memory(force=True) return True except Exception as e: logger.error(f"❌ 워커 {worker_id}: 모델 로드 중 오류 발생: {e}") return False # ✅ 진행 중인 경매 상품 데이터 로드 async def load_huggingface_jsonl(dataset_name, split="train"): """Hugging Face Hub에서 데이터셋 비동기 로드""" try: # 스레드 풀에서 실행하여 비동기 처리 loop = asyncio.get_event_loop() def _load_dataset(): repo_id = f"aikobay/{dataset_name}" dataset = load_dataset(repo_id, split=split) return dataset.to_pandas().dropna() # 스레드 풀에서 비동기로 실행 df = await loop.run_in_executor(thread_pool, _load_dataset) return df except Exception as e: logger.error(f"❌ 데이터 로드 중 오류 발생: {e}") return pd.DataFrame() # 초기화만 수행, 실제 로드는 startup에서 active_sale_items = None # ✅ FAISS 인덱스 초기화 faiss_index = None indexed_items = [] # ✅ 주기적 메모리 정리 함수 async def cleanup_memory(force=False): """주기적인 메모리 정리 수행""" global last_gc_time # 현재 시간 확인 current_time = time.time() # 15초마다 메모리 정리 또는 강제 정리 요청 시 if force or (current_time - last_gc_time > 15): # 가비지 컬렉션 실행 gc.collect() # GPU 메모리 정리 if torch.cuda.is_available(): torch.cuda.empty_cache() # 시간 업데이트 last_gc_time = current_time logger.debug("🧹 메모리 정리 완료") return True return False # ✅ 멀티코어 벡터화 함수 - 메모리 누수 해결 async def encode_texts_parallel(texts, batch_size=1024): """GPU 활용 + 메모리 누수 방지 최적화 벡터화""" # 모델이 로드되었는지 확인 if embedding_model is None: # 비동기 컨텍스트에서 동기 함수 호출 방지 worker_id = multiprocessing.current_process().name logger.warning(f"⚠️ 워커 {worker_id}: 벡터화 전 모델 로드 필요") if not await load_models(): # 비동기 호출로 수정 logger.error(f"❌ 워커 {worker_id}: 모델 로드 실패, 벡터화 불가") return np.array([]).astype("float32") if not texts: return np.array([]).astype("float32") try: # 배치 크기 조정 - 짧은 텍스트는 큰 배치, 길면 작게 if len(texts) > 10: batch_size = min(1024, batch_size) # 많은 텍스트는 배치 크기 제한 else: batch_size = min(2048, batch_size) # 적은 텍스트는 더 큰 배치 가능 loop = asyncio.get_event_loop() def _encode_efficiently(): # 작은 배치로 나누어 인코딩 (메모리 최적화) with torch.no_grad(): return embedding_model.encode( texts, batch_size=batch_size, convert_to_numpy=True, show_progress_bar=False, device=device, normalize_embeddings=True ) # 스레드 풀에서 실행 embeddings = await loop.run_in_executor(thread_pool, _encode_efficiently) return embeddings.astype("float32") except Exception as e: logger.error(f"❌ 벡터화 오류: {str(e)}") return np.array([]).astype("float32") finally: # 수행 후 메모리 정리 (요청이 많을 경우는 가끔씩만) global request_count if request_count % 25 == 0: await cleanup_memory(force=True) # ✅ FAISS 인덱스 저장 함수 (Hugging Face Hub) async def save_faiss_index(): """FAISS 인덱스를 Hugging Face Hub에 저장 (비동기 지원)""" global faiss_index, indexed_items if faiss_index is None or not indexed_items: logger.error("❌ 저장할 FAISS 인덱스가 없습니다.") return False try: # 레포지토리 ID repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") # 비동기 작업을 위한 루프 loop = asyncio.get_event_loop() # 비동기 작업으로 실행 def _save_index(): # HfApi 객체 생성 api = HfApi() # 레포지토리 존재 여부 확인 및 생성 try: api.repo_info(repo_id=repo_id, repo_type="dataset") logger.info(f"✅ 기존 레포지토리 사용: {repo_id}") except Exception: logger.info(f"🔄 레포지토리가 존재하지 않아 새로 생성합니다: {repo_id}") create_repo( repo_id=repo_id, repo_type="dataset", private=True, exist_ok=True ) logger.info(f"✅ 레포지토리 생성 완료: {repo_id}") # 임시 파일로 먼저 로컬에 저장 with tempfile.TemporaryDirectory() as temp_dir: index_path = os.path.join(temp_dir, "faiss_index.bin") items_path = os.path.join(temp_dir, "indexed_items.txt") # FAISS 인덱스 저장 faiss.write_index(faiss_index, index_path) # 아이템 목록 저장 with open(items_path, "w", encoding="utf-8") as f: f.write("\n".join(indexed_items)) # README 파일 생성 readme_path = os.path.join(temp_dir, "README.md") with open(readme_path, "w", encoding="utf-8") as f: f.write(f"""# FAISS 인덱스 저장소 이 저장소는 상품 검색을 위한 FAISS 인덱스와 관련 데이터를 포함하고 있습니다. - 최종 업데이트: {pd.Timestamp.now()} - 인덱스 항목 수: {len(indexed_items)} - 모델: KeyBERT + Word2Vec 이 저장소는 'aikobay/initial_saleitem_dataset'의 상품 데이터를 기반으로 생성된 벡터 인덱스를 저장하기 위해 자동 생성되었습니다. """) # 파일 업로드 for file_path, file_name in [ (index_path, "faiss_index.bin"), (items_path, "indexed_items.txt"), (readme_path, "README.md") ]: api.upload_file( path_or_fileobj=file_path, path_in_repo=file_name, repo_id=repo_id, repo_type="dataset" ) logger.info(f"✅ FAISS 인덱스가 Hugging Face Hub에 저장되었습니다. 레포: {repo_id}") return True # 스레드 풀에서 비동기적으로 실행 result = await loop.run_in_executor(thread_pool, _save_index) return result except Exception as e: logger.error(f"❌ FAISS 인덱스 Hub 저장 중 오류 발생: {e}") # 로컬에 백업 저장 시도 try: loop = asyncio.get_event_loop() def _local_backup(): local_path = os.path.join(os.getcwd(), "faiss_index.bin") faiss.write_index(faiss_index, local_path) with open("indexed_items.txt", "w", encoding="utf-8") as f: f.write("\n".join(indexed_items)) logger.info(f"✅ FAISS 인덱스가 로컬에 백업 저장되었습니다: {local_path}") return True result = await loop.run_in_executor(thread_pool, _local_backup) return result except Exception as local_err: logger.error(f"❌ 로컬 백업 저장도 실패: {local_err}") return False # ✅ FAISS 인덱스 로드 함수 (Hugging Face Hub) async def load_faiss_index_safe(): """안전하게 FAISS 인덱스를 읽기 전용으로 로드""" global faiss_index, indexed_items # 최대 재시도 횟수 max_retries = 5 retry_delay = 1 # 초기 지연 (초) worker_id = os.getenv("WORKER_ID", multiprocessing.current_process().name) for attempt in range(max_retries): try: # 레포지토리 ID repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") # Hub에서 파일 다운로드 (읽기 전용) index_path = hf_hub_download( repo_id=repo_id, filename="faiss_index.bin", repo_type="dataset" ) items_path = hf_hub_download( repo_id=repo_id, filename="indexed_items.txt", repo_type="dataset" ) # 직접 파일 경로를 사용하여 인덱스 로드 - 이 방법이 가장 안정적 loaded_index = faiss.read_index(index_path) # 항목 목록 읽기 with open(items_path, "r", encoding="utf-8") as f: loaded_items = f.read().splitlines() # 전역 변수에 할당 faiss_index = loaded_index indexed_items = loaded_items logger.info(f"✅ 워커 {worker_id}: FAISS 인덱스 로드 완료. 총 {len(indexed_items)}개 항목") return True except Exception as e: logger.warning(f"⚠️ 워커 {worker_id}: 인덱스 로드 실패 (시도 {attempt+1}/{max_retries}): {e}") # 지연 후 재시도 await asyncio.sleep(retry_delay * (2 ** attempt)) # 지수 백오프 logger.error(f"❌ 워커 {worker_id}: 인덱스 로드 최대 재시도 횟수 초과") return False # ✅ 최적화된 키워드 추출 함수 async def extract_keywords(query: str, top_n: int = 2): """KeyBERT 최적화 키워드 추출 (성능 중심)""" # 매우 짧은 쿼리는 그대로 반환 (처리 비용 절감) if len(query) <= 3: return [query] loop = asyncio.get_event_loop() def _optimized_extract(): # 성능 중심 설정 return kw_model.extract_keywords( query, keyphrase_ngram_range=(1, 1), # 단일 단어만 추출 stop_words=["이", "그", "저", "을", "를", "에", "에서", "은", "는"], # 한국어 불용어 use_mmr=True, diversity=0.5, top_n=top_n ) try: keywords = await loop.run_in_executor(thread_pool, _optimized_extract) # 가중치가 너무 낮은 키워드 제외 filtered = [(k, s) for k, s in keywords if s > 0.2] return [k[0] for k in filtered] except Exception as e: logger.error(f"❌ 키워드 추출 오류: {str(e)}") # 단어 분리로 폴백 return query.split()[:2] # ✅ 배치 검색 통합 함수 - 한번에 검색으로 효율 향상 async def unified_search(vectors, top_k=5): """모든 벡터를 한 번에 검색하여 효율성 향상""" if vectors.size == 0: return [] # nprobe 동적 조정 (서버 부하에 따라) global request_count if request_count % 100 == 0: # 100개 요청마다 조정 if faiss_index.nprobe > 8: # 현재 값이 높으면 faiss_index.nprobe = 8 # 낮은 값으로 설정 (속도 중시) loop = asyncio.get_event_loop() def _batch_search(): # 모든 벡터를 한 번에 검색 distances, indices = faiss_index.search(vectors, top_k) return distances, indices try: # 일괄 검색 수행 distances, indices = await loop.run_in_executor(thread_pool, _batch_search) # 결과 정리 results = [] for i in range(len(indices)): items = [] for j, (idx, dist) in enumerate(zip(indices[i], distances[i])): if idx < len(indexed_items): items.append((idx, float(dist))) results.append(items) return results except Exception as e: logger.error(f"❌ 검색 오류: {str(e)}") return [] # ✅ 최적화된 search_faiss_with_keywords 함수 async def search_faiss_with_keywords(query: str, top_k: int = 5, keywords=None): """고속 키워드 기반 FAISS 검색 수행 (효율적 최적화)""" global faiss_index, indexed_items, request_count # 타이머 시작 start_time = time.time() # 요청 카운터 증가 request_count += 1 # 조기 최적화 - 매우 짧은 쿼리 if len(query) <= 2: # 간단한 처리로 빠르게 반환 vector = await encode_texts_parallel([query]) distances, indices = faiss_index.search(vector, top_k) quick_results = [] for idx, dist in zip(indices[0], distances[0]): if idx < len(indexed_items): item_name = indexed_items[idx] try: item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0] quick_results.append({ "ITEMSEQ": item_seq, "ITEMNAME": item_name, "score": float(dist) }) except: continue # 주기적 메모리 정리 if request_count % CLEANUP_INTERVAL == 0: await cleanup_memory() return quick_results # 1. 키워드 추출 if keywords is None: keywords = await extract_keywords(query) # 불필요한 확장 절차 제거 (성능 향상) # 2. 벡터 인코딩 - 모든 텍스트를 한 번에 처리 search_texts = [query] + keywords try: # 벡터 인코딩 - 최적화된 함수 사용 (정규화 포함) all_vectors = await encode_texts_parallel(search_texts) if all_vectors.size == 0: logger.warning(f"⚠️ 벡터화 실패: {query}") return [] # 3. 일괄 검색 수행 (효율적) search_results = await unified_search(all_vectors, top_k=top_k) if not search_results: return [] # 4. 결과 통합 및 중복 제거 all_results = {} # 쿼리 결과 처리 (가중치 높게) for idx, score in search_results[0]: if idx < len(indexed_items): all_results[idx] = score * 3.0 # 쿼리 결과에 가중치 3배 # 키워드 결과 처리 for i in range(1, len(search_results)): keyword_results = search_results[i] weight = 0.5 # 키워드 가중치 for idx, score in keyword_results: if idx in all_results: # 기존 점수에 추가 all_results[idx] = max(all_results[idx], score * weight) else: # 새 항목 추가 all_results[idx] = score * weight # 5. 최종 처리 및 반환 # 점수 기준 정렬 sorted_items = sorted(all_results.items(), key=lambda x: x[1], reverse=True) # 최종 결과 변환 (최소한의 룩업으로 최적화) recommendations = [] item_indices = [idx for idx, _ in sorted_items[:top_k]] # 배치로 항목 조회 (성능 향상) if item_indices: item_names = [indexed_items[idx] for idx in item_indices] # 효율적인 배치 조회 items_df = active_sale_items[active_sale_items["ITEMNAME"].isin(item_names)] items_map = dict(zip(items_df["ITEMNAME"], items_df["ITEMSEQ"])) for idx, score in sorted_items[:top_k]: item_name = indexed_items[idx] if item_name in items_map: recommendations.append({ "ITEMSEQ": items_map[item_name], "ITEMNAME": item_name, "score": float(score) }) # 주기적 메모리 정리 if request_count % CLEANUP_INTERVAL == 0: await cleanup_memory() # 처리 시간이 1초 이상인 경우에만 로깅 elapsed = time.time() - start_time if elapsed > 1.0: logger.info(f"🔍 검색 완료 | 소요시간: {elapsed:.2f}초 | 결과: {len(recommendations)}개") return recommendations[:top_k] except Exception as e: logger.error(f"❌ 검색 프로세스 오류: {str(e)}") return [] # ✅ API 요청 모델 class RecommendRequest(BaseModel): search_query: str top_k: int = 5 use_expansion: bool = True # 키워드 확장 사용 여부 # 모델이 로드되었는지 검증하는 함수 추가 def validate_models(): """필요한 모델들이 모두 로드되었는지 확인""" models_loaded = ( word2vec_model is not None and kw_model is not None and embedding_model is not None ) return models_loaded # ✅ 추천 API 엔드포인트 (다중 요청 처리 최적화) @app.post("/api/recommend") async def recommend(request: RecommendRequest, background_tasks: BackgroundTasks): if not validate_models(): # 모델이 로드되지 않았을 때 재시도 또는 오류 반환 await load_models() # 비동기 호출로 수정 if not validate_models(): raise HTTPException(status_code=503, detail="서비스가 준비되지 않았습니다. 잠시 후 다시 시도해주세요.") """고속 추천 API (메모리 관리 최적화 + 성능 개선)""" try: # 벤치마크용 타이머 시작 start_time = time.time() # 파라미터 최적화 및 검증 search_query = request.search_query.strip() if not search_query: raise HTTPException(status_code=400, detail="검색어를 입력해주세요") top_k = min(max(1, request.top_k), 20) # 1~20 범위로 제한 # 최적화 검색 수행 recommendations = await search_faiss_with_keywords( search_query, top_k ) # 결과 반환 (간소화) result = { "query": search_query, "recommendations": recommendations } # 응답 시간 측정 (1초 이상만 로깅) elapsed = time.time() - start_time if elapsed > 1.0: logger.info(f"⏱️ API 응답 시간: {elapsed:.2f}초 | 쿼리: '{search_query}'") return result except Exception as e: logger.error(f"❌ 추천 처리 오류: {str(e)}") raise HTTPException(status_code=500, detail=f"추천 처리 중 오류가 발생했습니다") # 주기적 메모리 모니터링 함수 async def periodic_memory_monitor(): """주기적으로 메모리 사용량을 모니터링하고 정리합니다.""" try: worker_id = multiprocessing.current_process().name logger.info(f"🔄 워커 {worker_id}: 주기적 메모리 모니터링 시작") while True: await asyncio.sleep(1800) # 30분마다 실행 # 메모리 정리 (중복 제거) await cleanup_memory(force=True) # 메모리 사용량 로깅 if device == "cuda": allocated = torch.cuda.memory_allocated() / (1024**3) reserved = torch.cuda.memory_reserved() / (1024**3) logger.info(f"📊 워커 {worker_id}: GPU 메모리 - 할당: {allocated:.2f}GB, 예약: {reserved:.2f}GB") # 시스템 메모리 로깅 (선택 사항) import psutil process = psutil.Process() memory_info = process.memory_info() logger.info(f"📊 워커 {worker_id}: 시스템 메모리 - RSS: {memory_info.rss/(1024**3):.2f}GB") except Exception as e: logger.error(f"❌ 워커 {worker_id}: 메모리 모니터링 중 오류: {e}") # FastAPI 시작 이벤트 핸들러 확장 @app.on_event("startup") async def startup_event(): try: worker_id = multiprocessing.current_process().name logger.warning(f"🟡 워커 {worker_id} STARTUP 시작") # 리소스 사용량 로깅 추가 (여기에 추가) if device == "cuda": logger.info(f"🔄 워커 {worker_id}: GPU 메모리 상태:") logger.info(f" - 총 메모리: {torch.cuda.get_device_properties(0).total_memory/(1024**3):.2f}GB") logger.info(f" - 현재 할당: {torch.cuda.memory_allocated()/(1024**3):.2f}GB") logger.info(f" - 예약: {torch.cuda.memory_reserved()/(1024**3):.2f}GB") logger.info(f"🔄 워커 {worker_id}: 초기화 세마포어 대기 중...") async with startup_semaphore: logger.info(f"✅ 워커 {worker_id}: 초기화 시작") # 모델 로드 부분에 더 견고한 예외 처리 추가 ---------- try: if not await load_models(): # 비동기 호출로 수정 logger.error(f"❌ 워커 {worker_id} 모델 로드 실패") # 오류가 있어도 계속 진행 except Exception as model_err: logger.error(f"💥 워커 {worker_id} 모델 로드 중 심각한 오류: {model_err}") # 오류를 기록하고 계속 진행 (종료하지 않음) # 데이터 로드 부분 오류 처리 강화 ----------- global active_sale_items try: active_sale_items = await load_huggingface_jsonl("initial_saleitem_dataset") if active_sale_items is not None and not active_sale_items.empty: logger.info(f"✅ 워커 {worker_id} 데이터 로드 완료: {len(active_sale_items)}개 항목") else: logger.error(f"❌ 워커 {worker_id} 데이터 로드 실패 - 빈 데이터셋") # 데이터가 없어도 계속 진행 except Exception as data_err: logger.error(f"💥 워커 {worker_id} 데이터 로드 중 오류: {data_err}") # 데이터 로드 실패해도 계속 진행 active_sale_items = pd.DataFrame() # 빈 DataFrame으로 초기화 # FAISS 인덱스 로드 부분 오류 처리 강화 ----------- faiss_loaded = False try: if await load_faiss_index_safe(): logger.info(f"✅ 워커 {worker_id} FAISS 인덱스 로드 성공") faiss_loaded = True else: logger.warning(f"⚠️ 워커 {worker_id} FAISS 인덱스 로드 실패") except Exception as faiss_err: logger.error(f"💥 워커 {worker_id} FAISS 인덱스 로드 중 오류: {faiss_err}") # 인덱스 로드 실패해도 계속 진행 # 여기에 주기적 메모리 모니터링 시작 try: asyncio.create_task(periodic_memory_monitor()) logger.info(f"✅ 워커 {worker_id} 메모리 모니터링 시작") except Exception as monitor_err: logger.error(f"⚠️ 워커 {worker_id} 메모리 모니터링 시작 실패: {monitor_err}") # 최종 워커 상태 기록 status = "🟢 정상" if faiss_loaded else "🟠 부분적 (인덱스 없음)" logger.info(f"🏁 워커 {worker_id} STARTUP 완료: {status}") except Exception as e: logger.exception(f"🔥 워커 {worker_id} STARTUP 실패: {e}") # 상세 오류 로깅 추가 (여기에 추가) import traceback logger.error(f"스택 추적: {traceback.format_exc()}") # 심각한 오류 시 워커 종료 고려 # sys.exit(1) # 프로덕션에서는 주의해서 사용 # ✅ FastAPI 실행 # Uvicorn 실행 설정 최적화 if __name__ == "__main__": import uvicorn # 워커 수 설정 workers = int(os.getenv("WORKERS", 3)) # GPU 메모리 분배를 명시적으로 설정 (여기에 추가) if device == "cuda": # 사용 가능한 GPU 메모리 제한 torch.cuda.set_per_process_memory_fraction(0.28) # 각 워커가 최대 40%의 GPU 메모리만 사용 uvicorn.run( "searchWorker:app", host="0.0.0.0", port=7860, workers=workers, log_level="info", timeout_keep_alive=65, # 연결 유지 시간 증가 limit_concurrency=100, # 동시 연결 제한(기본 100에서 변경함) timeout_graceful_shutdown=30 # 종료 시 대기 시간 )