Skip to content

Commit

Permalink
chore: fix ui_commands queue config (#14865)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Dec 19, 2024
1 parent 6f4e943 commit 8bec679
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.commons.temporal
import com.google.common.annotations.VisibleForTesting
import com.google.protobuf.ByteString
import io.airbyte.commons.annotation.InternalForTesting
import io.airbyte.commons.temporal.config.TemporalQueueConfiguration
import io.airbyte.commons.temporal.exception.DeletedWorkflowException
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException
import io.airbyte.commons.temporal.scheduling.CheckCommandInput
Expand Down Expand Up @@ -40,7 +41,6 @@ import io.airbyte.metrics.lib.OssMetricsRegistry
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.JobRunConfig
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Property
import io.temporal.api.common.v1.WorkflowType
import io.temporal.api.enums.v1.WorkflowExecutionStatus
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest
Expand Down Expand Up @@ -87,7 +87,7 @@ data class ManualOperationResult(
@Singleton
class TemporalClient(
@param:Named("workspaceRootTemporal") private val workspaceRoot: Path,
@param:Property(name = "airbyte.temporal.queues.ui-commands") private val uiCommandsQueue: String?,
private val queueConfiguration: TemporalQueueConfiguration,
private val workflowClientWrapped: WorkflowClientWrapped,
private val serviceStubsWrapped: WorkflowServiceStubsWrapped,
private val streamResetPersistence: StreamResetPersistence,
Expand Down Expand Up @@ -581,7 +581,7 @@ class TemporalClient(
val workflowOptions =
WorkflowOptions
.newBuilder()
.setTaskQueue(uiCommandsQueue)
.setTaskQueue(queueConfiguration.uiCommandsQueue)
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
.setWorkflowId(String.format("%s_%s", input.type, jobRunConfig.getJobId()))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package io.airbyte.commons.temporal
* Singleton helper object that provides translation between a Temporal job type and
* the Temporal queue name.
*/
@Deprecated("Check and Discover queues are obsolete, SYNC should be migrated to TemporalQueueConfiguration")
object TemporalTaskQueueUtils {
val DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name
val DEFAULT_CHECK_TASK_QUEUE = TemporalJobType.CHECK_CONNECTION.name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.airbyte.commons.temporal.config

import jakarta.inject.Singleton

@Singleton
class TemporalQueueConfiguration {
val uiCommandsQueue = "ui_commands"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.config.TemporalQueueConfiguration;
import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.scheduling.CheckCommandInput;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
Expand Down Expand Up @@ -90,7 +91,7 @@ public class TemporalClientTest {

private static final String CHECK_TASK_QUEUE = "CHECK_CONNECTION";
private static final String DISCOVER_TASK_QUEUE = "DISCOVER_SCHEMA";
private static final String UI_COMMANDS_TASK_QUEUE = "ui-commands-queue";
private static final String UI_COMMANDS_TASK_QUEUE = "ui_commands";
private static final JobRunConfig JOB_RUN_CONFIG = new JobRunConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_ID);
Expand All @@ -115,12 +116,10 @@ public class TemporalClientTest {
private ConnectionManagerUtils connectionManagerUtils;
private StreamResetRecordsHelper streamResetRecordsHelper;
private Path workspaceRoot;
private String uiCommandsQueue;

@BeforeEach
void setup() throws IOException {
workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_client_test");
uiCommandsQueue = "ui-commands-queue";
logPath = workspaceRoot.resolve(String.valueOf(JOB_ID)).resolve(String.valueOf(ATTEMPT_ID)).resolve(DEFAULT_LOG_FILENAME);
workflowClient = mock(WorkflowClient.class);
when(workflowClient.getOptions()).thenReturn(WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
Expand All @@ -138,7 +137,8 @@ void setup() throws IOException {
connectionManagerUtils = spy(new ConnectionManagerUtils(workflowClientWrapped, metricClient));
streamResetRecordsHelper = mock(StreamResetRecordsHelper.class);
temporalClient =
spy(new TemporalClient(workspaceRoot, uiCommandsQueue, workflowClientWrapped, workflowServiceStubsWrapped, streamResetPersistence,
spy(new TemporalClient(workspaceRoot, new TemporalQueueConfiguration(), workflowClientWrapped, workflowServiceStubsWrapped,
streamResetPersistence,
streamRefreshesRepository,
connectionManagerUtils, streamResetRecordsHelper, mock(MetricClient.class), new TestClient(), scopedConfigurationService));
}
Expand All @@ -155,7 +155,7 @@ void init() {
final var metricClient = mock(MetricClient.class);
final var scopedConfigurationService = mock(ScopedConfigurationService.class);
temporalClient = spy(
new TemporalClient(workspaceRoot, uiCommandsQueue, new WorkflowClientWrapped(workflowClient, metricClient),
new TemporalClient(workspaceRoot, new TemporalQueueConfiguration(), new WorkflowClientWrapped(workflowClient, metricClient),
new WorkflowServiceStubsWrapped(workflowServiceStubs, metricClient), streamResetPersistence, streamRefreshesRepository,
mConnectionManagerUtils, streamResetRecordsHelper, metricClient, new TestClient(), scopedConfigurationService));
}
Expand Down
2 changes: 0 additions & 2 deletions airbyte-server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ airbyte:
max-jobs: ${MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE:30}
max-fields-per-connection: ${MAX_FIELDS_PER_CONNECTION:20000}
temporal:
queues:
ui-commands: ui_commands
web-app:
url: ${WEBAPP_URL:}
workspace:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.commons.temporal.TemporalInitializationUtils;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.config.TemporalQueueConfiguration;
import io.airbyte.config.MaxWorkersConfig;
import io.airbyte.micronaut.temporal.TemporalProxyHelper;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
Expand All @@ -21,7 +22,6 @@
import io.airbyte.workers.temporal.workflows.DiscoverCatalogAndAutoPropagateWorkflowImpl;
import io.airbyte.workers.tracing.StorageObjectGetInterceptor;
import io.airbyte.workers.tracing.TemporalSdkInterceptor;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
Expand Down Expand Up @@ -106,6 +106,10 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private TemporalUtils temporalUtils;
@Inject
private WorkerFactory workerFactory;

@Inject
private TemporalQueueConfiguration temporalQueueConfiguration;

@Value("${airbyte.data.sync.task-queue}")
private String syncTaskQueue;

Expand All @@ -115,9 +119,6 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
@Value("${airbyte.data.discover.task-queue}")
private String discoverTaskQueue;

@Property(name = "airbyte.temporal.queues.ui-commands")
private String uiCommandsQueue;

@Override
public void onApplicationEvent(final ServiceReadyEvent event) {
try {
Expand Down Expand Up @@ -178,7 +179,8 @@ private void registerWorkerFactory(final WorkerFactory workerFactory,
}

private void registerUiCommandsWorker(final WorkerFactory factory, final MaxWorkersConfig maxWorkersConfiguration) {
final Worker uiCommandsWorker = factory.newWorker(uiCommandsQueue, getWorkerOptions(maxWorkersConfiguration.getMaxCheckWorkers()));
final Worker uiCommandsWorker =
factory.newWorker(temporalQueueConfiguration.getUiCommandsQueue(), getWorkerOptions(maxWorkersConfiguration.getMaxCheckWorkers()));
final WorkflowImplementationOptions workflowOptions = WorkflowImplementationOptions.newBuilder()
.setFailWorkflowExceptionTypes(NonDeterministicException.class).build();

Expand Down
2 changes: 0 additions & 2 deletions airbyte-workers/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ airbyte:
client-secret: ${AB_AZURE_KEY_VAULT_CLIENT_SECRET:}
tags: ${AB_AZURE_KEY_VAULT_TAGS:}
temporal:
queues:
ui-commands: ui_commands
worker:
ports: ${TEMPORAL_WORKER_PORTS:}
tracking:
Expand Down

0 comments on commit 8bec679

Please sign in to comment.