SQL to Scalding
- Motivation
- Create datasets
- Simple Count
- Count distinct
- Count, Count distinct, Sum in one query
- Where
- Order by X, Y limit N
- Union
- Group and Aggregate
- Join
- Histogram, Ntile
- TODO
Motivation
SQL is a popular language for data analytics. Scalding is a relative newcomer that is more powerful but also more complex. The goal of this document is to translate commonly used SQL idioms to the Scalding type-safe API, which is preferred over the fields-based API. The SQL examples use the Vertica SQL variant, which is based on PSQL and supports analytic functions. We have purposely picked trivial example datasets so that it is easy to experiment in the REPL and inspect intermediate results to understand what each method does. For more on using the REPL, see Scalding REPL and Learning Scalding with Alice.
Prerequisites:
- Elementary knowledge of Scala
- Basic ability to decipher types in Scalding methods
You should not expect Scalding to be as intuitive as SQL, but it is also not as hard as the plethora of classes and methods in the Scalding docs may suggest.
For a deeper understanding of Algebird structures such as QTree, see Learning Algebird Monoids with REPL.
Create datasets
SQL
CREATE TABLE test.allsales(
state VARCHAR(20),
name VARCHAR(20),
sales INT
);
INSERT INTO test.allsales VALUES('CA', 'A', 60);
INSERT INTO test.allsales VALUES('CA', 'A', 20);
INSERT INTO test.allsales VALUES('VA', 'B', 15);
COMMIT;
pwagle=> select * from test.allsales;
state | name | sales
-------+------+-------
CA | A | 60
VA | B | 15
CA | A | 20
(3 rows)
Scalding
scala> case class Sale(state: String, name: String, sale: Int)
defined class Sale
scala> val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))
salesList: List[Sale] = List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15))
scala> val salesPipe = TypedPipe.from(salesList)
salesPipe: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))
Simple Count
SQL
pwagle=> select count(1) from test.allsales;
count
-------
3
Scalding
scala> salesPipe.groupAll.size.values.dump
3
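You can mirror the three steps on an in-memory Scala List to see what each one contributes. groupAll puts every row under the single key () (Unit), size counts the rows per key, and values drops the key. This is a minimal sketch that uses plain Scala collections only. It reproduces the result but not Scalding's distributed execution.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))

// groupAll: every row goes under the same key, the Unit value ()
val groupedAll: Map[Unit, List[Sale]] = salesList.groupBy(_ => ())

// size: count the rows for each key, giving (key, count) pairs
val sizes: Map[Unit, Int] = groupedAll.map { case (k, rows) => (k, rows.size) }

// values: drop the key and keep only the counts
val counts: List[Int] = sizes.values.toList

println(counts) // List(3)
```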
Count distinct
SQL
pwagle=> select count(distinct state) from test.allsales;
count
-------
2
Scalding
scala> salesPipe.map{x => x.state}.distinct.groupAll.size.values.dump
2
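Counting distinct values comes down to projecting the column, removing duplicates, and counting what remains. The plain-Scala sketch below follows the same steps without a Scalding dependency. It also shows that grouping by state and counting the groups gives the same answer. That is why a distinct needs rows to be grouped by value, which on a cluster means a shuffle.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))

// map { x => x.state }: project the column being counted
val states: List[String] = salesList.map(_.state)
// distinct: drop duplicate values
val distinctStates: List[String] = states.distinct
// groupAll.size.values: count what is left
val countDistinct: Int = distinctStates.size

// The same answer from grouping by state and counting the groups
val viaGroupBy: Int = salesList.groupBy(_.state).size

println(countDistinct) // 2
println(viaGroupBy)    // 2
```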
Count, Count distinct, Sum in one query
SQL
pwagle=> select count(1), count(distinct state), sum(sales) from test.allsales;
count | count | sum
-------+-------+-----
3 | 2 | 95
Scalding
scala> salesPipe.map{x => (1, Set(x.state), x.sale) }.groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }.dump
(3,2,95)
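The above query has performance issues when count(distinct state) is large. groupAll sends everything to a single reducer, and the Set that holds every distinct state grows with the number of distinct values. This can be solved in two ways: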
- Group by state first (TODO)
- Using an approximate data structure like HyperLogLog (TODO)
Also see Aggregation using Algebird Aggregators.
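Here is one way to read the "group by state first" suggestion, sketched with plain Scala collections rather than Scalding. First reduce the data to one (count, sum) row per state, as sumByKey would. Then add the per-state rows together and count each state once. No Set of states is built, so memory no longer grows with the number of distinct states. This is only an illustration of the idea, not a completed version of the TODO above.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))

// Stage 1 (the role sumByKey would play): one (rowCount, salesSum) pair per state
val perState: Map[String, (Int, Int)] =
  salesList.groupBy(_.state).map { case (state, rows) =>
    (state, (rows.size, rows.map(_.sale).sum))
  }

// Stage 2 (the role groupAll.sum would play): each per-state row adds 1 to the
// distinct count, so the distinct states never have to be collected in a Set
val (count, distinctStates, total) =
  perState.values.foldLeft((0, 0, 0)) { case ((c, d, s), (rowCount, rowSum)) =>
    (c + rowCount, d + 1, s + rowSum)
  }

println((count, distinctStates, total)) // (3,2,95)
```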
Where
SQL
select state, name, sales
from test.allsales
where
state = 'CA';
Scalding
scala> salesPipe.filter(sale => sale.state == "CA").dump
Sale(CA,A,60)
Sale(CA,A,20)
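filter corresponds to WHERE. When you also need only some of the columns, collect can filter and project in a single pass using a partial function. TypedPipe has filter and collect with the same shape. The sketch below uses an in-memory List so that it runs without Scalding.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))

// where state = 'CA'
val caSales: List[Sale] = salesList.filter(sale => sale.state == "CA")

// select name, sales ... where state = 'CA': filter and projection in one pass
val caNameSales: List[(String, Int)] = salesList.collect {
  case Sale(state, name, sale) if state == "CA" => (name, sale)
}

println(caSales)     // List(Sale(CA,A,60), Sale(CA,A,20))
println(caNameSales) // List((A,60), (A,20))
```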
Order by X, Y limit N
SQL
select state, name, sales
from test.allsales
order by state, name
limit 1;
Scalding
scala> object SaleOrderingWithState extends Ordering[Sale] {
| def compare(a: Sale, b: Sale) = Ordering[(String, String)].compare((a.state, a.name), (b.state, b.name))
| }
defined module SaleOrderingWithState
scala> implicit val saleOrderingWithState = SaleOrderingWithState
saleOrderingWithState: SaleOrderingWithState.type = SaleOrderingWithState$@3c91b77c
scala> salesPipe.groupAll.sorted.values.dump
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)
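To emulate limit 1, take the first value of the sorted group:
salesPipe.groupAll.sorted.take(1).values.dump
sortedTake(1) selects the same row but returns the top values as one Seq per key. Because it is implemented as a sum over a bounded priority queue, part of the work can be done before the shuffle:
salesPipe.groupAll.sortedTake(1).values.dump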
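An ORDER BY over several columns maps naturally onto an Ordering over a tuple key. The sketch below uses plain Scala collections and Ordering.by. The row Sale("VA", "A", 10) is a hypothetical addition, included so that the second sort key, name, actually changes the order. List.sorted is stable, so rows with equal keys keep their input order.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

// The three rows from the dataset, plus a hypothetical Sale("VA", "A", 10)
val salesList = List(
  Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15), Sale("VA", "A", 10))

// order by state, name: compare the (state, name) tuple lexicographically
implicit val byStateAndName: Ordering[Sale] = Ordering.by((s: Sale) => (s.state, s.name))

val sorted: List[Sale] = salesList.sorted
// limit 1
val firstRow: List[Sale] = sorted.take(1)

println(sorted)   // List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,A,10), Sale(VA,B,15))
println(firstRow) // List(Sale(CA,A,60))
```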
Union
SQL
select state, name, sales from test.allsales
UNION ALL
select state, name, sales from test.allsales2
Scalding
scala> val salesPipe1 = TypedPipe.from(salesList)
salesPipe1: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))
scala> val salesPipe2 = TypedPipe.from(salesList)
salesPipe2: com.twitter.scalding.typed.TypedPipe[Sale] = IterablePipe(List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15)))
scala> (salesPipe1 ++ salesPipe2).dump
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)
Sale(CA,A,60)
Sale(CA,A,20)
Sale(VA,B,15)
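++ concatenates pipes without removing duplicates, so it matches UNION ALL. SQL's plain UNION also removes duplicate rows, which corresponds to following ++ with distinct. The sketch below shows the difference on in-memory Lists. On a TypedPipe, distinct needs an implicit Ordering for the row type. String has one, but you have to supply one for a case class such as Sale.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

// Both "tables" hold the same rows, as in the REPL session above
val sales1 = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))
val sales2 = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))

// UNION ALL: plain concatenation, duplicates kept
val unionAll: List[Sale] = sales1 ++ sales2
// UNION: concatenation followed by duplicate removal
val union: List[Sale] = (sales1 ++ sales2).distinct

println(unionAll.size) // 6
println(union)         // List(Sale(CA,A,60), Sale(CA,A,20), Sale(VA,B,15))
```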
Group and Aggregate
SQL
pwagle=> select state, count(1), count(distinct name), sum(sales)
pwagle-> from test.allsales
pwagle-> group by state;
state | count | count | sum
-------+-------+-------+-----
CA | 2 | 1 | 80
VA | 1 | 1 | 15
Scalding
scala> salesPipe.map{x => (x.state, (1, Set(x.name), x.sale))}.sumByKey.dump
(CA,(2,Set(A),80))
(VA,(1,Set(B),15))
scala> salesPipe.map{x => (x.state, (1, Set(x.name), x.sale))}.sumByKey.mapValues{ case (count, set, sum) => (count, set.size, sum)}.dump
(CA,(2,1,80))
(VA,(1,1,15))
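sumByKey groups the pairs by key and combines each group's values with the Semigroup that Algebird provides for tuples. Counts and sums are added, and Sets are unioned. The sketch below reproduces this with plain Scala collections. The plus function is a hand-written stand-in for the tuple semigroup; it is our own name, not an Algebird API.

```scala
// Plain Scala collections; no Scalding or Algebird dependency is assumed.
case class Sale(state: String, name: String, sale: Int)

val salesList = List(Sale("CA", "A", 60), Sale("CA", "A", 20), Sale("VA", "B", 15))

type Agg = (Int, Set[String], Int) // (count, distinct names, sum of sales)

// Hypothetical stand-in for the tuple semigroup: add, union, add
def plus(a: Agg, b: Agg): Agg = (a._1 + b._1, a._2 ++ b._2, a._3 + b._3)

// map: key each row by state and lift it to (1, Set(name), sale)
val keyed: List[(String, Agg)] = salesList.map(x => (x.state, (1, Set(x.name), x.sale)))

// sumByKey: group by key, then reduce each group's values with plus
val summed: Map[String, Agg] =
  keyed.groupBy(_._1).map { case (state, pairs) => (state, pairs.map(_._2).reduce((a, b) => plus(a, b))) }

// mapValues: replace the Set with its size to get count(distinct name)
val result: Map[String, (Int, Int, Int)] =
  summed.map { case (state, (count, names, total)) => (state, (count, names.size, total)) }

println(result.toList.sortBy(_._1)) // List((CA,(2,1,80)), (VA,(1,1,15)))
```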
Join
Scalding
scala> case class Table1Row(field1: String, val1: Int)
defined class Table1Row
scala> case class Table2Row(field2: String, val2: Int)
defined class Table2Row
scala> val table1List = List(Table1Row("a", 1), Table1Row("b", 2))
table1List: List[Table1Row] = List(Table1Row(a,1), Table1Row(b,2))
scala> val table1Pipe = TypedPipe.from(List(Table1Row("a", 1), Table1Row("b", 2)))
table1Pipe: com.twitter.scalding.typed.TypedPipe[Table1Row] = IterablePipe(List(Table1Row(a,1), Table1Row(b,2)))
scala> val table2Pipe = TypedPipe.from(List(Table2Row("b", 3), Table2Row("c", 4)))
table2Pipe: com.twitter.scalding.typed.TypedPipe[Table2Row] = IterablePipe(List(Table2Row(b,3), Table2Row(c,4)))
scala> val table1PipeGroup = table1Pipe.groupBy { table1Row => table1Row.field1 }
table1PipeGroup: com.twitter.scalding.typed.Grouped[String,Table1Row] = IdentityReduce(scala.math.Ordering$String$@464dfa23,com.twitter.scalding.typed.TypedPipeFactory@1f0ed511,None)
scala> val table2PipeGroup = table2Pipe.groupBy { table2Row => table2Row.field2 }
table2PipeGroup: com.twitter.scalding.typed.Grouped[String,Table2Row] = IdentityReduce(scala.math.Ordering$String$@464dfa23,com.twitter.scalding.typed.TypedPipeFactory@6d73d27c,None)
scala> val join = table1PipeGroup.join(table2PipeGroup)
join: com.twitter.scalding.typed.CoGrouped[String,(Table1Row, Table2Row)] = com.twitter.scalding.typed.CoGroupable$$anon$3@1c75af7a
scala> join.dump
(b,(Table1Row(b,2),Table2Row(b,3)))
scala> val leftJoin = table1PipeGroup.leftJoin(table2PipeGroup)
leftJoin: com.twitter.scalding.typed.CoGrouped[String,(Table1Row, Option[Table2Row])] = com.twitter.scalding.typed.CoGroupable$$anon$3@7c067391
scala> leftJoin.dump
(a,(Table1Row(a,1),None))
(b,(Table1Row(b,2),Some(Table2Row(b,3))))
scala> val outerJoin = table1PipeGroup.outerJoin(table2PipeGroup)
outerJoin: com.twitter.scalding.typed.CoGrouped[String,(Option[Table1Row], Option[Table2Row])] = com.twitter.scalding.typed.CoGroupable$$anon$3@3daae803
scala> outerJoin.dump
(a,(Some(Table1Row(a,1)),None))
(b,(Some(Table1Row(b,2)),Some(Table2Row(b,3))))
(c,(None,Some(Table2Row(c,4))))
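Reproducing the three join flavours on in-memory Lists makes their semantics explicit. join keeps only keys present on both sides. leftJoin keeps every left row and wraps the right side in an Option. outerJoin keeps keys from either side and makes both sides optional. This is a plain-Scala sketch, not a description of how Scalding executes joins. The orNone helper is a hypothetical name introduced here, and the SQL in the comments assumes tables named table1 and table2.

```scala
// Plain Scala collections; no Scalding dependency is assumed.
case class Table1Row(field1: String, val1: Int)
case class Table2Row(field2: String, val2: Int)

val table1 = List(Table1Row("a", 1), Table1Row("b", 2))
val table2 = List(Table2Row("b", 3), Table2Row("c", 4))

// groupBy: index each table by its join key
val left: Map[String, List[Table1Row]] = table1.groupBy(_.field1)
val right: Map[String, List[Table2Row]] = table2.groupBy(_.field2)

// Hypothetical helper: a missing side becomes a single None
def orNone[A](rows: List[A]): List[Option[A]] =
  if (rows.isEmpty) List(None) else rows.map(Some(_))

// SQL: select * from table1 join table2 on table1.field1 = table2.field2
val innerJoin: List[(String, (Table1Row, Table2Row))] = for {
  k <- left.keys.toList.sorted
  l <- left(k)
  r <- right.getOrElse(k, Nil)
} yield (k, (l, r))

// SQL: ... table1 left outer join table2 on ...
val leftJoin: List[(String, (Table1Row, Option[Table2Row]))] = for {
  k <- left.keys.toList.sorted
  l <- left(k)
  r <- orNone(right.getOrElse(k, Nil))
} yield (k, (l, r))

// SQL: ... table1 full outer join table2 on ...
val outerJoin: List[(String, (Option[Table1Row], Option[Table2Row]))] = for {
  k <- (left.keySet ++ right.keySet).toList.sorted
  l <- orNone(left.getOrElse(k, Nil))
  r <- orNone(right.getOrElse(k, Nil))
} yield (k, (l, r))

innerJoin.foreach(println) // (b,(Table1Row(b,2),Table2Row(b,3)))
leftJoin.foreach(println)
outerJoin.foreach(println)
```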
Histogram, Ntile
SQL
TODO
Scalding Histogram (fields-based API only)
import com.twitter.scalding.mathematics.Histogram
val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
val p = inputTp.toPipe(('value))
val p1 = p.groupAll { group => group.histogram('value -> 'histogram) }
.map('histogram -> ('min, 'q1, 'median, 'q3, 'max, 'mean)) {
x: Histogram => (x.min, x.q1, x.median, x.q3, x.max, x.mean)
}
val outputTp = p1.toTypedPipe[(Double, Double, Double, Double, Double, Double)](('min, 'q1, 'median, 'q3, 'max, 'mean))
outputTp.dump
(1.0,3.0,4.0,5.0,30.0,7.1)
Scalding QTree
import com.twitter.algebird.{QTree, QTreeSemigroup}
val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
implicit val qtSemigroup = new QTreeSemigroup[Long](6)
val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
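The same steps in a REPL session where the implicit semigroup is missing at first, so the sum fails to compile:
scala> val inputTp: TypedPipe[Int] = TypedPipe.from(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))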
inputTp: com.twitter.scalding.package.TypedPipe[Int] = IterablePipe(List(5, 2, 3, 3, 4, 4, 4, 1, 15, 30))
scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
<console>:41: error: Cannot find Semigroup type class for com.twitter.algebird.QTree[Long]
val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
^
scala> implicit val qtSemigroup = new QTreeSemigroup[Long](6)
qtSemigroup: com.twitter.algebird.QTreeSemigroup[Long] = com.twitter.algebird.QTreeSemigroup@4e92c2ed
scala> val v = inputTp.map {x => QTree(x)}.groupAll.sum.values
v: com.twitter.scalding.typed.TypedPipe[com.twitter.algebird.QTree[Long]] = com.twitter.scalding.typed.TypedPipeFactory@7925e180
scala> v.map { q => (q.count, q.upperBound, q.lowerBound, q.quantileBounds(.5), q.quantileBounds(.95)) }.dump
(10,32.0,0.0,(4.0,5.0),(15.0,16.0))
TODO
Running Total, Moving Average, Sessionization