import os import torch import pandas as pd import logging import faiss import numpy as np import time import gensim from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel from datasets import load_dataset from huggingface_hub import login, hf_hub_download, HfApi, create_repo from keybert import KeyBERT from sentence_transformers import SentenceTransformer from joblib import Parallel, delayed from tqdm import tqdm import tempfile import re import sys import asyncio from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # ✅ 로그 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ✅ 스레드 풀 설정 (비동기 작업을 위한) thread_pool = ThreadPoolExecutor(max_workers=min(64, os.cpu_count() * 4)) # ✅ FastAPI 인스턴스 생성 app = FastAPI(title="🚀 KeyBERT + Word2Vec 기반 FAISS 검색 API", version="1.2") # ✅ GPU 사용 여부 확인 device = "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"🚀 실행 디바이스: {device.upper()}") # ✅ Hugging Face 로그인 HF_API_TOKEN = os.getenv("HF_API_TOKEN") if HF_API_TOKEN: logger.info("🔑 Hugging Face API 로그인 중...") login(token=HF_API_TOKEN) else: logger.error("❌ HF_API_TOKEN이 설정되지 않았습니다. 일부 기능이 제한될 수 있습니다.") # ✅ Word2Vec 모델 로드 word2vec_model = None try: logger.info("🔄 Word2Vec 모델 로드 중...") MODEL_REPO = "aikobay/item-model" model_path = hf_hub_download(repo_id=MODEL_REPO, filename="item_vectors.bin", repo_type="dataset") word2vec_model = gensim.models.KeyedVectors.load_word2vec_format(model_path, binary=True) logger.info(f"✅ Word2Vec 모델 로드 완료! 단어 수: {len(word2vec_model.key_to_index)}") except Exception as e: logger.error(f"❌ Word2Vec 모델 로드 실패: {e}") # ✅ KeyBERT 모델 로드 logger.info("🔄 KeyBERT 모델 로드 중...") kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2") original_embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") logger.info("✅ KeyBERT 모델 로드 완료!") # ✅ 한국어 특화 임베딩 모델로 교체 embedding_model = None try: logger.info("🔄 한국어 특화 임베딩 모델로 교체 시도...") # 한국어 특화 모델 로드 시도 (실패시 기존 모델 유지) embedding_model = SentenceTransformer("jhgan/ko-sroberta-multitask") logger.info("✅ 한국어 특화 임베딩 모델 로드 완료!") except Exception as e: logger.warning(f"⚠️ 한국어 특화 모델 로드 실패, 기존 모델 유지: {e}") embedding_model = original_embedding_model # ✅ 진행 중인 경매 상품 데이터 로드 async def load_huggingface_jsonl(dataset_name, split="train"): """Hugging Face Hub에서 데이터셋 비동기 로드""" try: # 스레드 풀에서 실행하여 비동기 처리 loop = asyncio.get_event_loop() def _load_dataset(): repo_id = f"aikobay/{dataset_name}" dataset = load_dataset(repo_id, split=split) return dataset.to_pandas().dropna() # 스레드 풀에서 비동기로 실행 df = await loop.run_in_executor(thread_pool, _load_dataset) return df except Exception as e: logger.error(f"❌ 데이터 로드 중 오류 발생: {e}") return pd.DataFrame() # 초기 데이터 로드 - 비동기 함수를 동기적으로 호출하여 시작 시 로드 active_sale_items = None try: # 비동기 함수를 시작 시 실행하기 위한 임시 이벤트 루프 사용 loop = asyncio.new_event_loop() active_sale_items = loop.run_until_complete(load_huggingface_jsonl("initial_saleitem_dataset")) loop.close() if active_sale_items.empty: logger.error("❌ 데이터셋이 비어 있습니다. 프로그램을 종료합니다.") exit(1) logger.info(f"✅ 경매 상품 데이터 로드 완료! 총 {len(active_sale_items)}개 상품") except Exception as e: logger.error(f"❌ 상품 데이터 로드 실패: {e}") exit(1) # ✅ FAISS 인덱스 초기화 faiss_index = None indexed_items = [] # ✅ 멀티코어 벡터화 함수 async def encode_texts_parallel(texts, batch_size=1024): """GPU 활용 + 배치 사이즈 최적화 벡터화 (대규모 성능 향상)""" if not texts: return np.array([]).astype("float32") # 배치 크기 증가로 처리 효율 향상 loop = asyncio.get_event_loop() def _encode_efficiently(): # 벡터화 최적화 - GPU 활용 + 배치 사이즈 최적화 return embedding_model.encode( texts, batch_size=batch_size, convert_to_numpy=True, show_progress_bar=False, device=device # GPU 사용 ) # 스레드 풀에서 실행 embeddings = await loop.run_in_executor(thread_pool, _encode_efficiently) return embeddings.astype("float32") # ✅ FAISS 인덱스 저장 함수 (Hugging Face Hub) async def save_faiss_index(): """FAISS 인덱스를 Hugging Face Hub에 저장 (비동기 지원)""" global faiss_index, indexed_items if faiss_index is None or not indexed_items: logger.error("❌ 저장할 FAISS 인덱스가 없습니다.") return False try: # 레포지토리 ID repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") # 비동기 작업을 위한 루프 loop = asyncio.get_event_loop() # 비동기 작업으로 실행 def _save_index(): # HfApi 객체 생성 api = HfApi() # 레포지토리 존재 여부 확인 및 생성 try: api.repo_info(repo_id=repo_id, repo_type="dataset") logger.info(f"✅ 기존 레포지토리 사용: {repo_id}") except Exception: logger.info(f"🔄 레포지토리가 존재하지 않아 새로 생성합니다: {repo_id}") create_repo( repo_id=repo_id, repo_type="dataset", private=True, exist_ok=True ) logger.info(f"✅ 레포지토리 생성 완료: {repo_id}") # 임시 파일로 먼저 로컬에 저장 with tempfile.TemporaryDirectory() as temp_dir: index_path = os.path.join(temp_dir, "faiss_index.bin") items_path = os.path.join(temp_dir, "indexed_items.txt") # FAISS 인덱스 저장 faiss.write_index(faiss_index, index_path) # 아이템 목록 저장 with open(items_path, "w", encoding="utf-8") as f: f.write("\n".join(indexed_items)) # README 파일 생성 readme_path = os.path.join(temp_dir, "README.md") with open(readme_path, "w", encoding="utf-8") as f: f.write(f"""# FAISS 인덱스 저장소 이 저장소는 상품 검색을 위한 FAISS 인덱스와 관련 데이터를 포함하고 있습니다. - 최종 업데이트: {pd.Timestamp.now()} - 인덱스 항목 수: {len(indexed_items)} - 모델: KeyBERT + Word2Vec 이 저장소는 'aikobay/initial_saleitem_dataset'의 상품 데이터를 기반으로 생성된 벡터 인덱스를 저장하기 위해 자동 생성되었습니다. """) # 파일 업로드 for file_path, file_name in [ (index_path, "faiss_index.bin"), (items_path, "indexed_items.txt"), (readme_path, "README.md") ]: api.upload_file( path_or_fileobj=file_path, path_in_repo=file_name, repo_id=repo_id, repo_type="dataset" ) logger.info(f"✅ FAISS 인덱스가 Hugging Face Hub에 저장되었습니다. 레포: {repo_id}") return True # 스레드 풀에서 비동기적으로 실행 result = await loop.run_in_executor(thread_pool, _save_index) return result except Exception as e: logger.error(f"❌ FAISS 인덱스 Hub 저장 중 오류 발생: {e}") # 로컬에 백업 저장 시도 try: loop = asyncio.get_event_loop() def _local_backup(): local_path = os.path.join(os.getcwd(), "faiss_index.bin") faiss.write_index(faiss_index, local_path) with open("indexed_items.txt", "w", encoding="utf-8") as f: f.write("\n".join(indexed_items)) logger.info(f"✅ FAISS 인덱스가 로컬에 백업 저장되었습니다: {local_path}") return True result = await loop.run_in_executor(thread_pool, _local_backup) return result except Exception as local_err: logger.error(f"❌ 로컬 백업 저장도 실패: {local_err}") return False # ✅ FAISS 인덱스 로드 함수 (Hugging Face Hub) async def load_faiss_index(): """Hugging Face Hub에서 FAISS 인덱스를 로드 (비동기 지원)""" global faiss_index, indexed_items # 레포지토리 ID repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") try: # 비동기 작업을 위한 루프 loop = asyncio.get_event_loop() # 비동기 작업으로 실행 def _load_index(): # 레포지토리 존재 확인 api = HfApi() try: api.repo_info(repo_id=repo_id, repo_type="dataset") logger.info(f"✅ FAISS 인덱스 레포지토리 확인: {repo_id}") except Exception as repo_err: logger.warning(f"⚠️ 레포지토리가 존재하지 않습니다: {repo_err}") raise FileNotFoundError("Hub 레포지토리가 존재하지 않습니다") # Hub에서 파일 다운로드 index_path = hf_hub_download( repo_id=repo_id, filename="faiss_index.bin", repo_type="dataset" ) items_path = hf_hub_download( repo_id=repo_id, filename="indexed_items.txt", repo_type="dataset" ) # 파일 로드 loaded_index = faiss.read_index(index_path) with open(items_path, "r", encoding="utf-8") as f: loaded_items = f.read().splitlines() return loaded_index, loaded_items # 스레드 풀에서 비동기적으로 실행 loaded_index, loaded_items = await loop.run_in_executor(thread_pool, _load_index) # 전역 변수에 할당 faiss_index = loaded_index indexed_items = loaded_items logger.info(f"✅ FAISS 인덱스가 Hub에서 로드되었습니다. 총 {len(indexed_items)}개 상품") return True except Exception as e: logger.warning(f"⚠️ Hub에서 FAISS 인덱스 로드 중 오류 발생: {e}") # 로컬 파일 확인 try: loop = asyncio.get_event_loop() def _load_local(): local_index_path = "faiss_index.bin" local_items_path = "indexed_items.txt" if os.path.exists(local_index_path) and os.path.exists(local_items_path): loaded_index = faiss.read_index(local_index_path) with open(local_items_path, "r", encoding="utf-8") as f: loaded_items = f.read().splitlines() return loaded_index, loaded_items else: logger.warning("⚠️ 로컬 FAISS 인덱스 파일이 존재하지 않습니다.") return None, None # 스레드 풀에서 비동기적으로 실행 result = await loop.run_in_executor(thread_pool, _load_local) if result[0] is not None: faiss_index, indexed_items = result logger.info(f"✅ 로컬 FAISS 인덱스 로드 성공. 총 {len(indexed_items)}개 상품") return True else: return False except Exception as local_err: logger.error(f"❌ 로컬 FAISS 인덱스 로드 중 오류: {local_err}") return False # ✅ FAISS 양자화 인덱스 구축 함수 (IVF 기반으로 변경) async def rebuild_faiss_index(): """FAISS 인덱스를 IVF 기반으로 새롭게 구축 (속도 최적화)""" global faiss_index, indexed_items, active_sale_items logger.info("🔄 FAISS 인덱스를 고속 IVF 기반으로 재구축 중...") # 최신 상품 데이터 로드 active_sale_items = await load_huggingface_jsonl("initial_saleitem_dataset") if active_sale_items.empty: logger.error("❌ 상품 데이터를 로드할 수 없습니다.") raise RuntimeError("상품 데이터 로드 실패") # 상품명 목록 추출 item_names = active_sale_items["ITEMNAME"].tolist() indexed_items = item_names # 간소화된 로깅 total_items = len(item_names) logger.info(f"🔹 총 {total_items}개 상품 고속 벡터화 시작...") # 벡터화 최적화 - 배치 사이즈 증가 item_vectors = await encode_texts_parallel(item_names, batch_size=1024) # 벡터 정규화 (코사인 유사도를 위해) norms = np.linalg.norm(item_vectors, axis=1, keepdims=True) norms[norms == 0] = 1.0 # 0으로 나눔 방지 normalized_vectors = item_vectors / norms # IVF 기반 인덱스 구축 (속도 대폭 개선) loop = asyncio.get_event_loop() def _build_ivf_index(): dimension = item_vectors.shape[1] # IVF 클러스터 수 - 데이터 크기에 따라 조정 (√n 규칙 사용) nlist = int(np.sqrt(total_items) * 4) # 클러스터 수 증가 nlist = max(32, min(nlist, 1024)) # 최소 32, 최대 1024개 제한 # 양자화 파라미터 - 차원 수에 맞게 조정 M = min(64, dimension // 2) # 서브벡터 수 nbits = 8 # 비트 수 # 고속 IVF 인덱스 생성 if total_items > 10000: # 벡터가 많으면 압축 기법 사용 # IVF + PQ (Product Quantization) 조합 - 메모리 효율적 quantizer = faiss.IndexFlatIP(dimension) index = faiss.IndexIVFPQ(quantizer, dimension, nlist, M, nbits) else: # 일반 IVF - 속도 향상 quantizer = faiss.IndexFlatIP(dimension) index = faiss.IndexIVFFlat(quantizer, dimension, nlist) # 학습 및 추가 index.train(normalized_vectors) index.add(normalized_vectors) # 검색 품질 향상을 위한 설정 # nprobe = 몇 개의 클러스터를 검색할지 (높을수록 정확도 ↑, 속도 ↓) index.nprobe = min(32, nlist // 4) # 클러스터의 25% 검색 logger.info(f"✅ IVF 인덱스 구축 완료: clusters={nlist}, nprobe={index.nprobe}") return index # 인덱스 구축 실행 faiss_index = await loop.run_in_executor(thread_pool, _build_ivf_index) logger.info(f"✅ 고속 FAISS 인덱스 구축 완료! 총 {len(indexed_items)}개 항목") # 구축 후 Hub에 저장 await save_faiss_index() return True # ✅ FAISS 인덱스 상태 확인 및 필요시에만 구축 async def check_faiss_index(): """FAISS 인덱스가 존재하는지 확인하고 없으면 구축 (비동기 지원)""" global faiss_index if faiss_index is None: # Hub에서 로드 시도 if not await load_faiss_index(): # 로드 실패 시 새로 구축 logger.warning("⚠️ 저장된 인덱스가 없어 새로 구축합니다.") await rebuild_faiss_index() # 모든 과정 후에도 인덱스가 None이면 오류 if faiss_index is None: raise RuntimeError("FAISS 인덱스 초기화에 실패했습니다.") # ✅ 최적화된 키워드 추출 함수 async def extract_keywords(query: str, top_n: int = 2): # top_n 감소 """KeyBERT 최적화 키워드 추출 (성능 중심)""" # 매우 짧은 쿼리는 그대로 반환 (처리 비용 절감) if len(query) <= 3: return [query] loop = asyncio.get_event_loop() def _optimized_extract(): # 성능 중심 설정 return kw_model.extract_keywords( query, keyphrase_ngram_range=(1, 1), # 단일 단어만 추출 stop_words=["이", "그", "저", "을", "를", "에", "에서", "은", "는"], # 한국어 불용어 use_mmr=True, diversity=0.5, top_n=top_n ) try: keywords = await loop.run_in_executor(thread_pool, _optimized_extract) # 가중치가 너무 낮은 키워드 제외 filtered = [(k, s) for k, s in keywords if s > 0.2] return [k[0] for k in filtered] except Exception as e: logger.error(f"❌ 키워드 추출 오류: {str(e)}") # 단어 분리로 폴백 return query.split()[:2] # ✅ 최적화된 키워드 확장 함수 async def expand_keywords_with_word2vec(keywords: list, max_new=2): # max_new 감소 """Word2Vec 키워드 확장 최적화""" global word2vec_model if word2vec_model is None or not keywords: return keywords # 결과 저장을 위한 집합 expanded = set(keywords) loop = asyncio.get_event_loop() def _expand_keywords(): for keyword in keywords: # 단일 단어인 경우 if keyword in word2vec_model: # 유사도가 높은 단어만 선택 (임계값 적용) similar_words = word2vec_model.most_similar(keyword, topn=max_new) for word, score in similar_words: if score > 0.7: # 높은 유사도 임계값 적용 expanded.add(word) # 복합어 처리 (첫 단어만) elif len(keyword.split()) > 1: word = keyword.split()[0] if word in word2vec_model and len(word) > 1: similar = word2vec_model.most_similar(word, topn=1) if similar and similar[0][1] > 0.8: # 높은 임계값 expanded.add(similar[0][0]) # 결과 변환 result = list(expanded) # 키워드가 너무 많으면 제한 if len(result) > 5: return keywords + result[len(keywords):5] return result try: # 확장 실행 expanded_keywords = await loop.run_in_executor(thread_pool, _expand_keywords) return expanded_keywords except Exception as e: logger.error(f"❌ Word2Vec 확장 오류: {str(e)}") return keywords # 오류 시 원본 키워드 반환 # ✅ 최적화된 search_faiss_with_keywords 함수 async def search_faiss_with_keywords(query: str, top_k: int = 5, keywords=None, expanded_keywords=None): """고속 키워드 기반 FAISS 검색 수행""" global faiss_index, indexed_items # FAISS 인덱스 확인 - 한 번만 실행 if faiss_index is None: await check_faiss_index() # 타이머 시작 start_time = time.time() # 병렬 실행을 위한 준비 loop = asyncio.get_event_loop() # 1. 키워드 추출 및 확장 최적화 if keywords is None: keywords = await extract_keywords(query) if expanded_keywords is None: expanded_keywords = await expand_keywords_with_word2vec(keywords) # 2. 벡터 인코딩 최적화 - 쿼리와 키워드 한 번에 처리 search_texts = [query] + expanded_keywords # 벡터 인코딩 - 최적화된 함수 사용 all_vectors = await encode_texts_parallel(search_texts) # 벡터 정규화 - 최적화된 방식 def normalize_batch(vectors): if vectors.size == 0: return vectors norms = np.linalg.norm(vectors, axis=1, keepdims=True) norms[norms == 0] = 1.0 # 0으로 나눔 방지 return vectors / norms # 벡터 정규화 실행 all_vectors = await loop.run_in_executor(thread_pool, lambda: normalize_batch(all_vectors)) # 쿼리 벡터와 키워드 벡터 분리 if len(all_vectors) > 0: query_vector = all_vectors[0:1] keyword_vectors = all_vectors[1:] if len(all_vectors) > 1 else np.array([]) else: return [] # 벡터화 실패 시 빈 결과 반환 # 3. FAISS 검색 최적화 - 일괄 배치 처리 def _optimized_batch_search(): all_results = {} # 쿼리 벡터 검색 (가중치 3배로 증가) if query_vector.shape[0] > 0: distances, indices = faiss_index.search(query_vector, top_k * 2) # 쿼리 결과 가중치 적용 (중요도 상향) for idx, dist in zip(indices[0], distances[0]): if idx < len(indexed_items): all_results[idx] = dist * 3.0 # 가중치 3.0 # 키워드 벡터 배치 검색 if keyword_vectors.shape[0] > 0: # 배치 검색 한 번에 처리 k_distances, k_indices = faiss_index.search(keyword_vectors, top_k) # 키워드별 가중치 적용 및 결과 병합 for i in range(keyword_vectors.shape[0]): for j, (idx, dist) in enumerate(zip(k_indices[i], k_distances[i])): if idx < len(indexed_items): # 순위에 따라 가중치 차등 적용 (상위 결과 우대) rank_weight = 1.0 / (1 + j * 0.2) # 순위별 가중치 감소 weight = 0.6 * rank_weight # 기본 가중치 0.6 # 기존 점수에 추가 all_results[idx] = all_results.get(idx, 0) + dist * weight return all_results # 최적화된 배치 검색 실행 result_scores = await loop.run_in_executor(thread_pool, _optimized_batch_search) # 4. 결과 처리 및 정렬 최적화 def _process_results(): # 임계값 필터링 및 정렬 filtered_items = [(idx, score) for idx, score in result_scores.items() if score >= 0.3] # 최소 점수 필터링 # 점수 기준 내림차순 정렬 sorted_items = sorted(filtered_items, key=lambda x: x[1], reverse=True) # 최종 결과 변환 recommendations = [] for idx, score in sorted_items[:top_k]: # top_k개만 처리 item_name = indexed_items[idx] try: # 메모리 내 조회 최적화 mask = active_sale_items["ITEMNAME"] == item_name if mask.any(): item_seq = active_sale_items.loc[mask, "ITEMSEQ"].values[0] recommendations.append({ "ITEMSEQ": item_seq, "ITEMNAME": item_name, "score": float(score) }) except (IndexError, KeyError): continue return recommendations # 결과 처리 실행 recommendations = await loop.run_in_executor(thread_pool, _process_results) # 5. 직접 매칭 추가 최적화 (필요한 경우에만) if len(recommendations) < top_k: direct_matches = await find_direct_matches(query, top_k - len(recommendations), [r["ITEMNAME"] for r in recommendations]) if direct_matches: recommendations.extend(direct_matches) # 처리 시간이 1초 이상인 경우에만 로깅 elapsed = time.time() - start_time if elapsed > 1.0: logger.info(f"🔍 검색 완료 | 소요시간: {elapsed:.2f}초 | 결과: {len(recommendations)}개") return recommendations[:top_k] # ✅ 직접 매칭 분리 (성능 최적화) async def find_direct_matches(query, limit=5, existing_names=None): """직접 텍스트 매칭 검색 (분리하여 최적화)""" loop = asyncio.get_event_loop() def _find_matches(): matches = [] query_lower = query.lower() existing = set(existing_names or []) # 데이터 인덱싱 최적화 item_dict = {} for idx, item_name in enumerate(indexed_items): if len(matches) >= limit: break if item_name in existing: continue if query_lower in item_name.lower(): item_dict[item_name] = idx # 한 번에 데이터프레임 조회 if item_dict: mask = active_sale_items["ITEMNAME"].isin(item_dict.keys()) filtered_items = active_sale_items[mask] for _, row in filtered_items.iterrows(): if len(matches) >= limit: break matches.append({ "ITEMSEQ": row["ITEMSEQ"], "ITEMNAME": row["ITEMNAME"], "score": 1.0 }) return matches # 스레드 풀에서 실행 return await loop.run_in_executor(thread_pool, _find_matches) # ✅ API 요청 모델 class RecommendRequest(BaseModel): search_query: str top_k: int = 5 use_expansion: bool = True # 키워드 확장 사용 여부 # ✅ 추천 API 엔드포인트 # ✅ 최적화된 recommend API 엔드포인트 @app.post("/api/recommend") async def recommend(request: RecommendRequest, background_tasks: BackgroundTasks): """고속 추천 API (I/O 병렬화 + 불필요 작업 제거)""" try: # 벤치마크용 타이머 시작 start_time = time.time() # 파라미터 최적화 및 검증 search_query = request.search_query.strip() if not search_query: raise HTTPException(status_code=400, detail="검색어를 입력해주세요") top_k = min(max(1, request.top_k), 20) # 1~20 범위로 제한 # 병렬 프로세싱을 위한 동시 실행 keywords, expanded_keywords = await asyncio.gather( extract_keywords(search_query), expand_keywords_with_word2vec( [search_query.split()[0]] if search_query.split() else [search_query], max_new=2 ) if request.use_expansion else None ) # 검색 실행 - 병렬 처리된 키워드 활용 recommendations = await search_faiss_with_keywords( search_query, top_k, keywords, expanded_keywords ) # 결과 반환 result = { "query": search_query, "recommendations": recommendations, "keywords": keywords if len(keywords) > 0 else None, "expanded_keywords": expanded_keywords if expanded_keywords and len(expanded_keywords) > 0 else None } # 응답 시간 측정 (1초 이상만 로깅) elapsed = time.time() - start_time if elapsed > 1.0: logger.info(f"⏱️ API 응답 시간: {elapsed:.2f}초 | 쿼리: '{search_query}'") return result except Exception as e: logger.error(f"❌ 추천 처리 오류: {str(e)}") raise HTTPException(status_code=500, detail=f"추천 처리 중 오류가 발생했습니다") # 인덱스 상태 확인 함수 (백그라운드 태스크용) async def check_index_health(): """인덱스 상태를 주기적으로 확인하는 백그라운드 태스크""" try: # 인덱스 사용 상태 확인 if faiss_index is None: logger.warning("⚠️ 백그라운드 체크: FAISS 인덱스가 로드되지 않았습니다.") await check_faiss_index() # 추가적인 상태 확인 로직을 여기에 구현할 수 있음 logger.debug("✅ 인덱스 상태 확인 완료") except Exception as e: logger.error(f"❌ 백그라운드 인덱스 체크 중 오류: {str(e)}") # ✅ 유사 단어 검색 API @app.post("/api/similar_words") async def similar_words(word: str, top_k: int = 10): """Word2Vec 모델을 사용한 유사 단어 검색 API (비동기 지원)""" try: if word2vec_model is None: return {"error": "Word2Vec 모델이 로드되지 않았습니다."} loop = asyncio.get_event_loop() def _get_similar(): if word not in word2vec_model: return [] similar = word2vec_model.most_similar(word, topn=top_k) return [{"word": w, "similarity": float(s)} for w, s in similar] result = await loop.run_in_executor(thread_pool, _get_similar) if not result: return {"word": word, "similar_words": [], "message": "단어가 모델에 없습니다."} return {"word": word, "similar_words": result} except Exception as e: logger.error(f"❌ 유사 단어 검색 중 오류: {str(e)}") raise HTTPException(status_code=500, detail=f"유사 단어 검색 오류: {str(e)}") # ✅ FAISS 인덱스 갱신 API (명시적으로 요청할 때만 실행) @app.post("/api/update_index") async def update_index(background_tasks: BackgroundTasks): """FAISS 인덱스를 새롭게 구축 (명시적 요청 시에만, 비동기 처리)""" try: # 인덱스 재구축을 백그라운드 태스크로 실행 background_tasks.add_task(rebuild_and_log_index) return {"message": "✅ FAISS 인덱스 업데이트가 백그라운드에서 시작되었습니다."} except Exception as e: logger.exception("❌ [API] 인덱스 업데이트 처리 중 예외 발생") raise HTTPException(status_code=500, detail=f"인덱스 업데이트 실패: {str(e)}") # 백그라운드 작업용 인덱스 재구축 함수 async def rebuild_and_log_index(): """백그라운드에서 인덱스를 재구축하고 결과를 로깅""" try: logger.info("🔄 백그라운드에서 인덱스 재구축 시작") start_time = time.time() await rebuild_faiss_index() elapsed = time.time() - start_time logger.info(f"✅ 백그라운드 인덱스 재구축 완료! 소요 시간: {elapsed:.2f}초") except Exception as e: logger.error(f"❌ 백그라운드 인덱스 재구축 중 오류: {str(e)}") # ✅ 인덱스 디버깅 API @app.get("/api/debug_index") async def debug_index(query: str, top_k: int = 20): """인덱스 디버깅을 위한 API (비동기 지원)""" try: await check_faiss_index() loop = asyncio.get_event_loop() # 원본 벡터 생성 (비동기) def _get_vector(): vector = embedding_model.encode(query, convert_to_numpy=True).astype("float32") norm = np.linalg.norm(vector) normalized_vector = vector / norm return normalized_vector, norm normalized_vector, norm = await loop.run_in_executor(thread_pool, _get_vector) # 원본 쿼리로 검색 (비동기) def _search(): return faiss_index.search(np.array([normalized_vector]), top_k) distances, indices = await loop.run_in_executor(thread_pool, _search) # 결과 매핑 results = [] for i, (idx, dist) in enumerate(zip(indices[0], distances[0])): if idx < len(indexed_items): item_name = indexed_items[idx] results.append({ "rank": i + 1, "index": int(idx), "item_name": item_name, "distance/score": float(dist) }) # 데이터셋에 해당 단어가 있는지 확인 (비동기) def _find_matches(): contains = [item for item in indexed_items if query.lower() in item.lower()][:5] exact = [item for item in indexed_items if query.lower() == item.lower()] return contains, exact contains_query, exact_matches = await loop.run_in_executor(thread_pool, _find_matches) return { "query": query, "vector_norm": float(norm), "contains_query": contains_query, "exact_matches": exact_matches, "results": results } except Exception as e: logger.error(f"❌ 인덱스 디버깅 중 오류: {str(e)}") raise HTTPException(status_code=500, detail=f"인덱스 디버깅 오류: {str(e)}") # ✅ 문자열 포함 검색 API @app.get("/api/text_search") async def text_search(query: str, top_k: int = 10): """단순 텍스트 포함 검색 API (비동기 지원)""" try: loop = asyncio.get_event_loop() # 비동기 검색 함수 def _text_search(): # 단순 텍스트 포함 검색 matched_items = [] for idx, item_name in enumerate(indexed_items): if query.lower() in item_name.lower(): try: item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0] matched_items.append({"ITEMSEQ": item_seq, "ITEMNAME": item_name, "match_type": "contains"}) except (IndexError, KeyError): continue # 정확히 일치하는 항목을 앞으로 exact_matches = [] partial_matches = [] for item in matched_items: if query.lower() == item["ITEMNAME"].lower(): item["match_type"] = "exact" exact_matches.append(item) else: partial_matches.append(item) # 결합 및 제한 return exact_matches + partial_matches # 비동기적으로 검색 실행 results = await loop.run_in_executor(thread_pool, _text_search) logger.info(f"🔍 텍스트 검색 결과: {len(results)}개 찾음, 쿼리: '{query}'") return { "query": query, "total_matches": len(results), "results": results[:top_k] } except Exception as e: logger.error(f"❌ 텍스트 검색 중 오류: {str(e)}") raise HTTPException(status_code=500, detail=f"텍스트 검색 오류: {str(e)}") # ✅ FastAPI 실행 if __name__ == "__main__": # 서버 시작 시 저장된 인덱스 로드 시도 try: # 비동기 함수를 동기적으로 호출하기 위한 임시 이벤트 루프 사용 loop = asyncio.new_event_loop() if not loop.run_until_complete(load_faiss_index()): logger.warning("⚠️ 기존 인덱스 로드에 실패했습니다. 즉시 새 인덱스를 구축합니다.") # 인덱스 즉시 재구축 loop.run_until_complete(rebuild_faiss_index()) logger.info("✅ FAISS 인덱스 생성 완료!") else: logger.info("✅ 기존 인덱스를 성공적으로 로드했습니다.") loop.close() except Exception as e: logger.error(f"❌ 인덱스 초기 구축 실패: {e}") logger.warning("⚠️ 인덱스 없이 시작합니다. 검색 기능이 제한될 수 있습니다.") import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)