hadoop_utils.py
#!/usr/bin/env python
"""
Helper functions for reading and writing HDFS files over HTTP.

Note that since these helpers use HTTP, they are designed for simple
(slow) use cases like pulling the results of MR jobs on the cluster to
another location, so don't expect HDFS/InfiniBand performance.

See http://hdfscli.readthedocs.org/en/latest/api.html#module-hdfs.client
"""
import posixpath

import hdfs
import hdfs.ext.avro


HDFS_DEFAULT_PORT = 50070
SAMPA_HDFS_URL = 'http://hadoop.sampa:{}'.format(HDFS_DEFAULT_PORT)
LOCALHOST_HDFS_URL = 'http://localhost:{}'.format(HDFS_DEFAULT_PORT)


def hdfs_client_connection(url=SAMPA_HDFS_URL, root='/'):
    """returns an InsecureClient for the HDFS cluster at url"""
    client = hdfs.client.InsecureClient(url, root=root)
    return client
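
# Illustrative sketch (not part of the original module): assuming the installed
# hdfs package exposes Client.list() and the WebHDFS endpoint is reachable, the
# client returned above can be used directly, e.g. to list a hypothetical
# directory:
#
#   client = hdfs_client_connection(url=LOCALHOST_HDFS_URL)
#   print client.list('user/example')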


def hdfs_file_contents(filepath, url=SAMPA_HDFS_URL, buffer_char='\n'):
    """
    generates each chunk of the file at filepath, yielding whenever it sees
    a <buffer_char>
    """
    client = hdfs_client_connection(url=url)
    for line in client.read(filepath, buffer_char=buffer_char):
        yield line


def hdfs_dir_contents(directory, url=SAMPA_HDFS_URL, buffer_char='\n'):
    """
    performs hdfs_file_contents for each part-XYZ file in directory
    """
    client = hdfs_client_connection(url=url)
    part_files = client.parts(directory)
    for part_file in part_files:
        # client.parts() yields part-file names relative to directory
        part_path = posixpath.join(directory, part_file)
        for line in hdfs_file_contents(part_path, url=url,
                                       buffer_char=buffer_char):
            yield line


def hdfs_append_to_file(filepath, text_to_append, url=SAMPA_HDFS_URL):
    """
    appends text_to_append to filepath at url
    """
    hdfs_touch_file(filepath, url=url)
    client = hdfs_client_connection(url=url)
    client.append(filepath, text_to_append)


def hdfs_write_to_file(filepath, content, url=SAMPA_HDFS_URL):
    """
    overwrites the contents currently in filepath with content
    """
    client = hdfs_client_connection(url=url)
    client.write(filepath, content, overwrite=True)


def hdfs_touch_file(filepath, url=SAMPA_HDFS_URL):
    """
    ensures that filepath exists, creates it if necessary
    """
    # call client.append directly here: going through hdfs_append_to_file
    # would recurse back into this function
    client = hdfs_client_connection(url=url)
    try:
        client.append(filepath, '')
    except hdfs.util.HdfsError:
        # the file doesn't exist yet; write creates necessary dirs / files
        hdfs_write_to_file(filepath, '', url)
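
# Illustrative sketch (not from the original file): how the write/append
# helpers above might be exercised together. 'user/example/notes.txt' is a
# hypothetical path, and the default SAMPA_HDFS_URL must be reachable for
# this to work:
#
#   hdfs_write_to_file('user/example/notes.txt', 'first line\n')
#   hdfs_append_to_file('user/example/notes.txt', 'second line\n')
#   for chunk in hdfs_file_contents('user/example/notes.txt'):
#       print chunk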


def hdfs_avro_records(filepath, url=SAMPA_HDFS_URL):
    """
    generates each record in the avro data at filepath
    """
    client = hdfs_client_connection(url=url)
    avro_reader = hdfs.ext.avro.AvroReader(client, filepath)
    for record in avro_reader.records:
        yield record
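
# Note (not from the original file): depending on the installed version of the
# hdfs package, AvroReader may instead be used as a context manager that is
# iterated over directly rather than via a `records` attribute; in that case
# the loop above would look roughly like:
#
#   with hdfs.ext.avro.AvroReader(client, filepath) as reader:
#       for record in reader:
#           yield record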


def main():
    # example usage:
    # first, we'll read the contents of a file
    # hdfs_filepath = 'patents/claims_splits/claims.tsv.aa'
    # lines = hdfs_file_contents(hdfs_filepath)
    # first_line = lines.next()
    # print first_line

    # now let's try to write
    # hdfs_append_to_file('user/zbsimon/temp.txt', 'can I append here?')

    # now, try to create a new file
    # hdfs_touch_file('user/zbsimon/newfile.txt')

    # now, read the contents of an entire directory
    # hdfs_filepath = 'patents/output/json/ngrams'
    # lines = hdfs_dir_contents(hdfs_filepath)
    # first_line = lines.next()
    # print first_line

    # now, do an avro file
    # a = hdfs_avro_records('patents/output/tfidf')
    # print a.next()
    pass


if __name__ == '__main__':
    main()