Skip to content

Commit

Permalink
Merge pull request #269 from zama-ai/rudy/feat/event-tfhe-operation
Browse files Browse the repository at this point in the history
feat: listening events for TFHE operation on chain
  • Loading branch information
rudy-6-4 authored Feb 3, 2025
2 parents 0a6bf28 + 00893b7 commit 685e253
Show file tree
Hide file tree
Showing 8 changed files with 3,374 additions and 1 deletion.
2 changes: 1 addition & 1 deletion fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["coprocessor", "executor", "fhevm-engine-common"]
members = ["coprocessor", "executor", "fhevm-engine-common", "listener"]

[workspace.package]
authors = ["Zama"]
Expand Down
22 changes: 22 additions & 0 deletions fhevm-engine/listener/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "listener"
version = "0.0.1"
edition = "2021"

[[bin]]
path = "src/bin/main.rs"
name = "listen"
test = false
bench = false

[dependencies]
# workspace dependencies
alloy = { version = "=0.9.2", features = ["contract", "json", "providers", "provider-ws", "pubsub", "rpc-types", "sol-types"] }
alloy-rpc-types = "=0.9.2"
alloy-sol-types = "=0.8.19"
alloy-primitives = "=0.8.19"
alloy-provider = "=0.9.2"
clap = { workspace = true }
futures-util = "=0.3"
serde = { workspace = true }
tokio = { workspace = true }
99 changes: 99 additions & 0 deletions fhevm-engine/listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Listener

The listener primary role is to observe the block chain execution and extend that execution off the chain.

## How

Our contracts actively emits events that forms the trace of a symbolic execution. These events can be observed via the blockchain node pubsub events feature.

## Command-line

WIP

## Events in FHEVM

### Blockchain Events
> Status: in progress
Blockchain events are used export the symbolic execution of TFHE operations from a blockchain node configured to accept pubsub requests.
A listener subscribe to the blockchain node and converts the events to a TFHE workload in a database.

There are 3 types of events related to:
- TFHE operations
- ACL, can be used to preprocess ciphertext for certain use case
- Public and User Decryption

### Database Events
> Status: proposal
Database events are used to hint the scheduler to dispath workload and to notice workload completion.

> https://stackoverflow.com/questions/56747634/how-do-i-use-the-postgres-crate-to-receive-table-modification-events-from-postgr
### Decryption Events
> Status: in progress
### Overview FHEVM
> **_NOTE:_** Listener and scheduler could be in the same service.**
```mermaid
sequenceDiagram
participant BC App Node
participant Listener
participant Scheduler
participant DB
participant Coprocessor
Listener-->>BC App Node: Subscribe Contract Events
Scheduler-->>DB: Subscribe Computations Insertions/Status<br/>(proposal)
loop Block Execution - Symbolic Operations
Note over BC App Node: Solidity traces a Symbolic Sequence
Note over BC App Node: TFHEExecutor contract
Note over BC App Node: ACL contract
end
Note over BC App Node: End of Block Execution (MAYBE)
BC App Node-)Listener: TFHE Operations Events
BC App Node-)Listener: ACL Events
Listener->>DB: Insert TFHE Operations
DB-)Scheduler: Notice TFHE Operations Insertions<br/>(proposal)
Scheduler-)Coprocessor: THFE Operation Workload
BC App Node-)Listener: Decryption Events
loop FHE Computation
Coprocessor -->> DB: Read Operands Ciphertexts
Note over Coprocessor: TFHE Computation
Coprocessor -->> DB: Write Result Ciphertext
Coprocessor-->>DB: Mark TFHE Operation as Done
end
DB-)Scheduler: Notice TFHE Operations Status<br/>(proposal)
```

### Overview Relayer (maybe incorrect to be refined)

```mermaid
sequenceDiagram
participant Relayer
participant Listener
participant Scheduler
participant DB
participant Coprocessor
Note over Listener: THEFE Operations Events
Note over Listener: Decryption Events
Listener->>DB: Insert TFHE Operations
Listener->>Relayer: Decryption Workload
DB-)Scheduler: Notice TFHE Operations Insertions<br/>(proposal)
Scheduler-)Coprocessor: THEFE Operation Workload
loop FHE Computation
Coprocessor -->> DB: Read Operands Ciphertexts
Note over Coprocessor: TFHE Computation
Coprocessor -->> DB: Write Result Ciphertexts
Coprocessor-->>DB: TFHE Operation Done
end
DB-)Scheduler: Notice TFHE Operations Status<br/>(proposal)
Scheduler-)Relayer: Notice Ciphertext ready for decryption
```

215 changes: 215 additions & 0 deletions fhevm-engine/listener/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use futures_util::stream::StreamExt;
use std::str::FromStr;
use std::time::Duration;

use alloy::primitives::Address;
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
use alloy::pubsub::{PubSubFrontend, SubscriptionStream};
use alloy::rpc::types::{BlockNumberOrTag, Filter};
use alloy_rpc_types::Log;
use alloy_sol_types::SolEventInterface;

use clap::Parser;

use listener::contracts::{AclContract, TfheContract};

const DEFAULT_BLOCK: BlockNumberOrTag = BlockNumberOrTag::Latest;
const DEFAULT_CATCHUP: u64 = 5;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
#[arg(long, default_value = "ws://0.0.0.0:8746")]
pub url: String,

#[arg(long, default_value = "false")]
pub ignore_tfhe_events: bool,

#[arg(long, default_value = "false")]
pub ignore_acl_events: bool,

#[arg(long, default_value = None)]
pub acl_contract_address: Option<String>,

#[arg(long, default_value = None)]
pub tfhe_contract_address: Option<String>,

#[arg(long, default_value = None)]
pub database_url: Option<String>,

#[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)]
pub start_at_block: Option<i64>,

#[arg(long, default_value = None)]
pub end_at_block: Option<u64>,
}

// TODO: to merge with Levent works
struct InfiniteLogIter {
url: String,
contract_addresses: Vec<Address>,
stream: Option<SubscriptionStream<Log>>,
provider: Option<RootProvider<PubSubFrontend>>, // required to maintain the stream
last_seen_block: Option<u64>,
start_at_block: Option<i64>,
end_at_block: Option<u64>,
}

impl InfiniteLogIter {
fn new(args: &Args) -> Self {
let mut contract_addresses = vec![];
if let Some(acl_contract_address) = &args.acl_contract_address {
contract_addresses.push(Address::from_str(acl_contract_address).unwrap());
};
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
contract_addresses.push(Address::from_str(tfhe_contract_address).unwrap());
};
Self {
url: args.url.clone(),
contract_addresses: contract_addresses,
stream: None,
provider: None,
last_seen_block: None,
start_at_block: args.start_at_block,
end_at_block: args.end_at_block,
}
}

async fn catchup_block_from(
&self,
provider: &RootProvider<PubSubFrontend>,
) -> BlockNumberOrTag {
if let Some(last_seen_block) = self.last_seen_block {
return BlockNumberOrTag::Number(last_seen_block - 1);
}
if let Some(start_at_block) = self.start_at_block {
if start_at_block >= 0 {
return BlockNumberOrTag::Number(start_at_block.try_into().unwrap());
}
}
let Ok(last_block) = provider.get_block_number().await else {
return BlockNumberOrTag::Earliest; // should not happend
};
let catch_size = if let Some(start_at_block) = self.start_at_block {
(-start_at_block).try_into().unwrap()
} else {
DEFAULT_CATCHUP
};
return BlockNumberOrTag::Number(last_block - catch_size.min(last_block));
}

async fn new_log_stream(&mut self, not_initialized: bool) {
let mut retry = 20;
loop {
let ws = WsConnect::new(&self.url);
match ProviderBuilder::new().on_ws(ws).await {
Ok(provider) => {
let catch_up_from = self.catchup_block_from(&provider).await;
if not_initialized {
eprintln!("Catchup from {:?}", catch_up_from);
}
let mut filter = Filter::new().from_block(catch_up_from);
if let Some(end_at_block) = self.end_at_block {
filter = filter.to_block(BlockNumberOrTag::Number(end_at_block));
// inclusive
}
if !self.contract_addresses.is_empty() {
filter = filter.address(self.contract_addresses.clone())
}
eprintln!("Listening on {}", &self.url);
eprintln!("Contracts {:?}", &self.contract_addresses);
self.stream = Some(
provider
.subscribe_logs(&filter)
.await
.expect("BLA2")
.into_stream(),
);
self.provider = Some(provider);
return;
}
Err(err) => {
let delay = if not_initialized {
if retry == 0 {
panic!("Cannot connect to {} due to {err}.", &self.url)
}
5
} else {
1
};
if not_initialized {
eprintln!("Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.", &self.url);
} else {
eprintln!("Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.", &self.url);
}
retry -= 1;
tokio::time::sleep(Duration::from_secs(delay)).await;
}
}
}
}

async fn next(&mut self) -> Option<Log> {
let mut not_initialized = true;
loop {
let Some(stream) = &mut self.stream else {
self.new_log_stream(not_initialized).await;
not_initialized = false;
continue;
};
let Some(log) = stream.next().await else {
// the stream ends, could be a restart of the full node, or just a temporary gap
self.stream = None;
if let (Some(end_at_block), Some(last_seen_block)) =
(self.end_at_block, self.last_seen_block)
{
if end_at_block == last_seen_block {
eprintln!("Nothing to read, reached end of block range");
return None;
}
}
eprintln!("Nothing to read, retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
return Some(log);
}
}
}

#[tokio::main]
async fn main() -> () {
let args = Args::parse();
let mut log_iter = InfiniteLogIter::new(&args);
if let Some(acl_contract_address) = &args.acl_contract_address {
if let Err(err) = Address::from_str(acl_contract_address) {
panic!("Invalid acl contract address: {err}");
};
};
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
if let Err(err) = Address::from_str(tfhe_contract_address) {
panic!("Invalid tfhe contract address: {err}");
};
}

log_iter.new_log_stream(true).await;
while let Some(log) = log_iter.next().await {
if let Some(block_number) = log.block_number {
eprintln!("Event at block: {}", { block_number });
log_iter.last_seen_block = Some(block_number);
}
if !args.ignore_tfhe_events {
if let Ok(event) = TfheContract::TfheContractEvents::decode_log(&log.inner, true) {
// TODO: filter on contract address if known
println!("TFHE {event:#?}");
continue;
}
}
if !args.ignore_tfhe_events {
if let Ok(event) = AclContract::AclContractEvents::decode_log(&log.inner, true) {
println!("ACL {event:#?}");
continue;
}
}
}
}
Loading

0 comments on commit 685e253

Please sign in to comment.