Skip to content

Commit edc7d90

Browse files
brigibodrov
authored andcommitted
STRDTORC-772: concord-task: using withItems with fork (walmartlabs#756)
1 parent 38884f1 commit edc7d90

File tree

8 files changed

+132
-18
lines changed

8 files changed

+132
-18
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ Concord) repositories.
1313

1414
- docker: updated Ansible to 2.6.7.
1515

16+
### Breaking
17+
18+
- concord-tasks: IN parameter `jobs` renamed to `forks` to avoid
19+
confusion with OUT parameter `jobs`.
20+
1621

1722

1823
## [0.95.0] - 2018-11-03

examples/fork_join/concord.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ flows:
88
# disable the `onCancel` handler, because it's going to handle
99
# the parent's cancellation only
1010
disableOnCancel: true
11-
jobs:
11+
forks:
1212
# spawn multiple jobs with different parameters
1313
- entryPoint: aJob
1414
arguments:

it/server/src/test/java/com/walmartlabs/concord/it/server/ConcordTaskIT.java

+68
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,72 @@ public void testStartChildFinishedWithError() throws Exception {
251251
byte[] ab = getLog(pir.getLogFileName());
252252
assertLog(".*Child process.*FAILED.*BOOOM.*", ab);
253253
}
254+
255+
@Test(timeout = 60000)
256+
public void testForkWithItemsWithOutVariable() throws Exception {
257+
String orgName = "org_" + randomString();
258+
259+
OrganizationsApi orgApi = new OrganizationsApi(getApiClient());
260+
orgApi.createOrUpdate(new OrganizationEntry().setName(orgName));
261+
262+
String projectName = "project_" + randomString();
263+
264+
ProjectsApi projectsApi = new ProjectsApi(getApiClient());
265+
projectsApi.createOrUpdate(orgName, new ProjectEntry()
266+
.setName(projectName)
267+
.setVisibility(ProjectEntry.VisibilityEnum.PUBLIC)
268+
.setAcceptsRawPayload(true));
269+
270+
byte[] payload = archive(ProcessRbacIT.class.getResource("concordTaskForkWithItemsWithOut").toURI());
271+
Map<String, Object> input = new HashMap<>();
272+
input.put("archive", payload);
273+
input.put("org", orgName);
274+
input.put("project", projectName);
275+
276+
StartProcessResponse spr = start(input);
277+
278+
ProcessApi processApi = new ProcessApi(getApiClient());
279+
ProcessEntry pir = waitForStatus(processApi, spr.getInstanceId(), ProcessEntry.StatusEnum.FAILED);
280+
281+
// ---
282+
283+
byte[] ab = getLog(pir.getLogFileName());
284+
assertLog(".*color=RED.*", ab);
285+
assertLog(".*color=WHITE.*", ab);
286+
assertLog(".*Done.*\\[\\[.*\\], \\[.*\\]\\] is completed.*", ab);
287+
}
288+
289+
@Test(timeout = 60000)
290+
public void testForkWithItems() throws Exception {
291+
String orgName = "org_" + randomString();
292+
293+
OrganizationsApi orgApi = new OrganizationsApi(getApiClient());
294+
orgApi.createOrUpdate(new OrganizationEntry().setName(orgName));
295+
296+
String projectName = "project_" + randomString();
297+
298+
ProjectsApi projectsApi = new ProjectsApi(getApiClient());
299+
projectsApi.createOrUpdate(orgName, new ProjectEntry()
300+
.setName(projectName)
301+
.setVisibility(ProjectEntry.VisibilityEnum.PUBLIC)
302+
.setAcceptsRawPayload(true));
303+
304+
byte[] payload = archive(ProcessRbacIT.class.getResource("concordTaskForkWithItems").toURI());
305+
Map<String, Object> input = new HashMap<>();
306+
input.put("archive", payload);
307+
input.put("org", orgName);
308+
input.put("project", projectName);
309+
310+
StartProcessResponse spr = start(input);
311+
312+
ProcessApi processApi = new ProcessApi(getApiClient());
313+
ProcessEntry pir = waitForStatus(processApi, spr.getInstanceId(), ProcessEntry.StatusEnum.FAILED);
314+
315+
// ---
316+
317+
byte[] ab = getLog(pir.getLogFileName());
318+
assertLog(".*color=RED.*", ab);
319+
assertLog(".*color=WHITE.*", ab);
320+
assertLog(".*Done.*\\[\\[.*\\], \\[.*\\]\\] is completed.*", ab);
321+
}
254322
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
configuration:
2+
arguments:
3+
forkItems:
4+
- entryPoint: sayHello
5+
arguments:
6+
color: "RED"
7+
- entryPoint: sayHello
8+
arguments:
9+
color: "WHITE"
10+
11+
flows:
12+
default:
13+
- task: concord
14+
in:
15+
action: fork
16+
entryPoint: ${item.entryPoint}
17+
arguments: ${item.arguments}
18+
withItems: ${forkItems}
19+
20+
- log: "Done! ${jobs} is completed"
21+
22+
sayHello:
23+
- log: "FORK: Hello, ${color}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
configuration:
2+
arguments:
3+
forkItems:
4+
- entryPoint: sayHello
5+
arguments:
6+
color: "RED"
7+
- entryPoint: sayHello
8+
arguments:
9+
color: "WHITE"
10+
11+
flows:
12+
default:
13+
- task: concord
14+
in:
15+
action: fork
16+
entryPoint: ${item.entryPoint}
17+
arguments: ${item.arguments}
18+
out:
19+
myJobs: ${jobs}
20+
withItems: ${forkItems}
21+
22+
- log: "Done! ${myJobs} is completed"
23+
24+
sayHello:
25+
- log: "FORK: Hello, ${color}"

it/server/src/test/resources/com/walmartlabs/concord/it/server/killCascade/concord.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ flows:
55
in:
66
action: fork
77
disableOnCancel: true
8-
jobs:
8+
forks:
99
- entryPoint: aJob
1010
arguments:
1111
color: "red"

it/server/src/test/resources/com/walmartlabs/concord/it/server/processWithChildren/concord.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ flows:
99
# disable the `onCancel` handler, because it's going to handle
1010
# the parent's cancellation only
1111
disableOnCancel: true
12-
jobs:
12+
forks:
1313
# spawn multiple jobs with different parameters
1414
- entryPoint: aJob
1515
arguments:

plugins/tasks/concord/src/main/java/com/walmartlabs/concord/client/ConcordTask.java

+8-15
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.walmartlabs.concord.common.IOUtils;
2626
import com.walmartlabs.concord.sdk.Constants;
2727
import com.walmartlabs.concord.sdk.Context;
28+
import com.walmartlabs.concord.sdk.ContextUtils;
2829
import com.walmartlabs.concord.sdk.InjectVariable;
2930
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
3031
import org.slf4j.Logger;
@@ -68,6 +69,7 @@ public class ConcordTask extends AbstractConcordTask {
6869
private static final String DISABLE_ON_CANCEL_KEY = "disableOnCancel";
6970
private static final String DISABLE_ON_FAILURE_KEY = "disableOnFailure";
7071
private static final String ENTRY_POINT_KEY = "entryPoint";
72+
private static final String FORKS_KEY = "forks";
7173
private static final String INSTANCE_ID_KEY = "instanceId";
7274
private static final String INSTANCES_KEY = "instances";
7375
private static final String JOB_OUT_KEY = "jobOut";
@@ -360,30 +362,21 @@ private Object getOutVars(Context ctx, String childId) throws Exception {
360362
});
361363
}
362364

363-
@SuppressWarnings("unchecked")
364365
private void fork(Context ctx) throws Exception {
365-
List<Map<String, Object>> jobs;
366-
367-
Object v = ctx.getVariable(JOBS_KEY);
368-
if (v != null) {
369-
if (v instanceof List) {
370-
jobs = (List<Map<String, Object>>) v;
371-
} else {
372-
throw new IllegalArgumentException("'" + JOBS_KEY + "' must be a list");
373-
}
374-
} else {
366+
List<Map<String, Object>> jobs = ContextUtils.getList(ctx, FORKS_KEY, null);
367+
if (jobs == null) {
375368
jobs = Collections.singletonList(createJobCfg(ctx, null));
376369
}
377370

371+
if (jobs.isEmpty()) {
372+
throw new IllegalArgumentException("'" + FORKS_KEY + "' can't be an empty list");
373+
}
374+
378375
List<String> jobIds = forkMany(ctx, jobs);
379376
ctx.setVariable(JOBS_KEY, jobIds);
380377
}
381378

382379
private List<String> forkMany(Context ctx, List<Map<String, Object>> jobs) throws Exception {
383-
if (jobs.isEmpty()) {
384-
throw new IllegalArgumentException("'" + JOBS_KEY + "' can't be an empty list");
385-
}
386-
387380
List<String> ids = new ArrayList<>();
388381

389382
for (Map<String, Object> job : jobs) {

0 commit comments

Comments
 (0)