
Commit

Merge branch 'main' of https://github.com/Luishfs/connectors into backup/source-google-analytics-data-api
Luishfs committed Apr 10, 2024
2 parents 60ea9bd + a54358b commit 76df21a
Showing 181 changed files with 26,133 additions and 1,094 deletions.
9 changes: 8 additions & 1 deletion .github/actions/deploy/action.yaml
@@ -60,8 +60,15 @@ runs:
        shell: bash
        run: sudo apt install postgresql

+     - uses: dorny/paths-filter@v2
+       id: filter
+       with:
+         filters: |
+           connector:
+             - "${{ inputs.connector }}/**"
      - name: Refresh connector tags for ${{ inputs.connector }}
-       if: ${{ github.event_name == 'push' }}
+       if: github.event_name == 'push' && steps.filter.outputs.connector == 'true'
        shell: bash
        env:
          PGDATABASE: ${{ inputs.pg_database }}
9 changes: 8 additions & 1 deletion .github/workflows/ci.yaml
@@ -301,8 +301,15 @@ jobs:
          sudo apt update
          sudo apt install postgresql
+     - uses: dorny/paths-filter@v2
+       id: filter
+       with:
+         filters: |
+           connector:
+             - "${{ matrix.connector }}/**"
      - name: Refresh connector tags for ${{ matrix.connector }}
-       if: ${{ github.event_name == 'push' }}
+       if: github.event_name == 'push' && steps.filter.outputs.connector == 'true'
        env:
          PGHOST: ${{ secrets.POSTGRES_CONNECTOR_REFRESH_HOST }}
          PGUSER: ${{ secrets.POSTGRES_CONNECTOR_REFRESH_USER }}
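
Both hunks above add the same gating pattern: dorny/paths-filter@v2 exposes one
output per named filter, and that output is the string 'true' when any changed
file in the push matches the filter's globs. A minimal sketch of the pattern
(step contents and the connector name are illustrative, not from this commit):

      - uses: dorny/paths-filter@v2
        id: filter
        with:
          filters: |
            connector:
              - "source-example/**"
      - name: Refresh connector tags for source-example
        if: github.event_name == 'push' && steps.filter.outputs.connector == 'true'
        shell: bash
        run: echo "only runs when source-example files changed"

The net effect is that connector tags are refreshed only for connectors whose
files actually changed, rather than on every push.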
11 changes: 11 additions & 0 deletions .github/workflows/python.yaml
@@ -14,6 +14,8 @@ on:
      - "source-hubspot-native/**"
      - "source-hubspot/**"
      - "source-google-analytics-data-api/**"
+     - "source-notion/**"
+     - "source-linkedin-pages/**"
  pull_request:
    branches: [main]
    paths:
@@ -27,6 +29,8 @@ on:
      - "source-hubspot-native/**"
      - "source-hubspot/**"
      - "source-google-analytics-data-api/**"
+     - "source-notion/**"
+     - "source-linkedin-pages/**"

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
@@ -77,6 +81,13 @@ jobs:
        - name: source-google-analytics-data-api
          type: capture
          version: v3
+       - name: source-notion
+         type: capture
+         version: v2
+         usage_rate: "1.0"
+       - name: source-linkedin-pages
+         type: capture
+         version: v1
+         usage_rate: "1.0"

    steps:
1 change: 1 addition & 0 deletions build-local.sh
@@ -26,6 +26,7 @@ docker buildx build \
  --platform linux/amd64 \
  --build-arg CONNECTOR_NAME="$1" \
  --build-arg CONNECTOR_TYPE="$CONNECTOR_TYPE" \
+ --build-arg="USAGE_RATE=1.0" \
  --load \
  -t ghcr.io/estuary/"$1":local \
  -f "$DOCKERFILE" \
3 changes: 2 additions & 1 deletion estuary-cdk/estuary_cdk/capture/common.py
@@ -133,7 +133,8 @@ class Backfill(BaseModel, extra="forbid"):
        description="LogCursor at which incremental replication began"
    )
    next_page: PageCursor = Field(
-        description="PageCursor of the next page to fetch"
+        description="PageCursor of the next page to fetch",
+        default=None
    )

class Snapshot(BaseModel, extra="forbid"):
64 changes: 54 additions & 10 deletions estuary-cdk/estuary_cdk/shim_airbyte_cdk.py
@@ -1,7 +1,7 @@
from dataclasses import dataclass
from logging import Logger
from pydantic import Field
-from typing import Any, ClassVar, Annotated, Callable, Awaitable, Literal
+from typing import Any, ClassVar, Annotated, Callable, Awaitable, List, Literal
import logging
import os

@@ -78,14 +78,51 @@ class ResourceState(common.BaseResourceState, extra="forbid"):
"""state is a dict encoding of AirbyteStateMessage"""


-ConnectorState = common.ConnectorState[ResourceState]
-"""Use the common.ConnectorState shape with ResourceState"""
+class ConnectorState(common.ConnectorState[ResourceState], extra="ignore"):
+    """ConnectorState represents a number of ResourceStates, keyed by binding state key.
+    Top-level fields other than bindingStateV1 are ignored, to allow for a lossy migration from
+    states that existed prior to adopting this convention. Connectors transitioning in this way will
+    effectively start over from the beginning.
+    """
+
+    bindingStateV1: dict[str, ResourceState] = {}
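
    # A sketch of the serialized shape this models (key names illustrative,
    # not from this commit):
    #   {"bindingStateV1": {"<binding state key>": <ResourceState>}}
    # A pre-migration state lacks bindingStateV1 entirely, so it deserializes
    # to the empty default above and its bindings start over, per the docstring.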


class Document(common.BaseDocument, extra="allow"):
pass


+def escape_field(field: str) -> str:
+    return field.replace("~", "~0").replace("/", "~1")
+
+
+def transform_airbyte_key(key: str | List[str] | List[List[str]]) -> List[str]:
+    key_fields: List[str] = []
+
+    if isinstance(key, str):
+        # key = "piz/za"
+        # key_fields: ["piz~1za"]
+        key_fields = [escape_field(key)]
+    elif isinstance(key, list):
+        for component in key:
+            if isinstance(component, str):
+                # key = ["piz/za", "par~ty"]
+                # key_fields: ["piz~1za", "par~0ty"]
+                key_fields.append(escape_field(component))
+            elif isinstance(component, list):
+                # key = [["pizza", "che/ese"], "potato"]
+                # Implies a document like {"potato": 12, "pizza": {"che/ese": 5}}
+                # key_fields: ["pizza/che~1ese", "potato"]
+                key_fields.append(
+                    "/".join(escape_field(field) for field in component)
+                )
+            else:
+                raise ValueError(f"Invalid key component: {component}")
+
+    return key_fields


@dataclass
class CaptureShim(BaseCaptureConnector):
delegate: AirbyteSource
@@ -124,11 +161,8 @@ async def _all_resources(
            if stream.source_defined_primary_key:
                # Map array of array of property names into an array of JSON pointers.
                key = [
-                    "/"
-                    + "/".join(
-                        p.replace("~", "~0").replace("/", "~1") for p in component
-                    )
-                    for component in stream.source_defined_primary_key
+                    "/" + key
+                    for key in transform_airbyte_key(stream.source_defined_primary_key)
                ]
            elif resource_config.sync_mode == "full_refresh":
                # Synthesize a key based on the record's order within each stream refresh.
@@ -176,13 +210,17 @@ async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
)

    async def discover(
-        self, log: Logger, discover: request.Discover[EndpointConfig],
+        self,
+        log: Logger,
+        discover: request.Discover[EndpointConfig],
    ) -> response.Discovered:
        resources = await self._all_resources(log, discover.config)
        return common.discovered(resources)

    async def validate(
-        self, log: Logger, validate: request.Validate[EndpointConfig, ResourceConfig],
+        self,
+        log: Logger,
+        validate: request.Validate[EndpointConfig, ResourceConfig],
    ) -> response.Validated:

        result = self.delegate.check(log, validate.config)
@@ -237,6 +275,12 @@ async def _run(
]
airbyte_catalog = ConfiguredAirbyteCatalog(streams=airbyte_streams)

if "bindingStateV1" not in connector_state.__fields_set__:
# Initialize the top-level state object so that it is properly serialized if this is an
# "empty" state, which occurs for a brand new task that has never emitted any
# checkpoints.
connector_state.__setattr__("bindingStateV1", {})
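        # (For context: pydantic records only explicitly-assigned fields in
        # __fields_set__, so a default-constructed ConnectorState leaves
        # bindingStateV1 unset and it could be dropped by exclude_unset-style
        # serialization; the assumption here is that checkpoint serialization
        # behaves that way.)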

# Index of Airbyte (namespace, stream) => ResourceState.
# Use `setdefault()` to initialize ResourceState if it's not already part of `connector_state`.
index: dict[tuple[str | None, str], tuple[int, ResourceState]] = {
Empty file added estuary-cdk/tests/__init__.py
10 changes: 10 additions & 0 deletions estuary-cdk/tests/test_airbyte_cdk.py
@@ -0,0 +1,10 @@
+from estuary_cdk.shim_airbyte_cdk import transform_airbyte_key
+
+def test_transform_airbyte_key():
+    assert transform_airbyte_key("pizza") == ["pizza"]
+    assert transform_airbyte_key("piz/za") == ["piz~1za"]
+    assert transform_airbyte_key(["piz/za"]) == ["piz~1za"]
+    assert transform_airbyte_key(["piz/za", "par~ty"]) == ["piz~1za", "par~0ty"]
+    assert transform_airbyte_key([["pizza", "che/ese"], "potato"]) == [
+        "pizza/che~1ese", "potato"
+    ]
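
The escaping above follows JSON Pointer (RFC 6901): "~" becomes "~0" and "/"
becomes "~1", so each escaped field can be joined with "/" into a valid pointer.
A quick sketch of how _all_resources consumes the result (values illustrative):

    key = ["/" + k for k in transform_airbyte_key([["pizza", "che/ese"], "potato"])]
    # key == ["/pizza/che~1ese", "/potato"]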
6 changes: 3 additions & 3 deletions go.mod
@@ -29,6 +29,7 @@ require (
	github.com/go-mysql-org/go-mysql v1.5.0
	github.com/go-sql-driver/mysql v1.6.0
	github.com/gogo/protobuf v1.3.2
+	github.com/golang-jwt/jwt/v5 v5.2.0
	github.com/google/uuid v1.3.1
	github.com/iancoleman/orderedmap v0.2.0
	github.com/invopop/jsonschema v0.5.0
@@ -50,7 +51,7 @@ require (
	github.com/segmentio/encoding v0.3.6
	github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed
	github.com/sirupsen/logrus v1.9.0
-	github.com/snowflakedb/gosnowflake v1.7.2
+	github.com/snowflakedb/gosnowflake v1.9.0
	github.com/stretchr/testify v1.8.4
	github.com/tidwall/gjson v1.16.0
	github.com/trinodb/trino-go-client v0.313.0
@@ -91,7 +92,7 @@ require (
	github.com/andybalholm/brotli v1.0.5 // indirect
	github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
	github.com/apache/arrow/go/v12 v12.0.1 // indirect
-	github.com/apache/arrow/go/v14 v14.0.2 // indirect
+	github.com/apache/arrow/go/v15 v15.0.0 // indirect
	github.com/apache/thrift v0.17.0 // indirect
	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.1 // indirect
@@ -126,7 +127,6 @@ require (
	github.com/go-jose/go-jose/v3 v3.0.1 // indirect
	github.com/goccy/go-json v0.10.2 // indirect
	github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
-	github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
	github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
	github.com/golang-sql/sqlexp v0.1.0 // indirect
	github.com/golang/glog v1.1.2 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -150,8 +150,8 @@ github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6IC
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/DiJbg=
github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
-github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
-github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
+github.com/apache/arrow/go/v15 v15.0.0 h1:1zZACWf85oEZY5/kd9dsQS7i+2G5zVQcbKTHgslqHNA=
+github.com/apache/arrow/go/v15 v15.0.0/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
@@ -849,8 +849,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
-github.com/snowflakedb/gosnowflake v1.7.2 h1:HRSwva8YXC64WUppfmHcMNVVzSE1+EwXXaJxgS0EkTo=
-github.com/snowflakedb/gosnowflake v1.7.2/go.mod h1:03tW856vc3ceM4rJuj7KO4dzqN7qoezTm+xw7aPIIFo=
+github.com/snowflakedb/gosnowflake v1.9.0 h1:s2ZdwFxFfpqwa5CqlhnzRESnLmwU3fED6zyNOJHFBQA=
+github.com/snowflakedb/gosnowflake v1.9.0/go.mod h1:4ZgHxVf2OKwecx07WjfyAMr0gn8Qj4yvwAo68Og8wsU=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
12 changes: 9 additions & 3 deletions materialize-bigquery/sqlgen.go
@@ -67,9 +67,15 @@ var bqDialect = func() sql.Dialect {
		sql.STRING: sql.StringTypeMapper{
			Fallback: sql.NewStaticMapper("STRING"),
			WithFormat: map[string]sql.TypeMapper{
-				"integer": sql.NewStaticMapper("BIGNUMERIC(38,0)", sql.WithElementConverter(sql.StdStrToInt())),
-				// https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_functions#cast_as_floating_point
-				"number": sql.NewStaticMapper("FLOAT64", sql.WithElementConverter(sql.StdStrToFloat("NaN", "Infinity", "-Infinity"))),
+				"integer": sql.PrimaryKeyMapper{
+					PrimaryKey: sql.NewStaticMapper("STRING"),
+					Delegate: sql.NewStaticMapper("BIGNUMERIC(38,0)", sql.WithElementConverter(sql.StdStrToInt())),
+				},
+				"number": sql.PrimaryKeyMapper{
+					PrimaryKey: sql.NewStaticMapper("STRING"),
+					// https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_functions#cast_as_floating_point
+					Delegate: sql.NewStaticMapper("FLOAT64", sql.WithElementConverter(sql.StdStrToFloat("NaN", "Infinity", "-Infinity"))),
+				},
				"date": sql.NewStaticMapper("DATE", sql.WithElementConverter(sql.ClampDate())),
				"date-time": sql.NewStaticMapper("TIMESTAMP", sql.WithElementConverter(sql.ClampDatetime())),
			},
10 changes: 6 additions & 4 deletions materialize-boilerplate/apply.go
@@ -178,10 +178,12 @@ func ApplyChanges(ctx context.Context, req *pm.Request_Apply, applier Applier, i
			}

			newRequired := projection.Inference.Exists == pf.Inference_MUST && !slices.Contains(projection.Inference.Types, pf.JsonTypeNull)
-			if !existingField.Nullable && !newRequired && !existingField.HasDefault {
-				// The existing field is not nullable and does not have a default value, but
-				// the proposed projection for the field is nullable. The existing field
-				// will need to be modified to be made nullable.
+			newlyNullable := !existingField.Nullable && !newRequired
+			projectionHasDefault := projection.Inference.DefaultJson != nil
+			if newlyNullable && !existingField.HasDefault && !projectionHasDefault {
+				// The field has newly been made nullable and neither the existing field nor
+				// the projection has a default value. The existing field will need to be
+				// modified to be made nullable since it may need to hold null values now.
				params.NewlyNullableFields = append(params.NewlyNullableFields, existingField)
			}
		} else {
42 changes: 31 additions & 11 deletions materialize-databricks/client.go
@@ -175,6 +175,8 @@ func (c *client) PreReqs(ctx context.Context) *sql.PrereqErr {

	var httpPathSplit = strings.Split(c.cfg.HTTPPath, "/")
	var warehouseId = httpPathSplit[len(httpPathSplit)-1]
+	var warehouseStopped = true
+	var warehouseErr error
	if res, err := wsClient.Warehouses.GetById(ctx, warehouseId); err != nil {
		errs.Err(err)
	} else {
@@ -184,23 +186,41 @@ func (c *client) PreReqs(ctx context.Context) *sql.PrereqErr {
		case databricksSql.StateDeleting:
			errs.Err(fmt.Errorf("The selected SQL Warehouse is being deleted, please use an active SQL warehouse."))
		case databricksSql.StateStarting:
-			errs.Err(fmt.Errorf("The selected SQL Warehouse is starting, please wait a couple of minutes before trying again."))
+			warehouseErr = fmt.Errorf("The selected SQL Warehouse is starting, please wait a couple of minutes before trying again.")
		case databricksSql.StateStopped:
-			errs.Err(fmt.Errorf("The selected SQL Warehouse is stopped, please start the SQL warehouse and try again."))
+			warehouseErr = fmt.Errorf("The selected SQL Warehouse is stopped, please start the SQL warehouse and try again.")
		case databricksSql.StateStopping:
-			errs.Err(fmt.Errorf("The selected SQL Warehouse is stopping, please start the SQL warehouse and try again."))
+			warehouseErr = fmt.Errorf("The selected SQL Warehouse is stopping, please start the SQL warehouse and try again.")
+		case databricksSql.StateRunning:
+			warehouseStopped = false
		}
	}

-	// Use a reasonable timeout for this connection test. It is not uncommon for a misconfigured
-	// connection (wrong host, wrong port, etc.) to hang for several minutes on Ping and we want to
-	// bail out well before then. Note that it is normal for Databricks warehouses to go offline
-	// after inactivity, and this attempt to connect to the warehouse will initiate their boot-up
-	// process however we don't want to wait 5 minutes as that does not create a good UX for the
-	// user in the UI
	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()
+	if errs.Len() > 0 {
+		return errs
+	}
+
+	if warehouseStopped {
+		// Use a reasonable timeout for this connection test. It is not uncommon for a misconfigured
+		// connection (wrong host, wrong port, etc.) to hang for several minutes on Ping, and we want
+		// to bail out well before then. Note that it is normal for Databricks warehouses to go
+		// offline after inactivity, and this attempt to connect to the warehouse will initiate their
+		// boot-up process; however, we don't want to wait 5 minutes, as that does not create a good
+		// UX for the user in the UI.
+		if r, err := wsClient.Warehouses.Start(ctx, databricksSql.StartRequest{Id: warehouseId}); err != nil {
+			errs.Err(fmt.Errorf("Could not start the warehouse: %w", err))
+		} else if _, err := r.GetWithTimeout(60 * time.Second); err != nil {
+			errs.Err(warehouseErr)
+		}
+
+		if errs.Len() > 0 {
+			return errs
+		}
+	}
+
+	// We avoid running this ping if the warehouse is not awake; see
+	// the issue below for more information on why:
+	// https://github.com/databricks/databricks-sql-go/issues/198
	if err := c.db.PingContext(ctx); err != nil {
		// Provide a more user-friendly representation of some common error causes.
		var execErr dbsqlerr.DBExecutionError