Use Meituan HDFS to read or write train data and model (#1088)
* Add Meituan HDFS I/O support for reading and writing training data and models.

* Remove loader.env.CLASSPATH; it is now loaded from the host.

* Adapt the pull-code script to Meituan viewfs HDFS paths.

---------

Co-authored-by: huangxu17 <[email protected]>
Co-authored-by: gejielun <[email protected]>
3 people authored Mar 20, 2024
1 parent 561e259 commit c3ea95d
Showing 12 changed files with 451 additions and 17 deletions.
2 changes: 1 addition & 1 deletion deploy/scripts/env_to_args.sh
@@ -13,7 +13,7 @@ normalize_env_to_args() {
pull_code() {
cwd=$PWD
cd $2
if [[ $1 == "hdfs://"* ]]; then
if [[ $1 == "hdfs://"* || $1 == "viewfs://"* ]]; then
${HADOOP_HOME}/bin/hadoop fs -copyToLocal $1 pulled_file
elif [[ $1 == "http://"* || $1 == "https://"* ]]; then
wget $1 -O pulled_file
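The one-line change above lets pull_code fetch code packages from Meituan's federated viewfs:// namespaces through the same `hadoop fs -copyToLocal` path already used for hdfs:// URIs. For illustration, a rough Python equivalent of the updated dispatch (a sketch, not part of the repository; it assumes the `hadoop` CLI is on PATH):

import subprocess
import urllib.request

def pull_code(uri, dest="pulled_file"):
    # Sketch of the shell function's dispatch; viewfs:// resolves through
    # the Hadoop client's mount table just like hdfs://.
    if uri.startswith(("hdfs://", "viewfs://")):
        subprocess.run(["hadoop", "fs", "-copyToLocal", uri, dest],
                       check=True)
    elif uri.startswith(("http://", "https://")):
        urllib.request.urlretrieve(uri, dest)
    else:
        raise ValueError("unsupported code location: " + uri)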
9 changes: 8 additions & 1 deletion deploy/scripts/hdfs_common.sh
@@ -12,9 +12,16 @@ then
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
source "$HADOOP_HOME/conf/hadoop-env.sh" &> /dev/null
else
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source "$HADOOP_HOME/etc/hadoop/hadoop-env.sh" &> /dev/null
fi

if [[ "$USING_MT_HADOOP" == "True" ]]; then
echo "mt kerberos auth"
python3 /app/deploy/scripts/mt_auth_util.py ${HADOOP_USER_NAME}
export KRB5CCNAME=/tmp/krb5cc_0
echo $KRB5CCNAME
fi
export LD_LIBRARY_PATH=${HADOOP_HOME}/lib/native:${HADOOP_HOME}/lib/native/nfs:${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$($HADOOP_HOME/bin/hadoop classpath --glob)
else
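The new branch shells out to /app/deploy/scripts/mt_auth_util.py, which is not part of this diff. A minimal sketch of what such a helper could look like, assuming a keytab-based kinit into the credential cache exported above (the keytab path and the kinit-based flow are assumptions, not taken from the repository):

import subprocess
import sys

CACHE = "/tmp/krb5cc_0"      # matches the KRB5CCNAME exported above
KEYTAB = "/etc/krb5.keytab"  # assumed keytab location

def kerberos_login(user):
    # Obtain a Kerberos ticket for `user` into the shared cache so the
    # Hadoop client picks it up via KRB5CCNAME.
    subprocess.run(["kinit", "-c", CACHE, "-kt", KEYTAB, user], check=True)

if __name__ == "__main__":
    kerberos_login(sys.argv[1])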
5 changes: 4 additions & 1 deletion deploy/scripts/sgx/run_trainer_master_sgx.sh
@@ -17,6 +17,7 @@
set -ex
source ~/.env
export CUDA_VISIBLE_DEVICES=
unset HTTPS_PROXY https_proxy http_proxy ftp_proxy
cp /app/sgx/gramine/CI-Examples/tensorflow_io.py ./
source /app/deploy/scripts/hdfs_common.sh || true
source /app/deploy/scripts/pre_start_hook.sh || true
@@ -39,6 +40,7 @@ local_data_source=$(normalize_env_to_args "--local-data-source" $LOCAL_DATA_SOUR
local_data_path=$(normalize_env_to_args "--local-data-path" $LOCAL_DATA_PATH)
local_start_date=$(normalize_env_to_args "--local-start-date" $LOCAL_START_DATE)
local_end_date=$(normalize_env_to_args "--local-end-date" $LOCAL_END_DATE)
using_mt_hadoop=$(normalize_env_to_args "--using_mt_hadoop" $USING_MT_HADOOP)

if [ -n "$CHECKPOINT_PATH" ]; then
checkpoint_path="--checkpoint-path=$CHECKPOINT_PATH"
@@ -128,4 +130,5 @@ taskset -c $START_CPU_SN-$END_CPU_SN stdbuf -o0 gramine-sgx python /gramine/$ROL
$summary_save_steps $summary_save_secs \
$local_data_source $local_data_path $local_start_date \
$local_end_date $epoch_num $start_date $end_date \
-    $shuffle $shuffle_in_day $extra_params $export_model
+    $shuffle $shuffle_in_day $extra_params $export_model \
+    $using_mt_hadoop
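As elsewhere in these scripts, the flag is only forwarded when the corresponding environment variable is set; judging from its usage, normalize_env_to_args expands to `--flag value` or to nothing. A rough Python rendering of that behavior (inferred from usage, not copied from env_to_args.sh):

import os

def normalize_env_to_args(flag, value):
    # Emit "--flag value" only when the env var carried a value.
    return "{} {}".format(flag, value) if value else ""

# USING_MT_HADOOP=True  ->  "--using_mt_hadoop True"; unset -> ""
print(normalize_env_to_args("--using_mt_hadoop",
                            os.getenv("USING_MT_HADOOP", "")))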
13 changes: 12 additions & 1 deletion deploy/scripts/sgx/run_trainer_worker_sgx.sh
@@ -56,6 +56,15 @@ batch_size=$(normalize_env_to_args "--batch-size" "$BATCH_SIZE")
learning_rate=$(normalize_env_to_args "--learning-rate" "$LEARNING_RATE")
extra_params=$(normalize_env_to_args "--extra-params" "$EXTRA_PARAMS")

using_embedding_protection=$(normalize_env_to_args "--using_embedding_protection" $USING_EMBEDDING_PROTECTION)
using_marvell_protection=$(normalize_env_to_args "--using_marvell_protection" $USING_MARVELL_PROTECTION)
discorloss_weight=$(normalize_env_to_args "--discorloss_weight" $DISCORLOSS_WEIGHT)
sumkl_threshold=$(normalize_env_to_args "--sumkl_threshold" $SUMKL_THRESHOLD)
using_emb_attack=$(normalize_env_to_args "--using_emb_attack" $USING_EMB_ATTACK)
using_norm_attack=$(normalize_env_to_args "--using_norm_attack" $USING_NORM_ATTACK)
using_mt_hadoop=$(normalize_env_to_args "--using_mt_hadoop" $USING_MT_HADOOP)


if [ -n "$CLUSTER_SPEC" ]; then
# get master address from clusterSpec["master"]
MASTER_HOST=`python -c "
@@ -112,4 +121,6 @@ taskset -c $START_CPU_SN-$END_CPU_SN stdbuf -o0 gramine-sgx python /gramine/$ROL
--peer-addr="$PEER_ADDR" \
--worker-rank="$INDEX" \
$server_port $mode $batch_size \
-    $sparse_estimator $learning_rate
+    $sparse_estimator $learning_rate \
+    $using_embedding_protection $using_marvell_protection $discorloss_weight $sumkl_threshold $using_emb_attack $using_norm_attack \
+    $using_mt_hadoop
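On the receiving side, each forwarded flag becomes an optional trainer argument. A minimal argparse sketch of how the trainer could consume them (flag names come from the script above; types and defaults are assumptions):

import argparse

parser = argparse.ArgumentParser()
# Privacy-protection and attack-evaluation switches.
parser.add_argument("--using_embedding_protection", default=None)
parser.add_argument("--using_marvell_protection", default=None)
parser.add_argument("--discorloss_weight", type=float, default=None)
parser.add_argument("--sumkl_threshold", type=float, default=None)
parser.add_argument("--using_emb_attack", default=None)
parser.add_argument("--using_norm_attack", default=None)
# Enables Meituan-specific Hadoop (Kerberos + viewfs) handling.
parser.add_argument("--using_mt_hadoop", default=None)
args, _ = parser.parse_known_args()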
186 changes: 186 additions & 0 deletions example/wide_n_deep/leader_mask_loss.py
@@ -0,0 +1,186 @@
# Copyright 2020 The FedLearner Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# coding: utf-8
# pylint: disable=no-else-return, inconsistent-return-statements

import tensorflow.compat.v1 as tf
import fedlearner.trainer as flt

ROLE = 'leader'

parser = flt.trainer_worker.create_argument_parser()
parser.add_argument('--batch-size', type=int, default=32,
                    help='Training batch size.')
args = parser.parse_args()


def input_fn(bridge, trainer_master=None):
    dataset = flt.data.DataBlockLoader(
        args.batch_size, ROLE, bridge, trainer_master).make_dataset()

    def parse_fn(example):
        feature_map = {"x_{0}".format(i): tf.VarLenFeature(
            tf.int64) for i in range(512)}
        feature_map["example_id"] = tf.FixedLenFeature([], tf.string)
        feature_map["y"] = tf.FixedLenFeature([], tf.int64)
        feature_map["loss_mask"] = tf.FixedLenFeature([], tf.int64)
        features = tf.parse_example(example, features=feature_map)
        return features, dict(y=features.pop('y'))

    dataset = dataset.map(map_func=parse_fn,
                          num_parallel_calls=tf.data.experimental.AUTOTUNE)

    return dataset


def serving_input_receiver_fn():
    feature_map = {"x_{0}".format(i): tf.VarLenFeature(
        tf.int64) for i in range(512)}
    feature_map["example_id"] = tf.FixedLenFeature([], tf.string)

    record_batch = tf.placeholder(dtype=tf.string, name='examples')
    features = tf.parse_example(record_batch, features=feature_map)
    features['act1_f'] = tf.placeholder(dtype=tf.float32, name='act1_f')
    receiver_tensors = {
        'examples': record_batch,
        'act1_f': features['act1_f']
    }
    return tf.estimator.export.ServingInputReceiver(
        features, receiver_tensors)


def model_fn(model, features, labels, mode):
    """Build the leader-side wide & deep model, optionally masking the loss."""
    global_step = tf.train.get_or_create_global_step()

    # An extra feature that decides whether an example's loss is zeroed out.
    loss_mask_feature = features.get('loss_mask', None)

    x = dict()
    for i in range(512):
        x_name = "x_{}".format(i)
        x[x_name] = features[x_name]

    num_slot = 512
    fid_size, embed_size = 101, 16
    embeddings = [
        tf.get_variable(
            'slot_emb{0}'.format(i), shape=[fid_size, embed_size],
            dtype=tf.float32,
            initializer=tf.random_uniform_initializer(-0.01, 0.01))
        for i in range(num_slot)]
    embed_output = tf.concat(
        [
            tf.nn.embedding_lookup_sparse(
                embeddings[i], x['x_{}'.format(i)], sp_weights=None,
                combiner='mean')
            for i in range(512)],
        axis=1)

    output_size = num_slot * embed_size
    fc1_size, fc2_size = 256, 64
    w1l = tf.get_variable(
        'w1l', shape=[output_size, fc1_size], dtype=tf.float32,
        initializer=tf.random_uniform_initializer(-0.01, 0.01))
    b1l = tf.get_variable(
        'b1l', shape=[fc1_size], dtype=tf.float32,
        initializer=tf.zeros_initializer())
    w2 = tf.get_variable(
        'w2', shape=[fc1_size, fc2_size], dtype=tf.float32,
        initializer=tf.random_uniform_initializer(-0.01, 0.01))
    b2 = tf.get_variable(
        'b2', shape=[fc2_size], dtype=tf.float32,
        initializer=tf.zeros_initializer())
    w3 = tf.get_variable(
        'w3', shape=[fc2_size * 2, 2], dtype=tf.float32,
        initializer=tf.random_uniform_initializer(-0.01, 0.01))

    act1_l = tf.nn.relu(tf.nn.bias_add(tf.matmul(embed_output, w1l), b1l))
    act2_l = tf.nn.bias_add(tf.matmul(act1_l, w2), b2)

    if mode == tf.estimator.ModeKeys.TRAIN:
        act1_f = model.recv('act1_f', tf.float32, require_grad=True)
    elif mode == tf.estimator.ModeKeys.EVAL:
        act1_f = model.recv('act1_f', tf.float32, require_grad=False)
    else:
        act1_f = features['act1_f']

    output = tf.concat([act2_l, act1_f], axis=1)
    logits = tf.matmul(output, w3)

    if mode == tf.estimator.ModeKeys.PREDICT:
        return model.make_spec(mode, predictions=logits)

    y = labels['y']
    loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=y, logits=logits)
    # Print the raw per-example loss.
    loss_print_op = tf.print("Original loss:", loss)

    # If loss_mask is present, zero out the loss wherever its value is 1.
    if loss_mask_feature is not None:
        # Reshape loss_mask to match loss so they can be multiplied
        # element-wise.
        loss_mask = tf.cast(tf.reshape(loss_mask_feature, [-1]), tf.float32)
        # Print the loss mask.
        loss_mask_print_op = tf.print("Loss mask:", loss_mask)

        # Apply the mask: entries with mask 0 keep their loss, entries
        # with mask 1 have their loss set to 0.
        loss = loss * (1 - loss_mask)
    else:
        loss_mask_print_op = tf.no_op()

    # Print the loss after the mask has been applied.
    loss_after_mask_print_op = tf.print("Loss after applying mask:", loss)
    # Make sure everything is printed before the mean loss is computed.
    with tf.control_dependencies([loss_print_op, loss_mask_print_op,
                                  loss_after_mask_print_op]):
        loss = tf.math.reduce_mean(loss)

    if mode == tf.estimator.ModeKeys.EVAL:
        auc_pair = tf.metrics.auc(y, logits[:, 1])
        return model.make_spec(
            mode, loss=loss, eval_metric_ops={'auc': auc_pair})

    # mode == tf.estimator.ModeKeys.TRAIN:
    logging_hook = tf.train.LoggingTensorHook(
        {"loss": loss}, every_n_iter=10)
    metric_hook = flt.GlobalStepMetricTensorHook(tensor_dict={"loss": loss},
                                                 every_steps=10)
    optimizer = tf.train.GradientDescentOptimizer(0.1)
    train_op = model.minimize(optimizer, loss, global_step=global_step)
    return model.make_spec(mode, loss=loss, train_op=train_op,
                           training_hooks=[logging_hook, metric_hook])

class ExportModelHook(flt.trainer_worker.ExportModelHook):
    def after_save(self, sess, model, export_dir, inputs, outputs):
        print("**************export model hook**************")
        print("sess:", sess)
        print("model:", model)
        print("export_dir:", export_dir)
        print("inputs:", inputs)
        print("outputs:", outputs)
        print("*********************************************")


if __name__ == '__main__':
    flt.trainer_worker.train(
        ROLE, args, input_fn,
        model_fn, serving_input_receiver_fn,
        export_model_hook=ExportModelHook())
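The masking in model_fn reduces to an elementwise multiply: a mask value of 1 zeroes that example's loss, so only unmasked examples contribute to the mean. Note that reduce_mean still divides by the full batch size, masked examples included, which shrinks the gradient scale when many examples are masked. A standalone sketch of the same arithmetic (NumPy is used here purely for illustration):

import numpy as np

loss = np.array([0.7, 1.2, 0.3, 0.9])  # per-example cross-entropy
loss_mask = np.array([0, 1, 0, 1])     # 1 => drop this example's loss

masked = loss * (1 - loss_mask)        # [0.7, 0.0, 0.3, 0.0]
mean_loss = masked.mean()              # (0.7 + 0.3) / 4 = 0.25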
2 changes: 2 additions & 0 deletions example/wide_n_deep/make_data.py
@@ -42,6 +42,8 @@
            Feature(bytes_list=BytesList(value=[str(idx).encode()]))
        features_l['y'] = \
            Feature(int64_list=Int64List(value=[random.randint(0, 1)]))
        features_l['loss_mask'] = \
            Feature(int64_list=Int64List(value=[random.randint(0, 1)]))
        for k in range(512):
            features_l['x_{0}'.format(k)] = \
                Feature(int64_list=Int64List(value=[random.randint(0, 100)]))
12 changes: 9 additions & 3 deletions fedlearner-sgx-dev.dockerfile
@@ -123,7 +123,7 @@ ENV FEDLEARNER_PATH=/fedlearner

RUN apt-get install -y libmysqlclient-dev

# Build gRPC
COPY sgx/grpc/common ${GRPC_PATH}
COPY sgx/grpc/v1.38.1 ${GRPC_PATH}

@@ -144,8 +144,14 @@ RUN cd ${TF_BUILD_PATH} \
# Build and install fedlearner
COPY . ${FEDLEARNER_PATH}

# For meituan hadoop
RUN if [ -f ${FEDLEARNER_PATH}/docker/hadoop-mt-2.7.0.tar.gz ]; then mkdir -p /opt/meituan/ && tar -xzf ${FEDLEARNER_PATH}/docker/hadoop-mt-2.7.0.tar.gz -C /opt/meituan/; fi

# For meituan hadoop auth
RUN apt-get install -y libkrb5-dev openjdk-8-jdk

RUN pip3 install --upgrade pip \
&& pip3 install -r ${FEDLEARNER_PATH}/requirements.txt
&& pip3 install -r ${FEDLEARNER_PATH}/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/

RUN cd ${FEDLEARNER_PATH} \
&& make protobuf \
@@ -188,7 +194,7 @@ WORKDIR ${WORK_SPACE_PATH}

EXPOSE 6006 50051 50052

-RUN bash -x /app/deploy/scripts/sgx/get_token.sh
+RUN bash -x /fedlearner/deploy/scripts/sgx/get_token.sh

RUN chmod +x /root/entrypoint.sh
# ENTRYPOINT ["/root/entrypoint.sh"]