Skip to content

Commit

Permalink
[Feature][SaveMode] Add savemode for mysql、oracle、sqlServer、pgSql、sta…
Browse files Browse the repository at this point in the history
…rrocks、s3redshift、redshift
  • Loading branch information
EricJoy2048 committed Jul 20, 2023
2 parents bc4b6b9 + 5053f93 commit 755f51b
Show file tree
Hide file tree
Showing 38 changed files with 1,914 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has data"),
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

/** The Sink Connectors which support data SaveMode should implement this interface */
public interface SupportDataSaveMode {
String SAVE_MODE_KEY = "savemode";
String SAVE_MODE_KEY = "save_mode";
/**
* Return the value of DataSaveMode configured by user in the job config file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,19 @@ void createDatabase(TablePath tablePath, boolean ignoreIfExists)
void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException;

/**
* Truncate an existing table data in this catalog.
*
* @param tablePath Path of the table
* @param ignoreIfNotExists Flag to specify behavior when a table with the given name doesn't
* exist
* @throws TableNotExistException thrown if the table doesn't exist in the catalog and
* ignoreIfNotExists is false
* @throws CatalogException in case of any runtime exception
*/
default void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {}

// todo: Support for update table metadata

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -137,6 +138,33 @@ public void close() throws CatalogException {
LOG.info("Catalog {} closing", catalogName);
}

public void executeSql(String sql) {
Connection connection = defaultConnection;
try (PreparedStatement ps = connection.prepareStatement(sql)) {
// Will there exist concurrent drop for one table?
ps.execute();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
}
}

public boolean isExistsData(String tableFullName) {
Connection connection = defaultConnection;
String sql = String.format("select count(*) from %s;", tableFullName);
try (PreparedStatement ps = connection.prepareStatement(sql)) {
ResultSet resultSet = ps.executeQuery();
if (resultSet == null) {
return false;
}
resultSet.next();
int count = 0;
count = resultSet.getInt(1);
return count > 0;
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
}
}

protected Optional<PrimaryKey> getPrimaryKey(
DatabaseMetaData metaData, String database, String table) throws SQLException {
return getPrimaryKey(metaData, database, table, table);
Expand Down Expand Up @@ -278,8 +306,18 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
}
}

public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
if (!truncateTableInternal(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
}

protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException;

protected abstract boolean truncateTableInternal(TablePath tablePath) throws CatalogException;

@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ public List<String> listTables(String databaseName)
}
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
Expand Down Expand Up @@ -307,7 +317,9 @@ protected boolean dropTableInternal(TablePath tablePath) throws CatalogException
Connection connection = getConnection(dbUrl);
try (PreparedStatement ps =
connection.prepareStatement(
String.format("DROP TABLE IF EXISTS %s;", tablePath.getFullName()))) {
String.format(
"DROP TABLE IF EXISTS %s;",
tablePath.getDatabaseName() + "." + tablePath.getTableName()))) {
// Will there exist concurrent drop for one table?
return ps.execute();
} catch (SQLException e) {
Expand All @@ -316,6 +328,23 @@ protected boolean dropTableInternal(TablePath tablePath) throws CatalogException
}
}

@Override
protected boolean truncateTableInternal(TablePath tablePath) throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
Connection connection = getConnection(dbUrl);
try (PreparedStatement ps =
connection.prepareStatement(
String.format(
"TRUNCATE TABLE %s;",
tablePath.getDatabaseName() + "." + tablePath.getTableName()))) {
// Will there exist concurrent truncate for one table?
return ps.execute();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed truncating table %s", tablePath.getFullName()), e);
}
}

@Override
protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
try (PreparedStatement ps =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -170,6 +171,20 @@ protected boolean dropTableInternal(TablePath tablePath) throws CatalogException
return false;
}

@Override
protected boolean truncateTableInternal(TablePath tablePath) throws CatalogException {
Connection connection = defaultConnection;
try (PreparedStatement ps =
connection.prepareStatement(
String.format("TRUNCATE TABLE %s;", tablePath.getFullName()))) {
// Will there exist concurrent truncate for one table?
return ps.execute();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed truncating table %s", tablePath.getFullName()), e);
}
}

@Override
protected boolean createDatabaseInternal(String databaseName) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,24 @@ protected boolean dropTableInternal(TablePath tablePath) throws CatalogException
}
}

@Override
protected boolean truncateTableInternal(TablePath tablePath) throws CatalogException {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());

String schemaName = tablePath.getSchemaName();
String tableName = tablePath.getTableName();

String sql = "TRUNCATE TABLE \"" + schemaName + "\".\"" + tableName + "\"";
Connection connection = getConnection(dbUrl);
try (PreparedStatement ps = connection.prepareStatement(sql)) {
// Will there exist concurrent drop for one table?
return ps.execute();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed truncating table %s", tablePath.getFullName()), e);
}
}

@Override
protected boolean createDatabaseInternal(String databaseName) throws CatalogException {
String sql = "CREATE DATABASE \"" + databaseName + "\"";
Expand Down
Loading

0 comments on commit 755f51b

Please sign in to comment.