Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue #1332 - Upgrade to Scala 2.13 #1333

Merged
merged 7 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package za.co.absa.spline.admin

import org.slf4s.Logging

import com.typesafe.scalalogging.LazyLogging

import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.ChronoField
import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

object DateTimeUtils extends Logging {
object DateTimeUtils extends LazyLogging {

private val ZonedDateTimeRegexp = (s"" +
"^" +
Expand Down Expand Up @@ -60,14 +61,16 @@ object DateTimeUtils extends Logging {

val validOffsets = tz.getRules.getValidOffsets(ldt).asScala
if (validOffsets.isEmpty) {
log.warn(s"" +
logger.warn("" +
s"DST gap was detected for the input '$s' in the time zone '$tz'. " +
s"Continue with the adjusted datetime '$zdt''")
s"Continue with the adjusted datetime '$zdt'"
)
}
if (validOffsets.length > 1) {
log.warn(s"" +
logger.warn("" +
s"DST overlap (${validOffsets.mkString(", ")}) was detected for the input '$s' in the time zone '$tz'. " +
s"Continue with the assumed datetime '$zdt'")
s"Continue with the assumed datetime '$zdt'"
)
}

maybeZoneGeoId.foldLeft(zdt)(_ withZoneSameInstant _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.immutable.ListSet

trait UserInteractor {
def credentializeConnectionUrl(url: ArangoConnectionURL): ArangoConnectionURL

def confirmDatabaseBackupReady(): Boolean
}

Expand Down Expand Up @@ -82,7 +83,7 @@ class ConsoleUserInteractor(console: InputConsole) extends UserInteractor {
|Have you created a database backup? [${validAnswers.mkString("/")}]:\u00A0
""".stripMargin.trim

def userAnswers: Stream[String] = console.readLine(msg).trim.toLowerCase #:: userAnswers
def userAnswers: LazyList[String] = console.readLine(msg).trim.toLowerCase #:: userAnswers

val userAnswer = userAnswers.filter(validAnswers).head

Expand Down
133 changes: 69 additions & 64 deletions admin/src/main/scala/za/co/absa/spline/arango/ArangoManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.entity.{EdgeDefinition, IndexType}
import com.arangodb.model.Implicits.IndexOptionsOps
import com.arangodb.model._
import org.slf4s.Logging
import com.typesafe.scalalogging.StrictLogging
import za.co.absa.commons.reflect.EnumerationMacros.sealedInstancesOf
import za.co.absa.commons.version.impl.SemVer20Impl.SemanticVersion
import za.co.absa.spline.arango.OnDBExistsAction.{Drop, Skip}
Expand All @@ -30,21 +30,25 @@ import za.co.absa.spline.persistence.model.{CollectionDef, GraphDef, SearchAnaly
import za.co.absa.spline.persistence.{DatabaseVersionManager, DryRunnable}

import java.time.{Clock, ZonedDateTime}
import scala.collection.JavaConverters._
import scala.collection.immutable._
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

trait ArangoManager {

/**
* @return `true` if actual initialization was performed.
*/
def createDatabase(onExistsAction: OnDBExistsAction, options: DatabaseCreateOptions): Future[Boolean]

def upgrade(): Future[Unit]

def execute(actions: AuxiliaryDBAction*): Future[Unit]

def prune(retentionPeriod: Duration): Future[Unit]

def prune(thresholdDate: ZonedDateTime): Future[Unit]

}
Expand All @@ -57,19 +61,20 @@ class ArangoManagerImpl(
foxxManager: FoxxManager,
clock: Clock,
appDBVersion: SemanticVersion,
val dryRun: Boolean)
val dryRun: Boolean
)
(implicit val ex: ExecutionContext)
extends ArangoManager
with DryRunnable
with Logging {
with StrictLogging {

import ArangoManagerImpl._

def createDatabase(onExistsAction: OnDBExistsAction, options: DatabaseCreateOptions): Future[Boolean] = {
log.debug("Initialize database")
db.exists.toScala.flatMap { exists =>
logger.debug("Initialize database")
db.exists.asScala.flatMap { exists =>
if (exists && onExistsAction == Skip) {
log.debug("Database already exists - skipping initialization")
logger.debug("Database already exists - skipping initialization")
Future.successful(false)
} else for {
_ <- deleteDbIfRequested(onExistsAction == Drop)
Expand All @@ -86,20 +91,20 @@ class ArangoManagerImpl(
}

override def upgrade(): Future[Unit] = {
log.debug("Upgrade database")
logger.debug("Upgrade database")
dbVersionManager.currentVersion
.flatMap(currentVersion => {
log.info(s"Current database version: ${currentVersion.asString}")
log.info(s"Target database version: ${appDBVersion.asString}")
logger.info(s"Current database version: ${currentVersion.asString}")
logger.info(s"Target database version: ${appDBVersion.asString}")
if (currentVersion == appDBVersion) Future.successful {
log.info(s"The database is up-to-date")
logger.info(s"The database is up-to-date")
} else if (currentVersion > appDBVersion) Future.failed {
new RuntimeException("Database downgrade is not supported")
} else for {
_ <- deleteFoxxServices()
_ <- migrator.migrate(currentVersion, appDBVersion)
_ <- createFoxxServices()
} yield {}
} yield ()
})
}

Expand All @@ -115,51 +120,51 @@ class ArangoManagerImpl(
case AuxiliaryDBAction.SearchViewsCreate => createSearchViews()
case AuxiliaryDBAction.SearchAnalyzerDelete => deleteSearchAnalyzers()
case AuxiliaryDBAction.SearchAnalyzerCreate => createSearchAnalyzers()
}).map(_ => {}))
}).map(_ => ()))
}
}

private def checkDBAccess() = {
db.exists.toScala
db.exists.asScala
}

private def reinstallFoxxServices() = {
for {
_ <- deleteFoxxServices()
_ <- createFoxxServices()
} yield {}
} yield ()
}

override def prune(retentionPeriod: Duration): Future[Unit] = {
log.debug(s"Prune data older than $retentionPeriod")
logger.debug(s"Prune data older than $retentionPeriod")
dataRetentionManager.pruneBefore(clock.millis - retentionPeriod.toMillis)
}

override def prune(dateTime: ZonedDateTime): Future[Unit] = {
log.debug(s"Prune data before $dateTime")
logger.debug(s"Prune data before $dateTime")
dataRetentionManager.pruneBefore(dateTime.toInstant.toEpochMilli)
}

private def deleteDbIfRequested(dropIfExists: Boolean) = {
for {
exists <- db.exists.toScala
exists <- db.exists.asScala
_ <- if (exists && !dropIfExists)
throw new IllegalArgumentException(s"Arango Database ${db.name} already exists")
else if (exists && dropIfExists) {
log.info(s"Drop database: ${db.name}")
unlessDryRunAsync(db.drop().toScala)
logger.info(s"Drop database: ${db.name}")
unlessDryRunAsync(db.drop().asScala)
}
else Future.successful({})
} yield {}
else Future.successful(())
} yield ()
}

private def createDb() = {
log.info(s"Create database: ${db.name}")
unlessDryRunAsync(db.create().toScala)
logger.info(s"Create database: ${db.name}")
unlessDryRunAsync(db.create().asScala)
}

private def createCollections(options: DatabaseCreateOptions) = {
log.debug(s"Create collections")
logger.debug(s"Create collections")
Future.sequence(
for (colDef <- sealedInstancesOf[CollectionDef])
yield {
Expand All @@ -173,48 +178,48 @@ class ArangoManagerImpl(
.replicationFactor(replFactor)
.waitForSync(options.waitForSync)
for {
_ <- unlessDryRunAsync(db.createCollection(colDef.name, collectionOptions).toScala)
_ <- unlessDryRunAsync(db.collection(colDef.name).insertDocuments(colDef.initData.asJava).toScala)
_ <- unlessDryRunAsync(db.createCollection(colDef.name, collectionOptions).asScala)
_ <- unlessDryRunAsync(db.collection(colDef.name).insertDocuments(colDef.initData.asJava).asScala)
} yield ()
})
}

private def createGraphs() = {
log.debug(s"Create graphs")
logger.debug(s"Create graphs")
Future.sequence(
for (graphDef <- sealedInstancesOf[GraphDef]) yield {
val edgeDefs = graphDef.edgeDefs.map(e =>
(new EdgeDefinition)
.collection(e.name)
.from(e.froms.map(_.name): _*)
.to(e.tos.map(_.name): _*))
unlessDryRunAsync(db.createGraph(graphDef.name, edgeDefs.asJava).toScala)
unlessDryRunAsync(db.createGraph(graphDef.name, edgeDefs.asJava).asScala)
})
}

private def deleteIndices() = {
log.info(s"Drop indices")
logger.info(s"Drop indices")
for {
colEntities <- db.getCollections.toScala.map(_.asScala.filter(!_.getIsSystem))
eventualIndices = colEntities.map(ce => db.collection(ce.getName).getIndexes.toScala.map(_.asScala.map(ce.getName -> _)))
colEntities <- db.getCollections.asScala.map(_.asScala.filter(!_.getIsSystem))
eventualIndices = colEntities.map(ce => db.collection(ce.getName).getIndexes.asScala.map(_.asScala.map(ce.getName -> _)))
allIndices <- Future.reduceLeft(Iterable(eventualIndices.toSeq: _*))(_ ++ _)
userIndices = allIndices.filter { case (_, idx) => idx.getType != IndexType.primary && idx.getType != IndexType.edge }
_ <- Future.traverse(userIndices) { case (colName, idx) =>
log.debug(s"Drop ${idx.getType} index: $colName.${idx.getName}")
unlessDryRunAsync(db.deleteIndex(idx.getId).toScala)
logger.debug(s"Drop ${idx.getType} index: $colName.${idx.getName}")
unlessDryRunAsync(db.deleteIndex(idx.getId).asScala)
}
} yield {}
} yield ()
}

private def createIndices() = {
log.info(s"Create indices")
logger.info(s"Create indices")
Future.sequence(
for {
colDef <- sealedInstancesOf[CollectionDef]
idxDef <- colDef.indexDefs ++ colDef.commonIndexDefs
} yield {
val idxOpts = idxDef.options
log.debug(s"Ensure ${idxOpts.indexType} index: ${colDef.name} [${idxDef.fields.mkString(",")}]")
logger.debug(s"Ensure ${idxOpts.indexType} index: ${colDef.name} [${idxDef.fields.mkString(",")}]")
val dbCol = db.collection(colDef.name)
val fields = idxDef.fields.asJava
unlessDryRunAsync {
Expand All @@ -224,75 +229,75 @@ class ArangoManagerImpl(
case opts: PersistentIndexOptions => dbCol.ensurePersistentIndex(fields, opts)
case opts: TtlIndexOptions => dbCol.ensureTtlIndex(fields, opts)
case opts: ZKDIndexOptions => dbCol.ensureZKDIndex(fields, opts)
}).toScala
}).asScala
}
})
}

private def createFoxxServices(): Future[_] = {
log.debug(s"Lookup Foxx services to install")
private def createFoxxServices(): Future[Unit] = {
logger.debug(s"Lookup Foxx services to install")
val serviceDefs = FoxxSourceResolver.lookupSources(FoxxSourcesLocation)
log.debug(s"Found Foxx services: ${serviceDefs.map(_._1) mkString ", "}")
logger.debug(s"Found Foxx services: ${serviceDefs.map(_._1) mkString ", "}")
Future.traverse(serviceDefs.toSeq) {
case (name, content) =>
val srvMount = s"/$name"
log.info(s"Install Foxx service: $srvMount")
logger.info(s"Install Foxx service: $srvMount")
foxxManager.install(srvMount, content)
}
}.map(_ => ())
}

private def deleteFoxxServices(): Future[_] = {
log.debug(s"Delete Foxx services")
private def deleteFoxxServices(): Future[Unit] = {
logger.debug(s"Delete Foxx services")
foxxManager.list().flatMap(srvDefs =>
Future.sequence(for {
srvDef <- srvDefs
srvMount = srvDef("mount").toString
if !srvMount.startsWith("/_")
} yield {
log.info(s"Uninstall Foxx service: $srvMount")
logger.info(s"Uninstall Foxx service: $srvMount")
foxxManager.uninstall(srvMount)
}
).map(_ => {})
).map(_ => ())
)
}

private def deleteSearchViews() = {
log.debug(s"Delete search views")
logger.debug(s"Delete search views")
for {
viewEntities <- db.getViews.toScala.map(_.asScala)
viewEntities <- db.getViews.asScala.map(_.asScala)
views = viewEntities.map(ve => db.view(ve.getName))
_ <- Future.traverse(views) { view =>
log.info(s"Delete search view: ${view.name}")
unlessDryRunAsync(view.drop().toScala)
logger.info(s"Delete search view: ${view.name}")
unlessDryRunAsync(view.drop().asScala)
}
} yield {}
} yield ()
}

private def createSearchViews() = {
log.debug(s"Create search views")
logger.debug(s"Create search views")
Future.traverse(sealedInstancesOf[SearchViewDef]) { viewDef =>
log.info(s"Create search view: ${viewDef.name}")
unlessDryRunAsync(db.createArangoSearch(viewDef.name, viewDef.properties).toScala)
logger.info(s"Create search view: ${viewDef.name}")
unlessDryRunAsync(db.createArangoSearch(viewDef.name, viewDef.properties).asScala)
}
}

private def deleteSearchAnalyzers() = {
log.debug(s"Delete search analyzers")
logger.debug(s"Delete search analyzers")
for {
analyzers <- db.getSearchAnalyzers.toScala.map(_.asScala)
analyzers <- db.getSearchAnalyzers.asScala.map(_.asScala)
userAnalyzers = analyzers.filter(_.getName.startsWith(s"${db.name}::"))
_ <- Future.traverse(userAnalyzers)(ua => {
log.info(s"Delete search analyzer: ${ua.getName}")
unlessDryRunAsync(db.deleteSearchAnalyzer(ua.getName).toScala)
logger.info(s"Delete search analyzer: ${ua.getName}")
unlessDryRunAsync(db.deleteSearchAnalyzer(ua.getName).asScala)
})
} yield {}
} yield ()
}

private def createSearchAnalyzers() = {
log.debug(s"Create search analyzers")
logger.debug(s"Create search analyzers")
Future.traverse(sealedInstancesOf[SearchAnalyzerDef]) { ad =>
log.info(s"Create search analyzer: ${ad.name}")
unlessDryRunAsync(db.createSearchAnalyzer(ad.analyzer).toScala)
logger.info(s"Create search analyzer: ${ad.name}")
unlessDryRunAsync(db.createSearchAnalyzer(ad.analyzer).asScala)
}
}
}
Expand Down
Loading
Loading