Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support stateless queries. #184

Merged
merged 6 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/main/java/net/starschema/clouddb/jdbc/BQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ public class BQConnection implements Connection {
/** Boolean to determine whether or not to use legacy sql (default: false) * */
private final boolean useLegacySql;

/**
* Enum that describes whether to create a job in projects that support stateless queries. Copied
* from BigQueryImpl
*/
public static enum JobCreationMode {
goomrw marked this conversation as resolved.
Show resolved Hide resolved
/** If unspecified JOB_CREATION_REQUIRED is the default. */
JOB_CREATION_MODE_UNSPECIFIED,
goomrw marked this conversation as resolved.
Show resolved Hide resolved
/** Default. Job creation is always required. */
JOB_CREATION_REQUIRED,

/**
* Job creation is optional. Returning immediate results is prioritized. BigQuery will
* automatically determine if a Job needs to be created. The conditions under which BigQuery can
* decide to not create a Job are subject to change. If Job creation is required,
* JOB_CREATION_REQUIRED mode should be used, which is the default.
*
* <p>Note that no job ID will be created if the results were returned immediately.
*/
JOB_CREATION_OPTIONAL;

private JobCreationMode() {}
}

/** The job creation mode - */
private JobCreationMode jobCreationMode = JobCreationMode.JOB_CREATION_MODE_UNSPECIFIED;

/** getter for useLegacySql */
public boolean getUseLegacySql() {
return useLegacySql;
Expand Down Expand Up @@ -210,6 +236,9 @@ public BQConnection(String url, Properties loginProp, HttpTransport httpTranspor
this.useQueryCache =
parseBooleanQueryParam(caseInsensitiveProps.getProperty("querycache"), true);

this.jobCreationMode =
parseJobCreationMode(caseInsensitiveProps.getProperty("jobcreationmode"));

// Create Connection to BigQuery
if (serviceAccount) {
try {
Expand Down Expand Up @@ -322,6 +351,21 @@ private static List<String> parseArrayQueryParam(@Nullable String string, Charac
: Arrays.asList(string.split(delimiter + "\\s*"));
}

/**
* Return a {@link JobCreationMode} or raise an exception if the string does not match a variant.
*/
private static JobCreationMode parseJobCreationMode(@Nullable String string)
goomrw marked this conversation as resolved.
Show resolved Hide resolved
throws BQSQLException {
if (string == null) {
return null;
}
try {
return JobCreationMode.valueOf(string);
} catch (IllegalArgumentException e) {
throw new BQSQLException("could not parse " + string + " as job creation mode", e);
}
}

/**
*
*
Expand Down Expand Up @@ -1214,4 +1258,8 @@ public Long getMaxBillingBytes() {
public Integer getTimeoutMs() {
return timeoutMs;
}

public JobCreationMode getJobCreationMode() {
return jobCreationMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class BQForwardOnlyResultSet implements java.sql.ResultSet {
private String projectId;
/** Reference for the Job */
private @Nullable Job completedJob;
/** The BigQuery query ID; set if the query completed without a Job */
private final @Nullable String queryId;
goomrw marked this conversation as resolved.
Show resolved Hide resolved
/** The total number of bytes processed while creating this ResultSet */
private final @Nullable Long totalBytesProcessed;
/** Whether the ResultSet came from BigQuery's cache */
Expand All @@ -127,12 +129,14 @@ public BQForwardOnlyResultSet(
Bigquery bigquery,
String projectId,
@Nullable Job completedJob,
String queryId,
BQStatementRoot bqStatementRoot)
throws SQLException {
this(
bigquery,
projectId,
completedJob,
queryId,
bqStatementRoot,
null,
false,
Expand Down Expand Up @@ -160,6 +164,7 @@ public BQForwardOnlyResultSet(
Bigquery bigquery,
String projectId,
@Nullable Job completedJob,
@Nullable String queryId,
BQStatementRoot bqStatementRoot,
List<TableRow> prefetchedRows,
boolean prefetchedAllRows,
Expand All @@ -172,6 +177,7 @@ public BQForwardOnlyResultSet(
logger.debug("Created forward only resultset TYPE_FORWARD_ONLY");
this.Statementreference = (Statement) bqStatementRoot;
this.completedJob = completedJob;
this.queryId = queryId;
this.projectId = projectId;
if (bigquery == null) {
throw new BQSQLException("Failed to fetch results. Connection is closed.");
Expand Down Expand Up @@ -2992,4 +2998,8 @@ public boolean wasNull() throws SQLException {
return null;
}
}

public @Nullable String getQueryId() {
return queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public ResultSet executeQuery() throws SQLException {
this);
} else {
return new BQForwardOnlyResultSet(
this.connection.getBigquery(), this.projectId, referencedJob, this);
this.connection.getBigquery(), this.projectId, referencedJob, null, this);
}
}
// Pause execution for half second before polling job status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ public class BQScrollableResultSet extends ScrollableResultset<Object>
*/
private final @Nullable List<BiEngineReason> biEngineReasons;

private final JobReference jobReference;
private final @Nullable JobReference jobReference;

/** The BigQuery query ID; set if the query completed without a Job */
private final @Nullable String queryId;

private TableSchema schema;

Expand All @@ -86,7 +89,8 @@ public BQScrollableResultSet(
bigQueryGetQueryResultResponse.getCacheHit(),
null,
null,
bigQueryGetQueryResultResponse.getJobReference());
bigQueryGetQueryResultResponse.getJobReference(),
null);

BigInteger maxrow;
try {
Expand All @@ -104,7 +108,8 @@ public BQScrollableResultSet(
@Nullable Boolean cacheHit,
@Nullable String biEngineMode,
@Nullable List<BiEngineReason> biEngineReasons,
JobReference jobReference) {
@Nullable JobReference jobReference,
@Nullable String queryId) {
logger.debug("Created Scrollable resultset TYPE_SCROLL_INSENSITIVE");
try {
maxFieldSize = bqStatementRoot.getMaxFieldSize();
Expand All @@ -126,6 +131,7 @@ public BQScrollableResultSet(
this.biEngineMode = biEngineMode;
this.biEngineReasons = biEngineReasons;
this.jobReference = jobReference;
this.queryId = queryId;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -302,4 +308,8 @@ public String getString(int columnIndex) throws SQLException {
return null;
}
}

public @Nullable String getQueryId() {
return queryId;
}
}
9 changes: 6 additions & 3 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
this.connection.getBigquery(),
projectId,
referencedJob,
qr.getQueryId(),
this,
rows,
fetchedAll,
Expand All @@ -234,7 +235,8 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
qr.getCacheHit(),
biEngineMode,
biEngineReasons,
qr.getJobReference());
qr.getJobReference(),
qr.getQueryId());
}
jobAlreadyCompleted = true;
}
Expand Down Expand Up @@ -285,7 +287,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
this);
} else {
return new BQForwardOnlyResultSet(
this.connection.getBigquery(), projectId, referencedJob, this);
this.connection.getBigquery(), projectId, referencedJob, null, this);
}
}
// Pause execution for half second before polling job status
Expand Down Expand Up @@ -345,7 +347,8 @@ protected QueryResponse runSyncQuery(String querySql, boolean unlimitedBillingBy
// socket timeouts
(long) getMaxRows(),
this.getAllLabels(),
this.connection.getUseQueryCache());
this.connection.getUseQueryCache(),
this.connection.getJobCreationMode());
syncResponseFromCurrentQuery.set(resp);
this.mostRecentJobReference.set(resp.getJobReference());
} catch (Exception e) {
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatementRoot.java
goomrw marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ private int executeDML(String sql) throws SQLException {
(long) querytimeout * 1000,
(long) getMaxRows(),
this.getAllLabels(),
this.connection.getUseQueryCache());
this.connection.getUseQueryCache(),
this.connection.getJobCreationMode());
this.mostRecentJobReference.set(qr.getJobReference());

if (defaultValueIfNull(qr.getJobComplete(), false)) {
Expand Down Expand Up @@ -327,7 +328,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
(long) querytimeout * 1000,
(long) getMaxRows(),
this.getAllLabels(),
this.connection.getUseQueryCache());
this.connection.getUseQueryCache(),
this.connection.getJobCreationMode());
this.mostRecentJobReference.set(qr.getJobReference());

referencedJob =
Expand Down Expand Up @@ -362,7 +364,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
qr.getCacheHit(),
biEngineMode,
biEngineReasons,
referencedJob.getJobReference());
referencedJob.getJobReference(),
qr.getQueryId());
}
jobAlreadyCompleted = true;
}
Expand All @@ -384,7 +387,7 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes)
this);
} else {
return new BQForwardOnlyResultSet(
this.connection.getBigquery(), projectId, referencedJob, this);
this.connection.getBigquery(), projectId, referencedJob, null, this);
}
}
// Pause execution for half second before polling job status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import net.starschema.clouddb.jdbc.BQConnection.JobCreationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -644,7 +645,8 @@ static QueryResponse runSyncQuery(
Long queryTimeoutMs,
Long maxResults,
Map<String, String> labels,
boolean useQueryCache)
boolean useQueryCache,
JobCreationMode jobCreationMode)
throws IOException {
QueryRequest qr =
new QueryRequest()
Expand All @@ -654,6 +656,9 @@ static QueryResponse runSyncQuery(
.setQuery(querySql)
.setUseLegacySql(useLegacySql)
.setMaximumBytesBilled(maxBillingBytes);
if (jobCreationMode != null) {
qr = qr.setJobCreationMode(jobCreationMode.name());
}
if (dataSet != null) {
qr.setDefaultDataset(new DatasetReference().setDatasetId(dataSet).setProjectId(projectId));
}
Expand Down
Loading
Loading