Initial implementation of both sns-executor and sns-worker #277
* fix: make retry-acquire conn cancellable
- add decrypt_128 feature
- support polling_interval, max_connections CLI args
- roll back db txn if no tasks are available
- use pg_notify to avoid SQL injection
@@ -0,0 +1,4 @@
ALTER TABLE ciphertexts
Are the migrations always the same (i.e. the same DB)?
If so, we could use a symbolic link to the full directory instead.
Or we could just put the files in the fhevm-engine-common crate or somewhere else, but reusable.
I'm not sure why the migrations folder is duplicated in both fhevm-db and coprocessor, but for now, I'm leaving it as is to prevent conflicts with the tfhe-worker changes.
info!(target: "sns", { count = records.len() }, "Fetched SnS tasks");

if records.is_empty() {
    return Ok(None);
Why not return an empty Vec to simplify the type and the code?
Just personal preference.
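For readers weighing the two options, here is a minimal sketch of the trade-off. The names `Task`, `fetch_optional`, `fetch_vec`, `count_optional` and `count_vec` are hypothetical stand-ins, not code from this PR. Returning an empty `Vec` lets the caller iterate directly, while `Option<Vec<_>>` makes the "nothing to do" case explicit at the cost of an extra match.

```rust
/// Hypothetical stand-in for the PR's task type.
#[derive(Debug, Clone, PartialEq)]
struct Task {
    id: u32,
}

/// Variant close to the PR: `None` signals that no tasks were available.
fn fetch_optional(records: &[u32]) -> Option<Vec<Task>> {
    if records.is_empty() {
        return None;
    }
    Some(records.iter().map(|&id| Task { id }).collect())
}

/// Variant suggested in the review: an empty `Vec` carries the same information.
fn fetch_vec(records: &[u32]) -> Vec<Task> {
    records.iter().map(|&id| Task { id }).collect()
}

/// A caller of the `Option` variant has to unwrap before using the batch.
fn count_optional(records: &[u32]) -> usize {
    match fetch_optional(records) {
        Some(tasks) => tasks.len(),
        None => 0,
    }
}

/// A caller of the `Vec` variant uses the batch directly; empty is a no-op.
fn count_vec(records: &[u32]) -> usize {
    fetch_vec(records).len()
}
```

Either form works; the `Option` variant is mainly useful when the caller wants to branch early, for example to roll back the transaction when no tasks were fetched.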
/// Processes the tasks by decompressing and transforming ciphertexts.
fn process_tasks(tasks: &mut [SnSTask], keys: &KeySet) -> Result<(), Box<dyn std::error::Error>> {
    set_server_key(keys.public_keys.server_key.clone());
This can't be multi-threaded with that (multi-process is fine, though). Is it temporary? Is the interface where you pass the key to each operation not available? (There is one, but it could be private.)
Which operation requires set_server_key?
Maybe the call could be placed just before it, or a comment could say so.
set_server_key is required by decompress_ct, which is called in a loop. That's why set_server_key is called once before the loop starts.
Depending on the strategy chosen for issue #284, we may opt for setting it in https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start
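The multi-threading concern above can be sketched with the standard library alone, assuming that set_server_key stores the key in thread-local state (which is what the review comment implies). `SERVER_KEY`, `set_key`, `current_key` and the two helper functions are hypothetical stand-ins, with a `String` in place of the real key type. A key installed on one thread is not visible on another, so each worker thread has to install it itself, which is the role a start-up hook such as on_thread_start would play.

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Stand-in for a per-thread server key slot (assumption: this mirrors
    // how `set_server_key` behaves; the real key type is replaced by String).
    static SERVER_KEY: RefCell<Option<String>> = RefCell::new(None);
}

/// Installs the key for the calling thread only.
fn set_key(key: &str) {
    SERVER_KEY.with(|slot| *slot.borrow_mut() = Some(key.to_owned()));
}

/// Reads the key visible to the calling thread.
fn current_key() -> Option<String> {
    SERVER_KEY.with(|slot| slot.borrow().clone())
}

/// What a freshly spawned thread sees when it does not set the key itself.
fn key_seen_by_new_thread() -> Option<String> {
    thread::spawn(current_key).join().expect("thread panicked")
}

/// What a new thread sees when it installs the key as its first action,
/// as a thread start-up hook would do.
fn key_seen_after_thread_init(key: &'static str) -> Option<String> {
    thread::spawn(move || {
        set_key(key);
        current_key()
    })
    .join()
    .expect("thread panicked")
}
```

Calling `set_key` on the main thread and then `key_seen_by_new_thread()` yields `None`, which is the failure mode the reviewer is pointing at.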
    db_txn: &mut Transaction<'_, Postgres>,
    db_channel: &str,
) -> Result<(), Box<dyn Error>> {
    sqlx::query("SELECT pg_notify($1, '')")
I have seen the syntax query!("NOTIFY blabla"). It could be clearer, but it doesn't preserve case.
It's a config variable (notify_channel) that is passed here. pg_notify is used here to avoid SQL injection.
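A minimal sketch of why the parameterized form is safer when the channel name comes from configuration. Both functions are hypothetical illustrations that only build strings; they do not talk to a database. In the interpolated form the channel name becomes part of the SQL text, whereas with pg_notify the SQL text is constant and the channel travels separately as a bind value.

```rust
/// Risky approach: the channel name is spliced into the SQL text,
/// so whatever it contains becomes part of the statement.
fn notify_by_interpolation(channel: &str) -> String {
    format!("NOTIFY {}", channel)
}

/// Approach used in the PR: constant SQL text plus a separate bind value.
/// The returned pair models "statement, parameters" for illustration only.
fn notify_by_parameter(channel: &str) -> (&'static str, Vec<String>) {
    ("SELECT pg_notify($1, '')", vec![channel.to_owned()])
}
```

With a hostile value such as `x; DROP TABLE ciphertexts`, the first function returns a string containing the extra statement, while the second leaves the SQL text untouched and carries the value only as data.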
    pub ksk: LweKeyswitchKey<Vec<u64>>,
}

pub trait AugmentedCiphertextParameters {
Is it just a way to extend Ciphertext from tfhe-rs locally?
I don't see the trait used elsewhere. Should it be public?
If not, maybe accessing self.message_modulus.0 directly avoids the need for this trait.
The impl of the trait provides the total_block_bits function, which is used in to_large_ciphertext_block.
It's needed for the decrypt_128 feature as well.
Also, I'd like to avoid any refactoring of the SnS algorithm until we implement #286.
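For context, the extension-trait pattern under discussion can be sketched as follows. Everything here is an assumption made for illustration: the structs are local stand-ins for the tfhe-rs types, the helper methods `message_modulus_log` and `carry_modulus_log` are hypothetical, and the body of `total_block_bits` (message bits plus carry bits) is a guess at what such a function computes, not the PR's actual code.

```rust
/// Local stand-ins for the library types (assumption: the field layout
/// follows the `self.message_modulus.0` access mentioned in the review).
struct MessageModulus(u64);
struct CarryModulus(u64);
struct Ciphertext {
    message_modulus: MessageModulus,
    carry_modulus: CarryModulus,
}

/// Extension trait: adds derived parameters to a type defined elsewhere.
trait AugmentedCiphertextParameters {
    /// Hypothetical helper: log2 of the message modulus.
    fn message_modulus_log(&self) -> u32;
    /// Hypothetical helper: log2 of the carry modulus.
    fn carry_modulus_log(&self) -> u32;
    /// Assumed definition: bits occupied by message and carry in one block.
    fn total_block_bits(&self) -> u32 {
        self.message_modulus_log() + self.carry_modulus_log()
    }
}

impl AugmentedCiphertextParameters for Ciphertext {
    fn message_modulus_log(&self) -> u32 {
        self.message_modulus.0.ilog2()
    }
    fn carry_modulus_log(&self) -> u32 {
        self.carry_modulus.0.ilog2()
    }
}
```

The trait keeps the derived quantity in one place, so both the SnS conversion and a decryption path can call it without repeating the field accesses.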
Looks good, just added a few comments/questions!
hex = "0.4"

aligned-vec = "0.5.0"
num-traits = "=0.2.19"
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid"] }

serde_json = "=1.0"
Nit: These could be combined under the "crates.io" dependencies.
Also, we could use sqlx from the workspace dependencies.
Indeed, I'll move it into the workspace in the next PR.
@@ -389,6 +391,28 @@ impl SupportedFheCiphertexts {
    }
}

pub fn to_regular_ciphertext(self) -> BaseRadixCiphertext<Ciphertext> {
Nit: I wonder whether "regular" is the best name. I'm not sure what ".0" is in this case, though; is it "regular"?
.0 is actually the Ciphertext64. In the kms-core SnS examples, it's also called small_ct. Maybe we can rename this to fn to_ciphertext_64().
if let Err(err) = sns_executor::run(keys, &conf, cancel_rx).await {
    tracing::error!("Worker failed: {:?}", err);
}
Which thread(s) execute the SnS?
I mean I guess we expect to run multiple workers, not multiple threads, right?
Rayon thread-pool threads. In to_large_ciphertext, there is a par_iter:
let res = blocks
    .par_iter()
    .map(|current_block| self.to_large_ciphertext_block(current_block))
    .collect::<anyhow::Result<Vec<Ciphertext128Block>>>()?;
Ah, yes, it is multithreaded itself!
It's multi-threaded over the blocks of a single Ciphertext64. There is also an issue open to reconsider this.
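The "parallel over the blocks of one ciphertext" shape described in this thread can be sketched with the standard library, using scoped threads as a stand-in for Rayon's par_iter (a deliberate substitution so the sketch needs no external crate). `to_large_block` is a hypothetical per-block conversion on plain integers, not the real SnS step. As with collecting into a `Result`, a failure in any block fails the whole conversion.

```rust
use std::thread;

/// Hypothetical per-block conversion; fails on a sentinel value so the
/// error path can be demonstrated.
fn to_large_block(block: u64) -> Result<u128, String> {
    if block == u64::MAX {
        return Err(format!("cannot convert block {}", block));
    }
    Ok(u128::from(block) << 64)
}

/// Converts all blocks of one ciphertext in parallel, one scoped thread
/// per block. Results keep block order; the first error in block order
/// is returned if any block fails.
fn to_large_ciphertext(blocks: &[u64]) -> Result<Vec<u128>, String> {
    thread::scope(|scope| {
        // Spawn every block first so the conversions run concurrently.
        let handles: Vec<_> = blocks
            .iter()
            .map(|&block| scope.spawn(move || to_large_block(block)))
            .collect();
        // Then join in order and collect into a single Result.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("block thread panicked"))
            .collect()
    })
}
```

A thread pool amortizes thread creation across calls, which one thread per block does not; the sketch only shows the data flow, not a performance recommendation.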
We could have a test showing that the part extracted from tfhe-rs (the algorithm) still works. (It could be a test copied from tfhe-rs.)
E.g. a simple encrypt / compress / ... / decrypt round trip.
    },
};

let (_cancel_tx, cancel_rx) = tokio::sync::broadcast::channel(1);
Do we need to send on _cancel_tx when the process receives a signal, etc.?
It was overlooked, but it's added now.
#[arg(long, default_value_t = 10)]
pub pg_pool_connections: u32,
Btw, why is the default value 10? Do we need a pool of so many?
TBH, I'm not sure what the value should be for now. I experienced connection issues with pg_pool_connections=1, so it just follows the recommendation from Coprocessor.
See also:
#[arg(long, default_value_t = 10)]
This has been tested manually for all TFHE types so far (with Geth and Coprocessor running). Key generation, unit tests and benches must be copied/considered when we implement #286.
fixes #283