Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update tests and context passing
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
Licenser committed May 8, 2023
1 parent aea52d4 commit 123cf97
Showing 16 changed files with 107 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -385,7 +385,7 @@ mod tests {
let id = Alias::new("my_id");
let res = Connector::from_config(&id, "fancy_schmancy".into(), &config);
assert!(res.is_err());
assert_eq!(String::from("Invalid Definition for connector \"app/flow::my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
assert_eq!(String::from("Invalid Definition for connector \"my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
}

#[test]
3 changes: 1 addition & 2 deletions src/connectors.rs
Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@ use crate::{
log_error, pipeline, qsize, raft,
system::{flow::AppContext, KillSwitch, Runtime},
};
use beef::Cow;
use futures::Future;
use halfbrown::HashMap;
use simd_json::{Builder, Mutable, ValueAccess};
@@ -315,7 +314,7 @@ pub(crate) trait Context: Display + Clone {
'ct: 'event,
{
let t: &str = self.connector_type().into();
event_meta.get(&Cow::borrowed(t))
event_meta.get(t)
}
}

4 changes: 1 addition & 3 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
@@ -971,9 +971,7 @@ mod tests {
let connector_config =
ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
assert_eq!(
String::from(
"Invalid Definition for connector \"app/flow::my_elastic\": empty nodes provided"
),
String::from("Invalid Definition for connector \"my_elastic\": empty nodes provided"),
builder
.build(&alias, &connector_config,)
.await
31 changes: 15 additions & 16 deletions src/connectors/impls/gbq/writer/sink.rs
Original file line number Diff line number Diff line change
@@ -781,7 +781,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.0.field.len(), 0);
@@ -802,7 +802,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.0.field.len(), 0);
@@ -832,7 +832,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.1.len(), 1);
@@ -864,7 +864,7 @@ mod test {
precision: 0,
scale: 0,
}],
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
);

assert_eq!(result.1.len(), 1);
@@ -1040,9 +1040,8 @@ mod test {

let mut result = Vec::new();
assert!(encode_field(&value, &field, &mut result).is_ok());

// json is currently not supported, so we expect the field to be skipped
assert_eq!([] as [u8; 0], result[..]);
assert_eq!([10, 2, 123, 125] as [u8; 4], result[..]);
}

#[test]
@@ -1079,7 +1078,7 @@ mod test {

#[test]
pub fn mapping_generates_a_correct_descriptor() {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![
TableFieldSchema {
@@ -1120,7 +1119,7 @@ mod test {

#[test]
pub fn can_map_json_to_protobuf() -> Result<()> {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![
TableFieldSchema {
@@ -1157,7 +1156,7 @@ mod test {

#[test]
fn map_field_ignores_fields_that_are_not_in_definition() -> Result<()> {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![
TableFieldSchema {
@@ -1195,7 +1194,7 @@ mod test {

#[test]
fn map_field_ignores_struct_fields_that_are_not_in_definition() -> Result<()> {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![TableFieldSchema {
name: "a".to_string(),
@@ -1231,7 +1230,7 @@ mod test {

#[test]
fn fails_on_bytes_type_mismatch() {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![TableFieldSchema {
name: "a".to_string(),
@@ -1258,7 +1257,7 @@ mod test {

#[test]
fn fails_if_the_event_is_not_an_object() {
let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");
let mapping = JsonToProtobufMapping::new(
&vec![TableFieldSchema {
name: "a".to_string(),
@@ -1311,7 +1310,7 @@ mod test {
.on_event(
"",
Event::signal_tick(),
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
&mut EventSerializer::dummy(None)?,
0,
)
@@ -1340,7 +1339,7 @@ mod test {
.on_event(
"",
Event::signal_tick(),
&SinkContext::dummy(),
&SinkContext::dummy("gbq_writer"),
&mut EventSerializer::dummy(None)?,
0,
)
@@ -1401,7 +1400,7 @@ mod test {
}),
);

let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");

sink.connect(&ctx, &Attempt::default()).await?;

@@ -1471,7 +1470,7 @@ mod test {
}),
);

let ctx = SinkContext::dummy();
let ctx = SinkContext::dummy("gbq_writer");

sink.connect(&ctx, &Attempt::default()).await?;

4 changes: 2 additions & 2 deletions src/connectors/impls/gcl/writer/sink.rs
Original file line number Diff line number Diff line change
@@ -356,7 +356,7 @@ mod test {
tx,
MockChannelFactory,
);
let sink_context = SinkContext::dummy();
let sink_context = SinkContext::dummy("gcl_writer");

sink.connect(&sink_context, &Attempt::default()).await?;

@@ -439,7 +439,7 @@ mod test {
.on_event(
"",
Event::signal_tick(),
&SinkContext::dummy(),
&SinkContext::dummy("gcl_writer"),
&mut EventSerializer::dummy(None)?,
0,
)
12 changes: 6 additions & 6 deletions src/connectors/impls/gcs/streamer.rs
Original file line number Diff line number Diff line change
@@ -560,7 +560,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate sink lifecycle
@@ -730,7 +730,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate sink lifecycle
@@ -888,7 +888,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate sink lifecycle
@@ -967,7 +967,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = YoloSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");

// simulate sink lifecycle
sink.on_start(&context).await?;
@@ -1011,7 +1011,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = ConsistentSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate standard sink lifecycle
@@ -1220,7 +1220,7 @@ pub(crate) mod tests {
ChunkedBuffer,
> = ConsistentSink::new(sink_impl);

let context = SinkContext::dummy();
let context = SinkContext::dummy("gcs_streamer");
let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?;

// simulate standard sink lifecycle
2 changes: 1 addition & 1 deletion src/connectors/impls/kafka.rs
Original file line number Diff line number Diff line change
@@ -362,7 +362,7 @@ mod tests {
&literal!({
"measurement": "kafka_consumer_stats",
"tags": {
"connector": "app/fake::fake"
"connector": "fake"
},
"fields": {
"rx_msgs": 42,
6 changes: 3 additions & 3 deletions src/connectors/sink.rs
Original file line number Diff line number Diff line change
@@ -284,12 +284,12 @@ pub(crate) struct SinkContextInner {
pub(crate) struct SinkContext(Arc<SinkContextInner>);
impl SinkContext {
#[cfg(test)]
pub(crate) fn dummy() -> Self {
pub(crate) fn dummy(ct: &str) -> Self {
let (tx, _rx) = bounded(1024);
Self::new(
SinkUId::default(),
Alias::from("dummy"),
ConnectorType::from("dummy"),
Alias::from(ct),
ConnectorType::from(ct),
QuiescenceBeacon::default(),
ConnectionLostNotifier::new(tx),
AppContext::default(),
43 changes: 28 additions & 15 deletions src/connectors/source.rs
Original file line number Diff line number Diff line change
@@ -303,15 +303,28 @@ pub(crate) struct SourceContext {
/// kill switch
pub(crate) killswitch: KillSwitch,
}

impl SourceContext {
pub(crate) fn killswitch(&self) -> KillSwitch {
self.killswitch.clone()
}
#[cfg(test)]
pub(crate) fn dummy() -> Self {
SourceContext {
alias: Alias::new("test"),
uid: SourceUId::default(),
connector_type: ConnectorType::default(),
quiescence_beacon: QuiescenceBeacon::default(),
notifier: ConnectionLostNotifier::dummy(),
app_ctx: AppContext::default(),
killswitch: KillSwitch::dummy(),
}
}
}

impl Display for SourceContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}[Source::{}]", self.app_ctx, &self.alias)
write!(f, "[Source::{}::{}]", self.app_ctx, &self.alias)
}
}

@@ -1030,7 +1043,7 @@ where
let mut ingest_ns = nanotime();
if let Some(mut stream_state) = self.streams.end_stream(stream_id) {
let results = build_last_events(
&self.ctx.alias,
&self.ctx,
&mut stream_state,
&mut ingest_ns,
pull_id,
@@ -1100,7 +1113,7 @@ where
if let Some(stream) = stream {
let stream_state = self.streams.get_or_create_stream(stream, &self.ctx)?;
let results = build_events(
&self.ctx.alias,
&self.ctx,
stream_state,
&mut ingest_ns,
pull_id,
@@ -1127,7 +1140,7 @@ where
let mut stream_state = self.streams.create_anonymous_stream(codec_overwrite)?;
let meta = meta.unwrap_or_else(Value::object);
let mut results = build_events(
&self.ctx.alias,
&self.ctx,
&mut stream_state,
&mut ingest_ns,
pull_id,
@@ -1139,7 +1152,7 @@ where
);
// finish up the stream immediately
let mut last_events = build_last_events(
&self.ctx.alias,
&self.ctx,
&mut stream_state,
&mut ingest_ns,
pull_id,
@@ -1259,7 +1272,7 @@ where
/// preprocessor or codec errors are turned into events to the ERR port of the source/connector
#[allow(clippy::too_many_arguments)]
fn build_events(
alias: &Alias,
ctx: &SourceContext,
stream_state: &mut StreamState,
ingest_ns: &mut u64,
pull_id: u64,
@@ -1273,7 +1286,7 @@ fn build_events(
stream_state.preprocessors.as_mut_slice(),
ingest_ns,
data,
alias,
ctx,
) {
Ok(processed) => {
let mut res = Vec::with_capacity(processed.len());
@@ -1293,7 +1306,7 @@ fn build_events(
Err(None) => continue,
Err(Some(e)) => (
ERR,
make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone()),
make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone()),
),
};
let event = build_event(
@@ -1310,7 +1323,7 @@ fn build_events(
}
Err(e) => {
// preprocessor error
let err_payload = make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone());
let err_payload = make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone());
let event = build_event(
stream_state,
pull_id,
@@ -1328,7 +1341,7 @@ fn build_events(
/// preprocessor or codec errors are turned into events to the ERR port of the source/connector
#[allow(clippy::too_many_arguments)]
fn build_last_events(
alias: &Alias,
ctx: &SourceContext,
stream_state: &mut StreamState,
ingest_ns: &mut u64,
pull_id: u64,
@@ -1337,7 +1350,7 @@ fn build_last_events(
meta: &Value<'static>,
is_transactional: bool,
) -> Vec<(Port<'static>, Event)> {
match finish(stream_state.preprocessors.as_mut_slice(), alias) {
match finish(stream_state.preprocessors.as_mut_slice(), ctx) {
Ok(processed) => {
let mut res = Vec::with_capacity(processed.len());
for chunk in processed {
@@ -1356,7 +1369,7 @@ fn build_last_events(
Err(None) => continue,
Err(Some(e)) => (
ERR,
make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone()),
make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone()),
),
};
let event = build_event(
@@ -1373,7 +1386,7 @@ fn build_last_events(
}
Err(e) => {
// preprocessor error
let err_payload = make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone());
let err_payload = make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone());
let event = build_event(
stream_state,
pull_id,
@@ -1389,7 +1402,7 @@ fn build_last_events(

/// create an error payload
fn make_error(
connector_alias: &Alias,
ctx: &SourceContext,
error: &Error,
stream_id: u64,
pull_id: u64,
@@ -1398,7 +1411,7 @@ fn make_error(
let e_string = error.to_string();
let data = literal!({
"error": e_string.clone(),
"source": connector_alias.to_string(),
"source": ctx.to_string(),
"stream_id": stream_id,
"pull_id": pull_id
});
2 changes: 1 addition & 1 deletion src/connectors/tests/http/client.rs
Original file line number Diff line number Diff line change
@@ -683,7 +683,7 @@ async fn missing_tls_config_https() -> Result<()> {
.map(|e| e.to_string())
.unwrap_or_default();

assert_eq!("Invalid Definition for connector \"app/test::missing_tls_config_https\": missing tls config with 'https' url. Set 'tls' to 'true' or provide a full tls config.", res);
assert_eq!("Invalid Definition for connector \"missing_tls_config_https\": missing tls config with 'https' url. Set 'tls' to 'true' or provide a full tls config.", res);

Ok(())
}
6 changes: 3 additions & 3 deletions src/connectors/tests/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -226,7 +226,7 @@ async fn transactional_retry() -> Result<()> {
assert_eq!(
&literal!({
"error": "SIMD JSON error: InternalError at character 0 ('}')",
"source": "app/test::transactional_retry",
"source": "[Source::[Node::0][default/]::transactional_retry]",
"stream_id": 8_589_934_592_u64,
"pull_id": 1u64
}),
@@ -443,7 +443,7 @@ async fn custom_no_retry() -> Result<()> {
assert_eq!(
&literal!({
"error": "SIMD JSON error: InternalError at character 0 ('}')",
"source": "app/test::custom_no_retry",
"source": "[Source::[Node::0][default/]::custom_no_retry]",
"stream_id": 8_589_934_592_u64,
"pull_id": 1u64
}),
@@ -649,7 +649,7 @@ async fn performance() -> Result<()> {
assert_eq!(
&literal!({
"error": "SIMD JSON error: InternalError at character 0 ('}')",
"source": "app/test::performance",
"source": "[Source::[Node::0][default/]::performance]",
"stream_id": 8_589_934_592_u64,
"pull_id": 1u64
}),
5 changes: 5 additions & 0 deletions src/connectors/utils/reconnect.rs
Original file line number Diff line number Diff line change
@@ -210,6 +210,11 @@ impl ConnectionLostNotifier {
self.0.send(Msg::ConnectionLost).await?;
Ok(())
}
#[cfg(test)]
pub(crate) fn dummy() -> Self {
let (tx, _) = bounded(128);
Self(tx)
}
}

impl ReconnectRuntime {
63 changes: 33 additions & 30 deletions src/preprocessor.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,9 @@ mod remove_empty;
pub(crate) mod separate;
mod textual_length_prefixed;

use crate::{config::Preprocessor as PreprocessorConfig, connectors::Alias, errors::Result};
use crate::{
config::Preprocessor as PreprocessorConfig, connectors::source::SourceContext, errors::Result,
};
use std::str;

//pub type Lines = lines::Lines;
@@ -101,11 +103,11 @@ pub fn make_preprocessors(preprocessors: &[PreprocessorConfig]) -> Result<Prepro
/// # Errors
///
/// * If a preprocessor failed
pub fn preprocess(
pub(crate) fn preprocess(
preprocessors: &mut [Box<dyn Preprocessor>],
ingest_ns: &mut u64,
data: Vec<u8>,
alias: &Alias,
ctx: &SourceContext,
) -> Result<Vec<Vec<u8>>> {
let mut data = vec![data];
let mut data1 = Vec::new();
@@ -115,7 +117,7 @@ pub fn preprocess(
match pp.process(ingest_ns, d) {
Ok(mut r) => data1.append(&mut r),
Err(e) => {
error!("[Connector::{alias}] Preprocessor [{i}] error: {e}");
error!("{ctx} Preprocessor [{i}] error: {e}");
return Err(e);
}
}
@@ -130,15 +132,15 @@ pub fn preprocess(
/// # Errors
///
/// * If a preprocessor failed
pub fn finish(preprocessors: &mut [Box<dyn Preprocessor>], alias: &Alias) -> Result<Vec<Vec<u8>>> {
pub(crate) fn finish(
preprocessors: &mut [Box<dyn Preprocessor>],
ctx: &SourceContext,
) -> Result<Vec<Vec<u8>>> {
if let Some((head, tail)) = preprocessors.split_first_mut() {
let mut data = match head.finish(None) {
Ok(d) => d,
Err(e) => {
error!(
"[Connector::{alias}] Preprocessor '{}' finish error: {e}",
head.name()
);
error!("[{ctx}] Preprocessor '{}' finish error: {e}", head.name());
return Err(e);
}
};
@@ -149,10 +151,7 @@ pub fn finish(preprocessors: &mut [Box<dyn Preprocessor>], alias: &Alias) -> Res
match pp.finish(Some(d)) {
Ok(mut r) => data1.append(&mut r),
Err(e) => {
error!(
"[Connector::{alias}] Preprocessor '{}' finish error: {e}",
pp.name()
);
error!("[{ctx}] Preprocessor '{}' finish error: {e}", pp.name());
return Err(e);
}
}
@@ -334,18 +333,18 @@ mod test {
let data = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let wire = post_p.process(0, 0, &data)?;
let (start, end) = wire[0].split_at(7);
let alias = Alias::new("test");
let ctx = SourceContext::dummy();
let mut pps: Vec<Box<dyn Preprocessor>> = vec![Box::new(pre_p)];
let recv = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &alias)?;
let recv = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &ctx)?;
assert!(recv.is_empty());
let recv = preprocess(pps.as_mut_slice(), &mut it, end.to_vec(), &alias)?;
let recv = preprocess(pps.as_mut_slice(), &mut it, end.to_vec(), &ctx)?;
assert_eq!(recv[0], data);

// incomplete data
let processed = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &alias)?;
let processed = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &ctx)?;
assert!(processed.is_empty());
// not emitted upon finish
let finished = finish(pps.as_mut_slice(), &alias)?;
let finished = finish(pps.as_mut_slice(), &ctx)?;
assert!(finished.is_empty());

Ok(())
@@ -598,9 +597,13 @@ mod test {
#[test]
fn single_pre_process_head_ok() {
let pre = Box::new(BadPreprocessor {});
let alias = crate::connectors::Alias::new("chucky".to_string());
let mut ingest_ns = 0_u64;
let r = preprocess(&mut [pre], &mut ingest_ns, b"foo".to_vec(), &alias);
let r = preprocess(
&mut [pre],
&mut ingest_ns,
b"foo".to_vec(),
&SourceContext::dummy(),
);
assert!(r.is_err());
}

@@ -609,17 +612,20 @@ mod test {
let noop = Box::new(NoOp {});
assert_eq!("nily", noop.name());
let pre = Box::new(BadPreprocessor {});
let alias = crate::connectors::Alias::new("chucky".to_string());
let mut ingest_ns = 0_u64;
let r = preprocess(&mut [noop, pre], &mut ingest_ns, b"foo".to_vec(), &alias);
let r = preprocess(
&mut [noop, pre],
&mut ingest_ns,
b"foo".to_vec(),
&SourceContext::dummy(),
);
assert!(r.is_err());
}

#[test]
fn single_pre_finish_ok() {
let pre = Box::new(BadPreprocessor {});
let alias = crate::connectors::Alias::new("chucky".to_string());
let r = finish(&mut [pre], &alias);
let r = finish(&mut [pre], &SourceContext::dummy());
assert!(r.is_ok());
}

@@ -632,28 +638,25 @@ mod test {

#[test]
fn preprocess_finish_head_fail() {
let alias = crate::connectors::Alias::new("chucky".to_string());
let pre = Box::new(BadFinisher {});
let r = finish(&mut [pre], &alias);
let r = finish(&mut [pre], &SourceContext::dummy());
assert!(r.is_err());
}

#[test]
fn preprocess_finish_tail_fail() {
let alias = crate::connectors::Alias::new("chucky".to_string());
let noop = Box::new(NoOp {});
let pre = Box::new(BadFinisher {});
let r = finish(&mut [noop, pre], &alias);
let r = finish(&mut [noop, pre], &SourceContext::dummy());
assert!(r.is_err());
}

#[test]
fn preprocess_finish_multi_ok() {
let alias = crate::connectors::Alias::new("xyz".to_string());
let noop1 = Box::new(NoOp {});
let noop2 = Box::new(NoOp {});
let noop3 = Box::new(NoOp {});
let r = finish(&mut [noop1, noop2, noop3], &alias);
let r = finish(&mut [noop1, noop2, noop3], &SourceContext::dummy());
assert!(r.is_ok());
}
}
6 changes: 3 additions & 3 deletions src/raft/test.rs
Original file line number Diff line number Diff line change
@@ -104,7 +104,7 @@ impl TestNode {
running.join().await

// only ever destroy the db when the raft node is known to have stopped
//rocksdb::DB::destroy(&rocksdb::Options::default(), &path).map_err(ClusterError::Rocks)
// rocksdb::DB::destroy(&rocksdb::Options::default(), &path).map_err(ClusterError::Rocks)
}

fn client(&self) -> &ClusterClient {
@@ -114,7 +114,7 @@ impl TestNode {

#[tokio::test(flavor = "multi_thread")]
async fn cluster_join_test() -> ClusterResult<()> {
let _ = env_logger::try_init();
let _: std::result::Result<_, _> = env_logger::try_init();
let dir0 = tempfile::tempdir()?;
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
@@ -160,7 +160,7 @@ async fn cluster_join_test() -> ClusterResult<()> {

#[tokio::test(flavor = "multi_thread")]
async fn kill_and_restart_voter() -> ClusterResult<()> {
let _ = env_logger::try_init();
let _: std::result::Result<_, _> = env_logger::try_init();
let dir0 = tempfile::tempdir()?;
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
4 changes: 2 additions & 2 deletions src/raft/test/learner.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn add_learner_test() -> ClusterResult<()> {
let _ = env_logger::try_init();
let _: std::result::Result<_, _> = env_logger::try_init();
let dir0 = tempfile::tempdir()?;
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
@@ -78,7 +78,7 @@ async fn add_learner_test() -> ClusterResult<()> {

#[tokio::test(flavor = "multi_thread")]
async fn learner_runs_app() -> ClusterResult<()> {
let _ = env_logger::try_init();
let _: std::result::Result<_, _> = env_logger::try_init();
let dir0 = tempfile::tempdir()?;
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
7 changes: 2 additions & 5 deletions src/system/flow.rs
Original file line number Diff line number Diff line change
@@ -1238,14 +1238,11 @@ mod tests {
start_rx.await??;

let connector = flow.get_connector("foo".to_string()).await?;
assert_eq!(String::from("app/test::foo"), connector.alias.to_string());
assert_eq!(String::from("foo"), connector.alias.to_string());

let connectors = flow.get_connectors().await?;
assert_eq!(1, connectors.len());
assert_eq!(
String::from("app/test::foo"),
connectors[0].alias.to_string()
);
assert_eq!(String::from("foo"), connectors[0].alias.to_string());

// assert the flow has started and events are flowing
let event = connector_rx.recv().await.ok_or("empty")?;

0 comments on commit 123cf97

Please sign in to comment.