-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dynamic client endpoints #676
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,10 @@ use temporal_sdk_core_protos::{ | |
}, | ||
TaskToken, | ||
}; | ||
use tokio::sync::{ | ||
mpsc::{error::SendError, Sender}, | ||
Mutex as TokioMutex, | ||
}; | ||
use tonic::{ | ||
body::BoxBody, | ||
client::GrpcService, | ||
|
@@ -71,7 +75,7 @@ use tonic::{ | |
transport::{Certificate, Channel, Endpoint, Identity}, | ||
Code, Status, | ||
}; | ||
use tower::ServiceBuilder; | ||
use tower::{discover::Change, ServiceBuilder}; | ||
use url::Url; | ||
use uuid::Uuid; | ||
|
||
|
@@ -87,6 +91,9 @@ static LONG_POLL_METHOD_NAMES: [&str; 3] = [ | |
const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70); | ||
const OTHER_CALL_TIMEOUT: Duration = Duration::from_secs(30); | ||
|
||
/// Buffer size for the channel that listens to change events | ||
const DEFAULT_BUFFER_SIZE: usize = 1024; | ||
|
||
type Result<T, E = tonic::Status> = std::result::Result<T, E>; | ||
|
||
/// Options for the connection to the temporal server. Construct with [ClientOptionsBuilder] | ||
|
@@ -134,8 +141,94 @@ pub struct ClientOptions { | |
pub keep_alive: Option<ClientKeepAliveConfig>, | ||
} | ||
|
||
/// Connection options that can be dynamically updated. Construct with [ClientOptionsUpdateBuilder]. | ||
/// All fields are optional, and will be ignored if not set. To disable optional fields in [ClientOptions], | ||
/// set field to a tombstone specified in the field description. | ||
#[derive(Clone, Debug, Default, derive_builder::Builder)] | ||
#[non_exhaustive] | ||
pub struct ClientOptionsUpdate { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the entire (would also be neat if some of the runtime options like headers was set/updated via this common mechanism and didn't require reconnect, but that doesn't have to be now) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this probably makes decent sense too. I imagine the lang-side API will typically involve the user copying some existing options and changing them, in which case this all works out fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are the only fields that I can change dynamically, because they are associated with the endpoint, and I think it makes it more clear what can be changed with a separate struct. Also, fields like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we go with swapping out the whole options behind a mutex which I mentioned in another comment, then the other fields should be changeable too I think, which could work out nicely. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Things like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't they be changed? I agree if they can't be changed it makes sense to accept something that only allows changing what can be. But, I'm not sure I follow why we couldn't change those. They are accessed each time a request is made - seems like changing them would affect subsequent requests and work properly? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for retry_config we cache it inside: The others we can, but the question is whether we should, if it is used in metrics, or server side, it may be confusing, and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't have to actually expose those to end users, but I take your point. Especially since the |
||
/// The URL of the Temporal server to connect to. | ||
#[builder(setter(into, strip_option), default)] | ||
pub target_url: Option<Url>, | ||
|
||
/// If specified, use TLS as configured by the [TlsConfig] struct. If this is set, core will | ||
/// attempt to use TLS when connecting to the Temporal server. Lang SDK is expected to pass any | ||
/// certs or keys as bytes, loading them from disk itself if needed. | ||
/// If set with all optional fields to `None` in [TlsConfig], it disables TLS. | ||
#[builder(setter(strip_option), default)] | ||
pub tls_cfg: Option<TlsConfig>, | ||
|
||
/// If set, override the origin used when connecting. May be useful in rare situations where tls | ||
/// verification needs to use a different name from what should be set as the `:authority` | ||
/// header. | ||
/// If set with the URI default value, i.e., "/", it disables the origin override. | ||
#[builder(setter(strip_option), default)] | ||
pub override_origin: Option<Uri>, | ||
|
||
/// If set (which it is by default), HTTP2 gRPC keep alive will be enabled. | ||
/// If specified with all fields in [ClientKeepAliveConfig] set to zero, it disables `keep_alive`. | ||
#[builder(setter(strip_option), default)] | ||
pub keep_alive: Option<ClientKeepAliveConfig>, | ||
} | ||
|
||
impl ClientOptionsUpdate { | ||
/// Override fields that are present in the proposed update. | ||
pub fn merge(&mut self, update: ClientOptionsUpdate) { | ||
if update.target_url.is_some() { | ||
self.target_url = update.target_url; | ||
} | ||
if update.tls_cfg.is_some() { | ||
self.tls_cfg = update.tls_cfg; | ||
} | ||
if update.override_origin.is_some() { | ||
self.override_origin = update.override_origin; | ||
} | ||
if update.keep_alive.is_some() { | ||
self.keep_alive = update.keep_alive; | ||
} | ||
} | ||
|
||
/// Transform client options using this update. | ||
pub fn apply(&self, options: &ClientOptions) -> ClientOptions { | ||
let mut result = options.clone(); | ||
if let Some(target_url) = self.target_url.as_ref() { | ||
result.target_url = target_url.clone(); | ||
} | ||
|
||
if let Some(tls_cfg) = self.tls_cfg.as_ref() { | ||
result.tls_cfg = if *tls_cfg == Default::default() { | ||
// Default, i.e., all fields `None`, is a tombstone for tls_cfg | ||
None | ||
} else { | ||
Some(tls_cfg.clone()) | ||
}; | ||
} | ||
|
||
if let Some(override_origin) = self.override_origin.as_ref() { | ||
// Tombstone is "/" | ||
result.override_origin = if *override_origin == Uri::default() { | ||
None | ||
} else { | ||
Some(override_origin.clone()) | ||
} | ||
} | ||
|
||
if let Some(keep_alive) = self.keep_alive.as_ref() { | ||
// Tombstone sets all durations to zero | ||
let zero_duration = Duration::new(0, 0); | ||
result.keep_alive = | ||
if keep_alive.interval == zero_duration && keep_alive.timeout == zero_duration { | ||
None | ||
} else { | ||
Some(keep_alive.clone()) | ||
} | ||
} | ||
|
||
result | ||
} | ||
} | ||
/// Configuration options for TLS | ||
#[derive(Clone, Debug, Default)] | ||
#[derive(Clone, Debug, Default, PartialEq)] | ||
pub struct TlsConfig { | ||
/// Bytes representing the root CA certificate used by the server. If not set, and the server's | ||
/// cert is issued by someone the operating system trusts, verification will still work (ex: | ||
|
@@ -149,7 +242,7 @@ pub struct TlsConfig { | |
} | ||
|
||
/// If using mTLS, both the client cert and private key must be specified, this contains them. | ||
#[derive(Clone)] | ||
#[derive(Clone, PartialEq)] | ||
pub struct ClientTlsConfig { | ||
/// The certificate for this client | ||
pub client_cert: Vec<u8>, | ||
|
@@ -272,6 +365,28 @@ pub enum ClientInitError { | |
SystemInfoCallError(tonic::Status), | ||
} | ||
|
||
/// Errors thrown while updating an existing client. | ||
#[derive(thiserror::Error, Debug)] | ||
pub enum ClientUpdateError { | ||
/// Cannot change the channel configuration. | ||
#[error("Cannot send the update: {0:?}")] | ||
ControlChannelError(#[from] SendError<Change<i32, Endpoint>>), | ||
/// Invalid update, cannot create the new endpoint. | ||
#[error("Cannot create an endpoint: {0:?}")] | ||
InvalidUpdateError(#[from] ClientInitError), | ||
} | ||
|
||
/// Metadata to enable the dynamic configuration of a client. | ||
#[derive(Clone, Debug)] | ||
pub struct DynamicUpdateInfo { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't need to be pub There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They may need to access the changeset to get the current options when there is no mutable ref to the client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not accessible as it stands anyway. It's stored as a private field and there's no accessor. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. We need to decide whether to get rid of changeset and update ClientOptions directly (with mutex), or not, and then either add the accessor or make everything private. |
||
/// Version number of the current update, or zero if no updates have been applied. | ||
version: i32, | ||
/// Control channel sender | ||
tx: Sender<Change<i32, Endpoint>>, | ||
/// Summary of contributions from all the previous updates. | ||
changeset: ClientOptionsUpdate, | ||
} | ||
|
||
/// A client with [ClientOptions] attached, which can be passed to initialize workers, | ||
/// or can be used directly. Is cheap to clone. | ||
#[derive(Clone, Debug)] | ||
|
@@ -282,6 +397,7 @@ pub struct ConfiguredClient<C> { | |
/// Capabilities as read from the `get_system_info` RPC call made on client connection | ||
capabilities: Option<get_system_info_response::Capabilities>, | ||
workers: Arc<SlotManager>, | ||
update: Arc<TokioMutex<DynamicUpdateInfo>>, | ||
} | ||
|
||
impl<C> ConfiguredClient<C> { | ||
|
@@ -306,6 +422,57 @@ impl<C> ConfiguredClient<C> { | |
pub fn workers(&self) -> Arc<SlotManager> { | ||
self.workers.clone() | ||
} | ||
|
||
/// Patches the client options, and then dynamically updates the endpoint with them. | ||
/// If the update fails, the client does not rollback to the previous configuration, and future | ||
/// connections will fail. | ||
Comment on lines
+427
to
+428
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is rough. Why can't we only apply the update after the new endpoint is connected and works? Can't you do an atomic swap of the endpoint where there is no downtime and it only contains a successfully connected endpoint? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. I would call this It'd be fine to put a mutex around the options as well, so that they can be updated too. This also fits better with the idea of using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've been looking into the atomic swap reusing the id for the endpoint, but the problem is that it s anything but atomic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah I see that with the balance channel and makes sense. But can we wait to even send the insert until we have connected the endpoint? And can we send the remove after the insert? So logic would be:
Also curious, if you send an insert then a remove, is it guaranteed that the next call will be on the inserted? And in the case of what's there now w/ remove-then-insert, is it possible for multi-threaded use that there is a time where no endpoint is there?
I wonder if we can have our own "mutex_channel" or something, or if that's too much (EDIT: I see
This is unfortunate but if we can't eagerly validate, ok. I now wonder if we should go higher than channel. What I mean is if we can swap our own channel for a completely new one instead of updating the channel. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that you can easily swap the channel without creating a new client, and the problem of creating new clients is that there is a lot of shared state associated to them, like eager workflow start workers, or metrics,... that need to be moved across atomically. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is that it is "lazy". It will create the connection with the first request, and we want to make sure that this connection is not created until we finish the upgrade. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should attempt a get-system-info call on the client before swapping it. IMO we want to make sure the connection is created and is a known config to be successful even way before we swap it out. We don't want to swap out lazy or a when given a bad client, the worker will just silently fail for a minute (our default retry iirc) then fatal out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before the swap we can check the connection, but that doesn't help for lazy. Internally Tonic defers the connection until it gets the first request. And even if we do a get-system-info, there is a race with someone else doing the first call after dropping the lock, if they grabbed it before the update started, and getting the error... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is why I'm suggesting a higher level client swap. Don't swap things at the Tonic level, swap the connected Temporal client.
I think we can accept/document this race. If you're really concerned, we can use a separate lock for the entire client update process to prevent concurrent updates and put a short timeout on the get-system-info call, but of course we should never hold a lock during regular client operations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would probably do this: pub struct ConfiguredClient<C> {
client_and_options: ArcSwap<ClientAndOptions>,
headers: Arc<RwLock<HashMap<String, String>>>,
/// Capabilities as read from the `get_system_info` RPC call made on client connection
capabilities: Option<get_system_info_response::Capabilities>,
workers: Arc<SlotManager>,
}
pub struct ClientAndOptions<C> {
client: C,
options: ClientOptions
} If ArcSwap doesn't work out for whatever reason (it might be hard to use |
||
/// Internally it maintains a mutable changeset so that we can apply a sequence of updates without a | ||
/// mutable client reference. However, this means it cannot update the `options` field. | ||
/// To keep `options` consistent use `update_options_mut` or, when needed, manually `apply()` the | ||
/// changeset to the original options. | ||
pub async fn update_options( | ||
&self, | ||
update: ClientOptionsUpdate, | ||
) -> Result<ClientOptions, ClientUpdateError> { | ||
// We take a "hard" approach for updating. First remove the previous endpoint, and if successful, | ||
// add the updated one. If there is a failure, don't try to recover, and just let the upper layer | ||
// handle the failed client. | ||
// This approach may leave a short window in which the client cannot open new connections. Again, | ||
// we let the upper layer handle this infrequent failure. | ||
// The "soft" approach, i.e., remove-last or override-when-ready, cannot guarantee when the update | ||
// happens, and also the asynchronous update could fail silently. | ||
|
||
let mut update_meta = self.update.lock().await; | ||
let remove_change = Change::Remove(update_meta.version); | ||
if let Err(err) = update_meta.tx.send(remove_change).await { | ||
return Err(ClientUpdateError::ControlChannelError(err)); | ||
} | ||
update_meta.changeset.merge(update); | ||
let new_options = update_meta.changeset.apply(&self.options); | ||
update_meta.version += 1; | ||
let endpoint = new_options.create_endpoint().await?; | ||
let add_change = Change::Insert(update_meta.version, endpoint); | ||
if let Err(err) = update_meta.tx.send(add_change).await { | ||
return Err(ClientUpdateError::ControlChannelError(err)); | ||
} | ||
|
||
Ok(new_options) | ||
} | ||
|
||
/// Similar to `update_options` but it also updates the `options` field in this `ConfiguredClient` | ||
/// to reflect the changes. | ||
pub async fn update_options_mut( | ||
&mut self, | ||
update: ClientOptionsUpdate, | ||
) -> Result<ClientOptions, ClientUpdateError> { | ||
match self.update_options(update).await { | ||
Ok(new_options) => { | ||
*Arc::get_mut(&mut self.options).unwrap() = new_options.clone(); | ||
Ok(new_options) | ||
} | ||
Err(error) => Err(error), | ||
} | ||
} | ||
} | ||
|
||
// The configured client is effectively a "smart" (dumb) pointer | ||
|
@@ -340,6 +507,26 @@ impl ClientOptions { | |
Ok(retry_client) | ||
} | ||
|
||
/// Creates an endpoint for an HTTP/2 channel. | ||
pub async fn create_endpoint(&self) -> Result<Endpoint, ClientInitError> { | ||
let endpoint = Channel::from_shared(self.target_url.to_string())?; | ||
let endpoint = self.add_tls_to_channel(endpoint).await?; | ||
let endpoint = if let Some(keep_alive) = self.keep_alive.as_ref() { | ||
endpoint | ||
.keep_alive_while_idle(true) | ||
.http2_keep_alive_interval(keep_alive.interval) | ||
.keep_alive_timeout(keep_alive.timeout) | ||
} else { | ||
endpoint | ||
}; | ||
let endpoint = if let Some(origin) = self.override_origin.clone() { | ||
endpoint.origin(origin) | ||
} else { | ||
endpoint | ||
}; | ||
Ok(endpoint) | ||
} | ||
|
||
/// Attempt to establish a connection to the Temporal server and return a gRPC client which is | ||
/// intercepted with retry, default headers functionality, and metrics if provided. | ||
/// | ||
|
@@ -350,22 +537,15 @@ impl ClientOptions { | |
headers: Option<Arc<RwLock<HashMap<String, String>>>>, | ||
) -> Result<RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>, ClientInitError> | ||
{ | ||
let channel = Channel::from_shared(self.target_url.to_string())?; | ||
let channel = self.add_tls_to_channel(channel).await?; | ||
let channel = if let Some(keep_alive) = self.keep_alive.as_ref() { | ||
channel | ||
.keep_alive_while_idle(true) | ||
.http2_keep_alive_interval(keep_alive.interval) | ||
.keep_alive_timeout(keep_alive.timeout) | ||
} else { | ||
channel | ||
}; | ||
let channel = if let Some(origin) = self.override_origin.clone() { | ||
channel.origin(origin) | ||
} else { | ||
channel | ||
}; | ||
let channel = channel.connect().await?; | ||
let endpoint = self.create_endpoint().await?; | ||
let (channel, tx) = Channel::balance_channel(DEFAULT_BUFFER_SIZE); | ||
let change = Change::Insert(0, endpoint); | ||
if let Err(err) = tx.send(change).await { | ||
return Err(ClientInitError::SystemInfoCallError(tonic::Status::new( | ||
Code::Unavailable, | ||
format!("Cannot initialize channel with endpoint: {:?}", err), | ||
))); | ||
} | ||
let service = ServiceBuilder::new() | ||
.layer_fn(move |channel| GrpcMetricSvc { | ||
inner: channel, | ||
|
@@ -385,6 +565,11 @@ impl ClientOptions { | |
options: Arc::new(self.clone()), | ||
capabilities: None, | ||
workers: Arc::new(SlotManager::new()), | ||
update: Arc::new(TokioMutex::new(DynamicUpdateInfo { | ||
version: 0, | ||
tx, | ||
changeset: Default::default(), | ||
})), | ||
}; | ||
match client | ||
.get_system_info(GetSystemInfoRequest::default()) | ||
|
@@ -1573,4 +1758,41 @@ mod tests { | |
let opts = builder.keep_alive(None).build().unwrap(); | ||
assert!(opts.keep_alive.is_none()); | ||
} | ||
|
||
#[test] | ||
fn tombstones_reset_options() { | ||
let mut builder = ClientOptionsBuilder::default(); | ||
builder | ||
.target_url(Url::parse("https://smolkitty").unwrap()) | ||
.client_name("cute-kitty".to_string()) | ||
.client_version("0.1.0".to_string()) | ||
.tls_cfg(TlsConfig { | ||
client_tls_config: None, | ||
domain: Some("something.cloud".to_string()), | ||
server_root_ca_cert: None, | ||
}) | ||
.override_origin(Some("/foo/bar".parse::<Uri>().unwrap())); | ||
// If unset, defaults to Some | ||
let opts = builder.build().unwrap(); | ||
let mut builder_update = ClientOptionsUpdateBuilder::default(); | ||
let mut first_update = ClientOptionsUpdate::default(); | ||
let update = builder_update | ||
.tls_cfg(Default::default()) | ||
.override_origin(Default::default()) | ||
.keep_alive(ClientKeepAliveConfig { | ||
interval: Duration::new(0, 0), | ||
timeout: Duration::new(0, 0), | ||
}) | ||
.build() | ||
.unwrap(); | ||
first_update.merge(update); | ||
let new_opts = first_update.apply(&opts); | ||
assert!(new_opts.keep_alive.is_none()); | ||
assert!(new_opts.tls_cfg.is_none()); | ||
assert!(new_opts.override_origin.is_none()); | ||
assert_eq!( | ||
new_opts.target_url, | ||
"https://smolkitty".parse::<Url>().unwrap() | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect this can probably be a lot smaller (though I doubt it matters a whole lot either way).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just copied the constant used internally by tonic, frankly, no idea about the right size... But, yes, being a control channel, that looks large...