Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove private link related connection #18975

Merged
merged 16 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions e2e_test/ddl/alter_set_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -94,23 +94,6 @@ WHERE nspname = 'test_schema';
----
test_subscription test_schema

statement ok
CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock');

statement ok
ALTER CONNECTION test_conn SET SCHEMA test_schema;

query TT
SELECT name AS connname, nspname AS schemaname
FROM rw_connections
JOIN pg_namespace ON pg_namespace.oid = rw_connections.schema_id
WHERE nspname = 'test_schema';
----
test_conn test_schema

statement ok
DROP CONNECTION test_schema.test_conn;

statement ok
DROP SINK test_schema.test_sink;

Expand Down
23 changes: 0 additions & 23 deletions e2e_test/ddl/connection.slt

This file was deleted.

42 changes: 0 additions & 42 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -31,48 +31,6 @@ create sink sink_non_exist_broker from t_kafka with (
type = 'append-only',
);

# Test create sink with connection
# Create a mock connection
statement ok
create connection mock with (
type = 'privatelink',
provider = 'mock',
);

# Refer to a non-existant connection
statement error
create sink si_kafka_append_only_conn from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true',
connection.name = 'nonexist',
);

# Create sink with connection
statement ok
create sink si_kafka_append_only_conn from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true',
connection.name = 'mock',
);

# Try to drop connection mock, which is in use
statement error
drop connection mock;

# Drop sink
statement ok
drop sink si_kafka_append_only_conn;

# Drop connection
statement ok
drop connection mock;

# Connection test clean-up finished

statement error sink cannot be append-only
Expand Down
52 changes: 0 additions & 52 deletions e2e_test/source_legacy/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -198,35 +198,6 @@ s
statement ok
drop table s

# Test create source with connection
statement ok
CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock');

# Reference to non-existant connection
statement error
create source s (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE mytable (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock'
) FORMAT PLAIN ENCODE JSON;

statement ok
DROP TABLE mytable;


# `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns.
statement error
create source s (
Expand All @@ -236,7 +207,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

# `DEBEZIUM_MONGO_JSON` requires the `_id` column is primary key.
Expand All @@ -248,7 +218,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

# `DEBEZIUM_MONGO_JSON` requires the `payload` column is jsonb type.
Expand All @@ -260,25 +229,4 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

statement ok
create source s (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock',
) FORMAT PLAIN ENCODE JSON;

# Drop a connection in use
statement error
drop connection mock;

statement ok
drop source s;

statement ok
drop connection mock;
40 changes: 0 additions & 40 deletions e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -152,35 +152,6 @@ s
statement ok
drop table s

# Test create source with connection
statement ok
CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock');

# Reference to non-existant connection
statement error
create source s (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT JSON;

statement ok
CREATE TABLE mytable (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock'
) ROW FORMAT JSON;

statement ok
DROP TABLE mytable;


# `DEBEZIUM_MONGO_JSON` requires the source table have `_id` and `payload` columns.
statement error
create source s (
Expand All @@ -190,7 +161,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT DEBEZIUM_MONGO_JSON;

# `DEBEZIUM_MONGO_JSON` requires the `_id` column is primary key.
Expand All @@ -202,7 +172,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT DEBEZIUM_MONGO_JSON;

# `DEBEZIUM_MONGO_JSON` requires the `payload` column is jsonb type.
Expand All @@ -214,7 +183,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT DEBEZIUM_MONGO_JSON;

statement ok
Expand All @@ -224,15 +192,7 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock',
) ROW FORMAT JSON;

# Drop a connection in use
statement error
drop connection mock;

statement ok
drop source s;

statement ok
drop connection mock;
2 changes: 1 addition & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ message Connection {
uint32 database_id = 3;
string name = 4;
oneof info {
PrivateLinkService private_link_service = 5;
PrivateLinkService private_link_service = 5 [deprecated = true];
}
uint32 owner = 6;
}
Expand Down
2 changes: 0 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,10 @@ impl SinkCommitCoordinator for DummySinkCommitCoordinator {

impl SinkImpl {
pub fn new(mut param: SinkParam) -> Result<Self> {
const CONNECTION_NAME_KEY: &str = "connection.name";
const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

// remove privatelink related properties if any
param.properties.remove(PRIVATE_LINK_TARGET_KEY);
param.properties.remove(CONNECTION_NAME_KEY);

let sink_type = param
.properties
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::error::ConnectorResult;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};

pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
pub const CONNECTION_NAME_KEY: &str = "connection.name";

#[derive(Debug)]
pub(super) enum PrivateLinkContextRole {
Expand Down
29 changes: 0 additions & 29 deletions src/frontend/src/catalog/connection_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::anyhow;
use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::catalog::connection::Info;
use risingwave_pb::catalog::{connection, PbConnection};

use crate::catalog::{ConnectionId, OwnedByUserCatalog};
use crate::error::{Result, RwError};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -64,24 +56,3 @@ impl OwnedByUserCatalog for ConnectionCatalog {
self.owner
}
}

pub(crate) fn resolve_private_link_connection(
connection: &Arc<ConnectionCatalog>,
properties: &mut BTreeMap<String, String>,
) -> Result<()> {
#[allow(irrefutable_let_patterns)]
if let connection::Info::PrivateLinkService(svc) = &connection.info {
if !properties.is_kafka_connector() {
return Err(RwError::from(anyhow!(
"Private link is only supported for Kafka connector"
)));
}
// skip all checks for mock connection
if svc.get_provider()? == PrivateLinkProvider::Mock {
return Ok(());
}
insert_privatelink_broker_rewrite_map(properties, Some(svc), None)
.map_err(RwError::from)?;
}
Ok(())
}
Loading
Loading