Skip to content

Commit

Permalink
remove built-in sqlite materialization
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Apr 12, 2023
1 parent c35b49a commit 5fa4356
Show file tree
Hide file tree
Showing 25 changed files with 36 additions and 1,330 deletions.
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,6 @@ endif
.PHONY: catalog-test
catalog-test: data-plane-test-setup
${PKGDIR}/bin/flowctl-go test --source examples/flow.yaml $(ARGS)
# Cleanup generated SQLite DB.
# TODO(johnny): Remove with in-process SQLite.
rm examples/examples.db*

.PHONY: end-to-end-test
end-to-end-test: data-plane-test-setup
Expand Down
5 changes: 3 additions & 2 deletions examples/citi-bike/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ $ examples/citi-bike/load-rides.sh

[last-seen.flow.yaml](last-seen.flow.yaml) is a derivation that derives,
for each bike, the station it last arrived at. It's materialized into
`citi_last_seen` in examples.db.
`citi_last_seen` in a `materialize-sqlite` instance.

```console
$ sqlite examples/examples.db 'select bike_id, "last/station/name", "last/timestamp" from last_seen limit 10;
$ docker ps | grep materialize-sqlite # find the docker container name or id
$ docker exec -it <container-name> sqlite3 /tmp/sqlite.db 'select bike_id, "last/station/name", "last/timestamp" from last_seen limit 10
```

The materialization updates continuously as bikes move around the system.
Expand Down
7 changes: 3 additions & 4 deletions examples/citi-bike/views.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import:
materializations:
examples/citi-bike/views:
endpoint:
sqlite:
# Use WAL mode so that Flow-external reads (e.x. from `sqlite3`)
# don't fail with "database is locked" errors.
path: ../examples.db?_journal=WAL
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- source: examples/citi-bike/stations
resource: { table: citi_stations }
Expand Down
5 changes: 3 additions & 2 deletions examples/hello-world/flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ captures:
materializations:
examples/test/views:
endpoint:
sqlite:
path: ../examples.db?_journal=WAL
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- resource:
table: greetings
Expand Down
5 changes: 3 additions & 2 deletions examples/net-trace/services.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ collections:
materializations:
examples/net-trace/views:
endpoint:
sqlite:
path: ../examples.db?_journal=WAL
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- source: examples/net-trace/services
resource: { table: net_services }
5 changes: 3 additions & 2 deletions examples/ops/logs.flow.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
materializations:
examples/collection-stats:
endpoint:
sqlite:
path: ../examples.db?_journal=WAL
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- resource:
table: logs
Expand Down
12 changes: 7 additions & 5 deletions examples/segment/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ newline JSON:
We can range-read over user profiles from our SQLite key/value store stand-in:

```console
$ sqlite3 examples/examples.db 'SELECT flow_document FROM segment_profiles limit 5;' |
$ docker ps | grep materialize-sqlite # find the docker container name or id
$ docker exec -it <container-name> sqlite3 /tmp/sqlite.db 'SELECT flow_document FROM segment_profiles limit 5;' |
jq -c '{user: .user, segments: [.segments[] | select (.member) | .segment.name ] }'
{"user":"usr-000000","segments":["seg-0","seg-11B1","seg-178F","seg-3","seg-55","seg-65","seg-7E"]}
{"user":"usr-000001","segments":["seg-0","seg-111","seg-19","seg-275","seg-2A","seg-3","seg-331","seg-35E","seg-8","seg-B","seg-F8E"]}
Expand All @@ -118,7 +119,8 @@ $ sqlite3 examples/examples.db 'SELECT flow_document FROM segment_profiles limit
Or do a point lookup of a specific user:

```console
$ sqlite3 examples/examples.db 'SELECT flow_document FROM segment_profiles WHERE user = "usr-000fce"' | \
$ docker ps | grep materialize-sqlite # find the docker container name or id
$ docker exec -it <container-name> sqlite3 /tmp/sqlite.db 'SELECT flow_document FROM segment_profiles WHERE user = "usr-000fce"' | \
jq '.segments'
[
{
Expand Down Expand Up @@ -164,7 +166,8 @@ $ sqlite3 examples/examples.db 'SELECT flow_document FROM segment_profiles WHERE
We can range-read over members currently in a segment:

```console
$ sqlite3 examples/examples.db 'SELECT vendor, segment_name, user, first, last FROM segment_memberships WHERE member LIMIT 10;'
$ docker ps | grep materialize-sqlite # find the docker container name or id
$ docker exec -it <container-name> sqlite3 /tmp/sqlite.db 'SELECT vendor, segment_name, user, first, last FROM segment_memberships WHERE member LIMIT 10;'
1|seg-0|usr-000000|2021-03-19T15:55:31-04:00|2021-03-19T18:00:00-04:00
1|seg-0|usr-000004|2021-03-19T15:53:58-04:00|2021-03-19T18:04:26-04:00
1|seg-0|usr-000010|2021-03-19T15:54:51-04:00|2021-03-19T17:23:19-04:00
Expand Down Expand Up @@ -242,8 +245,7 @@ webhook server.

## Cleanup

Stop `flowctl-go`, remove the `flowctl_develop` runtime directory, and remove
`examples/examples.db` to complete a development session and restore
Stop `flowctl-go`, remove the `flowctl_develop` runtime directory to complete a development session and restore
to a pristine state. If you're working from a Git clone of the Flow repo,
simply `git clean -f -d`.

Expand Down
4 changes: 3 additions & 1 deletion examples/segment/flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ collections:
materializations:
examples/segment/views:
endpoint:
sqlite: { path: ../examples.db?_journal=WAL }
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}

bindings:
- resource: { table: segment_memberships }
Expand Down
4 changes: 3 additions & 1 deletion examples/soak-tests/set-ops/flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ tests:
materializations:
soak/set-ops/views:
endpoint:
sqlite: { path: ../../examples.db?_journal=WAL }
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- source: soak/set-ops/sets
resource: { table: sets }
Expand Down
4 changes: 3 additions & 1 deletion examples/temp-sensors/flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ collections:
materializations:
temperature/views:
endpoint:
sqlite: { path: ../examples.db?_journal=WAL }
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- source: temperature/averages
resource: { table: temperatures }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
([]*flow.MaterializationSpec) (len=1) {
(*flow.MaterializationSpec)(name:"example/materialization" connector_type:SQLITE config_json:"{\"path\":\":memory:\"}" bindings:<resource_config_json:"{\"table\":\"a_table\"}" resource_path:"a_table" collection:<name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/derivation" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector:<include:<labels:<name:"estuary.dev/collection" value:"a/derivation" > > exclude:<> > field_selection:<keys:"a_key" document:"flow_document" > journal_read_suffix:"materialize/example/materialization/a_table" > shard_template:<id:"materialize/example/materialization" recovery_log_prefix:"recovery" hint_prefix:"/estuary/flow/hints" hint_backups:2 max_txn_duration:<seconds:1 > labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/log-level" value:"info" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template:<name:"recovery/materialize/example/materialization" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-gazette-recoverylog" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > fragment:<length:268435456 compression_codec:SNAPPY stores:"s3://a-bucket/" refresh_interval:<seconds:300 > > flags:4 max_append_rate:4194304 > )
(*flow.MaterializationSpec)(name:"example/materialization" connector_type:IMAGE config_json:"{\"image\":\"ghcr.io/estuary/materialize-sqlite:dev\",\"config\":{}}" bindings:<resource_config_json:"{\"table\":\"a_table\"}" resource_path:"a_table" collection:<name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/derivation" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector:<include:<labels:<name:"estuary.dev/collection" value:"a/derivation" > > exclude:<> > field_selection:<keys:"a_key" document:"flow_document" > journal_read_suffix:"materialize/example/materialization/a_table" > shard_template:<id:"materialize/example/materialization" recovery_log_prefix:"recovery" hint_prefix:"/estuary/flow/hints" hint_backups:2 max_txn_duration:<seconds:1 > labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/expose-port" value:"8001" > labels:<name:"estuary.dev/hostname" value:"16480cc2f6ecc2c0" > labels:<name:"estuary.dev/log-level" value:"info" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template:<name:"recovery/materialize/example/materialization" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-gazette-recoverylog" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > fragment:<length:268435456 compression_codec:SNAPPY stores:"s3://a-bucket/" refresh_interval:<seconds:300 > > flags:4 max_append_rate:4194304 > network_ports:<number:8001 > )
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
(*flow.MaterializationSpec)(name:"example/materialization" connector_type:SQLITE config_json:"{\"path\":\":memory:\"}" bindings:<resource_config_json:"{\"table\":\"a_table\"}" resource_path:"a_table" collection:<name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/derivation" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector:<include:<labels:<name:"estuary.dev/collection" value:"a/derivation" > > exclude:<> > field_selection:<keys:"a_key" document:"flow_document" > journal_read_suffix:"materialize/example/materialization/a_table" > shard_template:<id:"materialize/example/materialization" recovery_log_prefix:"recovery" hint_prefix:"/estuary/flow/hints" hint_backups:2 max_txn_duration:<seconds:1 > labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/log-level" value:"info" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template:<name:"recovery/materialize/example/materialization" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-gazette-recoverylog" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > fragment:<length:268435456 compression_codec:SNAPPY stores:"s3://a-bucket/" refresh_interval:<seconds:300 > > flags:4 max_append_rate:4194304 > )
(*flow.MaterializationSpec)(name:"example/materialization" connector_type:IMAGE config_json:"{\"image\":\"ghcr.io/estuary/materialize-sqlite:dev\",\"config\":{}}" bindings:<resource_config_json:"{\"table\":\"a_table\"}" resource_path:"a_table" collection:<name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/derivation" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector:<include:<labels:<name:"estuary.dev/collection" value:"a/derivation" > > exclude:<> > field_selection:<keys:"a_key" document:"flow_document" > journal_read_suffix:"materialize/example/materialization/a_table" > shard_template:<id:"materialize/example/materialization" recovery_log_prefix:"recovery" hint_prefix:"/estuary/flow/hints" hint_backups:2 max_txn_duration:<seconds:1 > labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/expose-port" value:"8001" > labels:<name:"estuary.dev/hostname" value:"16480cc2f6ecc2c0" > labels:<name:"estuary.dev/log-level" value:"info" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template:<name:"recovery/materialize/example/materialization" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-gazette-recoverylog" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/task-name" value:"example/materialization" > labels:<name:"estuary.dev/task-type" value:"materialization" > > fragment:<length:268435456 compression_codec:SNAPPY stores:"s3://a-bucket/" refresh_interval:<seconds:300 > > flags:4 max_append_rate:4194304 > network_ports:<number:8001 > )
5 changes: 3 additions & 2 deletions go/bindings/testdata/build.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ captures:
materializations:
example/materialization:
endpoint:
sqlite:
path: ":memory:"
connector:
image: ghcr.io/estuary/materialize-sqlite:dev
config: {}
bindings:
- source: a/derivation
resource: { table: a_table }
Expand Down
20 changes: 0 additions & 20 deletions go/connector/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"io"

"github.com/estuary/flow/go/materialize/driver/sqlite"
pc "github.com/estuary/flow/go/protocols/capture"
pf "github.com/estuary/flow/go/protocols/flow"
pm "github.com/estuary/flow/go/protocols/materialize"
Expand All @@ -31,7 +30,6 @@ type Driver struct {
// The following are variants of a driver's enumeration type.
// A "remote: *grpc.ClientConn" variant may be added in the future if there's a well-defined use case.
container *Container
sqlite *sqlite.InProcessServer

// Unwrapped configuration of the endpoint.
config json.RawMessage
Expand Down Expand Up @@ -61,19 +59,6 @@ func NewDriver(
exposePorts ExposePorts,
) (*Driver, error) {

if connectorType == "SQLITE" {
var srv, err = sqlite.NewInProcessServer(ctx)
if err != nil {
return nil, err
}

return &Driver{
container: nil,
sqlite: srv,
config: config,
}, nil
}

if connectorType == "IMAGE" {
var parsedSpec = new(imageSpec)

Expand All @@ -87,7 +72,6 @@ func NewDriver(

return &Driver{
container: container,
sqlite: nil,
config: parsedSpec.Config,
}, nil
}
Expand All @@ -100,8 +84,6 @@ func NewDriver(
func (d *Driver) MaterializeClient() pm.ConnectorClient {
if d.container != nil {
return pm.NewConnectorClient(d.container.conn)
} else if d.sqlite != nil {
return d.sqlite.Client()
} else {
panic("invalid driver type for materialization")
}
Expand Down Expand Up @@ -130,8 +112,6 @@ func (d *Driver) Close() error {
var err error
if d.container != nil {
err = d.container.Stop()
} else if d.sqlite != nil {
err = d.sqlite.Stop()
}

return err
Expand Down
11 changes: 0 additions & 11 deletions go/materialize/driver/sqlite/.snapshots/TestSQLiteDriver

This file was deleted.

31 changes: 0 additions & 31 deletions go/materialize/driver/sqlite/.snapshots/TestSpecification

This file was deleted.

Loading

0 comments on commit 5fa4356

Please sign in to comment.