import os import torch import pandas as pd import logging import re import faiss import numpy as np import time from fastapi import FastAPI, HTTPException from pydantic import BaseModel from transformers import AutoTokenizer, AutoModelForCausalLM from datasets import load_dataset from huggingface_hub import login from sentence_transformers import SentenceTransformer from joblib import Parallel, delayed from tqdm import tqdm # Pydantic 모델 정의 (API 입력용) class RecommendRequest(BaseModel): search_query: str top_k: int = 10 # 로그 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logger.info(f"✅ NumPy 버전: {np.__version__}") logger.info(f"✅ FAISS 버전: {faiss.__version__}") # FastAPI 인스턴스 생성 app = FastAPI(title="🚀 한글 LLAMA 3.2 추천 시스템 API", version="1.4") # 모델 정보 MODEL_NAME = "Bllossom/llama-3.2-Korean-Bllossom-3B" HF_API_TOKEN = os.getenv("HF_API_TOKEN") # Hugging Face 로그인 if HF_API_TOKEN: logger.info("🔑 Hugging Face API 토큰을 사용하여 로그인 중...") login(token=HF_API_TOKEN) else: logger.warning("⚠️ 환경 변수 'HF_API_TOKEN'이 설정되지 않았습니다!") # GPU 사용 여부 확인 device = "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"🚀 실행 디바이스: {device.upper()}") # 모델 및 토크나이저 로드 logger.info(f"🔄 {MODEL_NAME} 모델 로드 중...") try: tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=HF_API_TOKEN) # pad_token_id를 명시적으로 설정 if tokenizer.pad_token_id is None: tokenizer.pad_token_id = tokenizer.eos_token_id model = AutoModelForCausalLM.from_pretrained( MODEL_NAME, token=HF_API_TOKEN, torch_dtype=torch.float16 if device == "cuda" else torch.float32, device_map="auto" if device == "cuda" else None, # 패딩 토큰 ID 명시적 설정 pad_token_id=tokenizer.pad_token_id ) logger.info("✅ 한글 LLAMA 3.2 모델 로드 완료!") except Exception as e: logger.error(f"❌ 모델 로드 중 오류 발생: {e}") model = None tokenizer = None # Llama 3를 이용한 연관 키워드 생성 함수 def generate_related_keywords(query, max_keywords=5): """Llama 3 모델을 사용하여 쿼리와 관련된 키워드 생성 및 정규화""" if not model or not tokenizer: logger.warning("모델이 로드되지 않아 기본 키워드 추출로 대체됩니다.") return extract_keywords_simple(query) try: # 개선된 프롬프트 template 구성 (한국어) prompt = f""""{query}"에 대해 관련성 높은 연관 키워드 또는 확장 추론 키워드 {max_keywords}개를 제안해. 숫자제외 """ # 토큰화 및 모델 입력 준비 inputs = tokenizer(prompt, return_tensors="pt", add_special_tokens=True).to(device) # 텍스트 생성 outputs = model.generate( inputs.input_ids, max_new_tokens=100, # 생성할 토큰 수 조정 num_return_sequences=1, temperature=0.6, # 다양성과 정확성 사이 밸런스 do_sample=True, repetition_penalty=1.2 # 반복 방지 ) # 생성된 텍스트 디코딩 generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) # 키워드 추출 (숫자와 점 다음에 오는 텍스트 추출) raw_keywords = [] for line in generated_text.split('\n'): line = line.strip() # 엄격한 필터링 조건 추가 if (line and len(line) > 1 and # 최소 길이 2자 이상 not any(x in line.lower() for x in [ '출력', '예시', '가이드', '제안', '라인', '검색어', '키워드' ])): # 숫자와 점 제거, 공백 제거 keyword = re.sub(r'^\d+\.\s*', '', line).strip() # 원본 쿼리의 키워드와 중복되는 키워드 제외 query_keywords = extract_keywords_simple(query) if (keyword and keyword not in raw_keywords and keyword not in query_keywords and keyword.lower() not in [k.lower() for k in query_keywords]): raw_keywords.append(keyword) # ===== 새로 추가된 정규화 로직 ===== normalized_keywords = [] for keyword in raw_keywords: # 1. 문장부호 제거 keyword = re.sub(r'[.,;:!?"\'\(\)\[\]\{\}]', '', keyword) # 2. 문장이나 구문을 단어로 분리 (한글 & 영문) if len(keyword.split()) > 2: # 2단어 이상인 경우 # 한글 처리: 2음절 이상의 명사만 추출 korean_words = re.findall(r'[가-힣]{2,}', keyword) # 영문 처리: 3자 이상의, 의미있는 영단어만 추출 english_words = re.findall(r'[a-zA-Z]{3,}', keyword) # 추출된 단어들 결합 extracted_words = korean_words + english_words if extracted_words: # 가장 길고 의미있는 단어 선택 (복합어가 더 정보량이 많음) keyword = max(extracted_words, key=len) # 3. 불필요한 조사 제거 (한글 조사 패턴) keyword = re.sub(r'(은|는|이|가|을|를|의|와|과|로|으로|에|에서)$', '', keyword) # 4. 길이 및 내용 검증 if len(keyword) >= 2 and not keyword.isspace() and keyword not in normalized_keywords: normalized_keywords.append(keyword) # 중복 제거 및 최대 키워드 수 제한 normalized_keywords = list(dict.fromkeys(normalized_keywords))[:max_keywords] # 키워드가 없으면 기본 키워드 추출 방법 사용 if not normalized_keywords: normalized_keywords = extract_keywords_simple(query) logger.info(f"🔍 생성된 연관 키워드: {normalized_keywords}") return normalized_keywords except Exception as e: logger.error(f"❌ 연관 키워드 생성 중 오류 발생: {e}") # 오류 발생 시 기본 키워드 추출 방법으로 대체 return extract_keywords_simple(query) # 기존의 키워드 추출 함수 (유지) def extract_keywords_simple(query): """한국어 검색어에 최적화된 키워드 추출""" # 특수문자 제거 및 소문자 변환 cleaned_query = re.sub(r'[^\w\s가-힣]', ' ', query).strip().lower() # 한글 단어와 영문 단어 분리 정규식 pattern = re.compile(r'[가-힣]+|[\u4e00-\u9fff]+|[a-zA-Z]+') # 모든 매칭 찾기 matches = pattern.findall(cleaned_query) # 길이 필터링 (한글은 1자 이상, 영문은 2자 이상) words = [] for word in matches: if re.match(r'[가-힣]+', word) and len(word) >= 1: words.append(word) elif re.match(r'[a-zA-Z]+', word) and len(word) >= 2: words.append(word) # 공백으로 구분된 원본 토큰도 추가 (복합어 고려) for token in cleaned_query.split(): if token not in words and len(token) >= 2: words.append(token) # 빈 리스트인 경우 원본 쿼리의 토큰 사용 if not words: return [w for w in cleaned_query.split() if w] return words # FAISS 검색 함수 수정 (Llama 3 연관 키워드 통합) def search_faiss(query, top_k=10): if faiss_index is None or indexed_items is None: logger.error("❌ FAISS 인덱스가 초기화되지 않았습니다.") return [] # Llama 3를 이용한 연관 키워드 생성 keywords = generate_related_keywords(query) logger.info(f"🔍 검색 쿼리: '{query}' → 연관 키워드: {keywords}") start_time = time.time() # 원본 벡터 기반 검색 query_vector = np.array([embedding_model.encode(query)]).astype("float32") distances, indices = faiss_index.search(query_vector, top_k * 2) # 더 많은 후보 검색 # 결과를 저장할 리스트 candidates = [] # 검색 결과에 키워드 매칭 점수 추가 for i, idx in enumerate(indices[0]): if idx >= len(indexed_items): continue item_name = indexed_items[idx] # 키워드 매칭 점수 계산 keyword_score = 0 for keyword in keywords: keyword_match_score = calculate_keyword_score(item_name, [keyword]) keyword_score = max(keyword_score, keyword_match_score) # 벡터 유사도 점수 (거리를 0~1 사이 점수로 변환) vector_score = max(0, 100 - distances[0][i] * 10) # 거리가 작을수록 점수 높음 # 최종 점수 (키워드 매칭 가중치 + 벡터 유사도 가중치) final_score = (keyword_score * 0.7) + (vector_score * 0.3) try: item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0] candidates.append({ "ITEMSEQ": item_seq, "ITEMNAME": item_name, "score": final_score, "keyword_match": keyword_score > 0 }) except (IndexError, KeyError) as e: logger.warning(f"⚠️ 상품 정보 매핑 오류 (ID: {idx}): {e}") # 최종 점수로 정렬 candidates.sort(key=lambda x: x["score"], reverse=True) # top_k개 선택 recommendations = candidates[:top_k] end_time = time.time() logger.info(f"🔍 검색 수행 완료! 걸린 시간: {end_time - start_time:.4f}초, 결과: {len(recommendations)}개") # 결과 로깅 (상위 3개만) for i, rec in enumerate(recommendations[:3]): logger.info(f" #{i+1}: {rec['ITEMNAME']} (점수: {rec['score']:.2f}, 키워드 매칭: {rec['keyword_match']})") # API 응답에는 점수 정보 제외 for rec in recommendations: rec.pop("score", None) rec.pop("keyword_match", None) return recommendations # 키워드 매칭 점수 계산 함수 (기존 로직 유지) def calculate_keyword_score(item_name, keywords): """개선된 키워드 매칭 점수 계산""" score = 0 item_lower = item_name.lower() # 전체 쿼리가 완전히 일치하면 가장 높은 점수 joined_query = ''.join(keywords).lower() if item_lower == joined_query: return 100 # 완전 일치는 최고 점수 # 전체 쿼리가 부분 일치하는 경우 if joined_query in item_lower: score += 50 # 개별 키워드 매칭 (단, 길이가 2자 이상인 의미있는 키워드만) meaningful_keywords = [k for k in keywords if len(k) >= 2] for keyword in meaningful_keywords: kw_lower = keyword.lower() if kw_lower in item_lower: # 단어 경계에서 시작하는지 확인 (더 정확한 매칭) word_boundary_match = re.search(r'(^|\s|_)' + re.escape(kw_lower), item_lower) is not None # 정확히 일치하는 경우 if item_lower == kw_lower: score += 40 # 상품명이 키워드로 시작하는 경우 elif item_lower.startswith(kw_lower): score += 30 # 단어 경계에서 매칭되는 경우 elif word_boundary_match: score += 20 # 단순 포함되는 경우 else: score += 10 # 유의미한 키워드가 많을수록 더 높은 점수 부여 if meaningful_keywords: matched_keywords = sum(1 for k in meaningful_keywords if k.lower() in item_lower) coverage_ratio = matched_keywords / len(meaningful_keywords) score += coverage_ratio * 15 return score # ✅ 데이터 로드 함수 def load_huggingface_jsonl(dataset_name, split="train"): if HF_API_TOKEN: login(token=HF_API_TOKEN) try: repo_id = f"aikobay/{dataset_name}" dataset = load_dataset(repo_id, split=split) df = dataset.to_pandas().dropna() # ✅ NaN 값 제거 return df except Exception as e: logger.error(f"❌ 데이터 로드 중 오류 발생: {e}") return pd.DataFrame() # ✅ 진행 중인 경매 상품 데이터 로드 try: active_sale_items = load_huggingface_jsonl("initial_saleitem_dataset") logger.info(f"✅ 진행 중인 경매 상품 데이터 로드 완료! 총 {len(active_sale_items)}개 상품") except Exception as e: logger.error(f"❌ 상품 데이터 로드 중 오류 발생: {e}") active_sale_items = pd.DataFrame() # ✅ FAISS 벡터 모델 embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") # ✅ 멀티코어 벡터화 함수 def encode_texts_parallel(texts, batch_size=512): """멀티 프로세싱을 활용한 벡터화 속도 최적화""" num_cores = os.cpu_count() # CPU 개수 확인 logger.info(f"🔄 멀티코어 벡터화 진행 (코어 수: {num_cores})") def encode_batch(batch): return embedding_model.encode(batch, convert_to_numpy=True) # 배치 단위로 병렬 처리 text_batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)] embeddings = Parallel(n_jobs=num_cores)(delayed(encode_batch)(batch) for batch in text_batches) return np.vstack(embeddings).astype("float32") # ✅ FAISS 인덱스 저장 & 로드 def save_faiss_index(): """FAISS 인덱스를 Hugging Face Hub에 저장하여 서버 재시작 시에도 데이터 유지""" from huggingface_hub import HfApi, create_repo import tempfile try: # 레포지토리 ID (환경 변수에서 가져오거나 기본값 사용) repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") # HfApi 객체 생성 api = HfApi() # 레포지토리 존재 여부 확인 및 생성 try: # 레포지토리 정보 조회 시도 api.repo_info(repo_id=repo_id, repo_type="dataset") logger.info(f"✅ 기존 레포지토리 사용: {repo_id}") except Exception: # 레포지토리가 없으면 새로 생성 logger.info(f"🔄 레포지토리가 존재하지 않아 새로 생성합니다: {repo_id}") create_repo( repo_id=repo_id, repo_type="dataset", private=True, # 비공개 레포지토리로 설정 exist_ok=True # 이미 존재해도 오류 발생하지 않음 ) logger.info(f"✅ 레포지토리 생성 완료: {repo_id}") # 임시 파일로 먼저 로컬에 저장 with tempfile.TemporaryDirectory() as temp_dir: index_path = os.path.join(temp_dir, "faiss_index.bin") items_path = os.path.join(temp_dir, "indexed_items.txt") # FAISS 인덱스 저장 faiss.write_index(faiss_index, index_path) # 아이템 목록 저장 with open(items_path, "w", encoding="utf-8") as f: f.write("\n".join(indexed_items)) # README 파일 생성 (레포지토리 정보용) readme_path = os.path.join(temp_dir, "README.md") with open(readme_path, "w", encoding="utf-8") as f: f.write(f"""# FAISS 인덱스 저장소 이 저장소는 상품 검색을 위한 FAISS 인덱스와 관련 데이터를 포함하고 있습니다. - 최종 업데이트: {pd.Timestamp.now()} - 인덱스 항목 수: {len(indexed_items)} - 모델: {MODEL_NAME} 이 저장소는, 'aikobay/initial_saleitem_dataset'의 상품 데이터를 기반으로 생성된 벡터 인덱스를 저장하기 위해 자동 생성되었습니다. """) # 파일 업로드 for file_path, file_name in [ (index_path, "faiss_index.bin"), (items_path, "indexed_items.txt"), (readme_path, "README.md") ]: api.upload_file( path_or_fileobj=file_path, path_in_repo=file_name, repo_id=repo_id, repo_type="dataset" ) logger.info(f"✅ FAISS 인덱스가 Hugging Face Hub에 저장되었습니다. 레포: {repo_id}") except Exception as e: logger.error(f"❌ FAISS 인덱스 Hub 저장 중 오류 발생: {e}") # 로컬에 백업 저장 시도 try: local_path = os.path.join(os.getcwd(), "faiss_index.bin") faiss.write_index(faiss_index, local_path) with open("indexed_items.txt", "w", encoding="utf-8") as f: f.write("\n".join(indexed_items)) logger.info(f"✅ FAISS 인덱스가 로컬에 백업 저장되었습니다: {local_path}") except Exception as local_err: logger.error(f"❌ 로컬 백업 저장도 실패: {local_err}") def load_faiss_index(): """Hugging Face Hub에서 FAISS 인덱스를 로드하여 검색 속도 향상""" from huggingface_hub import hf_hub_download, HfApi global faiss_index, indexed_items, active_sale_items # 레포지토리 ID (환경 변수에서 가져오거나 기본값 사용) repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index") try: # 레포지토리 존재 확인 api = HfApi() try: api.repo_info(repo_id=repo_id, repo_type="dataset") logger.info(f"✅ FAISS 인덱스 레포지토리 확인: {repo_id}") except Exception as repo_err: logger.warning(f"⚠️ 레포지토리가 존재하지 않습니다: {repo_err}") raise FileNotFoundError("Hub 레포지토리가 존재하지 않습니다") # Hub에서 파일 다운로드 index_path = hf_hub_download( repo_id=repo_id, filename="faiss_index.bin", repo_type="dataset" ) items_path = hf_hub_download( repo_id=repo_id, filename="indexed_items.txt", repo_type="dataset" ) # 파일 로드 faiss_index = faiss.read_index(index_path) with open(items_path, "r", encoding="utf-8") as f: indexed_items = f.read().splitlines() logger.info(f"✅ FAISS 인덱스가 Hub에서 로드되었습니다. 총 {len(indexed_items)}개 상품") except Exception as e: logger.warning(f"⚠️ Hub에서 FAISS 인덱스 로드 중 오류 발생: {e}") # 로컬 파일 확인 try: faiss_index = faiss.read_index("faiss_index.bin") with open("indexed_items.txt", "r", encoding="utf-8") as f: indexed_items = f.read().splitlines() logger.info(f"✅ 로컬 FAISS 인덱스 로드 성공. 총 {len(indexed_items)}개 상품") except FileNotFoundError: logger.warning("⚠️ FAISS 인덱스 파일이 존재하지 않습니다. 새로 구축합니다.") rebuild_faiss_index() except Exception as local_err: logger.error(f"❌ 로컬 FAISS 인덱스 로드 중 오류: {local_err}") rebuild_faiss_index() # ✅ FAISS 인덱스 생성 (진행률 표시 추가) def rebuild_faiss_index(): global faiss_index, indexed_items, active_sale_items logger.info("🔄 새로운 sale_item 데이터로 FAISS 인덱스를 재구축합니다...") active_sale_items = load_huggingface_jsonl("initial_saleitem_dataset") item_names = active_sale_items["ITEMNAME"].tolist() logger.info(f"🔹 총 {len(item_names)}개 상품 벡터화 시작...") # 병렬 처리를 통한 벡터화 (더 빠른 처리) item_vectors = encode_texts_parallel(item_names) # ✅ FAISS 인덱스 생성 faiss_index = faiss.IndexFlatL2(item_vectors.shape[1]) faiss_index.add(item_vectors) indexed_items = item_names logger.info(f"✅ FAISS 인덱스가 {len(indexed_items)}개 상품으로 새롭게 구축되었습니다.") save_faiss_index() # 키워드 추출 함수 추가 #def extract_keywords(query): # """검색 쿼리에서 주요 키워드 추출 및 정제""" # try: # # 기본 정제 (특수문자 제거, 소문자 변환 등) # cleaned_query = re.sub(r'[^\w\s]', ' ', query).strip().lower() # # 형태소 분석을 통한 명사 추출 # nouns = okt.nouns(cleaned_query) # # 2글자 이상 명사만 선택 (의미있는 키워드 위주) # keywords = [noun for noun in nouns if len(noun) >= 2] # # 명사가 없거나 추출 실패 시, 원본 쿼리의 공백으로 구분된 토큰 사용 # if not keywords: # keywords = cleaned_query.split() # return keywords # except Exception as e: # logger.error(f"❌ 키워드 추출 중 오류 발생: {e}") # # 오류 발생 시 원본 쿼리 그대로 반환 # return [query] # 검색 결과 향상을 위한 키워드 매칭 점수 계산 함수 #def calculate_keyword_score(item_name, keywords): # """아이템 이름과 키워드 간의 매칭 점수 계산""" # score = 0 # item_lower = item_name.lower() # # 1. 전체 쿼리가 상품명에 포함되면 높은 점수 # if ''.join(keywords).lower() in item_lower: # score += 10 # # 2. 개별 키워드 매칭 점수 # for keyword in keywords: # if keyword.lower() in item_lower: # # 정확히 일치하는 경우 높은 점수 # if keyword.lower() == item_lower: # score += 15 # # 상품명이 키워드로 시작하는 경우 # elif item_lower.startswith(keyword.lower()): # score += 8 # # 단순 포함되는 경우 # else: # score += 5 # return score # 추천 API 엔드포인트 @app.post("/api/recommend") async def recommend(request: RecommendRequest): try: recommendations = search_faiss(request.search_query, request.top_k) return { "query": request.search_query, "recommendations": recommendations, "related_keywords": generate_related_keywords(request.search_query) } except Exception as e: raise HTTPException(status_code=500, detail=f"추천 오류: {str(e)}") # ✅ 인덱스 갱신 API @app.post("/api/update_index") async def update_index(): rebuild_faiss_index() return {"message": "✅ FAISS 인덱스 업데이트 완료!"} # ✅ FastAPI 실행 if __name__ == "__main__": load_faiss_index() import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)