|
import os |
|
import torch |
|
import pandas as pd |
|
import logging |
|
import faiss |
|
import numpy as np |
|
import time |
|
import gensim |
|
from fastapi import FastAPI, HTTPException |
|
from pydantic import BaseModel |
|
from datasets import load_dataset |
|
from huggingface_hub import login, hf_hub_download, HfApi, create_repo |
|
from keybert import KeyBERT |
|
from sentence_transformers import SentenceTransformer |
|
from joblib import Parallel, delayed |
|
from tqdm import tqdm |
|
import tempfile |
|
import re |
|
import sys |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = FastAPI(title="🚀 KeyBERT + Word2Vec 기반 FAISS 검색 API", version="1.1") |
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
logger.info(f"🚀 실행 디바이스: {device.upper()}") |
|
|
|
|
|
HF_API_TOKEN = os.getenv("HF_API_TOKEN") |
|
if HF_API_TOKEN: |
|
logger.info("🔑 Hugging Face API 로그인 중...") |
|
login(token=HF_API_TOKEN) |
|
else: |
|
logger.error("❌ HF_API_TOKEN이 설정되지 않았습니다. 일부 기능이 제한될 수 있습니다.") |
|
|
|
|
|
word2vec_model = None |
|
try: |
|
logger.info("🔄 Word2Vec 모델 로드 중...") |
|
MODEL_REPO = "aikobay/item-model" |
|
model_path = hf_hub_download(repo_id=MODEL_REPO, filename="item_vectors.bin", repo_type="dataset") |
|
word2vec_model = gensim.models.KeyedVectors.load_word2vec_format(model_path, binary=True) |
|
logger.info(f"✅ Word2Vec 모델 로드 완료! 단어 수: {len(word2vec_model.key_to_index)}") |
|
except Exception as e: |
|
logger.error(f"❌ Word2Vec 모델 로드 실패: {e}") |
|
|
|
|
|
logger.info("🔄 KeyBERT 모델 로드 중...") |
|
kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2") |
|
original_embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") |
|
logger.info("✅ KeyBERT 모델 로드 완료!") |
|
|
|
|
|
embedding_model = None |
|
try: |
|
logger.info("🔄 한국어 특화 임베딩 모델로 교체 시도...") |
|
|
|
embedding_model = SentenceTransformer("jhgan/ko-sroberta-multitask") |
|
logger.info("✅ 한국어 특화 임베딩 모델 로드 완료!") |
|
except Exception as e: |
|
logger.warning(f"⚠️ 한국어 특화 모델 로드 실패, 기존 모델 유지: {e}") |
|
embedding_model = original_embedding_model |
|
|
|
|
|
def load_huggingface_jsonl(dataset_name, split="train"): |
|
"""Hugging Face Hub에서 데이터셋 로드""" |
|
try: |
|
repo_id = f"aikobay/{dataset_name}" |
|
dataset = load_dataset(repo_id, split=split) |
|
df = dataset.to_pandas().dropna() |
|
return df |
|
except Exception as e: |
|
logger.error(f"❌ 데이터 로드 중 오류 발생: {e}") |
|
return pd.DataFrame() |
|
|
|
try: |
|
active_sale_items = load_huggingface_jsonl("initial_saleitem_dataset") |
|
if active_sale_items.empty: |
|
logger.error("❌ 데이터셋이 비어 있습니다. 프로그램을 종료합니다.") |
|
exit(1) |
|
logger.info(f"✅ 경매 상품 데이터 로드 완료! 총 {len(active_sale_items)}개 상품") |
|
except Exception as e: |
|
logger.error(f"❌ 상품 데이터 로드 실패: {e}") |
|
exit(1) |
|
|
|
|
|
faiss_index = None |
|
indexed_items = [] |
|
|
|
|
|
def encode_texts_parallel(texts, batch_size=512): |
|
"""멀티 프로세싱을 활용한 벡터화 속도 최적화""" |
|
num_cores = os.cpu_count() |
|
logger.info(f"🔄 멀티코어 벡터화 진행 (코어 수: {num_cores})") |
|
|
|
def encode_batch(batch): |
|
return embedding_model.encode(batch, convert_to_numpy=True) |
|
|
|
|
|
text_batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)] |
|
embeddings = Parallel(n_jobs=num_cores)(delayed(encode_batch)(batch) for batch in text_batches) |
|
|
|
return np.vstack(embeddings).astype("float32") |
|
|
|
|
|
def save_faiss_index(): |
|
"""FAISS 인덱스를 Hugging Face Hub에 저장""" |
|
global faiss_index, indexed_items |
|
|
|
if faiss_index is None or not indexed_items: |
|
logger.error("❌ 저장할 FAISS 인덱스가 없습니다.") |
|
return False |
|
|
|
try: |
|
|
|
repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") |
|
|
|
|
|
api = HfApi() |
|
|
|
|
|
try: |
|
api.repo_info(repo_id=repo_id, repo_type="dataset") |
|
logger.info(f"✅ 기존 레포지토리 사용: {repo_id}") |
|
except Exception: |
|
logger.info(f"🔄 레포지토리가 존재하지 않아 새로 생성합니다: {repo_id}") |
|
create_repo( |
|
repo_id=repo_id, |
|
repo_type="dataset", |
|
private=True, |
|
exist_ok=True |
|
) |
|
logger.info(f"✅ 레포지토리 생성 완료: {repo_id}") |
|
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir: |
|
index_path = os.path.join(temp_dir, "faiss_index.bin") |
|
items_path = os.path.join(temp_dir, "indexed_items.txt") |
|
|
|
|
|
faiss.write_index(faiss_index, index_path) |
|
|
|
|
|
with open(items_path, "w", encoding="utf-8") as f: |
|
f.write("\n".join(indexed_items)) |
|
|
|
|
|
readme_path = os.path.join(temp_dir, "README.md") |
|
with open(readme_path, "w", encoding="utf-8") as f: |
|
f.write(f"""# FAISS 인덱스 저장소 |
|
이 저장소는 상품 검색을 위한 FAISS 인덱스와 관련 데이터를 포함하고 있습니다. |
|
- 최종 업데이트: {pd.Timestamp.now()} |
|
- 인덱스 항목 수: {len(indexed_items)} |
|
- 모델: KeyBERT + Word2Vec |
|
이 저장소는 'aikobay/initial_saleitem_dataset'의 상품 데이터를 기반으로 생성된 벡터 인덱스를 저장하기 위해 자동 생성되었습니다. |
|
""") |
|
|
|
|
|
for file_path, file_name in [ |
|
(index_path, "faiss_index.bin"), |
|
(items_path, "indexed_items.txt"), |
|
(readme_path, "README.md") |
|
]: |
|
api.upload_file( |
|
path_or_fileobj=file_path, |
|
path_in_repo=file_name, |
|
repo_id=repo_id, |
|
repo_type="dataset" |
|
) |
|
|
|
logger.info(f"✅ FAISS 인덱스가 Hugging Face Hub에 저장되었습니다. 레포: {repo_id}") |
|
return True |
|
|
|
except Exception as e: |
|
logger.error(f"❌ FAISS 인덱스 Hub 저장 중 오류 발생: {e}") |
|
|
|
|
|
try: |
|
local_path = os.path.join(os.getcwd(), "faiss_index.bin") |
|
faiss.write_index(faiss_index, local_path) |
|
with open("indexed_items.txt", "w", encoding="utf-8") as f: |
|
f.write("\n".join(indexed_items)) |
|
logger.info(f"✅ FAISS 인덱스가 로컬에 백업 저장되었습니다: {local_path}") |
|
return True |
|
except Exception as local_err: |
|
logger.error(f"❌ 로컬 백업 저장도 실패: {local_err}") |
|
return False |
|
|
|
|
|
def load_faiss_index(): |
|
"""Hugging Face Hub에서 FAISS 인덱스를 로드""" |
|
global faiss_index, indexed_items |
|
|
|
|
|
repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") |
|
|
|
try: |
|
|
|
api = HfApi() |
|
try: |
|
api.repo_info(repo_id=repo_id, repo_type="dataset") |
|
logger.info(f"✅ FAISS 인덱스 레포지토리 확인: {repo_id}") |
|
except Exception as repo_err: |
|
logger.warning(f"⚠️ 레포지토리가 존재하지 않습니다: {repo_err}") |
|
raise FileNotFoundError("Hub 레포지토리가 존재하지 않습니다") |
|
|
|
|
|
index_path = hf_hub_download( |
|
repo_id=repo_id, |
|
filename="faiss_index.bin", |
|
repo_type="dataset" |
|
) |
|
|
|
items_path = hf_hub_download( |
|
repo_id=repo_id, |
|
filename="indexed_items.txt", |
|
repo_type="dataset" |
|
) |
|
|
|
|
|
faiss_index = faiss.read_index(index_path) |
|
with open(items_path, "r", encoding="utf-8") as f: |
|
indexed_items = f.read().splitlines() |
|
|
|
logger.info(f"✅ FAISS 인덱스가 Hub에서 로드되었습니다. 총 {len(indexed_items)}개 상품") |
|
return True |
|
|
|
except Exception as e: |
|
logger.warning(f"⚠️ Hub에서 FAISS 인덱스 로드 중 오류 발생: {e}") |
|
|
|
|
|
try: |
|
local_index_path = "faiss_index.bin" |
|
local_items_path = "indexed_items.txt" |
|
|
|
if os.path.exists(local_index_path) and os.path.exists(local_items_path): |
|
faiss_index = faiss.read_index(local_index_path) |
|
with open(local_items_path, "r", encoding="utf-8") as f: |
|
indexed_items = f.read().splitlines() |
|
logger.info(f"✅ 로컬 FAISS 인덱스 로드 성공. 총 {len(indexed_items)}개 상품") |
|
return True |
|
else: |
|
logger.warning("⚠️ 로컬 FAISS 인덱스 파일이 존재하지 않습니다.") |
|
return False |
|
except Exception as local_err: |
|
logger.error(f"❌ 로컬 FAISS 인덱스 로드 중 오류: {local_err}") |
|
return False |
|
|
|
|
|
def rebuild_faiss_index(): |
|
"""FAISS 인덱스를 새롭게 구축""" |
|
global faiss_index, indexed_items, active_sale_items |
|
|
|
logger.info("🔄 FAISS 인덱스를 재구축 중...") |
|
|
|
|
|
active_sale_items = load_huggingface_jsonl("initial_saleitem_dataset") |
|
if active_sale_items.empty: |
|
logger.error("❌ 상품 데이터를 로드할 수 없습니다.") |
|
raise RuntimeError("상품 데이터 로드 실패") |
|
|
|
|
|
item_names = active_sale_items["ITEMNAME"].tolist() |
|
indexed_items = item_names |
|
logger.info(f"🔹 총 {len(item_names)}개 상품 벡터화 시작...") |
|
|
|
|
|
item_vectors = encode_texts_parallel(item_names) |
|
|
|
|
|
norms = np.linalg.norm(item_vectors, axis=1, keepdims=True) |
|
normalized_vectors = item_vectors / norms |
|
|
|
|
|
faiss_index = faiss.IndexFlatIP(item_vectors.shape[1]) |
|
faiss_index.add(normalized_vectors) |
|
|
|
logger.info(f"✅ FAISS 인덱스 구축 완료! 총 {len(indexed_items)}개 항목.") |
|
|
|
|
|
save_faiss_index() |
|
return True |
|
|
|
|
|
def check_faiss_index(): |
|
"""FAISS 인덱스가 존재하는지 확인하고 없으면 구축""" |
|
global faiss_index |
|
|
|
if faiss_index is None: |
|
|
|
if not load_faiss_index(): |
|
|
|
logger.warning("⚠️ 저장된 인덱스가 없어 새로 구축합니다.") |
|
rebuild_faiss_index() |
|
|
|
|
|
if faiss_index is None: |
|
raise RuntimeError("FAISS 인덱스 초기화에 실패했습니다.") |
|
|
|
|
|
def extract_keywords(query: str, top_n: int = 3): |
|
"""KeyBERT를 사용한 핵심 키워드 추출""" |
|
keywords = kw_model.extract_keywords(query, keyphrase_ngram_range=(1,2), top_n=top_n) |
|
return [k[0] for k in keywords] |
|
|
|
|
|
def expand_keywords_with_word2vec(keywords: list, max_new=5): |
|
"""Word2Vec 모델을 사용한 키워드 확장""" |
|
if word2vec_model is None: |
|
logger.warning("⚠️ Word2Vec 모델이 로드되지 않아 확장을 수행하지 않습니다.") |
|
return keywords |
|
|
|
expanded_keywords = list(keywords) |
|
|
|
try: |
|
for keyword in keywords: |
|
|
|
if keyword in word2vec_model: |
|
|
|
similar_words = word2vec_model.most_similar(keyword, topn=max_new) |
|
expanded_keywords.extend([word for word, _ in similar_words]) |
|
elif len(keyword.split()) > 1: |
|
|
|
for word in keyword.split(): |
|
if word in word2vec_model and len(word) > 1: |
|
similar_words = word2vec_model.most_similar(word, topn=2) |
|
expanded_keywords.extend([w for w, _ in similar_words]) |
|
|
|
|
|
expanded_keywords = list(set(expanded_keywords)) |
|
logger.info(f"🔍 Word2Vec 확장 키워드: {expanded_keywords}") |
|
return expanded_keywords |
|
|
|
except Exception as e: |
|
logger.error(f"❌ Word2Vec 키워드 확장 중 오류 발생: {e}") |
|
return keywords |
|
|
|
|
|
def search_faiss_with_keywords(query: str, top_k: int = 5, keywords=None, expanded_keywords=None): |
|
"""키워드 기반 FAISS 검색 수행""" |
|
|
|
check_faiss_index() |
|
|
|
start_time = time.time() |
|
|
|
|
|
if keywords is None: |
|
keywords = extract_keywords(query) |
|
logger.info(f"🔍 KeyBERT 추출 키워드: {keywords}") |
|
|
|
|
|
if expanded_keywords is None: |
|
expanded_keywords = expand_keywords_with_word2vec(keywords) |
|
|
|
|
|
query_vector = embedding_model.encode(query, convert_to_numpy=True).astype("float32") |
|
|
|
query_vector = query_vector / np.linalg.norm(query_vector) |
|
query_vector = np.array([query_vector]) |
|
|
|
|
|
distances, query_indices = faiss_index.search(query_vector, top_k * 2) |
|
|
|
|
|
recommendations = [] |
|
scored_results = {} |
|
|
|
|
|
for idx, (i, dist) in enumerate(zip(query_indices[0], distances[0])): |
|
if i < len(indexed_items): |
|
item_name = indexed_items[i] |
|
score = 2.0 * dist |
|
scored_results[item_name] = score |
|
|
|
|
|
for keyword in expanded_keywords: |
|
keyword_vector = embedding_model.encode(keyword, convert_to_numpy=True).astype("float32") |
|
keyword_vector = keyword_vector / np.linalg.norm(keyword_vector) |
|
keyword_vector = np.array([keyword_vector]) |
|
|
|
k_distances, k_indices = faiss_index.search(keyword_vector, top_k) |
|
|
|
|
|
for idx, (i, dist) in enumerate(zip(k_indices[0], k_distances[0])): |
|
if i < len(indexed_items): |
|
item_name = indexed_items[i] |
|
|
|
if item_name in scored_results: |
|
scored_results[item_name] += 0.5 * dist |
|
else: |
|
scored_results[item_name] = 0.5 * dist |
|
|
|
|
|
sorted_results = sorted(scored_results.items(), key=lambda x: x[1], reverse=True) |
|
|
|
|
|
|
|
min_score_threshold = 0.3 |
|
|
|
for item_name, score in sorted_results: |
|
|
|
if score < min_score_threshold: |
|
continue |
|
|
|
try: |
|
item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0] |
|
recommendations.append({"ITEMSEQ": item_seq, "ITEMNAME": item_name, "score": float(score)}) |
|
except (IndexError, KeyError) as e: |
|
continue |
|
|
|
|
|
if len(recommendations) < top_k: |
|
direct_matches = [] |
|
for idx, item_name in enumerate(indexed_items): |
|
|
|
if query.lower() in item_name.lower(): |
|
try: |
|
item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0] |
|
|
|
if not any(r["ITEMNAME"] == item_name for r in recommendations): |
|
direct_matches.append({"ITEMSEQ": item_seq, "ITEMNAME": item_name, "score": 1.0}) |
|
except (IndexError, KeyError): |
|
continue |
|
|
|
|
|
recommendations.extend(direct_matches) |
|
|
|
logger.info(f"🔍 검색 수행 완료! 걸린 시간: {time.time() - start_time:.4f}초, 추천 {len(recommendations)}개") |
|
return recommendations[:top_k] |
|
|
|
|
|
class RecommendRequest(BaseModel): |
|
search_query: str |
|
top_k: int = 5 |
|
use_expansion: bool = True |
|
|
|
|
|
@app.post("/api/recommend") |
|
async def recommend(request: RecommendRequest): |
|
"""Word2Vec 기반 FAISS 검색/추천 API""" |
|
try: |
|
|
|
logger.info(f"📝 검색 요청: '{request.search_query}' (top_k: {request.top_k}, 확장: {request.use_expansion})") |
|
|
|
|
|
keywords = extract_keywords(request.search_query) |
|
|
|
|
|
if request.use_expansion and word2vec_model is not None: |
|
expanded_keywords = expand_keywords_with_word2vec(keywords) |
|
else: |
|
expanded_keywords = keywords |
|
logger.info(f"🔍 키워드 확장 없이 진행: {keywords}") |
|
|
|
|
|
recommendations = search_faiss_with_keywords( |
|
request.search_query, |
|
request.top_k, |
|
keywords, |
|
expanded_keywords |
|
) |
|
|
|
|
|
logger.info(f"🔍 검색 결과: {[r['ITEMNAME'] for r in recommendations]}") |
|
|
|
return { |
|
"query": request.search_query, |
|
"recommendations": recommendations, |
|
"keywords": keywords, |
|
"expanded_keywords": expanded_keywords |
|
} |
|
except Exception as e: |
|
logger.error(f"❌ 추천 처리 중 오류: {str(e)}") |
|
raise HTTPException(status_code=500, detail=f"추천 오류: {str(e)}") |
|
|
|
|
|
@app.post("/api/similar_words") |
|
async def similar_words(word: str, top_k: int = 10): |
|
"""Word2Vec 모델을 사용한 유사 단어 검색 API""" |
|
try: |
|
if word2vec_model is None: |
|
return {"error": "Word2Vec 모델이 로드되지 않았습니다."} |
|
|
|
if word not in word2vec_model: |
|
return {"word": word, "similar_words": [], "message": "단어가 모델에 없습니다."} |
|
|
|
similar = word2vec_model.most_similar(word, topn=top_k) |
|
result = [{"word": w, "similarity": float(s)} for w, s in similar] |
|
|
|
return {"word": word, "similar_words": result} |
|
except Exception as e: |
|
logger.error(f"❌ 유사 단어 검색 중 오류: {str(e)}") |
|
raise HTTPException(status_code=500, detail=f"유사 단어 검색 오류: {str(e)}") |
|
|
|
|
|
@app.post("/api/update_index") |
|
async def update_index(): |
|
"""FAISS 인덱스를 새롭게 구축 (명시적 요청 시에만)""" |
|
try: |
|
|
|
rebuild_faiss_index() |
|
return {"message": "✅ FAISS 인덱스 업데이트 및 저장 완료!"} |
|
except Exception as e: |
|
logger.exception("❌ [API] 인덱스 업데이트 처리 중 예외 발생") |
|
raise HTTPException(status_code=500, detail=f"인덱스 업데이트 실패: {str(e)}") |
|
|
|
|
|
@app.get("/api/debug_index") |
|
async def debug_index(query: str, top_k: int = 20): |
|
"""인덱스 디버깅을 위한 API""" |
|
try: |
|
check_faiss_index() |
|
|
|
|
|
vector = embedding_model.encode(query, convert_to_numpy=True).astype("float32") |
|
|
|
|
|
norm = np.linalg.norm(vector) |
|
normalized_vector = vector / norm |
|
|
|
|
|
distances, indices = faiss_index.search(np.array([normalized_vector]), top_k) |
|
|
|
|
|
results = [] |
|
for i, (idx, dist) in enumerate(zip(indices[0], distances[0])): |
|
if idx < len(indexed_items): |
|
item_name = indexed_items[idx] |
|
results.append({ |
|
"rank": i + 1, |
|
"index": int(idx), |
|
"item_name": item_name, |
|
"distance/score": float(dist) |
|
}) |
|
|
|
|
|
contains_query = [item for item in indexed_items if query.lower() in item.lower()] |
|
exact_matches = [item for item in indexed_items if query.lower() == item.lower()] |
|
|
|
return { |
|
"query": query, |
|
"vector_norm": float(norm), |
|
"contains_query": contains_query[:5], |
|
"exact_matches": exact_matches, |
|
"results": results |
|
} |
|
except Exception as e: |
|
logger.error(f"❌ 인덱스 디버깅 중 오류: {str(e)}") |
|
raise HTTPException(status_code=500, detail=f"인덱스 디버깅 오류: {str(e)}") |
|
|
|
|
|
@app.get("/api/text_search") |
|
async def text_search(query: str, top_k: int = 10): |
|
"""단순 텍스트 포함 검색 API (FAISS 검색 결과가 이상할 때 대체용)""" |
|
try: |
|
|
|
matched_items = [] |
|
for idx, item_name in enumerate(indexed_items): |
|
if query.lower() in item_name.lower(): |
|
try: |
|
item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0] |
|
matched_items.append({"ITEMSEQ": item_seq, "ITEMNAME": item_name, "match_type": "contains"}) |
|
except (IndexError, KeyError): |
|
continue |
|
|
|
|
|
exact_matches = [] |
|
partial_matches = [] |
|
|
|
for item in matched_items: |
|
if query.lower() == item["ITEMNAME"].lower(): |
|
item["match_type"] = "exact" |
|
exact_matches.append(item) |
|
else: |
|
partial_matches.append(item) |
|
|
|
|
|
results = exact_matches + partial_matches |
|
|
|
logger.info(f"🔍 텍스트 검색 결과: {len(results)}개 찾음, 쿼리: '{query}'") |
|
|
|
return { |
|
"query": query, |
|
"total_matches": len(results), |
|
"results": results[:top_k] |
|
} |
|
except Exception as e: |
|
logger.error(f"❌ 텍스트 검색 중 오류: {str(e)}") |
|
raise HTTPException(status_code=500, detail=f"텍스트 검색 오류: {str(e)}") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
if not load_faiss_index(): |
|
logger.warning("⚠️ 기존 인덱스 로드에 실패했습니다. 즉시 새 인덱스를 구축합니다.") |
|
try: |
|
|
|
rebuild_faiss_index() |
|
logger.info("✅ FAISS 인덱스 생성 완료!") |
|
except Exception as e: |
|
logger.error(f"❌ 인덱스 초기 구축 실패: {e}") |
|
logger.warning("⚠️ 인덱스 없이 시작합니다. 검색 기능이 제한될 수 있습니다.") |
|
else: |
|
logger.info("✅ 기존 인덱스를 성공적으로 로드했습니다.") |
|
|
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=7860) |