diff --git a/recsplain/__init__.py b/recsplain/__init__.py
index eb59318..1b0673d 100644
--- a/recsplain/__init__.py
+++ b/recsplain/__init__.py
@@ -1,4 +1,5 @@
-__version__="0.0.83"
+__version__="0.0.104"
+from .similarity_helpers import SciKitNearestNeighbors, RedisIndex
 from .strategies import BaseStrategy, AvgUserStrategy, RedisStrategy
 from .encoders import PartitionSchema
 from .endpoint import run_server
diff --git a/recsplain/encoders.py b/recsplain/encoders.py
index b4d37e7..941b3b5 100644
--- a/recsplain/encoders.py
+++ b/recsplain/encoders.py
@@ -522,4 +522,3 @@ def json_encode(self, value):
     def encode(self, value):
         val = self.get_feature(value)
         return self.json_encode(val)
-
diff --git a/recsplain/similarity_helpers.py b/recsplain/similarity_helpers.py
index 3120c06..d9ae185 100644
--- a/recsplain/similarity_helpers.py
+++ b/recsplain/similarity_helpers.py
@@ -1,4 +1,5 @@
 import sys
+from typing import Dict
 import numpy as np
 import collections
 from sklearn.neighbors import NearestNeighbors
@@ -9,11 +10,14 @@
 except ModuleNotFoundError:
     print ("hnswlib not found")
     HNSWMock = collections.namedtuple("HNSWMock", ("Index", "max_elements"))
-    hnswlib = HNSWMock(None,0)
+    class MockHnsw:
+        def __init__(self, *args, **kwargs) -> None:
+            pass
+    hnswlib = HNSWMock(MockHnsw,0)  # the class, not an instance, so LazyHnsw can subclass hnswlib.Index
 try:
     import faiss
     available_engines.add("faiss")
-except ModuleNotFoundError:
+except Exception:
     print ("faiss not found")
     faiss = None
 try:
@@ -45,7 +49,7 @@ def parse_server_name(sname):
 
 
 class FaissIndexFactory:
-    def __init__(self, space, dim, index_factory, **kwargs):
+    def __init__(self, space:str, dim:int, index_factory:str, **kwargs):
         if index_factory == '':
             index_factory = 'Flat'
         if space in ['ip', 'cosine']:
@@ -90,7 +94,7 @@ def load_index(self, fname):
         self.index = faiss.read_index(fname)
 
 class LazyHnsw(hnswlib.Index):
-    def __init__(self, space, dim, index_factory=None,max_elements=1024, ef_construction=200, M=16):
+    def __init__(self, space:str, dim:int, max_elements=1024, ef_construction=200, M=16, **kwargs):
         super().__init__(space, dim)
         self.init_max_elements = max_elements
         self.init_ef_construction = ef_construction
@@ -151,7 +155,7 @@ def get_current_count(self):
 
 
 class SciKitNearestNeighbors:
-    def __init__(self, space, dim, index_factory=None, **kwargs):
+    def __init__(self, space:str, dim:int, **kwargs):
         if space=="ip":
             self.space = "cosine"
             sys.stderr.write("Warning: ip is not supported by sklearn, falling back to cosine")
@@ -161,7 +165,7 @@ def __init__(self, space, dim, index_factory=None, **kwargs):
         self.items = []
         self.ids = []
         self.fitted = False
-        self.index = NearestNeighbors(metric=self.space,n_jobs=-1,n_neighbors=10, **kwargs)
+        self.index = NearestNeighbors(metric=self.space,n_jobs=-1,n_neighbors=10)
 
     def __len__(self):
         return len(self.items)
@@ -175,6 +179,8 @@ def init(self, **kwargs):
 
     def add_items(self, data, ids=None, num_threads=-1):
         self.items.extend(data)
+        if ids is None:
+            ids = list(range(len(self.items)-len(data), len(self.items)))  # ids for the newly added block
         self.ids.extend(ids)
         self.fitted = False
 
@@ -186,7 +192,8 @@
             self.index.fit(self.items)
             self.fitted = True
         scores, idx = self.index.kneighbors(data ,k, return_distance=True)
-        return (scores, idx)
+        names = [[self.ids[i] for i in ids] for ids in idx]
+        return scores, names
 
     def get_max_elements(self):
         return -1
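Reviewer note: SciKitNearestNeighbors.search now maps neighbor positions back to the ids given to add_items, so callers receive labels rather than raw row indices. A minimal sketch of the new contract (illustrative only; it relies on the SciKitNearestNeighbors export added to __init__.py above):

    import numpy as np
    from recsplain import SciKitNearestNeighbors

    index = SciKitNearestNeighbors(space="l2", dim=4)
    index.add_items(np.eye(4, dtype=np.float32), ids=["a", "b", "c", "d"])
    scores, names = index.search(np.eye(4, dtype=np.float32)[:1], k=2)
    print(scores[0], names[0])  # the nearest neighbor of e1 is "a" itself, at distance 0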
@@ -196,17 +203,31 @@ def get_current_count(self):
 
 
 class RedisIndex:
-    def __init__(self, space, dim, index_factory=None,redis_credentials=None,max_elements=1024, ef_construction=200, M=16):
+    def __init__(self, space:str, dim:int, redis_credentials=None,max_elements=1024, ef_construction=200, M=16, overwrite=True,**kwargs):
         self.space = space
         self.dim = dim
         self.max_elements = max_elements
         self.ef_construction = ef_construction
         self.M = M
+        if kwargs.get("index_name") is None:
+            self.index_name = "idx"
+        else:
+            self.index_name = kwargs.get("index_name")
         if redis_credentials is None:
             raise Exception("Redis credentials must be provided")
         self.redis = Redis(**redis_credentials)
         self.pipe = None
+        if overwrite:
+            try:
+                self.redis.ft(self.index_name).info()
+                index_exists = True
+            except Exception:
+                index_exists = False
+            if index_exists:
+                self.redis.ft(self.index_name).dropindex(delete_documents=True)
         self.init_hnsw()
+        # applicable only for user events
+        self.user_keys=[]
 
     def __enter__(self):
         self.pipe = self.redis.pipeline()
@@ -221,24 +242,37 @@ def __len__(self):
 
     def __itemgetter__(self, item):
         return super().get_items([item])[0]
 
+    def list_user_keys(self):
+        """Get all user keys (distinct from the user_keys attribute, which holds event field names)"""
+        return [s.decode()[5:] for s in self.redis.keys("user:*")]
+
+    def item_keys(self):
+        """Get all item keys"""
+        return [s.decode()[5:] for s in self.redis.keys("item:*")]
+
+    def vector_keys(self):
+        """Get all vector keys"""
+        return [s.decode()[4:] for s in self.redis.keys("vec:*")]
+
     def search(self, data, k=1,partition=None):
+        """Search the nearest neighbors of the given vector, optionally within a given partition."""
         query_vector = np.array(data).astype(np.float32).tobytes()
-        #prepare the query
         p = "(@partition:{"+partition+"})" if partition is not None else "*"
-        q = Query(f'{p}=>[KNN {k} embedding $vec_param AS vector_score]').sort_by('vector_score').paging(0,k).return_fields('vector_score','item_id').dialect(2)
+        q = Query(f'{p}=>[KNN {k} @embedding $vec_param AS vector_score]').sort_by('vector_score').paging(0,k).return_fields('vector_score','item_id').dialect(2)
         params_dict = {"vec_param": query_vector}
-        results = self.redis.ft().search(q, query_params = params_dict)
+        results = self.redis.ft(self.index_name).search(q, query_params = params_dict)
         scores, ids = [], []
         for item in results.docs:
             scores.append(item.vector_score)
             ids.append(item.item_id)
-        return (scores, ids)
+        return scores, ids
 
     def add_items(self, data, ids=None, partition=None):
+        """Add items and ids to the index; if no partition is given, it defaults to NONE"""
         self.pipe = self.redis.pipeline(transaction=False)
         if partition is None:
-            partition=""
+            partition="NONE"
         for datum, id in zip(data, ids):
             key='item:'+ str(id)
             emb = np.array(datum).astype(np.float32).tobytes()
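With the named index and the @embedding attribute fix, the Query built above for, say, search(vec, k=10, partition="a") renders as a standard dialect-2 RediSearch KNN query:

    (@partition:{a})=>[KNN 10 @embedding $vec_param AS vector_score]

and as *=>[KNN 10 @embedding $vec_param AS vector_score] when no partition is given.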
@@ -248,20 +282,91 @@
         self.pipe = None
 
     def get_items(self, ids=None):
+        """Get items by id"""
         ret = []
         for id in ids:
-            ret.append(self.redis.hget("item:"+str(id), "embedding"))
-        return ret
+            ret.append(np.frombuffer(self.redis.hget("item:"+str(id), "embedding"), dtype=np.float32))
+        return np.vstack(ret)
+
+    def add_user_event(self, user_id: str, data: Dict[str, str],ttl: int = 60*60*24):
+        """
+        Adds a user event to the index.
+        The event is appended to a Redis list under the key user:{user_id};
+        its fields are defined by the `user_keys` attribute.
+        """
+        if not any(self.user_keys):
+            raise Exception("User keys must be set before adding user events")
+        vals = []
+        for key in self.user_keys:
+            v = data.get(key,"")
+            # ad hoc int trimming: store whole floats as ints
+            try:
+                if v==int(v):
+                    v=int(v)
+            except (TypeError, ValueError):
+                pass
+            vals.append(v)
+        val = '|'.join(map(str, vals))
+        if self.pipe:
+            self.pipe.rpush("user:"+str(user_id), val)
+            if ttl:
+                self.pipe.expire("user:"+str(user_id), ttl)
+        else:
+            self.redis.rpush("user:"+str(user_id), val)
+            if ttl:
+                self.redis.expire("user:"+str(user_id), ttl)
+        return self
+
+    def del_user(self, user_id):
+        """Delete a user key from Redis"""
+        if self.pipe:
+            self.pipe.delete("user:"+str(user_id))
+        else:
+            self.redis.delete("user:"+str(user_id))
+
+    def get_user_events(self, user_id: str):
+        """Gets a list of user events by key"""
+        if not any(self.user_keys):
+            raise Exception("User keys must be set before getting user events")
+        ret = self.redis.lrange("user:"+str(user_id), 0, -1)
+        return [dict(zip(self.user_keys,x.decode().split('|'))) for x in ret]
+
+    def set_vector(self, key, arr, prefix="vec:"):
+        """Sets a numpy array as a vector in redis"""
+        emb = np.array(arr).astype(np.float32).tobytes()
+        self.redis.set(prefix+str(key), emb)
+        return self
+
+    def get_vector(self, key, prefix="vec:"):
+        """Gets a numpy array from redis"""
+        return np.frombuffer(self.redis.get(prefix+str(key)), dtype=np.float32)
+
     def init_hnsw(self, **kwargs):
-        self.redis.ft().create_index([
+        self.redis.ft(self.index_name).create_index([
             VectorField("embedding", "HNSW", {"TYPE": "FLOAT32", "DIM": self.dim, "DISTANCE_METRIC": self.space, "INITIAL_CAP": self.max_elements, "M": self.M, "EF_CONSTRUCTION": self.ef_construction}),
             TextField("item_id"),
             TagField("partition")
         ])
 
     def get_current_count(self):
-        raise NotImplementedError("RedisIndex is not implemented yet")
+        """Get number of items in index"""
+        return int(self.redis.ft(self.index_name).info()["num_docs"])
 
     def get_max_elements(self):
-        return self.max_elements
\ No newline at end of file
+        """Get max elements in index"""
+        return self.max_elements
+
+    def info(self):
+        """Get Redis info as dict"""
+        return self.redis.ft(self.index_name).info()
+
+if __name__=="__main__":
+    # docker run -p 6379:6379 redislabs/redisearch:2.4.5
+    sim = RedisIndex(space='cosine',dim=32,redis_credentials={"host":"127.0.0.1", "port": 6379}, overwrite=True)
+    data=np.random.random((100,32))
+    aids=["a"+str(1+i) for i in range(100)]
+    bids=["b"+str(101+i) for i in range(100)]
+    sim.add_items(data,aids,partition="a")
+    sim.add_items(data,bids,partition="b")
+    # print(sim.search(data[0],k=10,partition=None))
+    # print(sim.get_items(aids[:10]))
+    print (sim.item_keys())
\ No newline at end of file
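The new user-event helpers are not exercised by the __main__ block above. A minimal round trip under the same assumptions (a local RediSearch container, with user_keys set by the caller first):

    sim = RedisIndex(space='cosine', dim=32, redis_credentials={"host": "127.0.0.1", "port": 6379})
    sim.user_keys = ["event", "item"]  # field names of each event record
    sim.add_user_event("u1", {"event": "view", "item": "a7"}, ttl=3600)
    sim.add_user_event("u1", {"event": "buy", "item": "a7"}, ttl=3600)
    print(sim.get_user_events("u1"))
    # [{'event': 'view', 'item': 'a7'}, {'event': 'buy', 'item': 'a7'}]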
diff --git a/recsplain/strategies.py b/recsplain/strategies.py
index 7b5478a..89d5672 100644
--- a/recsplain/strategies.py
+++ b/recsplain/strategies.py
@@ -38,7 +38,7 @@ def __init__(self, model_dir=None, similarity_engine=None ,engine_params={}):
     def init_schema(self, **kwargs):
         self.schema = PartitionSchema(**kwargs)
         self.partitions[self.schema.base_strategy_id()] = [self.IndexEngine(self.schema.metric, self.schema.dim,
-                                                                            self.schema.index_factory,
+                                                                            index_factory=self.schema.index_factory,
                                                                             **self.engine_params)
                                                            for _ in self.schema.partitions]
         enc_sizes = {k: len(v) for k, v in self.schema.encoders[self.schema.base_strategy_id()].items()}
@@ -47,7 +47,7 @@ def add_variant(self, variant):
         variant = self.schema.add_variant(variant)
         self.partitions[variant['id']] = [self.IndexEngine(self.schema.metric, self.schema.dim,
-                                                           self.schema.index_factory, **self.engine_params)
+                                                           index_factory=self.schema.index_factory, **self.engine_params)
                                           for _ in self.schema.partitions]
         # enc_sizes = {k: len(v) for k, v in self.schema.encoders[self.schema.base_strategy_id()].items()}
         return variant#, enc_sizes
@@ -131,8 +131,11 @@ def query_by_partition_and_vector(self, partition_num, strategy_id, vec, k, expl
         try:
             vec = vec.reshape(1, -1).astype('float32')  # for faiss
             distances, num_ids = self.partitions[strategy_id][partition_num].search(vec, k=k)
-            indices = np.where(num_ids != -1)
-            distances, num_ids = distances[indices], num_ids[indices]
+            if hasattr(distances[0],"__iter__"):
+                distances=distances[0]
+                num_ids=num_ids[0]
+            distances = [d for d,i in zip(distances,num_ids) if i!=-1]  # drop faiss's -1 padding; ids may be strings for other engines
+            num_ids = [i for i in num_ids if i!=-1]
         except Exception as e:
             raise Exception("Error in querying: " + str(e))
         if len(num_ids) == 0:
@@ -228,7 +231,7 @@ def load_model(self, model_name):
         with (model_dir/"schema.json").open('r') as f:
             schema_dict=json.load(f)
         self.schema = PartitionSchema(**schema_dict)
-        self.partitions = {strategy['id']: [self.IndexEngine(self.schema.metric, self.schema.dim, self.schema.index_factory,
+        self.partitions = {strategy['id']: [self.IndexEngine(self.schema.metric, self.schema.dim, index_factory=self.schema.index_factory,
                                                              **self.engine_params) for _ in self.schema.partitions]
                            for strategy in self.schema.strategies}
         model_dir.mkdir(parents=True, exist_ok=True)
         with (model_dir/"index_labels.json").open('r') as f:
@@ -344,8 +347,8 @@
 
 class RedisStrategy(BaseStrategy):
-    def __init__(self, model_dir=None, similarity_engine=None, engine_params={}, redis_credentials=None, user_prefix="user:", value_sep="|", user_keys=[],event_key="event",item_key="item",event_weights={}):
-        super().__init__(model_dir, similarity_engine, engine_params)
+    def __init__(self, model_dir=None, similarity_engine=None, engine_params={}, redis_credentials=None, user_prefix="user:",vector_prefix="vec:", value_sep="|", user_keys=[],event_key="event",item_key="item",event_weights={}):
+        super().__init__(model_dir, similarity_engine, dict(engine_params,redis_credentials=redis_credentials))
         assert Redis is not None, "RedisStrategy requires redis-py to be installed"
         assert redis_credentials is not None, "RedisStrategy requires redis credentials"
         assert len(user_keys)>0, "user_keys not supplied"
@@ -353,7 +356,8 @@
         assert item_key in user_keys, "item_key not in user_keys"
         self.redis = Redis(**redis_credentials)
         self.sep = value_sep
-        self. user_prefix = user_prefix
+        self.user_prefix = user_prefix
+        self.vector_prefix = vector_prefix
         self.event_key=event_key
         self.user_keys=user_keys
         self.item_key=item_key
@@ -366,6 +370,14 @@ def __enter__(self):
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.pipe.execute()
         self.pipe = None
+    def set_vector(self, key, arr):
+        """Sets a numpy array as a vector in redis"""
+        emb = np.array(arr).astype(np.float32).tobytes()
+        self.redis.set(self.vector_prefix+str(key), emb)
+        return self
+    def get_vector(self, key):
+        """Gets a numpy array from redis"""
+        return np.frombuffer(self.redis.get(self.vector_prefix+str(key)), dtype=np.float32)
     def del_user(self, user_id):
         if self.pipe:
             self.pipe.delete(self.user_prefix+str(user_id))
@@ -408,16 +420,15 @@ def user_partition_num(self, user_data):
     def user_query(self, user_data, k, strategy_id=None, user_coldstart_item=None, user_coldstart_weight=1,user_id=None):
         if not strategy_id:
             strategy_id = self.schema.base_strategy_id()
-        if user_coldstart_item is None:
-            n = 0
-            vec = np.zeros(self.schema.dim)
-        else:
+        vec = np.zeros(self.schema.dim)
+        n = 0
+        if user_coldstart_item is not None:
             n = user_coldstart_weight
-            if hasattr(user_coldstart_item, "__call__"):
-                item = user_coldstart_item(user_data)
+            if type(user_coldstart_item) == str:
+                vec = self.get_vector(user_coldstart_item)
             elif type(user_coldstart_item) == dict:
                 item = user_coldstart_item
-            vec = self.schema.encode(item, strategy_id)
+                vec = self.schema.encode(item, strategy_id)
         user_partition_num = self.user_partition_num(user_data)
         col_mapping = self.schema.component_breakdown()
         labels, distances = [], []
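user_coldstart_item now accepts either a stored vector key (resolved via get_vector) or a raw item dict (encoded through the schema); the old callable form is gone. A hypothetical call under this contract, where strategy stands for a configured RedisStrategy and the field names follow the test schema below:

    import numpy as np

    strategy.set_vector("fallback", np.zeros(strategy.schema.dim))
    # cold start from a vector previously stored under the key "fallback" ...
    res = strategy.user_query({"country": "US"}, k=3, user_coldstart_item="fallback")
    # ... or from a raw item dict, encoded on the fly
    res = strategy.user_query({"country": "US"}, k=3, user_coldstart_item={"price": "low", "category": "meat"})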
diff --git a/setup.py b/setup.py
index 4026f0e..1d960b5 100644
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@
         "joblib>=0.17.0",
         "tqdm>=4.62.3",
         "pandas>=1.3.0",
-        "scikit-learning>=0.19.0",
+        "scikit-learn>=0.19.0",
     ],
     long_description="https://github.com/argmaxml/recsplain/blob/master/README.md",
     long_description_content_type="text/markdown",
diff --git a/test/sklearn_engine_test.py b/test/sklearn_engine_test.py
new file mode 100644
index 0000000..d6d0475
--- /dev/null
+++ b/test/sklearn_engine_test.py
@@ -0,0 +1,81 @@
+import unittest, sys
+import numpy as np
+from pathlib import Path
+
+sys.path.append(str(Path(__file__).absolute().parent.parent))
+from recsplain import AvgUserStrategy
+
+
+class SKLearnEngineTest(unittest.TestCase):
+
+    def setUp(self):
+        self.sklearn_engine = AvgUserStrategy(similarity_engine='sklearn')
+        self.faiss_engine = AvgUserStrategy(similarity_engine='faiss')
+
+        self.schema = {
+            "filters": [{
+                "field": "country",
+                "values": ["US", "EU"]
+            }],
+            "encoders": [{
+                "field": "price",
+                "values": ["low", "mid", "high"],
+                "type": "onehot",
+                "weight": 1
+            },
+            {
+                "field": "category",
+                "values": ["dairy", "meat"],
+                "type": "onehot",
+                "weight": 2
+            }],
+            "metric": "l2"
+        }
+
+        data = [{
+            "id": "1",
+            "price": "low",
+            "category": "meat",
+            "country": "US"
+        },
+        {
+            "id": "2",
+            "price": "mid",
+            "category": "meat",
+            "country": "US"
+        },
+        {
+            "id": "3",
+            "price": "low",
+            "category": "dairy",
+            "country": "US"
+        },
+        {
+            "id": "4",
+            "price": "high",
+            "category": "meat",
+            "country": "EU"
+        }]
+
+        self.sklearn_engine.init_schema(**self.schema)
+        self.faiss_engine.init_schema(**self.schema)
+
+        self.sklearn_engine.index(data)
+        self.faiss_engine.index(data)
+
+    def test_item_query_equality(self):
+        data = {
+            "id": "2",
+            "price": "mid",
+            "category": "meat",
+            "country": "US"
+        }
+
+        labels_1, distances_1, _ = self.sklearn_engine.query(data, 3)
+        labels_2, distances_2, _ = self.faiss_engine.query(data, 3)
+        self.assertEqual(labels_1, labels_2)
+        np.testing.assert_allclose(distances_1, np.sqrt(distances_2), rtol=1e-5)  # faiss returns squared L2
+
+
+if __name__ == '__main__':
+    unittest.main()
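The np.sqrt in test_item_query_equality is deliberate: faiss's IndexFlatL2 reports squared L2 distances, while sklearn's NearestNeighbors reports true Euclidean ones. A standalone check of that relationship, independent of recsplain (assumes faiss and scikit-learn are installed):

    import numpy as np
    import faiss
    from sklearn.neighbors import NearestNeighbors

    xs = np.random.random((50, 8)).astype(np.float32)
    q = xs[:1]

    fi = faiss.IndexFlatL2(8)
    fi.add(xs)
    d_faiss, _ = fi.search(q, 5)  # squared L2 distances

    d_sk, _ = NearestNeighbors(metric="l2").fit(xs).kneighbors(q, 5)  # Euclidean distances

    assert np.allclose(np.sqrt(d_faiss), d_sk, atol=1e-4)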