轻量级前端框架助力开发者提升项目效率与性能
1540
2022-10-01
python 实现MinHash和MinHashLSH算法
最近实现了一把MinHash和MinHashLSH算法,发现实现的细节还是挺难的,所以我把datasketch的源代码改了一下,去除了很多冗余的代码,保留了算法的实现主要细节部分。
MinHash算法:
import hashlibimport numpy as npdef sha1_hash32(data): return struct.unpack('
然后是MinhashLSH
class DictListStorage(): def __getitem__(self, key): return self.get(key) def __delitem__(self, key): return self.remove(key) def __len__(self): return self.size() def __iter__(self): for key in self.keys(): yield key def __init__(self, config,name): self._dict = defaultdict(list) def keys(self): return self._dict.keys() def get(self, key): return self._dict.get(key, []) def insert(self, key, *vals, **kwargs): self._dict[key].extend(vals) def size(self): return len(self._dict) def itemcounts(self, **kwargs): return {k: len(v) for k, v in self._dict.items()} def has_key(self, key): return key in self._dict
class DictSetStorage(): def __init__(self, config,name): self._dict = defaultdict(set) def get(self, key): return self._dict.get(key, set()) def insert(self, key, *vals, **kwargs): self._dict[key].update(vals)
def _random_name(length): return ''.join(random.choice(string.ascii_lowercase) for _ in range(length)).encode('utf8')def _false_positive_probability(threshold, b, r): _probability = lambda s : 1 - (1 - s**float(r))**float(b) a, err = integrate(_probability, 0.0, threshold) return adef _false_negative_probability(threshold, b, r): _probability = lambda s : 1 - (1 - (1 - s**float(r))**float(b)) a, err = integrate(_probability, threshold, 1.0) return adef _optimal_param(threshold, num_perm, false_positive_weight, false_negative_weight): min_error = float("inf") opt = (0, 0) for b in range(1, num_perm+1): max_r = int(num_perm / b) for r in range(1, max_r+1): fp = _false_positive_probability(threshold, b, r) fn = _false_negative_probability(threshold, b, r) error = fp*false_positive_weight + fn*false_negative_weight if error < min_error: min_error = error opt = (b, r,fp,fn) return opt
class MinHashLSH(object): def __init__(self, threshold=0.9, d=128, weights=(0.5, 0.5), params=None, storage_config=None): if storage_config is None: storage_config = {'type': 'dict'} if sum(weights) != 1.0: raise ValueError("Weights must sum to 1.0") self.h = d if params is not None: self.b, self.r = params if self.b * self.r > d: raise ValueError("The product of b and r in params is " "{} * {} = {} -- it must be less than d {}. ".format(self.b, self.r, self.b*self.r, d)) else: false_positive_weight, false_negative_weight = weights self.b, self.r ,self.fp,self.fn= _optimal_param(threshold, d,false_positive_weight, false_negative_weight) print('the best parameter b={},r={},fp={},fn={}'.format(self.b,self.r,self.fp,self.fn)) basename = storage_config.get('basename', _random_name(11)) self.hashtables=[] self.hashranges=[] for i in range(self.b): name=b''.join([basename, b'_bucket_', struct.pack('>H', i)]) item=DictSetStorage(storage_config, name=name) self.hashtables.append(item) self.hashranges.append((i*self.r, (i+1)*self.r)) self.keys = DictListStorage(storage_config, name=b''.join([basename, b'_keys'])) def insert(self, key, minhash): self._insert(key, minhash, buffer=False) def _insert(self, key, minhash, buffer=False): if key in self.keys: raise ValueError("key already exists") Hs=[] for start, end in self.hashranges: Hs.append(self._H(minhash.hashvalues[start:end])) self.keys.insert(key, *Hs, buffer=buffer) for H, hashtable in zip(Hs, self.hashtables): hashtable.insert(H, key, buffer=buffer) def query(self, minhash): candidates = set() for (start, end), hashtable in zip(self.hashranges, self.hashtables): H = self._H(minhash.hashvalues[start:end]) for key in hashtable.get(H): candidates.add(key) return list(candidates) def _H(self,hs): return bytes(hs.byteswap().data)
这是实现的全过程了,哪天我能够把这些东西自己手动实现出来,我应该就很牛了,哈哈,现在还在学习模仿阶段。
参考文献
[1]. datasketch. https://github.com/ekzhu/datasketch
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~