Skip to content

Commit

Permalink
feat(auctioneer): add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SuperFluffy committed Feb 6, 2025
1 parent e23f3d0 commit 91c1547
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 23 deletions.
6 changes: 3 additions & 3 deletions crates/astria-auctioneer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ homepage = "https://astria.org"
astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["serde", "client"] }
astria-eyre = { path = "../astria-eyre" }
config = { package = "astria-config", path = "../astria-config" }
sequencer_client = { package = "astria-sequencer-client", path = "../astria-sequencer-client" }
telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [
astria-telemetry = { path = "../astria-telemetry", features = [
"display",
] }
config = { package = "astria-config", path = "../astria-config" }
sequencer_client = { package = "astria-sequencer-client", path = "../astria-sequencer-client" }

base64 = { workspace = true }
bytes = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/astria-auctioneer/src/auctioneer/auction/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub(in crate::auctioneer) struct Factory {
/// nonce from Sequencer in time. Starts unset at the beginning of the program and
/// is set externally via `Factory::set_last_succesful_nonce`.
pub(in crate::auctioneer) last_successful_nonce: Option<u32>,
pub(in crate::auctioneer) metrics: &'static crate::Metrics,
}

impl Factory {
Expand Down Expand Up @@ -70,6 +71,7 @@ impl Factory {
rollup_id: self.rollup_id,
cancellation_token: cancellation_token.clone(),
last_successful_nonce: self.last_successful_nonce,
metrics: self.metrics,
};

Auction {
Expand Down
12 changes: 11 additions & 1 deletion crates/astria-auctioneer/src/auctioneer/auction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub(super) struct Worker {
/// submitted). Is usually only unset if no auction was yet submitted (for example
/// at the beginning of the program).
pub(super) last_successful_nonce: Option<u32>,
pub(super) metrics: &'static crate::Metrics,
}

impl Worker {
Expand Down Expand Up @@ -183,6 +184,8 @@ impl Worker {
let mut auction_is_open = false;

let mut nonce_fetch = None;

self.metrics.reset_auction_bids_admitted_gauge();
#[expect(
clippy::semicolon_if_nothing_returned,
reason = "we want to pattern match on the latency timer's return value"
Expand All @@ -197,6 +200,9 @@ impl Worker {
.await
}, if latency_margin_timer.is_some() => {
info!("timer is up; bids left unprocessed: {}", self.bids.len());

self.metrics.set_auction_bids_dropped_gauge(self.bids.len());

break Ok(AuctionItems {
winner: allocation_rule.winner(),
nonce_fetch,
Expand Down Expand Up @@ -245,9 +251,13 @@ impl Worker {
// TODO: this is an unbounded channel. Can we process multiple bids at a time?
Some(bid) = self.bids.recv(), if auction_is_open => {
allocation_rule.bid(&bid);
self.metrics.increment_auction_bids_admitted_gauge();
}

else => break Err(Error::ChannelsClosed),
else => {
self.metrics.set_auction_bids_dropped_gauge(self.bids.len());
break Err(Error::ChannelsClosed);
}
}
}
}
Expand Down
23 changes: 20 additions & 3 deletions crates/astria-auctioneer/src/auctioneer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub(super) struct Auctioneer {
block_commitments: BlockCommitmentStream,
bids: BidStream,
cancelled_auctions: FuturesUnordered<auction::Auction>,
metrics: &'static crate::Metrics,
executed_blocks: ExecuteOptimisticBlockStream,
running_auction: Option<auction::Auction>,
optimistic_blocks: OptimisticBlockStream,
Expand All @@ -61,7 +62,11 @@ pub(super) struct Auctioneer {

impl Auctioneer {
/// Creates an [`Auctioneer`] service from a [`Config`].
pub(super) fn new(config: Config, shutdown_token: CancellationToken) -> eyre::Result<Self> {
pub(super) fn new(
config: Config,
metrics: &'static crate::Metrics,
shutdown_token: CancellationToken,
) -> eyre::Result<Self> {
let Config {
sequencer_grpc_endpoint,
sequencer_abci_endpoint,
Expand Down Expand Up @@ -100,6 +105,7 @@ impl Auctioneer {
rollup_id,
cancellation_token: shutdown_token.child_token(),
last_successful_nonce: None,
metrics,
};

Ok(Self {
Expand All @@ -108,6 +114,7 @@ impl Auctioneer {
bids: rollup_channel.open_bid_stream(),
cancelled_auctions: FuturesUnordered::new(),
executed_blocks: rollup_channel.open_execute_optimistic_block_stream(),
metrics,
optimistic_blocks: sequencer_channel.open_get_optimistic_block_stream(rollup_id),
rollup_id,
running_auction: None,
Expand Down Expand Up @@ -187,9 +194,9 @@ impl Auctioneer {
nonce_used, ..
}) = &res
{
self.metrics.increment_auctions_submitted_count();
self.auction_factory.set_last_successful_nonce(*nonce_used);
}

let _ = self.running_auction.take();
res
}
Expand All @@ -215,14 +222,16 @@ impl Auctioneer {
) -> eyre::Result<()> {
let optimistic_block =
optimistic_block.wrap_err("encountered problem receiving optimistic block message")?;

Span::current().record("block_hash", field::display(optimistic_block.block_hash()));

self.metrics.increment_optimistic_blocks_received_counter();

let new_auction = self.auction_factory.start_new(&optimistic_block);
info!(auction_id = %new_auction.id(), "started new auction");

if let Some(old_auction) = self.running_auction.replace(new_auction) {
old_auction.cancel();
self.metrics.increment_auctions_cancelled_count();
info!(auction_id = %old_auction.id(), "cancelled running auction");
self.cancelled_auctions.push(old_auction);
}
Expand All @@ -247,6 +256,8 @@ impl Auctioneer {
let block_commitment = commitment.wrap_err("failed to receive block commitment")?;
Span::current().record("block_hash", field::display(block_commitment.block_hash()));

self.metrics.increment_block_commitments_received_counter();

if let Some(running_auction) = &mut self.running_auction {
running_auction
.start_timer(block_commitment)
Expand All @@ -272,6 +283,9 @@ impl Auctioneer {
"block_hash",
field::display(executed_block.sequencer_block_hash()),
);

self.metrics.increment_executed_blocks_received_counter();

if let Some(running_auction) = &mut self.running_auction {
running_auction
.start_bids(executed_block)
Expand All @@ -296,6 +310,9 @@ impl Auctioneer {
"block_hash",
field::display(bid.sequencer_parent_block_hash()),
);

self.metrics.increment_auction_bids_received_counter();

if let Some(running_auction) = &mut self.running_auction {
running_auction
.forward_bid_to_auction(bid)
Expand Down
5 changes: 2 additions & 3 deletions crates/astria-auctioneer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ use astria_eyre::eyre::{
pub use build_info::BUILD_INFO;
pub use config::Config;
pub use metrics::Metrics;
pub use telemetry;
use tokio::task::{
JoinError,
JoinHandle,
Expand All @@ -92,9 +91,9 @@ impl Auctioneer {
///
/// # Errors
/// Returns an error if the Auctioneer cannot be initialized.
pub fn spawn(cfg: Config, _metrics: &'static Metrics) -> eyre::Result<Self> {
pub fn spawn(cfg: Config, metrics: &'static Metrics) -> eyre::Result<Self> {
let shutdown_token = CancellationToken::new();
let inner = auctioneer::Auctioneer::new(cfg, shutdown_token.child_token())?;
let inner = auctioneer::Auctioneer::new(cfg, metrics, shutdown_token.child_token())?;
let task = tokio::spawn(inner.run());

Ok(Self {
Expand Down
14 changes: 7 additions & 7 deletions crates/astria-auctioneer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tracing::{
async fn main() -> ExitCode {
astria_eyre::install().expect("astria eyre hook must be the first hook installed");

eprintln!("{}", telemetry::display::json(&BUILD_INFO));
eprintln!("{}", astria_telemetry::display::json(&BUILD_INFO));

let cfg: Config = match config::get() {
Err(err) => {
Expand All @@ -39,23 +39,23 @@ async fn main() -> ExitCode {
};
eprintln!(
"starting with configuration:\n{}",
telemetry::display::json(&cfg),
astria_telemetry::display::json(&cfg),
);

let mut telemetry_conf = telemetry::configure()
let mut astria_telemetry_conf = astria_telemetry::configure()
.set_no_otel(cfg.no_otel)
.set_force_stdout(cfg.force_stdout)
.set_pretty_print(cfg.pretty_print)
.set_filter_directives(&cfg.log);

if !cfg.no_metrics {
telemetry_conf =
telemetry_conf.set_metrics(&cfg.metrics_http_listener_addr, env!("CARGO_PKG_NAME"));
astria_telemetry_conf = astria_telemetry_conf
.set_metrics(&cfg.metrics_http_listener_addr, env!("CARGO_PKG_NAME"));
}

let (metrics, _telemetry_guard) = match telemetry_conf
let (metrics, _astria_telemetry_guard) = match astria_telemetry_conf
.try_init(&())
.wrap_err("failed to setup telemetry")
.wrap_err("failed to setup astria_telemetry")
{
Err(e) => {
eprintln!("initializing auctioneer failed:\n{e:?}");
Expand Down
140 changes: 134 additions & 6 deletions crates/astria-auctioneer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,153 @@
use telemetry::{
use astria_telemetry::{
metric_names,
metrics::{
self,
Counter,
Gauge,
IntoF64,
RegisteringBuilder,
},
};

pub struct Metrics {}
const AUCTION_BIDS_PROCESSED_LABEL: &str = "auction_bids_processed";
const AUCTION_BIDS_PROCESSED_ADMITTED: &str = "admitted";
const AUCTION_BIDS_PROCESSED_DROPPED: &str = "dropped";

impl Metrics {}
pub struct Metrics {
auction_bids_admitted_gauge: Gauge,
auction_bids_dropped_gauge: Gauge,
auction_bids_received_count: Counter,
auctions_cancelled_count: Counter,
auctions_submitted_count: Counter,
block_commitments_received_count: Counter,
executed_blocks_received_count: Counter,
optimistic_blocks_received_count: Counter,
}

impl Metrics {
pub(crate) fn increment_auction_bids_admitted_gauge(&self) {
self.auction_bids_admitted_gauge.increment(1);
}

pub(crate) fn increment_auction_bids_received_counter(&self) {
self.auction_bids_received_count.increment(1);
}

pub(crate) fn increment_auctions_cancelled_count(&self) {
self.auctions_cancelled_count.increment(1);
}

impl metrics::Metrics for Metrics {
pub(crate) fn increment_auctions_submitted_count(&self) {
self.auctions_submitted_count.increment(1);
}

pub(crate) fn increment_block_commitments_received_counter(&self) {
self.block_commitments_received_count.increment(1);
}

pub(crate) fn increment_executed_blocks_received_counter(&self) {
self.executed_blocks_received_count.increment(1);
}

pub(crate) fn increment_optimistic_blocks_received_counter(&self) {
self.optimistic_blocks_received_count.increment(1);
}

pub(crate) fn reset_auction_bids_admitted_gauge(&self) {
self.auction_bids_admitted_gauge.set(0);
}

pub(crate) fn set_auction_bids_dropped_gauge(&self, val: impl IntoF64) {
self.auction_bids_dropped_gauge.set(val);
}
}

impl astria_telemetry::metrics::Metrics for Metrics {
type Config = ();

fn register(
_builder: &mut RegisteringBuilder,
builder: &mut RegisteringBuilder,
_config: &Self::Config,
) -> Result<Self, metrics::Error> {
Ok(Self {})
let block_commitments_received_count = builder
.new_counter_factory(
BLOCK_COMMITMENTS_RECEIVED,
"the number of block commitments received from the Sequencer node",
)?
.register()?;

let executed_blocks_received_count = builder
.new_counter_factory(
EXECUTED_BLOCKS_RECEIVED,
"the number of executed blocks received from the Rollup node",
)?
.register()?;

let optimistic_blocks_received_count = builder
.new_counter_factory(
OPTIMISTIC_BLOCKS_RECEIVED,
"the number of optimistic blocks received from the Sequencer node",
)?
.register()?;

let auction_bids_received_count = builder
.new_counter_factory(
AUCTION_BIDS_RECEIVED,
"the number of auction bids received from the Rollup node (total number over the \
runtime of auctioneer)",
)?
.register()?;

let mut auction_bids_processed_factory = builder.new_gauge_factory(
AUCTION_BIDS_PROCESSED,
"the number of auction bids processed during an auction (either admitted or dropped \
because the time was up or due to some other issue)",
)?;
let auction_bids_admitted_gauge =
auction_bids_processed_factory.register_with_labels(&[(
AUCTION_BIDS_PROCESSED_LABEL,
AUCTION_BIDS_PROCESSED_ADMITTED.to_string(),
)])?;
let auction_bids_dropped_gauge =
auction_bids_processed_factory.register_with_labels(&[(
AUCTION_BIDS_PROCESSED_LABEL,
AUCTION_BIDS_PROCESSED_DROPPED.to_string(),
)])?;

let auctions_cancelled_count = builder
.new_counter_factory(
AUCTIONS_CANCELLED,
"the number of auctions that were cancelled due to a proposed block pre-empting a \
prior proposed block",
)?
.register()?;

let auctions_submitted_count = builder
.new_counter_factory(
AUCTIONS_SUBMITTED,
"the number of successfully submitted auctions",
)?
.register()?;

Ok(Self {
auction_bids_admitted_gauge,
auction_bids_dropped_gauge,
auction_bids_received_count,
auctions_submitted_count,
auctions_cancelled_count,
block_commitments_received_count,
executed_blocks_received_count,
optimistic_blocks_received_count,
})
}
}

metric_names!(const METRICS_NAMES:
BLOCK_COMMITMENTS_RECEIVED,
EXECUTED_BLOCKS_RECEIVED,
OPTIMISTIC_BLOCKS_RECEIVED,
AUCTIONS_CANCELLED,
AUCTIONS_SUBMITTED,
AUCTION_BIDS_PROCESSED,
AUCTION_BIDS_RECEIVED,
);

0 comments on commit 91c1547

Please sign in to comment.