-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschema.py
95 lines (76 loc) · 3.36 KB
/
schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from transformer import Column, Aggregate, ParsingException
class Schema(object):
    """
    Schemas tell me what input data looks like and how I should process it.

    For example, if you have this data:

        age,liabilities,assets,fname ,lname ,date
        22 ,0 ,200 ,"George","Lucas",2012-05-02
        55 ,2000 ,40 ,"Oscar" ,"Wilde",2012-02-12

    and you just want everyone and their age, you could use this schema:

        class Person(Schema):
            LastName = Column("lname")
            Age = Column("age")
            _ordering = [LastName, Age]

    All schemas contain elements and an ordering list. Each element can be a
    [[Column]] like above, or an [[Aggregate]]. Aggregates are combinations
    of Columns, like so:

        FullName = Aggregate(
            sources=[LastName, Column("fname")]
        )

    This should return "Lucas George" for the `FullName` column. You can
    specify your own mergers that meld n inputs into one output.
    """

    # The `_ordering` variable tells which fields to include in the output
    # and in what order. You don't have to include all the fields you
    # declared.
    _ordering = []

    @classmethod
    def transform(cls, document, dictionary=False, ignore_empty=True):
        """
        Apply this schema to `document`, yielding one output row per line.

        This is a generator: nothing is read from `document` until the
        result is iterated.

        Parameters:
            document: object exposing a `reader` iterator of rows and an
                `encoding` attribute. If it also has a `header` attribute
                (regardless of its value), the first row of `reader` is
                skipped as a header.
            dictionary: when true, yield each row as a
                `dict: column title -> value` instead of a list.
            ignore_empty: when true, a field whose fetched raw data equals
                `""` is emitted as `None` without being transformed.

        Yields:
            A list of values ordered as `cls._ordering`, or a dict keyed by
            column title when `dictionary` is true.

        Raises:
            ParsingException: re-raised from a column after being annotated
                with `document`, `line` and `line_number` (the zero-based
                index of the data row, not counting a skipped header). The
                exception is also printed before it propagates.
            TypeError: if an entry of `_ordering` is neither a Column nor an
                Aggregate.
        """
        # Headers are not data and are skipped. The default argument keeps
        # an empty reader from leaking StopIteration out of this generator.
        if hasattr(document, "header"):
            next(document.reader, None)

        # If the columns are not given titles, we have to name them
        # automatically. To do this, we walk the class MRO from the most
        # basic class to the most derived one, looking at each class dict
        # for Columns and Aggregates, and fall back to the attribute name.
        for search_class in reversed(cls.__mro__):
            for attr_name, attr in search_class.__dict__.items():
                if isinstance(attr, (Column, Aggregate)) and not attr.title:
                    attr.title = attr_name

        for line_num, line in enumerate(document.reader):
            if document.encoding:
                # Only byte strings need decoding; cells that are already
                # text (or not strings at all) are passed through untouched.
                line = [
                    cell.decode(document.encoding)
                    if isinstance(cell, bytes) else cell
                    for cell in line
                ]

            # Then we process each column and, if there is an error, we
            # annotate it nicely with line numbers.
            new_line = []
            for column in cls._ordering:
                raw_data = column.fetch_data(document, line)
                if raw_data == "" and ignore_empty:
                    new_line.append(None)
                    continue
                try:
                    if isinstance(column, Column):
                        data = column.transform_column(raw_data)
                    elif isinstance(column, Aggregate):
                        data = column.merge_aggregate(raw_data)
                    else:
                        # Without this branch the previous column's value
                        # would silently be reused for this field.
                        raise TypeError(
                            "_ordering entries must be Column or Aggregate "
                            "instances, got %r" % (column,)
                        )
                except ParsingException as pe:
                    pe.document = document
                    pe.line = line
                    pe.line_number = line_num
                    print(pe)
                    # Bare raise keeps the original traceback intact.
                    raise
                new_line.append(data)

            if dictionary:
                yield dict(zip((col.title for col in cls._ordering), new_line))
            else:
                yield new_line