Skip to content

Commit

Permalink
Create lambda function for RDS migration using Flyway.
Browse files Browse the repository at this point in the history
  • Loading branch information
crossroad0201 committed Nov 13, 2016
0 parents commit a3505f7
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.idea
target
67 changes: 67 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
Lambda function for AWS RDS Migration using Flyway.
====

**Now developing...**

# Setup

## Create S3 bucket

Put your Flyway resources to S3 bucket.

### Bucket structure

```
s3://my-flyway
- /my-application
- flyway.conf <- Flyway configuration file.
- V1__create_foo.sql <- SQL file(s)
- V2__create_bar.sql
```

## Deploy Lambda function

### Code

* Build lambda function module.
```
sbt assembly
```

* Upload `target/scala-x.x.x/flywayAwsLambda-assembly-x.x.x.jar`.

### Configuration

||value|
|----|----|
|Runtime|`Java 8`|
|Handler|`crossroad0201.aws.flywaylambda.S3EventMigrationHandler::handleRequest`|
|Role|See `Role` section.|
|Timeout|`5 min.`|
|VPC|Same VPC as target RDS.|

#### Role

Require policies.

* AmazonRDSFullAccess
* AmazonS3FullAccess
* AmazonLambdaVPCAccessExecutionRole

### Triggers

Add trigger `S3 to Lambda`.

||value|Example|
|----|----|----|
|Bucket|Your Flyway migration bucket.|`my-flyway`|
|Event type|`Object created`|-|
|Prefix|Your Flyway migration files location.|`my-application`|
|Suffix|`sql`|-|


# Run

Put Flyway SQL file to S3 bucket.

Invoke flyway-lambda automatically by S3 event.
25 changes: 25 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

lazy val flywayAwsLambda = (project in file(".")).settings {
organization := "crossroad0201.aws"
name := "flyway-awslambda"
version := "0.1.0-SNAPSHOT"
scalaVersion := "2.12.0"

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
test in assembly := {}

libraryDependencies ++= Seq(
// Flyway
"org.flywaydb" % "flyway-core" % "4.0.3",
"mysql" % "mysql-connector-java" % "6.0.5", // Flyway supports only Ver.6 higher.

// AWS
"com.amazonaws" % "aws-lambda-java-core" % "1.1.0",
"com.amazonaws" % "aws-lambda-java-events" % "1.3.0",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.53",

// Test
"org.scalatest" %% "scalatest" % "3.0.0" % Test
)

}
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version = 0.13.13
3 changes: 3 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package crossroad0201.aws.flywaylambda

import java.nio.file.{Files, Path, Paths}
import java.util.{Properties => JProperties}

import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.lambda.runtime.events.S3Event
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.event.S3EventNotification.{S3Entity, S3EventNotificationRecord}
import com.amazonaws.services.s3.model.S3ObjectSummary
import org.flywaydb.core.Flyway

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

class S3EventMigrationHandler extends RequestHandler[S3Event, String] {
val FlywayConfFileName = "flyway.conf"

override def handleRequest(event: S3Event, context: Context): String = {
val logger = context.getLogger

val successCounts = event.getRecords.asScala.map(handleS3Event(_)(context))

val message = s"Flyway migration(s) finished. ${successCounts.mkString(", ")}"
logger.log(message)

message
}

private def handleS3Event(event: S3EventNotificationRecord)(implicit context: Context) = {
val logger = context.getLogger

val s3 = event.getS3
val bucket = s3.getBucket

logger.log(s"Flyway migration start. by ${event.getEventName} s3://${bucket.getName}/${s3.getObject.getKey}")

def deploy(s3: S3Entity) = Try {
val s3Client: AmazonS3Client = new AmazonS3Client().withRegion(Region.getRegion(Regions.fromName(event.getAwsRegion)))
val tmpDir = Files.createDirectories(Paths.get("/tmp", context.getAwsRequestId))

@tailrec
def deployInternal(objects: List[S3ObjectSummary], acc: (Option[JProperties], ListBuffer[Path])): (Option[JProperties], Seq[Path]) = {
def loadConf(key: String) = {
val o = s3Client.getObject(bucket.getName, key)
val props = new JProperties
props.load(o.getObjectContent)
logger.log(s"Flyway configuration loaded. s3://${bucket.getName}/$key")
(Some(props), acc._2)
}
def createDir(key: String) = {
val dir = Files.createDirectories(Paths.get(tmpDir.toString, key))
logger.log(s"Dir created. $dir")
acc
}
def createSqlFile(key: String) = {
val o = s3Client.getObject(bucket.getName, key)
val file = Paths.get(tmpDir.toString, key)
val fileSize = Files.copy(o.getObjectContent, file)
logger.log(s"SQL file created. $file($fileSize Byte)")
acc._2 += file
acc
}

objects match {
case Nil => (acc._1, acc._2)
case x :: xs =>
val _acc = x.getKey match {
case key if key.endsWith(FlywayConfFileName) => loadConf(key)
case key if key.endsWith("/") => createDir(key)
case key => createSqlFile(key)
}
deployInternal(xs, _acc)
}
}

val migrationPrefix = {
val objectKey = s3.getObject.getKey
objectKey.substring(0, objectKey.lastIndexOf("/"))
}
val objects = s3Client.listObjects(bucket.getName, migrationPrefix)

deployInternal(objects.getObjectSummaries.asScala.toList, (None, ListBuffer())) match {
case (Some(conf), sqlFiles) =>
FlywayDeployment(
conf,
s"filesystem:${Paths.get(tmpDir.toString, migrationPrefix).toString}",
sqlFiles)
case _ => throw new IllegalStateException(s"$FlywayConfFileName does not exists.")
}
}

def migrate(deployment: FlywayDeployment) = Try {
val flyway = new Flyway
flyway.setDataSource(
deployment.url,
deployment.user,
deployment.password
)
flyway.setLocations(deployment.location)

val successCount = flyway.migrate

flyway.info.all.foreach { i =>
logger.log(s"${i.getVersion} ${i.getDescription} ${i.getType} ${i.getState} ${i.getExecutionTime}")
}

successCount
}

for {
d <- deploy(s3)
_ = {
logger.log(
s"""--- Flyway configuration ------------------------------------
|flyway.url = ${d.url}
|flyway.user = ${d.user}
|flyway.password = ${d.password}
|
|SQL locations = ${d.location}
|SQL files = ${d.sqlFiles.mkString(", ")}
|-------------------------------------------------------------
""".stripMargin)
}
c <- migrate(d)
_ = logger.log(s"$c migration applied successfully.")
} yield c
}
}

case class FlywayDeployment(url: String, user: String, password: String, location: String, sqlFiles: Seq[Path])
object FlywayDeployment {
def apply(conf: JProperties, location: String, sqlFiles: Seq[Path]): FlywayDeployment = {
FlywayDeployment(
conf.getProperty("flyway.url"),
conf.getProperty("flyway.user"),
conf.getProperty("flyway.password"),
location,
sqlFiles
)
}
}

0 comments on commit a3505f7

Please sign in to comment.