added new connector for OpenSearch data source #1335
base: master
Conversation
@@ -849,6 +849,12 @@ else if (value instanceof Integer) {
if (value == null) {
    float4Writer.writeNull();
}
else if (value instanceof java.lang.Integer) {
do these changes have implications for any of the other connectors?
They should not. I used a structure similar to the other data types in BlockUtils::writeSimpleValue; checking whether the value is of a specific instance helps with writing the value to the block, and doesn't change core behavior.
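A minimal sketch of the instanceof-dispatch pattern being described, assuming a hypothetical FloatWriter interface in place of Arrow's real Float4Writer; this is illustrative, not the actual BlockUtils code:

```java
public class WriteSimpleValueSketch {
    // Stand-in for the Arrow writer used inside BlockUtils::writeSimpleValue
    public interface FloatWriter {
        void writeNull();
        void writeFloat4(float v);
    }

    // Dispatch on the runtime type: null writes null, an Integer is widened
    // to float, and anything else is assumed to already be a Float
    public static void writeFloat4Value(FloatWriter writer, Object value) {
        if (value == null) {
            writer.writeNull();
        }
        else if (value instanceof Integer) {
            writer.writeFloat4(((Integer) value).floatValue());
        }
        else {
            writer.writeFloat4((Float) value);
        }
    }
}
```

The extra `instanceof` branch only widens the accepted input types for one field writer, which is why it shouldn't affect other connectors.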
@@ -42,6 +42,7 @@ public class FieldBuilder
private final boolean nullable;
//Using LinkedHashMap because Apache Arrow makes field order important so honoring that contract here
private final Map<String, Field> children = new LinkedHashMap<>();
private final Map<String, FieldBuilder> nestedChildren = new LinkedHashMap<>();
would you mind explaining this addition? isn't nesting already available because we can have children which are Fields, and those children can themselves have children, etc? or does the existing children behavior not actually work like that?
after reading through the rest of the PR, let me know if my understanding is correct.
if i have a field like:

"topObj": {
    "innerObj": {
        "intField": 1,
        "strField": "asdf"
    },
    "int": 5
}

then opensearch will send back each of these fields as their own columns, and our result set will have this:

topObj.int
topObj.innerObj.intField
topObj.innerObj.strField

so the reason for this new nested children field and its corresponding logic in the OpensearchUtils class is so you can call createNestedStruct and pass in topObj.innerObj.intField (for example), have it find the struct topObj, and continue to build out the struct definition for topObj?
Yes, that's correct. In my opinion it makes building complex nested fields more intuitive.
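A minimal sketch (not the SDK's actual API) of how dotted column names such as `topObj.innerObj.intField` can be folded back into one nested struct definition; the helper name `addNested` is hypothetical, and plain maps stand in for FieldBuilders:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NestedStructSketch {
    // Walk the dotted path, reusing or creating a struct node per segment.
    // LinkedHashMap keeps insertion order, mirroring the nestedChildren map
    // in FieldBuilder (Apache Arrow makes field order significant).
    @SuppressWarnings("unchecked")
    public static void addNested(Map<String, Object> root, String dottedName, String leafType) {
        String[] parts = dottedName.split("\\.");
        Map<String, Object> current = root;
        for (int i = 0; i < parts.length - 1; i++) {
            current = (Map<String, Object>) current.computeIfAbsent(
                    parts[i], k -> new LinkedHashMap<String, Object>());
        }
        // The final segment is a leaf field with a concrete type
        current.put(parts[parts.length - 1], leafType);
    }
}
```

Feeding in the three flattened columns from the example above reassembles the single `topObj` struct with `innerObj` nested inside it.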
@@ -89,6 +89,18 @@ public SchemaBuilder addStructField(String fieldName)
    return this;
}

/**
 * Adds a new Nested STRUCT Field to the Schema as a top-level Field.
So this is for nested structs, did we not support nested structs in any connectors before? Does this impact functionality in any existing connectors?
No, I don't believe that existing connectors should be impacted; it's independent. We do support nested structs in other connectors. The added functions were somewhat tailored to the OpenSearch use case, and I thought it made sense to expand the SchemaBuilder functionality.
readRecordsRequest.getSplit().getProperties());
try (Connection connection = this.jdbcConnectionFactory.getConnection(getCredentialProvider())) {
    connection.setAutoCommit(false); // For consistency. This is needed to be false to enable streaming for some database types.
    if (this.getAutoCommit().isPresent()) {
not sure I understand if this modification is adding any value. Are you expecting getAutoCommit() to not always return false in the future?
oh i see, in the opensearch record handler you have it return Optional.empty(). But is there a reason to even have that as an optional wrapping around a boolean, instead of it just being a boolean?
Yes, the reason it's an Optional is that the presence of a value does not signify true; we want to maintain the original behavior.
I can go back and do additional testing; however, we did not want to call setAutoCommit for the OpenSearch driver.
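A small sketch of the Optional-based contract being described (the method name here is illustrative, not the handler's exact code): what matters is the Optional's presence, not the wrapped boolean, so `Optional.of(false)` still means "we call setAutoCommit(false) and commit() ourselves", while `Optional.empty()` means "leave the driver's defaults alone", which is the OpenSearch case:

```java
import java.util.Optional;

public class AutoCommitSketch {
    // Base JDBC handlers would supply Optional.of(false) to disable auto-commit
    // for streaming; the OpenSearch record handler supplies Optional.empty().
    public static boolean shouldManageTransaction(Optional<Boolean> autoCommit) {
        // Presence, not the wrapped value, gates setAutoCommit(...) and the
        // explicit connection.commit() at the end of readWithConstraint
        return autoCommit.isPresent();
    }
}
```

A plain boolean could not express this three-way distinction (explicitly true, explicitly false, or don't touch the connection at all).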
@@ -168,8 +185,9 @@ public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest rea
    rowsReturnedFromDatabase++;
}
LOGGER.info("{} rows returned by database.", rowsReturnedFromDatabase);

connection.commit();
if (this.getAutoCommit().isPresent()) {
also not sure what this adds.
We only need to commit the connection if we called setAutoCommit, and we don't need to do that for the OpenSearch driver.
// update schema from null because Opensearch doesn't have schema but change later
// resultSet.updateString("TABLE_SCHEM", databaseName);

// Temporary in schemaName
list.add(getSchemaTableName(resultSet, databaseName));
having a hard time understanding the comments here
Leftover comments. Will get rid of them.
Pattern pattern = Pattern.compile(DATA_STREAM_REGEX);
Matcher matcher = pattern.matcher(tableName);
if (matcher.find()) {
    tableName = matcher.group(2);
}
can you explain this? I thought you mentioned data streams are not yet supported. Also, let's avoid magic numbers or add an example in the comment for what this looks like to explain why we match on group 2
Yes, I will provide an embedded example for reference.
To give a brief explanation here: a data stream lets you store time-series data across multiple indices while giving you a single named resource for requests. The backing indices that correlate with one data stream follow the naming convention .ds-<data-stream>-<generation>, where generation is a six-digit, zero-padded integer.
Therefore, instead of listing the many indices correlating to a single data stream, I only want to display the one data stream, and I achieve that through pattern matching.
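A hypothetical regex for the `.ds-<data-stream>-<generation>` convention described above (the connector's actual DATA_STREAM_REGEX may differ). With this grouping, group(1) is the `.ds-` prefix, group(3) is the generation, and group(2) captures the data stream name, which would explain why the handler reads `matcher.group(2)`:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DataStreamNameSketch {
    // generation is a six-digit, zero-padded integer
    static final Pattern DATA_STREAM_PATTERN =
            Pattern.compile("^(\\.ds-)(.+)-(\\d{6})$");

    // Collapse a backing index name to its data stream name;
    // pass ordinary index names through unchanged
    public static String toDataStreamName(String tableName) {
        Matcher matcher = DATA_STREAM_PATTERN.matcher(tableName);
        return matcher.find() ? matcher.group(2) : tableName;
    }
}
```

For example, a backing index named `.ds-logs-app-000001` would be displayed simply as the data stream `logs-app`.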
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, null, tableName.getTableName(), schema, constraints, split);

// Disable fetching all rows.
preparedStatement.setFetchSize(10000);
let's put this in a constant.
what happens if the input query has over 10k results?
Setting the fetch size "gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for ResultSet objects generated by this Statement."
So what this really controls is the number of network calls; it also enforces pagination so that the SQL plugin uses the v2 engine instead of the legacy engine. Otherwise, the legacy engine only returns 200 rows per table.
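A small illustration of why the fetch size caps the batch size rather than the total row count: the driver keeps issuing cursor fetches until the result set is exhausted, making roughly ceil(totalRows / fetchSize) round trips. The FETCH_SIZE constant here is the suggested replacement for the magic number 10000:

```java
public class FetchSizeSketch {
    // Hint for rows per driver round trip, not a limit on total rows
    static final int FETCH_SIZE = 10000;

    // Approximate number of driver round trips needed to stream all rows
    public static long roundTrips(long totalRows, int fetchSize) {
        return (totalRows + fetchSize - 1) / fetchSize;
    }
}
```

So a query with 25,000 matching rows would still return all 25,000 rows, just across three fetches instead of one.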
What's the sql/jdbc feature that's needed from opensearch to bring this closer to merging? Also, are there any performance comparisons done between sql/jdbc vs using a Java or another client?
Hi, @macohen! We were waiting for the implementation of shard partitioning, which would allow us to parallelize the fetching of data with the JDBC driver. Currently the connector implements a single-partition solution, which resulted in a performance regression compared to the current connector version.
@akuzin1 can you link to the feature request here, please?
DO NOT MERGE !!! - depends on pending features in OpenSearch sql and sql-jdbc
Description of changes:
This pull request introduces a new OpenSearch connector that implements the Athena JDBC interface, leveraging the OpenSearch JDBC driver to enable metadata and data retrieval for the Athena query engine.
It also adds slight changes to the Federation SDK, which should be reviewed to determine whether they are fitting.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.