Skip to content

Commit a72b6f1

Browse files
committed
Ignore non-existing index patterns
If index pattern does not exist, an error is logged instead of failing the entire job.
1 parent 73972f0 commit a72b6f1

File tree

4 files changed

+53
-68
lines changed

4 files changed

+53
-68
lines changed

src/main/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalog.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.sql.PreparedStatement;
4343
import java.sql.ResultSet;
4444
import java.sql.ResultSetMetaData;
45+
import java.sql.SQLDataException;
4546
import java.sql.SQLException;
4647
import java.sql.Statement;
4748
import java.util.ArrayList;
@@ -225,7 +226,19 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
225226
throw new RuntimeException(e);
226227
}
227228

228-
tables.addAll(indexPatterns);
229+
try (Connection connection = DriverManager.getConnection(baseUrl, username, pwd)) {
230+
for (String indexPattern : indexPatterns) {
231+
try {
232+
retrieveResultSetMetaData(connection, new ObjectPath(databaseName, indexPattern));
233+
tables.add(indexPattern);
234+
} catch (SQLDataException e) {
235+
LOG.warn(format("Index pattern '%s' not found.", indexPattern));
236+
}
237+
}
238+
} catch (SQLException e) {
239+
throw new RuntimeException(e);
240+
}
241+
229242
return tables;
230243
}
231244

src/main/java/com/getindata/flink/connector/jdbc/catalog/IndexFilterResolver.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,17 @@ private static List<Pattern> parseRaw(String commaSeparatedList) {
4343
boolean isAccepted(String objectName) {
4444
if (!includePatterns.isEmpty()) {
4545
if (includePatterns.stream().noneMatch(pattern -> pattern.matcher(objectName).matches())) {
46-
LOG.info("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns);
46+
LOG.debug("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns);
4747
return false;
4848
}
4949
}
5050
if (!excludePatterns.isEmpty()) {
5151
if (excludePatterns.stream().anyMatch(pattern -> pattern.matcher(objectName).matches())) {
52-
LOG.info("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns);
52+
LOG.debug("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns);
5353
return false;
5454
}
5555
}
56-
LOG.info("'{}' matches include pattern and does not match any exclude pattern.", objectName);
56+
LOG.debug("'{}' matches include pattern and does not match any exclude pattern.", objectName);
5757
return true;
5858
}
5959

src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticCatalogITCase.java

+28-59
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,7 @@ private ElasticCatalogBuilder catalogBuilder() {
9999
}
100100

101101
@Test
102-
public void testListDatabases() throws DatabaseNotExistException, TableNotExistException {
103-
// given
104-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
105-
container.getElasticPort());
106-
102+
public void testListDatabases() {
107103
// when
108104
ElasticCatalog catalog = catalogBuilder().build();
109105

@@ -114,11 +110,7 @@ public void testListDatabases() throws DatabaseNotExistException, TableNotExistE
114110
}
115111

116112
@Test
117-
public void testListTables() throws DatabaseNotExistException, InterruptedException {
118-
// given
119-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
120-
container.getElasticPort());
121-
113+
public void testListTables() throws DatabaseNotExistException {
122114
// when
123115
ElasticCatalog catalog = catalogBuilder().build();
124116

@@ -140,10 +132,6 @@ public void testListTables() throws DatabaseNotExistException, InterruptedExcept
140132

141133
@Test
142134
public void testTableFiltering() throws DatabaseNotExistException {
143-
// given
144-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
145-
container.getElasticPort());
146-
147135
// when
148136
ElasticCatalog catalog = catalogBuilder()
149137
.indexFilterResolver(IndexFilterResolver.of("test_m.*", "test_mi.*"))
@@ -159,11 +147,7 @@ public void testTableFiltering() throws DatabaseNotExistException {
159147
}
160148

161149
@Test
162-
public void testTableExists() throws DatabaseNotExistException {
163-
// given
164-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
165-
container.getElasticPort());
166-
150+
public void testTableExists() {
167151
// when
168152
ElasticCatalog catalog = catalogBuilder().build();
169153

@@ -193,10 +177,6 @@ public void testTableNotExists() {
193177

194178
@Test
195179
public void testGetNonPartitionedTable() throws TableNotExistException {
196-
// given
197-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
198-
container.getElasticPort());
199-
200180
// when
201181
ElasticCatalog catalog = catalogBuilder().build();
202182
CatalogBaseTable table = catalog.getTable(new ObjectPath(
@@ -214,9 +194,7 @@ public void testGetNonPartitionedTable() throws TableNotExistException {
214194
@Test
215195
public void testGetTablePartitionedByTimestamp() throws TableNotExistException {
216196
// given
217-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
218-
container.getElasticPort());
219-
Map<String, String> properties = new HashMap<String, String>();
197+
Map<String, String> properties = new HashMap<>();
220198
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col");
221199
properties.put("properties.scan.test_multiple_records_table.partition.number", "10");
222200

@@ -238,9 +216,7 @@ public void testGetTablePartitionedByTimestamp() throws TableNotExistException {
238216
@Test
239217
public void testGetTablePartitionedByInteger() throws TableNotExistException {
240218
// given
241-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
242-
container.getElasticPort());
243-
Map<String, String> properties = new HashMap<String, String>();
219+
Map<String, String> properties = new HashMap<>();
244220
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "integer_col");
245221
properties.put("properties.scan.test_multiple_records_table.partition.number", "10");
246222

@@ -262,9 +238,7 @@ public void testGetTablePartitionedByInteger() throws TableNotExistException {
262238
@Test
263239
public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistException {
264240
// given
265-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
266-
container.getElasticPort());
267-
Map<String, String> properties = new HashMap<String, String>();
241+
Map<String, String> properties = new HashMap<>();
268242
properties.put("catalog.default.scan.partition.column.name", "date_col");
269243
properties.put("catalog.default.scan.partition.size", "100");
270244

@@ -287,9 +261,7 @@ public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistExce
287261
@Test
288262
public void testFailNoPartitionColumnProvided() throws TableNotExistException {
289263
// given
290-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
291-
container.getElasticPort());
292-
Map<String, String> properties = new HashMap<String, String>();
264+
Map<String, String> properties = new HashMap<>();
293265
properties.put("properties.scan.test_multiple_records_table.partition.number", "10");
294266

295267
// when
@@ -307,9 +279,7 @@ public void testFailNoPartitionColumnProvided() throws TableNotExistException {
307279
@Test
308280
public void testFailNoPartitionNumberProvided() throws TableNotExistException {
309281
// given
310-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
311-
container.getElasticPort());
312-
Map<String, String> properties = new HashMap<String, String>();
282+
Map<String, String> properties = new HashMap<>();
313283
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col");
314284

315285
// when
@@ -327,9 +297,7 @@ public void testFailNoPartitionNumberProvided() throws TableNotExistException {
327297
@Test
328298
public void testFailNoPartitionColumnInTable() throws TableNotExistException {
329299
// given
330-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
331-
container.getElasticPort());
332-
Map<String, String> properties = new HashMap<String, String>();
300+
Map<String, String> properties = new HashMap<>();
333301
properties.put("properties.scan.test_missing_date_col_table.partition.column.name", "date_col");
334302
properties.put("properties.scan.test_missing_date_col_table.partition.number", "10");
335303

@@ -348,9 +316,7 @@ public void testFailNoPartitionColumnInTable() throws TableNotExistException {
348316
@Test
349317
public void testFailPartitionColumnNotSupported() throws TableNotExistException {
350318
// given
351-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
352-
container.getElasticPort());
353-
Map<String, String> properties = new HashMap<String, String>();
319+
Map<String, String> properties = new HashMap<>();
354320
properties.put("properties.scan.test_single_record_table.partition.column.name", "keyword_col");
355321
properties.put("properties.scan.test_single_record_table.partition.number", "10");
356322

@@ -369,9 +335,7 @@ public void testFailPartitionColumnNotSupported() throws TableNotExistException
369335
@Test
370336
public void testFailInappropriatePartitionNumber() throws TableNotExistException {
371337
// given
372-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
373-
container.getElasticPort());
374-
Map<String, String> properties = new HashMap<String, String>();
338+
Map<String, String> properties = new HashMap<>();
375339
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col");
376340
properties.put("properties.scan.test_multiple_records_table.partition.number", "0");
377341

@@ -390,10 +354,6 @@ public void testFailInappropriatePartitionNumber() throws TableNotExistException
390354
@Disabled
391355
@Test
392356
public void testUnsupportedDataTypeInTable() throws TableNotExistException {
393-
// given
394-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
395-
container.getElasticPort());
396-
397357
// when
398358
ElasticCatalog catalog = catalogBuilder().build();
399359
try {
@@ -409,9 +369,7 @@ public void testUnsupportedDataTypeInTable() throws TableNotExistException {
409369
@Test
410370
public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotExistException {
411371
// given
412-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
413-
container.getElasticPort());
414-
Map<String, String> properties = new HashMap<String, String>();
372+
Map<String, String> properties = new HashMap<>();
415373
properties.put("catalog.default.scan.partition.column.name", "date_col");
416374
properties.put("catalog.default.scan.partition.size", "5");
417375

@@ -432,9 +390,7 @@ public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotE
432390
@Test
433391
public void testGetTableOverwriteCatalogScanProperties() throws TableNotExistException {
434392
// given
435-
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
436-
container.getElasticPort());
437-
Map<String, String> properties = new HashMap<String, String>();
393+
Map<String, String> properties = new HashMap<>();
438394
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "integer_col");
439395
properties.put("properties.scan.test_multiple_records_table.partition.number", "3");
440396
properties.put("catalog.default.scan.partition.column.name", "date_col");
@@ -458,7 +414,7 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo
458414
// given
459415
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
460416
container.getElasticPort());
461-
Map<String, String> properties = new HashMap<String, String>();
417+
Map<String, String> properties = new HashMap<>();
462418
properties.put("properties.index.patterns", "test_*_record_table");
463419

464420
// when
@@ -510,12 +466,25 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo
510466
assertEquals(expectedSchema, schema);
511467
}
512468

469+
@Test
470+
public void testShouldIgnoreNonexistingIndexPattern() {
471+
// given
472+
Map<String, String> properties = new HashMap<>();
473+
properties.put("properties.index.patterns", "non_existing_pattern*");
474+
475+
// when
476+
catalogBuilder().properties(properties).build();
477+
478+
// then
479+
// no exception thrown
480+
}
481+
513482
@Test
514483
public void testGetMultipleIndexPatternPartitionedTables() throws TableNotExistException, DatabaseNotExistException {
515484
// given
516485
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
517486
container.getElasticPort());
518-
Map<String, String> properties = new HashMap<String, String>();
487+
Map<String, String> properties = new HashMap<>();
519488
properties.put("properties.scan.test_*_record*_table.partition.column.name", "date_col");
520489
properties.put("properties.scan.test_*_record*_table.partition.number", "10");
521490
properties.put("properties.scan.test_partial_schema_table_*.partition.column.name", "integer_col");

src/test/java/com/getindata/flink/connector/jdbc/catalog/ElasticJdbcCatalogFactoryTest.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import org.apache.flink.configuration.Configuration;
44
import org.apache.flink.table.catalog.ObjectPath;
5-
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
65
import org.apache.flink.table.factories.CatalogFactory.Context;
76
import org.apache.flink.table.factories.FactoryUtil;
87
import org.junit.jupiter.api.Test;
@@ -15,6 +14,7 @@
1514
import static org.junit.Assert.assertTrue;
1615

1716
public class ElasticJdbcCatalogFactoryTest extends ElasticCatalogTestBase {
17+
1818
@Test
1919
public void testCreateElasticCatalogNoAdditionalOptions() {
2020
// given
@@ -80,10 +80,13 @@ public void testCreateElasticCatalogDefaultPartitionOptions() {
8080
}
8181

8282
@Test
83-
public void testCreateElasticCatalogIndexPatternsOptions() throws DatabaseNotExistException {
83+
public void testCreateElasticCatalogIndexPatternsOptions() throws Exception {
8484
// given
85+
createTestIndex("test_some_table", "elastic/test-index.json");
86+
87+
// and
8588
Map<String, String> options = getCommonOptions();
86-
options.put("properties.index.patterns", "example_table_*");
89+
options.put("properties.index.patterns", "test*");
8790

8891
Context catalogContext = new FactoryUtil.DefaultCatalogContext("test-catalog",
8992
options,
@@ -94,7 +97,7 @@ public void testCreateElasticCatalogIndexPatternsOptions() throws DatabaseNotExi
9497
ElasticCatalog catalog = (ElasticCatalog) catalogFactory.createCatalog(catalogContext);
9598

9699
// then
97-
assertEquals(singletonList("example_table_*"), catalog.getIndexPatterns());
98-
assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "example_table_*")));
100+
assertEquals(singletonList("test*"), catalog.getIndexPatterns());
101+
assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "test*")));
99102
}
100103
}

0 commit comments

Comments
 (0)