Skip to content

Commit

Permalink
update all tests for new shuffle: any semantics
Browse files Browse the repository at this point in the history
Fix a logic bug in the grouped-transfers example, which needs to shuffle
on /sender (and not transfer /id).
  • Loading branch information
jgraettinger committed May 19, 2023
1 parent 4195c95 commit 10c1520
Show file tree
Hide file tree
Showing 16 changed files with 24 additions and 13 deletions.
1 change: 1 addition & 0 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ mod test {
"transforms": Array [
Object {
"name": String("my-name"),
"shuffle": String("any"),
"source": String("usageB/CollectionA"),
},
],
Expand Down
3 changes: 2 additions & 1 deletion crates/agent/src/publications/test_resources/happy_path.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ p4 as (
"transforms":[
{
"name": "my-name",
"source": "usageB/CollectionA"
"source": "usageB/CollectionA",
"shuffle": "any"
}
]
}
Expand Down
3 changes: 2 additions & 1 deletion examples/acmeBank.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ collections:
partitions:
include:
outcome: ["approve"]
shuffle: any
lambda: |
-- Map approved transfers to a credit for the recipient and a debit for the sender.
select $recipient as account, $amount as balance
Expand Down Expand Up @@ -193,4 +194,4 @@ tests:
documents:
- { account: alice, balance: 35 }
- { account: bob, balance: 5 }
- { account: deposit, balance: -40 }
- { account: deposit, balance: -40 }
1 change: 1 addition & 0 deletions examples/citi-bike/last-seen.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ collections:
transforms:
- name: locationFromRide
source: { name: examples/citi-bike/rides }
shuffle: any
lambda: select $bike_id, $end as last;
1 change: 1 addition & 0 deletions examples/citi-bike/rides.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ collections:
transforms:
- name: fromCsvRides
source: { name: examples/citi-bike/csv-rides }
shuffle: any
4 changes: 2 additions & 2 deletions examples/citi-bike/stations.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ collections:
module: stations.flow.ts
transforms:
- name: ridesAndMoves
source:
name: examples/citi-bike/rides-and-relocations
source: examples/citi-bike/rides-and-relocations
shuffle: any
1 change: 1 addition & 0 deletions examples/stock-stats/flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ collections:
source:
name: stock/ticks
partitions: { include: { exchange: [NYSE, NASDAQ] } }
shuffle: any

tests:
# TODO this is a pretty silly test, and is the result of my messing around
Expand Down
2 changes: 1 addition & 1 deletion go/bindings/.snapshots/TestBuildCatalog-all-collections

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/bindings/.snapshots/TestBuildCatalog-one-derivation
Original file line number Diff line number Diff line change
@@ -1 +1 @@
(*flow.CollectionSpec)(name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/derivation" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > derivation:<connector_type:TYPESCRIPT config_json:"{\"module\":\"import { IDerivation, Document, SourceSwizzle } from 'flow/a/derivation.ts';\\n\\nexport class Derivation extends IDerivation {\\n swizzle(source: { doc: SourceSwizzle }): Document[] {\\n const doc = source.doc;\\n return [{a_key: doc.a_key.repeat(doc.a_val ? doc.a_val : 1)}];\\n }\\n}\\n\"}" transforms:<name:"swizzle" collection:<name:"a/collection" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1collection/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"},\"a_val\":{\"type\":\"integer\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<ptr:"/a_val" field:"a_val" inference:<types:"integer" exists:MAY > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/collection" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/collection" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector:<include:<labels:<name:"estuary.dev/collection" value:"a/collection" > > exclude:<> > lambda_config_json:"null" journal_read_suffix:"derive/a/derivation/swizzle" > shuffle_key_types:STRING shard_template:<id:"derivation/a/derivation" recovery_log_prefix:"recovery" hint_prefix:"/estuary/flow/hints" hint_backups:2 max_txn_duration:<seconds:1 > labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/log-level" value:"info" > labels:<name:"estuary.dev/task-name" value:"a/derivation" > labels:<name:"estuary.dev/task-type" value:"derivation" > > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template:<name:"recovery/derivation/a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-gazette-recoverylog" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/task-name" value:"a/derivation" > labels:<name:"estuary.dev/task-type" value:"derivation" > > fragment:<length:268435456 compression_codec:SNAPPY stores:"s3://a-bucket/" refresh_interval:<seconds:300 > > flags:4 max_append_rate:4194304 > > )
(*flow.CollectionSpec)(name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/derivation" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > derivation:<connector_type:TYPESCRIPT config_json:"{\"module\":\"import { IDerivation, Document, SourceSwizzle } from 'flow/a/derivation.ts';\\n\\nexport class Derivation extends IDerivation {\\n swizzle(source: { doc: SourceSwizzle }): Document[] {\\n const doc = source.doc;\\n return [{a_key: doc.a_key.repeat(doc.a_val ? doc.a_val : 1)}];\\n }\\n}\\n\"}" transforms:<name:"swizzle" collection:<name:"a/collection" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1collection/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"},\"a_val\":{\"type\":\"integer\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections:<ptr:"/a_key" field:"a_key" is_primary_key:true inference:<types:"string" string:<> exists:MUST > > projections:<ptr:"/a_val" field:"a_val" inference:<types:"integer" exists:MAY > > projections:<field:"flow_document" inference:<types:"object" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template:<name:"a/collection" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-ndjson" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/collection" value:"a/collection" > > fragment:<length:536870912 compression_codec:GZIP stores:"s3://a-bucket/" refresh_interval:<seconds:300 > path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector:<include:<labels:<name:"estuary.dev/collection" value:"a/collection" > > exclude:<> > lambda_config_json:"null" journal_read_suffix:"derive/a/derivation/swizzle" > shard_template:<id:"derivation/a/derivation" recovery_log_prefix:"recovery" hint_prefix:"/estuary/flow/hints" hint_backups:2 max_txn_duration:<seconds:1 > labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/log-level" value:"info" > labels:<name:"estuary.dev/task-name" value:"a/derivation" > labels:<name:"estuary.dev/task-type" value:"derivation" > > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template:<name:"recovery/derivation/a/derivation" replication:3 labels:<labels:<name:"app.gazette.dev/managed-by" value:"estuary.dev/flow" > labels:<name:"content-type" value:"application/x-gazette-recoverylog" > labels:<name:"estuary.dev/build" value:"fixture" > labels:<name:"estuary.dev/task-name" value:"a/derivation" > labels:<name:"estuary.dev/task-type" value:"derivation" > > fragment:<length:268435456 compression_codec:SNAPPY stores:"s3://a-bucket/" refresh_interval:<seconds:300 > > flags:4 max_append_rate:4194304 > > )
4 changes: 2 additions & 2 deletions go/bindings/testdata/build.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ collections:
}
transforms:
- name: swizzle
source:
name: a/collection
source: a/collection
shuffle: any

captures:
example/capture:
Expand Down
3 changes: 2 additions & 1 deletion go/flow/testdata/specs_test.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ collections:
}
transforms:
- name: swizzle
source: { name: example/collection }
source: example/collection
shuffle: any
shards:
# Expect this is reflected in shard specs.
maxTxnDuration: 60s
Expand Down
4 changes: 2 additions & 2 deletions go/shuffle/testdata/ab.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ collections:

derive:
using:
sqlite:
migrations: []
sqlite: {}
transforms:
# Note that our test depends on this transform being validated as `readOnly`.
- name: swizzle
source: a/collection
shuffle: any
lambda: SELECT 1;

storageMappings:
Expand Down
4 changes: 2 additions & 2 deletions ops-catalog/template-common.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ collections:
module: catalog-stats-rollup.ts
transforms:
- name: fromOps.us-central1.v1
source:
name: ops.us-central1.v1/catalog-stats-L1
source: ops.us-central1.v1/catalog-stats-L1
shuffle: any
1 change: 1 addition & 0 deletions site/docs/concepts/bank/balances.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ collections:
partitions:
include:
outcome: [approve]
shuffle: any
2 changes: 2 additions & 0 deletions site/docs/concepts/bank/grouped.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ collections:
transforms:
- name: enrichAndAddToWindow
source: acmeBank/transfers
shuffle: { key: [/sender] }
lambda: enrichAndAddToWindow.sql

- name: removeFromWindow
source: acmeBank/transfers
shuffle: { key: [/sender] }
readDelay: 24h
lambda: DELETE FROM transfers WHERE id = $id;
1 change: 1 addition & 0 deletions site/docs/concepts/bank/last-large-send.flow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ collections:
transforms:
- name: filterTransfers
source: acmeBank/transfers
shuffle: any
lambda: SELECT $id, $sender, $recipient, $amount WHERE $amount > 100;

0 comments on commit 10c1520

Please sign in to comment.