Skip to content

Commit

Permalink
[data ingestion] initial draft version of the docs (#16260)
Browse files Browse the repository at this point in the history
Co-authored-by: Ronny Roland <[email protected]>
  • Loading branch information
phoenix-o and ronny-mysten authored Feb 21, 2024
1 parent 53237e5 commit a8fc78d
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 1 deletion.
134 changes: 134 additions & 0 deletions docs/content/guides/developer/advanced/custom-indexer.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
---
title: Custom Indexer
description: You can build custom indexers using the Sui micro-data ingestion framework. To create an indexer, you subscribe to a checkpoint stream with full checkpoint content. Establishing a custom indexer helps improve latency, allows pruning the data of your Sui Full node, and provides efficient assemblage of checkpoint data.
---

You can build custom indexers using the Sui micro-data ingestion framework. To create an indexer, you subscribe to a checkpoint stream with full checkpoint content. This stream can be one of the publicly available streams from Mysten Labs, one that you set up in your local environment, or a combination of the two.

Establishing a custom indexer helps improve latency, allows pruning the data of your Sui Full node, and provides efficient assemblage of checkpoint data.

## Interface and data format

To use the framework, implement a basic interface:

```rust
#[async_trait]
trait Worker: Send + Sync {
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()>;
}
```

In this example, the `CheckpointData` struct represents full checkpoint content. The struct contains checkpoint summary and contents, as well as detailed information about each individual transaction. The individual transaction data includes events and input/output objects. The full definition for this content is in the [full_checkpoint_content.rs](https://github.com/MystenLabs/sui/blob/releases/sui-graphql-rpc-v2024.1.0-release/crates/sui-types/src/full_checkpoint_content.rs) file of the `sui-types` crate.


## Checkpoint stream sources

Data ingestion for your indexer supports several checkpoint stream sources.

### Remote reader

The most straightforward stream source is to subscribe to a remote store of checkpoint contents. Mysten Labs provides the following buckets:

- Testnet: `https://checkpoints.testnet.sui.io`
- Mainnet: `https://checkpoints.mainnet.sui.io`

```mermaid
flowchart LR
A("fa:fa-cloud Cloud storage (S3, GCP)");
B[("fa:fa-gears
Indexer
daemon")];
B-->A;
B<-->C("fa:fa-floppy-disk Progress store");
subgraph External
D("fa:fa-database Postgres");
E("fa:fa-database BigQuery");
F("fa:fa-database S3");
end
B-->External
```

### Local reader

Colocate the data ingestion daemon with a Full node and enable checkpoint dumping on the latter to set up a local stream source. After enabling, the Full node starts dumping executed checkpoints as files to a local directory, and the data ingestion daemon subscribes to changes in the directory through an inotify-like mechanism. This approach allows minimizing ingestion latency (checkpoint are processed immediately after a checkpoint executor on a Full node) and getting rid of dependency on an externally managed bucket.

To enable, add the following to your [Full node configuration](../../operator/sui-full-node.mdx) file:

```yaml
checkpoint-executor-config:
checkpoint-execution-max-concurrency: 200
local-execution-timeout-sec: 30
data-ingestion-dir: <path to a local directory>
```
```mermaid
flowchart LR
subgraph Sui
A("fa:fa-server Full node binary");
A-->B("fa:fa-folder Local directory");
B-->C[("fa:fa-gears
Indexer
daemon")];
end
subgraph cloud[Cloud storage]
pg(fa:fa-cloud Postgres);
bq(fa:fa-cloud BigQuery);
s1(fa:fa-cloud S3);
end
C-->cloud;
C<-->D("fa:fa-floppy-disk Progress store");
```


### Hybrid mode

Specify both a local and remote store as a fallback to ensure constant data flow. The framework always prioritizes locally available checkpoint data over remote data. It's useful when you want to start utilizing your own Full node for data ingestion but need to partially backfill historical data or just have a failover.


## Example

Here's a basic example of a custom data ingestion worker.

```rust
struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
// custom processing logic
...
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
let (exit_sender, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());
let progress_store = FileProgressStore::new("path_to_file");
let mut executor = IndexerExecutor::new(progress_store, 1 /* number of workflow types */, metrics);
let worker_pool = WorkerPool::new(CustomWorker, "custom worker", 100);
executor.register(worker_pool).await?;
executor.run(
PathBuf::from("..."), // path to a local directory
Some("https://checkpoints.mainnet.sui.io".to_string()),
vec![], // optional remote store access options
exit_receiver,
).await?;
Ok(())
}
```

Let's highlight a couple lines of code:

```rust
let worker_pool = WorkerPool::new(CustomWorker, "custom worker", 100);
executor.register(worker_pool).await?;
```

The data ingestion executor can run multiple workflows simultaneously. For each workflow, you need to create a separate worker pool and register it in the executor. The `WorkerPool` requires an instance of the `Worker` trait, the name of the workflow (which is used for tracking the progress of the flow in the progress store and metrics), and concurrency.

The concurrency parameter specifies how many threads the workflow uses. Having a concurrency value greater than 1 is helpful when tasks are idempotent and can be processed in parallel and out of order. The executor only updates the progress/watermark to a certain checkpoint when all preceding checkpoints are processed.

Find more examples of custom ingestion pipelines in the [Sui repository](https://github.com/MystenLabs/sui/tree/main/crates/sui-data-ingestion/src/workers).

2 changes: 1 addition & 1 deletion docs/content/sidebars/guides.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const guides = [
},*/
'guides/developer/advanced/asset-tokenization',
'guides/developer/advanced/graphql-migration',
//'guides/developer/advanced/custom-indexers',
'guides/developer/advanced/custom-indexer',
],
},
{
Expand Down
4 changes: 4 additions & 0 deletions docs/site/docusaurus.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ const config = {
"sha384-odtC+0UGzzFL/6PNoE8rX/SPcQDXBJ+uRepguP4QkPCm2LBxH3FA3y+fKSiJ+AmM",
crossorigin: "anonymous",
},
{
href: "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.1/css/all.min.css",
type: "text/css",
},
],
themes: ["@docusaurus/theme-mermaid"],
themeConfig:
Expand Down

0 comments on commit a8fc78d

Please sign in to comment.