4
4
5
5
import datetime
6
6
import json
7
+ from urllib .parse import urlparse
8
+ from enum import Enum
9
+
7
10
import streamsx .spl .op
8
11
import streamsx .spl .types
9
-
10
12
from streamsx .topology .schema import CommonSchema , StreamSchema
11
- from urllib .parse import urlparse
12
13
from streamsx .toolkits import download_toolkit
13
- from enum import Enum
14
14
15
15
16
16
_TOOLKIT_NAME = 'com.ibm.streamsx.hdfs'
33
33
``'tuple<rstring fileName>'``
34
34
"""
35
35
36
def _add_toolkit_dependency(topo):
    """Declare the required com.ibm.streamsx.hdfs toolkit version on *topo*.

    IMPORTANT: this pins the Python wrapper to a specific toolkit version
    range, which matters when the toolkit is not selected explicitly with
    streamsx.spl.toolkit.add_toolkit (e.g. when the remote build service
    chooses the toolkit).
    """
    supported_range = '[5.0.0,6.0.0)'
    streamsx.spl.toolkit.add_toolkit_dependency(topo, _TOOLKIT_NAME, supported_range)
43
40
44
- def _read_ae_service_credentials (credentials ):
41
+ def _read_service_credentials (credentials ):
45
42
hdfs_uri = ""
46
43
user = ""
47
44
password = ""
@@ -64,6 +61,23 @@ def _read_ae_service_credentials(credentials):
64
61
uri_parsed = urlparse (hdfs_uri )
65
62
hdfs_uri = 'webhdfs://' + uri_parsed .netloc
66
63
return hdfs_uri , user , password
64
+
65
def _check_vresion_credentials(credentials, _op, topology):
    """Declare the toolkit version dependency and apply *credentials* to *_op*.

    Three credential forms are accepted:
      * dict - service credentials (presumably IBM Analytics Engine style -
        confirm against callers); decomposed into the hdfsUri / hdfsUser /
        hdfsPassword operator parameters.
      * str containing valid JSON - handed to the operator unchanged via the
        'credentials' parameter.
      * anything else - treated as a path to a core-site.xml file, bundled
        under 'etc' and referenced through 'configPath'.
    """
    # Pin the supported streamsx.hdfs toolkit version range first.
    _add_toolkit_dependency(topology)

    params = _op.params
    if isinstance(credentials, dict):
        uri, usr, pwd = _read_service_credentials(credentials)
        params['hdfsUri'] = uri
        params['hdfsUser'] = usr
        params['hdfsPassword'] = pwd
        return
    if _is_a_valid_json(credentials):
        # Valid JSON string: pass through verbatim.
        params['credentials'] = credentials
        return
    # Fallback: expect a core-site.xml file path in the credentials param.
    topology.add_file_dependency(credentials, 'etc')
    params['configPath'] = 'etc'
67
81
68
82
def _check_time_param (time_value , parameter_name ):
69
83
if isinstance (time_value , datetime .timedelta ):
@@ -76,6 +90,15 @@ def _check_time_param(time_value, parameter_name):
76
90
raise ValueError ("Invalid " + parameter_name + " value. Value must be at least one second." )
77
91
return result
78
92
93
+ def _is_a_valid_json (credentials ):
94
+ # checking if the input string is a valid JSON string
95
+ try :
96
+ json .loads (credentials )
97
+ return 1
98
+ except :
99
+ pass
100
+ return 0
101
+
79
102
class CopyDirection (Enum ):
80
103
"""Defines File Copy directions for HDFS2FileCopy.
81
104
@@ -184,6 +207,7 @@ def download_toolkit(url=None, target_dir=None):
184
207
_toolkit_location = streamsx .toolkits .download_toolkit (toolkit_name = _TOOLKIT_NAME , url = url , target_dir = target_dir )
185
208
return _toolkit_location
186
209
210
+
187
211
188
212
def scan (topology , credentials , directory , pattern = None , init_delay = None , name = None ):
189
213
"""Scans a Hadoop Distributed File System directory for new or modified files.
@@ -205,19 +229,7 @@ def scan(topology, credentials, directory, pattern=None, init_delay=None, name=N
205
229
206
230
_op = _HDFS2DirectoryScan (topology , directory = directory , pattern = pattern , schema = DirectoryScanSchema , name = name )
207
231
208
- if isinstance (credentials , dict ):
209
- hdfs_uri , user , password = _read_ae_service_credentials (credentials )
210
- _op .params ['hdfsUri' ] = hdfs_uri
211
- _op .params ['hdfsUser' ] = user
212
- _op .params ['hdfsPassword' ] = password
213
- else :
214
- # JSON string
215
- if credentials .startswith ('{' ):
216
- _op .params ['credentials' ] = credentials
217
- else :
218
- # expect core-site.xml file in credentials param
219
- topology .add_file_dependency (credentials , 'etc' )
220
- _op .params ['configPath' ] = 'etc'
232
+ _check_vresion_credentials (credentials , _op , topology )
221
233
222
234
if init_delay is not None :
223
235
_op .params ['initDelay' ] = streamsx .spl .types .float64 (_check_time_param (init_delay , 'init_delay' ))
@@ -241,20 +253,8 @@ def read(stream, credentials, schema=CommonSchema.String, name=None):
241
253
"""
242
254
243
255
_op = _HDFS2FileSource (stream , schema = schema , name = name )
244
-
245
- if isinstance (credentials , dict ):
246
- hdfs_uri , user , password = _read_ae_service_credentials (credentials )
247
- _op .params ['hdfsUri' ] = hdfs_uri
248
- _op .params ['hdfsUser' ] = user
249
- _op .params ['hdfsPassword' ] = password
250
- else :
251
- # JSON string
252
- if credentials .startswith ('{' ):
253
- _op .params ['credentials' ] = credentials
254
- else :
255
- # expect core-site.xml file in credentials param
256
- stream .topology .add_file_dependency (credentials , 'etc' )
257
- _op .params ['configPath' ] = 'etc'
256
+
257
+ _check_vresion_credentials (credentials , _op , stream .topology )
258
258
259
259
return _op .outputs [0 ]
260
260
@@ -291,25 +291,14 @@ def write(stream, credentials, file=None, fileAttributeName=None, schema=None, t
291
291
Returns:
292
292
Output Stream with schema :py:const:`~streamsx.hdfs.FileInfoSchema`.
293
293
"""
294
-
295
294
# check bytes_per_file, time_per_file and tuples_per_file parameters
296
295
if (time_per_file is not None and tuples_per_file is not None ) or (tuples_per_file is not None and bytes_per_file is not None ) or (time_per_file is not None and bytes_per_file is not None ):
297
296
raise ValueError ("The parameters are mutually exclusive: bytes_per_file, time_per_file, tuples_per_file" )
298
297
299
298
_op = _HDFS2FileSink (stream , file = file , fileAttributeName = fileAttributeName , schema = FileInfoSchema , name = name )
299
+
300
+ _check_vresion_credentials (credentials , _op , stream .topology )
300
301
301
- if isinstance (credentials , dict ):
302
- hdfs_uri , user , password = _read_ae_service_credentials (credentials )
303
- _op .params ['hdfsUri' ] = hdfs_uri
304
- _op .params ['hdfsUser' ] = user
305
- _op .params ['hdfsPassword' ] = password
306
- else :
307
- if credentials .startswith ('{' ):
308
- _op .params ['credentials' ] = credentials
309
- else :
310
- # expect core-site.xml file in credentials param
311
- stream .topology .add_file_dependency (credentials , 'etc' )
312
- _op .params ['configPath' ] = 'etc'
313
302
314
303
if time_per_file is None and tuples_per_file is None and bytes_per_file is None :
315
304
_op .params ['closeOnPunct' ] = _op .expression ('true' )
@@ -343,20 +332,9 @@ def copy(stream, credentials, direction, hdfsFile=None, hdfsFileAttrName=None, l
343
332
Direction = _convert_copy_direction_string_to_enum (direction )
344
333
345
334
_op = _HDFS2FileCopy (stream , direction = Direction , hdfsFileAttrName = hdfsFileAttrName , localFile = localFile , schema = FileCopySchema , name = name )
346
-
347
- if isinstance (credentials , dict ):
348
- hdfs_uri , user , password = _read_ae_service_credentials (credentials )
349
- _op .params ['hdfsUri' ] = hdfs_uri
350
- _op .params ['hdfsUser' ] = user
351
- _op .params ['hdfsPassword' ] = password
352
- else :
353
- if credentials .startswith ('{' ):
354
- _op .params ['credentials' ] = credentials
355
- else :
356
- # expect core-site.xml file in credentials param
357
- stream .topology .add_file_dependency (credentials , 'etc' )
358
- _op .params ['configPath' ] = 'etc'
359
335
336
+ _check_vresion_credentials (credentials , _op , stream .topology )
337
+
360
338
return _op .outputs [0 ]
361
339
362
340
0 commit comments