Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nearest dc balancer #206

Open
wants to merge 42 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
3b804d0
init structure of balancer
bakalover Jul 17, 2024
44e86a4
Support location inside NodeInfo
bakalover Jul 28, 2024
b65b0d0
Balancer logic (need to optimize)
bakalover Jul 28, 2024
76b9dbc
Fire to start computation, await on sending addr ready, Pretty vars
bakalover Jul 29, 2024
6a999e3
Structured concurrency
bakalover Jul 31, 2024
aa441be
Construct NodeInfo with location
bakalover Jul 31, 2024
1531617
Balancr backup strategy options
bakalover Jul 31, 2024
8072d92
first draft
bakalover Aug 4, 2024
1156ef7
less diff
bakalover Aug 4, 2024
4a9f1d9
less diff
bakalover Aug 4, 2024
fcfa8c9
less diff
bakalover Aug 4, 2024
03d82ce
info about location inside error trace
bakalover Aug 4, 2024
55eb7f0
config
bakalover Aug 8, 2024
25d8e4b
no need in services yet
bakalover Aug 8, 2024
e97bb2a
lint checks
bakalover Aug 8, 2024
aa9f207
balancer config
bakalover Aug 8, 2024
85c86c9
non experimental default
bakalover Aug 8, 2024
7f2fc72
naming
bakalover Aug 9, 2024
9b233ec
random - default fallback
bakalover Aug 9, 2024
2c8f650
static config
bakalover Aug 9, 2024
e4abe1d
all waiter
bakalover Aug 11, 2024
97eb2cc
share state between self and child Balancer and wait alll if needed +…
bakalover Aug 11, 2024
3b9a944
better concurrent wait on waiters awaits
bakalover Aug 11, 2024
e139cef
reduce imports
bakalover Aug 11, 2024
30caee3
non-blocking balancer with full cancellation control and tracing
bakalover Aug 18, 2024
dd8bf2f
optimize imports
bakalover Aug 18, 2024
4be1624
client waits for balancer set up its state both in waiter and on endp…
bakalover Aug 20, 2024
2212b32
pretty + tests on internal non-async functions
bakalover Aug 31, 2024
a9e787c
ci restart
bakalover Aug 31, 2024
dde7b9b
Fix deadlock on buffer send by producers, fixed livelock on infinite …
bakalover Sep 1, 2024
103a410
clear
bakalover Sep 1, 2024
94f00cf
Move fallback balancer inside fallback strategy
bakalover Sep 7, 2024
9f92b50
Choose random endpoint from prefered
bakalover Sep 7, 2024
57d2627
Iter over only values in dc_to_nodes
bakalover Sep 7, 2024
5661d85
Divide into multiple files
bakalover Sep 15, 2024
fa2e570
Address producers do nothing on ping error
bakalover Sep 15, 2024
a25f949
Check exact map
bakalover Sep 15, 2024
0b83dad
Better random shuffle test
bakalover Sep 15, 2024
a225958
No clones
bakalover Sep 15, 2024
6499c41
Remove useless test
bakalover Sep 15, 2024
4c2071d
pub(crate)
bakalover Sep 15, 2024
079f46c
fix
bakalover Sep 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions ydb/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl DiscoveryState {
Some(&self.nodes)
}

pub(crate) fn get_all_nodes(&self) -> Option<&Vec<NodeInfo>> {
Some(&self.nodes)
}

pub(crate) fn is_empty(&self) -> bool {
self.nodes.len() == 0
}
Expand Down Expand Up @@ -97,11 +101,12 @@ impl Default for DiscoveryState {
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct NodeInfo {
pub(crate) uri: Uri,
pub(crate) location: String,
}

impl NodeInfo {
pub(crate) fn new(uri: Uri) -> Self {
Self { uri }
pub(crate) fn new(uri: Uri, location: String) -> Self {
Self { uri, location }
}
}

Expand Down Expand Up @@ -141,7 +146,7 @@ pub struct StaticDiscovery {
impl StaticDiscovery {
pub fn new_from_str<'a, T: Into<&'a str>>(endpoint: T) -> YdbResult<Self> {
let endpoint = Uri::from_str(endpoint.into())?;
let nodes = vec![NodeInfo::new(endpoint)];
let nodes = vec![NodeInfo::new(endpoint, String::new())];

let state = DiscoveryState::new(std::time::Instant::now(), nodes);
let state = Arc::new(state);
Expand Down Expand Up @@ -324,14 +329,14 @@ impl DiscoverySharedState {

fn list_endpoints_to_node_infos(list: Vec<EndpointInfo>) -> YdbResult<Vec<NodeInfo>> {
list.into_iter()
.map(|item| match Self::endpoint_info_to_uri(item) {
Ok(uri) => YdbResult::<NodeInfo>::Ok(NodeInfo::new(uri)),
.map(|item| match Self::endpoint_info_to_uri(&item) {
Ok(uri) => YdbResult::<NodeInfo>::Ok(NodeInfo::new(uri, item.location.clone())),
Err(err) => YdbResult::<NodeInfo>::Err(err),
})
.try_collect()
}

fn endpoint_info_to_uri(endpoint_info: EndpointInfo) -> YdbResult<Uri> {
fn endpoint_info_to_uri(endpoint_info: &EndpointInfo) -> YdbResult<Uri> {
let authority: Authority =
Authority::from_str(format!("{}:{}", endpoint_info.fqdn, endpoint_info.port).as_str())?;

Expand Down
3 changes: 3 additions & 0 deletions ydb/src/grpc_wrapper/raw_discovery_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ impl GrpcDiscoveryClient {
};
let resp = self.service.list_endpoints(req).await?;
let result: ListEndpointsResult = grpc_read_operation_result(resp)?;

let res = result
.endpoints
.into_iter()
.map(|item| EndpointInfo {
fqdn: item.address,
port: item.port,
ssl: item.ssl,
location: item.location,
})
.collect_vec();
Ok(res)
Expand All @@ -45,6 +47,7 @@ pub(crate) struct EndpointInfo {
pub(crate) fqdn: String,
pub(crate) port: u32,
pub(crate) ssl: bool,
pub(crate) location: String,
}

impl GrpcServiceForDiscovery for GrpcDiscoveryClient {
Expand Down
315 changes: 0 additions & 315 deletions ydb/src/load_balancer.rs

This file was deleted.

Loading
Loading