This repository has been archived by the owner on Apr 5, 2021. It is now read-only.

Aerospike Spark Connector

The Aerospike Spark connector provides features to represent data stored in Aerospike as a DataFrame in Spark.

Aerospike Spark Connector includes:

  • Reading from Aerospike to a DataFrame
  • Saving a DataFrame to Aerospike
  • Pushing multiple Spark SQL filters down to the Aerospike cluster

How to build

The source code for this solution is available on GitHub at https://github.com/aerospike/aerospark. SBT is the build tool, and it creates an uber (fat) JAR as the final output of the build process. The JAR contains all the class files and dependencies.

This library requires Java JDK 7+, Scala 2.10, SBT 0.13, Maven, and the aerospike-helper-java JAR.

Before you build the Aerospike Spark connector you need to build the aerospike-helper-java JAR and install it in your local Maven repository. The aerospike-helper-java JAR is used by the connector to perform efficient, multi-filter queries on Aerospike.

Clone the Aerospike Helper repository using this command:

$ git clone https://github.com/aerospike/aerospike-helper

Navigate to the subdirectory java and run the following command to build and install the helper class jar:

$ mvn clean install -DskipTests

When Maven completes, the aerospike-helper-java JAR will be installed in your local Maven repository.

To build the Spark connector, clone the Aerospike Spark repository using this command:

$ git clone https://github.com/citrusleaf/aerospark.git

After cloning the repository, build the uber jar using:

$ sbt assembly

Note that during the build a number of unit tests are run; these tests assume an Aerospike cluster is running at "127.0.0.1" on port 3000. To skip the unit tests, use:

$ sbt 'set test in assembly := {}' clean assembly

On conclusion of the build, the uber JAR aerospike-spark-assembly-<version>.jar will be located in the subdirectory target/scala-2.10.

Usage

The assembled JAR can be used in any Spark application provided it is on the class path.
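
For a standalone application, one way to do this is to add the JAR to the SparkConf when the context is created. The snippet below is a minimal sketch, not part of the connector documentation: the application name is an assumption, and the JAR path assumes the default sbt assembly output location. It uses the Spark 1.x SQLContext API that the rest of this document uses.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Minimal sketch: ship the assembled connector JAR with the application.
    // The application name and JAR path are assumptions for illustration.
    val conf = new SparkConf().
        setAppName("aerospike-spark-example").
        setMaster("local[*]").
        setJars(Seq("target/scala-2.10/aerospike-spark-assembly-1.1.4.jar"))

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)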

spark shell

To use the connector with the spark-shell, use the --jars command line option and include the path to the assembled JAR. Example:

$ spark-shell --master local[*] --jars target/scala-2.10/aerospike-spark-assembly-1.1.4.jar

Import the com.aerospike.spark.sql._ package

scala> import com.aerospike.spark.sql._
import com.aerospike.spark.sql._

and any Aerospike packages and classes. For example:

scala> import com.aerospike.client.AerospikeClient
import com.aerospike.client.AerospikeClient

scala> import com.aerospike.client.Bin
import com.aerospike.client.Bin

scala> import com.aerospike.client.Key
import com.aerospike.client.Key

scala> import com.aerospike.client.Value
import com.aerospike.client.Value

Load some data into Aerospike with:

    val TEST_COUNT = 100
    val namespace = "test"
    val client = AerospikeConnection.getClient(AerospikeConfig.newConfig("localhost", 3000, 1000))
    Value.UseDoubleType = true
    for (i <- 1 to TEST_COUNT) {
      val key = new Key(namespace, "rdd-test", "rdd-test-"+i)
      client.put(null, key,
         new Bin("one", i),
         new Bin("two", "two:"+i),
         new Bin("three", i.toDouble)
      )
    }

Try a test with the loaded data:

	val thingsDF = sqlContext.read.
			format("com.aerospike.spark.sql").
			option("aerospike.seedhost", "127.0.0.1").
			option("aerospike.port", "3000").
			option("aerospike.namespace", namespace).
			option("aerospike.set", "rdd-test").
			load 
	thingsDF.registerTempTable("things")
	val filteredThings = sqlContext.sql("select * from things where one = 55")
	val thing = filteredThings.first()

Loading and Saving DataFrames

The Aerospike Spark connector provides functions to load data from Aerospike into a DataFrame and to save a DataFrame into Aerospike.

Loading data

	val thingsDF = sqlContext.read.
		format("com.aerospike.spark.sql").
		option("aerospike.seedhost", "127.0.0.1").
		option("aerospike.port", "3000").
		option("aerospike.namespace", "test").
		option("aerospike.set", "rdd-test").
		load 

The read function is configured by a number of options:

  • format("com.aerospike.spark.sql") specifies the function library to load the DataFrame.
  • option("aerospike.seedhost", "127.0.0.1") specifies a seed host in the Aerospike cluster.
  • option("aerospike.port", "3000") specifies the port to be used
  • option("aerospike.namespace", "test") specifies the Namespace name to be used e.g. "test"
  • option("aerospike.set", "rdd-test") specifies the Set to be used e.g. "rdd-test" Spark SQL can be used to efficently filter (where lastName = 'Smith') Bin values represented as columns. The filter is passed down to the Aerospike cluster and filtering is done in the server. Here is an example using filtering:
	val thingsDF = sqlContext.read.
		format("com.aerospike.spark.sql").
		option("aerospike.seedhost", "127.0.0.1").
		option("aerospike.port", "3000").
		option("aerospike.namespace", namespace).
		option("aerospike.set", "rdd-test").
		load 
	thingsDF.registerTempTable("things")
	val filteredThings = sqlContext.sql("select * from things where one = 55")

Additional meta-data columns are automatically included when reading from Aerospike. The default names are:

  • __key the value of the primary key if it is stored in Aerospike
  • __digest the digest as Array[byte]
  • __generation the generation value of the record when it was read
  • __expiration the expiration epoch
  • __ttl the time to live value, calculated as the expiration minus now

These meta-data column name defaults can be changed by using additional options during read or write, for example:

	val thingsDF = sqlContext.read.
		format("com.aerospike.spark.sql").
		option("aerospike.seedhost", "127.0.0.1").
		option("aerospike.port", "3000").
		option("aerospike.namespace", "test").
		option("aerospike.set", "rdd-test").
		option("aerospike.expiryColumn", "_my_expiry_column").
		load 

Saving data

A DataFrame can be saved in Aerospike by specifying a column in the DataFrame as the Primary Key or the Digest.

Saving by Digest

In this example, the value of the digest is specified by the "__digest" column in the DataFrame.

	val thingsDF = sqlContext.read.
		format("com.aerospike.spark.sql").
		option("aerospike.seedhost", "127.0.0.1").
		option("aerospike.port", "3000").
		option("aerospike.namespace", namespace).
		option("aerospike.set", "rdd-test").
		load 
		
	thingsDF.write.
		mode(SaveMode.Overwrite).
		format("com.aerospike.spark.sql").
		option("aerospike.seedhost", "127.0.0.1").
		option("aerospike.port", "3000").
		option("aerospike.namespace", namespace).
		option("aerospike.set", "rdd-test").
		option("aerospike.updateByDigest", "__digest").
		save()

Saving by Key

In this example, the value of the primary key is specified by the "key" column in the DataFrame.

      import org.apache.spark.sql.types.StructType
      import org.apache.spark.sql.types.StructField
      import org.apache.spark.sql.types.LongType
      import org.apache.spark.sql.types.StringType
      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.SaveMode


      val namespace = "test"
      val setName = "new-rdd-data"
      
      val schema = new StructType(Array(
          StructField("key",StringType,nullable = false),
          StructField("last",StringType,nullable = true),
          StructField("first",StringType,nullable = true),
          StructField("when",LongType,nullable = true)
          )) 
      val rows = Seq(
          Row("Fraser_Malcolm","Fraser", "Malcolm", 1975L),
          Row("Hawke_Bob","Hawke", "Bob", 1983L),
          Row("Keating_Paul","Keating", "Paul", 1991L), 
          Row("Howard_John","Howard", "John", 1996L), 
          Row("Rudd_Kevin","Rudd", "Kevin", 2007L), 
          Row("Gillard_Julia","Gillard", "Julia", 2010L), 
          Row("Abbott_Tony","Abbott", "Tony", 2013L), 
          Row("Tunrbull_Malcom","Tunrbull", "Malcom", 2015L)
          )
          
      val inputRDD = sc.parallelize(rows)
      
      val newDF = sqlContext.createDataFrame(inputRDD, schema)
  
      newDF.write.
        mode(SaveMode.Ignore).
        format("com.aerospike.spark.sql").
        option("aerospike.seedhost", "127.0.0.1").
        option("aerospike.port", "3000").
        option("aerospike.namespace", namespace).
        option("aerospike.set", setName).
        option("aerospike.updateByKey", "key").
        save()

Using TTL while saving

Time to live (TTL) can be set individually on each record. The TTL should be stored in a column of the DataFrame before it is saved.

To enable updates to TTL, an additional option is specified:

	option("aerospike.ttlColumn", "expiry")

Schema

Aerospike is schema-less, while Spark DataFrames require a schema. To infer one, the Aerospike Spark connector samples 100 records via a scan, reads the Bin names, and infers each Bin's type.

The number of records scanned can be changed by using the option:

	option("aerospike.schema.scan", 20)

Note: the schema is derived each time load is called. If you call load before the Aerospike namespace/set has any data, only the meta-data columns will be available.
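
Putting this together, here is a minimal sketch (reusing the "test" namespace and "rdd-test" set from earlier examples) that samples only 20 records for schema inference and prints the inferred schema:

    // Minimal sketch: infer the schema from a sample of 20 records and
    // inspect the inferred Bin names and types (plus the meta-data columns).
    val sampledDF = sqlContext.read.
        format("com.aerospike.spark.sql").
        option("aerospike.seedhost", "127.0.0.1").
        option("aerospike.port", "3000").
        option("aerospike.namespace", "test").
        option("aerospike.set", "rdd-test").
        option("aerospike.schema.scan", "20").
        load
    sampledDF.printSchema()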

Save mode reference

| Save mode     | Record Exists Policy |
|---------------|----------------------|
| ErrorIfExists | CREATE_ONLY          |
| Ignore        | CREATE_ONLY          |
| Overwrite     | REPLACE              |
| Append        | UPDATE_ONLY          |
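
For example, the write below is a minimal sketch that reuses the newDF DataFrame and set name from the Saving by Key example (assumptions for illustration). Because SaveMode.Append maps to UPDATE_ONLY, it only writes records whose keys already exist in the set.

    // Minimal sketch: SaveMode.Append maps to the UPDATE_ONLY record-exists
    // policy, so only records that already exist in the set are updated.
    newDF.write.
        mode(SaveMode.Append).
        format("com.aerospike.spark.sql").
        option("aerospike.seedhost", "127.0.0.1").
        option("aerospike.port", "3000").
        option("aerospike.namespace", "test").
        option("aerospike.set", "new-rdd-data").
        option("aerospike.updateByKey", "key").
        save()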

Options reference

| Option                     | Description                                                                  | Default value |
|----------------------------|------------------------------------------------------------------------------|---------------|
| aerospike.seedhost         | A host name or address of the cluster                                        | "127.0.0.1" |
| aerospike.port             | Port of Aerospike                                                            | 3000 |
| aerospike.timeout          | Timeout for all operations in milliseconds                                   | 1000 |
| aerospike.sendKey          | If true, store the value of the primary key                                  | false |
| aerospike.commitLevel      | Consistency guarantee when committing a transaction on the server            | CommitLevel.COMMIT_ALL |
| aerospike.generationPolicy | How to handle record writes based on record generation                       | GenerationPolicy.NONE |
| aerospike.namespace        | Aerospike Namespace                                                          | "test" |
| aerospike.set              | Aerospike Set                                                                | no default |
| aerospike.updateByKey      | Specifies that updates are done by key, using the value in the named column  | option("aerospike.updateByKey", "key") |
| aerospike.updateByDigest   | Specifies that updates are done by digest, using the value in the named column | option("aerospike.updateByDigest", "Digest") |
| aerospike.schema.scan      | The number of records to scan to infer the schema                            | 100 |
| aerospike.keyColumn        | The name of the key column in the DataFrame                                  | "__key" |
| aerospike.digestColumn     | The name of the digest column in the DataFrame                               | "__digest" |
| aerospike.expiryColumn     | The name of the expiry column in the DataFrame                               | "__expiry" |
| aerospike.generationColumn | The name of the generation column in the DataFrame                           | "__generation" |
| aerospike.ttlColumn        | The name of the TTL column in the DataFrame                                  | "__ttl" |
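
As a final illustration, the sketch below combines several options from the table that are not shown in earlier examples: a 5000 ms timeout and sendKey so the primary key value is stored with each record. The newDF DataFrame and the set name reuse the Saving by Key example and are assumptions for illustration.

    // Minimal sketch combining several options from the table above:
    // a 5000 ms operation timeout and sendKey=true to store the primary
    // key value with each record.
    newDF.write.
        mode(SaveMode.Overwrite).
        format("com.aerospike.spark.sql").
        option("aerospike.seedhost", "127.0.0.1").
        option("aerospike.port", "3000").
        option("aerospike.timeout", "5000").
        option("aerospike.sendKey", "true").
        option("aerospike.namespace", "test").
        option("aerospike.set", "new-rdd-data").
        option("aerospike.updateByKey", "key").
        save()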
