This document describes the design of a distributed parameter server for ElasticDL.
A parameter server (PS) stores model parameters for workers. Workers get model parameters from the PS, compute gradients on different training data, and send the computed gradients to the PS. The PS iteratively updates these model parameters using the gradients sent by workers. A PS-based distributed training system can use an arbitrary number of workers, which lets training scale with the size of the training data.
We want to have one or more PS instances in each ElasticDL job. One reason is that models could be large and overrun the memory space of a single PS instance. In such a case, we need to partition the model and store different partitions in different PS instances. Even if the model is not too big and fits in the memory of a single PS instance, we might still want to partition the model, as this distributes the model parameter communication from workers among PS instances. It also distributes the computation on the PS, such as parameter optimization.
ElasticDL is a Kubernetes-native, fault-tolerant deep learning system. An ElasticDL distributed PS consists of multiple PS pods, with each PS pod acting as a PS instance. A failed PS pod would interrupt the training. To support PS fault tolerance, we can relaunch any failed PS pod and recover the corresponding model parameter partition.
In the following sections, we will explain the design of the ElasticDL distributed PS with fault-tolerance in detail, including how to partition the model, store model parameters, access parameters from workers, initialize parameters, update parameters from gradients and support PS fault-tolerance.
For a distributed PS with N PS pods, each PS pod stores a model parameter partition. Note that Kubernetes may preempt some PS pods, in which case N would not stay constant. However, we can avoid this by giving PS pods a higher priority than worker pods in a job. If a PS pod is preempted, the master relaunches it using Kubernetes APIs. Since PS pods have a higher priority than worker pods, as long as some worker pods are still running, the relaunch will succeed by using either idle or preempted Kubernetes resources. If no worker pods are left, ElasticDL has to wait for Kubernetes resources to continue the training. Thus, we can assume that N is a constant number in ElasticDL.
We consider two kinds of model parameters:
- non-embedding parameters, and
- embedding tables.
Theoretically, non-embedding parameters might have tremendous size and require partitioning. However, in practice, limited by the amount of GPU memory, researchers don't often define models depending on huge dense tensor parameters. Hence in this design, we don't partition non-embedding parameters; instead, we place each of them on a PS pod.
For a non-embedding parameter, we select its PS pod PSᵢ using a hashing function hash and the parameter name p_name:
i = hash(p_name) % N
Each embedding layer has an embedding table which maps a discrete ID i to an embedding vector vᵢ. Embedding tables could be huge, especially in some recommendation and ranking models. Thus, we partition each embedding table and store each partition in a unique PS pod. For an embedding vector vᵢ, we select the (i mod N)-th PS pod to store it.
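Below is a minimal sketch of the two partitioning rules above. The helper names and the choice of MD5 are illustrative, not the actual ElasticDL implementation; any deterministic hash that all workers and PS pods share would work.

```python
import hashlib

N = 4  # number of PS pods in the job, assumed constant


def ps_index_for_parameter(p_name):
    """Select the PS pod for a non-embedding parameter by hashing its name.
    A deterministic hash (MD5 here) is used so that all workers agree."""
    digest = hashlib.md5(p_name.encode("utf-8")).hexdigest()
    return int(digest, 16) % N


def ps_index_for_embedding_vector(discrete_id):
    """Select the PS pod for an embedding vector by its discrete ID."""
    return discrete_id % N
```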
Each PS pod has a dictionary-based data structure to store its partition of model parameters.
We choose to store each non-embedding parameter using the parameter name as its key and a `tf.Variable` instance as its value. This is because we want to update non-embedding parameters directly with TensorFlow optimizers.
If a model has one or more embedding layers, a minibatch of training data contains a set of discrete IDs. These discrete IDs correspond to a set of embedding vectors. The worker needs to pull these embedding vectors from their corresponding PS pods using the embedding layer name and the discrete IDs. To store an embedding vector, we use its corresponding embedding layer name and discrete ID as the key, and a 1-D `numpy.ndarray` as the value.
Each PS pod has an RPC servicer to provide RPC services. Workers use these RPC services to pull model parameters. The `pull_variable` service pulls all non-embedding parameters. The `pull_embedding_vector` service pulls embedding vectors specified by an embedding layer name and a list of discrete IDs.
service PServer{
    rpc pull_variable(PullVariableRequest) returns (PullVariableResponse);
    rpc pull_embedding_vector(PullEmbeddingVectorRequest) returns (Tensor);
}
We use lazy initialization for model parameters in the PS. The PS does not have the model definition. Even if the PS had the model definition, it could not initialize Keras subclass model parameters, as only a forward-pass with a minibatch of data can initialize them. Thus, workers are responsible for initializing parameters and pushing the initialized parameters to the corresponding PS pods.
Each PS pod has a parameter initialization status, which is `False` after the PS pod launches. When a worker tries to get non-embedding parameters from the PS pod through an RPC call `pull_variable`, the PS pod tells the worker in the response that the parameter initialization status is `False`. If the worker has already initialized non-embedding parameters, it sends the non-embedding parameter values to the PS pod by a gRPC call `push_model`. `push_model` is an RPC service in the PS pod.
service PServer{
    rpc push_model(Model) returns (google.protobuf.Empty);
}
If the worker has not initialized non-embedding parameters, it can run a forward-pass first to initialize them before calling `push_model`, since the worker has the model definition and some training data.
When the PS pod receives non-embedding parameters in its first `push_model` RPC call, it initializes the non-embedding parameters and sets the parameter initialization status to `True`.
For an embedding vector, the corresponding PS pod initializes it in the first `pull_embedding_vector` call that contains this embedding vector. The PS pod needs the embedding vector size and the initialization method for the initialization. Both are in the model definition, and workers can send them to PS pods in `push_model` together with the non-embedding parameter values.
Thus, we introduce two data structures: `Parameters` and `EmbeddingTable`. `Parameters` stores both embedding parameters and non-embedding parameters. `EmbeddingTable` stores embedding vectors, which form a subset of an embedding table.
class Parameters(object):
    def __init__(self):
        # Parameter initialization status
        self.parameter_init_status = False
        # Non-embedding parameter dict, maps parameter name to tf.Variable instance
        self.non_embedding_params = {}
        # Embedding table dict, maps embedding layer name to `EmbeddingTable` instance
        self.embedding_params = {}


class EmbeddingTable(object):
    def __init__(self, dim, initializer):
        # Embedding vector dict, maps ID to 1-D numpy.ndarray
        self._embedding_vectors = {}
        # the dimension of embedding vectors
        self._dim = dim
        # the initializer name for initializing embedding vectors
        self._initializer = initializer
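As a rough sketch of how a PS pod might populate a `Parameters` instance when handling its first `push_model` call, consider the following. It assumes the `Model` message and the `tensor_pb_to_ndarray` helper defined later in this document; `handle_push_model` itself is a hypothetical helper name, not the actual ElasticDL code.

```python
import tensorflow as tf


def handle_push_model(parameters, model_pb):
    """Initialize a `Parameters` instance from a `Model` protocol buffer
    received in the first push_model call (illustrative sketch)."""
    if parameters.parameter_init_status:
        return  # already initialized by another worker
    for var_pb in model_pb.variables:
        # tensor_pb_to_ndarray is the deserialization helper from the appendix
        value = tensor_pb_to_ndarray(var_pb)
        parameters.non_embedding_params[var_pb.name] = tf.Variable(value)
    for info in model_pb.embedding_table_info:
        # EmbeddingTableInfo.dim is a repeated field; the vector size is dim[0]
        parameters.embedding_params[info.name] = EmbeddingTable(
            info.dim[0], info.initializer
        )
    parameters.parameter_init_status = True
```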
A worker computes gradients in each training iteration, which contain gradients for non-embedding parameters and, if applicable, some embedding vectors. The worker partitions these gradients using their corresponding parameter names, or the discrete IDs for embedding vectors. Then the worker sends the gradient partitions to their corresponding PS pods by RPC calls `push_gradient`.
service PServer{
    rpc push_gradient(PushGradientRequest) returns (PushGradientResponse);
}
When a PS pod receives gradients in `push_gradient`, it uses a TensorFlow optimizer to apply the gradients to non-embedding parameters.
We have already implemented an `OptimizeWrapper` to sparsely update embedding vectors. `OptimizeWrapper` uses the corresponding embedding vectors to form a temporary variable, applies gradients to this temporary variable, and writes the results back to these embedding vectors. The PS pod can use this `OptimizeWrapper` directly to update embedding vectors.
In asynchronous SGD, the PS pod applies gradients directly to model parameters as soon as it receives them. In synchronous SGD, the PS pod accumulates `grads_to_wait` gradients from workers and then updates model parameters with these gradients. `grads_to_wait` is an ElasticDL argument specified by the user.
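The following sketch shows how a PS pod might dispatch between the two modes; the `GradientHandler` class, its dict-of-ndarray gradient representation, and the averaging policy are illustrative assumptions, not the actual ElasticDL implementation.

```python
import threading

import numpy as np


class GradientHandler(object):
    """Illustrative sketch: gradients are a dict mapping parameter name to ndarray."""

    def __init__(self, apply_fn, grads_to_wait, use_async):
        self._apply_fn = apply_fn        # e.g. wraps a TensorFlow optimizer
        self._grads_to_wait = grads_to_wait
        self._use_async = use_async
        self._accumulated = []
        self._lock = threading.Lock()

    def push_gradient(self, gradients):
        if self._use_async:
            # Asynchronous SGD: apply gradients as soon as they arrive.
            self._apply_fn(gradients)
            return
        with self._lock:
            # Synchronous SGD: buffer gradients until `grads_to_wait` of them
            # have arrived, then apply their average in a single update.
            self._accumulated.append(gradients)
            if len(self._accumulated) >= self._grads_to_wait:
                averaged = {
                    name: np.mean([g[name] for g in self._accumulated], axis=0)
                    for name in self._accumulated[0]
                }
                self._apply_fn(averaged)
                self._accumulated = []
```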
When the master detects that a PS pod fails, it relaunches the pod using Kubernetes APIs to keep the number of PS pods N constant. After the relaunch, the PS pod recovers its partition of model parameters so that ElasticDL can continue the training job.
Each PS pod provides RPC services for workers. Workers use RPC stubs to send RPC requests to PS pods, and these stubs require the PS pod domain names. Because ElasticDL is Kubernetes-native, the master can use Kubernetes services to launch/relaunch PS pods with fixed domain names. The master sends these domain names to workers as arguments when launching worker pods. In this way, workers do not need to reconfigure their RPC stubs after a PS pod relaunch.
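For illustration, a worker could build its stubs once from the PS addresses passed by the master, roughly as below. The `ps_addrs` argument name and the address format are assumptions, not the actual ElasticDL flags; `elasticdl_pb2_grpc` is the module generated from the `PServer` service definition.

```python
import grpc

import elasticdl_pb2_grpc  # generated from the PServer proto definition


def create_ps_stubs(ps_addrs):
    """ps_addrs: comma-separated PS service addresses passed by the master,
    e.g. "job-ps-0.default.svc:2222,job-ps-1.default.svc:2222".
    Because the Kubernetes service names are fixed, these stubs stay valid
    even after a PS pod is relaunched."""
    stubs = []
    for addr in ps_addrs.split(","):
        channel = grpc.insecure_channel(addr)
        stubs.append(elasticdl_pb2_grpc.PServerStub(channel))
    return stubs
```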
The model may contain one or more embedding layers with embedding tables as their parameters. If so, a minibatch of training data in a worker contains some embedding IDs, which correspond to a subset of embedding tables. The worker pulls all non-embedding parameters and only a subset of embedding tables from PS pods in the training. Thus, the PS pod can recover non-embedding parameters from workers but not embedding tables.
For non-embedding parameters, the relaunched PS pod can recover them from workers in the same way as in parameter initialization, by resetting its parameter initialization status to `False`.
For embedding tables, the PS creates replicas to support fault tolerance. Each PS pod PSᵢ stores M replicas of its embedding table partition in the M PS pods indexed from (i + 1) mod N to (i + M) mod N. A relaunched PS pod can then recover its embedding table partition from one of these replicas.
Let Eᵢ be the embedding table partition in PS pod PSᵢ. Eᵢ has M replicas, stored in the PS pods indexed from (i + 1) mod N to (i + M) mod N. Conversely, PSᵢ itself stores the replicas of Eⱼ for j ranging from (i - M) mod N to (i - 1) mod N.
PSᵢ maintains M updated embedding vector key sets UKSᵢ(j) for j ∈ [0, M). When PSᵢ sparsely updates its embedding table partition Eᵢ, it also adds the updated embedding vector keys into these M sets.
PSᵢ also periodically synchronizes the replicas it stores from the PS pods indexed from (i - M) mod N to (i - 1) mod N. The synchronization period can be several seconds.
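A small sketch of the index arithmetic and the updated-key bookkeeping described above; the `ReplicaBookkeeping` class and its attribute names are illustrative only. For example, with N = 5 and M = 1, PS pod 1's partition E₁ has a single replica on PS pod 2, matching the diagram later in this section.

```python
class ReplicaBookkeeping(object):
    """Illustrative sketch of the replica index arithmetic for PS pod i."""

    def __init__(self, i, N, M):
        self.i, self.N, self.M = i, N, M
        # PS(i)'s own partition E(i) is replicated on these pods.
        self.replica_holders = [(i + k) % N for k in range(1, M + 1)]
        # PS(i) itself stores replicas of these pods' partitions.
        self.replicated_from = [(i - k) % N for k in range(1, M + 1)]
        # The M updated embedding vector key sets UKS(i)(j), j in [0, M).
        self.updated_key_sets = [set() for _ in range(M)]

    def on_sparse_update(self, keys):
        # Record every updated embedding vector key, so that replica holders
        # only need to fetch the vectors changed since their last sync.
        for uks in self.updated_key_sets:
            uks.update(keys)
```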
PSᵢ uses M `SynchronizeEmbedding` RPC calls to synchronize the replicas it stores, one call per replica. The `replica_index` values in `SynchronizeEmbeddingRequest` identify these replicas, which belong to the PS pods indexed from (i - M) mod N to (i - 1) mod N.
When PSᵢ needs to recover its embedding vectors after a relaunch, it chooses a pod PSⱼ that is still alive among the pods indexed from (i + 1) mod N to (i + M) mod N. PSᵢ then uses an RPC call `GetReplica` to get its replica from PSⱼ.
The following diagram shows the RPC calls among PS pods for PS fault tolerance:
Here, we set up 5 PS pods and set the embedding replica number M to 1. PS pod 2 holds an embedding replica `R1` of PS pod 1's partition and periodically synchronizes the replica from PS pod 1. If PS pod 1 dies, the master will relaunch it, and the relaunched pod gets the replica back from PS pod 2.
message SynchronizeEmbeddingRequest {
    int32 replica_index = 1;
}

message SynchronizeEmbeddingResponse {
    repeated Tensor embedding_vectors = 1;
}
service PServer{
    // RPC service for replica synchronization
    rpc SynchronizeEmbedding(SynchronizeEmbeddingRequest) returns (SynchronizeEmbeddingResponse);
    // RPC service for a PS pod to recover embedding vectors after relaunch
    rpc GetReplica(SynchronizeEmbeddingRequest) returns (SynchronizeEmbeddingResponse);
}
Each PS pod has a thread dedicated to the replica synchronization:
# T is the number of seconds of the synchronization period.
# Assume the current PS is PS(i); self._stub[index] is the stub for
# PS((i - 1 - index) % N)'s gRPC server, and self.replicas[index] is the
# replica of PS((i - 1 - index) % N)'s embedding table partition.
req = elasticdl_pb2.SynchronizeEmbeddingRequest()
while still training:
    time.sleep(T)
    for index in range(M):
        req.replica_index = index
        updated_vectors = self._stub[index].SynchronizeEmbedding(req)
        update self.replicas[index] from updated_vectors.embedding_vectors
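For completeness, a relaunched PS pod might recover its embedding table partition from one of its replicas roughly as follows. This is a sketch only: `self._ps_stubs`, `self._N`, `self._M`, and `_restore_embedding_vector` are hypothetical names, and the replicas on each holder are assumed to be indexed by offset as in the synchronization pseudocode above.

```python
import grpc


def recover_embedding_partition(self, my_index):
    """Sketch: called by the relaunched PS pod i to restore its partition E(i)."""
    req = elasticdl_pb2.SynchronizeEmbeddingRequest()
    for k in range(1, self._M + 1):
        holder = (my_index + k) % self._N
        # On PS((i + k) % N), the replica of E(i) is assumed to be stored
        # at offset k - 1.
        req.replica_index = k - 1
        try:
            replica = self._ps_stubs[holder].GetReplica(req)
        except grpc.RpcError:
            continue  # this replica holder is unreachable, try the next one
        for tensor in replica.embedding_vectors:
            # tensor_pb_to_ndarray is the deserialization helper from the appendix
            self._restore_embedding_vector(tensor.name, tensor_pb_to_ndarray(tensor))
        return True
    return False
```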
The following diagram shows the details inside a PS pod:
The following diagram shows the RPC calls between a worker pod and two PS pods:
Please note that there are many worker pods in an ElasticDL job, and each worker pod will have RPC connections with all the PS pods. Here we only show one worker pod for simplicity.
message Tensor {
    enum DataType {
        BOOL = 0;
        INT16 = 1;
        INT32 = 2;
        INT64 = 3;
        FP16 = 4;
        FP32 = 5;
        FP64 = 6;
    }
    string name = 1;
    DataType data_type = 2;
    repeated int64 dim = 3;
    bytes content = 4;
    repeated int64 indices = 5;
}

message EmbeddingTableInfo{
    string name = 1;
    repeated int64 dim = 2;
    string initializer = 3;
}

message Model {
    int64 version = 1;
    repeated Tensor variables = 2;
    repeated EmbeddingTableInfo embedding_table_info = 3;
}

message PullVariableRequest{
    int64 version = 1;
}

message PullVariableResponse{
    bool model_init_status = 1;
    Model model = 2;
}

message PushGradientRequest{
    int32 model_version = 1;
    repeated Tensor gradients = 2;
}

message PushGradientResponse{
    bool accepted = 1;
    int32 model_version = 2;
}

message PullEmbeddingVectorRequest{
    string name = 1;
    repeated int64 ids = 2;
}

message SynchronizeEmbeddingRequest {
    int32 replica_index = 1;
}

message SynchronizeEmbeddingResponse {
    repeated Tensor embedding_vectors = 1;
}
service PServer{
    // pull trainable TensorFlow variables created by Keras layers
    rpc pull_variable(PullVariableRequest) returns (PullVariableResponse);
    // pull embedding vectors in ElasticDL embedding layers
    // Do we need to create a new message `PullEmbeddingVectorRequest` rather than use `Tensor`?
    rpc pull_embedding_vector(PullEmbeddingVectorRequest) returns (Tensor);
    // push trainable TensorFlow variables and meta info for ElasticDL embedding layers
    rpc push_model(Model) returns (google.protobuf.Empty);
    rpc push_gradient(PushGradientRequest) returns (PushGradientResponse);
    // for a PS pod to recover embedding vectors after relaunch
    rpc GetReplica(SynchronizeEmbeddingRequest) returns (SynchronizeEmbeddingResponse);
    // PS replica synchronization
    rpc SynchronizeEmbedding(SynchronizeEmbeddingRequest) returns (SynchronizeEmbeddingResponse);
}
class Tensor(object):
    def __init__(self, name=None, value=None, indices=None):
        self.name = name
        self.value = value
        self.indices = indices

    @classmethod
    def from_tensor_pb(cls, tensor_pb):
        """Create an ElasticDL Tensor object from tensor protocol buffer.
        Return the created tensor object.
        """
        pass

    def to_tensor_pb(self):
        pass

    def to_tf_tensor(self):
        pass

    def to_ndarray(self):
        pass


def serialize_to_pb(tensor, pb):
    """Serialize ElasticDL Tensor to tensor protocol buffer."""
    pass


def deserialize_from_pb(pb, tensor):
    """Deserialize tensor protocol buffer to ElasticDL Tensor."""
    pass


def tensor_pb_to_ndarray(tensor):
    """Deserialize tensor protocol buffer and return a numpy ndarray."""
    pass


def tensor_pb_to_tf_tensor(tensor):
    """Deserialize tensor protocol buffer and return a TensorFlow tensor."""
    pass
# In `Parameters`, interfaces `set_*_param` have two arguments, `value` and `name` (or `layer_name`).
# If `value` is an ElasticDL `Tensor` instance, `name` can be None.
# Otherwise `value` is a numpy ndarray, and `name` must be specified.
class Parameters(object):
    def __init__(self):
        # Parameter initialization status
        self.parameter_init_status = False
        # Non-embedding parameter dict, maps parameter name to tf.Variable instance
        self._non_embedding_params = {}
        # Embedding table dict, maps embedding layer name to `EmbeddingTable` instance
        self._embedding_params = {}

    @property
    def non_embedding_params(self):
        return self._non_embedding_params

    def set_embedding_param(self, value, layer_name=None):
        pass

    def get_embedding_param(self, layer_name, ids):
        return self._embedding_params.get(layer_name).get(ids)

    def set_non_embedding_param(self, value, name=None):
        pass

    def init_non_embedding_param(self, value, name=None):
        pass

    def set_meta_info(self, layer_name, dim, initializer):
        pass


class EmbeddingTable(object):
    def __init__(self, dim, initializer):
        # Embedding vector dict, maps ID to 1-D numpy.ndarray
        self._embedding_vectors = {}
        # the dimension of embedding vectors
        self._dim = dim
        # the initializer name for initializing embedding vectors
        self._initializer = initializer

    def get(self, ids):
        values = []
        for id in ids:
            if id not in self._embedding_vectors:
                # initialize the embedding vector lazily on first access
                val = initialize_embedding_vector(self._dim, self._initializer)
                self._embedding_vectors[id] = val
            else:
                val = self._embedding_vectors.get(id)
            values.append(val)
        return np.concatenate(values).reshape(len(ids), -1)

    def set(self, ids, values):
        pass
Here is the pseudocode for a worker to pull variables from the PS. If the non-embedding variables are not initialized, the PS tells the worker to initialize them and push them back to the PS.
class PServer(elasticdl_pb2_grpc.PServerServicer):
    ...

    def pull_variable(self, request):
        res = PullVariableResponse()
        if self._need_initialize_model:
            res.model_init_status = False
            return res
        res.model_init_status = True
        res.model = self._get_model()  # get the model stored in this PS instance
        return res

    def push_model(self, request):
        model = request  # the request is the `Model` message itself
        ...  # initialize the model in this PS instance


class Worker(object):
    ...

    def pull_variable(self):
        # the for-loop should be implemented with multiple threads
        for ps_index in range(self._ps_node_num):
            req = PullVariableRequest()  # request creation keeps the same as the current code
            res = self._stub[ps_index].pull_variable(req)  # pull variables from the PS
            if not res.model_init_status:
                # the worker initializes its model here if needed
                model = serialize_model_to_pb()
                self._stub[ps_index].push_model(model)  # push the initialized model to this PS pod
                req = PullVariableRequest()
                res = self._stub[ps_index].pull_variable(req)  # pull variables again
                if not res.model_init_status:
                    raise an error or retry a pre-defined constant number of times
Here is the pseudocode for getting a replica from a specified PS pod and synchronizing replicas:
# T is the number of seconds of the synchronization period.
# Assume the current PS is PS(i); self._stub[index] is the stub for
# PS((i - 1 - index) % N)'s gRPC server, and self.replicas[index] is the
# replica of PS((i - 1 - index) % N)'s embedding table partition.
req = elasticdl_pb2.SynchronizeEmbeddingRequest()
while still training:
    time.sleep(T)
    for index in range(M):
        req.replica_index = index
        updated_vectors = self._stub[index].SynchronizeEmbedding(req)
        update self.replicas[index] from updated_vectors.embedding_vectors
def SynchronizeEmbedding(self, request, _):
    synch_embeddings = elasticdl_pb2.SynchronizeEmbeddingResponse()
    # self.UKS are the M updated embedding vector key sets in the current PS
    # self.embedding_vector are the embedding vectors in the current PS
    with self.lock():
        assign synch_embeddings.embedding_vectors from the vectors in self.embedding_vector whose keys are in self.UKS[request.replica_index]
        self.UKS[request.replica_index].clear()
    return synch_embeddings


def GetReplica(self, request, _):
    replica = elasticdl_pb2.SynchronizeEmbeddingResponse()
    assign replica.embedding_vectors from self.replicas[request.replica_index]
    return replica