diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 27954a68f8..9367c48089 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -688,6 +688,7 @@ mod test { "transforms": Array [ Object { "name": String("my-name"), + "shuffle": String("any"), "source": String("usageB/CollectionA"), }, ], diff --git a/crates/agent/src/publications/test_resources/happy_path.sql b/crates/agent/src/publications/test_resources/happy_path.sql index 8af24e1be4..af13e55c73 100644 --- a/crates/agent/src/publications/test_resources/happy_path.sql +++ b/crates/agent/src/publications/test_resources/happy_path.sql @@ -24,7 +24,8 @@ p4 as ( "transforms":[ { "name": "my-name", - "source": "usageB/CollectionA" + "source": "usageB/CollectionA", + "shuffle": "any" } ] } diff --git a/examples/acmeBank.flow.yaml b/examples/acmeBank.flow.yaml index 20bce05c0e..37a9425de1 100644 --- a/examples/acmeBank.flow.yaml +++ b/examples/acmeBank.flow.yaml @@ -119,6 +119,7 @@ collections: partitions: include: outcome: ["approve"] + shuffle: any lambda: | -- Map approved transfers to a credit for the recipient and a debit for the sender. select $recipient as account, $amount as balance @@ -193,4 +194,4 @@ tests: documents: - { account: alice, balance: 35 } - { account: bob, balance: 5 } - - { account: deposit, balance: -40 } \ No newline at end of file + - { account: deposit, balance: -40 } diff --git a/examples/citi-bike/last-seen.flow.yaml b/examples/citi-bike/last-seen.flow.yaml index 60bf58e807..14276ea534 100644 --- a/examples/citi-bike/last-seen.flow.yaml +++ b/examples/citi-bike/last-seen.flow.yaml @@ -18,4 +18,5 @@ collections: transforms: - name: locationFromRide source: { name: examples/citi-bike/rides } + shuffle: any lambda: select $bike_id, $end as last; diff --git a/examples/citi-bike/rides.flow.yaml b/examples/citi-bike/rides.flow.yaml index 439fabfbf9..d57c1fed43 100644 --- a/examples/citi-bike/rides.flow.yaml +++ b/examples/citi-bike/rides.flow.yaml @@ -10,3 +10,4 @@ collections: transforms: - name: fromCsvRides source: { name: examples/citi-bike/csv-rides } + shuffle: any diff --git a/examples/citi-bike/stations.flow.yaml b/examples/citi-bike/stations.flow.yaml index 75cab21300..b51e242013 100644 --- a/examples/citi-bike/stations.flow.yaml +++ b/examples/citi-bike/stations.flow.yaml @@ -14,5 +14,5 @@ collections: module: stations.flow.ts transforms: - name: ridesAndMoves - source: - name: examples/citi-bike/rides-and-relocations + source: examples/citi-bike/rides-and-relocations + shuffle: any diff --git a/examples/stock-stats/flow.yaml b/examples/stock-stats/flow.yaml index 7cabf36a01..35d20cb76a 100644 --- a/examples/stock-stats/flow.yaml +++ b/examples/stock-stats/flow.yaml @@ -27,6 +27,7 @@ collections: source: name: stock/ticks partitions: { include: { exchange: [NYSE, NASDAQ] } } + shuffle: any tests: # TODO this is a pretty silly test, and is the result of my messing around diff --git a/go/bindings/.snapshots/TestBuildCatalog-all-collections b/go/bindings/.snapshots/TestBuildCatalog-all-collections index b9b2923fc4..1e59c1f7d6 100644 --- a/go/bindings/.snapshots/TestBuildCatalog-all-collections +++ b/go/bindings/.snapshots/TestBuildCatalog-all-collections @@ -1,6 +1,6 @@ ([]*flow.CollectionSpec) (len=4) { (*flow.CollectionSpec)(name:"a/collection" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1collection/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"},\"a_val\":{\"type\":\"integer\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections: exists:MUST > > projections: > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > ), - (*flow.CollectionSpec)(name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections: exists:MUST > > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > derivation: exists:MUST > > projections: > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector: > exclude:<> > lambda_config_json:"null" journal_read_suffix:"derive/a/derivation/swizzle" > shuffle_key_types:STRING shard_template: labels: labels: labels: labels: labels: > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template: labels: labels: labels: labels: > fragment: > flags:4 max_append_rate:4194304 > > ), + (*flow.CollectionSpec)(name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections: exists:MUST > > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > derivation: exists:MUST > > projections: > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector: > exclude:<> > lambda_config_json:"null" journal_read_suffix:"derive/a/derivation/swizzle" > shard_template: labels: labels: labels: labels: labels: > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template: labels: labels: labels: labels: > fragment: > flags:4 max_append_rate:4194304 > > ), (*flow.CollectionSpec)(name:"ops.us-central1.v1/logs" write_schema_json:"{\"$defs\":{\"__flowInline1\":{\"$id\":\"builtin://flow/ops-catalog/ops-shard-schema.json\",\"$schema\":\"https://json-schema.org/draft-07/schema\",\"description\":\"Identifies a specific shard of a task, which may be the source of a log message or metrics\",\"properties\":{\"keyBegin\":{\"description\":\"The inclusive beginning of the shard's assigned key range\",\"pattern\":\"[0-9a-f]{8}\",\"type\":\"string\"},\"kind\":{\"description\":\"The type of the catalog task\",\"enum\":[\"capture\",\"derivation\",\"materialization\"]},\"name\":{\"description\":\"The name of the catalog task (without the task type prefix)\",\"type\":\"string\"},\"rClockBegin\":{\"description\":\"The inclusive beginning of the shard's assigned rClock range\",\"pattern\":\"[0-9a-f]{8}\",\"type\":\"string\"}},\"required\":[\"kind\",\"name\",\"keyBegin\",\"rClockBegin\"],\"title\":\"Flow shard id\",\"type\":\"object\"}},\"$id\":\"builtin://flow/ops-catalog/ops-log-schema.json\",\"$schema\":\"https://json-schema.org/draft-07/schema\",\"description\":\"Logs related to the processing of a Flow capture, derivation, or materialization\",\"properties\":{\"fields\":{\"description\":\"Map of keys and values that are associated with this log entry.\",\"type\":\"object\"},\"level\":{\"enum\":[\"error\",\"warn\",\"info\",\"debug\",\"trace\"]},\"message\":{\"type\":\"string\"},\"shard\":{\"$ref\":\"ops-shard-schema.json\"},\"ts\":{\"description\":\"Timestamp corresponding to the start of the transaction\",\"format\":\"date-time\",\"type\":\"string\"}},\"required\":[\"shard\",\"ts\",\"level\"],\"title\":\"Flow task logs\",\"type\":\"object\"}" key:"/shard/name" key:"/shard/keyBegin" key:"/shard/rClockBegin" key:"/ts" uuid_ptr:"/_meta/uuid" partition_fields:"kind" partition_fields:"name" projections: > projections: > projections: description:"The type of the catalog task" exists:MUST > > projections: exists:MUST > > projections: exists:MAY > > projections: description:"The name of the catalog task (without the task type prefix)" exists:MUST > > projections: > projections: description:"The inclusive beginning of the shard's assigned key range" exists:MUST > > projections: description:"The type of the catalog task" exists:MUST > > projections: description:"The name of the catalog task (without the task type prefix)" exists:MUST > > projections: description:"The inclusive beginning of the shard's assigned rClock range" exists:MUST > > projections: description:"Timestamp corresponding to the start of the transaction" exists:MUST > > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > ), (*flow.CollectionSpec)(name:"ops.us-central1.v1/stats" write_schema_json:"{\"$defs\":{\"__flowInline1\":{\"$id\":\"builtin://flow/ops-catalog/ops-shard-schema.json\",\"$schema\":\"https://json-schema.org/draft-07/schema\",\"description\":\"Identifies a specific shard of a task, which may be the source of a log message or metrics\",\"properties\":{\"keyBegin\":{\"description\":\"The inclusive beginning of the shard's assigned key range\",\"pattern\":\"[0-9a-f]{8}\",\"type\":\"string\"},\"kind\":{\"description\":\"The type of the catalog task\",\"enum\":[\"capture\",\"derivation\",\"materialization\"]},\"name\":{\"description\":\"The name of the catalog task (without the task type prefix)\",\"type\":\"string\"},\"rClockBegin\":{\"description\":\"The inclusive beginning of the shard's assigned rClock range\",\"pattern\":\"[0-9a-f]{8}\",\"type\":\"string\"}},\"required\":[\"kind\",\"name\",\"keyBegin\",\"rClockBegin\"],\"title\":\"Flow shard id\",\"type\":\"object\"},\"docsAndBytes\":{\"properties\":{\"bytesTotal\":{\"default\":0,\"description\":\"Total number of bytes representing the JSON encoded documents\",\"reduce\":{\"strategy\":\"sum\"},\"type\":\"integer\"},\"docsTotal\":{\"default\":0,\"description\":\"Total number of documents\",\"reduce\":{\"strategy\":\"sum\"},\"type\":\"integer\"}},\"reduce\":{\"strategy\":\"merge\"},\"required\":[\"docsTotal\",\"bytesTotal\"],\"type\":\"object\"},\"transformStats\":{\"description\":\"Stats for a specific transform of a derivation, which will have an update, publish, or both.\",\"properties\":{\"input\":{\"$ref\":\"#/$defs/docsAndBytes\",\"description\":\"The input documents that were fed into this transform.\"},\"source\":{\"description\":\"The name of the collection that this transform sources from\",\"type\":\"string\"}},\"reduce\":{\"strategy\":\"merge\"},\"required\":[\"input\"],\"type\":\"object\"}},\"$id\":\"builtin://flow/ops-catalog/ops-stats-schema.json\",\"$schema\":\"https://json-schema.org/draft-07/schema\",\"description\":\"Statistics related to the processing of a Flow capture, derivation, or materialization\",\"oneOf\":[{\"required\":[\"capture\"]},{\"required\":[\"derive\"]},{\"required\":[\"materialize\"]}],\"properties\":{\"capture\":{\"additionalProperties\":{\"properties\":{\"out\":{\"$ref\":\"#/$defs/docsAndBytes\"},\"right\":{\"$ref\":\"#/$defs/docsAndBytes\",\"description\":\"Documents fed into the combiner from the source\"}},\"reduce\":{\"strategy\":\"merge\"},\"type\":\"object\"},\"description\":\"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.\",\"reduce\":{\"strategy\":\"merge\"},\"type\":\"object\"},\"derive\":{\"properties\":{\"out\":{\"$ref\":\"#/$defs/docsAndBytes\"},\"published\":{\"$ref\":\"#/$defs/docsAndBytes\"},\"transforms\":{\"additionalProperties\":{\"$ref\":\"#/$defs/transformStats\"},\"description\":\"A map of each transform (transform name, not collection name) to stats for that transform\",\"reduce\":{\"strategy\":\"merge\"},\"type\":\"object\"}},\"reduce\":{\"strategy\":\"merge\"},\"type\":\"object\"},\"materialize\":{\"additionalProperties\":{\"properties\":{\"left\":{\"$ref\":\"#/$defs/docsAndBytes\"},\"out\":{\"$ref\":\"#/$defs/docsAndBytes\"},\"right\":{\"$ref\":\"#/$defs/docsAndBytes\"}},\"reduce\":{\"strategy\":\"merge\"},\"type\":\"object\"},\"description\":\"A map of each binding source (collection name) to combiner stats for that binding\",\"reduce\":{\"strategy\":\"merge\"},\"type\":\"object\"},\"openSecondsTotal\":{\"description\":\"Total time that the transaction was open before starting to commit\",\"reduce\":{\"strategy\":\"sum\"},\"type\":\"number\"},\"shard\":{\"$ref\":\"ops-shard-schema.json\"},\"ts\":{\"description\":\"Timestamp corresponding to the start of the transaction\",\"format\":\"date-time\",\"type\":\"string\"},\"txnCount\":{\"description\":\"Total number of transactions represented by this stats document\",\"reduce\":{\"strategy\":\"sum\"},\"type\":\"integer\"}},\"reduce\":{\"strategy\":\"merge\"},\"required\":[\"shard\",\"ts\",\"txnCount\",\"openSecondsTotal\"],\"title\":\"Flow task stats\",\"type\":\"object\"}" key:"/shard/name" key:"/shard/keyBegin" key:"/shard/rClockBegin" key:"/ts" uuid_ptr:"/_meta/uuid" partition_fields:"kind" partition_fields:"name" projections: > projections: > projections: > projections: > projections: > projections: > projections: > projections: > projections: > projections: > projections: description:"The type of the catalog task" exists:MUST > > projections: > projections: description:"The name of the catalog task (without the task type prefix)" exists:MUST > > projections: > projections: > projections: description:"The inclusive beginning of the shard's assigned key range" exists:MUST > > projections: description:"The type of the catalog task" exists:MUST > > projections: description:"The name of the catalog task (without the task type prefix)" exists:MUST > > projections: description:"The inclusive beginning of the shard's assigned rClock range" exists:MUST > > projections: description:"Timestamp corresponding to the start of the transaction" exists:MUST > > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > ) } diff --git a/go/bindings/.snapshots/TestBuildCatalog-one-derivation b/go/bindings/.snapshots/TestBuildCatalog-one-derivation index b7f6dc347b..2cefbaa15e 100644 --- a/go/bindings/.snapshots/TestBuildCatalog-one-derivation +++ b/go/bindings/.snapshots/TestBuildCatalog-one-derivation @@ -1 +1 @@ -(*flow.CollectionSpec)(name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections: exists:MUST > > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > derivation: exists:MUST > > projections: > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector: > exclude:<> > lambda_config_json:"null" journal_read_suffix:"derive/a/derivation/swizzle" > shuffle_key_types:STRING shard_template: labels: labels: labels: labels: labels: > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template: labels: labels: labels: labels: > fragment: > flags:4 max_append_rate:4194304 > > ) +(*flow.CollectionSpec)(name:"a/derivation" write_schema_json:"{\"$id\":\"file:///build.flow.yaml?ptr=/collections/a~1derivation/schema\",\"properties\":{\"a_key\":{\"type\":\"string\"}},\"required\":[\"a_key\"],\"type\":\"object\"}" key:"/a_key" uuid_ptr:"/_meta/uuid" projections: exists:MUST > > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > derivation: exists:MUST > > projections: > projections: > ack_template_json:"{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}" partition_template: labels: labels: labels: > fragment: path_postfix_template:"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}" > flags:4 max_append_rate:4194304 > > partition_selector: > exclude:<> > lambda_config_json:"null" journal_read_suffix:"derive/a/derivation/swizzle" > shard_template: labels: labels: labels: labels: labels: > ring_buffer_size:65536 read_channel_size:131072 > recovery_log_template: labels: labels: labels: labels: > fragment: > flags:4 max_append_rate:4194304 > > ) diff --git a/go/bindings/testdata/build.flow.yaml b/go/bindings/testdata/build.flow.yaml index 85d5f20190..fab725800c 100644 --- a/go/bindings/testdata/build.flow.yaml +++ b/go/bindings/testdata/build.flow.yaml @@ -30,8 +30,8 @@ collections: } transforms: - name: swizzle - source: - name: a/collection + source: a/collection + shuffle: any captures: example/capture: diff --git a/go/flow/testdata/specs_test.flow.yaml b/go/flow/testdata/specs_test.flow.yaml index 70891ab7ca..28912dc892 100644 --- a/go/flow/testdata/specs_test.flow.yaml +++ b/go/flow/testdata/specs_test.flow.yaml @@ -37,7 +37,8 @@ collections: } transforms: - name: swizzle - source: { name: example/collection } + source: example/collection + shuffle: any shards: # Expect this is reflected in shard specs. maxTxnDuration: 60s diff --git a/go/shuffle/testdata/ab.flow.yaml b/go/shuffle/testdata/ab.flow.yaml index 3f94e41143..34cda9d8a1 100644 --- a/go/shuffle/testdata/ab.flow.yaml +++ b/go/shuffle/testdata/ab.flow.yaml @@ -19,12 +19,12 @@ collections: derive: using: - sqlite: - migrations: [] + sqlite: {} transforms: # Note that our test depends on this transform being validated as `readOnly`. - name: swizzle source: a/collection + shuffle: any lambda: SELECT 1; storageMappings: diff --git a/ops-catalog/template-common.flow.yaml b/ops-catalog/template-common.flow.yaml index 88e2b174f1..f69be8b7e1 100644 --- a/ops-catalog/template-common.flow.yaml +++ b/ops-catalog/template-common.flow.yaml @@ -77,5 +77,5 @@ collections: module: catalog-stats-rollup.ts transforms: - name: fromOps.us-central1.v1 - source: - name: ops.us-central1.v1/catalog-stats-L1 + source: ops.us-central1.v1/catalog-stats-L1 + shuffle: any diff --git a/site/docs/concepts/bank/balances.flow.yaml b/site/docs/concepts/bank/balances.flow.yaml index 03e214bac3..d28b628713 100644 --- a/site/docs/concepts/bank/balances.flow.yaml +++ b/site/docs/concepts/bank/balances.flow.yaml @@ -14,3 +14,4 @@ collections: partitions: include: outcome: [approve] + shuffle: any diff --git a/site/docs/concepts/bank/grouped.flow.yaml b/site/docs/concepts/bank/grouped.flow.yaml index e97c648aa0..033674dcec 100644 --- a/site/docs/concepts/bank/grouped.flow.yaml +++ b/site/docs/concepts/bank/grouped.flow.yaml @@ -22,9 +22,11 @@ collections: transforms: - name: enrichAndAddToWindow source: acmeBank/transfers + shuffle: { key: [/sender] } lambda: enrichAndAddToWindow.sql - name: removeFromWindow source: acmeBank/transfers + shuffle: { key: [/sender] } readDelay: 24h lambda: DELETE FROM transfers WHERE id = $id; diff --git a/site/docs/concepts/bank/last-large-send.flow.yaml b/site/docs/concepts/bank/last-large-send.flow.yaml index c28c3c538e..8ffc6b2698 100644 --- a/site/docs/concepts/bank/last-large-send.flow.yaml +++ b/site/docs/concepts/bank/last-large-send.flow.yaml @@ -9,4 +9,5 @@ collections: transforms: - name: filterTransfers source: acmeBank/transfers + shuffle: any lambda: SELECT $id, $sender, $recipient, $amount WHERE $amount > 100;