diff --git a/RecommenderSystems/dcn/dcn_train_eval.py b/RecommenderSystems/dcn/dcn_train_eval.py
index c6a0a7651..dba4bf952 100644
--- a/RecommenderSystems/dcn/dcn_train_eval.py
+++ b/RecommenderSystems/dcn/dcn_train_eval.py
@@ -46,9 +46,9 @@ def str_list(x):
     )
     parser.add_argument("--embedding_vec_size", type=int, default=16)
     parser.add_argument("--batch_norm", type=bool, default=False)
-    parser.add_argument("--dnn_hidden_units", type=int_list, default="1000,1000,1000,1000,1000")
-    parser.add_argument("--crossing_layers", type=int, default=3)
-    parser.add_argument("--net_dropout", type=float, default=0.2)
+    parser.add_argument("--dnn_hidden_units", type=int_list, default="1024,1024")
+    parser.add_argument("--crossing_layers", type=int, default=6)
+    parser.add_argument("--net_dropout", type=float, default=0.5)
     parser.add_argument("--embedding_regularizer", type=float, default=None)
     parser.add_argument("--net_regularizer", type=float, default=None)

@@ -112,9 +112,9 @@ def __init__(
         self,
         parquet_file_url_list,
         batch_size,
-        num_epochs=1,
+        num_epochs,
         shuffle_row_groups=True,
-        shard_seed=2019,
+        shard_seed=1234,
         shard_count=1,
         cur_shard=0,
     ):
@@ -126,16 +126,17 @@ def __init__(
         self.shard_count = shard_count
         self.cur_shard = cur_shard

-        fields = ["Label"]
+        fields = ["label"]
         fields += [f"I{i+1}" for i in range(num_dense_fields)]
+        self.I_end = len(fields)
         fields += [f"C{i+1}" for i in range(num_sparse_fields)]
+        self.C_end = len(fields)
         self.fields = fields
-        self.num_fields = len(fields)

     def __enter__(self):
         self.reader = make_batch_reader(
             self.parquet_file_url_list,
-            workers_count=2,
+            workers_count=1,
             shuffle_row_groups=self.shuffle_row_groups,
             num_epochs=self.num_epochs,
             shard_seed=self.shard_seed,
@@ -152,9 +153,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
     def get_batches(self, reader, batch_size=None):
         if batch_size is None:
             batch_size = self.batch_size
-
         tail = None
-
         for rg in reader:
             rgdict = rg._asdict()
             rglist = [rgdict[field] for field in self.fields]
@@ -164,24 +163,26 @@ def get_batches(self, reader, batch_size=None):
                 tail = list(
                     [
                         np.concatenate((tail[i], rglist[i][0 : (batch_size - len(tail[i]))]))
-                        for i in range(self.num_fields)
+                        for i in range(self.C_end)
                     ]
                 )
                 if len(tail[0]) == batch_size:
                     label = tail[0]
-                    features = tail[1 : self.num_fields]
+                    dense = tail[1 : self.I_end]
+                    sparse = tail[self.I_end : self.C_end]
                     tail = None
-                    yield label, np.stack(features, axis=-1)
+                    yield label, np.stack(dense, axis=-1), np.stack(sparse, axis=-1)
                 else:
                     pos = 0
                     continue
             while (pos + batch_size) <= len(rglist[0]):
                 label = rglist[0][pos : pos + batch_size]
-                features = [rglist[j][pos : pos + batch_size] for j in range(1, self.num_fields)]
+                dense = [rglist[j][pos : pos + batch_size] for j in range(1, self.I_end)]
+                sparse = [rglist[j][pos : pos + batch_size] for j in range(self.I_end, self.C_end)]
                 pos += batch_size
-                yield label, np.stack(features, axis=-1)
+                yield label, np.stack(dense, axis=-1), np.stack(sparse, axis=-1)
             if pos != len(rglist[0]):
-                tail = [rglist[i][pos:] for i in range(self.num_fields)]
+                tail = [rglist[i][pos:] for i in range(self.C_end)]


 def make_criteo_dataloader(data_path, batch_size, shuffle=True, shard_seed=2022):
@@ -340,7 +341,7 @@ def __init__(
             size_factor=size_factor,
         )

-        input_dim = embedding_vec_size * (num_dense_fields + num_sparse_fields)
+        input_dim = embedding_vec_size * num_sparse_fields + num_dense_fields

         self.dnn = (
             DNN(
@@ -363,13 +364,12 @@ def __init__(

         self.reset_parameters()

-    def forward(self, X):
-
-        feature_emb = self.embedding_layer(X)
-        flat_feature_emb = feature_emb.flatten(start_dim=1)
-        cross_out = self.crossnet(flat_feature_emb)
+    def forward(self, dense_fields, sparse_fields) -> flow.Tensor:
+        feature_emb = self.embedding_layer(sparse_fields)
+        feature = flow.cat([feature_emb.flatten(start_dim=1), dense_fields], dim=1)
+        cross_out = self.crossnet(feature)
         if self.dnn is not None:
-            dnn_out = self.dnn(flat_feature_emb)
+            dnn_out = self.dnn(feature)
             final_out = flow.cat([cross_out, dnn_out], dim=-1)
         else:
             final_out = cross_out
@@ -409,9 +409,9 @@ def __init__(self, dcn_module, amp=False):
         if amp:
             self.config.enable_amp(True)

-    def build(self, features):
-        predicts = self.module(features.to("cuda"))
-        return predicts
+    def build(self, dense_fields, sparse_fields):
+        predicts = self.module(dense_fields.to("cuda"), sparse_fields.to("cuda"))
+        return predicts.sigmoid()  # TODO: check sigmoid


 class DCNTrainGraph(flow.nn.Graph):
@@ -429,14 +429,12 @@ def __init__(
             self.config.enable_amp(True)
         self.set_grad_scaler(grad_scaler)

-    def build(self, labels, features):
-
-        logits = self.module(features.to("cuda")).squeeze()
-        loss = self.loss(logits, labels.squeeze().to("cuda"))
+    def build(self, labels, dense_fields, sparse_fields):
+        logits = self.module(dense_fields.to("cuda"), sparse_fields.to("cuda"))
+        loss = self.loss(logits, labels.to("cuda"))
         reduce_loss = flow.mean(loss)
         reduce_loss.backward()
-
-        return reduce_loss.to("cpu")
+        return reduce_loss.to("cpu")  # TODO: check whether the copy back to CPU is needed


 def make_lr_scheduler(args, optimizer):
@@ -535,8 +533,8 @@ def early_stop(
     dcn_module.train()
     last_step, last_time = 0, time.time()
     for step in range(1, args.train_batches + 1):
-        labels, features = batch_to_global(*next(loader))
-        loss = train_graph(labels, features)
+        labels, dense_fields, sparse_fields = batch_to_global(*next(loader))
+        loss = train_graph(labels, dense_fields, sparse_fields)

         if step % args.loss_print_interval == 0:
             loss = loss.numpy()
@@ -586,9 +584,10 @@ def early_stop(
             last_time = time.time()
     if args.save_best_model:
         load_model(f"{args.model_save_dir}/best_checkpoint")
-    if rank == 0:
-        print("================ Test Evaluation ================")
-    eval(args, eval_graph, tag="test", cur_step=step, epoch=epoch)
+    if args.num_test_samples > 0:
+        if rank == 0:
+            print("================ Test Evaluation ================")
+        eval(args, eval_graph, tag="test", cur_step=step, epoch=epoch)


 def _np_to_global(np_array):
@@ -596,18 +595,19 @@ def _np_to_global(np_array):
     return t.to_global(placement=flow.env.all_device_placement("cpu"), sbp=flow.sbp.split(0))


-def batch_to_global(np_label, np_features, is_train=True):
+def batch_to_global(np_label, np_dense, np_sparse, is_train=True):
     labels = _np_to_global(np_label.reshape(-1, 1)) if is_train else np_label.reshape(-1, 1)
-    features = _np_to_global(np_features)
-    return labels, features
+    dense_fields = _np_to_global(np_dense)
+    sparse_fields = _np_to_global(np_sparse)
+    return labels, dense_fields, sparse_fields


 def prefetch_eval_batches(data_dir, batch_size, num_batches):
     cached_eval_batches = []
     with make_criteo_dataloader(data_dir, batch_size, shuffle=False) as loader:
         for _ in range(num_batches):
-            label, features = batch_to_global(*next(loader), is_train=False)
-            cached_eval_batches.append((label, features))
+            label, dense_fields, sparse_fields = batch_to_global(*next(loader), is_train=False)
+            cached_eval_batches.append((label, dense_fields, sparse_fields))
     return cached_eval_batches


@@ -627,14 +627,14 @@ def eval(args, eval_graph, tag="val", cur_step=0, epoch=0, cached_eval_batches=N
         with make_criteo_dataloader(f"{args.data_dir}/{tag}", batch_size, shuffle=False) as loader:
             eval_start_time = time.time()
             for i in range(batches_per_epoch):
-                label, features = batch_to_global(*next(loader), is_train=False)
-                pred = eval_graph(features)
+                label, dense_fields, sparse_fields = batch_to_global(*next(loader), is_train=False)
+                pred = eval_graph(dense_fields, sparse_fields)
                 labels.append(label)
                 preds.append(pred.to_local())
     else:
         for i in range(batches_per_epoch):
-            label, features = cached_eval_batches[i]
-            pred = eval_graph(features)
+            label, dense_fields, sparse_fields = cached_eval_batches[i]
+            pred = eval_graph(dense_fields, sparse_fields)
             labels.append(label)
             preds.append(pred.to_local())

diff --git a/RecommenderSystems/deepfm/deepfm_train_eval.py b/RecommenderSystems/deepfm/deepfm_train_eval.py
index 5c396accb..ce8564b16 100644
--- a/RecommenderSystems/deepfm/deepfm_train_eval.py
+++ b/RecommenderSystems/deepfm/deepfm_train_eval.py
@@ -44,11 +44,11 @@ def str_list(x):
         help="save model after each eval or not",
     )

-    parser.add_argument("--embedding_vec_size", type=int, default=16, help="embedding vector size")
+    parser.add_argument("--feature_vec_size", type=int, default=10, help="embedding vector size")
     parser.add_argument(
-        "--dnn", type=int_list, default="1000,1000,1000,1000,1000", help="dnn hidden units number"
+        "--dnn", type=int_list, default="400,400,400", help="dnn hidden units number"
     )
-    parser.add_argument("--net_dropout", type=float, default=0.2, help="net dropout rate")
+    parser.add_argument("--net_dropout", type=float, default=0.5, help="net dropout rate")
     parser.add_argument("--disable_fusedmlp", action="store_true", help="disable fused MLP or not")

     parser.add_argument("--lr_factor", type=float, default=0.1)
@@ -56,12 +56,12 @@ def str_list(x):
     parser.add_argument("--learning_rate", type=float, default=0.001, help="learning rate")

     parser.add_argument(
-        "--batch_size", type=int, default=10000, help="training/evaluation batch size"
+        "--batch_size", type=int, default=16384, help="training/evaluation batch size"
     )
     parser.add_argument(
         "--train_batches", type=int, default=75000, help="the maximum number of training batches"
     )
-    parser.add_argument("--loss_print_interval", type=int, default=100, help="")
+    parser.add_argument("--loss_print_interval", type=int, default=1000, help="interval of printing loss")

     parser.add_argument(
         "--patience",
@@ -142,7 +142,7 @@ def __init__(
         batch_size,
         num_epochs=1,
         shuffle_row_groups=True,
-        shard_seed=2019,
+        shard_seed=2022,
         shard_count=1,
         cur_shard=0,
     ):
@@ -154,16 +154,17 @@ def __init__(
         self.shard_count = shard_count
         self.cur_shard = cur_shard

-        fields = ["Label"]
+        fields = ["label"]
         fields += [f"I{i+1}" for i in range(num_dense_fields)]
+        self.I_end = len(fields)
         fields += [f"C{i+1}" for i in range(num_sparse_fields)]
+        self.C_end = len(fields)
         self.fields = fields
-        self.num_fields = len(fields)

     def __enter__(self):
         self.reader = make_batch_reader(
             self.parquet_file_url_list,
-            workers_count=2,
+            workers_count=1,
             shuffle_row_groups=self.shuffle_row_groups,
             num_epochs=self.num_epochs,
             shard_seed=self.shard_seed,
@@ -180,9 +181,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
     def get_batches(self, reader, batch_size=None):
         if batch_size is None:
             batch_size = self.batch_size
-
         tail = None
-
         for rg in reader:
             rgdict = rg._asdict()
             rglist = [rgdict[field] for field in self.fields]
@@ -192,24 +191,27 @@ def get_batches(self, reader, batch_size=None):
                 tail = list(
                     [
                         np.concatenate((tail[i], rglist[i][0 : (batch_size - len(tail[i]))]))
-                        for i in range(self.num_fields)
+                        for i in range(self.C_end)
                     ]
                 )
                 if len(tail[0]) == batch_size:
                     label = tail[0]
-                    features = tail[1 : self.num_fields]
+                    dense = tail[1 : self.I_end]
+                    sparse = tail[self.I_end : self.C_end]
                     tail = None
-                    yield label, np.stack(features, axis=-1)
+                    yield label, np.stack(dense, axis=-1), np.stack(sparse, axis=-1)
                 else:
                     pos = 0
                     continue
             while (pos + batch_size) <= len(rglist[0]):
                 label = rglist[0][pos : pos + batch_size]
-                features = [rglist[j][pos : pos + batch_size] for j in range(1, self.num_fields)]
+                dense = [rglist[j][pos : pos + batch_size] for j in range(1, self.I_end)]
+                sparse = [rglist[j][pos : pos + batch_size] for j in range(self.I_end, self.C_end)]
                 pos += batch_size
-                yield label, np.stack(features, axis=-1)
+                yield label, np.stack(dense, axis=-1), np.stack(sparse, axis=-1)
             if pos != len(rglist[0]):
-                tail = [rglist[i][pos:] for i in range(self.num_fields)]
+                tail = [rglist[i][pos:] for i in range(self.C_end)]
+


 def make_criteo_dataloader(data_path, batch_size, shuffle=True):
@@ -349,7 +351,7 @@ def interaction(embedded_x: flow.Tensor) -> flow.Tensor:
 class DeepFMModule(nn.Module):
     def __init__(
         self,
-        embedding_vec_size=128,
+        feature_vec_size=128,
         dnn=[1024, 1024, 512, 256],
         use_fusedmlp=True,
         persistent_path=None,
@@ -359,12 +361,13 @@ def __init__(
         dropout=0.2,
     ):
         super(DeepFMModule, self).__init__()
+        self.dense_weight = nn.Parameter(flow.Tensor(1, num_dense_fields, feature_vec_size + 1))

-        self.embedding_vec_size = embedding_vec_size
+        self.feature_vec_size = feature_vec_size

         self.embedding_layer = OneEmbedding(
             table_name="sparse_embedding",
-            embedding_vec_size=[embedding_vec_size, 1],
+            embedding_vec_size=[feature_vec_size, 1],
             persistent_path=persistent_path,
             table_size_array=table_size_array,
             store_type=one_embedding_store_type,
@@ -373,7 +376,7 @@ def __init__(
         )

         self.dnn_layer = DNN(
-            in_features=embedding_vec_size * (num_dense_fields + num_sparse_fields),
+            in_features=feature_vec_size * (num_dense_fields + num_sparse_fields),
             hidden_units=dnn,
             out_features=1,
             skip_final_activation=True,
@@ -381,25 +384,31 @@ def __init__(
             fused=use_fusedmlp,
         )

-    def forward(self, inputs) -> flow.Tensor:
-        multi_embedded_x = self.embedding_layer(inputs)
-        embedded_x = multi_embedded_x[:, :, 0 : self.embedding_vec_size]
-        lr_embedded_x = multi_embedded_x[:, :, -1]
+    def forward(self, dense_fields, sparse_fields) -> flow.Tensor:
+        dense_features = dense_fields.unsqueeze(-1) * self.dense_weight
+        dense_embedding = dense_features[:, :, 0 : self.feature_vec_size]
+        dense_embedding_lr = dense_features[:, :, -1]
+        sparse_features = self.embedding_layer(sparse_fields)
+        sparse_embedding = sparse_features[:, :, 0 : self.feature_vec_size]
+        sparse_embedding_lr = sparse_features[:, :, -1].squeeze()
+
+        lr_input = flow.concat([sparse_embedding_lr, dense_embedding_lr], dim=1)
+        fm_dnn_input = flow.concat([sparse_embedding, dense_embedding], dim=1)

         # FM
-        lr_out = flow.sum(lr_embedded_x, dim=1, keepdim=True)
-        dot_sum = interaction(embedded_x)
+        lr_out = flow.sum(lr_input, dim=1, keepdim=True)
+        dot_sum = interaction(fm_dnn_input)
         fm_pred = lr_out + dot_sum

         # DNN
-        dnn_pred = self.dnn_layer(embedded_x.flatten(start_dim=1))
+        dnn_pred = self.dnn_layer(fm_dnn_input.flatten(start_dim=1))

         return fm_pred + dnn_pred


 def make_deepfm_module(args):
     model = DeepFMModule(
-        embedding_vec_size=args.embedding_vec_size,
+        feature_vec_size=args.feature_vec_size,
         dnn=args.dnn,
         use_fusedmlp=not args.disable_fusedmlp,
         persistent_path=args.persistent_path,
@@ -418,8 +427,8 @@ def __init__(self, deepfm_module, amp=False):
         if amp:
             self.config.enable_amp(True)

-    def build(self, features):
-        predicts = self.module(features.to("cuda"))
+    def build(self, dense_fields, sparse_fields):
+        predicts = self.module(dense_fields.to("cuda"), sparse_fields.to("cuda"))
         return predicts.sigmoid()

@@ -438,8 +447,8 @@ def __init__(
             self.config.enable_amp(True)
         self.set_grad_scaler(grad_scaler)

-    def build(self, labels, features):
-        logits = self.module(features.to("cuda"))
+    def build(self, labels, dense_fields, sparse_fields):
+        logits = self.module(dense_fields.to("cuda"), sparse_fields.to("cuda"))
         loss = self.loss(logits, labels.to("cuda"))
         loss.backward()
         return loss.to("cpu")
@@ -549,8 +558,8 @@ def save_model(subdir):
     with make_criteo_dataloader(f"{args.data_dir}/train", args.batch_size) as loader:
         step, last_step, last_time = -1, 0, time.time()
         for step in range(1, args.train_batches + 1):
-            labels, features = batch_to_global(*next(loader))
-            loss = train_graph(labels, features)
+            label, dense_fields, sparse_fields = batch_to_global(*next(loader))
+            loss = train_graph(label, dense_fields, sparse_fields)
             if step % args.loss_print_interval == 0:
                 loss = loss.numpy()
                 if rank == 0:
@@ -600,9 +609,10 @@ def save_model(subdir):

     if args.save_best_model:
         load_model(f"{args.model_save_dir}/best_checkpoint")
-    if rank == 0:
-        print("================ Test Evaluation ================")
-    eval(args, eval_graph, tag="test", cur_step=step, epoch=epoch)
+    if args.num_test_samples > 0:
+        if rank == 0:
+            print("================ Test Evaluation ================")
+        eval(args, eval_graph, tag="test", cur_step=step, epoch=epoch)


 def np_to_global(np):
@@ -610,18 +620,19 @@ def np_to_global(np):
     return t.to_global(placement=flow.env.all_device_placement("cpu"), sbp=flow.sbp.split(0))


-def batch_to_global(np_label, np_features, is_train=True):
+def batch_to_global(np_label, np_dense, np_sparse, is_train=True):
     labels = np_to_global(np_label.reshape(-1, 1)) if is_train else np_label.reshape(-1, 1)
-    features = np_to_global(np_features)
-    return labels, features
+    dense_fields = np_to_global(np_dense)
+    sparse_fields = np_to_global(np_sparse)
+    return labels, dense_fields, sparse_fields


 def prefetch_eval_batches(data_dir, batch_size, num_batches):
     cached_eval_batches = []
     with make_criteo_dataloader(data_dir, batch_size, shuffle=False) as loader:
         for _ in range(num_batches):
-            label, features = batch_to_global(*next(loader), is_train=False)
-            cached_eval_batches.append((label, features))
+            label, dense_fields, sparse_fields = batch_to_global(*next(loader), is_train=False)
+            cached_eval_batches.append((label, dense_fields, sparse_fields))
     return cached_eval_batches


@@ -641,14 +652,14 @@ def eval(args, eval_graph, tag="val", cur_step=0, epoch=0, cached_eval_batches=N
         ) as loader:
             eval_start_time = time.time()
             for i in range(batches_per_epoch):
-                label, features = batch_to_global(*next(loader), is_train=False)
-                pred = eval_graph(features)
+                label, dense_fields, sparse_fields = batch_to_global(*next(loader), is_train=False)
+                pred = eval_graph(dense_fields, sparse_fields)
                 labels.append(label)
                 preds.append(pred.to_local())
     else:
         for i in range(batches_per_epoch):
-            label, features = cached_eval_batches[i]
-            pred = eval_graph(features)
+            label, dense_fields, sparse_fields = cached_eval_batches[i]
+            pred = eval_graph(dense_fields, sparse_fields)
             labels.append(label)
             preds.append(pred.to_local())

diff --git a/RecommenderSystems/deepfm/train_deepfm.sh b/RecommenderSystems/deepfm/train_deepfm.sh
old mode 100644
new mode 100755
index c4b32d107..b5d19d4e2
--- a/RecommenderSystems/deepfm/train_deepfm.sh
+++ b/RecommenderSystems/deepfm/train_deepfm.sh
@@ -1,8 +1,16 @@
 #!/bin/bash
 DEVICE_NUM_PER_NODE=1
+export ONEFLOW_EAGER_LOCAL_TO_GLOBAL_BALANCED_OVERRIDE=1
+
 DATA_DIR=/path/to/deepfm_parquet
 PERSISTENT_PATH=/path/to/persistent
-MODEL_SAVE_DIR=/path/to/model/save/dir
+rm -rf "${PERSISTENT_PATH:?}"/*
+
+table_size_array="39884406,39043,17289,7420,20263,3,7120,1543,63,38532951,2953546,403346,10,2208,11938,155,4,976,14,39979771,25641295,39664984,585935,12972,108,36"
+num_train_samples=4195197692
+num_eval_examples=89137319
+batch_size=$(( 6912 * DEVICE_NUM_PER_NODE ))
+train_batches=$(( num_train_samples / batch_size + 1 ))

 python3 -m oneflow.distributed.launch \
     --nproc_per_node $DEVICE_NUM_PER_NODE \
@@ -12,18 +20,15 @@ python3 -m oneflow.distributed.launch \
     deepfm_train_eval.py \
     --data_dir $DATA_DIR \
     --persistent_path $PERSISTENT_PATH \
-    --table_size_array "649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376,1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572" \
-    --store_type 'cached_host_mem' \
-    --cache_memory_budget_mb 1024 \
-    --batch_size 10000 \
-    --train_batches 75000 \
-    --loss_print_interval 100 \
-    --dnn "1000,1000,1000,1000,1000" \
-    --net_dropout 0.2 \
-    --learning_rate 0.001 \
-    --embedding_vec_size 16 \
-    --num_train_samples 36672493 \
-    --num_val_samples 4584062 \
-    --num_test_samples 4584062 \
-    --model_save_dir $MODEL_SAVE_DIR \
-    --save_best_model
+    --table_size_array "$table_size_array" \
+    --store_type cached_host_mem \
+    --cache_memory_budget_mb 2048 \
+    --loss_print_interval 1000 \
+    --net_dropout 0.5 \
+    --feature_vec_size 10 \
+    --train_batches $train_batches \
+    --batch_size $batch_size \
+    --num_train_samples $num_train_samples \
+    --num_val_samples $num_eval_examples \
+    --num_test_samples 0 \
+    --amp
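The reworked get_batches in both dataloaders now yields three arrays instead of two, slicing the ordered field list (label, I1..I13, C1..C26) at I_end and C_end. A minimal numpy sketch of that slicing, with zero-filled stand-ins for one Parquet row group (field counts from the Criteo scripts; the values are toy data):

import numpy as np

num_dense_fields, num_sparse_fields = 13, 26

# Field order matches the loader: label first, then I1..I13, then C1..C26.
fields = ["label"]
fields += [f"I{i+1}" for i in range(num_dense_fields)]
I_end = len(fields)  # 14; fields[1:I_end] are the dense columns
fields += [f"C{i+1}" for i in range(num_sparse_fields)]
C_end = len(fields)  # 40; fields[I_end:C_end] are the sparse columns

batch_size = 4
rglist = [np.zeros(batch_size) for _ in fields]  # toy stand-in for one row group

label = rglist[0]
dense = np.stack(rglist[1:I_end], axis=-1)       # shape (4, 13)
sparse = np.stack(rglist[I_end:C_end], axis=-1)  # shape (4, 26)
assert dense.shape == (batch_size, num_dense_fields)
assert sparse.shape == (batch_size, num_sparse_fields)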
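The DCN input_dim change follows from the new forward: only the 26 sparse fields go through OneEmbedding, and the 13 raw dense values are concatenated onto the flattened embeddings, so the cross/DNN input width drops from 16 * 39 = 624 to 16 * 26 + 13 = 429 at the default --embedding_vec_size 16. A quick shape check with numpy stand-ins (sizes assumed from those defaults):

import numpy as np

batch, emb_size, n_dense, n_sparse = 4, 16, 13, 26

feature_emb = np.random.rand(batch, n_sparse, emb_size)  # stand-in for the OneEmbedding output
dense_fields = np.random.rand(batch, n_dense)

feature = np.concatenate([feature_emb.reshape(batch, -1), dense_fields], axis=1)
assert feature.shape[1] == emb_size * n_sparse + n_dense  # 429, the new input_dim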
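On the DeepFM side, dense fields no longer ride through the sparse path: a learned dense_weight of shape (1, num_dense_fields, feature_vec_size + 1) is broadcast-multiplied with each scalar feature, and the result is split the same way as the OneEmbedding output (first feature_vec_size channels feed FM/DNN, the last channel is the first-order LR term). A numpy sketch of the shapes (random stand-in values, toy batch size):

import numpy as np

batch, vec_size, n_dense = 4, 10, 13

# One learned (vec_size + 1)-dim weight per dense field.
dense_weight = np.random.rand(1, n_dense, vec_size + 1)
dense_fields = np.random.rand(batch, n_dense)

# Broadcast multiply turns each scalar feature into a (vec_size + 1)-dim vector.
dense_features = dense_fields[:, :, None] * dense_weight  # (4, 13, 11)
dense_embedding = dense_features[:, :, :vec_size]         # FM/DNN part, (4, 13, 10)
dense_embedding_lr = dense_features[:, :, -1]             # first-order LR part, (4, 13)
assert dense_embedding.shape == (batch, n_dense, vec_size)
assert dense_embedding_lr.shape == (batch, n_dense)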
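Finally, train_deepfm.sh now derives train_batches from the sample count, so the default run is one full pass over the Criteo 1TB training split; num_train_samples / batch_size + 1 in bash is integer division plus one, i.e. rounding up. The same arithmetic in Python (values copied from the script, single-device case):

DEVICE_NUM_PER_NODE = 1
num_train_samples = 4_195_197_692
batch_size = 6912 * DEVICE_NUM_PER_NODE               # 6912
train_batches = num_train_samples // batch_size + 1   # 606_945
assert train_batches * batch_size >= num_train_samples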