diff --git a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java index 4b3484e..cf4c1f0 100644 --- a/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java +++ b/src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java @@ -42,6 +42,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; +import java.sql.SQLDataException; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -225,7 +226,19 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep throw new RuntimeException(e); } - tables.addAll(indexPatterns); + try (Connection connection = DriverManager.getConnection(baseUrl, username, pwd)) { + for (String indexPattern : indexPatterns) { + try { + retrieveResultSetMetaData(connection, new ObjectPath(databaseName, indexPattern)); + tables.add(indexPattern); + } catch (SQLDataException e) { + LOG.warn(format("Index pattern '%s' not found.", indexPattern)); + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return tables; } diff --git a/src/main/java/com/getindata/flink/connector/jdbc/catalog/IndexFilterResolver.java b/src/main/java/com/getindata/flink/connector/jdbc/catalog/IndexFilterResolver.java index 8999865..e1219c2 100644 --- a/src/main/java/com/getindata/flink/connector/jdbc/catalog/IndexFilterResolver.java +++ b/src/main/java/com/getindata/flink/connector/jdbc/catalog/IndexFilterResolver.java @@ -43,17 +43,17 @@ private static List parseRaw(String commaSeparatedList) { boolean isAccepted(String objectName) { if (!includePatterns.isEmpty()) { if (includePatterns.stream().noneMatch(pattern -> pattern.matcher(objectName).matches())) { - LOG.info("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns); + LOG.debug("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns); return false; } } if (!excludePatterns.isEmpty()) { if (excludePatterns.stream().anyMatch(pattern -> pattern.matcher(objectName).matches())) { - LOG.info("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns); + LOG.debug("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns); return false; } } - LOG.info("'{}' matches include pattern and does not match any exclude pattern.", objectName); + LOG.debug("'{}' matches include pattern and does not match any exclude pattern.", objectName); return true; } diff --git a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java index 7e03ad2..a10d6e7 100644 --- a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java +++ b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java @@ -28,8 +28,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; - public class ElasticCatalogITCase extends ElasticCatalogTestBase { + private static final String INPUT_SINGLE_RECORD_TABLE = "test_single_record_table"; private static final String INPUT_MULTIPLE_RECORDS_TABLE = "test_multiple_records_table"; private static final String INPUT_MISSING_DATE_COL_TABLE = "test_missing_date_col_table"; @@ -99,11 +99,7 @@ private ElasticCatalogBuilder catalogBuilder() { } @Test - public void testListDatabases() throws DatabaseNotExistException, TableNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - + public void testListDatabases() { // when ElasticCatalog catalog = catalogBuilder().build(); @@ -114,11 +110,7 @@ public void testListDatabases() throws DatabaseNotExistException, TableNotExistE } @Test - public void testListTables() throws DatabaseNotExistException, InterruptedException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - + public void testListTables() throws DatabaseNotExistException { // when ElasticCatalog catalog = catalogBuilder().build(); @@ -140,10 +132,6 @@ public void testListTables() throws DatabaseNotExistException, InterruptedExcept @Test public void testTableFiltering() throws DatabaseNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - // when ElasticCatalog catalog = catalogBuilder() .indexFilterResolver(IndexFilterResolver.of("test_m.*", "test_mi.*")) @@ -159,11 +147,7 @@ public void testTableFiltering() throws DatabaseNotExistException { } @Test - public void testTableExists() throws DatabaseNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - + public void testTableExists() { // when ElasticCatalog catalog = catalogBuilder().build(); @@ -193,10 +177,6 @@ public void testTableNotExists() { @Test public void testGetNonPartitionedTable() throws TableNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - // when ElasticCatalog catalog = catalogBuilder().build(); CatalogBaseTable table = catalog.getTable(new ObjectPath( @@ -214,9 +194,7 @@ public void testGetNonPartitionedTable() throws TableNotExistException { @Test public void testGetTablePartitionedByTimestamp() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col"); properties.put("properties.scan.test_multiple_records_table.partition.number", "10"); @@ -238,9 +216,7 @@ public void testGetTablePartitionedByTimestamp() throws TableNotExistException { @Test public void testGetTablePartitionedByInteger() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_multiple_records_table.partition.column.name", "integer_col"); properties.put("properties.scan.test_multiple_records_table.partition.number", "10"); @@ -262,9 +238,7 @@ public void testGetTablePartitionedByInteger() throws TableNotExistException { @Test public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("catalog.default.scan.partition.column.name", "date_col"); properties.put("catalog.default.scan.partition.size", "100"); @@ -287,9 +261,7 @@ public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistExce @Test public void testFailNoPartitionColumnProvided() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_multiple_records_table.partition.number", "10"); // when @@ -307,9 +279,7 @@ public void testFailNoPartitionColumnProvided() throws TableNotExistException { @Test public void testFailNoPartitionNumberProvided() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col"); // when @@ -327,9 +297,7 @@ public void testFailNoPartitionNumberProvided() throws TableNotExistException { @Test public void testFailNoPartitionColumnInTable() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_missing_date_col_table.partition.column.name", "date_col"); properties.put("properties.scan.test_missing_date_col_table.partition.number", "10"); @@ -348,9 +316,7 @@ public void testFailNoPartitionColumnInTable() throws TableNotExistException { @Test public void testFailPartitionColumnNotSupported() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_single_record_table.partition.column.name", "keyword_col"); properties.put("properties.scan.test_single_record_table.partition.number", "10"); @@ -369,9 +335,7 @@ public void testFailPartitionColumnNotSupported() throws TableNotExistException @Test public void testFailInappropriatePartitionNumber() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col"); properties.put("properties.scan.test_multiple_records_table.partition.number", "0"); @@ -390,10 +354,6 @@ public void testFailInappropriatePartitionNumber() throws TableNotExistException @Disabled @Test public void testUnsupportedDataTypeInTable() throws TableNotExistException { - // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - // when ElasticCatalog catalog = catalogBuilder().build(); try { @@ -409,9 +369,7 @@ public void testUnsupportedDataTypeInTable() throws TableNotExistException { @Test public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("catalog.default.scan.partition.column.name", "date_col"); properties.put("catalog.default.scan.partition.size", "5"); @@ -432,9 +390,7 @@ public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotE @Test public void testGetTableOverwriteCatalogScanProperties() throws TableNotExistException { // given - String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), - container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_multiple_records_table.partition.column.name", "integer_col"); properties.put("properties.scan.test_multiple_records_table.partition.number", "3"); properties.put("catalog.default.scan.partition.column.name", "date_col"); @@ -458,7 +414,7 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo // given String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.index.patterns", "test_*_record_table"); // when @@ -510,12 +466,25 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo assertEquals(expectedSchema, schema); } + @Test + public void testShouldIgnoreNonexistingIndexPattern() { + // given + Map properties = new HashMap<>(); + properties.put("properties.index.patterns", "non_existing_pattern*"); + + // when + catalogBuilder().properties(properties).build(); + + // then + // no exception thrown + } + @Test public void testGetMultipleIndexPatternPartitionedTables() throws TableNotExistException, DatabaseNotExistException { // given String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(), container.getElasticPort()); - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("properties.scan.test_*_record*_table.partition.column.name", "date_col"); properties.put("properties.scan.test_*_record*_table.partition.number", "10"); properties.put("properties.scan.test_partial_schema_table_*.partition.column.name", "integer_col"); diff --git a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogTestBase.java b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogTestBase.java index 7cc33b9..8decf92 100644 --- a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogTestBase.java +++ b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogTestBase.java @@ -107,6 +107,18 @@ protected static void createTestIndex(String inputTable, String indexPath) throw assertTrue(response.isSuccessful()); } } + protected static void deleteTestIndex(String inputTable) throws Exception { + OkHttpClient client = new OkHttpClient(); + Request request = new Request.Builder() + .url(String.format("http://%s:%d/%s/", container.getHost(), + container.getElasticPort(), inputTable)) + .delete() + .addHeader("Authorization", Credentials.basic(USERNAME, PASSWORD)) + .build(); + try (Response response = client.newCall(request).execute()) { + assertTrue(response.isSuccessful()); + } + } protected static void addTestData(String inputTable, String inputPath) throws Exception { OkHttpClient client = new OkHttpClient(); diff --git a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryTest.java b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryTest.java index a1e9bfa..13116fa 100644 --- a/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryTest.java +++ b/src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryTest.java @@ -2,7 +2,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.factories.CatalogFactory.Context; import org.apache.flink.table.factories.FactoryUtil; import org.junit.jupiter.api.Test; @@ -15,6 +14,7 @@ import static org.junit.Assert.assertTrue; public class ElasticJdbcCatalogFactoryTest extends ElasticCatalogTestBase { + @Test public void testCreateElasticCatalogNoAdditionalOptions() { // given @@ -80,10 +80,13 @@ public void testCreateElasticCatalogDefaultPartitionOptions() { } @Test - public void testCreateElasticCatalogIndexPatternsOptions() throws DatabaseNotExistException { + public void testCreateElasticCatalogIndexPatternsOptions() throws Exception { // given + createTestIndex("test_some_table", "elastic/test-index.json"); + + // and Map options = getCommonOptions(); - options.put("properties.index.patterns", "example_table_*"); + options.put("properties.index.patterns", "test*"); Context catalogContext = new FactoryUtil.DefaultCatalogContext("test-catalog", options, @@ -94,7 +97,10 @@ public void testCreateElasticCatalogIndexPatternsOptions() throws DatabaseNotExi ElasticCatalog catalog = (ElasticCatalog) catalogFactory.createCatalog(catalogContext); // then - assertEquals(singletonList("example_table_*"), catalog.getIndexPatterns()); - assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "example_table_*"))); + assertEquals(singletonList("test*"), catalog.getIndexPatterns()); + assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "test*"))); + + // cleanup + deleteTestIndex("test_some_table"); } }