From 47a082b3312cae7aa0f2317a45a26fa5f22d043c Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Fri, 16 Aug 2024 10:15:55 +0300
Subject: [PATCH] feat(db): Allow creating owned Postgres connections (#2654)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

- Changes `Connection` so that it has a `'static` lifetime if created from a pool (i.e., when it is non-transactional).
- Simplifies `ReadStorageFactory` and `MainBatchExecutor` accordingly.

## Why ❔

Reduces complexity. A `'static` connection can be sent to a Tokio task etc., which improves DevEx.
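For illustration, a minimal sketch of what this enables (not part of the diff; it assumes a configured `ConnectionPool<Core>`, and `run_in_background` is a made-up helper):

```rust
use zksync_dal::{ConnectionPool, Core, CoreDal};

async fn run_in_background(pool: ConnectionPool<Core>) -> anyhow::Result<()> {
    // `connection()` now yields `Connection<'static, Core>`, which does not
    // borrow from the pool...
    let mut connection = pool.connection().await?;
    tokio::spawn(async move {
        // ...so the connection can be moved into a spawned Tokio task.
        let sealed = connection.blocks_dal().get_sealed_l1_batch_number().await;
        tracing::info!("sealed L1 batch: {sealed:?}");
    })
    .await?;
    Ok(())
}
```

In the same vein, `OwnedStorage::postgres()` introduced below consumes an owned connection instead of holding a pool reference and lazily connecting.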
## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
---
 core/lib/db_connection/src/connection.rs      |  35 ++--
 core/lib/db_connection/src/connection_pool.rs |   8 +-
 core/lib/state/src/lib.rs                     |   3 +-
 core/lib/state/src/storage_factory.rs         |  79 +++------
 core/node/api_server/src/web3/state.rs        |   2 +-
 .../src/batch_executor/main_executor.rs       | 153 +++++++++---------
 .../tests/read_storage_factory.rs             |   4 +-
 .../state_keeper/src/state_keeper_storage.rs  |  29 ++--
 core/node/vm_runner/src/storage.rs            |  20 +--
 core/node/vm_runner/src/tests/mod.rs          |   6 +-
 core/node/vm_runner/src/tests/storage.rs      |   6 +-
 11 files changed, 158 insertions(+), 187 deletions(-)

diff --git a/core/lib/db_connection/src/connection.rs b/core/lib/db_connection/src/connection.rs
index 22a63765b3b..e178395b333 100644
--- a/core/lib/db_connection/src/connection.rs
+++ b/core/lib/db_connection/src/connection.rs
@@ -1,10 +1,11 @@
 use std::{
     collections::HashMap,
     fmt, io,
+    marker::PhantomData,
     panic::Location,
     sync::{
         atomic::{AtomicUsize, Ordering},
-        Mutex,
+        Arc, Mutex, Weak,
     },
     time::{Instant, SystemTime},
 };
@@ -98,14 +99,14 @@ impl TracedConnections {
     }
 }
 
-struct PooledConnection<'a> {
+struct PooledConnection {
     connection: PoolConnection<Postgres>,
     tags: Option<ConnectionTags>,
     created_at: Instant,
-    traced: Option<(&'a TracedConnections, usize)>,
+    traced: (Weak<TracedConnections>, usize),
 }
 
-impl fmt::Debug for PooledConnection<'_> {
+impl fmt::Debug for PooledConnection {
     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
         formatter
             .debug_struct("PooledConnection")
@@ -115,7 +116,7 @@
     }
 }
 
-impl Drop for PooledConnection<'_> {
+impl Drop for PooledConnection {
     fn drop(&mut self) {
         if let Some(tags) = &self.tags {
             let lifetime = self.created_at.elapsed();
@@ -132,15 +133,17 @@
                 );
             }
         }
-        if let Some((connections, id)) = self.traced {
-            connections.mark_as_dropped(id);
+
+        let (traced_connections, id) = &self.traced;
+        if let Some(connections) = traced_connections.upgrade() {
+            connections.mark_as_dropped(*id);
         }
     }
 }
 
 #[derive(Debug)]
 enum ConnectionInner<'a> {
-    Pooled(PooledConnection<'a>),
+    Pooled(PooledConnection),
     Transaction {
         transaction: Transaction<'a, Postgres>,
         tags: Option<&'a ConnectionTags>,
@@ -156,7 +159,7 @@ pub trait DbMarker: 'static + Send + Sync + Clone {}
 #[derive(Debug)]
 pub struct Connection<'a, DB: DbMarker> {
     inner: ConnectionInner<'a>,
-    _marker: std::marker::PhantomData<DB>,
+    _marker: PhantomData<DB>,
 }
 
 impl<'a, DB: DbMarker> Connection<'a, DB> {
@@ -166,21 +169,23 @@
     pub(crate) fn from_pool(
         connection: PoolConnection<Postgres>,
         tags: Option<ConnectionTags>,
-        traced_connections: Option<&'a TracedConnections>,
+        traced_connections: Option<&Arc<TracedConnections>>,
     ) -> Self {
         let created_at = Instant::now();
         let inner = ConnectionInner::Pooled(PooledConnection {
             connection,
             tags,
             created_at,
-            traced: traced_connections.map(|connections| {
+            traced: if let Some(connections) = traced_connections {
                 let id = connections.acquire(tags, created_at);
-                (connections, id)
-            }),
+                (Arc::downgrade(connections), id)
+            } else {
+                (Weak::new(), 0)
+            },
         });
         Self {
             inner,
-            _marker: Default::default(),
+            _marker: PhantomData,
         }
     }
 
@@ -196,7 +201,7 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
         };
         Ok(Connection {
             inner,
-            _marker: Default::default(),
+            _marker: PhantomData,
         })
     }
 
diff --git a/core/lib/db_connection/src/connection_pool.rs b/core/lib/db_connection/src/connection_pool.rs
index 78d9184222d..7cf29632b7d 100644
--- a/core/lib/db_connection/src/connection_pool.rs
+++ b/core/lib/db_connection/src/connection_pool.rs
@@ -347,7 +347,7 @@ impl<DB: DbMarker> ConnectionPool<DB> {
     ///
     /// This method is intended to be used in crucial contexts, where the
     /// database access is must-have (e.g. block committer).
-    pub async fn connection(&self) -> DalResult<Connection<'_, DB>> {
+    pub async fn connection(&self) -> DalResult<Connection<'static, DB>> {
         self.connection_inner(None).await
     }
 
@@ -361,7 +361,7 @@ impl<DB: DbMarker> ConnectionPool<DB> {
     pub fn connection_tagged(
         &self,
         requester: &'static str,
-    ) -> impl Future<Output = DalResult<Connection<'_, DB>>> + '_ {
+    ) -> impl Future<Output = DalResult<Connection<'static, DB>>> + '_ {
         let location = Location::caller();
         async move {
             let tags = ConnectionTags {
@@ -375,7 +375,7 @@ impl<DB: DbMarker> ConnectionPool<DB> {
     async fn connection_inner(
         &self,
         tags: Option<ConnectionTags>,
-    ) -> DalResult<Connection<'_, DB>> {
+    ) -> DalResult<Connection<'static, DB>> {
         let acquire_latency = CONNECTION_METRICS.acquire.start();
         let conn = self.acquire_connection_retried(tags.as_ref()).await?;
         let elapsed = acquire_latency.observe();
@@ -386,7 +386,7 @@ impl<DB: DbMarker> ConnectionPool<DB> {
         Ok(Connection::<DB>::from_pool(
             conn,
             tags,
-            self.traced_connections.as_deref(),
+            self.traced_connections.as_ref(),
         ))
     }
 
diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs
index c386426d066..ad5361c4608 100644
--- a/core/lib/state/src/lib.rs
+++ b/core/lib/state/src/lib.rs
@@ -20,8 +20,7 @@ pub use self::{
     },
     shadow_storage::ShadowStorage,
     storage_factory::{
-        BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory,
-        RocksdbWithMemory,
+        BatchDiff, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory,
     },
 };
 
diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs
index 4792200a463..e2b5275c48d 100644
--- a/core/lib/state/src/storage_factory.rs
+++ b/core/lib/state/src/storage_factory.rs
@@ -10,6 +10,9 @@ use zksync_vm_interface::storage::ReadStorage;
 
 use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily};
 
+/// Storage with a static lifetime that can be sent to Tokio tasks etc.
+pub type OwnedStorage = PgOrRocksdbStorage<'static>;
+
 /// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
 /// (mostly for testing purposes); the default is [`OwnedStorage`].
 #[async_trait]
@@ -35,8 +38,9 @@ impl ReadStorageFactory for ConnectionPool<Core> {
         _stop_receiver: &watch::Receiver<bool>,
         l1_batch_number: L1BatchNumber,
     ) -> anyhow::Result<Option<OwnedStorage>> {
-        let storage = OwnedPostgresStorage::new(self.clone(), l1_batch_number);
-        Ok(Some(storage.into()))
+        let connection = self.connection().await?;
+        let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
+        Ok(Some(storage))
     }
 }
 
@@ -61,31 +65,29 @@ pub struct RocksdbWithMemory {
     pub batch_diffs: Vec<BatchDiff>,
 }
 
-/// Owned Postgres-backed VM storage for a certain L1 batch.
+/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
+/// underneath.
 #[derive(Debug)]
-pub struct OwnedPostgresStorage {
-    connection_pool: ConnectionPool<Core>,
-    l1_batch_number: L1BatchNumber,
+pub enum PgOrRocksdbStorage<'a> {
+    /// Implementation over a Postgres connection.
+    Postgres(PostgresStorage<'a>),
+    /// Implementation over a RocksDB cache instance.
+    Rocksdb(RocksdbStorage),
+    /// Implementation over a RocksDB cache instance with in-memory DB diffs.
+    RocksdbWithMemory(RocksdbWithMemory),
 }
 
-impl OwnedPostgresStorage {
-    /// Creates a VM storage for the specified batch number.
-    pub fn new(connection_pool: ConnectionPool<Core>, l1_batch_number: L1BatchNumber) -> Self {
-        Self {
-            connection_pool,
-            l1_batch_number,
-        }
-    }
-
-    /// Returns a [`ReadStorage`] implementation backed by Postgres
+impl PgOrRocksdbStorage<'static> {
+    /// Creates a Postgres-based storage. Because of the `'static` lifetime requirement, `connection` must be
+    /// non-transactional.
     ///
     /// # Errors
     ///
-    /// Propagates Postgres errors.
-    pub async fn borrow(&self) -> anyhow::Result<PgOrRocksdbStorage<'_>> {
-        let l1_batch_number = self.l1_batch_number;
-        let mut connection = self.connection_pool.connection().await?;
-
+    /// Propagates Postgres I/O errors.
+    pub async fn postgres(
+        mut connection: Connection<'static, Core>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Self> {
         let l2_block_number = if let Some((_, l2_block_number)) = connection
             .blocks_dal()
             .get_l2_block_range_of_l1_batch(l1_batch_number)
@@ -114,42 +116,7 @@
             .into(),
         )
     }
-}
-
-/// Owned version of [`PgOrRocksdbStorage`]. It is thus possible to send to blocking tasks for VM execution.
-#[derive(Debug)]
-pub enum OwnedStorage {
-    /// Readily initialized storage with a static lifetime.
-    Static(PgOrRocksdbStorage<'static>),
-    /// Storage that must be `borrow()`ed from.
-    Lending(OwnedPostgresStorage),
-}
-
-impl From<OwnedPostgresStorage> for OwnedStorage {
-    fn from(storage: OwnedPostgresStorage) -> Self {
-        Self::Lending(storage)
-    }
-}
-
-impl From<PgOrRocksdbStorage<'static>> for OwnedStorage {
-    fn from(storage: PgOrRocksdbStorage<'static>) -> Self {
-        Self::Static(storage)
-    }
-}
-
-/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
-/// underneath.
-#[derive(Debug)]
-pub enum PgOrRocksdbStorage<'a> {
-    /// Implementation over a Postgres connection.
-    Postgres(PostgresStorage<'a>),
-    /// Implementation over a RocksDB cache instance.
-    Rocksdb(RocksdbStorage),
-    /// Implementation over a RocksDB cache instance with in-memory DB diffs.
-    RocksdbWithMemory(RocksdbWithMemory),
-}
-
-impl PgOrRocksdbStorage<'static> {
     /// Catches up RocksDB synchronously (i.e. assumes the gap is small) and
     /// returns a [`ReadStorage`] implementation backed by caught-up RocksDB.
 ///
diff --git a/core/node/api_server/src/web3/state.rs b/core/node/api_server/src/web3/state.rs
index b0e74706e52..5c8b47dabeb 100644
--- a/core/node/api_server/src/web3/state.rs
+++ b/core/node/api_server/src/web3/state.rs
@@ -287,7 +287,7 @@ impl RpcState {
     #[track_caller]
     pub(crate) fn acquire_connection(
         &self,
-    ) -> impl Future<Output = Result<Connection<'_, Core>, Web3Error>> + '_ {
+    ) -> impl Future<Output = Result<Connection<'static, Core>, Web3Error>> + '_ {
         self.connection_pool
             .connection_tagged("api")
             .map_err(|err| err.generalize().into())
diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/node/state_keeper/src/batch_executor/main_executor.rs
index b4090460116..43f1b8e59b1 100644
--- a/core/node/state_keeper/src/batch_executor/main_executor.rs
+++ b/core/node/state_keeper/src/batch_executor/main_executor.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
 use anyhow::Context as _;
 use once_cell::sync::OnceCell;
-use tokio::{runtime::Handle, sync::mpsc};
+use tokio::sync::mpsc;
 use zksync_multivm::{
     interface::{
         storage::{ReadStorage, StorageView},
@@ -14,7 +14,6 @@ use zksync_multivm::{
     MultiVMTracer, VmInstance,
 };
 use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS};
-use zksync_state::OwnedStorage;
 use zksync_types::{vm::FastVmMode, vm_trace::Call, Transaction};
 
 use super::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult};
@@ -57,10 +56,10 @@ impl MainBatchExecutor {
     }
 }
 
-impl BatchExecutor for MainBatchExecutor {
+impl<S: ReadStorage + Send + 'static> BatchExecutor<S> for MainBatchExecutor {
     fn init_batch(
         &mut self,
-        storage: OwnedStorage,
+        storage: S,
         l1_batch_params: L1BatchEnv,
         system_env: SystemEnv,
     ) -> BatchExecutorHandle {
@@ -74,20 +73,19 @@ impl BatchExecutor for MainBatchExecutor {
             commands: commands_receiver,
         };
 
-        let handle = tokio::task::spawn_blocking(move || {
-            let storage = match storage {
-                OwnedStorage::Static(storage) => storage,
-                OwnedStorage::Lending(ref storage) => Handle::current()
-                    .block_on(storage.borrow())
-                    .context("failed accessing state keeper storage")?,
-            };
-            executor.run(storage, l1_batch_params, system_env);
-            anyhow::Ok(())
-        });
+        let handle =
+            tokio::task::spawn_blocking(move || executor.run(storage, l1_batch_params, system_env));
         BatchExecutorHandle::from_raw(handle, commands_sender)
     }
 }
 
+#[derive(Debug)]
+struct TransactionOutput {
+    tx_result: VmExecutionResultAndLogs,
+    compressed_bytecodes: Vec<CompressedBytecodeInfo>,
+    calls: Vec<Call>,
+}
+
 /// Implementation of the "primary" (non-test) batch executor.
 /// Upon launch, it initializes the VM object with provided block context and properties, and keeps invoking the commands
 /// sent to it one by one until the batch is finished.
@@ -105,13 +103,13 @@ struct CommandReceiver {
 impl CommandReceiver {
     pub(super) fn run<S: ReadStorage>(
         mut self,
-        secondary_storage: S,
+        storage: S,
         l1_batch_params: L1BatchEnv,
         system_env: SystemEnv,
-    ) {
+    ) -> anyhow::Result<()> {
         tracing::info!("Starting executing L1 batch #{}", &l1_batch_params.number);
 
-        let storage_view = StorageView::new(secondary_storage).to_rc_ptr();
+        let storage_view = StorageView::new(storage).to_rc_ptr();
         let mut vm = VmInstance::maybe_fast(
             l1_batch_params,
             system_env,
@@ -122,7 +120,9 @@
         while let Some(cmd) = self.commands.blocking_recv() {
             match cmd {
                 Command::ExecuteTx(tx, resp) => {
-                    let result = self.execute_tx(&tx, &mut vm);
+                    let result = self
+                        .execute_tx(&tx, &mut vm)
+                        .with_context(|| format!("fatal error executing transaction {tx:?}"))?;
                     if resp.send(result).is_err() {
                         break;
                     }
@@ -140,7 +140,7 @@
                     }
                 }
                 Command::FinishBatch(resp) => {
-                    let vm_block_result = self.finish_batch(&mut vm);
+                    let vm_block_result = self.finish_batch(&mut vm)?;
                     if resp.send(vm_block_result).is_err() {
                         break;
                     }
@@ -152,28 +152,28 @@
                         .observe(metrics.time_spent_on_get_value);
                     EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::SetValue]
                         .observe(metrics.time_spent_on_set_value);
-                    return;
+                    return Ok(());
                 }
                 Command::FinishBatchWithCache(resp) => {
-                    let vm_block_result = self.finish_batch(&mut vm);
+                    let vm_block_result = self.finish_batch(&mut vm)?;
                     let cache = (*storage_view).borrow().cache();
                     if resp.send((vm_block_result, cache)).is_err() {
                         break;
                     }
-
-                    return;
+                    return Ok(());
                 }
             }
         }
         // State keeper can exit because of stop signal, so it's OK to exit mid-batch.
         tracing::info!("State keeper exited with an unfinished L1 batch");
+        Ok(())
     }
 
     fn execute_tx<S: ReadStorage>(
         &self,
         tx: &Transaction,
         vm: &mut VmInstance<S, HistoryEnabled>,
-    ) -> TxExecutionResult {
+    ) -> anyhow::Result<TxExecutionResult> {
         // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot
         // was already removed), or that we build on top of it (in which case, it can be removed now).
         vm.pop_snapshot_no_rollback();
 
@@ -182,33 +182,38 @@
         // Execute the transaction.
         let latency = KEEPER_METRICS.tx_execution_time[&TxExecutionStage::Execution].start();
-        let (tx_result, compressed_bytecodes, call_tracer_result) =
-            if self.optional_bytecode_compression {
-                self.execute_tx_in_vm_with_optional_compression(tx, vm)
-            } else {
-                self.execute_tx_in_vm(tx, vm)
-            };
+        let output = if self.optional_bytecode_compression {
+            self.execute_tx_in_vm_with_optional_compression(tx, vm)?
+        } else {
+            self.execute_tx_in_vm(tx, vm)?
+        };
         latency.observe();
 
         APP_METRICS.processed_txs[&TxStage::StateKeeper].inc();
         APP_METRICS.processed_l1_txs[&TxStage::StateKeeper].inc_by(tx.is_l1().into());
 
+        let TransactionOutput {
+            tx_result,
+            compressed_bytecodes,
+            calls,
+        } = output;
+
         if let ExecutionResult::Halt { reason } = tx_result.result {
-            return match reason {
+            return Ok(match reason {
                 Halt::BootloaderOutOfGas => TxExecutionResult::BootloaderOutOfGasForTx,
                 _ => TxExecutionResult::RejectedByVm { reason },
-            };
+            });
         }
 
         let tx_metrics = ExecutionMetricsForCriteria::new(Some(tx), &tx_result);
         let gas_remaining = vm.gas_remaining();
 
-        TxExecutionResult::Success {
+        Ok(TxExecutionResult::Success {
             tx_result: Box::new(tx_result),
             tx_metrics: Box::new(tx_metrics),
             compressed_bytecodes,
-            call_tracer_result,
+            call_tracer_result: calls,
             gas_remaining,
-        }
+        })
     }
 
     fn rollback_last_tx<S: ReadStorage>(&self, vm: &mut VmInstance<S, HistoryEnabled>) {
@@ -228,19 +233,18 @@
     fn finish_batch<S: ReadStorage>(
         &self,
         vm: &mut VmInstance<S, HistoryEnabled>,
-    ) -> FinishedL1Batch {
+    ) -> anyhow::Result<FinishedL1Batch> {
         // The vm execution was paused right after the last transaction was executed.
         // There is some post-processing work that the VM needs to do before the block is fully processed.
         let result = vm.finish_batch();
-        if result.block_tip_execution_result.result.is_failed() {
-            panic!(
-                "VM must not fail when finalizing block: {:#?}",
-                result.block_tip_execution_result.result
-            );
-        }
+        anyhow::ensure!(
+            !result.block_tip_execution_result.result.is_failed(),
+            "VM must not fail when finalizing block: {:#?}",
+            result.block_tip_execution_result.result
+        );
         BATCH_TIP_METRICS.observe(&result.block_tip_execution_result);
-        result
+        Ok(result)
     }
 
     /// Attempts to execute transaction with or without bytecode compression.
@@ -249,11 +253,7 @@
         &self,
         tx: &Transaction,
         vm: &mut VmInstance<S, HistoryEnabled>,
-    ) -> (
-        VmExecutionResultAndLogs,
-        Vec<CompressedBytecodeInfo>,
-        Vec<Call>,
-    ) {
+    ) -> anyhow::Result<TransactionOutput> {
         // Note, that the space where we can put the calldata for compressing transactions
         // is limited and the transactions do not pay for taking it.
         // In order to not let the accounts spam the space of compressed bytecodes with bytecodes
@@ -270,16 +270,20 @@
         let call_tracer_result = Arc::new(OnceCell::default());
         let tracer = if self.save_call_traces {
             vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()]
         } else {
             vec![]
         };
 
-        if let (Ok(()), result) =
+        if let (Ok(()), tx_result) =
             vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), true)
         {
             let compressed_bytecodes = vm.get_last_tx_compressed_bytecodes();
-            let trace = Arc::try_unwrap(call_tracer_result)
-                .unwrap()
+            let calls = Arc::try_unwrap(call_tracer_result)
+                .map_err(|_| anyhow::anyhow!("failed extracting call traces"))?
                 .take()
                 .unwrap_or_default();
-            return (result, compressed_bytecodes, trace);
+            return Ok(TransactionOutput {
+                tx_result,
+                compressed_bytecodes,
+                calls,
+            });
         }
 
         // Roll back to the snapshot just before the transaction execution taken in `Self::execute_tx()`
@@ -294,20 +298,22 @@
             vec![]
         };
 
-        let result =
+        let (compression_result, tx_result) =
             vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), false);
-        result
-            .0
-            .expect("Compression can't fail if we don't apply it");
+        compression_result.context("compression failed when it wasn't applied")?;
         let compressed_bytecodes = vm.get_last_tx_compressed_bytecodes();
 
         // TODO implement tracer manager which will be responsible
-        // for collecting result from all tracers and save it to the database
+        // for collecting result from all tracers and save it to the database
-        let trace = Arc::try_unwrap(call_tracer_result)
-            .unwrap()
+        let calls = Arc::try_unwrap(call_tracer_result)
+            .map_err(|_| anyhow::anyhow!("failed extracting call traces"))?
             .take()
             .unwrap_or_default();
-        (result.1, compressed_bytecodes, trace)
+        Ok(TransactionOutput {
+            tx_result,
+            compressed_bytecodes,
+            calls,
+        })
     }
 
     /// Attempts to execute transaction with mandatory bytecode compression.
@@ -316,11 +322,7 @@
         &self,
         tx: &Transaction,
         vm: &mut VmInstance<S, HistoryEnabled>,
-    ) -> (
-        VmExecutionResultAndLogs,
-        Vec<CompressedBytecodeInfo>,
-        Vec<Call>,
-    ) {
+    ) -> anyhow::Result<TransactionOutput> {
         let call_tracer_result = Arc::new(OnceCell::default());
         let tracer = if self.save_call_traces {
             vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()]
         } else {
             vec![]
         };
 
-        let (published_bytecodes, mut result) =
+        let (published_bytecodes, mut tx_result) =
             vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), true);
         if published_bytecodes.is_ok() {
             let compressed_bytecodes = vm.get_last_tx_compressed_bytecodes();
-
-            let trace = Arc::try_unwrap(call_tracer_result)
-                .unwrap()
+            let calls = Arc::try_unwrap(call_tracer_result)
+                .map_err(|_| anyhow::anyhow!("failed extracting call traces"))?
                 .take()
                 .unwrap_or_default();
-            (result, compressed_bytecodes, trace)
+            Ok(TransactionOutput {
+                tx_result,
+                compressed_bytecodes,
+                calls,
+            })
         } else {
             // Transaction failed to publish bytecodes, we reject it so initiator doesn't pay fee.
- result.result = ExecutionResult::Halt { + tx_result.result = ExecutionResult::Halt { reason: Halt::FailedToPublishCompressedBytecodes, }; - (result, Default::default(), Default::default()) + Ok(TransactionOutput { + tx_result, + compressed_bytecodes: vec![], + calls: vec![], + }) } } } diff --git a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs b/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs index 838b9240767..e0096cd0417 100644 --- a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs +++ b/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs @@ -2,7 +2,7 @@ use anyhow::Context; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; -use zksync_state::{OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage}; +use zksync_state::{OwnedStorage, ReadStorageFactory, RocksdbStorage}; use zksync_types::L1BatchNumber; #[derive(Debug, Clone)] @@ -33,7 +33,7 @@ impl ReadStorageFactory for RocksdbStorageFactory { else { return Ok(None); }; - Ok(Some(PgOrRocksdbStorage::Rocksdb(rocksdb_storage).into())) + Ok(Some(OwnedStorage::Rocksdb(rocksdb_storage))) } } diff --git a/core/node/state_keeper/src/state_keeper_storage.rs b/core/node/state_keeper/src/state_keeper_storage.rs index fbda064b5d7..1b35f8ef73d 100644 --- a/core/node/state_keeper/src/state_keeper_storage.rs +++ b/core/node/state_keeper/src/state_keeper_storage.rs @@ -5,8 +5,7 @@ use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{ConnectionPool, Core}; use zksync_state::{ - AsyncCatchupTask, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, - RocksdbCell, RocksdbStorageOptions, + AsyncCatchupTask, OwnedStorage, ReadStorageFactory, RocksdbCell, RocksdbStorageOptions, }; use zksync_types::L1BatchNumber; @@ -58,24 +57,20 @@ impl ReadStorageFactory for AsyncRocksdbCache { self.rocksdb_cell.get() }; - if let Some(rocksdb) = rocksdb { - let mut connection = self - .pool - .connection_tagged("state_keeper") - .await - .context("Failed getting a Postgres connection")?; - let storage = PgOrRocksdbStorage::rocksdb( - &mut connection, - rocksdb, - stop_receiver, - l1_batch_number, - ) + let mut connection = self + .pool + .connection_tagged("state_keeper") .await - .context("Failed accessing RocksDB storage")?; - Ok(storage.map(Into::into)) + .context("Failed getting a Postgres connection")?; + if let Some(rocksdb) = rocksdb { + let storage = + OwnedStorage::rocksdb(&mut connection, rocksdb, stop_receiver, l1_batch_number) + .await + .context("Failed accessing RocksDB storage")?; + Ok(storage) } else { Ok(Some( - OwnedPostgresStorage::new(self.pool.clone(), l1_batch_number).into(), + OwnedStorage::postgres(connection, l1_batch_number).await?, )) } } diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index b7518903cae..e351b09ad2b 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -11,8 +11,8 @@ use tokio::sync::{watch, RwLock}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_multivm::interface::{L1BatchEnv, SystemEnv}; use zksync_state::{ - AsyncCatchupTask, BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, - RocksdbCell, RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory, + AsyncCatchupTask, BatchDiff, OwnedStorage, RocksdbCell, RocksdbStorage, RocksdbStorageBuilder, + RocksdbWithMemory, }; use zksync_types::{block::L2BlockExecutionData, 
L1BatchNumber, L2ChainId};
use zksync_vm_utils::storage::L1BatchParamsProvider;
@@ -140,12 +140,12 @@ impl StorageLoader for VmRunnerStorage {
             )
             .await?;
 
-            return Ok(batch_data.map(|data| {
-                (
-                    data,
-                    OwnedPostgresStorage::new(self.pool.clone(), l1_batch_number - 1).into(),
-                )
-            }));
+            return Ok(if let Some(data) = batch_data {
+                let storage = OwnedStorage::postgres(conn, l1_batch_number - 1).await?;
+                Some((data, storage))
+            } else {
+                None
+            });
         };
 
         match state.storage.get(&l1_batch_number) {
@@ -166,11 +166,11 @@
                     .filter(|(&num, _)| num < l1_batch_number)
                    .map(|(_, data)| data.diff.clone())
                     .collect::<Vec<_>>();
-                let storage = PgOrRocksdbStorage::RocksdbWithMemory(RocksdbWithMemory {
+                let storage = OwnedStorage::RocksdbWithMemory(RocksdbWithMemory {
                     rocksdb: rocksdb.clone(),
                     batch_diffs,
                 });
-                Ok(Some((data, storage.into())))
+                Ok(Some((data, storage)))
             }
         }
     }
diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs
index 61f0a5ec3f6..dd14e4dd1b0 100644
--- a/core/node/vm_runner/src/tests/mod.rs
+++ b/core/node/vm_runner/src/tests/mod.rs
@@ -10,7 +10,7 @@ use zksync_node_test_utils::{
     create_l1_batch_metadata, create_l2_block, execute_l2_transaction,
     l1_batch_metadata_to_commitment_artifacts,
 };
-use zksync_state::{OwnedPostgresStorage, OwnedStorage};
+use zksync_state::OwnedStorage;
 use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager};
 use zksync_test_account::Account;
 use zksync_types::{
@@ -58,8 +58,8 @@ impl StorageLoader for PostgresLoader {
             return Ok(None);
         };
 
-        let storage = OwnedPostgresStorage::new(self.0.clone(), l1_batch_number - 1);
-        Ok(Some((data, storage.into())))
+        let storage = OwnedStorage::postgres(conn, l1_batch_number - 1).await?;
+        Ok(Some((data, storage)))
     }
 }
 
diff --git a/core/node/vm_runner/src/tests/storage.rs b/core/node/vm_runner/src/tests/storage.rs
index 1dfb5a60135..f6f7a2ba9e6 100644
--- a/core/node/vm_runner/src/tests/storage.rs
+++ b/core/node/vm_runner/src/tests/storage.rs
@@ -301,12 +301,8 @@ async fn access_vm_runner_storage() -> anyhow::Result<()> {
                 .unwrap();
             let mut pg_storage =
                 PostgresStorage::new(rt_handle.clone(), conn, last_l2_block_number, true);
-            let (_, vm_storage) = rt_handle
+            let (_, mut vm_storage) = rt_handle
                 .block_on(vm_runner_storage.load_batch_eventually(L1BatchNumber(i + 1)))?;
-            let mut vm_storage = match vm_storage {
-                OwnedStorage::Lending(ref storage) => rt_handle.block_on(storage.borrow()).unwrap(),
-                OwnedStorage::Static(storage) => storage,
-            };
             // Check that both storages have identical key-value pairs written in them
             for storage_log in &storage_logs {