diff --git a/core/kernels/sparse_table_ops.cc b/core/kernels/sparse_table_ops.cc index 1864fad..0700e9e 100644 --- a/core/kernels/sparse_table_ops.cc +++ b/core/kernels/sparse_table_ops.cc @@ -267,7 +267,7 @@ class SparseTablePullKernel : public AsyncOpKernel { float* w_matrix = var_tensor->matrix().data(); size_t emb_size = sizeof(float) * dim; - CHECK_EQ(emb_size, emb_buf.cutn(w_matrix + sign_index * dim, emb_size)); + CHECK_EQ(emb_size, emb_buf.cutn(w_matrix + sign_index * dim , emb_size)); } } @@ -281,22 +281,26 @@ REGISTER_KERNEL_BUILDER(Name("SparseTablePull").Device(DEVICE_CPU), struct SparsePushVarInfo { public: - SparsePushVarInfo(const Tensor* t_value, const Tensor* t_grad) + SparsePushVarInfo(const Tensor* t_value, const Tensor* t_grad, const Tensor* t_labels) : value(t_value) - , grad(t_grad) { + , grad(t_grad) + , labels(t_labels) { const int64* feasign_vec = value->flat().data(); + const int64* fea_label_vec = t_labels->flat().data(); std::map sign_id_mapping; for (int i = 0; i < value->NumElements(); ++i) { uint64 sign = (uint64)feasign_vec[i]; + int label = static_cast(fea_label_vec[i]); auto ret = sign_id_mapping.insert({sign, sign_id_mapping.size()}); if (ret.second) { - virtual_sign_infos.emplace_back(sign, 1); + virtual_sign_infos.emplace_back(sign, 1, label); } else { auto iter = ret.first; virtual_sign_infos[iter->second].batch_show += 1; + virtual_sign_infos[iter->second].batch_click += label; } } } @@ -308,6 +312,7 @@ struct SparsePushVarInfo { public: const Tensor* value; const Tensor* grad; + const Tensor* labels; std::vector virtual_sign_infos; }; @@ -321,16 +326,17 @@ class SparseTablePushKernel : public AsyncOpKernel { } void ComputeAsync(OpKernelContext* c, DoneCallback done) override { - OP_REQUIRES_ASYNC(c, c->num_inputs() == N_ * 2, + OP_REQUIRES_ASYNC(c, c->num_inputs() == N_ * 3, errors::InvalidArgument("SparseTable push num_inputs:", c->num_inputs(), - " not equal:", N_ * 2), + " not equal:", N_ * 3), done); std::vector var_infos; for (int i = 0; i < N_; i++) { const Tensor* value = &c->input(i); const Tensor* grad = &c->input(N_ + i); + const Tensor* labels = &c->input(2 * N_ + i); OP_REQUIRES_ASYNC( c, TensorShapeUtils::IsMatrix(grad->shape()), @@ -339,7 +345,7 @@ class SparseTablePushKernel : public AsyncOpKernel { grad->shape().DebugString()), done); - var_infos.emplace_back(value, grad); + var_infos.emplace_back(value, grad, labels); } CHECK_GT(var_infos.size(), 0); diff --git a/core/main/py_wrapper.cc b/core/main/py_wrapper.cc index 08697ae..21825fc 100644 --- a/core/main/py_wrapper.cc +++ b/core/main/py_wrapper.cc @@ -114,10 +114,12 @@ PYBIND11_MODULE(_pywrap_tn, m) { return py::reinterpret_steal(obj); }) - .def("create_sparse_table", [](py::object obj, std::string name, int dimension) { + .def("create_sparse_table", [](py::object obj, std::string name, int dimension, bool use_cvm) { OptimizerBase* opt = static_cast(PyCapsule_GetPointer(obj.ptr(), nullptr)); + opt->SetUseCvm(use_cvm); + PsCluster* cluster = PsCluster::Instance(); SparseTable* table = CreateSparseTable(opt, name, dimension, cluster->RankNum(), cluster->Rank()); diff --git a/core/ops/sparse_table_ops.cc b/core/ops/sparse_table_ops.cc index 1f996c3..859314a 100644 --- a/core/ops/sparse_table_ops.cc +++ b/core/ops/sparse_table_ops.cc @@ -48,6 +48,7 @@ REGISTER_OP("SparseTablePush") )doc") .Input("values: N * int64") .Input("grads: N * float") + .Input("feature_labels: N * int64") .Attr("table_handle: int") .Attr("N: int") .SetShapeFn(shape_inference::NoOutputs); diff --git 
a/core/ps/optimizer/ada_grad_kernel.cc b/core/ps/optimizer/ada_grad_kernel.cc index 3a2bb94..ba356b1 100644 --- a/core/ps/optimizer/ada_grad_kernel.cc +++ b/core/ps/optimizer/ada_grad_kernel.cc @@ -94,13 +94,22 @@ SparseAdaGradValue::SparseAdaGradValue(int dim, const AdaGrad* opt) { } } + use_cvm_ = opt->ShouldUseCvm(); g2sum_ = opt->initial_g2sum; old_compat_ = false; no_show_days_ = 0; + click_ = 0; + show_ = 0; + if(opt->ShouldUseCvm()){ + w[dim] = 0; + w[dim+1] = 0; + } + } void SparseAdaGradValue::Apply(const AdaGrad* opt, SparseGradInfo& grad_info, int dim) { show_ += grad_info.batch_show; + click_ += grad_info.batch_click; no_show_days_ = 0; float* w = Weight(); @@ -116,6 +125,13 @@ void SparseAdaGradValue::Apply(const AdaGrad* opt, SparseGradInfo& grad_info, in for (int i = 0; i < dim; ++i) { w[i] -= opt->learning_rate * grad_info.grad[i] / (opt->epsilon + sqrt(g2sum_)); } + if(opt->ShouldUseCvm()){ + float log_show = log(show_ + 1); + float log_click = log(click_ + 1); + w[dim] = show_; + w[dim+1] = (log_click - log_show); + } + } void SparseAdaGradValue::SerializeTxt_(std::ostream& os, int dim) { @@ -126,7 +142,10 @@ void SparseAdaGradValue::SerializeTxt_(std::ostream& os, int dim) { os << g2sum_ << "\t"; os << show_ << "\t"; - os << no_show_days_; + os << no_show_days_ << "\t"; + if(use_cvm_){ + os << click_; + } } void SparseAdaGradValue::DeSerializeTxt_(std::istream& is, int dim) { @@ -139,6 +158,13 @@ void SparseAdaGradValue::DeSerializeTxt_(std::istream& is, int dim) { is >> show_; if(!old_compat_) { is >> no_show_days_; + if(use_cvm_){ + is >> click_; + float log_show = log(show_ + 1); + float log_click = log(click_ + 1); + Weight()[dim] = show_; + Weight()[dim+1] = (log_click - log_show); + } } } @@ -147,6 +173,9 @@ void SparseAdaGradValue::SerializeBin_(std::ostream& os, int dim) { os.write(reinterpret_cast(&g2sum_), sizeof(g2sum_)); os.write(reinterpret_cast(&show_), sizeof(show_)); os.write(reinterpret_cast(&no_show_days_), sizeof(no_show_days_)); + if(use_cvm_){ + os.write(reinterpret_cast(&click_), sizeof(click_)); + } } void SparseAdaGradValue::DeSerializeBin_(std::istream& is, int dim) { @@ -155,11 +184,19 @@ void SparseAdaGradValue::DeSerializeBin_(std::istream& is, int dim) { is.read(reinterpret_cast(&show_), sizeof(show_)); if(!old_compat_) { is.read(reinterpret_cast(&no_show_days_), sizeof(no_show_days_)); + if(use_cvm_){ + is.read(reinterpret_cast(&click_), sizeof(click_)); + float log_show = log(show_ + 1); + float log_click = log(click_ + 1); + Weight()[dim] = show_; + Weight()[dim+1] = (log_click - log_show); + } } } void SparseAdaGradValue::ShowDecay(const AdaGrad* opt, int delta_days) { show_ *= opt->show_decay_rate; + click_ *= opt->show_decay_rate; no_show_days_ += delta_days; } diff --git a/core/ps/optimizer/ada_grad_kernel.h b/core/ps/optimizer/ada_grad_kernel.h index 0cf9a67..5d3b1f0 100644 --- a/core/ps/optimizer/ada_grad_kernel.h +++ b/core/ps/optimizer/ada_grad_kernel.h @@ -88,7 +88,9 @@ class alignas(4) SparseAdaGradValue int dim_; float g2sum_; float show_ = 0.0; + float click_ = 0.0; int no_show_days_ = 0; + bool use_cvm_ = false; float data_[0]; }; diff --git a/core/ps/optimizer/data_struct.h b/core/ps/optimizer/data_struct.h index 09da668..5965379 100644 --- a/core/ps/optimizer/data_struct.h +++ b/core/ps/optimizer/data_struct.h @@ -20,6 +20,7 @@ namespace tensornet { struct SparseGradInfo { float* grad; int batch_show; + int batch_click; }; extern int const SERIALIZE_FMT_ID; @@ -56,6 +57,7 @@ class alignas(4) SparseOptValue { protected: float 
show_ = 0.0; + float click_ = 0.0; int delta_show_ = 0; bool old_compat_ = false; }; diff --git a/core/ps/optimizer/optimizer.h b/core/ps/optimizer/optimizer.h index c58c582..910197d 100644 --- a/core/ps/optimizer/optimizer.h +++ b/core/ps/optimizer/optimizer.h @@ -43,9 +43,18 @@ class OptimizerBase { return std::make_tuple(false, emptyString); } + virtual void SetUseCvm(bool use_cvm) { + use_cvm_ = use_cvm; + } + + virtual bool ShouldUseCvm() const { + return use_cvm_; + } + public: float learning_rate = 0.01; float show_decay_rate = 0.98; + float use_cvm_ = false; }; class Adam : public OptimizerBase { @@ -90,8 +99,9 @@ class AdaGrad : public OptimizerBase { ++column_count; } - // columns should be sign, dim_, dims_ * weight, g2sum, show, no_show_days - // if columnCount is 12, means no no_show_days column + // if use cvm plugins, columns should be sign, dim_, dims_ * weight, g2sum, show, no_show_days, click,should be dim + 6 + // if no use cvm, no click, should be dim + 5 + // for old version, no no_show_days column, column_count should be dim + 4 if(column_count == dim + 4){ need_old_compat = true; } diff --git a/core/ps/optimizer/optimizer_kernel.h b/core/ps/optimizer/optimizer_kernel.h index f6ae170..44b5025 100644 --- a/core/ps/optimizer/optimizer_kernel.h +++ b/core/ps/optimizer/optimizer_kernel.h @@ -276,7 +276,7 @@ class SparseKernelBlock { SparseKernelBlock(const OptimizerBase* opt, int dimension) : values_(15485863, sparse_key_hasher) , dim_(dimension) - , alloc_(ValueType::DynSizeof(dim_), 1 << 16) { + , alloc_(ValueType::DynSizeof(dimension + opt->ShouldUseCvm() * 2), 1 << 16) { values_.max_load_factor(0.75); opt_ = dynamic_cast(opt); mutex_ = std::make_unique(); diff --git a/core/ps/table/sparse_table.cc b/core/ps/table/sparse_table.cc index 9f6e278..5d5ee36 100644 --- a/core/ps/table/sparse_table.cc +++ b/core/ps/table/sparse_table.cc @@ -47,7 +47,6 @@ void SparseTable::SetHandle(uint32_t handle) { void SparseTable::Pull(const SparsePullRequest* req, butil::IOBuf& out_emb_buf, SparsePullResponse* resp) { resp->set_table_handle(req->table_handle()); - CHECK_EQ(dim_, req->dim()); resp->set_dim(req->dim()); for (int i = 0; i < req->signs_size(); ++i) { @@ -57,23 +56,23 @@ void SparseTable::Pull(const SparsePullRequest* req, butil::IOBuf& out_emb_buf, float* w = op_kernel_->GetWeight(sign); CHECK(nullptr != w); - out_emb_buf.append(w, sizeof(float) * dim_); + out_emb_buf.append(w, sizeof(float) * (req->dim())); } } void SparseTable::Push(const SparsePushRequest* req, butil::IOBuf& grad_buf, SparsePushResponse* resp) { - CHECK_EQ(dim_, req->dim()); - float grad[dim_]; + float grad[req->dim()]; SparsePushSignInfo sign_info; while (sizeof(sign_info) == grad_buf.cutn(&sign_info, sizeof(sign_info))) { - size_t grad_size = sizeof(float) * dim_; + size_t grad_size = sizeof(float) * req->dim(); CHECK_EQ(grad_size, grad_buf.cutn(grad, grad_size)); SparseGradInfo grad_info; grad_info.grad = grad; grad_info.batch_show = sign_info.batch_show; + grad_info.batch_click = sign_info.batch_click; op_kernel_->Apply(sign_info.sign, grad_info); } diff --git a/core/ps_interface/ps_raw_interface.h b/core/ps_interface/ps_raw_interface.h index f2c8b81..a2569ea 100644 --- a/core/ps_interface/ps_raw_interface.h +++ b/core/ps_interface/ps_raw_interface.h @@ -22,16 +22,18 @@ namespace tensornet { struct SparsePushSignInfo { public: SparsePushSignInfo() - : SparsePushSignInfo(0, 0) + : SparsePushSignInfo(0, 0, 0) { } - SparsePushSignInfo(uint64_t s, int bs) + SparsePushSignInfo(uint64_t s, int bs, int cs) : 
sign(s) , batch_show(bs) + , batch_click(cs) { } uint64_t sign; int batch_show; + int batch_click; }; } // namespace tensornet diff --git a/examples/common/feature_column.py b/examples/common/feature_column.py index 4083a49..51674ca 100755 --- a/examples/common/feature_column.py +++ b/examples/common/feature_column.py @@ -24,11 +24,14 @@ def create_emb_model(features, columns_group, suffix = "_input"): for slot in features: inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype="int64", sparse=True) + inputs["label"] = tf.keras.layers.Input(name="label", shape=(None,), dtype="int64", sparse=False) + sparse_opt = tn.core.AdaGrad(learning_rate=0.01, initial_g2sum=0.1, initial_scale=0.1) for column_group_name in columns_group.keys(): embs = tn.layers.EmbeddingFeatures(columns_group[column_group_name], sparse_opt, - name=column_group_name + suffix)(inputs) + name=column_group_name + suffix, target_columns=["label"])(inputs) + #name=column_group_name + suffix)(inputs) model_output.append(embs) emb_model = tn.model.Model(inputs=inputs, outputs=model_output, name="emb_model") diff --git a/examples/main.py b/examples/main.py index 48b6d69..c7d9602 100755 --- a/examples/main.py +++ b/examples/main.py @@ -22,7 +22,7 @@ def parse_line_batch(example_proto): fea_desc[slot] = tf.io.VarLenFeature(tf.int64) feature_dict = tf.io.parse_example(example_proto, fea_desc) - label = feature_dict.pop('label') + label = feature_dict['label'] return feature_dict, label def create_model(): diff --git a/examples/models/wide_deep.py b/examples/models/wide_deep.py index 96490d9..a78f276 100755 --- a/examples/models/wide_deep.py +++ b/examples/models/wide_deep.py @@ -25,13 +25,14 @@ def create_sub_model(linear_embs, deep_embs, deep_hidden_units): for i, unit in enumerate(C.DEEP_HIDDEN_UNITS): deep = tf.keras.layers.Dense(unit, activation='relu', name='dnn_{}'.format(i))(deep) - if linear_inputs and not deep_inputs: - output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(linear) - elif deep_inputs and not linear_inputs: - output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep) - else: - both = tf.keras.layers.concatenate([deep, linear], name='both') - output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both) +# if linear_inputs and not deep_inputs: +# output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(linear) +# elif deep_inputs and not linear_inputs: +# output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep) +# else: + both = tf.keras.layers.concatenate([deep, linear], name='both') + both = tn.layers.TNBatchNormalization(synchronized=True, sync_freq=4, max_count=1000000)(both) + output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both) return tn.model.Model(inputs=[linear_inputs, deep_inputs], outputs=output, name="sub_model") @@ -45,6 +46,7 @@ def WideDeep(linear_features, dnn_features, dnn_hidden_units=(128, 128)): inputs = {} for slot in features: inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype="int64", sparse=True) + inputs['label'] = tf.keras.layers.Input(name="label", shape=(None,), dtype="int64", sparse=False) emb_model = create_emb_model(features, columns_group) linear_embs, deep_embs = emb_model(inputs) sub_model = create_sub_model(linear_embs, deep_embs, dnn_hidden_units) diff --git a/tensornet/core/gen_sparse_table_ops.py b/tensornet/core/gen_sparse_table_ops.py index 7f14ec7..ecfaeb8 100644 --- a/tensornet/core/gen_sparse_table_ops.py +++
b/tensornet/core/gen_sparse_table_ops.py @@ -128,7 +128,7 @@ def sparse_table_pull_eager_fallback(resources, values, table_handle, name, ctx) @_dispatch.add_dispatch_list @tf_export('sparse_table_push') -def sparse_table_push(values, grads, table_handle, name=None): +def sparse_table_push(values, grads, feature_labels, table_handle, name=None): r"""push variable from parameter server Args: @@ -146,17 +146,17 @@ def sparse_table_push(values, grads, table_handle, name=None): try: _result = pywrap_tfe.TFE_Py_FastPathExecute( _ctx._context_handle, tld.device_name, "SparseTablePush", name, - tld.op_callbacks, values, grads, "table_handle", table_handle) + tld.op_callbacks, values, grads, feature_labels, "table_handle", table_handle) return _result except _core._FallbackException: try: return sparse_table_push_eager_fallback( - values, grads, table_handle=table_handle, name=name, ctx=_ctx) + values, grads, feature_labels, table_handle=table_handle, name=name, ctx=_ctx) except _core._SymbolicException: pass # Add nodes to the TensorFlow graph. except (TypeError, ValueError): result = _dispatch.dispatch( - sparse_table_push, values=values, grads=grads, + sparse_table_push, values=values, grads=grads, feature_labels=feature_labels, table_handle=table_handle, name=name) if result is not _dispatch.OpDispatcher.NOT_SUPPORTED: return result @@ -181,11 +181,11 @@ def sparse_table_push(values, grads, table_handle, name=None): table_handle = _execute.make_int(table_handle, "table_handle") try: _, _, _op, _outputs = _op_def_library._apply_op_helper( - "SparseTablePush", values=values, grads=grads, + "SparseTablePush", values=values, grads=grads, feature_labels=feature_labels, table_handle=table_handle, name=name) except (TypeError, ValueError): result = _dispatch.dispatch( - sparse_table_push, values=values, grads=grads, + sparse_table_push, values=values, grads=grads, feature_labels=feature_labels, table_handle=table_handle, name=name) if result is not _dispatch.OpDispatcher.NOT_SUPPORTED: return result @@ -194,7 +194,7 @@ def sparse_table_push(values, grads, table_handle, name=None): SparseTablePush = tf_export("raw_ops.SparseTablePush")(_ops.to_raw_op(sparse_table_push)) -def sparse_table_push_eager_fallback(values, grads, table_handle, name, ctx): +def sparse_table_push_eager_fallback(values, grads, feature_labels, table_handle, name, ctx): if not isinstance(values, (list, tuple)): raise TypeError( "Expected list for 'values' argument to " @@ -212,7 +212,8 @@ def sparse_table_push_eager_fallback(values, grads, table_handle, name, ctx): table_handle = _execute.make_int(table_handle, "table_handle") values = _ops.convert_n_to_tensor(values, _dtypes.int64) grads = _ops.convert_n_to_tensor(grads, _dtypes.float32) - _inputs_flat = list(values) + list(grads) + feature_labels = _ops.convert_n_to_tensor(feature_labels, _dtypes.int64) + _inputs_flat = list(values) + list(grads) + list(feature_labels) _attrs = ("table_handle", table_handle, "N", _attr_N) _result = _execute.execute(b"SparseTablePush", 0, inputs=_inputs_flat, attrs=_attrs, ctx=ctx, name=name) diff --git a/tensornet/layers/embedding_features.py b/tensornet/layers/embedding_features.py index f4597e3..94d4c13 100644 --- a/tensornet/layers/embedding_features.py +++ b/tensornet/layers/embedding_features.py @@ -21,6 +21,7 @@ import collections import json +import tensorflow as tf import tensornet as tn from tensornet.core import gen_sparse_table_ops @@ -35,16 +36,21 @@ class StateManagerImpl(fc.StateManager): """ """ - def __init__(self, layer, name, 
sparse_opt, dimension, trainable): + def __init__(self, layer, name, sparse_opt, dimension, trainable, target_columns=None, use_cvm=False): self._trainable = trainable self._layer = layer + self.use_cvm = use_cvm + if target_columns: + self.use_cvm = True - self.sparse_table_handle = tn.core.create_sparse_table(sparse_opt, name if name else "", dimension) + self.sparse_table_handle = tn.core.create_sparse_table(sparse_opt, name if name else "", dimension, self.use_cvm) self.pulled_mapping_values = {} if self._layer is not None and not hasattr(self._layer, '_resources'): self._layer._resources = [] # pylint: disable=protected-access + self._target_columns = target_columns + # be different with tensorflow StateManager implementation, we only support # store one variable for one unique feature column, which is name 'embedding_weights' self._cols_to_var_map = collections.defaultdict(lambda: None) @@ -66,10 +72,14 @@ def create_variable(self, assert isinstance(feature_column, fc.EmbeddingColumn) var_name = column_name + '/' + name + new_shape = shape + + if self._target_columns: + new_shape = (shape[0], shape[-1] + 2) var = self._layer.add_weight( name=var_name, - shape=shape, + shape=new_shape, dtype=dtype, initializer=initializer, trainable=self._trainable and trainable, @@ -90,7 +100,9 @@ def pull(self, features): feature_values = [] for column_name, sparse_feature in features.items(): - if column_name not in self._cols_to_var_map: + if self._target_columns and column_name in self._target_columns: + continue + if column_name not in self._cols_to_var_map and column_name != "label": raise ValueError("slot embedding variable not created, ", column_name) if not isinstance(sparse_feature, sparse_tensor_lib.SparseTensor): @@ -103,7 +115,6 @@ def pull(self, features): [var.handle for var in vars], [f.values for f in feature_values], table_handle=self.sparse_table_handle) - assert len(pulled_mapping_values) == len(vars) for var, mapping_value in zip(vars, pulled_mapping_values): @@ -118,6 +129,7 @@ def pull(self, features): def push(self, grads_and_vars, features): grads = [] feature_values = [] + feature_labels = [] for grad, var in grads_and_vars: if var.ref() not in self._var_to_cols_map: @@ -136,11 +148,13 @@ def push(self, grads_and_vars, features): grads.append(grad.values) feature_values.append(sparse_feature.values) + feature_label = self._layer.feature_clicks[column_name] + feature_labels.append(tf.squeeze(feature_label.value())) # grads and feature_values must not empty assert grads and feature_values - return gen_sparse_table_ops.sparse_table_push(feature_values, grads, + return gen_sparse_table_ops.sparse_table_push(feature_values, grads, feature_labels, table_handle=self.sparse_table_handle) def get_feature_mapping_values(self, column_name): @@ -165,6 +179,7 @@ def __init__(self, trainable=True, name=None, is_concat=False, + target_columns=None, **kwargs): """create a embedding feature layer. when this layer is been called, all the embedding data of `feature_columns` will be @@ -180,6 +195,9 @@ def __init__(self, name: Name to give to the EmbeddingFeatures. is_concat: when this parameter is True, all the tensor of pulled will be concat with axis=-1 and returned. + target_columns: labels used for cvm plugin. 
labels will be counted per feature sign + to accumulate total_count and label_count (usually used for CTR, i.e. show/click counts); + the embedding output will include the embedding floats plus two extra values: total_count and log(label_count + 1) - log(total_count + 1) """ super(EmbeddingFeatures, self).__init__( @@ -192,9 +210,16 @@ def __init__(self, assert feature_column.dimension == dim, "currently we only support feature_columns with same dimension in EmbeddingFeatures" self._feature_columns = feature_columns - self._state_manager = StateManagerImpl(self, name, sparse_opt, dim, self.trainable) # pylint: disable=protected-access self.sparse_pulling_features = None self.is_concat = is_concat + self._target_columns = target_columns + if target_columns and len(target_columns) > 1: + raise ValueError( + 'For now cvm plugin only supports one column, ' + 'Given: {}'.format(target_columns)) + self._state_manager = StateManagerImpl(self, name, sparse_opt, dim, self.trainable, target_columns) # pylint: disable=protected-access + self.sparse_target_features = None + self.feature_clicks = {} for column in self._feature_columns: if not isinstance(column, fc.EmbeddingColumn): @@ -204,6 +229,9 @@ def __init__(self, def build(self, input_shapes): for column in self._feature_columns: + initial_value = tf.zeros([0, 1], dtype=tf.int64) + var = tf.Variable(initial_value, shape=[None, 1], name="label_count" + column.categorical_column.name, trainable=False) + self.feature_clicks[column.categorical_column.name] = var with ops.name_scope(column.name): column.create_state(self._state_manager) @@ -217,7 +245,21 @@ def call(self, features, cols_to_output_tensors=None): using_features = self.filter_not_used_features(features) transformation_cache = fc.FeatureTransformationCache(using_features) - self.sparse_pulling_features = self.get_sparse_pulling_feature(using_features) + self.sparse_pulling_features, self.sparse_target_features = self.get_sparse_pulling_feature(using_features) + + if self._target_columns: + labels = features[self._target_columns[0]] + + for tensor_index, sparse_tensor_key in enumerate(self.sparse_pulling_features): + sparse_tensor = features[sparse_tensor_key] + indices_for_gather = tf.expand_dims(sparse_tensor.indices[:, 0], axis=-1) + if self._target_columns: + feature_values = tf.gather_nd(labels, indices_for_gather) + self.feature_clicks[str(sparse_tensor_key)].assign(feature_values) + else: + num_elements = tf.shape(indices_for_gather)[0] + zeros_tensor = tf.zeros((num_elements, 1), dtype=tf.int64) + self.feature_clicks[str(sparse_tensor_key)].assign(zeros_tensor) pulled_mapping_values = self._state_manager.pull(self.sparse_pulling_features) @@ -233,10 +275,19 @@ def call(self, features, cols_to_output_tensors=None): processed_tensors = self._process_dense_tensor(column, tensor) + if self._target_columns: + tensor_shape = tf.shape(processed_tensors) + + num_features = tensor_shape[1] + mask = tf.concat([tf.ones([1, num_features - 2], dtype=tensor.dtype), tf.zeros([1, 2], dtype=tensor.dtype)], axis=1) + new_tensor = processed_tensors * mask + tf.stop_gradient(processed_tensors * (1 - mask)) + else: + new_tensor = processed_tensors + if cols_to_output_tensors is not None: - cols_to_output_tensors[column] = processed_tensors + cols_to_output_tensors[column] = new_tensor - output_tensors.append(processed_tensors) + output_tensors.append(new_tensor) if self.is_concat: return self._verify_and_concat_tensors(output_tensors) @@ -257,13 +308,18 @@ def filter_not_used_features(self, features): def get_sparse_pulling_feature(self, features):
new_features = {} + target_features = {} for column_name, feature in features.items(): if not isinstance(feature, sparse_tensor_lib.SparseTensor): continue + if self._target_columns and column_name in self._target_columns: + target_features[column_name] = feature + continue + new_features[column_name] = feature - return new_features + return new_features, target_features def save_sparse_table(self, filepath, mode): return self._state_manager.save_sparse_table(filepath, mode) @@ -281,12 +337,16 @@ def compute_output_shape(self, input_shape): total_elements = 0 for column in self._feature_columns: total_elements += column.variable_shape.num_elements() + if self._target_columns: + total_elements += 2 return self._target_shape(input_shape, total_elements) def _process_dense_tensor(self, column, tensor): """ """ num_elements = column.variable_shape.num_elements() + if self._target_columns: + num_elements += 2 target_shape = self._target_shape(array_ops.shape(tensor), num_elements) return array_ops.reshape(tensor, shape=target_shape)
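For readers skimming the C++ side of the patch: the per-sign bookkeeping added to SparseAdaGradValue boils down to two counters and two derived embedding slots. A minimal standalone Python sketch of that arithmetic (illustrative only, not the actual kernel; the class name is made up):

import math

class CvmStats:
    """Sketch of the show/click counters kept per feature sign when use_cvm is on."""

    def __init__(self):
        self.show = 0.0   # mirrors show_
        self.click = 0.0  # mirrors click_

    def apply(self, batch_show, batch_click):
        # SparseAdaGradValue::Apply adds the per-batch counters carried in SparseGradInfo.
        self.show += batch_show
        self.click += batch_click

    def decay(self, rate=0.98):
        # ShowDecay now decays both counters with the same show_decay_rate.
        self.show *= rate
        self.click *= rate

    def extra_slots(self):
        # The two floats written after the dim learnable weights:
        # the raw show count and log(click + 1) - log(show + 1).
        return [self.show, math.log(self.click + 1.0) - math.log(self.show + 1.0)]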
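On the Keras side, the two statistic columns appended to every pulled embedding must not receive gradients; EmbeddingFeatures.call achieves this with a mask plus tf.stop_gradient. A reduced sketch of that trick, assuming an input of shape [batch, dim + 2] (hypothetical helper name):

import tensorflow as tf

def freeze_last_two_columns(emb):
    # emb: [batch, dim + 2]; only the first dim columns should be trainable.
    num_cols = tf.shape(emb)[1]
    mask = tf.concat([tf.ones([1, num_cols - 2], dtype=emb.dtype),
                      tf.zeros([1, 2], dtype=emb.dtype)], axis=1)
    # The statistic columns still flow forward, but contribute no gradient.
    return emb * mask + tf.stop_gradient(emb * (1 - mask))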
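The push path also needs one label per feature value rather than one per example; call() derives it by gathering the batch label with the row index of each SparseTensor entry before storing it in feature_clicks. A minimal version of that gather, assuming a dense [batch, 1] int64 label tensor (hypothetical helper name):

import tensorflow as tf

def labels_per_feature_value(labels, sparse_feature):
    # labels: dense [batch, 1] int64 tensor; sparse_feature: tf.sparse.SparseTensor
    # whose indices' first column is the example (row) index.
    rows = tf.expand_dims(sparse_feature.indices[:, 0], axis=-1)  # [nnz, 1]
    return tf.gather_nd(labels, rows)  # [nnz, 1], one label per feature value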