@@ -83,6 +83,27 @@ def get(self):
83
83
self ._next_rid += 1
84
84
return rid
85
85
86
+ class SchedulerMonitor ():
87
+ def __init__ (self , expect_status ):
88
+ self .expect_status = expect_status
89
+ self .schedulers = {}
90
+ self .current_status = {}
91
+
92
+ def check_status (self , test ):
93
+ self .check_counter += 1
94
+ for key , value in self .schedulers .items ():
95
+ if key not in self .current_status .keys ():
96
+ self .current_status [key ] = ""
97
+ if self .schedulers [key ]["status" ] != self .current_status [key ]:
98
+ test .assertEqual (self .schedulers [key ]["status" ], self .expect_status [key ][0 ])
99
+ self .current_status [key ] = self .expect_status [key ].pop (0 )
100
+
101
+ def finished (self , test ):
102
+ for value in self .expect_status .values ():
103
+ if value :
104
+ return False
105
+ return True
106
+
86
107
87
108
class SchedulerCase (unittest .TestCase ):
88
109
def setUp (self ):
@@ -145,46 +166,23 @@ def test_pending_priority(self):
145
166
late = time () + 100000
146
167
early = time () + 1
147
168
148
- schedule = {}
149
-
150
- expect = [
151
- {0 : "pending" },
152
- {0 : "pending" , 1 : "pending" },
153
- {0 : "pending" , 1 : "pending" , 2 : "pending" },
154
- {0 : "preparing" , 1 : "pending" , 2 : "pending" },
155
- {0 : "prepare_done" , 1 : "pending" , 2 : "pending" },
156
- {0 : "running" , 1 : "pending" , 2 : "preparing" },
157
- {0 : "running" , 1 : "pending" , 2 : "prepare_done" },
158
- {0 : "paused" , 1 : "pending" , 2 : "running" },
159
- {0 : "paused" , 1 : "pending" , 2 : "run_done" },
160
- {0 : "running" , 1 : "pending" , 2 : "analyzing" },
161
- {0 : "running" , 1 : "pending" , 2 : "deleting" },
162
- ]
169
+ expect_status = {
170
+ 0 : ["pending" , "preparing" , "prepare_done" ,
171
+ "running" , "paused" , "running" ],
172
+ 1 : ["pending" ],
173
+ 2 : ["pending" , "preparing" , "prepare_done" ,
174
+ "running" , "run_done" , "analyzing" , "deleting" ],
175
+ }
176
+
177
+ scheduler_mon = SchedulerMonitor (expect_status )
163
178
164
179
done = asyncio .Event ()
165
- expect_idx = 0
166
- skip_next = False
167
180
168
181
def notify (mod ):
169
- nonlocal expect_idx , skip_next
170
- process_mod (schedule , mod )
171
-
172
- # gather status of each RID
173
- current_status = {}
174
- for rid , info in schedule .items ():
175
- current_status [rid ] = info ["status" ]
176
-
177
- # skip once after prepare_done or run_done
178
- if skip_next :
179
- skip_next = False
180
- else :
181
- self .assertEqual (current_status , expect [expect_idx ])
182
- expect_idx += 1
183
- if "prepare_done" in current_status .values () or \
184
- "run_done" in current_status .values ():
185
- skip_next = True
182
+ process_mod (scheduler_mon .schedulers , mod )
183
+ scheduler_mon .check_status (self )
186
184
187
- if expect_idx >= len ( expect ):
185
+ if scheduler_mon . finished ( self ):
188
186
done .set ()
189
187
190
188
scheduler .notifier .publish = notify
0 commit comments