# coolai/tmp/algo.py
import re
import jieba
import pandas as pd
from sentence_transformers import SentenceTransformer, util
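
# Two complementary matchers over the rule sheet in lvchan.xlsx:
#   AlgoRule - exact keyword matching built from the sheet's keyword columns
#   AlgoAI   - sentence-embedding similarity against each rule item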
class AlgoRule:
    def __init__(self) -> None:
        # The sheet's real header sits in its first data row.
        df_lvchan = pd.read_excel('lvchan.xlsx', sheet_name='Sheet1')
        df_lvchan.columns = df_lvchan.iloc[0]
        df_lvchan = df_lvchan[1:]
        sep = r'[,、]'  # keywords are separated by full-width commas or 、
        # Three inverted indexes, each mapping a keyword to the rule items
        # (三级标题) it triggers.
        self.dict_rule_index = {
            'kuan': {},              # broad-scope keywords (宽口径)
            'wuxiang': {},           # object keywords (物象关键词)
            'wuxiang_xianding': {},  # object keyword + qualifier pairs
        }
        for _, row in df_lvchan.iterrows():
            item = row['三级标题']
            for word in re.split(sep, row['宽口径(复核)']):
                self.dict_rule_index['kuan'].setdefault(word, []).append(item)
            for word in re.split(sep, row['物象关键词(复核)']):
                self.dict_rule_index['wuxiang'].setdefault(word, []).append(item)
                # Pair every object keyword on the row with every qualifier.
                for word2 in re.split(sep, row['限定词(复核)']):
                    pair = '_'.join([word, word2])
                    self.dict_rule_index['wuxiang_xianding'].setdefault(pair, []).append(item)
        # Deduplicate the item lists.
        for index in self.dict_rule_index.values():
            for key in index:
                index[key] = list(set(index[key]))
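        # Illustrative index shape (keys depend on the spreadsheet contents):
        #   self.dict_rule_index['wuxiang_xianding'] == {'垃圾_无害': ['<三级标题>', ...]}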
    def _tokenize(self, text):
        # Segment with jieba so matching is token-exact rather than substring.
        return list(jieba.cut(text))

    def _is_match(self, word, query):
        return word in self._tokenize(query)
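    # e.g. _is_match('垃圾', '无害生活垃圾') holds only when jieba emits
    # '垃圾' as a standalone token (actual segmentation may vary).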
    def _match(self, query):
        result = {}

        def _join(res):
            # Flatten each keyword list into a single ' ; '-separated string.
            for key1 in res:
                for key2 in res[key1]:
                    res[key1][key2] = ' ; '.join(res[key1][key2])
            return res

        # Route 1: both the object keyword and its qualifier must appear.
        flag = False
        for key, items in self.dict_rule_index['wuxiang_xianding'].items():
            wuxiang, xianding = key.split('_')
            if self._is_match(wuxiang, query) and self._is_match(xianding, query):
                for item in items:
                    r = result.setdefault(item, {})
                    r.setdefault('限定词+物项关键词', []).append('+'.join([xianding, wuxiang]))
                flag = True
        if flag:
            return _join(result)
        # Route 2: fall back to the object keyword alone.
        for key, items in self.dict_rule_index['wuxiang'].items():
            if self._is_match(key, query):
                for item in items:
                    r = result.setdefault(item, {})
                    r.setdefault('物项关键词', []).append(key)
        # Route 3: broad-scope keywords.
        for key, items in self.dict_rule_index['kuan'].items():
            if self._is_match(key, query):
                for item in items:
                    r = result.setdefault(item, {})
                    r.setdefault('宽口径', []).append(key)
        return _join(result)
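    # Illustrative _match output after joining (values depend on the sheet):
    #   {'<三级标题>': {'物项关键词': '垃圾 ; 废渣'}}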
    def algo(self, query):
        # Public entry point: return only the matched rule items.
        return [item.strip() for item in self._match(query).keys()]
class AlgoAI:
    def __init__(self) -> None:
        # Alternative general-purpose model:
        # self.model = SentenceTransformer('DMetaSoul/sbert-chinese-general-v2')
        self.model = SentenceTransformer('TintinMeimei/menglang_yongtulv_aimatch_v1')
        # Same sheet and header handling as AlgoRule (both classes must
        # resolve the file from the same working directory; see __main__).
        df_lvchan = pd.read_excel('lvchan.xlsx', sheet_name='Sheet1')
        df_lvchan.columns = df_lvchan.iloc[0]
        df_lvchan = df_lvchan[1:]
        # One text per rule item: the title plus its explanation (解释说明).
        dict_lvchan = {
            row['三级标题'].strip(): '\n'.join([row['三级标题'].strip(), row['解释说明']])
            for _, row in df_lvchan.iterrows()
        }
        # Precompute one embedding per rule item.
        self.dict_lvchan_vectors = {
            key: self.model.encode(text, convert_to_tensor=True)
            for key, text in dict_lvchan.items()
        }
        self.thres = 0.25  # cosine-similarity cutoff
    def _sim(self, query, item_emb):
        # Cosine similarity between the query and a precomputed item embedding;
        # util.cos_sim returns a 1x1 tensor, so unwrap it to a float.
        emb = self.model.encode(query, convert_to_tensor=True)
        return util.cos_sim(emb, item_emb).item()

    def _match(self, query):
        # Keep every rule item whose similarity clears the threshold.
        return [key for key, vec in self.dict_lvchan_vectors.items()
                if self._sim(query, vec) > self.thres]

    def algo(self, query):
        return self._match(query)
if __name__ == '__main__':
    query = '无害生活垃圾'  # "harmless household waste"
    algo = AlgoRule()
    print(algo.algo(query))   # rule-based matches
    algo2 = AlgoAI()
    print(algo2.algo(query))  # embedding-based matches
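    # A minimal combination sketch (an assumption, not part of the original
    # flow): union of the two result lists, rule-based matches first.
    combined = list(dict.fromkeys(algo.algo(query) + algo2.algo(query)))
    print(combined)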