import os
import torch
import pandas as pd
import logging
import re
import faiss
import numpy as np
import time
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
from huggingface_hub import login
from sentence_transformers import SentenceTransformer
from joblib import Parallel, delayed
from tqdm import tqdm

class RecommendRequest(BaseModel):
    search_query: str
    top_k: int = 10
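# Illustrative request payload for the /api/recommend endpoint defined below
# (example values only; any search_query string and positive top_k work):
#
#   {"search_query": "camping tent", "top_k": 5}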
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info(f"NumPy version: {np.__version__}")
logger.info(f"FAISS version: {faiss.__version__}")

app = FastAPI(title="Korean LLAMA 3.2 Recommendation System API", version="1.4")

MODEL_NAME = "Bllossom/llama-3.2-Korean-Bllossom-3B"
HF_API_TOKEN = os.getenv("HF_API_TOKEN")

if HF_API_TOKEN:
    logger.info("Logging in with the Hugging Face API token...")
    login(token=HF_API_TOKEN)
else:
    logger.warning("Environment variable 'HF_API_TOKEN' is not set!")

device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Running on device: {device.upper()}")

logger.info(f"Loading model {MODEL_NAME}...")
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=HF_API_TOKEN)

    # Llama tokenizers ship without a pad token; reuse EOS so generation works.
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token_id = tokenizer.eos_token_id

    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        token=HF_API_TOKEN,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        device_map="auto" if device == "cuda" else None,
        pad_token_id=tokenizer.pad_token_id
    )
    logger.info("Korean LLAMA 3.2 model loaded successfully!")
except Exception as e:
    logger.error(f"Error while loading model: {e}")
    model = None
    tokenizer = None

def generate_related_keywords(query, max_keywords=5):
    """Generate and normalize keywords related to the query using the Llama 3 model."""
    if not model or not tokenizer:
        logger.warning("Model is not loaded; falling back to simple keyword extraction.")
        return extract_keywords_simple(query)

    try:
        # Korean prompt asking the model for related / expanded keywords, without numbering.
        prompt = f""""{query}"에 대해 관련성 높은 연관 키워드 또는 확장 추론 키워드 {max_keywords}개를 제시해.
숫자 제외
"""

        inputs = tokenizer(prompt, return_tensors="pt", add_special_tokens=True).to(device)

        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=100,
            num_return_sequences=1,
            temperature=0.6,
            do_sample=True,
            repetition_penalty=1.2
        )

        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Collect candidate keyword lines, dropping prompt echoes and meta/instruction lines.
        raw_keywords = []
        query_keywords = extract_keywords_simple(query)
        for line in generated_text.split('\n'):
            line = line.strip()

            if (line and
                len(line) > 1 and
                not any(x in line.lower() for x in [
                    '출력', '예시', '가이드', '제시', '라인', '검색어', '키워드'
                ])):

                # Strip leading list numbering such as "1. ".
                keyword = re.sub(r'^\d+\.\s*', '', line).strip()

                if (keyword and
                    keyword not in raw_keywords and
                    keyword not in query_keywords and
                    keyword.lower() not in [k.lower() for k in query_keywords]):
                    raw_keywords.append(keyword)

        # Normalize candidates: strip punctuation, keep the longest meaningful word,
        # and drop trailing Korean particles.
        normalized_keywords = []
        for keyword in raw_keywords:
            keyword = re.sub(r'[.,;:!?"\'\(\)\[\]\{\}]', '', keyword)

            if len(keyword.split()) > 2:
                # From long multi-word candidates, keep only substantial Korean or English words.
                korean_words = re.findall(r'[가-힣]{2,}', keyword)
                english_words = re.findall(r'[a-zA-Z]{3,}', keyword)
                extracted_words = korean_words + english_words

                if extracted_words:
                    keyword = max(extracted_words, key=len)

            # Remove common trailing particles (josa) such as 은/는/이/가.
            keyword = re.sub(r'(은|는|이|가|을|를|에|의|과|로|으로|와|에서)$', '', keyword)

            if len(keyword) >= 2 and not keyword.isspace() and keyword not in normalized_keywords:
                normalized_keywords.append(keyword)

        # Deduplicate while preserving order and cap at max_keywords.
        normalized_keywords = list(dict.fromkeys(normalized_keywords))[:max_keywords]

        if not normalized_keywords:
            normalized_keywords = extract_keywords_simple(query)

        logger.info(f"Generated related keywords: {normalized_keywords}")
        return normalized_keywords

    except Exception as e:
        logger.error(f"Error while generating related keywords: {e}")
        return extract_keywords_simple(query)

def extract_keywords_simple(query):
    """Keyword extraction optimized for Korean search queries."""
    # Remove punctuation/symbols but keep word characters, whitespace, and Hangul.
    cleaned_query = re.sub(r'[^\w\s가-힣]', ' ', query).strip().lower()

    # Match runs of Hangul, CJK ideographs, or Latin letters.
    pattern = re.compile(r'[가-힣]+|[\u4e00-\u9fff]+|[a-zA-Z]+')
    matches = pattern.findall(cleaned_query)

    words = []
    for word in matches:
        if re.match(r'[가-힣]+', word) and len(word) >= 1:
            words.append(word)
        elif re.match(r'[a-zA-Z]+', word) and len(word) >= 2:
            words.append(word)

    # Also keep whitespace-delimited tokens (e.g. alphanumeric codes) of length >= 2.
    for token in cleaned_query.split():
        if token not in words and len(token) >= 2:
            words.append(token)

    if not words:
        return [w for w in cleaned_query.split() if w]

    return words

def search_faiss(query, top_k=10):
    if faiss_index is None or indexed_items is None:
        logger.error("FAISS index is not initialized.")
        return []

    keywords = generate_related_keywords(query)
    logger.info(f"Search query: '{query}' -> related keywords: {keywords}")

    start_time = time.time()

    # Vector search: fetch twice as many candidates as requested, then re-rank below.
    query_vector = np.array([embedding_model.encode(query)]).astype("float32")
    distances, indices = faiss_index.search(query_vector, top_k * 2)

    candidates = []

    for i, idx in enumerate(indices[0]):
        # FAISS returns -1 when fewer than k vectors are available; skip invalid ids.
        if idx < 0 or idx >= len(indexed_items):
            continue

        item_name = indexed_items[idx]

        # Best keyword-match score across the generated keywords.
        keyword_score = 0
        for keyword in keywords:
            keyword_match_score = calculate_keyword_score(item_name, [keyword])
            keyword_score = max(keyword_score, keyword_match_score)

        # Convert L2 distance into a rough 0-100 similarity score.
        vector_score = max(0, 100 - distances[0][i] * 10)

        # Weighted blend: keyword match dominates, vector similarity breaks ties.
        final_score = (keyword_score * 0.7) + (vector_score * 0.3)

        try:
            item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0]
            candidates.append({
                "ITEMSEQ": item_seq,
                "ITEMNAME": item_name,
                "score": final_score,
                "keyword_match": keyword_score > 0
            })
        except (IndexError, KeyError) as e:
            logger.warning(f"Item info mapping error (ID: {idx}): {e}")

    candidates.sort(key=lambda x: x["score"], reverse=True)
    recommendations = candidates[:top_k]

    end_time = time.time()
    logger.info(f"Search completed in {end_time - start_time:.4f}s, {len(recommendations)} results")

    for i, rec in enumerate(recommendations[:3]):
        logger.info(f"  #{i+1}: {rec['ITEMNAME']} (score: {rec['score']:.2f}, keyword match: {rec['keyword_match']})")

    # Strip internal scoring fields before returning to the caller.
    for rec in recommendations:
        rec.pop("score", None)
        rec.pop("keyword_match", None)

    return recommendations

def calculate_keyword_score(item_name, keywords):
    """Improved keyword-match scoring between an item name and a list of keywords."""
    score = 0
    item_lower = item_name.lower()

    # Exact match against the concatenated keywords is a perfect score.
    joined_query = ''.join(keywords).lower()
    if item_lower == joined_query:
        return 100

    # Concatenated keywords appearing as a substring is a strong signal.
    if joined_query in item_lower:
        score += 50

    # Score each keyword of length >= 2 by how prominently it appears in the item name.
    meaningful_keywords = [k for k in keywords if len(k) >= 2]
    for keyword in meaningful_keywords:
        kw_lower = keyword.lower()
        if kw_lower in item_lower:
            word_boundary_match = re.search(r'(^|\s|_)' + re.escape(kw_lower), item_lower) is not None

            if item_lower == kw_lower:
                score += 40
            elif item_lower.startswith(kw_lower):
                score += 30
            elif word_boundary_match:
                score += 20
            else:
                score += 10

    # Bonus for covering a larger share of the meaningful keywords.
    if meaningful_keywords:
        matched_keywords = sum(1 for k in meaningful_keywords if k.lower() in item_lower)
        coverage_ratio = matched_keywords / len(meaningful_keywords)
        score += coverage_ratio * 15

    return score
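# Worked example (illustrative values only): calculate_keyword_score("apple iphone case",
# ["iphone", "case"]) yields 55. The concatenation "iphonecase" is not a substring of the
# item name, so no +50 bonus applies; "iphone" and "case" each match at a word boundary
# (+20 each), and both keywords are covered (2/2 * 15 = +15).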
|
|
|
|
|
def load_huggingface_jsonl(dataset_name, split="train"):
    if HF_API_TOKEN:
        login(token=HF_API_TOKEN)
    try:
        repo_id = f"aikobay/{dataset_name}"
        dataset = load_dataset(repo_id, split=split)
        df = dataset.to_pandas().dropna()
        return df
    except Exception as e:
        logger.error(f"Error while loading data: {e}")
        return pd.DataFrame()

try:
    active_sale_items = load_huggingface_jsonl("initial_saleitem_dataset")
    logger.info(f"Active auction item data loaded. Total items: {len(active_sale_items)}")
except Exception as e:
    logger.error(f"Error while loading item data: {e}")
    active_sale_items = pd.DataFrame()

embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")

# Module-level FAISS state, populated by load_faiss_index() / rebuild_faiss_index().
# Initializing both here avoids a NameError if search_faiss() runs before the index is loaded.
faiss_index = None
indexed_items = None

def encode_texts_parallel(texts, batch_size=512):
    """Speed up embedding by encoding batches of texts in parallel across CPU cores."""
    num_cores = os.cpu_count()
    logger.info(f"Encoding on {num_cores} CPU cores...")

    def encode_batch(batch):
        return embedding_model.encode(batch, convert_to_numpy=True)

    text_batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    embeddings = Parallel(n_jobs=num_cores)(delayed(encode_batch)(batch) for batch in text_batches)

    return np.vstack(embeddings).astype("float32")

def save_faiss_index():
    """Save the FAISS index to the Hugging Face Hub so it survives server restarts."""
    from huggingface_hub import HfApi, create_repo
    import tempfile

    try:
        repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index")
        api = HfApi()

        # Reuse the dataset repository if it exists, otherwise create it.
        try:
            api.repo_info(repo_id=repo_id, repo_type="dataset")
            logger.info(f"Using existing repository: {repo_id}")
        except Exception:
            logger.info(f"Repository not found, creating it: {repo_id}")
            create_repo(
                repo_id=repo_id,
                repo_type="dataset",
                private=True,
                exist_ok=True
            )
            logger.info(f"Repository created: {repo_id}")

        with tempfile.TemporaryDirectory() as temp_dir:
            index_path = os.path.join(temp_dir, "faiss_index.bin")
            items_path = os.path.join(temp_dir, "indexed_items.txt")

            # Serialize the index and the item names it was built from.
            faiss.write_index(faiss_index, index_path)

            with open(items_path, "w", encoding="utf-8") as f:
                f.write("\n".join(indexed_items))

            readme_path = os.path.join(temp_dir, "README.md")
            with open(readme_path, "w", encoding="utf-8") as f:
                f.write(f"""# FAISS Index Repository

This repository contains the FAISS index and related data used for item search.

- Last updated: {pd.Timestamp.now()}
- Indexed items: {len(indexed_items)}
- Model: {MODEL_NAME}

This repository was generated automatically to store the vector index built from the
'aikobay/initial_saleitem_dataset' item data.
""")

            for file_path, file_name in [
                (index_path, "faiss_index.bin"),
                (items_path, "indexed_items.txt"),
                (readme_path, "README.md")
            ]:
                api.upload_file(
                    path_or_fileobj=file_path,
                    path_in_repo=file_name,
                    repo_id=repo_id,
                    repo_type="dataset"
                )

        logger.info(f"FAISS index saved to the Hugging Face Hub. Repo: {repo_id}")
    except Exception as e:
        logger.error(f"Error while saving the FAISS index to the Hub: {e}")

        # Fall back to a local backup if the Hub upload fails.
        try:
            local_path = os.path.join(os.getcwd(), "faiss_index.bin")
            faiss.write_index(faiss_index, local_path)
            with open("indexed_items.txt", "w", encoding="utf-8") as f:
                f.write("\n".join(indexed_items))
            logger.info(f"FAISS index backed up locally: {local_path}")
        except Exception as local_err:
            logger.error(f"Local backup save also failed: {local_err}")

def load_faiss_index():
    """Load the FAISS index from the Hugging Face Hub so it does not need to be rebuilt."""
    from huggingface_hub import hf_hub_download, HfApi
    global faiss_index, indexed_items, active_sale_items

    repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index")

    try:
        # Check that the dataset repository exists before downloading.
        api = HfApi()
        try:
            api.repo_info(repo_id=repo_id, repo_type="dataset")
            logger.info(f"Found FAISS index repository: {repo_id}")
        except Exception as repo_err:
            logger.warning(f"Repository does not exist: {repo_err}")
            raise FileNotFoundError("Hub repository does not exist")

        index_path = hf_hub_download(
            repo_id=repo_id,
            filename="faiss_index.bin",
            repo_type="dataset"
        )

        items_path = hf_hub_download(
            repo_id=repo_id,
            filename="indexed_items.txt",
            repo_type="dataset"
        )

        faiss_index = faiss.read_index(index_path)
        with open(items_path, "r", encoding="utf-8") as f:
            indexed_items = f.read().splitlines()

        logger.info(f"FAISS index loaded from the Hub. Total items: {len(indexed_items)}")

    except Exception as e:
        logger.warning(f"Error while loading the FAISS index from the Hub: {e}")

        # Fall back to local files, and rebuild from scratch if those are missing too.
        try:
            faiss_index = faiss.read_index("faiss_index.bin")
            with open("indexed_items.txt", "r", encoding="utf-8") as f:
                indexed_items = f.read().splitlines()
            logger.info(f"Local FAISS index loaded. Total items: {len(indexed_items)}")
        except FileNotFoundError:
            logger.warning("FAISS index files not found. Rebuilding from scratch.")
            rebuild_faiss_index()
        except Exception as local_err:
            logger.error(f"Error while loading the local FAISS index: {local_err}")
            rebuild_faiss_index()

def rebuild_faiss_index():
    global faiss_index, indexed_items, active_sale_items

    logger.info("Rebuilding the FAISS index from fresh sale_item data...")

    active_sale_items = load_huggingface_jsonl("initial_saleitem_dataset")
    item_names = active_sale_items["ITEMNAME"].tolist()

    logger.info(f"Vectorizing {len(item_names)} items...")

    item_vectors = encode_texts_parallel(item_names)

    # Flat L2 index: exact nearest-neighbor search over the item embeddings.
    faiss_index = faiss.IndexFlatL2(item_vectors.shape[1])
    faiss_index.add(item_vectors)

    indexed_items = item_names
    logger.info(f"FAISS index rebuilt with {len(indexed_items)} items.")

    save_faiss_index()

@app.post("/api/recommend")
async def recommend(request: RecommendRequest):
    try:
        recommendations = search_faiss(request.search_query, request.top_k)
        return {
            "query": request.search_query,
            "recommendations": recommendations,
            "related_keywords": generate_related_keywords(request.search_query)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Recommendation error: {str(e)}")

@app.post("/api/update_index")
async def update_index():
    rebuild_faiss_index()
    return {"message": "FAISS index update complete!"}

if __name__ == "__main__":
    load_faiss_index()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
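# Minimal client sketch (illustrative only; assumes the server above is running locally
# on port 7860 and that the `requests` package is installed):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/api/recommend",
#       json={"search_query": "camping tent", "top_k": 5},
#   )
#   print(resp.json()["recommendations"])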