Skip to content

kernel164/spark-cassandra

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

spark-cassandra

Integrating Spark & Cassandra

Warning: Consider this as pre alpha -> Work in progress - API might change very frequently.

import spark.SparkContext
import spark.SparkContext._
import spark.cassandra.Implicits._
import spark.cassandra.CassEnum._

val sc = new SparkContext("local", "appName")
val vars = sc.broadcast(Map(("host", "cassandra-host")))
val cql3 = CassConfig.cql3
  .host("cassandra-host")
  .randomPartitioner()
  .table("keyspace", "columnfamily", true)
  .columns("col2, col3, col4")
  .limit(10000)

val dataList = sc.fetchFromCassandra(cql3).asList(Map((3, LONG))).filter(l => l(2) == "")
dataList.cache
dataList.storeToCassandra(vars.value("host"), "INSERT keyspace.columnfamily1 (col1, col2, col3, col4) VALUES (?, ?, ?, ?);");

val reducedKVMap = dataList.map(l => (l(2).toInt / 100, l(3).toInt)).reduceByKey(_ + _)
reducedKVMap.cache
reducedKVMap.storeToCassandra(vars.value("host"), "UPDATE keyspace.columnfamily2 SET col2 = ? WHERE col1 = ?;");

val sortedListResult = reducedKVMap.map(x => (x._2, x._1)).sortByKey(false)
sortedListResult.foreach(x => println(x))

About

Integrating Spark & Cassandra

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published