
Commit

get BigQuery pandas working
lakshmanok committed Mar 24, 2017
1 parent 1ddc66e commit 40f1f3a
Showing 4 changed files with 146 additions and 40 deletions.
96 changes: 58 additions & 38 deletions courses/unstructured/BigQuery-test-solution.ipynb

Large diffs are not rendered by default.
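
Since the notebook diff is not rendered here, the following is purely an illustration of what the commit title ("get BigQuery pandas working") refers to: pulling BigQuery query results into a pandas DataFrame, for example with pandas.read_gbq from the pandas-gbq package. The project ID and query below are placeholders and are not taken from the notebook.

import pandas as pd

# Placeholder project ID; substitute your own GCP project.
PROJECT_ID = 'my-gcp-project'

# Same public Shakespeare sample table that bigquery_connect.sh uses below.
query = """
SELECT word, SUM(word_count) AS total_count
FROM `bigquery-public-data.samples.shakespeare`
GROUP BY word
ORDER BY total_count DESC
LIMIT 10
"""

# Runs the query in BigQuery and returns the result as a DataFrame.
df = pd.read_gbq(query, project_id=PROJECT_ID, dialect='standard')
print(df.head())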

74 changes: 74 additions & 0 deletions courses/unstructured/bigquery_connect.sh
@@ -0,0 +1,74 @@
#!/usr/bin/env python

# This sample shows how to read a BigQuery table into Spark via the Hadoop
# BigQuery connector. You would not normally run map-reduce-style processing
# on BigQuery data in Spark (just write a SQL statement instead), but the
# connector is useful when you want to apply Spark libraries such as MLlib
# to data that lives in BigQuery (see the MLlib sketch after this script).

import json
import pprint
import subprocess
import pyspark

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_table'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    # kv is a (key, JSON-record-string) pair from the input format.
    .map(lambda kv: json.loads(kv[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
    # wc is a (word, count) pair.
    .map(lambda wc: json.dumps({'word': wc[0], 'word_count': wc[1]}))
    .saveAsTextFile(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--schema word:STRING,word_count:INTEGER '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=','.join(output_files)
    ).split())

# Manually clean up the staging directories; otherwise the exported
# BigQuery files will remain in Cloud Storage indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)
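
The comment at the top of this script mentions applying Spark libraries such as MLlib to BigQuery data; below is a minimal sketch of that idea. It is not part of this commit: it assumes it runs in the same PySpark session, immediately after table_data is created above, and the toy k-means over log word counts is purely illustrative.

import math
from pyspark.mllib.clustering import KMeans

# Reuse the RDD loaded through the BigQuery connector above: parse each
# JSON record and build a one-dimensional feature vector per word.
features = (table_data
    .map(lambda kv: json.loads(kv[1]))
    .map(lambda rec: [math.log(1.0 + float(rec['word_count']))]))

# Train a small k-means model on the BigQuery-sourced features.
model = KMeans.train(features, k=3, maxIterations=10)
print(model.clusterCenters)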

12 changes: 12 additions & 0 deletions courses/unstructured/create_mycluster.sh
@@ -0,0 +1,12 @@
#!/bin/bash

BUCKET=cloud-training-demos-ml

gcloud dataproc clusters create my-cluster --zone us-central1-a \
--master-machine-type n1-standard-1 --master-boot-disk-size 50 \
--num-workers 2 --worker-machine-type n1-standard-1 \
--worker-boot-disk-size 50 --network=default \
--initialization-actions=gs://$BUCKET/unstructured/init-script.sh,gs://$BUCKET/unstructured/datalab.sh


# --initialization-actions=gs://$BUCKET/unstructured/init-script.sh,gs://dataproc-initialization-actions/datalab/datalab.sh
4 changes: 2 additions & 2 deletions courses/unstructured/replace_and_upload.sh
@@ -12,9 +12,9 @@ echo "replacing bucket references to $BUCKET and copying to gs://$BUCKET/unstruc
TEMP=tmp
rm -rf $TEMP
mkdir $TEMP
-for FILE in $(ls -1 *.py *.ipynb); do
+for FILE in $(ls -1 *.py *.ipynb init*.sh); do
echo $FILE
-cat $FILE | sed "s/BUCKET_NAME/$BUCKET/g" > $TEMP/$FILE
+cat $FILE | sed "s/BUCKET_NAME/$BUCKET/g" | sed "s/USER_NAME/$USER/g" > $TEMP/$FILE
done

# first the originals, then the modified
