diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 755de8bb9a7..31dcdd75a31 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -170,6 +170,7 @@ there are some reference value for params above. | Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | | OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | +| sqlserver | net.sourceforge.jtds.jdbc.Driver | jdbc:jtds:sqlserver://localhost:1433 | / | https://mvnrepository.com/artifact/net.sourceforge.jtds/jtds/1.3.1/jtds-1.3.1.jar | ## Example diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index d82df87a02e..d8874c954f5 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -126,6 +126,7 @@ there are some reference value for params above. | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | | OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | +| sqlserver | net.sourceforge.jtds.jdbc.Driver | jdbc:jtds:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/net.sourceforge.jtds/jtds/1.3.1/jtds-1.3.1.jar | ## Example diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index e76237e7e07..75d1807c005 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -34,6 +34,7 @@ 42.4.3 8.1.2.141 9.2.1.jre8 + 1.3.1 5.2.5-HBase-2.x 12.2.0.1 3.39.3.0 @@ -86,6 +87,12 @@ ${sqlserver.version} provided + + net.sourceforge.jtds + jtds + ${jtds.version} + provided + com.oracle.database.jdbc ojdbc8 @@ -180,6 +187,12 @@ com.microsoft.sqlserver mssql-jdbc + + + net.sourceforge.jtds + jtds + + com.oracle.database.jdbc ojdbc8 diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java index 815d51a3f08..13196099a55 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java @@ -59,8 +59,12 @@ public Connection getConnection() { @Override public boolean isConnectionValid() throws SQLException { - return connection != null - && connection.isValid(jdbcConfig.getConnectionCheckTimeoutSeconds()); + if (connection != null && connection.toString().startsWith("net.sourceforge.jtds")) { + return connection != null && !connection.isClosed(); + } else { + return connection != null + && connection.isValid(jdbcConfig.getConnectionCheckTimeoutSeconds()); + } } private static Driver loadDriver(String driverName) throws ClassNotFoundException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java index d8fce3c43c1..51884fd9086 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java @@ -27,7 +27,7 @@ public class SqlServerDialectFactory implements JdbcDialectFactory { @Override public boolean acceptsURL(String url) { - return url.startsWith("jdbc:sqlserver:"); + return (url.startsWith("jdbc:jtds:sqlserver:") || url.startsWith("jdbc:sqlserver:")); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java index 4a8978cb247..8999e9a648d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java @@ -57,6 +57,7 @@ public class SqlserverTypeMapper implements JdbcDialectTypeMapper { private static final String SQLSERVER_NTEXT = "NTEXT"; private static final String SQLSERVER_NCHAR = "NCHAR"; private static final String SQLSERVER_NVARCHAR = "NVARCHAR"; + private static final String SQLSERVER_SYSNAME = "SYSNAME"; private static final String SQLSERVER_TEXT = "TEXT"; // ------------------------------time------------------------- @@ -106,6 +107,7 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) case SQLSERVER_NTEXT: case SQLSERVER_NVARCHAR: case SQLSERVER_TEXT: + case SQLSERVER_SYSNAME: return BasicType.STRING_TYPE; case SQLSERVER_DATE: return LocalTimeType.LOCAL_DATE_TYPE; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml index 8628e2b80b6..249cfa4dcbe 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml @@ -91,6 +91,11 @@ vertica-jdbc test + + net.sourceforge.jtds + jtds + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcJtdsSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcJtdsSqlServerIT.java new file mode 100644 index 00000000000..1bb7292e6d4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcJtdsSqlServerIT.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.e2e.common.TestSuiteBase; + +import org.apache.commons.lang3.tuple.Pair; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JdbcJtdsSqlServerIT extends AbstractJdbcIT { + private static final String SQLSERVER_IMAGE = "mcr.microsoft.com/mssql/server:2022-latest"; + private static final String SQLSERVER_CONTAINER_HOST = "sqlserver"; + private static final String SQLSERVER_SOURCE = "source"; + private static final String SQLSERVER_SINK = "sink"; + private static final int SQLSERVER_CONTAINER_PORT = 1433; + private static final String SQLSERVER_URL = + "jdbc:jtds:sqlserver://" + AbstractJdbcIT.HOST + ":%s"; + private static final String DRIVER_CLASS = "net.sourceforge.jtds.jdbc.Driver"; + private static final List CONFIG_FILE = + Lists.newArrayList("/jdbc_jtds_sqlserver_source_to_sink.conf"); + private static final String CREATE_SQL = + "CREATE TABLE %s (\n" + + " [age] bigint NOT NULL,\n" + + " [name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL\n" + + ")"; + + private String username; + + private String password; + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + String jdbcUrl = String.format(SQLSERVER_URL, SQLSERVER_CONTAINER_PORT); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable("", SQLSERVER_SOURCE, fieldNames); + + return JdbcCase.builder() + .dockerImage(SQLSERVER_IMAGE) + .networkAliases(SQLSERVER_CONTAINER_HOST) + .containerEnv(containerEnv) + .driverClass(DRIVER_CLASS) + .host(AbstractJdbcIT.HOST) + .port(SQLSERVER_CONTAINER_PORT) + .localPort(SQLSERVER_CONTAINER_PORT) + .jdbcTemplate(SQLSERVER_URL) + .jdbcUrl(jdbcUrl) + .userName(username) + .password(password) + .sourceTable(SQLSERVER_SOURCE) + .sinkTable(SQLSERVER_SINK) + .createSql(CREATE_SQL) + .configFile(CONFIG_FILE) + .insertSql(insertSql) + .testData(testDataSet) + .build(); + } + + @Override + void compareResult() throws SQLException, IOException {} + + @Override + String driverUrl() { + return "https://repo1.maven.org/maven2/net/sourceforge/jtds/jtds/1.3.1/jtds-1.3.1.jar"; + } + + @Override + Pair> initTestData() { + String[] fieldNames = + new String[] { + "age", "name", + }; + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + i, "f_" + i, + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } + + @Override + GenericContainer initContainer() { + DockerImageName imageName = DockerImageName.parse(SQLSERVER_IMAGE); + + MSSQLServerContainer container = + new MSSQLServerContainer<>(imageName) + .withNetwork(TestSuiteBase.NETWORK) + .withNetworkAliases(SQLSERVER_CONTAINER_HOST) + .acceptLicense() + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(SQLSERVER_IMAGE))); + + container.setPortBindings( + Lists.newArrayList( + String.format( + "%s:%s", SQLSERVER_CONTAINER_PORT, SQLSERVER_CONTAINER_PORT))); + + try { + Class.forName(container.getDriverClassName()); + } catch (ClassNotFoundException e) { + throw new SeaTunnelRuntimeException( + JdbcITErrorCode.DRIVER_NOT_FOUND, "Not found suitable driver for mssql", e); + } + + username = container.getUsername(); + password = container.getPassword(); + + return container; + } + + @Override + public String quoteIdentifier(String field) { + return "[" + field + "]"; + } + + @Override + public void clearTable(String schema, String table) { + // do nothing. + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_jtds_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_jtds_sqlserver_source_to_sink.conf new file mode 100644 index 00000000000..f3175f43dff --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_jtds_sqlserver_source_to_sink.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + # default port is 1433 + driver = net.sourceforge.jtds.jdbc.Driver + url = "jdbc:jtds:sqlserver://sqlserver:port/database" + user = SA + password = "A_Str0ng_Required_Password" + query = "select age, name from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform-v2/sql +} + +sink { + Jdbc { + # default port is 1433 + driver = net.sourceforge.jtds.jdbc.Driver + url = "jdbc:jtds:sqlserver://sqlserver:port/database" + user = SA + password = "A_Str0ng_Required_Password" + query = "insert into sink(age, name) values(?,?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +}