Skip to content

Commit

Permalink
Merge branch 'main' into tab/remove-schema-registry
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Oct 11, 2024
2 parents 74cbf76 + ce65186 commit 61d92b0
Show file tree
Hide file tree
Showing 42 changed files with 269 additions and 329 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,4 @@ VoteNotify
SharedMergeTree
JWT
TOML
WebHDFS
4 changes: 2 additions & 2 deletions docs/guides/ingest-from-postgres-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
);
```

Syntax for creating a CDC table. Note that a primary key is required and must be consistent with the upstream table.
Syntax for creating a CDC table on top of this CDC Source. Note that a primary key is required and must be consistent with the upstream table. We must also specify the Postgres table name (`pg_table_name`) which we are selecting from.

```sql
CREATE TABLE [ IF NOT EXISTS ] table_name (
Expand All @@ -183,7 +183,7 @@ CREATE TABLE [ IF NOT EXISTS ] table_name (
WITH (
snapshot='true'
)
FROM source TABLE table_name;
FROM source TABLE pg_table_name;
```

To check the progress of backfilling historical data, find the corresponding internal table using the [`SHOW INTERNAL TABLES`](/sql/commands/sql-show-internal-tables.md) command and query from it.
Expand Down
57 changes: 57 additions & 0 deletions docs/guides/sink-to-aws-s3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
---
id: sink-to-aws-s3
title: Sink data to AWS S3
description: Describes how to sink data to AWS S3.
slug: /sink-to-aws-s3
---
<head>
<link rel="canonical" href="https://docs.risingwave.com/docs/current/sink-to-aws-s3/" />
</head>

This guide describes how to sink data from RisingWave to Amazon S3 sink using S3 connector in RisingWave.

[Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) is an object storage service that offers industry-leading scalability, data availability, security, and performance.

## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='s3',
connector_parameter = 'value', ...
);
```

## Parameters

| Parameter names | Description |
|-|-|
| connector | Required. Support the S3 connector only.|
| s3.region_name | Required. The service region. |
| s3.bucket_name | Required. The name of the bucket where the sink data is stored in. |
| s3.path | Required. The directory where the sink file is located.|
| s3.credentials.access | Optional. The access key ID of AWS. |
| s3.credentials.secret | Optional. The secret access key of AWS. |
| s3.endpoint_url | Optional. The host URL for an S3-compatible object storage server. This allows users to use a different server instead of the standard S3 server.|
| s3.assume_role | Optional. Specifies the ARN of an IAM role to assume when accessing S3. It allows temporary, secure access to S3 resources without sharing long-term credentials. |
| type | Required. Defines the type of the sink. Options include `append-only` or `upsert`.|

## Example

```sql
CREATE SINK s3_sink AS SELECT v1
FROM t
WITH (
connector='s3',
s3.path = '<test_path>',
s3.region_name = '<region_name>',
s3.bucket_name = '<bucket_name>',
s3.credentials.account_name = '<account_name>',
s3.credentials.account_key = '<account_key>',
s3.endpoint_url = '<endpoint_url>',
type = 'append-only',
)FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
```

For more information about encode `Parquet`, see [Sink data in parquet format](/data-delivery.md#sink-data-in-parquet-format).
52 changes: 52 additions & 0 deletions docs/guides/sink-to-google-cloud-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
id: sink-to-google-cloud-storage
title: Sink data to Google Cloud Storage
description: Describes how to sink data to Google Cloud Storage.
slug: /sink-to-google-cloud-storage
---
<head>
<link rel="canonical" href="https://docs.risingwave.com/docs/current/sink-to-google-cloud-storage/" />
</head>

This guide describes how to sink data from RisingWave to Google Cloud Storage sink using GCS connector in RisingWave.

[Google Cloud Storage](https://cloud.google.com/storage/docs) is a RESTful online file storage web service for storing and accessing data on Google Cloud Platform infrastructure.

## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='gcs',
connector_parameter = 'value', ...
);
```

## Parameters

| Parameter names | Description |
|-|-|
| connector | Required. Support the GCS connector only.|
| gcs.bucket_name | Required. The name of the bucket where the sink data is stored in. |
| gcs.credential | Required. Base64-encoded credential key obtained from the GCS service account key JSON file. To get this JSON file, refer to the [guides of GCS documentation](https://cloud.google.com/iam/docs/keys-create-delete#iam-service-account-keys-create-console). To encode it in base64, run the following command: <code>cat ~/Downloads/rwc-byoc-test-464bdd851bce.json &#124; base64 -b 0 &#124; pbcopy</code>, and then paste the output as the value for this parameter. If this field is not specified, ADC (application default credentials) will be used.|
| gcs.service_account| Optional. The service account of the GCS sink. If `gcs.credential` or ADC is not specified, the credentials will be derived from the service account.|
| gcs.path | Required. The directory where the sink file is located. |
| type | Required. Defines the type of the sink. Options include `append-only` or `upsert`. |

## Example

```sql
CREATE SINK gcs_sink AS SELECT v1
FROM t1
WITH (
connector='gcs',
gcs.path = '<test_path>',
gcs.bucket_name = '<bucket_name>',
gcs.credential = '<account_name>',
gcs.service_account = '<service_account>'
type = 'append-only',
)FORMAT PLAIN ENCODE PARQUET(force_append_only=true);
```

For more information about encode `Parquet`, see [Sink data in parquet format](/data-delivery.md#sink-data-in-parquet-format).
1 change: 1 addition & 0 deletions docs/guides/sink-to-iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ WITH (
| catalog.url | Conditional. The URL of the catalog. It is required when `catalog.type` is not `storage`. |
| primary_key | The primary key for an upsert sink. It is only applicable to the upsert mode. |
| commit_checkpoint_interval | Optional. Commit every N checkpoints (N > 0). Default value is 10. <br/>The behavior of this field also depends on the `sink_decouple` setting:<ul><li>If `sink_decouple` is true (the default), the default value of `commit_checkpoint_interval` is 10.</li> <li>If `sink_decouple` is set to false, the default value of `commit_checkpoint_interval` is 1.</li> <li>If `sink_decouple` is set to false and `commit_checkpoint_interval` is set to larger than 1, an error will occur.</li></ul> |
| create_table_if_not_exists | Optional. When set to `true`, it will automatically create a table for the Iceberg sink. |

## Data type mapping

Expand Down
12 changes: 7 additions & 5 deletions docs/guides/sink-to-nats.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
---
id: sink-to-nats
title: Sink data to NATS
description: Sink data from RisingWave to NATS.
title: Sink data to NATS and NATS JetStream
description: Sink data from RisingWave to NATS and NATS JetStream.
slug: /sink-to-nats
---
This guide describes how to sink data from RisingWave to NATS subjects using the NATS sink connector in RisingWave.
This guide describes how to sink data from RisingWave to NATS subjects using the NATS sink connector in RisingWave. Both NATS and NATS JetStream are supported by this connector.

[NATS](https://nats.io/) is an open source messaging system for cloud native applications. It provides a lightweight publish-subscribe architecture for high performance messaging.

[NATS](https://nats.io/) is an open source messaging system for cloud native applications. It provides a lightweight publish-subscribe architecture for high performance messaging.

Expand All @@ -14,10 +16,10 @@ This feature is in the public preview stage, meaning it's nearing the final prod

## Prerequisites

Before sinking data from RisingWave to NATS, please ensure the following:
Before sinking data from RisingWave to NATS or NATSJetStream, please ensure the following:

- The RisingWave cluster is running.
- A NATS server is running and accessible from your RisingWave cluster.
- A NATS or NATSJetStream server is running and accessible from your RisingWave cluster.
- Create a NATS subject that you want to sink data to.
- You have the permission to publish data to the NATS subject.

Expand Down
46 changes: 46 additions & 0 deletions docs/guides/sink-to-webhdfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
id: sink-to-webhdfs
title: Sink data to WebHDFS
description: Describes how to sink data to WebHDFS.
slug: /sink-to-webhdfs
---
<head>
<link rel="canonical" href="https://docs.risingwave.com/docs/current/sink-to-webhdfs/" />
</head>

This guide describes how to sink data from RisingWave to WebHDFS.

As a workaround for HDFS, WebHDFS allows external clients to execute Hadoop file system operations without necessarily running on the Hadoop cluster itself. Therefore, it reduces the dependency on the Hadoop environment when using HDFS.

## Syntax

```sql
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='webhdfs',
connector_parameter = 'value', ...
);
```

## Parameters

| Parameter names | Description |
|-|-|
| connector | Required. Support the WebHDFS connector only. |
| webhdfs.endpoint | Required. The endpoint for the WebHDFS service. |
| webhdfs.path | Required. The directory where the sink file is located. |
| type | Required. Defines the type of the sink. Options include `append-only` or `upsert`. |

## Example

```sql
CREATE SINK webhdfs_sink AS SELECT v1
FROM t1
WITH (
connector='webhdfs',
webhdfs.path = '<test_path>',
webhdfs.endpoint = '<test_endpoint>',
type = 'append-only',
);
```
8 changes: 5 additions & 3 deletions docs/ingest/data-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,17 @@ WITH (
) FORMAT PLAIN ENCODE JSON;
```
The statement will create a streaming job that continuously ingests data from the Kafka topic to the table and the data will be stored in RisingWave's internal storage, which brings three benefits:
The statement will create a streaming job that continuously ingests data from the Kafka topic to the table and the data will be stored in RisingWave's internal storage, which brings the following benefits:

1. **Improved ad-hoc query performance:** When users execute queries such as `SELECT * FROM table_on_kafka`, the query engine will directly access the data from RisingWave's internal storage, eliminating unnecessary network overhead and avoiding read pressure on upstream systems. Additionally, users can create [indexes](/transform/indexes.md) on the table to accelerate queries.
2. **Allow defining primary keys:** With the help of its internal storage, RisingWave can efficiently maintain primary key constraints. Users can define a primary key on a specific column of the table and define different behaviors for primary key conflicts with [ON CONFLICT clause](/sql/commands/sql-create-table.md#pk-conflict-behavior).
3. **Ability to handle delete/update changes**: Based on the definition of primary keys, RisingWave can efficiently process upstream synchronized delete and update operations. For systems that synchronize delete/update operations from external systems, such as database's CDC, we **do not** allow creating a source on it but require a table with connectors.
3. **Ability to handle delete/update changes**: Based on the definition of primary keys, RisingWave can efficiently process upstream synchronized delete and update operations. For systems that synchronize delete/update operations from external systems, such as database's CDC and UPSERT format messages from message queues, we **do not** allow creating a source on it but require a table with connectors.

At the same time, like regular tables, tables with connectors also accept DML statements and [CREATE SINK INTO TABLE](/sql/commands/sql-create-sink-into.md), which provides greater flexibility.
4. **Stronger consistency guarantee**: When using a table with connectors, all downstream jobs will be guaranteed to have a consistent view of the data persisted in the table; while for source, different jobs may see inconsistent results due to different ingestion speed or data retention in the external system.

5. **Greater flexibility**: Like regular tables, you can use DML statements like [`INSERT`](/sql/commands/sql-insert.md), [`UPDATE`](/sql/commands/sql-update.md) and [`DELETE`](/sql/commands/sql-delete.md) to insert or modify data in tables with connectors, and use [CREATE SINK INTO TABLE](/sql/commands/sql-create-sink-into.md) to merge other data streams into the table.

## DML on tables

Expand Down
2 changes: 1 addition & 1 deletion docs/ingest/ingest-from-gcs.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ FORMAT data_format ENCODE data_encode (
|Field|Notes|
|---|---|
|gcs.bucket_name |Required. The name of the bucket the data source is stored in. |
|gcs.credential|Optional. The base64 encoded credential key. This key is obtained from the GCS service account key JSON file, and should be encoded with base64. To get this JSON file, refer to the [guides of GCS documentation](https://cloud.google.com/iam/docs/keys-create-delete#iam-service-account-keys-create-console). To encode it with base64, run the following command: <code>cat ~/Downloads/rwc-byoc-test-464bdd851bce.json &#124; base64 -b 0 &#124; pbcopy</code>, and then paste the output as the value for this parameter. If this field is not specified, ADC (application default credentials) will be used. |
|gcs.credential|Optional. Base64-encoded credential key obtained from the GCS service account key JSON file. To get this JSON file, refer to the [guides of GCS documentation](https://cloud.google.com/iam/docs/keys-create-delete#iam-service-account-keys-create-console). To encode it in base64, run the following command: <code>cat ~/Downloads/rwc-byoc-test-464bdd851bce.json &#124; base64 -b 0 &#124; pbcopy</code>, and then paste the output as the value for this parameter. If this field is not specified, ADC (application default credentials) will be used. |
|gcs.service_account|Optional. The service account of the target GCS source. If `gcs.credential` or ADC is not specified, the credentials will be derived from the service account.|
|match_pattern| Conditional. This field is used to find object keys in the bucket that match the given pattern. Standard Unix-style [glob](https://en.wikipedia.org/wiki/Glob_(programming)) syntax is supported. |
|compression_format|Optional. This field specifies the compression format of the file being read. You can define `compression_format` in the `CREATE TABLE` statement. When set to `gzip` or `gz`, the file reader reads all files with the .gz suffix. When set to `None` or not defined, the file reader will automatically read and decompress .gz and .gzip files.|
Expand Down
130 changes: 0 additions & 130 deletions docs/ingest/ingest-from-upstash-kafka.md

This file was deleted.

Loading

0 comments on commit 61d92b0

Please sign in to comment.