Metadata JSON Ingest and Backfill (#123)
* feat(metadata_json): port the backfill command; refactor to a worker struct shared by the backfill and ingest commands; set up a receiver for sending assets that need to be indexed to a Redis stream.

* feat(metadata_json): add a new crate for continuously processing messages and running the backfill.
kespinola authored Jan 17, 2024
1 parent 0cc0482 commit e9e0afd
Showing 24 changed files with 939 additions and 261 deletions.
57 changes: 56 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
members = [
"digital_asset_types",
"metadata_json",
"metaplex-rpc-proxy",
"nft_ingester",
"tree_backfiller",
66 changes: 66 additions & 0 deletions metadata_json/Cargo.toml
@@ -0,0 +1,66 @@
[package]
name = "das-metadata-json"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]

[[bin]]
name = "das-metadata-json"

[dependencies]

backon = "0.4.1"
log = "0.4.17"
env_logger = "0.10.0"
anyhow = "1.0.75"
derive_more = "0.99.17"
redis = { version = "0.22.3", features = [
"aio",
"tokio-comp",
"streams",
"tokio-native-tls-comp",
] }
futures = { version = "0.3.25" }
indicatif = "0.17.5"
thiserror = "1.0.31"
serde_json = "1.0.81"
das-tree-backfiller = { path = "../tree_backfiller" }
tokio = { version = "1.26.0", features = ["full", "tracing"] }
sqlx = { version = "0.6.2", features = [
"macros",
"runtime-tokio-rustls",
"postgres",
"uuid",
"offline",
"json",
] }
sea-orm = { version = "0.10.6", features = [
"macros",
"runtime-tokio-rustls",
"sqlx-postgres",
"with-chrono",
"mock",
] }
sea-query = { version = "0.28.1", features = ["postgres-array"] }
chrono = "0.4.19"
cadence = "0.29.0"
cadence-macros = "0.29.0"
tokio-postgres = "0.7.7"
serde = "1.0.136"
bs58 = "0.4.0"
reqwest = "0.11.11"
plerkle_messenger = { version = "1.6.0", features = ['redis'] }
digital_asset_types = { path = "../digital_asset_types", features = [
"json_types",
"sql_types",
] }
figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
rand = "0.8.5"
url = "2.3.1"
tokio-stream = "0.1.12"
clap = { version = "4.2.2", features = ["derive", "cargo", "env"] }

[lints]
workspace = true
102 changes: 102 additions & 0 deletions metadata_json/README.md
@@ -0,0 +1,102 @@
# DAS Metadata JSON Indexer CLI

## Overview
The DAS Metadata JSON Indexer CLI is a tool for indexing metadata JSON associated with tokens. It supports operations such as ingesting new metadata and backfilling missing metadata, as well as providing metrics and performance tuning options.

## Features
- **Ingest**: Process and index new metadata JSON files with various configuration options.
- **Backfill**: Fill in missing metadata for previously indexed tokens with configurable parameters.
- **Metrics**: Collect and send metrics to a specified host and port.

## Installation
Ensure you have Rust installed on your machine. If not, install it from [the official Rust website](https://www.rust-lang.org/).


```
cargo run --bin das-metadata-json -- --help
```
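
To build an optimized release binary, the standard Cargo release build applies:

```
cargo build --release --bin das-metadata-json
```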

## Usage

### Ingest Command

The ingest command continuously monitors the METADATA_JSON Redis stream. Upon reading an ID from the stream, the ingest loop looks up the corresponding `asset_data` record by ID in the DAS database, fetches the metadata JSON, and updates the record with the retrieved metadata.
```
das-metadata-json ingest [OPTIONS] --messenger-redis-url <MESSENGER_REDIS_URL> --database-url <DATABASE_URL>
```

#### Options
- `--messenger-redis-url <MESSENGER_REDIS_URL>`: The Redis URL for the messenger service.
- `--messenger-redis-batch-size <MESSENGER_REDIS_BATCH_SIZE>`: Batch size for Redis operations (default: 100).
- `--metrics-host <METRICS_HOST>`: Host for sending metrics (default: 127.0.0.1).
- `--metrics-port <METRICS_PORT>`: Port for sending metrics (default: 8125).
- `--metrics-prefix <METRICS_PREFIX>`: Prefix for metrics (default: das.backfiller).
- `--database-url <DATABASE_URL>`: The database URL.
- `--database-max-connections <DATABASE_MAX_CONNECTIONS>`: Maximum database connections (default: 125).
- `--database-min-connections <DATABASE_MIN_CONNECTIONS>`: Minimum database connections (default: 5).
- `--timeout <TIMEOUT>`: Timeout for operations in milliseconds (default: 1000).
- `--queue-size <QUEUE_SIZE>`: Size of the job queue (default: 1000).
- `--worker-count <WORKER_COUNT>`: Number of worker threads (default: 100).
- `-h, --help`: Print help information.
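
For example, to run the ingest loop against a local Redis instance and a placeholder Postgres URL (substitute your own connection strings):

```
das-metadata-json ingest \
  --messenger-redis-url redis://localhost:6379 \
  --database-url postgres://user:pass@localhost:5432/das \
  --worker-count 50
```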

### Backfill Command

To backfill any `asset_data` marked for indexing with `reindex=true`:

```
das-metadata-json backfill [OPTIONS] --database-url <DATABASE_URL>
```

#### Options
- `--database-url <DATABASE_URL>`: The database URL.
- `--database-max-connections <DATABASE_MAX_CONNECTIONS>`: Maximum database connections (default: 125).
- `--database-min-connections <DATABASE_MIN_CONNECTIONS>`: Minimum database connections (default: 5).
- `--metrics-host <METRICS_HOST>`: Host for sending metrics (default: 127.0.0.1).
- `--metrics-port <METRICS_PORT>`: Port for sending metrics (default: 8125).
- `--metrics-prefix <METRICS_PREFIX>`: Prefix for metrics (default: das.backfiller).
- `--queue-size <QUEUE_SIZE>`: Size of the job queue (default: 1000).
- `--worker-count <WORKER_COUNT>`: Number of worker threads (default: 100).
- `--timeout <TIMEOUT>`: Timeout for operations in milliseconds (default: 1000).
- `--batch-size <BATCH_SIZE>`: Number of records to process in a single batch (default: 1000).
- `-h, --help`: Print help information.
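
For example, with a placeholder database URL and a smaller batch size:

```
das-metadata-json backfill \
  --database-url postgres://user:pass@localhost:5432/das \
  --batch-size 500
```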

## Lib

The `das-metadata-json` crate provides a `sender` module that can be integrated into a third-party service (e.g. `nft_ingester`) to push asset data IDs for indexing. To configure it, follow the steps below:

### Configuration

1. **Set up the `SenderArgs`:** Ensure that the `nft_ingester` is configured with the necessary `SenderArgs`. These arguments include the Redis URL, batch size, and the number of queue connections. For example:

```rust
let sender_args = SenderArgs {
    messenger_redis_url: "redis://localhost:6379".to_string(),
    messenger_redis_batch_size: "100".to_string(),
    messenger_queue_connections: 5,
};
```

2. **Initialize the `SenderPool`:** Use the `try_from_config` async function to create a `SenderPool` instance from the `SenderArgs`. This will set up the necessary channels and messengers for communication.

```rust
let sender_pool = SenderPool::try_from_config(sender_args).await?;
```

3. **Push Asset Data IDs for Indexing:** With the `SenderPool` instance, you can now push asset data IDs to be indexed using the `push` method. The IDs should be serialized into a byte array before being sent. The `asset_data` record should be written to the database before pushing its ID.

```rust
let message = asset_data.id;

sender_pool.push(&message).await?;
```
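
Putting the steps together, a minimal sketch of a helper that queues a single asset for indexing might look like this; the `das_metadata_json::sender` import path is an assumption based on the crate and module names above:

```rust
// Minimal sketch; the import path is assumed from the crate/module names.
use das_metadata_json::sender::{SenderArgs, SenderPool};

async fn queue_asset_for_indexing(asset_data_id: Vec<u8>) -> Result<(), anyhow::Error> {
    let sender_args = SenderArgs {
        messenger_redis_url: "redis://localhost:6379".to_string(),
        messenger_redis_batch_size: "100".to_string(),
        messenger_queue_connections: 5,
    };

    // In a long-running service the pool would be built once and reused;
    // it is constructed inline here for brevity.
    let sender_pool = SenderPool::try_from_config(sender_args).await?;

    // The asset_data record must already exist in the database before
    // its ID is pushed onto the METADATA_JSON stream.
    sender_pool.push(&asset_data_id).await?;

    Ok(())
}
```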

Within the `nft_ingester`, the `sender_pool` is orchestrated by the `TaskManager`. When configured appropriately, upon receiving a `DownloadMetadata` task the `task_manager` will forgo the usual process of creating a task record and instead push the asset ID directly to the `METADATA_JSON` Redis stream. This queues the ID for processing by the `das-metadata-json` indexer, streamlining the workflow for indexing metadata JSON.

## Configuration
The CLI can be configured using command-line options or environment variables. For options that have an associated environment variable, you can set the variable instead of passing the option on the command line.
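
For example, assuming the URL options map to upper-snake-case environment variables (the exact names may differ; check the `--help` output):

```
export DATABASE_URL=postgres://user:pass@localhost:5432/das
export MESSENGER_REDIS_URL=redis://localhost:6379
das-metadata-json ingest
```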

## Logging
Logging is managed by `env_logger`. Set the `RUST_LOG` environment variable to control the logging level, e.g., `RUST_LOG=info`.

## Error Handling
The CLI provides error messages for any issues encountered during execution.
85 changes: 85 additions & 0 deletions metadata_json/src/cmds/backfill.rs
@@ -0,0 +1,85 @@
use {
    crate::worker::{Worker, WorkerArgs},
    clap::Parser,
    das_tree_backfiller::db,
    das_tree_backfiller::metrics::{setup_metrics, MetricsArgs},
    digital_asset_types::dao::asset_data,
    log::info,
    reqwest::ClientBuilder,
    sea_orm::{entity::*, prelude::*, query::*, EntityTrait, SqlxPostgresConnector},
    tokio::time::Duration,
};

#[derive(Parser, Clone, Debug)]
pub struct BackfillArgs {
    #[clap(flatten)]
    database: db::PoolArgs,

    #[command(flatten)]
    metrics: MetricsArgs,

    #[command(flatten)]
    worker: WorkerArgs,

    #[arg(long, default_value = "1000")]
    timeout: u64,

    #[arg(long, default_value = "1000")]
    batch_size: u64,
}

pub async fn run(args: BackfillArgs) -> Result<(), anyhow::Error> {
    let batch_size = args.batch_size;

    let pool = db::connect(args.database).await?;

    setup_metrics(args.metrics)?;

    let client = ClientBuilder::new()
        .timeout(Duration::from_millis(args.timeout))
        .build()?;

    let worker = Worker::from(args.worker);

    // Start the worker pool; `tx` feeds asset IDs to the workers and
    // `handle` resolves once the workers have drained the queue.
    let (tx, handle) = worker.start(pool.clone(), client.clone());

    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

    // Select only asset_data rows flagged for reindexing.
    let mut condition = Condition::all();
    condition = condition.add(asset_data::Column::Reindex.eq(true));
    let query = asset_data::Entity::find()
        .filter(condition)
        .order_by(asset_data::Column::Id, Order::Asc);

    let mut after = None;

    // Page through the matching rows with a keyset cursor on the ID column,
    // sending each asset ID to the worker pool.
    loop {
        let mut query = query.clone().cursor_by(asset_data::Column::Id);
        let mut query = query.first(batch_size);

        if let Some(after) = after {
            query = query.after(after);
        }

        let assets = query.all(&conn).await?;
        let assets_count = assets.len();

        for asset in assets.clone() {
            tx.send(asset.id).await?;
        }

        // A short page means the cursor has reached the end of the result set.
        if u64::try_from(assets_count)? < batch_size {
            break;
        }

        after = assets.last().cloned().map(|asset| asset.id);
    }

    // Close the channel so the workers can finish and the handle can resolve.
    drop(tx);

    info!("Waiting for tasks to finish");
    handle.await?;

    info!("Tasks finished");
    Ok(())
}