Skip to content

Commit

Permalink
move catalog breaking change function to helper class (#8892)
Browse files Browse the repository at this point in the history
  • Loading branch information
flutra-osmani committed Sep 19, 2023
1 parent fa522a8 commit a36ce10
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.DestinationUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.JobConfigType;
import io.airbyte.api.model.generated.JobCreate;
import io.airbyte.api.model.generated.JobIdRequestBody;
Expand All @@ -43,14 +42,14 @@
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ConfigurationUpdate;
import io.airbyte.commons.server.converters.JobConverter;
import io.airbyte.commons.server.errors.ValueConflictKnownException;
import io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper;
import io.airbyte.commons.server.handlers.helpers.CatalogConverter;
import io.airbyte.commons.server.handlers.helpers.JobCreationAndStatusUpdateHelper;
import io.airbyte.commons.server.scheduler.EventRunner;
Expand Down Expand Up @@ -616,7 +615,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final boolean containsBreakingChange = AutoPropagateSchemaChangeHelper.containsBreakingChange(diff);

if (containsBreakingChange) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.BREAKING_SCHEMA_CHANGE_DETECTED, 1,
Expand Down Expand Up @@ -652,7 +651,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco

private boolean shouldAutoPropagate(final CatalogDiff diff, final UUID workspaceId, final ConnectionRead connectionRead) {
final boolean hasDiff = !diff.getTransforms().isEmpty();
final boolean nonBreakingChange = !containsBreakingChange(diff);
final boolean nonBreakingChange = !AutoPropagateSchemaChangeHelper.containsBreakingChange(diff);
final boolean autoPropagationIsEnabledForWorkspace = featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId));
final boolean autoPropagationIsEnabledForConnection =
connectionRead.getNonBreakingChangesPreference() != null
Expand Down Expand Up @@ -775,20 +774,4 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
return jobConverter.getJobInfoRead(job);
}

@VisibleForTesting
boolean containsBreakingChange(final CatalogDiff diff) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
continue;
}

final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
if (anyBreakingFieldTransforms) {
return true;
}
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
Expand Down Expand Up @@ -153,4 +154,20 @@ static Map<StreamDescriptor, AirbyteStreamAndConfiguration> extractStreamAndConf
airbyteStreamAndConfiguration -> airbyteStreamAndConfiguration));
}

@VisibleForTesting
public static boolean containsBreakingChange(final CatalogDiff diff) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != StreamTransform.TransformTypeEnum.UPDATE_STREAM) {
continue;
}

final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
if (anyBreakingFieldTransforms) {
return true;
}
}

return false;
}

}

0 comments on commit a36ce10

Please sign in to comment.