Spaces:
Sleeping
Sleeping
| import re | |
| import jieba | |
| import pandas as pd | |
| from sentence_transformers import SentenceTransformer, util | |
class AlgoRule:
    """Rule-based category matcher.

    Loads a keyword rule table from an Excel sheet and maps a query string
    to third-level category titles (三级标题) via exact jieba-token matches
    against three keyword indexes:

    - ``kuan``              broad-scope keywords (宽口径)
    - ``wuxiang``           object keywords (物象关键词)
    - ``wuxiang_xianding``  object keyword + qualifier pairs (物象_限定)
    """

    def __init__(self, path: str = 'lvchan.xlsx') -> None:
        """Build the three keyword indexes from the rule spreadsheet.

        NOTE(review): AlgoAI loads '../lvchan.xlsx' while this class used
        'lvchan.xlsx' — confirm which relative path matches the deployment
        layout. The default preserves the original behavior.
        """
        df_lvchan = pd.read_excel(path, sheet_name='Sheet1')
        # The first data row carries the real column names.
        df_lvchan.columns = df_lvchan.iloc[0]
        df_lvchan = df_lvchan[1:]
        sep = r'[,、]'  # keywords are separated by ASCII or Chinese commas
        # index kind -> keyword -> list of category titles
        self.dict_rule_index = {
            'kuan': {},
            'wuxiang': {},
            'wuxiang_xianding': {},
        }
        for _, row in df_lvchan.iterrows():
            item = row['三级标题']
            for word in re.split(sep, row['宽口径(复核)']):
                self.dict_rule_index['kuan'].setdefault(word, []).append(item)
            for word in re.split(sep, row['物象关键词(复核)']):
                self.dict_rule_index['wuxiang'].setdefault(word, []).append(item)
                # Pair every object keyword of the row with every qualifier.
                # NOTE(review): source formatting was lost; the qualifier loop
                # references `word`, so it is reconstructed as nested here —
                # confirm against the original indentation.
                for word2 in re.split(sep, row['限定词(复核)']):
                    pair_key = '_'.join([word, word2])
                    self.dict_rule_index['wuxiang_xianding'].setdefault(
                        pair_key, []).append(item)
        # Deduplicate the category lists of every index.
        for index in self.dict_rule_index.values():
            for key, items in index.items():
                index[key] = list(set(items))

    def _tokenize(self, text):
        """Segment *text* into a list of jieba tokens."""
        return list(jieba.cut(text))

    def _is_match(self, word, query):
        """Return True iff *word* equals one whole jieba token of *query*."""
        return word in self._tokenize(query)

    @staticmethod
    def _clean(result):
        """Join each evidence list into a ' ; '-separated string, in place."""
        for evidence in result.values():
            for label, words in evidence.items():
                evidence[label] = ' ; '.join(words)
        return result

    def _match(self, query):
        """Match *query* against the rule indexes.

        Routes are tried in priority order: (1) object keyword + qualifier
        pairs — if any pair matches, return immediately; otherwise (2) object
        keywords and (3) broad-scope keywords, combined.

        Returns a dict: category title -> {evidence label -> ' ; '-joined
        matched keywords}.
        """
        result = {}
        # Tokenize once; the original re-tokenized the query per rule key.
        tokens = set(self._tokenize(query))

        # Route 1: both the object keyword and its qualifier must match.
        pair_hit = False
        for key, items in self.dict_rule_index['wuxiang_xianding'].items():
            wuxiang, xianding = key.split('_')
            if wuxiang in tokens and xianding in tokens:
                for item in items:
                    result.setdefault(item, {}).setdefault(
                        '限定词+物项关键词', []).append('+'.join([xianding, wuxiang]))
                pair_hit = True
        if pair_hit:
            return self._clean(result)

        # Route 2: object keyword alone.
        for key, items in self.dict_rule_index['wuxiang'].items():
            if key in tokens:
                for item in items:
                    result.setdefault(item, {}).setdefault('物项关键词', []).append(key)

        # Route 3: broad-scope keyword.
        for key, items in self.dict_rule_index['kuan'].items():
            if key in tokens:
                for item in items:
                    result.setdefault(item, {}).setdefault('宽口径', []).append(key)

        return self._clean(result)

    def algo(self, query):
        """Return the list of matched category titles for *query*."""
        return [item.strip() for item in self._match(query).keys()]
class AlgoAI:
    """Embedding-based category matcher.

    Encodes every category (title + explanation) once with a
    SentenceTransformer model, then returns all categories whose cosine
    similarity with the query exceeds a threshold.
    """

    def __init__(self, path: str = '../lvchan.xlsx', thres: float = 0.25) -> None:
        """Load the category table and pre-compute one embedding per category.

        NOTE(review): AlgoRule loads 'lvchan.xlsx' (no '../') — confirm which
        relative path is correct. Defaults preserve the original behavior.
        """
        # Alternative general-purpose model considered previously:
        # 'DMetaSoul/sbert-chinese-general-v2'
        self.model = SentenceTransformer('TintinMeimei/menglang_yongtulv_aimatch_v1')
        df_lvchan = pd.read_excel(path, sheet_name='Sheet1')
        # The first data row carries the real column names.
        df_lvchan.columns = df_lvchan.iloc[0]
        df_lvchan = df_lvchan[1:]
        # category title -> "title\nexplanation" text fed to the encoder
        dict_lvchan = {
            row['三级标题'].strip(): '\n'.join([row['三级标题'].strip(), row['解释说明']])
            for _, row in df_lvchan.iterrows()
        }
        # category title -> precomputed embedding tensor
        self.dict_lvchan_vectors = {
            key: self.model.encode(text, convert_to_tensor=True)
            for key, text in dict_lvchan.items()
        }
        # similarity threshold above which a category counts as a match
        self.thres = thres

    def _sim(self, query, item):
        """Cosine similarity between *query* text and precomputed embedding *item*."""
        emb_query = self.model.encode(query, convert_to_tensor=True)
        return util.cos_sim(emb_query, item)

    def _match(self, query):
        """Return every category whose similarity to *query* exceeds the threshold."""
        return [
            key
            for key, vec in self.dict_lvchan_vectors.items()
            if self._sim(query, vec) > self.thres
        ]

    def algo(self, query):
        """Public entry point: list of category titles matched by embedding similarity."""
        return self._match(query)
if __name__ == '__main__':
    # Smoke-test both matchers on one sample query.
    rule_engine = AlgoRule()
    sample_query = '无害生活垃圾'
    print(rule_engine.algo(sample_query))
    ai_engine = AlgoAI()
    print(ai_engine.algo(sample_query))