Skip to content

Commit 8aef4f7

Browse files
committed
load_as_spark and load_table_changes_as_spark accept an optional DeltaSharingProfile.
Signed-off-by: Steven Ayers <[email protected]>
1 parent a658e19 commit 8aef4f7

File tree

1 file changed

+57
-2
lines changed

1 file changed

+57
-2
lines changed

python/delta_sharing/delta_sharing.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,24 @@ def load_as_pandas(
134134
def load_as_spark(
135135
url: str,
136136
version: Optional[int] = None,
137-
timestamp: Optional[str] = None
137+
timestamp: Optional[str] = None,
138+
delta_sharing_profile: Optional[DeltaSharingProfile] = None
138139
) -> "PySparkDataFrame": # noqa: F821
139140
"""
140141
Load the shared table using the given url as a Spark DataFrame. `PySpark` must be installed,
141142
and the application must be a PySpark application with the Apache Spark Connector for Delta
142143
Sharing installed. Only one of version/timestamp is supported at one time.
143144
144145
:param url: a url under the format "<profile>#<share>.<schema>.<table>".
146+
:type url: str
145147
:param version: an optional non-negative int. Load the snapshot of the table at the given version.
148+
:type version: Optional[int]
146149
:param timestamp: an optional string. Load the snapshot of table at version corresponding
147150
to the timestamp.
151+
:type timestamp: Optional[str]
152+
:param delta_sharing_profile: The DeltaSharingProfile to use for the connection.
153+
:type delta_sharing_profile: Optional[DeltaSharingProfile]
154+
148155
:return: A Spark DataFrame representing the shared table.
149156
"""
150157
try:
@@ -158,6 +165,25 @@ def load_as_spark(
158165
"`load_as_spark` requires running in a PySpark application."
159166
)
160167
df = spark.read.format("deltaSharing")
168+
if delta_sharing_profile is not None:
169+
if delta_sharing_profile.share_credentials_version is not None:
170+
df.option("shareCredentialsVersion", delta_sharing_profile.share_credentials_version)
171+
if delta_sharing_profile.type is not None:
172+
df.option("shareCredentialsType", delta_sharing_profile.type)
173+
if delta_sharing_profile.endpoint is not None:
174+
df.option("endpoint", delta_sharing_profile.endpoint)
175+
if delta_sharing_profile.token_endpoint is not None:
176+
df.option("tokenEndpoint", delta_sharing_profile.token_endpoint)
177+
if delta_sharing_profile.client_id is not None:
178+
df.option("clientId", delta_sharing_profile.client_id)
179+
if delta_sharing_profile.client_secret is not None:
180+
df.option("clientSecret", delta_sharing_profile.client_secret)
181+
if delta_sharing_profile.scope is not None:
182+
df.option("scope", delta_sharing_profile.scope)
183+
if delta_sharing_profile.bearer_token is not None:
184+
df.option("bearerToken", delta_sharing_profile.bearer_token)
185+
if delta_sharing_profile.expiration_time is not None:
186+
df.option("expirationTime", delta_sharing_profile.expiration_time)
161187
if version is not None:
162188
df.option("versionAsOf", version)
163189
if timestamp is not None:
@@ -170,7 +196,8 @@ def load_table_changes_as_spark(
170196
starting_version: Optional[int] = None,
171197
ending_version: Optional[int] = None,
172198
starting_timestamp: Optional[str] = None,
173-
ending_timestamp: Optional[str] = None
199+
ending_timestamp: Optional[str] = None,
200+
delta_sharing_profile: Optional[DeltaSharingProfile] = None
174201
) -> "PySparkDataFrame": # noqa: F821
175202
"""
176203
Load the table changes of a shared table as a Spark DataFrame using the given url.
@@ -181,11 +208,20 @@ def load_table_changes_as_spark(
181208
latest table version for it. The parameter range is inclusive in the query.
182209
183210
:param url: a url under the format "<profile>#<share>.<schema>.<table>".
211+
:type url: str
184212
:param starting_version: The starting version of table changes.
213+
:type starting_version: Optional[int]
185214
:param ending_version: The ending version of table changes.
215+
:type ending_version: Optional[int]
186216
:param starting_timestamp: The starting timestamp of table changes.
217+
:type starting_timestamp: Optional[str]
187218
:param ending_timestamp: The ending timestamp of table changes.
219+
:type ending_timestamp: Optional[str]
220+
:param delta_sharing_profile: The DeltaSharingProfile to use for the connection.
221+
:type delta_sharing_profile: Optional[DeltaSharingProfile]
188222
:return: A Spark DataFrame representing the table changes.
223+
224+
189225
"""
190226
try:
191227
from pyspark.sql import SparkSession
@@ -199,6 +235,25 @@ def load_table_changes_as_spark(
199235
"`load_table_changes_as_spark` requires running in a PySpark application."
200236
)
201237
df = spark.read.format("deltaSharing").option("readChangeFeed", "true")
238+
if delta_sharing_profile is not None:
239+
if delta_sharing_profile.share_credentials_version is not None:
240+
df.option("shareCredentialsVersion", delta_sharing_profile.share_credentials_version)
241+
if delta_sharing_profile.type is not None:
242+
df.option("shareCredentialsType", delta_sharing_profile.type)
243+
if delta_sharing_profile.endpoint is not None:
244+
df.option("endpoint", delta_sharing_profile.endpoint)
245+
if delta_sharing_profile.token_endpoint is not None:
246+
df.option("tokenEndpoint", delta_sharing_profile.token_endpoint)
247+
if delta_sharing_profile.client_id is not None:
248+
df.option("clientId", delta_sharing_profile.client_id)
249+
if delta_sharing_profile.client_secret is not None:
250+
df.option("clientSecret", delta_sharing_profile.client_secret)
251+
if delta_sharing_profile.scope is not None:
252+
df.option("scope", delta_sharing_profile.scope)
253+
if delta_sharing_profile.bearer_token is not None:
254+
df.option("bearerToken", delta_sharing_profile.bearer_token)
255+
if delta_sharing_profile.expiration_time is not None:
256+
df.option("expirationTime", delta_sharing_profile.expiration_time)
202257
if starting_version is not None:
203258
df.option("startingVersion", starting_version)
204259
if ending_version is not None:

0 commit comments

Comments
 (0)