
Commit caf8fcc

Merge branch 'develop' into fix_blance_except
2 parents 090cf45 + ed9fb22 commit caf8fcc

30 files changed: +1809 -830 lines

Diff for: .travis.yml

-2 lines

@@ -5,7 +5,6 @@ matrix:
       sudo: required
       install:
         - go get -u golang.org/x/lint/golint
-        - curl https://glide.sh/get | bash
        - sudo apt-get remove clang-format
        - sudo apt-get install -y clang-format-3.8
        - sudo pip install --upgrade pip
@@ -14,4 +13,3 @@ matrix:
        - rm -f .copyright.hook
        - bash -x .tools/check_style.sh
        - ln -s $GOPATH/src/github.com/elasticdeeplearning $GOPATH/src/github.com/elasticdeeplearning
-        #- cd $GOPATH/src/github.com/elasticdeeplearning/edl && bash ./scripts/build.sh

Diff for: docker/requirements.txt

+1 line

@@ -10,3 +10,4 @@ paddle-serving-client
 paddle-serving-server-gpu
 paddle-serving-app
 paddlepaddle-gpu==1.8.0.post107
+psutil
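psutil is presumably added so the launcher can inspect local trainer processes. A minimal sketch of that kind of liveness check, using real psutil APIs (the helper name is illustrative, not from this commit):

import psutil


def trainer_alive(pid):
    # True while the trainer process exists and is not a zombie.
    try:
        return psutil.Process(pid).status() != psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return False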

Diff for: python/edl/collective/launch.py

+144 -97

@@ -31,9 +31,11 @@
 import traceback
 
 from ..utils.edl_env import JobEnv
-from ..utils.cluster import Pod, PodStatus
-from ..utils.register import PodRankRegister, PodResourceRegister, ETCD_POD_RANK, ETCD_POD_RESOURCE
-from ..utils.watcher import Watcher, get_pod_leader, get_cluster
+from ..utils.pod import Pod
+from ..utils.register import PodResourceRegister
+from ..utils.leader_register import LeaderRegister
+from ..utils.etcd_db import get_global_etcd
+from ..utils.watcher import Watcher
 from ..utils.pod_server import PodServer
 from ..utils.utils import logger
 from ..utils import utils
@@ -123,146 +125,191 @@ def _convert_args_to_dict(args):
 
 
 def edl_barrier(job_env, pod, timeout):
-    """
-    pod under resource barrier togather
-    """
     start = time.time()
 
-    leader = get_pod_leader()
-    c = PodServerClient(leader.endpoint)
-    # all pods barrier on leader
+    log_time = time.time()
     while True:
         try:
-            c.barrier(job_env.job_id, pod.get_id())
-            break
+            db = get_global_etcd()
+            leader = db.get_pod_leader()
+            if leader is None:
+                raise EdlNotFoundLeader("can't get leader")
+
+            logger.debug("barrier on leader:{}".format(leader))
+
+            c = PodServerClient(leader.endpoint)
+            cluster = c.barrier(job_env.job_id, pod.get_id())
+            return cluster
         except Exception as e:
-            logger.warning(
-                "wait to barrier with all error:{} leader:[{}] current pod:[{}]".
-                format(traceback.format_exc(), leader, pod))
-            time.sleep(3)
+            if time.time() - log_time > 30:
+                logger.info("wait to barrier now!")
+                log_time = time.time()
+            logger.debug("barrier error:{} {}".format(e,
+                                                      traceback.format_exc()))
 
         if time.time() - start > timeout:
-            message = "can't barrier with all, leader:[{}] current pod:{}".format(
-                leader, pod)
-            raise EdlBarrierError(message)
-
+            message = "wait to barrier with all error:{} leader:[{}] current pod:[{}]".format(
+                traceback.format_exc(), leader, pod)
+            logger.fatal(message)
+            #raise EdlBarrierError(message)
 
-def check_pods_status():
-    cluster = get_cluster()
-    found = False
-    for pod in cluster.pods:
-        if pod.status == PodStatus.ERROR:
-            found = True
-            logger.warning("train in pod:{} meets error".format(pod))
+        time.sleep(3)
 
-    if not found:
-        logger.info("Congratulate! This job complete!")
+    return None
 
 
-def launch(args):
+def prepare(args):
     args_dict = _convert_args_to_dict(args)
 
     # job enviroment.
     job_env = JobEnv(args_dict)
     logger.info("get job env:{}".format(str(job_env)))
 
     # get global etcd and lock
-    get_global_etcd(job_env.etcd_endpoints, job_env.job_id)
+    db = get_global_etcd(job_env.etcd_endpoints, job_env.job_id)
+
+    last_status = db.get_job_status()
+    if last_status == Status.SUCCEED:
+        logger.info("job:{} has completed! Need't try!".format(job_env.job_id))
+        sys.exit(0)
 
     # local pod, and the pod's id does't change.
     pod = Pod()
     pod.from_env(job_env)
 
+    # update pod status
+    db.set_pod_status(pod.get_id(), Status.INITIAL)
+
     # launch pod server
     pod_server = None
-    pod_server = PodServer(pod.get_id())
+    pod_server = PodServer(job_env, pod.get_id())
     pod_server.start(job_env, pod)
     logger.info("pod server started:[{}]".format(pod))
 
-    # register pod resource, they can't be stopped.
-    resource_register = PodResourceRegister(job_env.etcd_endpoints,
-                                            job_env.job_id, pod)
-
-    # regist and get rank, leader is in it.
-    # and leader will change the stage to a unique string
-    rank_register = PodRankRegister(job_env, pod)
+    return job_env, pod, pod_server
 
-    # register rank and watch the rank
-    # if the rank changed, the pods should restart the training proc.
-    edl_barrier(job_env, pod, timeout=600)
 
-    #watcher exit when cluster changed
-    watcher = Watcher(job_env.etcd_endpoints, job_env.job_id, pod)
-    # watch after barrier
-    watcher.watch()
+def job_exit(leader_register,
+             resource_register,
+             watcher,
+             pod,
+             trainer_flag,
+             register_flag,
+             barrier_flag,
+             resource_flag,
+             timeout=300):
+    local_flag = trainer_flag & register_flag & barrier_flag
+    db = get_global_etcd()
+    db.set_pod_flag(pod.get_id(), local_flag)
 
-    status = False
+    begin = time.time()
     while True:
-        cluster = watcher.get_cluster()
-        logger.info("get cluster:{}".format(cluster))
-        procs = start_local_trainers(
-            cluster,
-            pod,
-            args.training_script,
-            args.training_script_args,
-            log_dir=args.log_dir)
-
-        if watcher.is_changed():
-            watcher.stop()
-            # pod leader need not to change self.
-            if self.is_self_rank_changed() \
-                    or not rank_register.is_leader() \
-                    or rank_register.is_stoped():
-                rank_register.stop()
-                rank_register = PodRankRegister(job_env, pod)
-
-            if rank_register.is_leader():
-                rank_register.update_stage(pod)
-
-            logger.info("Cluster changed. New cluster:{}. Old Cluster:{}".
-                        format(cluster2, cluster))
+        try:
+            if leader_register.is_leader():
+                if db.wait_resource(cluster, timeout=15):
+                    job_flag = trainer_flag & register_flag & barrier_flag & resource_flag
+                    db.set_job_flag(job_flag)
+                    logger.info("set job status:{} ok!".format(job_flag))
+                    break
+                raise EdlWaitFollowersReleaseError("can't wait resource")
+            else:
+                break
+        except Exception as e:
+            logger.warning("prepare job_exit meets error:{}".format(e))
+            if time.time() - begin >= timeout:
+                logger.warning("wait resource error")
+                break
 
-        terminate_local_procs(procs)
+            time.sleep(3)
+            continue
 
-        # wait all pods info diappeared from etcd
-        # don't change this time,since the ttl is set to 10s in registers
-        # FIXME(gongwb): any other method?
-        time.sleep(15)
+    leader_register.stop()
+    watcher.stop()
+    resource_register.stop()
 
-        # barrier agagin
-        edl_barrier(job_env, pod, timeout=600)
 
-        # watcher agagin
-        watcher = Watcher(job_env.etcd_endpoints, job_env.job_id, pod)
-        watcher.watch()
-        continue
+def launch(args):
+    job_env, pod, pod_server = prepare(args)
+
+    # register pod resource to tell others:
+    # this resource can use to train
+    resource_register = PodResourceRegister(job_env, pod)
 
-        alive, status = watch_local_trainers(procs, pod.trainers_num)
-        if not alive or not status:
+    # seize the leader
+    leader_register = LeaderRegister(job_env, pod.get_id())
+
+    # register rank and watch the rank
+    # if the rank changed, the pods should restart the training proc.
+    # pod exit if barrier error
+    cluster = edl_barrier(job_env, pod, timeout=600)
+
+    # update pod status
+    db = get_global_etcd()
+    db.set_pod_status(pod.get_id(), Status.RUNNING)
+
+    # watcher after barrier
+    watcher = Watcher(job_env, cluster, pod)
+
+    procs = start_local_trainers(
+        cluster,
+        pod,
+        args.training_script,
+        args.training_script_args,
+        log_dir=args.log_dir)
+
+    trainer_flag = True
+    register_flag = True
+    barrier_flag = True
+    while True:
+        # check local status first
+        alive, trainer_flag = watch_local_trainers(procs, pod.trainers_num)
+        if not alive or not trainer_flag:
             break
 
+        if resource_register.is_stopped() or leader_register.is_stopped():
+            terminate_local_procs()
+            register_flag = False
+            break
+
+        # check job status second
+        if watcher.changed:
+            new_cluster = edl_barrier(job_env, pod, timeout=60)
+            if not new_cluster:
+                barrier_flag = False
+                break
+
+            terminate_local_procs(procs)
+
+            cluster = new_cluster
+            watcher = Watcher(job_env, cluster, pod)
+
+            procs = start_local_trainers(
+                cluster,
+                pod,
+                args.training_script,
+                args.training_script_args,
+                log_dir=args.log_dir)
+
        time.sleep(3)
 
-    watcher.stop()
-    rank_register.complete(status)
+    if not register_flag:
+        logger.fatal("register meets error and local exit!")
 
-    # try to report the global status
-    try:
-        edl_barrier(job_env, pod, timeout=30)
-        check_pods_status()
-        return
-    except Exception as e:
-        logger.info("barrier error:{}".format(e))
-        pass
+    if not leader_register.is_leader():
+        leader_register.stop()
 
-    if not status:
-        logger.info("local trainer meets error!")
-        return
-    logger.info("local trainer run ok and exit!")
+    job_exit(
+        leader_register=leader_register,
+        resource_register=resource_register,
+        watcher=watcher,
+        pod=pod,
+        trainer_flag=trainer_flag,
+        register_flag=register_flag,
+        barrier_flag=barrier_flag)
 
 
 def run_commandline():
-    utils.get_logger(log_level=20)
+    utils.get_logger(log_level=10)
     args = _parse_args()
     launch(args)
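The exit path above combines boolean verdicts with `&`: every pod records a local flag (trainer & register & barrier), and only the leader folds in resource_flag and publishes the job-level flag. A self-contained sketch of just that aggregation, assuming plain bools (the function name is illustrative; the real logic lives in job_exit):

def aggregate_flags(trainer_flag, register_flag, barrier_flag,
                    resource_flag, is_leader):
    # Every pod computes and reports its own local verdict.
    local_flag = trainer_flag & register_flag & barrier_flag
    # Only the leader decides the job-level verdict, after also
    # waiting on the cluster-wide resource check.
    job_flag = local_flag & resource_flag if is_leader else None
    return local_flag, job_flag


# e.g. a follower whose trainers exited cleanly reports only locally:
assert aggregate_flags(True, True, True, True, False) == (True, None)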

Diff for: python/edl/discovery/etcd_client.py

+12 -3

@@ -53,13 +53,15 @@ class EtcdClient(object):
     def __init__(self,
                  endpoints=['127.0.0.1:2379'],
                  passwd=None,
-                 root="service"):
+                 root="service",
+                 timeout=6):
         assert isinstance(endpoints, list), "endpoints must be list"
         self._endpoints = set(endpoints)
         self._passwd = passwd
         self._etcd = None
         self._leases = {}
         self._root = root
+        self._timeout = timeout
 
     def _endpoint_to_ip_port(self, endpoint):
         a = endpoint.split(":")
@@ -72,7 +74,7 @@ def _connect(self, endpoints):
         for ep in ep_lst:
             try:
                 ip, port = self._endpoint_to_ip_port(ep)
-                conn = etcd.client(host=ip, port=port)
+                conn = etcd.client(host=ip, port=port, timeout=self._timeout)
             except Exception as e:
                 print(e)
                 continue
@@ -174,7 +176,7 @@ def set_server_not_exists(self,
                               server,
                               info,
                               ttl=10,
-                              timeout=20):
+                              timeout=6):
         """
         :returns: state of transaction, ``True`` if the put was successful,
         ``False`` otherwise
@@ -217,6 +219,13 @@ def _get_server(self, service_name, server):
         value, meta = self._etcd.get(key=key)
         return value, meta.key, meta.version, meta.create_revision, meta.mod_revision
 
+    @_handle_errors
+    def get_value(self, service_name, server):
+        # for debug
+        key = '/{}/{}/nodes/{}'.format(self._root, service_name, server)
+        value = self._etcd.get(key=key)
+        return value[0]
+
     @_handle_errors
     def remove_server(self, service_name, server):
         key = '/{}/{}/nodes/{}'.format(self._root, service_name, server)
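A brief usage sketch of the new timeout knob and the get_value debug helper. The import path follows this repo's file layout, the job and pod names are illustrative, and the connection step (handled internally by _connect) is assumed to have run before get_value is called:

from edl.discovery.etcd_client import EtcdClient

# timeout is stored on the client and forwarded to every
# etcd.client(host=..., port=..., timeout=...) connection attempt.
client = EtcdClient(
    endpoints=["127.0.0.1:2379"],
    root="service",
    timeout=6)  # seconds

# Once connected, the debug helper reads the raw bytes stored under
# /service/<service_name>/nodes/<server>; names below are made up.
raw = client.get_value("job_0", "pod_0")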

Diff for: python/edl/protos/pod_server.proto

+6 -1

@@ -24,12 +24,17 @@ message BarrierRequest {
   string pod_id = 2;
 }
 
+message BarrierResponse {
+  common.Status status = 1;
+  string cluster_json = 2;
+}
+
 message ScaleInRequest { int32 num = 1; }
 
 message ScaleOutRequest {}
 
 service PodServer {
-  rpc Barrier(BarrierRequest) returns (common.Status) {}
+  rpc Barrier(BarrierRequest) returns (BarrierResponse) {}
 
   // Cluster controller -> master
   rpc ScaleOut(ScaleOutRequest) returns (common.Status) {}
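With Barrier now returning BarrierResponse rather than a bare common.Status, a caller can recover the agreed cluster from cluster_json, which is what lets edl_barrier in launch.py return a cluster. A hedged client-side sketch: the *_pb2 module names assume grpc's usual generated-code convention, and the job_id field of BarrierRequest is inferred from the launcher's call (field 1 is outside this hunk):

import json

import grpc

# Generated-stub names assume the usual <proto>_pb2 convention;
# the repo's actual generated modules may be packaged differently.
import pod_server_pb2
import pod_server_pb2_grpc


def barrier(endpoint, job_id, pod_id):
    # One barrier round-trip: announce this pod, get the cluster back.
    with grpc.insecure_channel(endpoint) as channel:
        stub = pod_server_pb2_grpc.PodServerStub(channel)
        req = pod_server_pb2.BarrierRequest(job_id=job_id, pod_id=pod_id)
        res = stub.Barrier(req)
    # cluster_json's schema is defined by the Python-side cluster
    # class, not by the proto; treat it as opaque JSON here.
    return json.loads(res.cluster_json)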
