Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore non-existing index patterns #36

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -225,7 +226,19 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
throw new RuntimeException(e);
}

tables.addAll(indexPatterns);
try (Connection connection = DriverManager.getConnection(baseUrl, username, pwd)) {
for (String indexPattern : indexPatterns) {
try {
retrieveResultSetMetaData(connection, new ObjectPath(databaseName, indexPattern));
tables.add(indexPattern);
} catch (SQLDataException e) {
LOG.warn(format("Index pattern '%s' not found.", indexPattern));
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}

return tables;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ private static List<Pattern> parseRaw(String commaSeparatedList) {
boolean isAccepted(String objectName) {
if (!includePatterns.isEmpty()) {
if (includePatterns.stream().noneMatch(pattern -> pattern.matcher(objectName).matches())) {
LOG.info("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns);
LOG.debug("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns);
return false;
}
}
if (!excludePatterns.isEmpty()) {
if (excludePatterns.stream().anyMatch(pattern -> pattern.matcher(objectName).matches())) {
LOG.info("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns);
LOG.debug("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns);
return false;
}
}
LOG.info("'{}' matches include pattern and does not match any exclude pattern.", objectName);
LOG.debug("'{}' matches include pattern and does not match any exclude pattern.", objectName);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;


public class ElasticCatalogITCase extends ElasticCatalogTestBase {

private static final String INPUT_SINGLE_RECORD_TABLE = "test_single_record_table";
private static final String INPUT_MULTIPLE_RECORDS_TABLE = "test_multiple_records_table";
private static final String INPUT_MISSING_DATE_COL_TABLE = "test_missing_date_col_table";
Expand Down Expand Up @@ -99,11 +99,7 @@ private ElasticCatalogBuilder catalogBuilder() {
}

@Test
public void testListDatabases() throws DatabaseNotExistException, TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());

public void testListDatabases() {
// when
ElasticCatalog catalog = catalogBuilder().build();

Expand All @@ -114,11 +110,7 @@ public void testListDatabases() throws DatabaseNotExistException, TableNotExistE
}

@Test
public void testListTables() throws DatabaseNotExistException, InterruptedException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());

public void testListTables() throws DatabaseNotExistException {
// when
ElasticCatalog catalog = catalogBuilder().build();

Expand All @@ -140,10 +132,6 @@ public void testListTables() throws DatabaseNotExistException, InterruptedExcept

@Test
public void testTableFiltering() throws DatabaseNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());

// when
ElasticCatalog catalog = catalogBuilder()
.indexFilterResolver(IndexFilterResolver.of("test_m.*", "test_mi.*"))
Expand All @@ -159,11 +147,7 @@ public void testTableFiltering() throws DatabaseNotExistException {
}

@Test
public void testTableExists() throws DatabaseNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());

public void testTableExists() {
// when
ElasticCatalog catalog = catalogBuilder().build();

Expand Down Expand Up @@ -193,10 +177,6 @@ public void testTableNotExists() {

@Test
public void testGetNonPartitionedTable() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());

// when
ElasticCatalog catalog = catalogBuilder().build();
CatalogBaseTable table = catalog.getTable(new ObjectPath(
Expand All @@ -214,9 +194,7 @@ public void testGetNonPartitionedTable() throws TableNotExistException {
@Test
public void testGetTablePartitionedByTimestamp() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col");
properties.put("properties.scan.test_multiple_records_table.partition.number", "10");

Expand All @@ -238,9 +216,7 @@ public void testGetTablePartitionedByTimestamp() throws TableNotExistException {
@Test
public void testGetTablePartitionedByInteger() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "integer_col");
properties.put("properties.scan.test_multiple_records_table.partition.number", "10");

Expand All @@ -262,9 +238,7 @@ public void testGetTablePartitionedByInteger() throws TableNotExistException {
@Test
public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("catalog.default.scan.partition.column.name", "date_col");
properties.put("catalog.default.scan.partition.size", "100");

Expand All @@ -287,9 +261,7 @@ public void testGetTableDefaultScanOptionsZeroRecords() throws TableNotExistExce
@Test
public void testFailNoPartitionColumnProvided() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_multiple_records_table.partition.number", "10");

// when
Expand All @@ -307,9 +279,7 @@ public void testFailNoPartitionColumnProvided() throws TableNotExistException {
@Test
public void testFailNoPartitionNumberProvided() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col");

// when
Expand All @@ -327,9 +297,7 @@ public void testFailNoPartitionNumberProvided() throws TableNotExistException {
@Test
public void testFailNoPartitionColumnInTable() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_missing_date_col_table.partition.column.name", "date_col");
properties.put("properties.scan.test_missing_date_col_table.partition.number", "10");

Expand All @@ -348,9 +316,7 @@ public void testFailNoPartitionColumnInTable() throws TableNotExistException {
@Test
public void testFailPartitionColumnNotSupported() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_single_record_table.partition.column.name", "keyword_col");
properties.put("properties.scan.test_single_record_table.partition.number", "10");

Expand All @@ -369,9 +335,7 @@ public void testFailPartitionColumnNotSupported() throws TableNotExistException
@Test
public void testFailInappropriatePartitionNumber() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "date_col");
properties.put("properties.scan.test_multiple_records_table.partition.number", "0");

Expand All @@ -390,10 +354,6 @@ public void testFailInappropriatePartitionNumber() throws TableNotExistException
@Disabled
@Test
public void testUnsupportedDataTypeInTable() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());

// when
ElasticCatalog catalog = catalogBuilder().build();
try {
Expand All @@ -409,9 +369,7 @@ public void testUnsupportedDataTypeInTable() throws TableNotExistException {
@Test
public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("catalog.default.scan.partition.column.name", "date_col");
properties.put("catalog.default.scan.partition.size", "5");

Expand All @@ -432,9 +390,7 @@ public void testGetTableDefaultCatalogScanPartitionProperties() throws TableNotE
@Test
public void testGetTableOverwriteCatalogScanProperties() throws TableNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_multiple_records_table.partition.column.name", "integer_col");
properties.put("properties.scan.test_multiple_records_table.partition.number", "3");
properties.put("catalog.default.scan.partition.column.name", "date_col");
Expand All @@ -458,7 +414,7 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.index.patterns", "test_*_record_table");

// when
Expand Down Expand Up @@ -510,12 +466,25 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo
assertEquals(expectedSchema, schema);
}

@Test
public void testShouldIgnoreNonexistingIndexPattern() {
// given
Map<String, String> properties = new HashMap<>();
properties.put("properties.index.patterns", "non_existing_pattern*");

// when
catalogBuilder().properties(properties).build();

// then
// no exception thrown
}

@Test
public void testGetMultipleIndexPatternPartitionedTables() throws TableNotExistException, DatabaseNotExistException {
// given
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
Map<String, String> properties = new HashMap<>();
properties.put("properties.scan.test_*_record*_table.partition.column.name", "date_col");
properties.put("properties.scan.test_*_record*_table.partition.number", "10");
properties.put("properties.scan.test_partial_schema_table_*.partition.column.name", "integer_col");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ protected static void createTestIndex(String inputTable, String indexPath) throw
assertTrue(response.isSuccessful());
}
}
protected static void deleteTestIndex(String inputTable) throws Exception {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(String.format("http://%s:%d/%s/", container.getHost(),
container.getElasticPort(), inputTable))
.delete()
.addHeader("Authorization", Credentials.basic(USERNAME, PASSWORD))
.build();
try (Response response = client.newCall(request).execute()) {
assertTrue(response.isSuccessful());
}
}

protected static void addTestData(String inputTable, String inputPath) throws Exception {
OkHttpClient client = new OkHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.factories.CatalogFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.junit.jupiter.api.Test;
Expand All @@ -15,6 +14,7 @@
import static org.junit.Assert.assertTrue;

public class ElasticJdbcCatalogFactoryTest extends ElasticCatalogTestBase {

@Test
public void testCreateElasticCatalogNoAdditionalOptions() {
// given
Expand Down Expand Up @@ -80,10 +80,13 @@ public void testCreateElasticCatalogDefaultPartitionOptions() {
}

@Test
public void testCreateElasticCatalogIndexPatternsOptions() throws DatabaseNotExistException {
public void testCreateElasticCatalogIndexPatternsOptions() throws Exception {
// given
createTestIndex("test_some_table", "elastic/test-index.json");

// and
Map<String, String> options = getCommonOptions();
options.put("properties.index.patterns", "example_table_*");
options.put("properties.index.patterns", "test*");

Context catalogContext = new FactoryUtil.DefaultCatalogContext("test-catalog",
options,
Expand All @@ -94,7 +97,10 @@ public void testCreateElasticCatalogIndexPatternsOptions() throws DatabaseNotExi
ElasticCatalog catalog = (ElasticCatalog) catalogFactory.createCatalog(catalogContext);

// then
assertEquals(singletonList("example_table_*"), catalog.getIndexPatterns());
assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "example_table_*")));
assertEquals(singletonList("test*"), catalog.getIndexPatterns());
assertTrue(catalog.tableExists(new ObjectPath("docker-cluster", "test*")));

// cleanup
deleteTestIndex("test_some_table");
}
}
Loading