Skip to content
This repository has been archived by the owner on Feb 17, 2023. It is now read-only.

Commit

Permalink
Merge pull request #181 from actorapp/server/files
Browse files Browse the repository at this point in the history
Server/files
  • Loading branch information
John Doe committed Dec 16, 2015
2 parents 63800d6 + c1c8837 commit 13d2172
Show file tree
Hide file tree
Showing 35 changed files with 282 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import im.actor.bots.BotMessages._
import im.actor.concurrent.FutureResultCats
import im.actor.server.bot.{ ApiToBotConversions, BotServiceBase }
import im.actor.server.file.{ S3StorageExtension, S3StorageAdapter, ImageUtils }
import im.actor.server.file.{ FileStorageAdapter, FileStorageExtension, ImageUtils }
import im.actor.server.stickers.{ StickerErrors, StickersExtension }

private[bot] object StickersBotErrors {
Expand Down Expand Up @@ -36,7 +36,7 @@ private[bot] final class StickersBotService(_system: ActorSystem) extends BotSer
import system.dispatcher

private val stickerExt = StickersExtension(system)
private implicit val fsAdapter: S3StorageAdapter = S3StorageExtension(system).s3StorageAdapter
private implicit val fsAdapter: FileStorageAdapter = FileStorageExtension(system).fsAdapter

override def handlers: Handlers = {
case CreateStickerPack(userId) createStickerPack(userId).toWeak
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import im.actor.bots.BotMessages.BotError
import im.actor.concurrent.FutureResultCats
import im.actor.server.bot.{ ApiToBotConversions, BotServiceBase }
import im.actor.server.db.DbExtension
import im.actor.server.file.{ S3StorageExtension, FileStorageAdapter }
import im.actor.server.file.{ FileStorageExtension, FileStorageAdapter }
import im.actor.server.user.{ UserErrors, UserUtils }

private[bot] final class UsersBotService(system: ActorSystem) extends BotServiceBase(system) with FutureResultCats[BotError] with ApiToBotConversions {
Expand All @@ -16,7 +16,7 @@ private[bot] final class UsersBotService(system: ActorSystem) extends BotService
import im.actor.concurrent.FutureExt._

private val db = DbExtension(system).db
private implicit val fsAdapter: FileStorageAdapter = S3StorageExtension(system).s3StorageAdapter
private implicit val fsAdapter: FileStorageAdapter = FileStorageExtension(system).fsAdapter
private implicit val _system = system

override val handlers: Handlers = {
Expand Down
7 changes: 7 additions & 0 deletions actor-server/actor-core/src/main/protobuf/file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto2";

package im.actor.server;

import "scalapb/scalapb.proto";

message FileLocation {
required int64 file_id = 1;
required int64 access_hash = 2;
Expand All @@ -18,4 +20,9 @@ message Avatar {
optional AvatarImage smallImage = 1;
optional AvatarImage largeImage = 2;
optional AvatarImage fullImage = 3;
}

message S3UploadKey {
option (scalapb.message).extends = "im.actor.server.file.UploadKey";
required string key = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@ package im.actor.server.file

import java.io.File

import im.actor.api.rpc.files.ApiFileLocation
import im.actor.server.model
import slick.driver.PostgresDriver.api._

import scala.concurrent._

trait FileStorageAdapter extends UploadActions with DownloadActions
trait FileStorageAdapter extends UploadActions with DownloadActions with UploadKeyParsing

private[file] trait UploadActions {

def getFileUploadPartUrl(fileId: Long, partNumber: Int): Future[(UploadKey, String)]

def getFileUploadUrl(fileId: Long): Future[(UploadKey, String)]

def completeFileUpload(fileId: Long, fieSize: Long, fileName: String, partNames: Seq[String]): Future[Unit]

def uploadFile(name: String, file: File): DBIO[FileLocation]

def uploadFileF(name: String, file: File): Future[FileLocation]
}

private[file] trait UploadKeyParsing {
def parseKey(bytes: Array[Byte]): UploadKey
}

private[file] trait DownloadActions {
def getFileUrl(file: model.File, accessHash: Long): Future[Option[String]]
def getFileDownloadUrl(file: model.File, accessHash: Long): Future[Option[String]]

def downloadFile(id: Long): DBIO[Option[File]]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package im.actor.server.file

import akka.actor._
import im.actor.config.ActorConfig
import im.actor.serialization.ActorSerializer

import scala.util.{ Failure, Success, Try }

trait FileStorageExtension extends Extension {
val fsAdapter: FileStorageAdapter
}

class FileStorageExtensionImpl(system: ActorSystem) extends FileStorageExtension {

ActorSerializer.register(
80001 classOf[FileLocation],
80002 classOf[AvatarImage],
80003 classOf[Avatar],
80004 classOf[S3UploadKey]
)

override val fsAdapter = init()

def init(): FileStorageAdapter =
(for {
fqcn Try(ActorConfig.load().getString("modules.files.adapter"))
_ = system.log.debug("File adapter is: {}", fqcn)
clazz Try(Class.forName(fqcn).asSubclass(classOf[FileStorageAdapter]))
} yield clazz.getDeclaredConstructor(classOf[ActorSystem]).newInstance(system)) match {
case Success(adapter) adapter
case Failure(e) throw new RuntimeException("Failed to initialize FileStorageAdapter", e)
}
}

object FileStorageExtension extends ExtensionId[FileStorageExtensionImpl] with ExtensionIdProvider {
override def lookup = FileStorageExtension

override def createExtension(system: ExtendedActorSystem) = new FileStorageExtensionImpl(system)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package im.actor.server.file

import java.io.File
import java.io.{ FileOutputStream, File }
import java.nio.file.{ Files, Path }

import akka.actor.ActorSystem
Expand Down Expand Up @@ -53,11 +53,22 @@ object FileUtils {
} yield (file, size)
}

def s3Key(id: Long, name: String): String = {
if (name.isEmpty) {
s"file_${id}"
} else {
s"file_${id}/${name}"
def concatFiles(dir: File, fileNames: Seq[String])(implicit ec: ExecutionContext): Future[File] = {
Future {
blocking {
val dirPath = dir.toPath
val concatFile = dirPath.resolve("concatenated").toFile

val outStream = new FileOutputStream(concatFile)

fileNames foreach { fileName
Files.copy(dirPath.resolve(fileName), outStream)
}

outStream.close()

concatFile
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,7 @@ object ImageUtils {
def scaleAvatar(
fullFileId: Long,
rng: ThreadLocalRandom
)(
implicit
fsAdapter: FileStorageAdapter,
ec: ExecutionContext,
system: ActorSystem
): DBIO[Either[Throwable, Avatar]] =
)(implicit system: ActorSystem): DBIO[Either[Throwable, Avatar]] =
scaleAvatar(
fullFileId,
rng,
Expand All @@ -103,12 +98,9 @@ object ImageUtils {
rng: ThreadLocalRandom,
smallDesc: ThumbDescriptor,
largeDesc: ThumbDescriptor
)(
implicit
fsAdapter: FileStorageAdapter,
ec: ExecutionContext,
system: ActorSystem
): DBIO[Either[Throwable, Avatar]] = {
)(implicit system: ActorSystem): DBIO[Either[Throwable, Avatar]] = {
implicit val ec: ExecutionContext = system.dispatcher
val fsAdapter = FileStorageExtension(system).fsAdapter
persist.FileRepo.find(fullFileId) flatMap {
case Some(fullFileModel)
fsAdapter.downloadFile(fullFileId) flatMap {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package im.actor.server.file

trait UploadKey {
val key: String
def toByteArray: Array[Byte]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package im.actor.server.file
package im.actor.server.file.s3

import java.io.File

Expand All @@ -9,59 +9,24 @@ import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest
import com.amazonaws.services.s3.transfer.TransferManager
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.github.dwhjames.awswrap.s3.{ AmazonS3ScalaClient, FutureTransfer }
import com.github.kxbmap.configs._
import com.typesafe.config.{ Config, ConfigFactory }
import im.actor.serialization.ActorSerializer
import im.actor.server.acl.ACLUtils
import im.actor.server.db.DbExtension
import im.actor.server.file.FileUtils._
import im.actor.server.file._
import im.actor.server.{ model, persist }
import slick.driver.PostgresDriver.api._

import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

class S3StorageExtensionImpl(val s3StorageAdapter: S3StorageAdapter) extends Extension {
// TODO: move to a proper place

ActorSerializer.register(80001, classOf[FileLocation])
ActorSerializer.register(80002, classOf[AvatarImage])
ActorSerializer.register(80003, classOf[Avatar])
}

object S3StorageExtension extends ExtensionId[S3StorageExtensionImpl] with ExtensionIdProvider {
override def lookup = S3StorageExtension

override def createExtension(system: ExtendedActorSystem) = {
val config = S3StorageAdapterConfig.load(system.settings.config.getConfig("services.aws.s3")).get
new S3StorageExtensionImpl(new S3StorageAdapter(config, system))
}
}

case class S3StorageAdapterConfig(bucketName: String, key: String, secret: String)

object S3StorageAdapterConfig {
def load(config: Config): Try[S3StorageAdapterConfig] = {
for {
bucketName config.get[Try[String]]("default-bucket")
key config.get[Try[String]]("access-key")
secret config.get[Try[String]]("secret-key")
} yield S3StorageAdapterConfig(bucketName, key, secret)
}

def load: Try[S3StorageAdapterConfig] = {
val config = ConfigFactory.load().getConfig("services.aws.s3")
load(config)
}
}

class S3StorageAdapter(config: S3StorageAdapterConfig, _system: ActorSystem) extends FileStorageAdapter {
val bucketName = config.bucketName
class S3StorageAdapter(_system: ActorSystem) extends FileStorageAdapter {

private implicit val system: ActorSystem = _system
private implicit val ec: ExecutionContext = system.dispatcher

private val config = S3StorageAdapterConfig.load(system.settings.config.getConfig("services.aws.s3")).get
private val bucketName = config.bucketName
private val awsCredentials = new BasicAWSCredentials(config.key, config.secret)
private val db = DbExtension(system).db

Expand All @@ -85,11 +50,11 @@ class S3StorageAdapter(config: S3StorageAdapterConfig, _system: ActorSystem) ext
override def downloadFileF(id: Long): Future[Option[File]] =
db.run(downloadFile(id))

override def getFileUrl(file: model.File, accessHash: Long): Future[Option[String]] = {
override def getFileDownloadUrl(file: model.File, accessHash: Long): Future[Option[String]] = {
val timeout = 1.day

if (ACLUtils.fileAccessHash(file.id, file.accessSalt) == accessHash) {
val presignedRequest = new GeneratePresignedUrlRequest(bucketName, FileUtils.s3Key(file.id, file.name))
val presignedRequest = new GeneratePresignedUrlRequest(bucketName, s3Key(file.id, file.name))

val expiration = new java.util.Date
expiration.setTime(expiration.getTime + timeout.toMillis)
Expand All @@ -104,7 +69,7 @@ class S3StorageAdapter(config: S3StorageAdapterConfig, _system: ActorSystem) ext
for {
dirFile DBIO.from(FileUtils.createTempDir())
file = dirFile.toPath.resolve("file").toFile
_ DBIO.from(FutureTransfer.listenFor(transferManager.download(bucketName, FileUtils.s3Key(id, name), file)) map (_.waitForCompletion()))
_ DBIO.from(FutureTransfer.listenFor(transferManager.download(bucketName, s3Key(id, name), file)) map (_.waitForCompletion()))
} yield file
}

Expand All @@ -115,13 +80,64 @@ class S3StorageAdapter(config: S3StorageAdapterConfig, _system: ActorSystem) ext
val sizeF = FileUtils.getFileLength(file)

for {
_ persist.FileRepo.create(id, accessSalt, FileUtils.s3Key(id, name))
size DBIO.from(sizeF)
_ persist.FileRepo.create(id, size, accessSalt, s3Key(id, name))
_ DBIO.from(s3Upload(bucketName, id, name, file))
_ DBIO.from(sizeF) flatMap (s persist.FileRepo.setUploaded(id, s, name))
_ persist.FileRepo.setUploaded(id, name)
} yield FileLocation(id, ACLUtils.fileAccessHash(id, accessSalt))
}

private def s3Upload(bucketName: String, id: Long, name: String, file: File): Future[UploadResult] = {
FutureTransfer.listenFor(transferManager.upload(bucketName, FileUtils.s3Key(id, name), file)) map (_.waitForUploadResult())
FutureTransfer.listenFor(transferManager.upload(bucketName, s3Key(id, name), file)) map (_.waitForUploadResult())
}

override def getFileUploadPartUrl(fileId: Long, partNumber: Int): Future[(UploadKey, String)] = {
val fileKey = uploadKey(fileId)
val partKey = S3UploadKey(s"upload_part_${fileKey.key}_${partNumber}")
val request = new GeneratePresignedUrlRequest(bucketName, partKey.key)
val expiration = new java.util.Date
expiration.setTime(expiration.getTime + 1.day.toMillis)
request.setMethod(HttpMethod.PUT)
request.setExpiration(expiration)
request.setContentType("application/octet-stream")

for (url s3Client.generatePresignedUrlRequest(request)) yield partKey url.toString
}

override def getFileUploadUrl(fileId: Long): Future[(UploadKey, String)] = {
val fileKey = uploadKey(fileId)
val presignedRequest = new GeneratePresignedUrlRequest(bucketName, fileKey.key)
val expiration = new java.util.Date
expiration.setTime(expiration.getTime + 1.day.toMillis)
presignedRequest.setExpiration(expiration)
presignedRequest.setMethod(HttpMethod.PUT)

for (url s3Client.generatePresignedUrlRequest(presignedRequest)) yield fileKey url.toString
}

override def completeFileUpload(fileId: Long, fileSize: Long, fileName: String, partNames: Seq[String]): Future[Unit] = {
for {
tempDir createTempDir()
fk = uploadKey(fileId).key
_ FutureTransfer.listenFor {
transferManager.downloadDirectory(bucketName, s"upload_part_${fk}", tempDir)
} map (_.waitForCompletion())
concatFile concatFiles(tempDir, partNames)
_ FutureTransfer.listenFor {
transferManager.upload(bucketName, s3Key(fileId, fileName), concatFile)
} map (_.waitForCompletion())
_ deleteDir(tempDir)
} yield ()
}

private def uploadKey(fileId: Long): S3UploadKey = S3UploadKey(s"upload_${fileId}")

private def s3Key(id: Long, name: String): String =
if (name.isEmpty) {
s"file_${id}"
} else {
s"file_${id}/${name}"
}

override def parseKey(bytes: Array[Byte]): UploadKey = S3UploadKey.parseFrom(bytes)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package im.actor.server.file.s3

import com.github.kxbmap.configs._
import com.typesafe.config.{ ConfigFactory, Config }

import scala.util.Try

case class S3StorageAdapterConfig(bucketName: String, key: String, secret: String)

object S3StorageAdapterConfig {
def load(config: Config): Try[S3StorageAdapterConfig] = {
for {
bucketName config.get[Try[String]]("default-bucket")
key config.get[Try[String]]("access-key")
secret config.get[Try[String]]("secret-key")
} yield S3StorageAdapterConfig(bucketName, key, secret)
}

def load: Try[S3StorageAdapterConfig] = {
val config = ConfigFactory.load().getConfig("services.aws.s3")
load(config)
}
}
Loading

0 comments on commit 13d2172

Please sign in to comment.