Skip to content

Commit

Permalink
[ZEPPELIN-6080] Add support for bq single region dataset query (apach…
Browse files Browse the repository at this point in the history
…e#4815)

* Add support for bq single region dataset query

* Remove duplicate code

* Resolve review comments

* Updated documentation

* Improved region check for bq interpreter
  • Loading branch information
meharanjan318 authored Sep 9, 2024
1 parent 1641ce1 commit 205014f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 3 deletions.
4 changes: 4 additions & 0 deletions bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Function;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -90,6 +91,7 @@ public class BigQueryInterpreter extends Interpreter {
static final String WAIT_TIME = "zeppelin.bigquery.wait_time";
static final String MAX_ROWS = "zeppelin.bigquery.max_no_of_rows";
static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect";
static final String REGION = "zeppelin.bigquery.region";

private static String jobId = null;
private static String projectId = null;
Expand Down Expand Up @@ -227,6 +229,7 @@ private InterpreterResult executeSql(String sql) {
long wTime = Long.parseLong(getProperty(WAIT_TIME));
long maxRows = Long.parseLong(getProperty(MAX_ROWS));
String sqlDialect = getProperty(SQL_DIALECT, "").toLowerCase();
String region = getProperty(REGION, null);
Boolean useLegacySql;
switch (sqlDialect) {
case "standardsql":
Expand All @@ -241,7 +244,7 @@ private InterpreterResult executeSql(String sql) {
}
Iterator<GetQueryResultsResponse> pages;
try {
pages = run(sql, projId, wTime, maxRows, useLegacySql);
pages = run(sql, projId, wTime, maxRows, useLegacySql, region);
} catch (IOException ex) {
LOGGER.error(ex.getMessage());
return new InterpreterResult(Code.ERROR, ex.getMessage());
Expand All @@ -258,8 +261,9 @@ private InterpreterResult executeSql(String sql) {

//Function to run the SQL on bigQuery service
public static Iterator<GetQueryResultsResponse> run(final String queryString,
final String projId, final long wTime, final long maxRows, Boolean useLegacySql)
throws IOException {
final String projId, final long wTime, final long maxRows,
Boolean useLegacySql, final String region)
throws IOException {
try {
LOGGER.info("Use legacy sql: {}", useLegacySql);
QueryResponse query;
Expand All @@ -275,6 +279,9 @@ public static Iterator<GetQueryResultsResponse> run(final String queryString,
GetQueryResults getRequest = service.jobs().getQueryResults(
projectId,
jobId);
if (StringUtils.isNotBlank(region)) {
getRequest = getRequest.setLocation(region);
}
return getPages(getRequest);
} catch (IOException ex) {
throw ex;
Expand Down
7 changes: 7 additions & 0 deletions bigquery/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
"defaultValue": "",
"description": "BigQuery SQL dialect (standardSQL or legacySQL). If empty, query prefix like '#standardSQL' can be used.",
"type": "string"
},
"zeppelin.bigquery.region": {
"envName": null,
"propertyName": "zeppelin.bigquery.region",
"defaultValue": "",
"description": "Location of BigQuery dataset. Needed if it is a single-region dataset.",
"type": "string"
}
},
"editor": {
Expand Down
5 changes: 5 additions & 0 deletions docs/interpreter/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ limitations under the License.
<td></td>
<td>BigQuery SQL dialect (standardSQL or legacySQL). If empty, [query prefix](https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql#sql-prefix) like '#standardSQL' can be used.</td>
</tr>
<tr>
<td>zeppelin.bigquery.region</td>
<td></td>
<td>BigQuery dataset region (Needed for single region dataset)</td>
</tr>
</table>


Expand Down

0 comments on commit 205014f

Please sign in to comment.