Skip to content

Commit df99c1a

Browse files
authored
Simplifies types used in metric producer descriptions (#125)
- Removes the `ProducerId` type entirely. This was mostly there due to a misunderstanding of how Dropshot deserializes path types. It's been replaced with a Uuid - Makes most fields of the `ProducerEndpoint` type public. This now stores the base collection route and the ID, and concats them on-demand, rather than storing the joined path and splitting it on demand.
1 parent 1441c73 commit df99c1a

File tree

5 files changed

+42
-143
lines changed

5 files changed

+42
-143
lines changed

omicron-common/src/model.rs

Lines changed: 7 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::fmt::Debug;
2424
use std::fmt::Display;
2525
use std::fmt::Formatter;
2626
use std::fmt::Result as FormatResult;
27-
use std::net::{SocketAddr, ToSocketAddrs};
27+
use std::net::SocketAddr;
2828
use std::num::NonZeroU32;
2929
use std::time::Duration;
3030
use thiserror::Error;
@@ -1394,133 +1394,24 @@ pub struct BootstrapAgentShareResponse {
13941394
* Oximeter producer/collector objects.
13951395
*/
13961396

1397-
/**
1398-
* Idenitifier for a producer.
1399-
*/
1400-
#[derive(
1401-
Debug,
1402-
Clone,
1403-
Copy,
1404-
PartialEq,
1405-
PartialOrd,
1406-
Ord,
1407-
Eq,
1408-
JsonSchema,
1409-
Serialize,
1410-
Deserialize,
1411-
)]
1412-
pub struct ProducerId {
1413-
pub producer_id: Uuid,
1414-
}
1415-
1416-
impl ProducerId {
1417-
/**
1418-
* Construct a new producer ID.
1419-
*/
1420-
pub fn new() -> Self {
1421-
Self { producer_id: Uuid::new_v4() }
1422-
}
1423-
}
1424-
1425-
impl Default for ProducerId {
1426-
fn default() -> Self {
1427-
Self::new()
1428-
}
1429-
}
1430-
1431-
impl std::fmt::Display for ProducerId {
1432-
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1433-
write!(f, "{}", self.producer_id.to_string())
1434-
}
1435-
}
1436-
14371397
/**
14381398
* Information announced by a metric server, used so that clients can contact it and collect
14391399
* available metric data from it.
14401400
*/
14411401
#[derive(Debug, Clone, JsonSchema, Serialize, Deserialize)]
14421402
pub struct ProducerEndpoint {
1443-
producer_id: ProducerId,
1444-
address: SocketAddr,
1445-
collection_route: String,
1446-
interval: Duration,
1403+
pub id: Uuid,
1404+
pub address: SocketAddr,
1405+
pub base_route: String,
1406+
pub interval: Duration,
14471407
}
14481408

14491409
impl ProducerEndpoint {
1450-
/**
1451-
* Generate info for a metric server listening on the given address and route.
1452-
*
1453-
* This will generate a new, random [`ProducerId`] for the server. The `base_route` should be
1454-
* a route stem, to which the producer ID will be appended. `interval` is the desired initial
1455-
* collection interval for the producers metrics.
1456-
*
1457-
* Example
1458-
* -------
1459-
* ```rust
1460-
* use std::time::Duration;
1461-
* use omicron_common::model::ProducerEndpoint;
1462-
*
1463-
* let info = ProducerEndpoint::new("127.0.0.1:4444", "/collect", Duration::from_secs(10));
1464-
* assert_eq!(info.collection_route(), format!("/collect/{}", info.producer_id()));
1465-
* ```
1466-
*/
1467-
pub fn new<T>(address: T, base_route: &str, interval: Duration) -> Self
1468-
where
1469-
T: ToSocketAddrs,
1470-
{
1471-
Self::with_id(ProducerId::new(), address, base_route, interval)
1472-
}
1473-
1474-
/**
1475-
* Generate info for a metric server, listening on the given address and route, with a known
1476-
* ID.
1477-
*/
1478-
pub fn with_id<T>(
1479-
producer_id: ProducerId,
1480-
address: T,
1481-
base_route: &str,
1482-
interval: Duration,
1483-
) -> Self
1484-
where
1485-
T: ToSocketAddrs,
1486-
{
1487-
Self {
1488-
producer_id,
1489-
address: address.to_socket_addrs().unwrap().next().unwrap(),
1490-
collection_route: format!(
1491-
"{}/{}",
1492-
base_route, producer_id.producer_id
1493-
),
1494-
interval,
1495-
}
1496-
}
1497-
1498-
/**
1499-
* Return the producer ID for this server.
1500-
*/
1501-
pub fn producer_id(&self) -> ProducerId {
1502-
self.producer_id
1503-
}
1504-
1505-
/**
1506-
* Return the address on which this server listens.
1507-
*/
1508-
pub fn address(&self) -> SocketAddr {
1509-
self.address
1510-
}
1511-
15121410
/**
15131411
* Return the route that can be used to request metric data.
15141412
*/
1515-
pub fn collection_route(&self) -> &str {
1516-
&self.collection_route
1517-
}
1518-
1519-
/**
1520-
* Return the interval on which metrics should be collected.
1521-
*/
1522-
pub fn interval(&self) -> &Duration {
1523-
&self.interval
1413+
pub fn collection_route(&self) -> String {
1414+
format!("{}/{}", &self.base_route, &self.id)
15241415
}
15251416
}
15261417

omicron-nexus/src/nexus.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1244,7 +1244,7 @@ impl Nexus {
12441244
info!(
12451245
self.log,
12461246
"assigned collector to new producer";
1247-
"producer_id" => ?producer_info.producer_id(),
1247+
"producer_id" => ?producer_info.id,
12481248
"collector_id" => ?collector.id,
12491249
);
12501250
Ok(())

oximeter/oximeter/examples/producer.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,12 @@ async fn main() {
8888
ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Debug };
8989
let registration_info =
9090
RegistrationInfo::new("127.0.0.1:12221", "/metrics/producers");
91-
let server_info =
92-
ProducerEndpoint::new(address, "/collect", Duration::from_secs(10));
91+
let server_info = ProducerEndpoint {
92+
id: Uuid::new_v4().into(),
93+
address,
94+
base_route: "/collect".to_string(),
95+
interval: Duration::from_secs(10),
96+
};
9397
let config = ProducerServerConfig {
9498
server_info,
9599
registration_info,

oximeter/oximeter/src/collect.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use dropshot::{
1111
endpoint, ApiDescription, ConfigDropshot, ConfigLogging, HttpError,
1212
HttpResponseOk, HttpServer, HttpServerStarter, Path, RequestContext,
1313
};
14-
use omicron_common::model::{ProducerEndpoint, ProducerId};
14+
use omicron_common::model::ProducerEndpoint;
1515
use reqwest::Client;
1616
use schemars::JsonSchema;
1717
use serde::{Deserialize, Serialize};
1818
use slog::{debug, info, o};
19+
use uuid::Uuid;
1920

2021
use crate::types;
2122
use crate::{Error, Producer};
@@ -57,7 +58,7 @@ pub type ProducerResults = Vec<Result<BTreeSet<types::Sample>, Error>>;
5758
#[derive(Clone)]
5859
pub struct Collector {
5960
producers: Arc<Mutex<ProducerList>>,
60-
producer_id: ProducerId,
61+
producer_id: Uuid,
6162
}
6263

6364
impl Default for Collector {
@@ -69,11 +70,11 @@ impl Default for Collector {
6970
impl Collector {
7071
/// Construct a new `Collector`.
7172
pub fn new() -> Self {
72-
Self::with_id(ProducerId::new())
73+
Self::with_id(Uuid::new_v4())
7374
}
7475

7576
/// Construct a new `Collector` with the given producer ID.
76-
pub fn with_id(producer_id: ProducerId) -> Self {
77+
pub fn with_id(producer_id: Uuid) -> Self {
7778
Self { producers: Arc::new(Mutex::new(vec![])), producer_id }
7879
}
7980

@@ -101,7 +102,7 @@ impl Collector {
101102
}
102103

103104
/// Return the producer ID associated with this collector.
104-
pub fn producer_id(&self) -> ProducerId {
105+
pub fn producer_id(&self) -> Uuid {
105106
self.producer_id
106107
}
107108
}
@@ -132,7 +133,7 @@ impl ProducerServer {
132133
.logging_config
133134
.to_logger("metric-server")
134135
.map_err(|msg| Error::ProducerServer(msg.to_string()))?;
135-
let collector = Collector::with_id(config.server_info.producer_id());
136+
let collector = Collector::with_id(config.server_info.id);
136137
let dropshot_log = log.new(o!("component" => "dropshot"));
137138
let server = HttpServerStarter::new(
138139
&config.dropshot_config,
@@ -160,7 +161,7 @@ impl ProducerServer {
160161
"starting oximeter metric server";
161162
"route" => config.server_info.collection_route(),
162163
"producer_id" => ?collector.producer_id(),
163-
"address" => config.server_info.address(),
164+
"address" => config.server_info.address,
164165
);
165166
Ok(Self { collector, server })
166167
}
@@ -190,17 +191,22 @@ fn metric_server_api() -> ApiDescription<Collector> {
190191
api
191192
}
192193

194+
#[derive(Clone, Copy, Debug, Deserialize, JsonSchema, Serialize)]
195+
struct ProducerIdPathParams {
196+
pub producer_id: Uuid,
197+
}
198+
193199
// Implementation of the actual collection routine used by the `ProducerServer`.
194200
#[endpoint {
195201
method = GET,
196202
path = "/collect/{producer_id}",
197203
}]
198204
async fn collect_endpoint(
199205
request_context: Arc<RequestContext<Collector>>,
200-
path_params: Path<ProducerId>,
206+
path_params: Path<ProducerIdPathParams>,
201207
) -> Result<HttpResponseOk<ProducerResults>, HttpError> {
202208
let collector = request_context.context();
203-
let producer_id = path_params.into_inner();
209+
let producer_id = path_params.into_inner().producer_id;
204210
collect(collector, producer_id).await
205211
}
206212

@@ -231,7 +237,7 @@ pub async fn register(
231237
/// Handle a request to pull available metric data from a [`Collector`].
232238
pub async fn collect(
233239
collector: &Collector,
234-
producer_id: ProducerId,
240+
producer_id: Uuid,
235241
) -> Result<HttpResponseOk<ProducerResults>, HttpError> {
236242
if producer_id == collector.producer_id() {
237243
Ok(HttpResponseOk(collector.collect()))

oximeter/oximeter/src/oximeter_server.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use dropshot::{
1313
RequestContext, TypedBody,
1414
};
1515
use omicron_common::backoff;
16-
use omicron_common::model::{
17-
OximeterStartupInfo, ProducerEndpoint, ProducerId,
18-
};
16+
use omicron_common::model::{OximeterStartupInfo, ProducerEndpoint};
1917
use reqwest::Client;
2018
use serde::{Deserialize, Serialize};
2119
use slog::{debug, info, o, trace, warn, Logger};
@@ -49,13 +47,13 @@ async fn collection_task(
4947
outbox: mpsc::Sender<ProducerResults>,
5048
) {
5149
let client = Client::new();
52-
let mut collection_timer = interval(*producer.interval());
50+
let mut collection_timer = interval(producer.interval);
5351
collection_timer.tick().await; // completes immediately
5452
debug!(
5553
log,
5654
"starting oximeter collection task";
5755
"collector_id" => ?id,
58-
"interval" => ?producer.interval(),
56+
"interval" => ?producer.interval,
5957
);
6058
loop {
6159
tokio::select! {
@@ -99,9 +97,9 @@ async fn collection_task(
9997
log,
10098
"collecting from producer";
10199
"collector_id" => ?id,
102-
"producer_id" => ?producer.producer_id().producer_id,
100+
"producer_id" => ?producer.id,
103101
);
104-
let res = client.get(format!("http://{}{}", producer.address(), producer.collection_route()))
102+
let res = client.get(format!("http://{}{}", producer.address, producer.collection_route()))
105103
.send()
106104
.await;
107105
match res {
@@ -113,7 +111,7 @@ async fn collection_task(
113111
"collected {} total results",
114112
results.len();
115113
"collector_id" => ?id,
116-
"producer_id" => ?producer.producer_id().producer_id,
114+
"producer_id" => ?producer.id,
117115
);
118116
outbox.send(results).await.unwrap();
119117
},
@@ -123,7 +121,7 @@ async fn collection_task(
123121
"failed to collect results from producer: {}",
124122
e.to_string();
125123
"collector_id" => ?id,
126-
"producer_id" => ?producer.producer_id().producer_id,
124+
"producer_id" => ?producer.id,
127125
);
128126
}
129127
}
@@ -134,7 +132,7 @@ async fn collection_task(
134132
"failed to send collection request to producer: {}",
135133
e.to_string();
136134
"collector_id" => ?id,
137-
"producer_id" => ?producer.producer_id().producer_id,
135+
"producer_id" => ?producer.id,
138136
);
139137
}
140138
}
@@ -245,7 +243,7 @@ struct OximeterAgent {
245243
// Handle to the TX-side of a channel for collecting results from the collection tasks
246244
result_sender: mpsc::Sender<ProducerResults>,
247245
// The actual tokio tasks running the collection on a timer.
248-
collection_tasks: Arc<Mutex<BTreeMap<ProducerId, CollectionTask>>>,
246+
collection_tasks: Arc<Mutex<BTreeMap<Uuid, CollectionTask>>>,
249247
}
250248

251249
impl OximeterAgent {
@@ -287,12 +285,12 @@ impl OximeterAgent {
287285
&self,
288286
info: ProducerEndpoint,
289287
) -> Result<(), Error> {
290-
let id = info.producer_id();
288+
let id = info.id;
291289
match self.collection_tasks.lock().unwrap().entry(id) {
292290
Entry::Vacant(value) => {
293291
info!(self.log, "registered new metric producer";
294-
"producer_id" => info.producer_id().to_string(),
295-
"address" => info.address(),
292+
"producer_id" => id.to_string(),
293+
"address" => info.address,
296294
);
297295

298296
// Build channel to control the task and receive results.

0 commit comments

Comments
 (0)