Skip to content

Commit

Permalink
Fix train/predict bugs in PairwiseANN (#271)
Browse files Browse the repository at this point in the history
Co-authored-by: Wei-Cheng Chang <[email protected]>
  • Loading branch information
OctoberChang and Wei-Cheng Chang authored Dec 7, 2023
1 parent c884e71 commit e4c824d
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 156 deletions.
75 changes: 37 additions & 38 deletions pecos/ann/pairwise/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,24 @@ class PredParams(pecos.BaseParams):
"""Prediction Parameters of PairwiseANN class
Attributes:
topk (int): maximum number of candidates (sorted by distances, nearest first) return by the searcher per query
batch_size (int): maximum number of (input, label) pairs te be inference on for the Searchers
only_topk (int): maximum number of candidates (sorted by distances, nearest first) return by kNN
"""

topk: int = 10
batch_size: int = 1024
only_topk: int = 10

class Searchers(object):
def __init__(self, model, max_batch_size=256, max_only_topk=10, num_searcher=1):
def __init__(self, model, pred_params, num_searcher=1):
self.searchers_ptr = model.fn_dict["searchers_create"](
model.model_ptr,
num_searcher,
)
self.destruct_fn = model.fn_dict["searchers_destruct"]

# searchers also hold the memory of returned np.ndarray
self.max_batch_size = max_batch_size
self.max_only_topk = max_only_topk
max_nnz = max_batch_size * max_only_topk
self.pred_params = pred_params
max_nnz = pred_params.batch_size * pred_params.only_topk
self.Imat = np.zeros(max_nnz, dtype=np.uint32)
self.Mmat = np.zeros(max_nnz, dtype=np.uint32)
self.Dmat = np.zeros(max_nnz, dtype=np.float32)
Expand Down Expand Up @@ -214,11 +215,18 @@ def save(self, model_folder):
c_model_dir = f"{model_folder}/c_model"
self.fn_dict["save"](self.model_ptr, c_char_p(c_model_dir.encode("utf-8")))

def searchers_create(self, max_batch_size=256, max_only_topk=10, num_searcher=1):
def get_pred_params(self):
"""Return a deep copy of prediction parameters
Returns:
copied_pred_params (dict): Prediction parameters.
"""
return copy.deepcopy(self.pred_params)

def searchers_create(self, pred_params=None, num_searcher=1):
"""create searchers that pre-allocate intermediate variables (e.g., topk_queue)
Args:
max_batch_size (int): the maximum batch size for the input/label pairs to be inference
max_only_topk (int): the maximum only topk for the kNN to return
pred_params (Pairwise.PredParams, optional): instance of pecos.ann.pairwise.Pairwise.PredParams
num_searcher: number of searcher for multi-thread inference
Returns:
PairwiseANN.Searchers: the pre-allocated PairwiseANN.Searchers (class object)
Expand All @@ -227,31 +235,25 @@ def searchers_create(self, max_batch_size=256, max_only_topk=10, num_searcher=1)
raise ValueError("self.model_ptr must exist before using searchers_create()")
if num_searcher <= 0:
raise ValueError("num_searcher={} <= 0 is NOT valid".format(num_searcher))
return PairwiseANN.Searchers(self, max_batch_size, max_only_topk, num_searcher)

def get_pred_params(self):
"""Return a deep copy of prediction parameters
Returns:
copied_pred_params (dict): Prediction parameters.
"""
return copy.deepcopy(self.pred_params)
pred_params = self.get_pred_params() if pred_params is None else pred_params
return PairwiseANN.Searchers(self, pred_params, num_searcher)

def predict(self, input_feat, label_keys, searchers, pred_params=None, is_same_input=False):
def predict(self, input_feat, label_keys, searchers, is_same_input=False):
"""predict with multi-thread. The searchers are required to be provided.
Args:
input_feat (numpy.array or smat.csr_matrix): input feature matrix (first key) to find kNN.
if is_same_input == False, the shape should be (batch_size, feat_dim)
if is_same_input == True, the shape should be (1, feat_dim)
label_keys (numpy.array): the label keys (second key) to find kNN. The shape should be (batch_size, ).
searchers (c_void_p): pointer to C/C++ vector<pecos::ann::PairwiseANN:Searcher>. Created by PairwiseANN.searchers_create().
pred_params (Pairwise.PredParams, optional): instance of pecos.ann.pairwise.Pairwise.PredParams.
if is_same_input == False, the shape should be (batch_size, feat_dim).
if is_same_input == True, the shape should be (1, feat_dim).
label_keys (numpy.array): the label keys (second key) to find kNN.
The shape should be (batch_size, ).
searchers (c_void_p): pointer to C/C++ vector<pecos::ann::PairwiseANN:Searcher>.
Created by PairwiseANN.searchers_create().
is_same_input (bool): whether to use the same first row of X to do prediction.
For real-time inference with same input query, set is_same_input = True.
For batch prediction with varying input querues, set is_same_input = False.
Returns:
Imat (np.array): returned kNN input key indices. Shape of (batch_size, topk)
Mmat (np.array): returned kNN masking array. 1/0 mean value is or is not presented. Shape of (batch_size, topk)
Mmat (np.array): returned kNN masking array. 1/0 mean value IS/ISNOT presented. Shape of (batch_size, topk)
Dmat (np.array): returned kNN distance array. Shape of (batch_size, topk)
Vmat (np.array): returned kNN value array. Shape of (batch_size, topk)
"""
Expand All @@ -273,19 +275,16 @@ def predict(self, input_feat, label_keys, searchers, pred_params=None, is_same_i
if not is_same_input and input_feat_py.rows != label_keys.shape[0]:
raise ValueError(f"input_feat_py.rows != label_keys.shape[0]")

batch_size = label_keys.shape[0]
pred_params = self.get_pred_params() if pred_params is None else pred_params
only_topk = pred_params.topk
cur_nnz = batch_size * only_topk
if batch_size > searchers.max_batch_size:
raise ValueError(f"cur_batch_size > searchers.max_batch_size")
if only_topk > searchers.max_only_topk:
raise ValueError(f"cur_only_topk > searchers.max_only_topk")
cur_bsz = label_keys.shape[0]
if cur_bsz > searchers.pred_params.batch_size:
raise ValueError(f"cur_batch_size > searchers.batch_size!")
only_topk = searchers.pred_params.only_topk
cur_nnz = cur_bsz * only_topk

searchers.reset(cur_nnz)
self.fn_dict["predict"](
searchers.ctypes(),
batch_size,
cur_bsz,
only_topk,
input_feat_py,
label_keys.ctypes.data_as(POINTER(c_uint32)),
Expand All @@ -295,8 +294,8 @@ def predict(self, input_feat, label_keys, searchers, pred_params=None, is_same_i
searchers.Vmat.ctypes.data_as(POINTER(c_float)),
c_bool(is_same_input),
)
Imat = searchers.Imat[:cur_nnz].reshape(batch_size, only_topk)
Mmat = searchers.Mmat[:cur_nnz].reshape(batch_size, only_topk)
Dmat = searchers.Dmat[:cur_nnz].reshape(batch_size, only_topk)
Vmat = searchers.Vmat[:cur_nnz].reshape(batch_size, only_topk)
Imat = searchers.Imat[:cur_nnz].reshape(cur_bsz, only_topk)
Mmat = searchers.Mmat[:cur_nnz].reshape(cur_bsz, only_topk)
Dmat = searchers.Dmat[:cur_nnz].reshape(cur_bsz, only_topk)
Vmat = searchers.Vmat[:cur_nnz].reshape(cur_bsz, only_topk)
return Imat, Mmat, Dmat, Vmat
35 changes: 22 additions & 13 deletions pecos/core/ann/pairwise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ namespace ann {
mmap_s.fput_one<index_type>(X.cols);
mmap_s.fput_one<mem_index_type>(nnz);
mmap_s.fput_multiple<value_type>(X.val, nnz);
}
}

template<class MAT_T>
void load_mat(MAT_T &X, mmap_util::MmapStore& mmap_s) {
Expand All @@ -99,7 +99,7 @@ namespace ann {
X.cols = mmap_s.fget_one<index_type>();
auto nnz = mmap_s.fget_one<mem_index_type>();
X.val = mmap_s.fget_multiple<value_type>(nnz);
}
}

template <typename T1, typename T2>
struct KeyValPair {
Expand All @@ -111,16 +111,16 @@ namespace ann {
bool operator<(const KeyValPair<T1, T2>& other) const { return input_key_dist < other.input_key_dist; }
bool operator>(const KeyValPair<T1, T2>& other) const { return input_key_dist > other.input_key_dist; }
};
// PairwiseANN Interface

// PairwiseANN Interface
template<class FeatVec_T, class MAT_T>
struct PairwiseANN {
typedef FeatVec_T feat_vec_t;
typedef MAT_T mat_t;
typedef pecos::ann::KeyValPair<index_type, value_type> pair_t;
typedef pecos::ann::heap_t<pair_t, std::less<pair_t>> max_heap_t;

struct Searcher {
struct Searcher {
typedef PairwiseANN<feat_vec_t, mat_t> pairwise_ann_t;

const pairwise_ann_t* pairwise_ann;
Expand All @@ -132,8 +132,8 @@ namespace ann {

max_heap_t& predict_single(const feat_vec_t& query_vec, const index_type label_key, index_type topk) {
return pairwise_ann->predict_single(query_vec, label_key, topk, *this);
}
};
}
};

Searcher create_searcher() const {
return Searcher(this);
Expand All @@ -143,7 +143,7 @@ namespace ann {
index_type num_input_keys; // N
index_type num_label_keys; // L
index_type feat_dim; // d

// matrices
pecos::csc_t Y_csc; // shape of [N, L]
mat_t X_trn; // shape of [N, d]
Expand All @@ -152,7 +152,14 @@ namespace ann {
pecos::mmap_util::MmapStore mmap_store;

// destructor
~PairwiseANN() {}
~PairwiseANN() {
// If mmap_store is not open for read, then the memory of Y/X is owned by this class
// Thus, we need to explicitly free the underlying memory of Y/X during destructor
if (!mmap_store.is_open_for_read()) {
this->Y_csc.free_underlying_memory();
this->X_trn.free_underlying_memory();
}
}

static nlohmann::json load_config(const std::string& filepath) {
std::ifstream loadfile(filepath);
Expand Down Expand Up @@ -215,7 +222,7 @@ namespace ann {
save_mat(X_trn, mmap_s);
mmap_s.close();
}

void load(const std::string& model_dir, bool lazy_load = false) {
auto config = load_config(model_dir + "/config.json");
std::string version = config.find("version") != config.end() ? config["version"] : "not found";
Expand Down Expand Up @@ -248,9 +255,11 @@ namespace ann {
this->num_input_keys = Y_csc.rows;
this->num_label_keys = Y_csc.cols;
this->feat_dim = X_trn.cols;
// matrices
this->Y_csc = Y_csc;
this->X_trn = X_trn;
// Deepcopy the memory of X/Y.
// Otherwise, after Python API of PairwiseANN.train(),
// the input matrices pX/pY can be modified or deleted.
this->Y_csc = Y_csc.deep_copy();
this->X_trn = X_trn.deep_copy();
}

max_heap_t& predict_single(
Expand Down
6 changes: 3 additions & 3 deletions pecos/core/libpecos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ extern "C" {
void c_pairwise_ann_predict ## SUFFIX( \
void* searchers_ptr, \
uint32_t batch_size, \
uint32_t topk, \
uint32_t only_topk, \
const PY_MAT* pQ, \
uint32_t* label_keys, \
uint32_t* ret_Imat, \
Expand All @@ -559,9 +559,9 @@ extern "C" {
int tid = omp_get_thread_num(); \
auto input_key = (is_same_input ? 0 : bidx); \
auto label_key = label_keys[bidx]; \
auto& ret_pairs = searchers[tid].predict_single(Q_tst.get_row(input_key), label_key, topk); \
auto& ret_pairs = searchers[tid].predict_single(Q_tst.get_row(input_key), label_key, only_topk); \
for (uint32_t k=0; k < ret_pairs.size(); k++) { \
uint64_t offset = static_cast<uint64_t>(bidx) * topk; \
uint64_t offset = static_cast<uint64_t>(bidx) * only_topk; \
ret_Imat[offset + k] = ret_pairs[k].input_key_idx; \
ret_Dmat[offset + k] = ret_pairs[k].input_key_dist; \
ret_Vmat[offset + k] = ret_pairs[k].input_label_val; \
Expand Down
30 changes: 29 additions & 1 deletion pecos/core/utils/matrix.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ namespace pecos {
if(touched_indices[i] < len) {
touched_indices[write_pos] = touched_indices[i];
write_pos += 1;
}
}
}
nr_touch = write_pos;
}
Expand Down Expand Up @@ -540,6 +540,34 @@ namespace pecos {
mem_index_type get_nnz() const {
return static_cast<mem_index_type>(rows) * static_cast<mem_index_type>(cols);
}

// Frees the underlying memory of the matrix (i.e., col_ptr, row_idx, and val arrays)
// Every function in the inference code that returns a matrix has allocated memory, and
// therefore one should call this function to free that memory.
void free_underlying_memory() {
if (val) {
delete[] val;
val = nullptr;
}
}

// Creates a deep copy of this matrix
// This allocates memory, so one should call free_underlying_memory on the copy when
// one is finished using it.
drm_t deep_copy() const {
mem_index_type nnz = get_nnz();
drm_t res;
res.allocate(rows, cols, nnz);
std::memcpy(res.val, val, sizeof(value_type) * nnz);
return res;
}

void allocate(index_type rows, index_type cols, mem_index_type nnz) {
this->rows = rows;
this->cols = cols;
val = new value_type[nnz];
}

};

struct dcm_t { // Dense Column Majored Matrix
Expand Down
Loading

0 comments on commit e4c824d

Please sign in to comment.