# cdeSpark.py
# # WriteDF2Parquet
#
# This is the simplest PySpark example: create a DataFrame and write it out.
#
#!pip3 install pyspark
#
# For a new environment you'll need to:
#   - add the user ID to the groups that have permission to access ML
#   - update the IDBroker mappings so you have write access to the S3 buckets
#     (do NOT also update IDBroker to allow Ranger access: creating two roles
#     for one user causes an error)
#   - sync to FreeIPA to fix "HTTP ERROR 403 Forbidden"
#   - update the Ranger policy so you can write to the default Hive database (commonly policy #14)
#   - update the Ranger policy so you can use URLs (commonly policy #13)
#
from __future__ import print_function
from pyspark.sql import SparkSession
# If Java is not in the default location you'll need to specify:
#   spark.executorEnv.JAVA_HOME=/usr/java/yadayada
#
# Secret sauce to get rid of:
#   java.lang.IllegalStateException: Authentication with IDBroker failed.
#   Please ensure you have a Kerberos token by using kinit.
#
# Use this: .config("spark.yarn.access.hadoopFileSystems", "s3a://useyoursdefault-se/datalake/warehouse")\
#
spark = SparkSession\
.builder\
.config('job.local.dir', 'file:///home/cdsw/')\
.appName("WriteDF2Parquet")\
.config("spark.authenticate", "true") \
.config("spark.yarn.access.hadoopFileSystems","s3a://useyourswarehouse/tablespace/")\
.getOrCreate()
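# A hedged sketch (not part of the original job): if Java lives somewhere non-standard,
# the JAVA_HOME hint mentioned above could be passed to the same builder. The path is
# the placeholder from the comment above, not a real install location.
#
# spark = SparkSession\
#     .builder\
#     .appName("WriteDF2Parquet")\
#     .config("spark.executorEnv.JAVA_HOME", "/usr/java/yadayada")\
#     .config("spark.yarn.access.hadoopFileSystems", "s3a://useyourswarehouse/tablespace/")\
#     .getOrCreate()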
# Alternatively, create a file named spark-defaults.conf in the project (or update the
# existing one) with the property:
#   spark.yarn.access.hadoopFileSystems=s3a://<STORAGE LOCATION OF ENV>
# e.g. spark.yarn.access.hadoopFileSystems=s3a://useyoursdefault-se/datalake/warehouse/tablespace/managed/hive/martydropme
#   (this is where a CREATE TABLE landed; note this is the same S3 location as defined under Data Access)
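# A hedged sketch (not in the original file) of what that spark-defaults.conf entry
# could look like, reusing the placeholder bucket path from the comments above:
#
#   spark.yarn.access.hadoopFileSystems=s3a://useyoursdefault-se/datalake/warehouse
#
# To confirm which value the running session actually picked up:
print(spark.sparkContext.getConf().get("spark.yarn.access.hadoopFileSystems", "not set"))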
# The error below comes from Ranger permissions:
#   AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException:
#   MetaException(message:Permission denied:
#   user [marty] does not have [SELECT] privilege on [default]);'
# A FreeIPA sync is required after "marty" is added as dwadmin.
#
foo = spark.sql("show tables")
foo.take(50)
# Ranger is cool with the next statements; SDX not so much.
# The right IDBroker mapping is needed before they work.
# bar = spark.sql("show create table sampletxt")
# bar.take(50)
# bar2 = spark.sql("select * from sampletxt")
# bar2.take(50)
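# A minimal sketch (not in the original script): wrapping one of the commented-out
# queries in try/except makes a Ranger permission failure visible without killing
# the session. "sampletxt" is the table name used in the comments above.
from pyspark.sql.utils import AnalysisException

try:
    spark.sql("show create table sampletxt").show(truncate=False)
except AnalysisException as e:
    # Typically the "user [...] does not have [SELECT] privilege" message from Ranger
    print("Query failed; check Ranger policies and IDBroker mappings:", e)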
# In Ranger, "marty" was given full access to the default database.
# !hadoop fs -rm -r s3a://useyoursdefault-se/datalake/warehouse/tablespace/external/hive/pysparktab
spark.sql("CREATE TABLE IF NOT EXISTS martydataeng (key INT, value STRING) USING hive")
spark.sql("insert into martydataeng values (22,'created in MLx')")
spark.sql("insert into martydataeng values (22,concat ( 'created in martydataeng mlx', current_timestamp() ) )")
spark.sql("select * from martydataeng").take(20)
# DataFrames can be saved as Parquet files, maintaining the schema information.
# old debugging: !hadoop fs -rm -r hdfs://se-sandbox-dl-12apr-master0.se-sandb.a465-9q4k.cloudera.site:8020/tmp/age.parquet
# old debugging: !rm -rf age.parquet
# Try to write without using Hive.
df = spark.createDataFrame([("10", ), ("11", ), ("13", )], ["age"])
df.show()
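# A minimal sketch (not in the original script): write the DataFrame to Parquet and
# read it back to confirm the schema round-trips. The local path reuses the
# job.local.dir / age.parquet locations mentioned in the comments above; the
# commented-out lines below show the S3 and HDFS variants that were tried.
df.write.mode("overwrite").parquet("file:///home/cdsw/age.parquet")
spark.read.parquet("file:///home/cdsw/age.parquet").show()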
#!hadoop fs -rm -r s3a://useyours-sandbox-default-se/datalake/martyparquet
# df.write.parquet("s3a://useyours-uat2/warehouse/martyparquetcde")
# This is where a CREATE TABLE landed: hdfs://useyoursl-12apr-master0.se-sandb.a465-9q4k.cloudera.site:8020/tmp/age.parquet
# file:/home/cdsw/age.parquet
#!hadoop fs -ls s3a://t2/warehouse/martyparquet
#!hadoop fs -ls s3a://useyourscdp-sandbox/datalake/data/warehouse/martyparquet
#spark.stop()