Skip to content

Commit

Permalink
feature: update flink-cdc form
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Jul 13, 2024
1 parent 6af8a84 commit 03219ca
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.sliew.scaleph.workspace.flink.cdc.service.dto.WsArtifactFlinkCDCDTO;
import cn.sliew.scaleph.workspace.flink.cdc.service.param.*;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.validation.Valid;
Expand Down Expand Up @@ -119,8 +120,8 @@ public ResponseEntity<ResponseVO> deleteArtifact(@PathVariable("artifactId") Lon
@Logging
@PostMapping("preview")
@Operation(summary = "预览 flink cdc 配置", description = "预览 flink cdc 配置")
public ResponseEntity<ResponseVO<String>> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception {
String conf = wsArtifactFlinkCDCService.buildConfig(dto);
public ResponseEntity<ResponseVO<JsonNode>> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception {
JsonNode conf = wsArtifactFlinkCDCService.buildConfig(dto);
return new ResponseEntity<>(ResponseVO.success(conf), HttpStatus.OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package cn.sliew.scaleph.plugin.flink.cdc.connectors.doris.sink;

import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.olap.DorisDataSource;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static cn.sliew.scaleph.plugin.flink.cdc.connectors.doris.sink.DorisSinkProperties.*;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.PASSWORD;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.USERNAME;

@AutoService(FlinkCDCPipilineConnectorPlugin.class)
public class DorisSinkPlugin extends FlinkCDCPipilineConnectorPlugin {
Expand All @@ -41,22 +49,33 @@ public DorisSinkPlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
// props.add(FENODES);
// props.add(BENODES);
// props.add(JDBC_URL);
// props.add(USERNAME);
// props.add(PASSWORD);
// props.add(AUTO_REDIRECT);
// props.add(SINK_ENABLE_BATCH_MODE);
// props.add(SINK_FLUSH_QUEUE_SIZE);
// props.add(SINK_BUFFER_FLUSH_MAX_ROWS);
// props.add(SINK_BUFFER_FLUSH_MAX_BYTES);
// props.add(SINK_BUFFER_FLUSH_INTERVAL);
// props.add(SINK_PROPERTIES);
// props.add(TABLE_CREATE_PROPERTIES);
props.add(AUTO_REDIRECT);
props.add(SINK_ENABLE_BATCH_MODE);
props.add(SINK_FLUSH_QUEUE_SIZE);
props.add(SINK_BUFFER_FLUSH_MAX_ROWS);
props.add(SINK_BUFFER_FLUSH_MAX_BYTES);
props.add(SINK_BUFFER_FLUSH_INTERVAL);
props.add(SINK_PROPERTIES);
props.add(TABLE_CREATE_PROPERTIES);
this.supportedProperties = Collections.unmodifiableList(props);
}

@Override
public List<ResourceProperty> getRequiredResources() {
return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE);
}

@Override
public ObjectNode createConf() {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
DorisDataSource dataSource = (DorisDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
conf.putPOJO(FENODES.getName(), dataSource.getNodeUrls());
conf.putPOJO(USERNAME.getName(), dataSource.getUsername());
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
return conf;
}

@Override
protected FlinkCDCPluginMapping getPluginMapping() {
return FlinkCDCPluginMapping.SINK_DORIS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@

package cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source;

import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.jdbc.MySQLDataSource;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -41,11 +50,7 @@ public MySQLSourcePlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
// props.add(HOSTNAME);
// props.add(PORT);
// props.add(USERNAME);
// props.add(PASSWORD);
// props.add(TABLES);
props.add(TABLES);
props.add(SCHEMA_CHANGE_ENABLED);
props.add(SERVER_ID);
props.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
Expand All @@ -66,6 +71,29 @@ public MySQLSourcePlugin() {
this.supportedProperties = Collections.unmodifiableList(props);
}

@Override
public List<ResourceProperty> getRequiredResources() {
return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE);
}

@Override
public ObjectNode createConf() {
try {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
MySQLDataSource dataSource = (MySQLDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
URI url = new URI(dataSource.getUrl().replace("jdbc:", ""));
conf.putPOJO(HOSTNAME.getName(), url.getHost());
conf.putPOJO(PORT.getName(), url.getPort());
conf.putPOJO(USERNAME.getName(), dataSource.getUser());
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
return conf;
} catch (URISyntaxException e) {
Rethrower.throwAs(e);
return null;
}
}

@Override
protected FlinkCDCPluginMapping getPluginMapping() {
return FlinkCDCPluginMapping.SOURCE_MYSQL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@
package cn.sliew.scaleph.plugin.flink.cdc.connectors.starrocks.sink;

import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginMapping;
import cn.sliew.scaleph.ds.modal.AbstractDataSource;
import cn.sliew.scaleph.ds.modal.olap.StarRocksDataSource;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.connectors.CommonProperties;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperties;
import cn.sliew.scaleph.plugin.framework.resource.ResourceProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.PASSWORD;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.mysql.source.MySQLSourceProperties.USERNAME;
import static cn.sliew.scaleph.plugin.flink.cdc.connectors.starrocks.sink.StarRocksSinkProperties.*;

@AutoService(FlinkCDCPipilineConnectorPlugin.class)
Expand All @@ -41,10 +49,6 @@ public StarRocksSinkPlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
props.add(JDBC_URL);
props.add(LOAD_URL);
props.add(USERNAME);
props.add(PASSWORD);
props.add(SINK_LABEL_PREFIX);
props.add(SINK_CONNECT_TIMEOUT_MS);
props.add(SINK_WAIT_FOR_CONTINUE_TIMEOUT_MS);
Expand All @@ -60,6 +64,23 @@ public StarRocksSinkPlugin() {
this.supportedProperties = Collections.unmodifiableList(props);
}

@Override
public List<ResourceProperty> getRequiredResources() {
return Collections.singletonList(ResourceProperties.DATASOURCE_RESOURCE);
}

@Override
public ObjectNode createConf() {
ObjectNode conf = super.createConf();
JsonNode jsonNode = properties.get(ResourceProperties.DATASOURCE);
StarRocksDataSource dataSource = (StarRocksDataSource) AbstractDataSource.fromDsInfo((ObjectNode) jsonNode);
conf.putPOJO(JDBC_URL.getName(), dataSource.getNodeUrls());
conf.putPOJO(LOAD_URL.getName(), dataSource.getNodeUrls());
conf.putPOJO(USERNAME.getName(), dataSource.getUsername());
conf.putPOJO(PASSWORD.getName(), dataSource.getPassword());
return conf;
}

@Override
protected FlinkCDCPluginMapping getPluginMapping() {
return FlinkCDCPluginMapping.SINK_STARROCKS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class MybatisPlusGenerator {

private final static String AUTHOR = "wangqi";
private final static String URL = "jdbc:mysql://127.0.0.1:3306/scaleph";
private final static String URL = "jdbc:mysql://127.0.0.1:3306/carp";
private final static String USERNAME = "root";
private final static String PASSWORD = "123456"; //NOSONAR
private static final String BASE_PACKAGE = "cn.sliew";
Expand All @@ -54,7 +54,7 @@ public class MybatisPlusGenerator {
/**
* just add table names here and run the {@link #main(String[])} method.
*/
private static final String[] TABLES = {"dag_instance", "dag_link", "dag_step"};
private static final String[] TABLES = {"sec_application"};

public static void main(String[] args) {
//自动生成配置
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import SinkDorisConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkDorisConnector";
import SinkStarRocksConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkStarRocksConnector";
import SourceKafkaConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Source/SourceKafkaConnector";
import SinkKafkaConnectorForm
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/Sink/SinkKafkaConnector";

type ConnectorProps = {
prefix: string;
type: string;
dsId: number;
};

const FlinkCDCConnectorForm: React.FC<ConnectorProps> = ({type, dsId}) => {
const FlinkCDCConnectorForm: React.FC<ConnectorProps> = ({prefix, type, dsId}) => {

const [content, setConstent] = useState(<></>)

Expand All @@ -27,13 +28,13 @@ const FlinkCDCConnectorForm: React.FC<ConnectorProps> = ({type, dsId}) => {
DsInfoService.selectOne(dsId).then((response) => {
if (response.data) {
if (type === 'source' && response.data.dsType.type.value == 'MySQL') {
setConstent(<SourceMySQLConnectorForm/>)
setConstent(<SourceMySQLConnectorForm prefix={prefix} />)
} else if (type === 'sink' && response.data.dsType.type.value === 'Doris') {
setConstent(<SinkDorisConnectorForm/>)
setConstent(<SinkDorisConnectorForm prefix={prefix} />)
} else if (type === 'sink' && response.data.dsType.type.value === 'StarRocks') {
setConstent(<SinkStarRocksConnectorForm/>)
setConstent(<SinkStarRocksConnectorForm prefix={prefix} />)
} else if (type === 'sink' && response.data.dsType.type.value === 'Kafka') {
setConstent(<SinkStarRocksConnectorForm/>)
setConstent(<SinkKafkaConnectorForm prefix={prefix} />)
}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ import {useIntl} from "@umijs/max";
import {ProFormDependency, ProFormDigit, ProFormGroup, ProFormSwitch, ProFormText} from "@ant-design/pro-components";
import {DorisParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant";

const SinkDorisConnectorForm: React.FC = () => {
const SinkDorisConnectorForm: React.FC<{prefix: string}> = ({prefix}) => {
const intl = useIntl();

return (
<ProFormGroup>
<ProFormSwitch
name={DorisParams.autoRedirect}
name={[prefix, DorisParams.autoRedirect]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.autoRedirect'})}
/>
<ProFormSwitch
name={DorisParams.sinkEnableBatchModeParam}
name={[prefix, DorisParams.sinkEnableBatchModeParam]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkEnableBatchMode'})}
initialValue={true}
/>
Expand All @@ -22,15 +22,15 @@ const SinkDorisConnectorForm: React.FC = () => {
if (sinkEnableBatchModeParam) {
return <ProFormGroup>
<ProFormDigit
name={DorisParams.sinkFlushQueueSize}
name={[prefix, DorisParams.sinkFlushQueueSize]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkFlushQueueSize'})}
initialValue={2}
fieldProps={{
min: 1
}}
/>
<ProFormDigit
name={DorisParams.sinkBufferFlushMaxRows}
name={[prefix, DorisParams.sinkBufferFlushMaxRows]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkBufferFlushMaxRows'})}
colProps={{span: 8}}
initialValue={50000}
Expand All @@ -39,7 +39,7 @@ const SinkDorisConnectorForm: React.FC = () => {
}}
/>
<ProFormDigit
name={DorisParams.sinkBufferFlushMaxBytes}
name={[prefix, DorisParams.sinkBufferFlushMaxBytes]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkBufferFlushMaxBytes'})}
colProps={{span: 8}}
initialValue={1024 * 1024 * 10}
Expand All @@ -48,7 +48,7 @@ const SinkDorisConnectorForm: React.FC = () => {
}}
/>
<ProFormText
name={DorisParams.sinkBufferFlushInterval}
name={[prefix, DorisParams.sinkBufferFlushInterval]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.doris.sinkBufferFlushInterval'})}
colProps={{span: 8}}
initialValue={'10s'}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,39 @@
import React from 'react';
import {useIntl} from "@umijs/max";
import {ProFormDigit, ProFormGroup, ProFormSelect, ProFormSwitch, ProFormText} from "@ant-design/pro-components";
import {
KafkaParams,
StarRocksParams
} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant";
import {ProFormGroup, ProFormSelect, ProFormSwitch, ProFormText} from "@ant-design/pro-components";
import {KafkaParams} from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/Connector/constant";

const SourceKafkaConnectorForm: React.FC = () => {
const SinkKafkaConnectorForm: React.FC<{prefix: string}> = ({prefix}) => {
const intl = useIntl();

return (
<ProFormGroup>
<ProFormText
name={KafkaParams.propertiesBootstrapServers}
name={[prefix, KafkaParams.propertiesBootstrapServers]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.propertiesBootstrapServers'})}
rules={[{required: true}]}
/>
<ProFormText
name={KafkaParams.topic}
name={[prefix, KafkaParams.topic]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.topic'})}
/>
<ProFormSelect
name={KafkaParams.valueFormat}
name={[prefix, KafkaParams.valueFormat]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.valueFormat'})}
allowClear={false}
initialValue={'debezium-json'}
options={['debezium-json', 'canal-json']}
/>
<ProFormSwitch
name={KafkaParams.sinkAddTableIdToHeaderEnabled}
name={[prefix, KafkaParams.sinkAddTableIdToHeaderEnabled]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.sinkAddTableIdToHeaderEnabled'})}
/>
<ProFormText
name={KafkaParams.sinkCustomHeader}
name={[prefix, KafkaParams.sinkCustomHeader]}
label={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.connector.kafka.sinkCustomHeader'})}
/>
</ProFormGroup>
);
};

export default SourceKafkaConnectorForm;
export default SinkKafkaConnectorForm;
Loading

0 comments on commit 03219ca

Please sign in to comment.