Skip to content

Commit

Permalink
DeepBook Server (#19764)
Browse files Browse the repository at this point in the history
## Description 

Run a DeepBook server along with the indexer.

## Test plan 

Integration testing

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:

---------

Co-authored-by: longbowlu <[email protected]>
  • Loading branch information
0xaslan and longbowlu authored Oct 12, 2024
1 parent 2cddd9d commit c4a0f35
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 10 deletions.
17 changes: 14 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ aws-sdk-s3 = "0.29.0"
aws-smithy-http = "0.56"
aws-smithy-runtime-api = "0.56"
axum = { version = "0.7", default-features = false, features = [
"macros",
"tokio",
"http1",
"http2",
Expand Down
6 changes: 2 additions & 4 deletions crates/sui-deepbook-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ clap.workspace = true
mysten-metrics.workspace = true
prometheus.workspace = true
serde_yaml.workspace = true
sui-bridge.workspace = true
sui-sdk.workspace = true
sui-json-rpc-types.workspace = true
sui-data-ingestion-core.workspace = true
Expand All @@ -32,12 +31,11 @@ backoff.workspace = true
sui-config.workspace = true
sui-indexer-builder.workspace = true
tempfile.workspace = true
bigdecimal = "0.4.0"
axum.workspace = true
bigdecimal = { version = "0.4.5" }

[dev-dependencies]
sui-types = { workspace = true, features = ["test-utils"] }
sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
hex-literal = "0.3.4"

[[bin]]
Expand Down
1 change: 1 addition & 0 deletions crates/sui-deepbook-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct IndexerConfig {
pub deepbook_genesis_checkpoint: u64,
pub concurrency: u64,
pub metric_port: u16,
pub service_port: u16,
}

impl sui_config::Config for IndexerConfig {}
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-deepbook-indexer/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#[derive(Debug, Clone)]
pub enum DeepBookError {
InternalError(String),
}
2 changes: 2 additions & 0 deletions crates/sui-deepbook-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

pub mod config;
pub mod error;
pub mod events;
pub mod metrics;
pub mod models;
pub mod postgres_manager;
pub mod schema;
pub mod server;
pub mod types;

pub mod sui_datasource;
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-deepbook-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use sui_data_ingestion_core::DataIngestionMetrics;
use sui_deepbook_indexer::config::IndexerConfig;
use sui_deepbook_indexer::metrics::DeepBookIndexerMetrics;
use sui_deepbook_indexer::postgres_manager::get_connection_pool;
use sui_deepbook_indexer::server::run_server;
use sui_deepbook_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_deepbook_indexer::sui_deepbook_indexer::PgDeepbookPersistent;
use sui_deepbook_indexer::sui_deepbook_indexer::SuiDeepBookDataMapper;
Expand Down Expand Up @@ -84,6 +85,11 @@ async fn main() -> Result<()> {
ingestion_metrics.clone(),
indexer_meterics.clone(),
);

let service_address =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), config.service_port);
run_server(service_address, datastore.clone());

let indexer = IndexerBuilder::new(
"SuiDeepBookIndexer",
sui_checkpoint_datasource,
Expand Down
30 changes: 28 additions & 2 deletions crates/sui-deepbook-indexer/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
use diesel::data_types::PgTimestamp;
use diesel::{Identifiable, Insertable, Queryable, Selectable};

use serde::Serialize;
use sui_indexer_builder::{Task, LIVE_TASK_TARGET_CHECKPOINT};

use crate::schema::{
balances, flashloans, order_fills, order_updates, pool_prices, progress_store, proposals,
rebates, stakes, sui_error_transactions, trade_params_update, votes,
balances, flashloans, order_fills, order_updates, pool_prices, pools, progress_store,
proposals, rebates, stakes, sui_error_transactions, trade_params_update, votes,
};

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
Expand Down Expand Up @@ -57,6 +58,13 @@ pub struct OrderFill {
pub onchain_timestamp: i64,
}

#[derive(Queryable)]
pub struct OrderFillSummary {
pub maker_balance_manager_id: String,
pub taker_balance_manager_id: String,
pub base_quantity: i64,
}

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
#[diesel(table_name = flashloans, primary_key(digest))]
pub struct Flashloan {
Expand Down Expand Up @@ -164,6 +172,24 @@ pub struct Votes {
pub stake: i64,
}

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug, Serialize)]
#[diesel(table_name = pools, primary_key(pool_id))]
pub struct Pools {
pub pool_id: String,
pub pool_name: String,
pub base_asset_id: String,
pub base_asset_decimals: i16,
pub base_asset_symbol: String,
pub base_asset_name: String,
pub quote_asset_id: String,
pub quote_asset_decimals: i16,
pub quote_asset_symbol: String,
pub quote_asset_name: String,
pub min_size: i32,
pub lot_size: i32,
pub tick_size: i32,
}

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
#[diesel(table_name = sui_error_transactions, primary_key(txn_digest))]
pub struct SuiErrorTransactions {
Expand Down
144 changes: 144 additions & 0 deletions crates/sui-deepbook-indexer/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
error::DeepBookError,
models::{OrderFillSummary, Pools},
schema,
sui_deepbook_indexer::PgDeepbookPersistent,
};
use axum::{
debug_handler,
extract::{Path, State},
http::StatusCode,
routing::get,
Json, Router,
};
use diesel::BoolExpressionMethods;
use diesel::QueryDsl;
use diesel::{ExpressionMethods, SelectableHelper};
use diesel_async::RunQueryDsl;
use std::net::SocketAddr;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::{net::TcpListener, task::JoinHandle};

pub const GET_POOLS_PATH: &str = "/get_pools";
pub const GET_24HR_VOLUME_PATH: &str = "/get_24hr_volume/:pool_id";
pub const GET_24HR_VOLUME_BY_BALANCE_MANAGER_ID: &str =
"/get_24hr_volume_by_balance_manager_id/:pool_id/:balance_manager_id";

pub fn run_server(socket_address: SocketAddr, state: PgDeepbookPersistent) -> JoinHandle<()> {
tokio::spawn(async move {
let listener = TcpListener::bind(socket_address).await.unwrap();
axum::serve(listener, make_router(state)).await.unwrap();
})
}

pub(crate) fn make_router(state: PgDeepbookPersistent) -> Router {
Router::new()
.route("/", get(health_check))
.route(GET_POOLS_PATH, get(get_pools))
.route(GET_24HR_VOLUME_PATH, get(get_24hr_volume))
.route(
GET_24HR_VOLUME_BY_BALANCE_MANAGER_ID,
get(get_24hr_volume_by_balance_manager_id),
)
.with_state(state)
}

impl axum::response::IntoResponse for DeepBookError {
// TODO: distinguish client error.
fn into_response(self) -> axum::response::Response {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {:?}", self),
)
.into_response()
}
}

impl<E> From<E> for DeepBookError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self::InternalError(err.into().to_string())
}
}

async fn health_check() -> StatusCode {
StatusCode::OK
}

/// Get all pools stored in database
#[debug_handler]
async fn get_pools(
State(state): State<PgDeepbookPersistent>,
) -> Result<Json<Vec<Pools>>, DeepBookError> {
let connection = &mut state.pool.get().await?;
let results = schema::pools::table
.select(Pools::as_select())
.load(connection)
.await?;

Ok(Json(results))
}

async fn get_24hr_volume(
Path(pool_id): Path<String>,
State(state): State<PgDeepbookPersistent>,
) -> Result<Json<u64>, DeepBookError> {
let connection = &mut state.pool.get().await?;
let unix_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let day_ago = unix_ts - 24 * 60 * 60 * 1000;
let vols: Vec<i64> = schema::order_fills::table
.select(schema::order_fills::base_quantity)
.filter(schema::order_fills::pool_id.eq(pool_id))
.filter(schema::order_fills::onchain_timestamp.gt(day_ago))
.load(connection)
.await?;
Ok(Json(vols.into_iter().map(|v| v as u64).sum()))
}

async fn get_24hr_volume_by_balance_manager_id(
Path((pool_id, balance_manager_id)): Path<(String, String)>,
State(state): State<PgDeepbookPersistent>,
) -> Result<Json<Vec<i64>>, DeepBookError> {
let connection = &mut state.pool.get().await?;
let unix_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let day_ago = unix_ts - 24 * 60 * 60 * 1000;
let results: Vec<OrderFillSummary> = schema::order_fills::table
.select((
schema::order_fills::maker_balance_manager_id,
schema::order_fills::taker_balance_manager_id,
schema::order_fills::base_quantity,
))
.filter(schema::order_fills::pool_id.eq(pool_id))
.filter(schema::order_fills::onchain_timestamp.gt(day_ago))
.filter(
schema::order_fills::maker_balance_manager_id
.eq(&balance_manager_id)
.or(schema::order_fills::taker_balance_manager_id.eq(&balance_manager_id)),
)
.load(connection)
.await?;

let mut maker_vol = 0;
let mut taker_vol = 0;
for order_fill in results {
if order_fill.maker_balance_manager_id == balance_manager_id {
maker_vol += order_fill.base_quantity;
};
if order_fill.taker_balance_manager_id == balance_manager_id {
taker_vol += order_fill.base_quantity;
};
}

Ok(Json(vec![maker_vol, taker_vol]))
}
2 changes: 1 addition & 1 deletion crates/sui-deepbook-indexer/src/sui_deepbook_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{models, schema};
/// Persistent layer impl
#[derive(Clone)]
pub struct PgDeepbookPersistent {
pool: PgPool,
pub pool: PgPool,
save_progress_policy: ProgressSavingPolicy,
}

Expand Down
14 changes: 14 additions & 0 deletions crates/sui-json-rpc-api/src/deepbook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;

use sui_open_rpc_macros::open_rpc;

#[open_rpc(namespace = "suix", tag = "DeepBook Read API")]
#[rpc(server, client, namespace = "suix")]
pub trait DeepBookApi {
#[method(name = "ping")]
async fn ping(&self) -> RpcResult<String>;
}

0 comments on commit c4a0f35

Please sign in to comment.