From 2f1eeacc91e80531009cbaf42830eef26c0c2007 Mon Sep 17 00:00:00 2001 From: byteteacher0 <18223460208@163.com> Date: Mon, 21 Aug 2023 22:50:07 +0800 Subject: [PATCH] [Feature][connector][access] Support source/sink for access --- .../connector-access/pom.xml | 90 +++++++ .../seatunnel/access/client/AccessClient.java | 67 ++++++ .../seatunnel/access/config/AccessConfig.java | 51 ++++ .../access/config/AccessParameters.java | 64 +++++ .../exception/AccessConnectorErrorCode.java | 43 ++++ .../exception/AccessConnectorException.java | 36 +++ .../seatunnel/access/sink/AccessSink.java | 103 ++++++++ .../access/sink/AccessSinkFactory.java | 41 ++++ .../access/sink/AccessSinkWriter.java | 100 ++++++++ .../seatunnel/access/source/AccessSource.java | 114 +++++++++ .../access/source/AccessSourceFactory.java | 48 ++++ .../access/source/AccessSourceReader.java | 90 +++++++ .../access/util/TypeConvertUtil.java | 222 ++++++++++++++++++ .../java/org/apache/seatunnel/AppTest.java | 38 +++ seatunnel-connectors-v2/pom.xml | 1 + 15 files changed, 1108 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-access/pom.xml create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/client/AccessClient.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessConfig.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessParameters.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorErrorCode.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorException.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSink.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSource.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceFactory.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/util/TypeConvertUtil.java create mode 100644 seatunnel-connectors-v2/connector-access/src/test/java/org/apache/seatunnel/AppTest.java diff --git a/seatunnel-connectors-v2/connector-access/pom.xml b/seatunnel-connectors-v2/connector-access/pom.xml new file mode 100644 index 00000000000..ef805d18e6e --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/pom.xml @@ -0,0 +1,90 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-access + + connector-access + + + UTF-8 + 4.0.4 + db2jcc4 + 8.0.16 + 4.14.0 + 1.18-SNAPSHOT + + + + + junit + junit + 3.8.1 + test + + + net.sf.ucanaccess + ucanaccess + ${ucanaccess.version} + + + org.apache.seatunnel + seatunnel-api + ${project.version} + compile + + + org.apache.seatunnel + connector-jdbc + ${project.version} + compile + + + org.apache.seatunnel + connector-common + ${project.version} + compile + + + org.apache.seatunnel + seatunnel-flink-15-starter + ${project.version} + + + com.datastax.oss + java-driver-core + ${java-driver-core.version} + compile + + + org.apache.flink + flink-java + ${flink.version} + compile + + + diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/client/AccessClient.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/client/AccessClient.java new file mode 100644 index 00000000000..0711729a254 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/client/AccessClient.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.client; + +import java.io.Serializable; +import java.sql.*; + +public class AccessClient implements Serializable { + private String driver; + private String url; + private String username; + private String password; + private String query; + private Connection connection; + + public AccessClient(String driver, String url, String username, String password, String query) { + this.driver = driver; + this.url = url; + this.username = username; + this.password = password; + this.query = query; + } + + public Connection getAccessConnection(String url, String username, String password) { + try { + Class.forName(driver); + this.connection = DriverManager.getConnection(url, username, password); + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + return this.connection; + } + + public ResultSetMetaData selectMetaData() throws Exception { + connection = this.getAccessConnection(url, username, password); + Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(query); + ResultSetMetaData metaData = result.getMetaData(); + statement.close(); + return metaData; + } + + public ResultSetMetaData getTableSchema(String tableName) throws Exception { + connection = this.getAccessConnection(url, username, password); + Statement statement = connection.createStatement(); + ResultSet result = + statement.executeQuery(String.format("select * from %s limit 1", tableName)); + ResultSetMetaData metaData = result.getMetaData(); + statement.close(); + return metaData; + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessConfig.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessConfig.java new file mode 100644 index 00000000000..a0ee86ce10c --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessConfig.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class AccessConfig { + public static final Integer DEFAULT_BATCH_SIZE = 5000; + + public static final Option DRIVER = + Options.key("driver").stringType().noDefaultValue().withDescription("driver"); + + public static final Option URL = + Options.key("url").stringType().noDefaultValue().withDescription("url"); + + public static final Option USERNAME = + Options.key("username").stringType().noDefaultValue().withDescription("username"); + + public static final Option PASSWORD = + Options.key("password").stringType().noDefaultValue().withDescription("password"); + + public static final Option QUERY = + Options.key("query").stringType().noDefaultValue().withDescription("query"); + + public static final Option TABLE = + Options.key("table").stringType().noDefaultValue().withDescription("table"); + + public static final Option FIELDS = + Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("fields"); + public static final Option BATCH_SIZE = + Options.key("batch_size") + .intType() + .defaultValue(DEFAULT_BATCH_SIZE) + .withDescription(""); +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessParameters.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessParameters.java new file mode 100644 index 00000000000..1e7b20e1caf --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/config/AccessParameters.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.config; + +import lombok.Getter; +import lombok.Setter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.Serializable; +import java.util.List; + +@Setter +@Getter +public class AccessParameters implements Serializable { + private String driver; + private String url; + private String username; + private String password; + private String query; + private List fields; + private String table; + private Integer batchSize; + + public void buildWithConfig(Config config) { + this.driver = config.getString(AccessConfig.DRIVER.key()); + this.url = config.getString(AccessConfig.URL.key()); + + if (config.hasPath(AccessConfig.USERNAME.key())) { + this.username = config.getString(AccessConfig.USERNAME.key()); + } + if (config.hasPath(AccessConfig.PASSWORD.key())) { + this.password = config.getString(AccessConfig.PASSWORD.key()); + } + if (config.hasPath(AccessConfig.QUERY.key())) { + this.query = config.getString(AccessConfig.QUERY.key()); + } + if (config.hasPath(AccessConfig.FIELDS.key())) { + this.fields = config.getStringList(AccessConfig.FIELDS.key()); + } + if (config.hasPath(AccessConfig.TABLE.key())) { + this.table = config.getString(AccessConfig.TABLE.key()); + } + if (config.hasPath(AccessConfig.BATCH_SIZE.key())) { + this.batchSize = config.getInt(AccessConfig.BATCH_SIZE.key()); + } else { + this.batchSize = AccessConfig.BATCH_SIZE.defaultValue(); + } + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorErrorCode.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorErrorCode.java new file mode 100644 index 00000000000..13b226d8bd2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorErrorCode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum AccessConnectorErrorCode implements SeaTunnelErrorCode { + FIELD_NOT_IN_TABLE("ACCESS-01", "Field is not existed in target table"), + + CLOSE_CQL_CONNECT_FAILED("ACCESS-03", "Close connect of access failed"); + private final String code; + private final String description; + + AccessConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return null; + } + + @Override + public String getDescription() { + return null; + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorException.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorException.java new file mode 100644 index 00000000000..0156e6c0bf2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/exception/AccessConnectorException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class AccessConnectorException extends SeaTunnelRuntimeException { + public AccessConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public AccessConnectorException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public AccessConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSink.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSink.java new file mode 100644 index 00000000000..e1dba7f68b7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSink.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.sink; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters; +import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException; +import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.io.IOException; + +import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.*; + +@AutoService(SeaTunnelSink.class) +public class AccessSink extends AbstractSimpleSink { + private SeaTunnelRowType seaTunnelRowType; + private final AccessParameters accessParameters = new AccessParameters(); + + private SeaTunnelDataType[] seaTunnelDataTypes; + + @Override + public String getPluginName() { + return "Access"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult checkResult = + CheckConfigUtil.checkAllExists( + pluginConfig, + DRIVER.key(), + URL.key(), + USERNAME.key(), + PASSWORD.key(), + TABLE.key(), + QUERY.key()); + if (!checkResult.isSuccess()) { + throw new AccessConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, checkResult.getMsg())); + } + this.accessParameters.buildWithConfig(pluginConfig); + + AccessClient accessClient = + new AccessClient( + pluginConfig.getString(DRIVER.key()), + pluginConfig.getString(URL.key()), + pluginConfig.getString(USERNAME.key()), + pluginConfig.getString(PASSWORD.key()), + pluginConfig.getString(QUERY.key())); + + seaTunnelDataTypes = + TypeConvertUtil.getSeaTunnelDataTypes( + accessParameters, accessClient, pluginConfig, getPluginName()); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) + throws IOException { + return new AccessSinkWriter(accessParameters, seaTunnelDataTypes); + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkFactory.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkFactory.java new file mode 100644 index 00000000000..728f6e29138 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.sink; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig; + +@AutoService(Factory.class) +public class AccessSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Access"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(AccessConfig.DRIVER, AccessConfig.URL, AccessConfig.TABLE) + .bundled(AccessConfig.USERNAME, AccessConfig.PASSWORD) + .optional(AccessConfig.QUERY) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkWriter.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkWriter.java new file mode 100644 index 00000000000..45f7656d555 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/sink/AccessSinkWriter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.sink; + +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters; +import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException; +import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; + +@Slf4j +public class AccessSinkWriter extends AbstractSinkWriter { + private final AccessParameters accessParameters; + private Connection connection; + private final PreparedStatement statement; + private final SeaTunnelDataType[] seaTunnelDataTypes; + + public AccessSinkWriter( + AccessParameters accessParameters, SeaTunnelDataType[] seaTunnelDataTypes) { + this.accessParameters = accessParameters; + this.seaTunnelDataTypes = seaTunnelDataTypes; + AccessClient accessClient = + new AccessClient( + accessParameters.getDriver(), + accessParameters.getUrl(), + accessParameters.getUsername(), + accessParameters.getPassword(), + accessParameters.getQuery()); + connection = + accessClient.getAccessConnection( + accessParameters.getUrl(), + accessParameters.getUsername(), + accessParameters.getPassword()); + try { + this.statement = connection.prepareStatement(initPrepareCQL()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + try { + for (int i = 0; i < accessParameters.getFields().size(); i++) { + String type = this.seaTunnelDataTypes[i].toString(); + TypeConvertUtil.reconvertAndInject(statement, i, type, element.getField(i)); + } + statement.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + try { + if (this.connection != null) { + this.connection.close(); + } + } catch (Exception e) { + throw new AccessConnectorException( + AccessConnectorErrorCode.CLOSE_CQL_CONNECT_FAILED, e); + } + } + + private String initPrepareCQL() { + String[] placeholder = new String[accessParameters.getFields().size()]; + Arrays.fill(placeholder, "?"); + return String.format( + "INSERT INTO %s (%s) VALUES (%s)", + accessParameters.getTable(), + String.join(",", accessParameters.getFields()), + String.join(",", placeholder)); + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSource.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSource.java new file mode 100644 index 00000000000..332df42bc8f --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSource.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.source; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SupportColumnProjection; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters; +import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException; +import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.sql.ResultSetMetaData; + +import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.*; + +@AutoService(SeaTunnelSource.class) +public class AccessSource extends AbstractSingleSplitSource + implements SupportColumnProjection { + private SeaTunnelRowType rowTypeInfo; + private final AccessParameters accessParameters = new AccessParameters(); + + @Override + public String getPluginName() { + return "Access"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult checkResult = + CheckConfigUtil.checkAllExists( + pluginConfig, + DRIVER.key(), + URL.key(), + USERNAME.key(), + PASSWORD.key(), + QUERY.key()); + + if (!checkResult.isSuccess()) { + throw new AccessConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, checkResult.getMsg())); + } + + this.accessParameters.buildWithConfig(pluginConfig); + + AccessClient accessClient = + new AccessClient( + pluginConfig.getString(DRIVER.key()), + pluginConfig.getString(URL.key()), + pluginConfig.getString(USERNAME.key()), + pluginConfig.getString(PASSWORD.key()), + pluginConfig.getString(QUERY.key())); + try { + ResultSetMetaData metaData = accessClient.selectMetaData(); + int columnSize = metaData.getColumnCount(); + String[] fieldNames = new String[columnSize]; + SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize]; + for (int i = 1; i <= columnSize; i++) { + fieldNames[i - 1] = metaData.getColumnName(i); + seaTunnelDataTypes[i - 1] = TypeConvertUtil.convert(metaData.getColumnTypeName(i)); + } + this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowTypeInfo; + } + + @Override + public AbstractSingleSplitReader createReader( + SingleSplitReaderContext readerContext) throws Exception { + return new AccessSourceReader(accessParameters, readerContext); + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceFactory.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceFactory.java new file mode 100644 index 00000000000..93d15e770a4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.source; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig; + +@AutoService(Factory.class) +public class AccessSourceFactory implements TableSourceFactory { + + @Override + public String factoryIdentifier() { + return "Access"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(AccessConfig.DRIVER, AccessConfig.URL) + .bundled(AccessConfig.USERNAME, AccessConfig.PASSWORD) + .optional(AccessConfig.QUERY) + .build(); + } + + @Override + public Class getSourceClass() { + return AccessSource.class; + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceReader.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceReader.java new file mode 100644 index 00000000000..d2c43101c45 --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/source/AccessSourceReader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.source; + +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters; +import org.apache.seatunnel.connectors.seatunnel.access.util.TypeConvertUtil; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; + +import java.io.IOException; +import java.sql.*; + +@Slf4j +public class AccessSourceReader extends AbstractSingleSplitReader { + + private Connection connection; + + private final SingleSplitReaderContext readerContext; + + private final AccessParameters accessParameters; + + AccessSourceReader(AccessParameters accessParameters, SingleSplitReaderContext readerContext) { + this.accessParameters = accessParameters; + this.readerContext = readerContext; + } + + @Override + public void open() throws Exception { + AccessClient accessClient = + new AccessClient( + accessParameters.getDriver(), + accessParameters.getUrl(), + accessParameters.getUsername(), + accessParameters.getPassword(), + accessParameters.getQuery()); + connection = + accessClient.getAccessConnection( + accessParameters.getUrl(), + accessParameters.getUsername(), + accessParameters.getPassword()); + } + + @Override + public void close() throws IOException { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void pollNext(Collector output) throws Exception { + Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(accessParameters.getQuery()); + ResultSetMetaData metaData = result.getMetaData(); + + while (result.next()) { + Object[] datas = new Object[metaData.getColumnCount()]; + for (int i = 1; i <= metaData.getColumnCount(); i++) { + String columnName = metaData.getColumnName(i); + String columnType = metaData.getColumnTypeName(i); + datas[i - 1] = TypeConvertUtil.convertToObject(result, columnName, columnType); + } + output.collect(new SeaTunnelRow(datas)); + } + this.readerContext.signalNoMoreElement(); + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/util/TypeConvertUtil.java b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/util/TypeConvertUtil.java new file mode 100644 index 00000000000..ec256b0851b --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/main/java/org/apache/seatunnel/connectors/seatunnel/access/util/TypeConvertUtil.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.access.util; + +import org.apache.commons.lang.StringUtils; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.access.client.AccessClient; +import org.apache.seatunnel.connectors.seatunnel.access.config.AccessParameters; +import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.access.exception.AccessConnectorException; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.sql.*; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; +import java.util.*; + +import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.QUERY; +import static org.apache.seatunnel.connectors.seatunnel.access.config.AccessConfig.TABLE; + +public class TypeConvertUtil { + public static SeaTunnelDataType convert(String type) { + switch (type) { + case "INTEGER": + case "BIGINT": + return BasicType.INT_TYPE; + case "VARCHAR": + return BasicType.STRING_TYPE; + case "DECIMAL": + return BasicType.DOUBLE_TYPE; + case "TIMESTAMP": + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case "BOOLEAN": + return BasicType.BOOLEAN_TYPE; + default: + throw new AccessConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported this data type: " + type); + } + } + + public static Object convertToObject(ResultSet result, String columnName, String columnType) { + Object value; + if ("INTEGER".equals(columnType) || "BIGINT".equals(columnType)) { + try { + value = result.getInt(columnName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } else if ("VARCHAR".equals(columnType)) { + try { + value = result.getString(columnName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } else if ("DECIMAL".equals(columnType)) { + try { + value = result.getDouble(columnName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } else if ("TIMESTAMP".equals(columnType)) { + try { + value = result.getString(columnName); + if (null == value) { + } else { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long timestamp = dateFormat.parse((String) value).getTime(); + Instant instant = Instant.ofEpochMilli(timestamp); + value = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + } + } catch (SQLException | ParseException e) { + throw new RuntimeException(e); + } + } else if ("BOOLEAN".equals(columnType)) { + try { + value = result.getBoolean(columnName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } else { + try { + value = result.getObject(columnName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + return value; + } + + public static void reconvertAndInject( + PreparedStatement statement, int index, String type, Object fieldValue) { + switch (type) { + case "INT": + case "INTEGER": + try { + statement.setInt( + index + 1, + null == fieldValue ? 0 : Integer.parseInt(fieldValue.toString())); + return; + } catch (SQLException e) { + throw new RuntimeException(e); + } + case "STRING": + try { + statement.setString(index + 1, null == fieldValue ? "" : fieldValue.toString()); + return; + } catch (SQLException e) { + throw new RuntimeException(e); + } + case "DOUBLE": + try { + statement.setDouble( + index + 1, + null == fieldValue ? 0D : Double.parseDouble(fieldValue.toString())); + return; + } catch (SQLException e) { + throw new RuntimeException(e); + } + case "TIMESTAMP": + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + try { + if (null == fieldValue) { + statement.setTimestamp(index + 1, Timestamp.valueOf(LocalDateTime.MIN)); + } else { + Date date = dateFormat.parse(fieldValue.toString().substring(0, 10)); + statement.setTimestamp(index + 1, new Timestamp(date.getTime())); + } + return; + } catch (ParseException | SQLException e) { + throw new RuntimeException(e); + } + case "BOOLEAN": + try { + statement.setBoolean( + index + 1, + null == fieldValue ? null : new Boolean(fieldValue.toString())); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + public static SeaTunnelDataType[] getSeaTunnelDataTypes( + AccessParameters accessParameters, + AccessClient accessClient, + Config pluginConfig, + String pluginName) { + String[] sqlColumns; + List fields = accessParameters.getFields(); + SeaTunnelDataType[] seaTunnelDataTypes; + try { + ResultSetMetaData metaData = + accessClient.getTableSchema(pluginConfig.getString(TABLE.key())); + int columnCount = metaData.getColumnCount(); + String columnString = + StringUtils.substringBetween(pluginConfig.getString(QUERY.key()), "(", ")"); + sqlColumns = columnString.split(","); + String[] tableColumns = new String[metaData.getColumnCount()]; + for (int i = 1; i <= columnCount; i++) { + tableColumns[i - 1] = metaData.getColumnName(i).toUpperCase(); + } + seaTunnelDataTypes = new SeaTunnelDataType[sqlColumns.length]; + Set tableColumnSet = new HashSet<>(Arrays.asList(tableColumns)); + int tableColumnSetSize = tableColumnSet.size(); + if (fields == null || fields.isEmpty()) { + for (int j = 0; j < sqlColumns.length; j++) { + tableColumnSet.add(sqlColumns[j].trim().toUpperCase()); + if (tableColumnSetSize == tableColumnSet.size()) { + accessParameters.setFields(Arrays.asList(sqlColumns)); + for (int k = 1; k <= columnCount; k++) { + if (metaData.getColumnName(k) + .toUpperCase() + .equals(sqlColumns[j].trim().toUpperCase())) { + seaTunnelDataTypes[j] = + TypeConvertUtil.convert(metaData.getColumnTypeName(j + 1)); + } + } + } else { + throw new AccessConnectorException( + AccessConnectorErrorCode.FIELD_NOT_IN_TABLE, + "Field " + + sqlColumns[j] + + " does not exist in table " + + pluginConfig.getString(TABLE.key())); + } + } + } + } catch (Exception e) { + throw new AccessConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + pluginName, PluginType.SINK, e)); + } + return seaTunnelDataTypes; + } +} diff --git a/seatunnel-connectors-v2/connector-access/src/test/java/org/apache/seatunnel/AppTest.java b/seatunnel-connectors-v2/connector-access/src/test/java/org/apache/seatunnel/AppTest.java new file mode 100644 index 00000000000..529f8c2b76d --- /dev/null +++ b/seatunnel-connectors-v2/connector-access/src/test/java/org/apache/seatunnel/AppTest.java @@ -0,0 +1,38 @@ +package org.apache.seatunnel; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 8af51a0d0e5..5551dc93eca 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -73,6 +73,7 @@ connector-hbase connector-rocketmq connector-paimon + connector-access