Skip to content

Commit

Permalink
Allow connectors access to raft api
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 5, 2023
1 parent caaec36 commit cc00936
Show file tree
Hide file tree
Showing 42 changed files with 522 additions and 326 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:

env:
TREMOR_PATH: "${{ github.workspace }}/tremor-script/lib:${{ github.workspace }}/tremor-cli/tests/lib"
RUSTFLAGS: -D warnings -C target-feature=${{ matrix.target_feature }} --cfg tokio_unstable
RUSTFLAGS: -D warnings -C target-feature=${{ matrix.target_feature }}

runs-on: ${{ matrix.os }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN apt-get update \

# Switch back to dialog for any ad-hoc use of apt-get
ENV DEBIAN_FRONTEND=dialog
ENV RUSTFLAGS="-C target-feature=+avx,+avx2,+sse4.2 --cfg tokio_unstable"
ENV RUSTFLAGS="-C target-feature=+avx,+avx2,+sse4.2"

WORKDIR /app

Expand Down
3 changes: 3 additions & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ pub(crate) type Sender<T> = tokio::sync::mpsc::Sender<T>;
pub(crate) type Receiver<T> = tokio::sync::mpsc::Receiver<T>;
pub(crate) type UnboundedSender<T> = tokio::sync::mpsc::UnboundedSender<T>;
pub(crate) type UnboundedReceiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;
pub(crate) type OneShotSender<T> = tokio::sync::oneshot::Sender<T>;
// pub(crate) type OneShotReceiver<T> = tokio::sync::oneshot::Receiver<T>;
pub(crate) use tokio::sync::mpsc::error::{SendError, TryRecvError};
pub(crate) use tokio::sync::mpsc::{channel as bounded, unbounded_channel as unbounded};
pub(crate) use tokio::sync::oneshot::channel as oneshot;
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ mod tests {
use serde::Deserialize;

use super::*;
use crate::{errors::Result, ids::FlowInstanceId};
use crate::{errors::Result, ids::AppFlowInstanceId};

#[test]
fn test_reconnect_serde() -> Result<()> {
Expand Down Expand Up @@ -351,7 +351,7 @@ mod tests {
#[test]
fn test_config_builtin_preproc_with_config() -> Result<()> {
let c = Connector::from_config(
&Alias::new(FlowInstanceId::new("app", "flow"), "my_otel_client"),
&Alias::new(AppFlowInstanceId::new("app", "flow"), "my_otel_client"),
ConnectorType::from("otel_client".to_string()),
&literal!({
"preprocessors": [ {"name": "snot", "config": { "separator": "\n" }}],
Expand Down Expand Up @@ -382,7 +382,7 @@ mod tests {
"reconnect": {},
"metrics_interval_s": "wrong_type"
});
let id = Alias::new(FlowInstanceId::new("app", "flow"), "my_id");
let id = Alias::new(AppFlowInstanceId::new("app", "flow"), "my_id");
let res = Connector::from_config(&id, "fancy_schmancy".into(), &config);
assert!(res.is_err());
assert_eq!(String::from("Invalid Definition for connector \"app/flow::my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
Expand Down
52 changes: 46 additions & 6 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ pub(crate) use crate::config::Connector as ConnectorConfig;
use crate::{
channel::{bounded, Sender},
errors::{connector_send_err, Error, Kind as ErrorKind, Result},
ids::FlowInstanceId,
ids::{AliasType, AppFlowInstanceId, AppId, GenericAlias, InstanceId},
instance::State,
log_error, pipeline, qsize,
raft::api::APIStoreReq,
system::{KillSwitch, Runtime},
};
use beef::Cow;
Expand Down Expand Up @@ -262,6 +263,9 @@ pub(crate) trait Context: Display + Clone {
/// get the connector type
fn connector_type(&self) -> &ConnectorType;

/// gets the API sender
fn raft_api_sender(&self) -> Option<&Sender<APIStoreReq>>;

/// only log an error and swallow the result
#[inline]
fn swallow_err<T, E, M>(&self, expr: std::result::Result<T, E>, msg: &M)
Expand Down Expand Up @@ -326,6 +330,8 @@ pub(crate) struct ConnectorContext {
quiescence_beacon: QuiescenceBeacon,
/// Notifier
notifier: reconnect::ConnectionLostNotifier,
/// sender for raft requests
raft_api_tx: Option<Sender<APIStoreReq>>,
}

impl Display for ConnectorContext {
Expand All @@ -350,6 +356,10 @@ impl Context for ConnectorContext {
fn notifier(&self) -> &reconnect::ConnectionLostNotifier {
&self.notifier
}

fn raft_api_sender(&self) -> Option<&Sender<APIStoreReq>> {
self.raft_api_tx.as_ref()
}
}

/// Connector instance status report
Expand Down Expand Up @@ -431,6 +441,7 @@ pub(crate) async fn spawn(
builder: &dyn ConnectorBuilder,
config: ConnectorConfig,
kill_switch: &KillSwitch,
raft_api_tx: Option<Sender<APIStoreReq>>,
) -> Result<Addr> {
// instantiate connector
let connector = builder.build(alias, &config, kill_switch).await?;
Expand All @@ -440,6 +451,7 @@ pub(crate) async fn spawn(
connector,
config,
connector_id_gen.next_id(),
raft_api_tx,
)
.await?;

Expand All @@ -454,6 +466,7 @@ async fn connector_task(
mut connector: Box<dyn Connector>,
config: ConnectorConfig,
uid: ConnectorUId,
raft_api_tx: Option<Sender<APIStoreReq>>,
) -> Result<Addr> {
let qsize = qsize();
// channel for connector-level control plane communication
Expand Down Expand Up @@ -489,6 +502,7 @@ async fn connector_task(
connector_type: config.connector_type.clone(),
quiescence_beacon: quiescence_beacon.clone(),
notifier: notifier.clone(),
raft_api_tx: raft_api_tx.clone(),
};

let sink_metrics_reporter = SinkReporter::new(
Expand All @@ -504,6 +518,7 @@ async fn connector_task(
config.connector_type.clone(),
quiescence_beacon.clone(),
notifier.clone(),
raft_api_tx.clone(),
);
// create source instance
let source_addr = connector.create_source(source_ctx, source_builder).await?;
Expand All @@ -527,6 +542,7 @@ async fn connector_task(
connector_type: config.connector_type.clone(),
quiescence_beacon: quiescence_beacon.clone(),
notifier,
raft_api_tx,
};

let send_addr = connector_addr.clone();
Expand Down Expand Up @@ -1142,13 +1158,34 @@ where
/// unique instance alias/id of a connector within a deployment
#[derive(Debug, PartialEq, PartialOrd, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct Alias {
flow_alias: FlowInstanceId,
flow_alias: AppFlowInstanceId,
connector_alias: String,
}

impl GenericAlias for Alias {
fn app_id(&self) -> &AppId {
self.flow_alias.app_id()
}

fn app_instance(&self) -> &InstanceId {
self.flow_alias.instance_id()
}

fn alias_type(&self) -> AliasType {
AliasType::Connector
}

fn alias(&self) -> &str {
&self.connector_alias
}
}

impl Alias {
/// construct a new `ConnectorId` from the id of the containing flow and the connector instance id
pub fn new(flow_alias: impl Into<FlowInstanceId>, connector_alias: impl Into<String>) -> Self {
pub fn new(
flow_alias: impl Into<AppFlowInstanceId>,
connector_alias: impl Into<String>,
) -> Self {
Self {
flow_alias: flow_alias.into(),
connector_alias: connector_alias.into(),
Expand All @@ -1157,7 +1194,7 @@ impl Alias {

/// get a reference to the flow alias
#[must_use]
pub fn flow_alias(&self) -> &FlowInstanceId {
pub fn flow_alias(&self) -> &AppFlowInstanceId {
&self.flow_alias
}

Expand Down Expand Up @@ -1310,7 +1347,7 @@ where

#[cfg(test)]
pub(crate) mod unit_tests {
use crate::ids::FlowInstanceId;
use crate::ids::AppFlowInstanceId;

use super::*;

Expand All @@ -1326,7 +1363,7 @@ pub(crate) mod unit_tests {
pub(crate) fn new(tx: Sender<Msg>) -> Self {
Self {
t: ConnectorType::from("snot"),
alias: Alias::new(FlowInstanceId::new("app", "fake"), "fake"),
alias: Alias::new(AppFlowInstanceId::new("app", "fake"), "fake"),
notifier: reconnect::ConnectionLostNotifier::new(tx),
beacon: QuiescenceBeacon::default(),
}
Expand Down Expand Up @@ -1355,6 +1392,9 @@ pub(crate) mod unit_tests {
fn connector_type(&self) -> &ConnectorType {
&self.t
}
fn raft_api_sender(&self) -> Option<&Sender<APIStoreReq>> {
None
}
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
6 changes: 3 additions & 3 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ mod tests {
use elasticsearch::http::request::Body;

use super::*;
use crate::{config::Connector as ConnectorConfig, ids::FlowInstanceId};
use crate::{config::Connector as ConnectorConfig, ids::AppFlowInstanceId};

#[tokio::test(flavor = "multi_thread")]
async fn connector_builder_empty_nodes() -> Result<()> {
Expand All @@ -968,7 +968,7 @@ mod tests {
"nodes": []
}
});
let alias = Alias::new(FlowInstanceId::new("app", "flow"), "my_elastic");
let alias = Alias::new(AppFlowInstanceId::new("app", "flow"), "my_elastic");
let builder = super::Builder::default();
let connector_config =
ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
Expand Down Expand Up @@ -997,7 +997,7 @@ mod tests {
]
}
});
let alias = Alias::new(FlowInstanceId::new("app", "snot"), "my_elastic");
let alias = Alias::new(AppFlowInstanceId::new("app", "snot"), "my_elastic");
let builder = super::Builder::default();
let connector_config =
ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
Expand Down
9 changes: 5 additions & 4 deletions src/connectors/impls/gbq/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ mod tests {
use crate::connectors::sink::builder;
use crate::connectors::utils::quiescence::QuiescenceBeacon;
use crate::connectors::{reconnect::ConnectionLostNotifier, utils::metrics::SinkReporter};
use crate::ids::FlowInstanceId;
use crate::ids::AppFlowInstanceId;

#[tokio::test(flavor = "multi_thread")]
pub async fn can_spawn_sink() -> Result<()> {
Expand All @@ -108,17 +108,18 @@ mod tests {
SinkContext::new(
openraft::NodeId::default(),
SinkUId::default(),
Alias::new(FlowInstanceId::new("app", "a"), "b"),
Alias::new(AppFlowInstanceId::new("app", "a"), "b"),
ConnectorType::default(),
QuiescenceBeacon::default(),
ConnectionLostNotifier::new(crate::channel::bounded(128).0),
None,
),
builder(
&ConnectorConfig::default(),
CodecReq::Structured,
&Alias::new(FlowInstanceId::new("app", "a"), "b"),
&Alias::new(AppFlowInstanceId::new("app", "a"), "b"),
SinkReporter::new(
Alias::new(FlowInstanceId::new("app", "a"), "b"),
Alias::new(AppFlowInstanceId::new("app", "a"), "b"),
broadcast::channel(1).0,
None,
),
Expand Down
Loading

0 comments on commit cc00936

Please sign in to comment.