Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove private link related connection #18975

Merged
merged 16 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::error::ConnectorResult;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};

pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
pub const CONNECTION_NAME_KEY: &str = "connection.name";

#[derive(Debug)]
pub(super) enum PrivateLinkContextRole {
Expand Down
29 changes: 0 additions & 29 deletions src/frontend/src/catalog/connection_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::anyhow;
use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::catalog::connection::Info;
use risingwave_pb::catalog::{connection, PbConnection};

use crate::catalog::{ConnectionId, OwnedByUserCatalog};
use crate::error::{Result, RwError};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -64,24 +56,3 @@ impl OwnedByUserCatalog for ConnectionCatalog {
self.owner
}
}

/// Applies the private-link broker rewrite described by `connection` to the
/// connector `properties` map.
///
/// Returns an error when the connection is attached to a non-Kafka connector.
/// A `Mock` provider (test-only) is accepted without touching `properties`.
pub(crate) fn resolve_private_link_connection(
    connection: &Arc<ConnectionCatalog>,
    properties: &mut BTreeMap<String, String>,
) -> Result<()> {
    #[allow(irrefutable_let_patterns)]
    if let connection::Info::PrivateLinkService(svc) = &connection.info {
        // Private-link rewriting only makes sense for the Kafka connector.
        if !properties.is_kafka_connector() {
            return Err(RwError::from(anyhow!(
                "Private link is only supported for Kafka connector"
            )));
        }
        // The mock provider needs no broker rewriting; real providers get
        // their broker addresses rewritten to the private-link endpoints.
        if svc.get_provider()? != PrivateLinkProvider::Mock {
            insert_privatelink_broker_rewrite_map(properties, Some(svc), None)
                .map_err(RwError::from)?;
        }
    }
    Ok(())
}
64 changes: 9 additions & 55 deletions src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,16 @@ use std::collections::BTreeMap;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_sqlparser::ast::CreateConnectionStatement;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::error::ErrorCode::ProtocolError;
use crate::error::{Result, RwError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;

pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider";
pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name";
pub(crate) const CONNECTION_TAGS_PROP: &str = "tags";

pub(crate) const CLOUD_PROVIDER_MOCK: &str = "mock"; // fake privatelink provider for testing
pub(crate) const CLOUD_PROVIDER_AWS: &str = "aws";

#[inline(always)]
fn get_connection_property_required(
Expand All @@ -48,58 +41,19 @@ fn get_connection_property_required(
)))
})
}

/// Builds the `PrivateLink` payload of a CREATE CONNECTION request from the
/// statement's WITH options.
///
/// Recognized providers are "mock" (test-only; no service name needed) and
/// "aws" (requires `service.name`, optionally carries `tags`). Any other
/// provider string, or an unspecified provider, yields a protocol error.
fn resolve_private_link_properties(
    with_properties: &BTreeMap<String, String>,
) -> Result<create_connection_request::PrivateLink> {
    let provider_str =
        get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?;
    let provider = match provider_str.as_str() {
        CLOUD_PROVIDER_MOCK => PrivateLinkProvider::Mock,
        CLOUD_PROVIDER_AWS => PrivateLinkProvider::Aws,
        other => {
            return Err(RwError::from(ProtocolError(format!(
                "Unsupported privatelink provider {}",
                other
            ))));
        }
    };

    match provider {
        // Mock provider: placeholder payload, nothing else to resolve.
        PrivateLinkProvider::Mock => Ok(create_connection_request::PrivateLink {
            provider: provider.into(),
            service_name: String::new(),
            tags: None,
        }),
        // AWS: the VPC endpoint service name is mandatory; tags are optional.
        PrivateLinkProvider::Aws => {
            let service_name =
                get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?;
            Ok(create_connection_request::PrivateLink {
                provider: provider.into(),
                service_name,
                tags: with_properties.get(CONNECTION_TAGS_PROP).cloned(),
            })
        }
        PrivateLinkProvider::Unspecified => Err(RwError::from(ProtocolError(
            "Privatelink provider unspecified".to_string(),
        ))),
    }
}

fn resolve_create_connection_payload(
with_properties: &BTreeMap<String, String>,
) -> Result<create_connection_request::Payload> {
let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?;
let create_connection_payload = match connection_type.as_str() {
PRIVATELINK_CONNECTION => create_connection_request::Payload::PrivateLink(
resolve_private_link_properties(with_properties)?,
),
_ => {
return Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
))));
}
return match connection_type.as_str() {
PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated(
"CREATE CONNECTION to Private Link".to_string(),
"RisingWave Cloud Portal".to_string(),
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
))),
_ => Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
)))),
};
Ok(create_connection_payload)
}

pub async fn handle_create_connection(
Expand Down
13 changes: 3 additions & 10 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::array::arrow::arrow_schema_iceberg::DataType as ArrowDataType;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, TableId, UserId};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
Expand Down Expand Up @@ -92,12 +90,7 @@ pub async fn gen_sink_plan(

let mut with_options = handler_args.with_options.clone();

let connection_id = {
let conn_id =
resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?;
conn_id.map(ConnectionId)
};

resolve_privatelink_in_with_option(&mut with_options)?;
let mut resolved_with_options = resolve_secret_ref_in_with_options(with_options, session)?;

let partition_info = get_partition_compute_info(&resolved_with_options).await?;
Expand Down Expand Up @@ -266,7 +259,7 @@ pub async fn gen_sink_plan(
SchemaId::new(sink_schema_id),
DatabaseId::new(sink_database_id),
UserId::new(session.user_id()),
connection_id,
None, // deprecated: private link connection id
dependent_relations.into_iter().collect_vec(),
);

Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1555,8 +1555,7 @@ pub async fn bind_create_source_or_table_with_connector(

// resolve privatelink connection for Kafka
let mut with_properties = with_properties;
let connection_id =
resolve_privatelink_in_with_option(&mut with_properties, &schema_name, session)?;
resolve_privatelink_in_with_option(&mut with_properties)?;

let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?;

Expand Down Expand Up @@ -1632,7 +1631,7 @@ pub async fn bind_create_source_or_table_with_connector(
watermark_descs,
associated_table_id,
definition,
connection_id,
connection_id: None, // deprecated: private link connection id
created_at_epoch: None,
initialized_at_epoch: None,
version: INITIAL_SOURCE_VERSION_ID,
Expand Down
27 changes: 2 additions & 25 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::BTreeMap;
use std::num::NonZeroU32;

use risingwave_connector::source::kafka::private_link::{
insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY,
insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY,
};
pub use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::WithPropertiesExt;
Expand All @@ -28,7 +28,6 @@ use risingwave_sqlparser::ast::{
};

use super::OverwriteOptions;
use crate::catalog::connection_catalog::resolve_private_link_connection;
use crate::catalog::ConnectionId;
use crate::error::{ErrorCode, Result as RwResult, RwError};
use crate::session::SessionImpl;
Expand Down Expand Up @@ -186,8 +185,6 @@ pub(crate) fn resolve_secret_ref_in_with_options(

pub(crate) fn resolve_privatelink_in_with_option(
with_options: &mut WithOptions,
schema_name: &Option<String>,
session: &SessionImpl,
) -> RwResult<Option<ConnectionId>> {
let is_kafka = with_options.is_kafka_connector();
let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
Expand All @@ -201,28 +198,8 @@ pub(crate) fn resolve_privatelink_in_with_option(
}
insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
.map_err(RwError::from)?;
return Ok(None);
}

let connection_name = with_options
.remove(CONNECTION_NAME_KEY)
.map(|s| s.to_lowercase());
let connection_id = match connection_name {
Some(connection_name) => {
let connection = session
.get_connection_by_name(schema_name.clone(), &connection_name)
.map_err(|_| ErrorCode::ItemNotFound(connection_name))?;
if !is_kafka {
return Err(RwError::from(ErrorCode::ProtocolError(
"Connection is only supported in kafka connector".to_string(),
)));
}
resolve_private_link_connection(&connection, with_options.inner_mut())?;
Some(connection.id)
}
None => None,
};
Ok(connection_id)
Ok(None)
}

impl TryFrom<&[SqlOption]> for WithOptions {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub connection_id: ConnectionId,
pub name: String,

// todo: Private link service has been deprecated, consider using a new field for the connection info
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should deprecate the unnecessary fields in message PrivateLinkService, only keeping the necessary fields for create source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In long term, the create connection statement should be kept, but the semantic of it is simply storing the user-specific endpoint and targets, instead of calling AWS API to create a private link. This aligns with the style of the connection RFC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if it is safe to remove the field. The existing connections still need it.
After the cloud portal supports building subnets for private link (long ago), we no longer encourage using create connection for this case.

but the semantic of it is simply storing the user-specific endpoint and targets, instead of calling AWS API to create a private link

IIRC, we store the two fields together with other connector props, rather than in the connection catalog.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, we store the two fields together with other connector props, rather than in the connection catalog.

I see. Then we may deprecate the entire message PrivateLinkService. See also https://stackoverflow.com/a/52787662/5739882.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the change involve a new migration, cc @yezizp2012 ?

pub info: PrivateLinkService,
}

Expand Down
14 changes: 2 additions & 12 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ use crate::controller::SqlMetaStore;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{
start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS,
Expand Down Expand Up @@ -531,17 +530,8 @@ pub async fn start_service_as_election_leader(
compactor_manager.clone(),
));

let mut aws_cli = None;
if let Some(my_vpc_id) = &env.opts.vpc_id
&& let Some(security_group_id) = &env.opts.security_group_id
{
let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await;
aws_cli = Some(cli);
}

let ddl_srv = DdlServiceImpl::new(
env.clone(),
aws_cli.clone(),
metadata_manager.clone(),
stream_manager.clone(),
source_manager.clone(),
Expand Down Expand Up @@ -586,7 +576,7 @@ pub async fn start_service_as_election_leader(
let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli);
let cloud_srv = CloudServiceImpl::new();
let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());

Expand Down Expand Up @@ -705,14 +695,14 @@ pub async fn start_service_as_election_leader(
.add_service(MetaMemberServiceServer::new(meta_member_srv))
.add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
.add_service(UserServiceServer::new(user_srv))
.add_service(CloudServiceServer::new(cloud_srv))
.add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
.add_service(HealthServer::new(health_srv))
.add_service(BackupServiceServer::new(backup_srv))
.add_service(SystemParamsServiceServer::new(system_params_srv))
.add_service(SessionParamServiceServer::new(session_params_srv))
.add_service(TelemetryInfoServiceServer::new(telemetry_srv))
.add_service(ServingServiceServer::new(serving_srv))
.add_service(CloudServiceServer::new(cloud_srv))
.add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
.add_service(EventLogServiceServer::new(event_log_srv))
.add_service(ClusterLimitServiceServer::new(cluster_limit_srv));
Expand Down
Loading
Loading