Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove private link related connection #18975

Merged
merged 16 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 9 additions & 55 deletions src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,16 @@ use std::collections::BTreeMap;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_sqlparser::ast::CreateConnectionStatement;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::error::ErrorCode::ProtocolError;
use crate::error::{Result, RwError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;

pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider";
pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name";
pub(crate) const CONNECTION_TAGS_PROP: &str = "tags";

pub(crate) const CLOUD_PROVIDER_MOCK: &str = "mock"; // fake privatelink provider for testing
pub(crate) const CLOUD_PROVIDER_AWS: &str = "aws";

#[inline(always)]
fn get_connection_property_required(
Expand All @@ -48,58 +41,19 @@ fn get_connection_property_required(
)))
})
}

fn resolve_private_link_properties(
with_properties: &BTreeMap<String, String>,
) -> Result<create_connection_request::PrivateLink> {
let provider =
match get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?.as_str()
{
CLOUD_PROVIDER_MOCK => PrivateLinkProvider::Mock,
CLOUD_PROVIDER_AWS => PrivateLinkProvider::Aws,
provider => {
return Err(RwError::from(ProtocolError(format!(
"Unsupported privatelink provider {}",
provider
))));
}
};
match provider {
PrivateLinkProvider::Mock => Ok(create_connection_request::PrivateLink {
provider: provider.into(),
service_name: String::new(),
tags: None,
}),
PrivateLinkProvider::Aws => {
let service_name =
get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?;
Ok(create_connection_request::PrivateLink {
provider: provider.into(),
service_name,
tags: with_properties.get(CONNECTION_TAGS_PROP).cloned(),
})
}
PrivateLinkProvider::Unspecified => Err(RwError::from(ProtocolError(
"Privatelink provider unspecified".to_string(),
))),
}
}

fn resolve_create_connection_payload(
with_properties: &BTreeMap<String, String>,
) -> Result<create_connection_request::Payload> {
let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?;
let create_connection_payload = match connection_type.as_str() {
PRIVATELINK_CONNECTION => create_connection_request::Payload::PrivateLink(
resolve_private_link_properties(with_properties)?,
),
_ => {
return Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
))));
}
return match connection_type.as_str() {
PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated(
"CREATE CONNECTION to Private Link".to_string(),
"RisingWave Cloud Portal".to_string(),
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
))),
_ => Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
)))),
};
Ok(create_connection_payload)
}

pub async fn handle_create_connection(
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub connection_id: ConnectionId,
pub name: String,

// todo: Private link service has been deprecated, consider using a new field for the connection info
pub info: PrivateLinkService,
}

Expand Down
14 changes: 0 additions & 14 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::stream::ScaleController;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
Expand All @@ -55,7 +54,6 @@ use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
Expand Down Expand Up @@ -86,7 +84,6 @@ use crate::controller::SqlMetaStore;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{
start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS,
Expand Down Expand Up @@ -531,17 +528,8 @@ pub async fn start_service_as_election_leader(
compactor_manager.clone(),
));

let mut aws_cli = None;
if let Some(my_vpc_id) = &env.opts.vpc_id
&& let Some(security_group_id) = &env.opts.security_group_id
{
let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await;
aws_cli = Some(cli);
}

let ddl_srv = DdlServiceImpl::new(
env.clone(),
aws_cli.clone(),
metadata_manager.clone(),
stream_manager.clone(),
source_manager.clone(),
Expand Down Expand Up @@ -586,7 +574,6 @@ pub async fn start_service_as_election_leader(
let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli);
let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());

Expand Down Expand Up @@ -712,7 +699,6 @@ pub async fn start_service_as_election_leader(
.add_service(SessionParamServiceServer::new(session_params_srv))
.add_service(TelemetryInfoServiceServer::new(telemetry_srv))
.add_service(ServingServiceServer::new(serving_srv))
.add_service(CloudServiceServer::new(cloud_srv))
.add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
.add_service(EventLogServiceServer::new(event_log_srv))
.add_service(ClusterLimitServiceServer::new(cluster_limit_srv));
Expand Down
197 changes: 0 additions & 197 deletions src/meta/service/src/cloud_service.rs

This file was deleted.

Loading
Loading