From 7cdd4764624353a7b56193ad60e0b72e32eed97f Mon Sep 17 00:00:00 2001 From: Ben Church Date: Thu, 15 Aug 2024 07:17:27 -0700 Subject: [PATCH] feat: add ai server proxy to builder server (#13467) --- .../ConnectorBuilderController.java | 17 ++++- .../exceptions/AssistProxyException.kt | 67 +++++++++++++++++++ .../handlers/AssistProxyHandler.kt | 34 ++++++++++ .../requester/assist/AssistConfiguration.kt | 17 +++++ .../assist/AssistConfigurationImpl.kt | 39 +++++++++++ .../requester/assist/AssistProxy.kt | 66 ++++++++++++++++++ .../src/main/openapi/openapi.yaml | 31 ++++++++- .../src/main/resources/application.yml | 2 + ...ectorBuilderControllerIntegrationTest.java | 5 +- .../ConnectorBuilderControllerTest.java | 6 +- .../values.yaml | 3 +- 11 files changed, 281 insertions(+), 6 deletions(-) create mode 100644 airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/exceptions/AssistProxyException.kt create mode 100644 airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/handlers/AssistProxyHandler.kt create mode 100644 airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfiguration.kt create mode 100644 airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfigurationImpl.kt create mode 100644 airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistProxy.kt diff --git a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/controllers/ConnectorBuilderController.java b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/controllers/ConnectorBuilderController.java index 98d4a84c3fd..c022ebc2c3a 100644 --- a/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/controllers/ConnectorBuilderController.java +++ b/airbyte-connector-builder-server/src/main/java/io/airbyte/connector_builder/controllers/ConnectorBuilderController.java @@ -16,6 +16,7 @@ import io.airbyte.connector_builder.api.model.generated.ResolveManifestRequestBody; import io.airbyte.connector_builder.api.model.generated.StreamRead; import io.airbyte.connector_builder.api.model.generated.StreamReadRequestBody; +import io.airbyte.connector_builder.handlers.AssistProxyHandler; import io.airbyte.connector_builder.handlers.ConnectorContributionHandler; import io.airbyte.connector_builder.handlers.HealthHandler; import io.airbyte.connector_builder.handlers.ResolveManifestHandler; @@ -30,6 +31,7 @@ import io.micronaut.scheduling.annotation.ExecuteOn; import io.micronaut.security.annotation.Secured; import io.micronaut.security.rules.SecurityRule; +import java.util.Map; /** * Micronaut controller that defines the behavior for all endpoints related to building and testing @@ -43,15 +45,18 @@ public class ConnectorBuilderController implements V1Api { private final StreamHandler streamHandler; private final ResolveManifestHandler resolveManifestHandler; private final ConnectorContributionHandler connectorContributionHandler; + private final AssistProxyHandler assistProxyHandler; public ConnectorBuilderController(final HealthHandler healthHandler, final ResolveManifestHandler resolveManifestHandler, final StreamHandler streamHandler, - final ConnectorContributionHandler connectorContributionHandler) { + final ConnectorContributionHandler connectorContributionHandler, + final AssistProxyHandler assistProxyHandler) { this.healthHandler = healthHandler; this.streamHandler = streamHandler; this.resolveManifestHandler = resolveManifestHandler; this.connectorContributionHandler = connectorContributionHandler; + this.assistProxyHandler = assistProxyHandler; } @Override @@ -99,4 +104,14 @@ public ResolveManifest resolveManifest(@Body final ResolveManifestRequestBody re return resolveManifestHandler.resolveManifest(resolveManifestRequestBody); } + @Override + @Post(uri = "/v1/assist/v1/process", + consumes = MediaType.APPLICATION_JSON, + produces = MediaType.APPLICATION_JSON) + @Secured({AUTHENTICATED_USER}) + @ExecuteOn(TaskExecutors.IO) + public Map assistV1Process(@Body final Map requestBody) { + return assistProxyHandler.process(requestBody); + } + } diff --git a/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/exceptions/AssistProxyException.kt b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/exceptions/AssistProxyException.kt new file mode 100644 index 00000000000..4dec1c0b4a3 --- /dev/null +++ b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/exceptions/AssistProxyException.kt @@ -0,0 +1,67 @@ +@file:Suppress("ktlint:standard:package-name") + +package io.airbyte.connector_builder.exceptions + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.commons.server.errors.KnownException + +class AssistProxyException(private var responseCode: Int, jsonBody: JsonNode) : + KnownException(getStringFromResponse(jsonBody), getThrowableFromResponse(jsonBody)) { + override fun getHttpCode(): Int { + return responseCode + } +} + +fun getStringFromResponse(jsonBody: JsonNode): String { + if (jsonBody.has("message")) { + return jsonBody.get("message").asText() + } + return "Unknown AI Assist error" +} + +fun getThrowableFromResponse(jsonBody: JsonNode): Throwable? { + if (jsonBody.has("exceptionStack")) { + val message = getStringFromResponse(jsonBody) + val givenStack = jsonBody.get("exceptionStack") + val givenClassName = jsonBody.get("exceptionClassName")?.asText() ?: "Python" + val stackTrace = convertToStackTrace(givenStack, givenClassName) ?: return null + + val throwable = Throwable(message) + throwable.stackTrace = stackTrace + return throwable + } + return null +} + +fun convertToStackTrace( + exceptionStack: JsonNode, + exceptionClassName: String, +): Array? { + if (!exceptionStack.isArray) return null + + // exceptionStack is an array of strings from python + return exceptionStack.mapIndexed { index, stackLine -> + val stackTraceParts = stackLine.asText().split(":") + val (fileName, lineNumber, functionName) = parseStackTraceParts(stackTraceParts, index) + StackTraceElement(exceptionClassName, functionName, fileName, lineNumber) + }.toTypedArray() +} + +private fun parseStackTraceParts( + parts: List, + index: Int, +): Triple { + return when (parts.size) { + 3 -> Triple(parts[0], parseLineNumber(parts[1], index), parts[2]) + 2 -> Triple(parts[0], parseLineNumber(parts[1], index), "unknown_function") + 1 -> Triple("unknown_file.py", index + 1, parts[0]) + else -> Triple("unknown_file.py", index + 1, "unknown_function") + } +} + +private fun parseLineNumber( + lineNumber: String, + index: Int, +): Int { + return lineNumber.toIntOrNull() ?: (index + 1) +} diff --git a/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/handlers/AssistProxyHandler.kt b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/handlers/AssistProxyHandler.kt new file mode 100644 index 00000000000..1e37409a88e --- /dev/null +++ b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/handlers/AssistProxyHandler.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +@file:Suppress("ktlint:standard:package-name") + +package io.airbyte.connector_builder.handlers + +import io.airbyte.commons.json.Jsons +import io.airbyte.connector_builder.exceptions.ConnectorBuilderException +import io.airbyte.connector_builder.requester.assist.AssistConfiguration +import io.airbyte.connector_builder.requester.assist.AssistProxy +import jakarta.inject.Inject +import jakarta.inject.Singleton + +/** + * Proxy to the Assist API. + */ +@Singleton +class AssistProxyHandler + @Inject + constructor(private val proxyConfig: AssistConfiguration) { + /** + * Call the Assistant to get connector data + */ + @Throws(ConnectorBuilderException::class) + fun process(requestBody: Map): Map { + val path = "/v1/process" + val proxy = AssistProxy(this.proxyConfig) + + val jsonBody = Jsons.jsonNode(requestBody) + val result = proxy.post(path, jsonBody) + return Jsons.`object`(result, Map::class.java) as Map + } + } diff --git a/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfiguration.kt b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfiguration.kt new file mode 100644 index 00000000000..2d64c2d0e18 --- /dev/null +++ b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfiguration.kt @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +@file:Suppress("ktlint:standard:package-name") + +package io.airbyte.connector_builder.requester.assist + +import java.io.IOException +import java.net.HttpURLConnection + +/** + * Proxy to the Assist Service. Blocks until the job completes. + */ +interface AssistConfiguration { + @Throws(IOException::class) + fun getConnection(path: String): HttpURLConnection +} diff --git a/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfigurationImpl.kt b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfigurationImpl.kt new file mode 100644 index 00000000000..3ec3a87b6c9 --- /dev/null +++ b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistConfigurationImpl.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +@file:Suppress("ktlint:standard:package-name") + +package io.airbyte.connector_builder.requester.assist + +import io.micronaut.context.annotation.Value +import jakarta.inject.Singleton +import org.jooq.tools.StringUtils +import java.io.IOException +import java.net.HttpURLConnection +import java.net.MalformedURLException +import java.net.ProtocolException +import java.net.URL + +/** + * Construct and send requests to the CDK's Connector Builder handler. + */ +@Singleton +class AssistConfigurationImpl( + @Value("\${airbyte.connector-builder-server.ai-assist.url-base}") private val targetApiBaseUrl: String, +) : AssistConfiguration { + @Throws(IOException::class) + override fun getConnection(path: String): HttpURLConnection { + if (StringUtils.isBlank(targetApiBaseUrl)) { + throw RuntimeException("Assist Service URL is not set.") + } + try { + val url = URL("$targetApiBaseUrl$path") + val connection = url.openConnection() as HttpURLConnection + return connection + } catch (e: ProtocolException) { + throw RuntimeException(e) + } catch (e: MalformedURLException) { + throw RuntimeException(e) + } + } +} diff --git a/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistProxy.kt b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistProxy.kt new file mode 100644 index 00000000000..17e3a1f6f6c --- /dev/null +++ b/airbyte-connector-builder-server/src/main/kotlin/io/airbyte/connector_builder/requester/assist/AssistProxy.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +@file:Suppress("ktlint:standard:package-name") + +package io.airbyte.connector_builder.requester.assist + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import io.airbyte.connector_builder.exceptions.AssistProxyException +import io.airbyte.connector_builder.exceptions.ConnectorBuilderException +import java.io.IOException +import java.io.InputStreamReader + +class AssistProxy(private val proxyConfig: AssistConfiguration) { + fun post( + path: String, + jsonBody: JsonNode?, + ): JsonNode { + val connection = proxyConfig.getConnection(path) + connection.apply { + requestMethod = "POST" + setRequestProperty("Content-Type", "application/json") + doOutput = true + } + + connection.outputStream.use { outputStream -> + objectMapper.writeValue(outputStream, jsonBody) + } + val responseCode: Int + val jsonResponse: JsonNode + + try { + responseCode = connection.responseCode + val inputStream = + if (responseCode in 200..299) { + connection.inputStream + } else { + connection.errorStream + } + + jsonResponse = + inputStream.use { inputStream -> + InputStreamReader(inputStream, "utf-8").use { reader -> + reader.readText().let { + objectMapper.readTree(it) + } + } + } + } catch (e: IOException) { + throw ConnectorBuilderException("AI Assist processing error", e) + } finally { + connection.disconnect() + } + + if (responseCode !in 200..299) { + throw AssistProxyException(responseCode, jsonResponse) + } + + return jsonResponse + } + + companion object { + private val objectMapper = ObjectMapper() + } +} diff --git a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml index 3788951d334..59d3d03fd0d 100644 --- a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml +++ b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml @@ -108,6 +108,29 @@ paths: $ref: "#/components/responses/ExceptionResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/assist/v1/process: + post: + tags: + - connectorBuilderServer + summary: Assist server access point + operationId: assistV1Process + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/AssistV1ProcessRequestBody" + required: true + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/AssistV1Process" + "400": + $ref: "#/components/responses/ExceptionResponse" + "422": + $ref: "#/components/responses/InvalidInputResponse" /v1/health: get: tags: @@ -123,7 +146,6 @@ paths: $ref: "#/components/schemas/HealthCheckRead" # This route is unsecured for external monitoring. security: [] - components: securitySchemes: bearerAuth: @@ -373,6 +395,12 @@ components: manifest: type: object description: The config-based connector manifest contents with $refs and $parameters resolved + AssistV1ProcessRequestBody: + type: object + additionalProperties: true + AssistV1Process: + type: object + additionalProperties: true HealthCheckRead: type: object required: @@ -437,7 +465,6 @@ components: type: array items: $ref: "#/components/schemas/InvalidInputProperty" - responses: InvalidInputResponse: description: Input failed validation diff --git a/airbyte-connector-builder-server/src/main/resources/application.yml b/airbyte-connector-builder-server/src/main/resources/application.yml index 939173dcd0f..52f7b79fe43 100644 --- a/airbyte-connector-builder-server/src/main/resources/application.yml +++ b/airbyte-connector-builder-server/src/main/resources/application.yml @@ -53,6 +53,8 @@ airbyte: connector-builder-server: github: airbyte-pat-token: ${BUILDER_GITHUB_AIRBYTE_PAT_TOKEN:} + ai-assist: + url-base: ${AI_ASSIST_URL_BASE:} acceptance: test: enabled: ${ACCEPTANCE_TEST_ENABLED:false} diff --git a/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java b/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java index 67aa43e8d57..d2a2ffab5f7 100644 --- a/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java +++ b/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java @@ -23,6 +23,7 @@ import io.airbyte.connector_builder.exceptions.CdkUnknownException; import io.airbyte.connector_builder.exceptions.ConnectorBuilderException; import io.airbyte.connector_builder.file_writer.MockAirbyteFileWriterImpl; +import io.airbyte.connector_builder.handlers.AssistProxyHandler; import io.airbyte.connector_builder.handlers.ConnectorContributionHandler; import io.airbyte.connector_builder.handlers.HealthHandler; import io.airbyte.connector_builder.handlers.ResolveManifestHandler; @@ -67,6 +68,7 @@ class ConnectorBuilderControllerIntegrationTest { private MockAirbyteFileWriterImpl writer; private AirbyteStreamFactory streamFactory; private ContributionTemplates contributionTemplates; + private AssistProxyHandler assistProxyHandler; @BeforeEach void setup() { @@ -74,6 +76,7 @@ void setup() { this.writer = new MockAirbyteFileWriterImpl(); this.streamFactory = VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory(); this.contributionTemplates = new ContributionTemplates(); + this.assistProxyHandler = mock(AssistProxyHandler.class); } @BeforeAll @@ -101,7 +104,7 @@ ConnectorBuilderController createControllerWithSynchronousRunner( this.writer, this.streamFactory, shouldThrow, exitCode, inputStream, errorStream, outputStream); final AirbyteCdkRequesterImpl requester = new AirbyteCdkRequesterImpl(commandRunner); return new ConnectorBuilderController(this.healthHandler, new ResolveManifestHandler(requester), new StreamHandler(requester), - new ConnectorContributionHandler(contributionTemplates, null)); + new ConnectorContributionHandler(contributionTemplates, null), this.assistProxyHandler); } @Test diff --git a/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerTest.java b/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerTest.java index 00c74a90f49..7d374f03e7c 100644 --- a/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerTest.java +++ b/airbyte-connector-builder-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerTest.java @@ -15,6 +15,7 @@ import io.airbyte.connector_builder.api.model.generated.StreamRead; import io.airbyte.connector_builder.api.model.generated.StreamReadRequestBody; import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException; +import io.airbyte.connector_builder.handlers.AssistProxyHandler; import io.airbyte.connector_builder.handlers.ConnectorContributionHandler; import io.airbyte.connector_builder.handlers.HealthHandler; import io.airbyte.connector_builder.handlers.ResolveManifestHandler; @@ -33,6 +34,7 @@ class ConnectorBuilderControllerTest { private ResolveManifestRequestBody resolveManifestRequestBody; private ResolveManifest resolveManifest; private ConnectorContributionHandler connectorContributionHandler; + private AssistProxyHandler assistProxyHandler; @BeforeEach void setup() { @@ -45,9 +47,11 @@ void setup() { this.resolveManifestRequestBody = mock(ResolveManifestRequestBody.class); this.resolveManifest = mock(ResolveManifest.class); this.connectorContributionHandler = mock(ConnectorContributionHandler.class); + this.assistProxyHandler = mock(AssistProxyHandler.class); this.controller = - new ConnectorBuilderController(this.healthHandler, this.resolveManifestHandler, this.streamHandler, this.connectorContributionHandler); + new ConnectorBuilderController(this.healthHandler, this.resolveManifestHandler, this.streamHandler, this.connectorContributionHandler, + this.assistProxyHandler); } @Test diff --git a/charts/airbyte-connector-builder-server/values.yaml b/charts/airbyte-connector-builder-server/values.yaml index 835bfa890fc..917a20397b7 100644 --- a/charts/airbyte-connector-builder-server/values.yaml +++ b/charts/airbyte-connector-builder-server/values.yaml @@ -178,7 +178,8 @@ secrets: {} # env_vars: # AIRBYTE_VERSION: 0.40.4 -env_vars: {} +env_vars: + AI_ASSIST_URL_BASE: "https://builder-ai-production-t4klofs23a-wm.a.run.app/api" gsm: BUILDER_GITHUB_AIRBYTE_PAT_TOKEN: builder-github-airbyte-pat-token