Skip to content

Commit

Permalink
cleanup Unions and UnionAlls. UnionAll is a bag op, Union is a set op…
Browse files Browse the repository at this point in the history
…. "+" is the same as UnionAll.
  • Loading branch information
jingjingwang committed Jul 10, 2016
1 parent 0231c09 commit 72ce4ae
Show file tree
Hide file tree
Showing 18 changed files with 148 additions and 58 deletions.
13 changes: 4 additions & 9 deletions examples/naivebayes/generate_parse.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
else:
parse_template = "input_sp{i} = select id, INT(input.x{i}/{bwidth}) as value, {i} as index from input;"


union_template = "input_sp{l}{r} = UNIONALL(input_sp{lm}{rm}, input_sp{i});"

if y==1:
print "input = SCAN(trainingdata);"
else:
Expand All @@ -21,9 +18,7 @@
for i in range(nfeat):
print parse_template.format(i=i, bwidth=bwidth)

print union_template.format(i=1, l=0, r=1, lm=0, rm='')

for i in range(2, nfeat-1):
print union_template.format(i=i, l=0, r=i, lm=0, rm=i-1)

print union_template.format(i=nfeat-1, l='', r='', lm=0, rm=nfeat-2)
inputs = []
for i in range(nfeat):
inputs.append("input_sp%d" % i)
print "input_sp = UNIONALL(%s);" % ', '.join(inputs)
5 changes: 1 addition & 4 deletions examples/naivebayes/nb_classify.myl
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ input_sp1 = select input.id as id, input.x1 as value, 1 as index from input;
input_sp2 = select input.id as id, input.x2 as value, 2 as index from input;
input_sp3 = select input.id as id, input.x3 as value, 3 as index from input;
input_sp4 = select input.id as id, input.x4 as value, 4 as index from input;
input_sp01 = UNIONALL(input_sp0, input_sp1);
input_sp02 = UNIONALL(input_sp01, input_sp2);
input_sp03 = UNIONALL(input_sp02, input_sp3);
input_sp = UNIONALL(input_sp03, input_sp4);
input_sp = UNIONALL(input_sp0, input_sp1, input_sp2, input_sp3, input_sp4);

-- calculate probability of outcomes
Poe = select input_sp.id as inputId,
Expand Down
6 changes: 1 addition & 5 deletions examples/samplescan.myl
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,5 @@ T4 = samplescan(public:adhoc:employee, .5%, WoR);
T5 = samplescan(public:adhoc:employee, 1);
T6 = samplescan(public:adhoc:employee, 1%);

T = unionall(T1, T2);
T = unionall(T, T3);
T = unionall(T, T4);
T = unionall(T, T5);
T = unionall(T, T6);
T = unionall(T1, T2, T3, T4, T5, T6);
Store(T, samplescanquery);
24 changes: 19 additions & 5 deletions raco/algebra.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,23 +421,37 @@ def shortStr(self):
return self.opname()


class UnionAll(IdenticalSchemeBinaryOperator):
class UnionAll(NaryOperator):

"""Bag union."""

def __init__(self, left=None, right=None):
BinaryOperator.__init__(self, left, right)
def __init__(self, children=None):
NaryOperator.__init__(self, children)

def partitioning(self):
return RepresentationProperties()

def num_tuples(self):
return self.left.num_tuples() + self.right.num_tuples()
sum = 0
for op in self.args:
sum = sum + op.num_tuples()
return sum

def copy(self, other):
"""deep copy"""
BinaryOperator.copy(self, other)
NaryOperator.copy(self, other)

def shortStr(self):
return self.opname()

def scheme(self):
    """Return the scheme of this union, taken from the first child.

    Each child's scheme is compared, position by position, against the
    first child's scheme; the entries at index 1 of every scheme item
    must be equal.

    Raises:
        AssertionError: if any child disagrees with the first child. The
            error is raised explicitly (not via an ``assert`` statement)
            so the check still runs under ``python -O``.
    """
    # The reference scheme does not change inside the loop; compute it once.
    reference = self.args[0].scheme()
    for child in self.args:
        candidate = child.scheme()
        # NOTE(review): zip() stops at the shorter scheme, so a difference
        # in the number of columns is not detected here -- confirm whether
        # that is validated elsewhere.
        if not all(la[1] == ra[1] for la, ra in zip(candidate, reference)):
            raise AssertionError(
                "Must be same scheme types: {left} != {right}".format(
                    left=candidate, right=reference))
    return reference


class Intersection(IdenticalSchemeBinaryOperator):

Expand Down
6 changes: 5 additions & 1 deletion raco/backends/cpp/cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ def indentby(code, level):

# iteration over table + insertion into hash table with filter

class CUnion(cppcommon.CBaseUnion, CCOperator):
pass


class CUnionAll(cppcommon.CBaseUnionAll, CCOperator):
pass

Expand Down Expand Up @@ -548,7 +552,7 @@ def clangify(emit_print):
rules.OneToOne(algebra.Project, CProject),
rules.OneToOne(algebra.UnionAll, CUnionAll),
# TODO: obviously breaks semantics
rules.OneToOne(algebra.Union, CUnionAll),
rules.OneToOne(algebra.Union, CUnion),
cppcommon.StoreToBaseCStore(emit_print, CStore),
rules.OneToOne(algebra.Sink, CSink),

Expand Down
25 changes: 24 additions & 1 deletion raco/backends/cpp/cppcommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def createTupleTypeConversion(lang, state, input_tuple, result_tuple):
)


class CBaseUnionAll(Pipelined, algebra.Union):
class CBaseUnion(Pipelined, algebra.Union):

def produce(self, state):
self.unifiedTupleType = self.new_tuple_ref(gensym(), self.scheme())
Expand All @@ -477,6 +477,29 @@ def consume(self, t, src, state):
return assignment_code + inner_plan_compiled


class CBaseUnionAll(Pipelined, algebra.UnionAll):

    """Pipelined code generation for an n-ary UnionAll.

    Tuples arriving from any child are converted into one shared tuple
    type before being handed to the parent operator.
    """

    def produce(self, state):
        """Declare the shared tuple type, then let each child produce."""
        unified = self.new_tuple_ref(gensym(), self.scheme())
        self.unifiedTupleType = unified
        state.addDeclarations([unified.generateDefinition()])

        for child in self.args:
            child.produce(state)

    def consume(self, t, src, state):
        """Convert tuple `t` to the shared type and pass it to the parent.

        Returns the conversion code followed by the code the parent
        generates for consuming the converted tuple.
        """
        unified = self.unifiedTupleType

        conversion = createTupleTypeConversion(
            self.language(), state, t, unified)
        downstream = self.parent().consume(unified, self, state)

        return conversion + downstream


class CBaseApply(Pipelined, algebra.Apply):

def produce(self, state):
Expand Down
33 changes: 29 additions & 4 deletions raco/backends/myria/myria.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ def compileme(self, inputid):

class MyriaUnionAll(algebra.UnionAll, MyriaOperator):

def compileme(self, leftid, rightid):
def compileme(self, *args):
return {
"opType": "UnionAll",
"argChildren": [leftid, rightid]
"argChildren": args
}


Expand Down Expand Up @@ -1468,8 +1468,12 @@ def fire(self, op):
if not isinstance(child, MyriaUnionAll):
return op

left = child.left
right = child.right
# TODO: handle multiple children.
if len(child.args) != 2:
return op

left = child.args[0]
right = child.args[1]
rel_name = op.name

is_scan = lambda op: isinstance(
Expand Down Expand Up @@ -1695,6 +1699,26 @@ class MyriaAlgebra(Algebra):
)


class FlattenUnionAll(rules.Rule):

    """Collapse nested UnionAll operators into one n-ary UnionAll."""

    @staticmethod
    def collect_children(op):
        """Return the non-UnionAll operators found beneath `op`.

        An operator that is not a UnionAll is returned as a one-element
        list; a UnionAll contributes the results for each of its args,
        in order.
        """
        if not isinstance(op, algebra.UnionAll):
            return [op]
        leaves = []
        for arg in op.args:
            leaves.extend(FlattenUnionAll.collect_children(arg))
        return leaves

    def fire(self, op):
        """Rewrite a UnionAll so that none of its inputs is a UnionAll."""
        if isinstance(op, algebra.UnionAll):
            flattened = FlattenUnionAll.collect_children(op)
            # A union over a single input is just that input.
            if len(flattened) == 1:
                return flattened[0]
            return algebra.UnionAll(flattened)
        return op


class MyriaLeftDeepTreeAlgebra(MyriaAlgebra):

"""Myria physical algebra using left deep tree pipeline and 1-D shuffle"""
Expand All @@ -1716,6 +1740,7 @@ def opt_rules(self, **kwargs):
distributed_group_by(MyriaGroupBy),
[rules.PushApply()],
[LogicalSampleToDistributedSample()],
[FlattenUnionAll()],
]

if kwargs.get('push_sql', False):
Expand Down
8 changes: 6 additions & 2 deletions raco/backends/radish/radish.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,10 @@ class GrappaUnionAll(cppcommon.CBaseUnionAll, GrappaOperator):
pass


class GrappaUnion(cppcommon.CBaseUnion, GrappaOperator):
pass


# Basic materialized copy based project like serial C++
class GrappaProject(cppcommon.CBaseProject, GrappaOperator):
pass
Expand Down Expand Up @@ -2417,7 +2421,7 @@ def iteratorfy(emit_print, scan_array_repr, groupby_class):
# rules.OneToOne(algebra.Project, GrappaProject),
# rules.OneToOne(algebra.UnionAll, GrappaUnionAll),
# TODO: obviously breaks semantics
# rules.OneToOne(algebra.Union, GrappaUnionAll),
# rules.OneToOne(algebra.Union, GrappaUnion),
cppcommon.StoreToBaseCStore(emit_print, IGrappaStore),
CrossProductWithSmall(IGrappaBroadcastCrossProduct),
DistinctToGroupby(groupby_class)
Expand Down Expand Up @@ -2448,7 +2452,7 @@ def grappify(join_type, emit_print,
rules.OneToOne(algebra.Shuffle, GrappaShuffle),
rules.OneToOne(algebra.UnionAll, GrappaUnionAll),
# TODO: obviously breaks semantics
rules.OneToOne(algebra.Union, GrappaUnionAll),
rules.OneToOne(algebra.Union, GrappaUnion),
cppcommon.StoreToBaseCStore(emit_print, GrappaStore),
CrossProductWithSmall(),
rules.OneToOne(algebra.Sink, GrappaSink),
Expand Down
2 changes: 2 additions & 0 deletions raco/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def scheme_write_to_file(cls, path, new_rel_key, new_rel_schema,
current_dict = literal_eval(s)
current_dict[new_rel_key] = columns
json.dump(current_dict, schema_write)
schema_write.write("\n")
schema_write.close()
else:
raise IOError("file {0} exists".format(path))
Expand All @@ -186,6 +187,7 @@ def scheme_write_to_file(cls, path, new_rel_key, new_rel_schema,
d = {}
d[new_rel_key] = columns
json.dump(d, fh)
fh.write("\n")
fh.close()

def get_num_servers(self):
Expand Down
2 changes: 1 addition & 1 deletion raco/dbconn.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ def delete_table(self, rel_key, ignore_failure=False):
def get_sql_output(self, sql):
"""Retrieve the result of a query as a bag (Counter)."""
s = text(sql)
return collections.Counter(tuple(t) for t in self.engine.execute(s))
return collections.Counter(tuple(t) for t in self.engine.execute(s))
11 changes: 5 additions & 6 deletions raco/fakedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,12 @@ def singletonrelation(op):
def emptyrelation(op):
return iter([])

def unionall(self, op):
left_it = self.evaluate(op.left)
right_it = self.evaluate(op.right)
return itertools.chain(left_it, right_it)

def union(self, op):
return set(x for x in self.unionall(op))
return set(self.evaluate(op.left)).union(set(self.evaluate(op.right)))

def unionall(self, op):
    """Bag union: lazily chain the evaluated results of every arg of `op`."""
    child_results = (self.evaluate(child) for child in op.args)
    return itertools.chain.from_iterable(child_results)

def difference(self, op):
its = [self.evaluate(op.left), self.evaluate(op.right)]
Expand Down
9 changes: 6 additions & 3 deletions raco/myrial/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,14 @@ def distinct(self, expr):
op = self.evaluate(expr)
return raco.algebra.Distinct(input=op)

def unionall(self, e1, e2):
def union(self, e1, e2):
left = self.evaluate(e1)
right = self.evaluate(e2)
check_binop_compatability("unionall", left, right)
return raco.algebra.UnionAll(left, right)
check_binop_compatability("union", left, right)
return raco.algebra.Union(left, right)

def unionall(self, e1):
    """Build an n-ary UnionAll from the expressions in the sequence `e1`."""
    operands = [self.evaluate(expr) for expr in e1]
    return raco.algebra.UnionAll(operands)

def countall(self, expr):
op = self.evaluate(expr)
Expand Down
18 changes: 16 additions & 2 deletions raco/myrial/optimizer_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

from raco.backends.myria import (
MyriaShuffleConsumer, MyriaShuffleProducer, MyriaHyperShuffleProducer,
MyriaBroadcastConsumer, MyriaQueryScan, MyriaSplitConsumer, MyriaDupElim,
MyriaGroupBy)
MyriaBroadcastConsumer, MyriaQueryScan, MyriaSplitConsumer, MyriaUnionAll,
MyriaDupElim, MyriaGroupBy)
from raco.backends.myria import (MyriaLeftDeepTreeAlgebra,
MyriaHyperCubeAlgebra)
from raco.compile import optimize
Expand Down Expand Up @@ -1174,3 +1174,17 @@ def test_aggregate_COUNTALL_pushed(self):
# translate to COUNT(something)
self._check_aggregate_functions_pushed(
'count(*)', r'count[(][a-zA-Z.]+[)]', True)

def test_flatten_unionall(self):
    """Test flattening a chain of UnionAlls"""
    query = """
X = scan({x});
a = (select $0 from X) + [from X emit $0] + [from X emit $1];
store(a, a);
""".format(x=self.x_key)
    lp = self.get_logical_plan(query)
    # The two "+" operators nest in the logical plan:
    # UnionAll([UnionAll([expr_1, expr_2]), expr_3])
    self.assertEqual(self.get_count(lp, UnionAll), 2)
    pp = self.logical_to_physical(lp)
    # After optimization a single flat operator remains:
    # UnionAll([expr_1, expr_2, expr_3])
    self.assertEqual(self.get_count(pp, MyriaUnionAll), 1)
20 changes: 17 additions & 3 deletions raco/myrial/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,17 +817,31 @@ def p_expression_binary_set_operation(p):
'expression : setop LPAREN expression COMMA expression RPAREN'
p[0] = (p[1], p[3], p[5])

@staticmethod
def p_expression_unionall(p):
'expression : UNIONALL LPAREN expression_list RPAREN'
p[0] = ('UNIONALL', p[3])

@staticmethod
def p_expression_list(p):
    """expression_list : expression COMMA expression_list
| expression"""
    # len(p) == 4 means the recursive alternative matched
    # (expression COMMA expression_list); otherwise only a single
    # expression was matched and there is no tail to append.
    tail = p[3] if len(p) == 4 else []
    p[0] = [p[1]] + tail

@staticmethod
def p_setop(p):
"""setop : INTERSECT
| DIFF
| UNIONALL"""
| UNION"""
p[0] = p[1]

@staticmethod
def p_expression_unionall_inline(p):
def p_expression_unionall_plus_inline(p):
"""expression : expression PLUS expression"""
p[0] = ('UNIONALL', p[1], p[3])
p[0] = ('UNIONALL', [p[1], p[3]])

@staticmethod
def p_expression_cross(p):
Expand Down
4 changes: 2 additions & 2 deletions raco/myrial/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
word_operators = ['AND', 'OR', 'NOT']

builtins = ['EMPTY', 'WORKER_ID', 'SCAN', 'COUNTALL', 'COUNT', 'STORE',
'DIFF', 'CROSS', 'JOIN', 'UNIONALL', 'INTERSECT', 'DISTINCT',
'LIMIT', 'SINK', 'SAMPLESCAN', 'LIKE']
'DIFF', 'CROSS', 'JOIN', 'UNION', 'UNIONALL', 'INTERSECT',
'DISTINCT', 'LIMIT', 'SINK', 'SAMPLESCAN', 'LIKE']


# identifiers with special meaning; case-insensitive
Expand Down
6 changes: 3 additions & 3 deletions raco/myrial/setop_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ def setUp(self):

def test_unionall(self):
query = """
out = UNIONALL(SCAN(%s), SCAN(%s));
out = SCAN(%s) + SCAN(%s);
STORE(out, OUTPUT);
""" % (self.emp_key1, self.emp_key2)

expected = self.emp_table1 + self.emp_table2
self.check_result(query, expected)

def test_unionall_schema_mismatch(self):
def test_union_schema_mismatch(self):
query = """
T1 = [FROM SCAN(%s) AS X EMIT id, dept_id, name, salary, 7 as seven];
out = UNIONALL(T1, SCAN(%s));
out = UNION(T1, SCAN(%s));
STORE(out, OUTPUT);
""" % (self.emp_key1, self.emp_key2)

Expand Down
Loading

0 comments on commit 72ce4ae

Please sign in to comment.