Skip to content

Commit

Permalink
Jun/add tests (#81)
Browse files Browse the repository at this point in the history
* Improve the flow engine and add unit tests.

* Add additional sample workflows.

* Fix some typos.
  • Loading branch information
jun-he authored Jan 30, 2025
1 parent 7a0d0b9 commit 9090b77
Show file tree
Hide file tree
Showing 45 changed files with 3,029 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ provides a fully managed workflow-as-a-service (WAAS) to the data platform users
It serves thousands of users, including data scientists, data engineers, machine learning engineers,
software engineers, content producers, and business analysts, for various use cases.
It schedules hundreds of thousands of workflows, millions of jobs every day
and operate with a strict SLO even when there are spikes in the traffic.
and operates with a strict SLO even when there are spikes in the traffic.
Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users.

You can read more details about it in our latest [blog post](https://netflixtechblog.com/maestro-netflixs-workflow-orchestrator-ee13a06f9c78).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private Constants() {}
public static final int MAX_PLATFORM_RETRY_LIMIT_SECS = 24 * 3600; // 1 day

/** maximum retry wait limit for timeout errors. */
public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 days
public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 day

/** Max timeout limit in milliseconds. */
public static final long MAX_TIME_OUT_LIMIT_IN_MILLIS = TimeUnit.DAYS.toMillis(120); // 120 days
Expand Down Expand Up @@ -316,7 +316,7 @@ public static WorkflowVersion of(String version) {
/** Workflow create request data size limit used for validation. */
public static final String WORKFLOW_CREATE_REQUEST_DATA_SIZE_LIMIT = "256KB";

/** params' total size (in JSON format) limit for a workflow instance or a step instance. */
/** param's total size (in JSON format) limit for a workflow instance or a step instance. */
public static final int JSONIFIED_PARAMS_STRING_SIZE_LIMIT = 750000;

/** Defines limit for the query for step attempt state view. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
* <p>If unset (null value), means there is no change for this field.
*
* <p>Properties changes are kept separately and can evolve independently from the workflow version
* <p>Properties changes are kept separately and can evolve independently of the workflow version
* changes.
*/
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface Step {
/** Get step type. */
StepType getType();

/** Get step sub type. */
/** Get step subtype. */
default String getSubType() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class StepDependenciesDefinition {
/** param name for step dependency name. */
public static final String STEP_DEPENDENCY_NAME = "name";

/** param name for step dependency sub type, like input_table, input_s3. */
/** param name for step dependency subtype, like input_table, input_s3. */
public static final String STEP_DEPENDENCY_SUB_TYPE = "_step_dependency_sub_type";

private final List<MapParamDefinition> definitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class OutputSignalInstance {
private String outputSignalInstanceId;
// announced time will be null if its a duplicate signal
// announced time will be null if it is a duplicate signal
private Long announcedTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ public enum Status {
/** Step is disabled at workflow instance start time, terminal state. */
DISABLED(true, true, false, false),
/**
* Step should not run and user logic does not run. Maestro runs over this step when its if
* condition is false or the workflow is already failed when failure mode is FAIL_AFTER_RUNNING.
* Users can discard steps with this status. terminal state.
* Step should not run and user logic does not run. Maestro runs over this step when condition
* is false or the workflow is already failed when failure mode is FAIL_AFTER_RUNNING. Users can
* discard steps with this status. terminal state.
*/
UNSATISFIED(true, true, false, false),
/** Step is skipped by users at runtime, terminal state. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static TimelineLogEvent debug(String template, Object... args) {
return TimelineLogEvent.builder().level(Level.DEBUG).message(template, args).build();
}

/** static method to generate a info level {@link TimelineLogEvent}. */
/** static method to generate an info level {@link TimelineLogEvent}. */
@JsonIgnore
public static TimelineLogEvent info(String template, Object... args) {
return TimelineLogEvent.builder().level(Level.INFO).message(template, args).build();
Expand All @@ -112,7 +112,7 @@ public static TimelineLogEvent warn(String template, Object... args) {
return TimelineLogEvent.builder().level(Level.WARN).message(template, args).build();
}

/** static method to generate a error level {@link TimelineLogEvent}. */
/** static method to generate an error level {@link TimelineLogEvent}. */
@JsonIgnore
public static TimelineLogEvent error(String template, Object... args) {
return TimelineLogEvent.builder().level(Level.ERROR).message(template, args).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static boolean isInlineWorkflowId(String workflowId) {
* 11) vs 9 (base62: 9) -> 211 vs 19.
*
* @param value value to encode
* @param isOrdered should the output encoded string perserve the ordering. True for rangeKey case
* @param isOrdered should the output encoded string preserve the ordering. True for rangeKey case
* and false for hashKey.
* @return encoded base62 string
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Status(boolean success, String message) {
}
}

/** acquire permits for every tag in tagList for a given a uuid (e.g. step uuid). */
/** acquire permits for every tag in tagList for a given an uuid (e.g. step uuid). */
Status acquire(List<Tag> tagsList, String uuid);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ default Result start(
* Customized step execution logic.
*
* <p>While the step status is RUNNING, the code in execute() will be called periodically with a
* preset polling interval. Additionally, if the execution throws an exception, the execute will
* preset polling interval. Additionally, if the execution throws an exception, the execution will
* be retried as another step instance run.
*
* <p>The input data are a copy of the original summary data. Any changes on them will be
Expand Down
6 changes: 6 additions & 0 deletions maestro-flow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ dependencies {
implementation jacksonDatabindDep
implementation jacksonAnnotationsDep
api slf4jApiDep

testImplementation junitDep
testImplementation mockitoCoreDep
testImplementation testcontainerDep
testImplementation postgresqlDep
testImplementation(testFixtures(project(':maestro-common')))
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netflix.maestro.flow.actor;

import com.netflix.maestro.annotations.Nullable;
import com.netflix.maestro.annotations.VisibleForTesting;
import com.netflix.maestro.flow.engine.ExecutionContext;
import com.netflix.maestro.metrics.MaestroMetrics;
import java.util.HashMap;
Expand All @@ -9,7 +10,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Stream;
import lombok.Getter;
import org.slf4j.Logger;

Expand Down Expand Up @@ -120,49 +120,33 @@ boolean noChildActorsRunning() {
}

void startShutdown(Action action) {
if (childActors.isEmpty()) {
checkShutdown();
} else {
if (!checkShutdown()) {
cancelPendingActions();
wakeUpChildActors(action);
}
}

void checkShutdown() {
// return true if shutdown is finished, otherwise false
boolean checkShutdown() {
if (noChildActorsRunning()) {
terminateNow();
if (parent != null) {
parent.post(Action.FLOW_DOWN);
}
return true;
}
return false;
}

void terminateNow() {
cancelPendingActions();
running = false;
}

void cancelPendingActions() {
private void cancelPendingActions() {
scheduledActions.values().forEach(f -> f.cancel(true));
}

Stream<String> dequeRetryActions() {
return scheduledActions.entrySet().stream()
.filter(
e ->
e.getKey() instanceof Action.FlowTaskRetry
&& !e.getValue().isDone()
&& e.getValue().cancel(false))
.map(e -> ((Action.FlowTaskRetry) e.getKey()).taskRefName());
}

boolean dequeRetryAction(String taskRef) {
var action = new Action.FlowTaskRetry(taskRef);
return scheduledActions.containsKey(action)
&& !scheduledActions.get(action).isDone()
&& scheduledActions.get(action).cancel(false);
}

void schedule(Action action, long delayInMillis) {
if (!isRunning()
|| (scheduledActions.containsKey(action) && !scheduledActions.get(action).isDone())) {
Expand Down Expand Up @@ -230,4 +214,14 @@ private Action dequeueAction() {
return null;
}
}

@VisibleForTesting
Map<Action, ScheduledFuture<?>> getScheduledActions() {
return scheduledActions;
}

@VisibleForTesting
BlockingQueue<Action> getActions() {
return actions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

Expand Down Expand Up @@ -76,8 +77,9 @@ void runForAction(Action action) {

@Override
void afterRunning() {
getMetrics()
.counter("num_of_finished_flows", getClass(), "finalized", String.valueOf(finalized));
if (finalized) {
getMetrics().counter("num_of_finished_flows", getClass());
getContext().deleteFlow(flow);
LOG.info("Flow for {} is deleted as it finishes.", reference());
}
Expand Down Expand Up @@ -338,4 +340,22 @@ private void runTask(Task task, Action initAction) {
var actor = new TaskActor(cloned, flow, this, getContext());
runActionFor(actor, initAction);
}

private Stream<String> dequeRetryActions() {
return getScheduledActions().entrySet().stream()
.filter(
e ->
e.getKey() instanceof Action.FlowTaskRetry
&& !e.getValue().isDone()
&& e.getValue().cancel(false))
.map(e -> ((Action.FlowTaskRetry) e.getKey()).taskRefName());
}

private boolean dequeRetryAction(String taskRef) {
var scheduledActions = getScheduledActions();
var action = new Action.FlowTaskRetry(taskRef);
return scheduledActions.containsKey(action)
&& !scheduledActions.get(action).isDone()
&& scheduledActions.get(action).cancel(false);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.netflix.maestro.flow.dao;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.maestro.annotations.VisibleForTesting;
import com.netflix.maestro.database.AbstractDatabaseDao;
import com.netflix.maestro.database.DatabaseConfiguration;
import com.netflix.maestro.exceptions.MaestroRetryableError;
import com.netflix.maestro.flow.models.Flow;
import com.netflix.maestro.flow.models.FlowGroup;
import com.netflix.maestro.flow.properties.FlowEngineProperties;
import com.netflix.maestro.metrics.MaestroMetrics;
import com.netflix.maestro.utils.Checks;
import java.sql.ResultSet;
Expand Down Expand Up @@ -40,13 +41,15 @@ public class MaestroFlowDao extends AbstractDatabaseDao {
"INSERT INTO maestro_flow_group (group_id,generation,address) VALUES (?,?,?) ON CONFLICT DO NOTHING";
private static final String GET_FLOW_WITH_SAME_KEYS_QUERY =
"SELECT 1 FROM maestro_flow WHERE group_id=? AND flow_id=? LIMIT 1";
private static final String REMOVE_GROUP_QUERY =
"DELETE FROM maestro_flow_group WHERE group_id=?";

public MaestroFlowDao(
DataSource dataSource,
ObjectMapper objectMapper,
FlowEngineProperties properties,
DatabaseConfiguration config,
MaestroMetrics metrics) {
super(dataSource, objectMapper, properties, metrics);
super(dataSource, objectMapper, config, metrics);
}

/**
Expand Down Expand Up @@ -239,4 +242,13 @@ public boolean existFlowWithSameKeys(long groupId, String flowId) {
groupId,
flowId);
}

@VisibleForTesting
void deleteGroup(long groupId) {
withMetricLogError(
() -> withRetryableUpdate(REMOVE_GROUP_QUERY, stmt -> stmt.setLong(1, groupId)),
"deleteGroup",
"Failed to delete the group for the groupId [{}]",
groupId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ public void prepare(Flow flow) {
prepare.setStartTime(System.currentTimeMillis());
flowTaskMap.get(prepare.getTaskType()).execute(flow, prepare);
if (!prepare.getStatus().isTerminal()) {
LOG.info("prepare task for flow [{}] is not done yet, will retry", flow.getReference());
throw new MaestroRetryableError("prepare task is not done yet, will retry");
} else {
flow.setReasonForIncompletion(prepare.getReasonForIncompletion());
flow.markUpdate();
}
} catch (MaestroRetryableError mre) {
throw mre;
} catch (RuntimeException e) {
LOG.warn("prepare task in flow {} throws an error, will retry it", flow.getReference(), e);
throw new MaestroRetryableError(e, "retry prepare task due to an exception");
Expand Down Expand Up @@ -213,7 +216,7 @@ public void resumeFlow(Flow flow) {
} catch (MaestroNotFoundException nfe) {
LOG.info("cannot find the reference flow: {}. Ignore it.", flow.getReference(), nfe);
} catch (RuntimeException e) {
LOG.warn("got an exceptino for resuming flow for {} and will retry", flow.getReference(), e);
LOG.warn("got an exception for resuming flow for {} and will retry", flow.getReference(), e);
throw new MaestroRetryableError(e, "retry resuming flow due to an exception");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,12 @@ private Actor getOrCreateNewGroup(long groupId) {
}

/** Wake up a flow or a task. */
public void wakeUp(Long groupId, String flowReference, String taskReference) {
public boolean wakeUp(Long groupId, String flowReference, String taskReference) {
Actor groupActor = groupActors.get(groupId);
if (groupActor != null && groupActor.isRunning()) {
groupActor.post(new Action.FlowWakeUp(flowReference, taskReference));
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* task actor can switch to execute. So those inactive tasks are not real maestro tasks. This is
* required to avoid that the child actor runs the business logic but the parent flow is unaware and
* decide to finish. Also, the active flag is a local state and not thread safe and can only be
* accessed within the actor (e.g. flow owns a list of copied tasks, and it can mutate active flag
* accessed within the actor, e.g. flow owns a list of copied tasks, and it can mutate active flag
* for its own snapshots.
*
* <p>Basic rule: flow actor can only activate a task actor. A task actor can only deactivate itself
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2025 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.maestro.flow;

import com.netflix.maestro.flow.models.Flow;
import com.netflix.maestro.flow.models.FlowDef;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.mockito.MockitoAnnotations;

public abstract class FlowBaseTest {

private AutoCloseable closeable;

@Before
public void openMocks() {
closeable = MockitoAnnotations.openMocks(this);
}

@After
public void releaseMocks() throws Exception {
closeable.close();
}

protected Flow createFlow() {
Flow flow =
new Flow(10, "test-flow-id", 1, System.currentTimeMillis() + 3600000, "test-flow-ref");
flow.setInput(Map.of());
flow.setFlowDef(new FlowDef());
flow.setStatus(Flow.Status.RUNNING);
flow.setUpdateTime(flow.getStartTime());
return flow;
}
}
Loading

0 comments on commit 9090b77

Please sign in to comment.