Skip to content

Commit

Permalink
refactor: Migrate OSS PR CI workflow to ci-k8s-runner (#8876)
Browse files Browse the repository at this point in the history
  • Loading branch information
perangel committed Nov 1, 2023
1 parent f04b184 commit fe5f344
Show file tree
Hide file tree
Showing 28 changed files with 280 additions and 115 deletions.
9 changes: 6 additions & 3 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
.dockerignore
.git
.idea
.gradle
**/.gradle
**/build
**/node_modules
Dockerfile.*
docker-compose*.yaml
**/build
**/Dockerfile.*
**/docker-compose*.yaml
**/.terraform
tools/bin/workflow/*
1 change: 1 addition & 0 deletions airbyte-proxy/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ function start_container () {
echo $CMD
eval $CMD
wait_for_docker $NAME-$1;
sleep 2
}

function stop_container () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.model.Network;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -107,6 +109,7 @@
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -169,6 +172,10 @@ public class AcceptanceTestHarness {
// https://docs.airbyte.com/understanding-airbyte/jobs/.
public static final Set<JobStatus> IN_PROGRESS_JOB_STATUSES = Set.of(JobStatus.PENDING, JobStatus.INCOMPLETE, JobStatus.RUNNING);

private static final String KUBE_PROCESS_RUNNER_HOST = java.util.Optional.ofNullable(System.getenv("KUBE_PROCESS_RUNNER_HOST")).orElse("");

private static final String DOCKER_NETWORK = java.util.Optional.ofNullable(System.getenv("DOCKER_NETWORK")).orElse("bridge");

private static boolean isKube;
private static boolean isMinikube;
private static boolean isGke;
Expand Down Expand Up @@ -223,12 +230,21 @@ public AcceptanceTestHarness(final AirbyteApiClient apiClient, final UUID defaul
throw new RuntimeException("KUBE Flag should also be enabled if GKE flag is enabled");
}
if (!isGke) {
sourcePsql = new PostgreSQLContainer(SOURCE_POSTGRES_IMAGE_NAME)
.withUsername(SOURCE_USERNAME)
// we attach the container to the appropriate network since there are environments where we use one
// other than the default
final DockerClient dockerClient = DockerClientFactory.lazyClient();
final List<Network> dockerNetworks = dockerClient.listNetworksCmd().withNameFilter(DOCKER_NETWORK).exec();
final Network dockerNetwork = dockerNetworks.get(0);
final org.testcontainers.containers.Network containerNetwork =
org.testcontainers.containers.Network.builder().id(dockerNetwork.getId()).build();
sourcePsql = (PostgreSQLContainer) new PostgreSQLContainer(SOURCE_POSTGRES_IMAGE_NAME)
.withNetwork(containerNetwork);
sourcePsql.withUsername(SOURCE_USERNAME)
.withPassword(SOURCE_PASSWORD);
sourcePsql.start();

destinationPsql = new PostgreSQLContainer(DESTINATION_POSTGRES_IMAGE_NAME);
destinationPsql = (PostgreSQLContainer) new PostgreSQLContainer(DESTINATION_POSTGRES_IMAGE_NAME)
.withNetwork(containerNetwork);
destinationPsql.start();
}

Expand Down Expand Up @@ -726,7 +742,8 @@ private Map<Object, Object> localConfig(final PostgreSQLContainer psql,
final boolean withSchema) {
final Map<Object, Object> dbConfig = new HashMap<>();
// don't use psql.getHost() directly since the ip we need differs depending on environment
dbConfig.put(JdbcUtils.HOST_KEY, getHostname());
// NOTE: Use the container ip IFF we aren't on the "bridge" network
dbConfig.put(JdbcUtils.HOST_KEY, DOCKER_NETWORK.equals("bridge") ? getHostname() : psql.getHost());

if (hiddenPassword) {
dbConfig.put(JdbcUtils.PASSWORD_KEY, "**********");
Expand All @@ -747,6 +764,9 @@ private Map<Object, Object> localConfig(final PostgreSQLContainer psql,

public String getHostname() {
if (isKube) {
if (!KUBE_PROCESS_RUNNER_HOST.equals("")) {
return KUBE_PROCESS_RUNNER_HOST;
}
if (isMinikube) {
// used with minikube driver=none instance
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import io.airbyte.test.utils.Asserts;
import io.airbyte.test.utils.TestConnectionCreate;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -88,13 +90,15 @@ class AdvancedAcceptanceTests {
private static AcceptanceTestHarness testHarness;
private static AirbyteApiClient apiClient;
private static UUID workspaceId;
private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");

@BeforeAll
static void init() throws URISyntaxException, IOException, InterruptedException, ApiException {
final URI url = new URI(AIRBYTE_SERVER_HOST);
apiClient = new AirbyteApiClient(
new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api"));
// work in whatever default workspace is present.
workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,10 @@
import io.airbyte.test.utils.Asserts;
import io.airbyte.test.utils.TestConnectionCreate;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -121,10 +119,14 @@ public class AirbyteApiAcceptanceTests {
// NOTE: this is just a base64 encoding of a jwt representing a test user in some deployments.
private static final String AIRBYTE_AUTH_HEADER = "eyJ1c2VyX2lkIjogImNsb3VkLWFwaSIsICJlbWFpbF92ZXJpZmllZCI6ICJ0cnVlIn0K";
private static final String AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID = "AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID";
private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");
private static final String AIRBYTE_PUBLIC_API_SERVER_HOST =
Optional.ofNullable(System.getenv("AIRBYTE_PUBLIC_API_SERVER_HOST")).orElse("http://localhost:8006");
private static final String LOCAL_PUBLIC_API_SERVER_URL = "http://localhost:8006/v1";
private static final String AUTHORIZATION_HEADER = "AUTHORIZATION";
// NOTE: this is just the default airbyte/password user's basic auth header.
private static final String AIRBYTE_BASIC_AUTH_HEADER = "Basic YWlyYnl0ZTpwYXNzd29yZA==";

private static UUID workspaceId;

private static final String TEST_ACTOR_NAME = "test-actor-name";
Expand All @@ -142,9 +144,10 @@ static void init() throws Exception {

final boolean isGke = System.getenv().containsKey(IS_GKE);
// Set up the API client.
final var underlyingApiClient = new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
final URI url = new URI(AIRBYTE_SERVER_HOST);
final var underlyingApiClient = new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api");
underlyingApiClient.setRequestInterceptor(builder -> builder.setHeader(AUTHORIZATION_HEADER, AIRBYTE_BASIC_AUTH_HEADER));
if (isGke) {
Expand All @@ -166,8 +169,9 @@ static void init() throws Exception {
testHarness = new AcceptanceTestHarness(configApiClient, workspaceId);

testHarness.ensureCleanSlate();
final URI publicApiUrl = new URI(AIRBYTE_PUBLIC_API_SERVER_HOST);
airbyteApiClient = Airbyte.builder()
.setServerURL(LOCAL_PUBLIC_API_SERVER_URL)
.setServerURL(publicApiUrl.toString())
.setSecurity(new Security() {

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import io.airbyte.test.utils.TestConnectionCreate;
import io.temporal.client.WorkflowQueryException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.time.Duration;
Expand Down Expand Up @@ -152,6 +153,7 @@ class BasicAcceptanceTests {
// NOTE: this is just a base64 encoding of a jwt representing a test user in some deployments.
private static final String AIRBYTE_AUTH_HEADER = "eyJ1c2VyX2lkIjogImNsb3VkLWFwaSIsICJlbWFpbF92ZXJpZmllZCI6ICJ0cnVlIn0K";
private static final String AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID = "AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID";
private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");
private static final UUID POSTGRES_SOURCE_DEF_ID = UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750");
private static final UUID POSTGRES_DEST_DEF_ID = UUID.fromString("25c5221d-dce2-4163-ade9-739ef790f503");
public static final String IS_GKE = "IS_GKE";
Expand Down Expand Up @@ -192,19 +194,20 @@ static void init() throws URISyntaxException, IOException, InterruptedException,
// TODO(mfsiega-airbyte): clean up and centralize the way we do config.
final boolean isGke = System.getenv().containsKey(IS_GKE);
// Set up the API client.
final var underlyingApiClient = new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
final URI url = new URI(AIRBYTE_SERVER_HOST);
final var underlyingApiClient = new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api");
if (isGke) {
underlyingApiClient.setRequestInterceptor(builder -> builder.setHeader(GATEWAY_AUTH_HEADER, AIRBYTE_AUTH_HEADER));
}
apiClient = new AirbyteApiClient(underlyingApiClient);

// Set up the WebBackend API client.
final var underlyingWebBackendApiClient = new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
final var underlyingWebBackendApiClient = new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api");
if (isGke) {
underlyingWebBackendApiClient.setRequestInterceptor(builder -> builder.setHeader(GATEWAY_AUTH_HEADER, AIRBYTE_AUTH_HEADER));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.airbyte.test.utils.SchemaTableNamePair;
import io.airbyte.test.utils.TestConnectionCreate;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.time.Instant;
Expand Down Expand Up @@ -129,17 +130,20 @@ record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt,

private AcceptanceTestHarness testHarness;

private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");

@BeforeAll
static void init() throws ApiException {
static void init() throws ApiException, URISyntaxException {
final URI url = new URI(AIRBYTE_SERVER_HOST);
apiClient = new AirbyteApiClient(
new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api"));
webBackendApi = new WebBackendApi(
new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api"));
// work in whatever default workspace is present.
workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import io.airbyte.test.utils.Databases;
import io.airbyte.test.utils.SchemaTableNamePair;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -64,6 +66,7 @@
public class ConnectorBuilderTests {

private static final String ECHO_SERVER_IMAGE = "mendhak/http-https-echo:29";
private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");

private static AirbyteApiClient apiClient;
private static UUID workspaceId;
Expand Down Expand Up @@ -165,9 +168,10 @@ public class ConnectorBuilderTests {

@BeforeAll
static void init() throws URISyntaxException, IOException, InterruptedException, ApiException, SQLException {
final var underlyingApiClient = new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
final URI url = new URI(AIRBYTE_SERVER_HOST);
final var underlyingApiClient = new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api");
apiClient = new AirbyteApiClient(underlyingApiClient);
workspaceId = apiClient.getWorkspaceApi()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import io.airbyte.test.utils.TestConnectionCreate;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -61,19 +63,22 @@
class ContainerOrchestratorAcceptanceTests {

private static final Logger LOGGER = LoggerFactory.getLogger(ContainerOrchestratorAcceptanceTests.class);
private static final String AIRBYTE_WORKER = "airbyte-worker";
private static final String DEFAULT = "default";
private static final String AIRBYTE_WORKER = Optional.ofNullable(System.getenv("AIRBYTE_WORKER_DEPLOYMENT_NAME")).orElse("airbyte-worker");
private static final String NAMESPACE = Optional.ofNullable(System.getenv("NAMESPACE")).orElse("default");

private static AcceptanceTestHarness testHarness;
private static AirbyteApiClient apiClient;
private static KubernetesClient kubernetesClient;

private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");

@BeforeAll
static void init() throws URISyntaxException, IOException, InterruptedException, ApiException {
final URI url = new URI(AIRBYTE_SERVER_HOST);
apiClient = new AirbyteApiClient(
new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
.setBasePath("/api"));
// work in whatever default workspace is present.
UUID workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId();
Expand Down Expand Up @@ -136,10 +141,10 @@ void testDowntimeDuringSync() throws Exception {
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Scaling down worker...");
kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(0, true);
kubernetesClient.apps().deployments().inNamespace(NAMESPACE).withName(AIRBYTE_WORKER).scale(0, true);

LOGGER.info("Scaling up worker...");
kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(1);
kubernetesClient.apps().deployments().inNamespace(NAMESPACE).withName(AIRBYTE_WORKER).scale(1);

waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
}
Expand Down Expand Up @@ -170,8 +175,8 @@ void testCancelSyncWithInterruption() throws Exception {
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(0, true);
kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(1);
kubernetesClient.apps().deployments().inNamespace(NAMESPACE).withName(AIRBYTE_WORKER).scale(0, true);
kubernetesClient.apps().deployments().inNamespace(NAMESPACE).withName(AIRBYTE_WORKER).scale(1);

final var resp = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
assertEquals(JobStatus.CANCELLED, resp.getJob().getStatus());
Expand Down Expand Up @@ -209,7 +214,7 @@ void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception {
Thread.sleep(1000);

LOGGER.info("Scale down workers...");
kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(0, true);
kubernetesClient.apps().deployments().inNamespace(NAMESPACE).withName(AIRBYTE_WORKER).scale(0, true);

LOGGER.info("Starting background cancellation request...");
final var pool = Executors.newSingleThreadExecutor();
Expand All @@ -229,7 +234,7 @@ void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception {
Thread.sleep(2000);

LOGGER.info("Scaling up workers...");
kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(1);
kubernetesClient.apps().deployments().inNamespace(NAMESPACE).withName(AIRBYTE_WORKER).scale(1);

LOGGER.info("Waiting for cancellation to go into effect...");
assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus());
Expand Down
Loading

0 comments on commit fe5f344

Please sign in to comment.