diff --git a/data-products/src/main/scala/org/sunbird/analytics/archival/AssessmentArchivalJob.scala b/data-products/src/main/scala/org/sunbird/analytics/archival/AssessmentArchivalJob.scala new file mode 100644 index 000000000..8f2a003d3 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/analytics/archival/AssessmentArchivalJob.scala @@ -0,0 +1,194 @@ +package org.sunbird.analytics.archival + +import com.datastax.spark.connector.{SomeColumns, toRDDFunctions} +import org.apache.spark.sql.functions.{col, concat, lit, to_json, to_timestamp, weekofyear, year} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} +import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger, RestUtil} +import org.ekstep.analytics.framework.{FrameworkContext, JobConfig, Level} +import org.sunbird.analytics.archival.util.ArchivalRequest + +import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.sql.functions._ +import org.sunbird.analytics.exhaust.util.ExhaustUtil +import org.sunbird.analytics.util.Constants + +import scala.collection.immutable.List + +case class CollectionDetails(result: Result) +case class Result(content: List[CollectionInfo]) +case class CollectionInfo(identifier: String) + +object AssessmentArchivalJob extends optional.Application with BaseArchivalJob { + + case class Period(year: Int, weekOfYear: Int) + case class BatchPartition(collectionId: String, batchId: String, period: Period) + case class ArchivalMetrics(batch: BatchPartition, + totalArchivedRecords: Option[Long], + pendingWeeksOfYears: Option[Long], + totalDeletedRecords: Option[Long] + ) + + private val partitionCols = List("course_id", "batch_id", "year", "week_of_year") + private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question") + + override def getClassName = "org.sunbird.analytics.archival.AssessmentArchivalJob" + override def jobName = "AssessmentArchivalJob" + override def jobId: String = "assessment-archival" + override def getReportPath = "assessment-archival/" + override def getReportKey = "assessment" + override def dateColumn = "updated_on" + + override def archivalFormat(batch: Map[String,AnyRef]): String = { + val formatDetails = JSONUtils.deserialize[BatchPartition](JSONUtils.serialize(batch)) + s"${formatDetails.batchId}_${formatDetails.collectionId}/${formatDetails.period.year}-${formatDetails.period.weekOfYear}" + } + + override def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame = { + var filteredDF = dataDF + for (request <- requests) { + if (request.archival_status.equals("SUCCESS")) { + val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data) + filteredDF = dataDF.filter( + col("batch_id").equalTo(request.batch_id) && + concat(col("year"), lit("-"), col("week_of_year")) =!= lit(request_data("year") + "-" + request_data("week")) + ) + } + } + filteredDF + } + + override def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest] = { + try { + val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace")) + val date: String = requestConfig.date.getOrElse(null) + + var data = loadData(Map("table" -> requestConfig.archivalTable, 
"keyspace" -> archivalKeyspace, "cluster" -> "LMSCluster"), cassandraUrl, new StructType()) + data = validateBatches(data, requestConfig) + + val dataDF = generatePeriodInData(data) + val filteredDF = dataFilter(requests, dataDF) + + val archiveBatchList = filteredDF.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect() + val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], f.get(1).asInstanceOf[String], Period(f.get(2).asInstanceOf[Int], f.get(3).asInstanceOf[Int]))).groupBy(_.batchId) + + archiveBatches(batchesToArchive, filteredDF, requestConfig) + } catch { + case ex: Exception => + ex.printStackTrace() + JobLogger.log("archiveData: Exception with error message = " + ex.getMessage, None, Level.ERROR) + List() + } + } + + def validateBatches(data: DataFrame, requestConfig: Request)(implicit spark: SparkSession, fc: FrameworkContext): DataFrame ={ + implicit val sqlContext = new SQLContext(spark.sparkContext) + import sqlContext.implicits._ + + val filteredDF = if(requestConfig.batchId.isDefined && requestConfig.collectionId.isDefined) { + data.filter(col("batch_id") === requestConfig.batchId.get && col("course_id") === requestConfig.collectionId.get).persist() + } else if (requestConfig.batchFilters.isDefined) { + val batch = requestConfig.batchFilters.get.toDF("batch_id") + data.join(batch, Seq("batch_id"), "inner") + } else { + JobLogger.log("Neither batchId nor batchFilters present", None, Level.INFO) + data + } + if (requestConfig.query.isDefined) { + val res = searchContent(requestConfig.query.get) + filteredDF.join(res, col("course_id") === col("identifier"), "inner") + } else filteredDF + + } + + def searchContent(searchFilter: Map[String, AnyRef])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = { + // TODO: Handle limit and do a recursive search call + val apiURL = Constants.COMPOSITE_SEARCH_URL + val request = JSONUtils.serialize(searchFilter) + val response = RestUtil.post[CollectionDetails](apiURL, request).result.content + spark.createDataFrame(response).select("identifier") + } + + def archiveBatches(batchesToArchive: Map[String, Array[BatchPartition]], data: DataFrame, requestConfig: Request)(implicit config: JobConfig): List[ArchivalRequest] = { + batchesToArchive.flatMap(batches => { + val processingBatch = new AtomicInteger(batches._2.length) + JobLogger.log(s"Started Processing to archive the data", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> processingBatch))) + + // Loop through the week_num & year batch partition + batches._2.map((batch: BatchPartition) => { + val filteredDF = data.filter( + col("course_id") === batch.collectionId && + col("batch_id") === batch.batchId && + col("year") === batch.period.year && + col("week_of_year") === batch.period.weekOfYear + ).withColumn("last_attempted_on", tsToLongUdf(col("last_attempted_on"))) + .withColumn("updated_on", tsToLongUdf(col("updated_on"))) + .select(columnWithOrder.head, columnWithOrder.tail: _*) + var archivalRequest:ArchivalRequest = getRequest(jobId, batch.collectionId, batch.batchId, List(batch.period.year, batch.period.weekOfYear)) + + if (archivalRequest == null) { + val request_data = JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(requestConfig)) ++ Map[String, Int]( + "week" -> batch.period.weekOfYear, + "year"-> batch.period.year + ) + archivalRequest = ArchivalRequest("", batch.batchId, batch.collectionId, Option(getReportKey), jobId, None, None, null, 
null, None, Option(0), JSONUtils.serialize(request_data), None) + } + + try { + val urls = upload(filteredDF, JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(batch))) // Upload the archived files into blob store + archivalRequest.blob_url = Option(urls) + val metrics = ArchivalMetrics(batch, pendingWeeksOfYears = Some(processingBatch.getAndDecrement()), totalArchivedRecords = Some(filteredDF.count()), totalDeletedRecords = None) + JobLogger.log(s"Data is archived and Processing the remaining part files ", Some(metrics), Level.INFO) + markArchivalRequestAsSuccess(archivalRequest, requestConfig) + } catch { + case ex: Exception => { + JobLogger.log("archiveBatch: Exception with error message = " + ex.getLocalizedMessage, Some(batch), Level.ERROR) + markArchivalRequestAsFailed(archivalRequest, ex.getLocalizedMessage) + } + } + }) + }).toList + } + + def loadArchivedData(request: ArchivalRequest)(implicit spark: SparkSession, fc: FrameworkContext, jobConfig: JobConfig): DataFrame = { + ExhaustUtil.fetch(request.blob_url.get.head, "csv") + } + + override def deleteArchivedData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest] = { + requests.filter(r => r.archival_status.equals("SUCCESS") && r.deletion_status != "SUCCESS").map((request: ArchivalRequest) => { + deleteBatch(requestConfig, request) + }).toList + } + + def deleteBatch(requestConfig: Request, request: ArchivalRequest)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): ArchivalRequest = { + try { + val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data) + val batchPartition = BatchPartition(request.collection_id, request.batch_id, Period(request_data("year").asInstanceOf[Int], request_data("week").asInstanceOf[Int])) + val archivedData = loadArchivedData(request).select("course_id", "batch_id", "user_id", "content_id", "attempt_id") + + val totalArchivedRecords: Long = archivedData.count + JobLogger.log(s"Deleting $totalArchivedRecords archived records only, for the year ${batchPartition.period.year} and week of year ${batchPartition.period.weekOfYear} from the DB ", None, Level.INFO) + + val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace")) + + archivedData.rdd.deleteFromCassandra(archivalKeyspace, requestConfig.archivalTable, keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id")) + val metrics = ArchivalMetrics(batchPartition, pendingWeeksOfYears = None, totalArchivedRecords = Some(totalArchivedRecords), totalDeletedRecords = Some(totalArchivedRecords)) + + JobLogger.log(s"Data is archived and Processing the remaining part files ", Some(metrics), Level.INFO) + markDeletionRequestAsSuccess(request, requestConfig) + } catch { + case ex: Exception => { + JobLogger.log("deleteBatch: Exception with error message = " + ex.getLocalizedMessage, Some(request), Level.ERROR) + markDeletionRequestAsFailed(request, ex.getLocalizedMessage) + } + } + } + + def generatePeriodInData(data: DataFrame): DataFrame = { + data.withColumn("updated_on", to_timestamp(col(dateColumn))) + .withColumn("year", year(col("updated_on"))) + .withColumn("week_of_year", weekofyear(col("updated_on"))) + .withColumn("question", to_json(col("question"))) + } +} diff --git a/data-products/src/main/scala/org/sunbird/analytics/archival/BaseArchivalJob.scala 
b/data-products/src/main/scala/org/sunbird/analytics/archival/BaseArchivalJob.scala new file mode 100644 index 000000000..faf8e05e5 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/analytics/archival/BaseArchivalJob.scala @@ -0,0 +1,92 @@ +package org.sunbird.analytics.archival + +import com.datastax.spark.connector.cql.CassandraConnectorConf +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.cassandra._ +import org.apache.spark.sql.functions.udf +import org.ekstep.analytics.framework.Level.ERROR +import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger} +import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig, Level} +import org.sunbird.analytics.exhaust.BaseReportsJob +import org.ekstep.analytics.framework.util.DatasetUtil.extensions +import org.sunbird.analytics.archival.util.{ArchivalMetaDataStoreJob, ArchivalRequest} + +case class Request(archivalTable: String, keyspace: Option[String], query: Option[Map[String, AnyRef]] = Option(Map()), batchId: Option[String] = Option(""), collectionId: Option[String]=Option(""), batchFilters: Option[List[String]]=Option(List()), date: Option[String] = Option("")) + +trait BaseArchivalJob extends BaseReportsJob with IJob with ArchivalMetaDataStoreJob with Serializable { + + val cassandraUrl = "org.apache.spark.sql.cassandra" + def dateColumn: String + + def main(config: String)(implicit sc: Option[SparkContext] = None, fc: Option[FrameworkContext] = None): Unit = { + implicit val className: String = getClassName; + JobLogger.init(jobName) + JobLogger.start(s"$jobName started executing - ver3", Option(Map("config" -> config, "model" -> jobName))) + implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config) + implicit val spark: SparkSession = openSparkSession(jobConfig) + implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext() + + try { + val res = CommonUtil.time(execute()); + JobLogger.end(s"$jobName completed execution", "SUCCESS", None) + } catch { + case ex: Exception => ex.printStackTrace() + JobLogger.log(ex.getMessage, None, ERROR); + JobLogger.end(jobName + " execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage))); + } + finally { + frameworkContext.closeContext(); + spark.close() + } + } + + def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = { + spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host"))) + } + + def execute()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = { + val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]()); + val requestConfig = JSONUtils.deserialize[Request](JSONUtils.serialize(modelParams.getOrElse("request", Request).asInstanceOf[Map[String,AnyRef]])) + val mode: String = modelParams.getOrElse("mode","archive").asInstanceOf[String] + + val requests = getRequests(jobId, requestConfig.batchId) + + val archivalRequests = mode.toLowerCase() match { + case "archival" => + archiveData(requestConfig, requests) + case "delete" => + deleteArchivedData(requestConfig, requests) + } + for (archivalRequest <- archivalRequests) { + upsertRequest(archivalRequest) + } + } + + def upload(archivedData: DataFrame, batch: Map[String,AnyRef])(implicit jobConfig: JobConfig): List[String] = { + val blobConfig = 
jobConfig.modelParams.get("blobConfig").asInstanceOf[Map[String, AnyRef]] + val reportPath: String = blobConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String] + val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey") + val fileName = archivalFormat(batch) + val storageConfig = getStorageConfig(jobConfig, objectKey) + JobLogger.log(s"Uploading reports to blob storage", None, Level.INFO) + archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), None, fileExt=Some("csv.gz")) + } + + // Overriding methods START: + def jobId: String; + def jobName: String; + def getReportPath: String; + def getReportKey: String; + def getClassName: String; + + def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest]; + def deleteArchivedData(archivalRequest: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest]; + def archivalFormat(batch: Map[String,AnyRef]): String; + def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame; + + //Overriding methods END: + + val tsToLongUdf = udf[java.lang.Long, java.sql.Timestamp]((ts: java.sql.Timestamp) => if (ts != null) ts.getTime else null) +} diff --git a/data-products/src/main/scala/org/sunbird/analytics/archival/util/ArchivalMetaDataStoreJob.scala b/data-products/src/main/scala/org/sunbird/analytics/archival/util/ArchivalMetaDataStoreJob.scala new file mode 100644 index 000000000..d20b44a38 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/analytics/archival/util/ArchivalMetaDataStoreJob.scala @@ -0,0 +1,160 @@ +package org.sunbird.analytics.archival.util + +import java.security.MessageDigest +import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Timestamp} +import java.util.Properties +import org.apache.commons.lang.StringUtils +import org.apache.spark.sql.{Encoders, SparkSession} +import org.apache.spark.sql.functions.col +import org.ekstep.analytics.framework.FrameworkContext +import org.ekstep.analytics.framework.Level.INFO +import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger} +import org.sunbird.analytics.archival.Request + +case class ArchivalRequest(request_id: String, batch_id: String, collection_id: String, resource_type: Option[String], job_id: String, + var archival_date: Option[Long],var completion_date: Option[Long],var archival_status: String,var deletion_status: String, + var blob_url: Option[List[String]],var iteration: Option[Int], request_data: String, var err_message: Option[String]) + +trait ArchivalMetaDataStoreJob { + + implicit val className: String = getClassName; + val connProperties: Properties = CommonUtil.getPostgresConnectionProps() + val db: String = AppConf.getConfig("postgres.db") + val url: String = AppConf.getConfig("postgres.url") + s"$db" + val requestsTable: String = AppConf.getConfig("postgres.table.archival_request") + val dbc: Connection = DriverManager.getConnection(url, connProperties.getProperty("user"), connProperties.getProperty("password")); + dbc.setAutoCommit(true); + + def getClassName(): String; + + def cleanUp() { + dbc.close(); + } + + def getRequests(jobId: String, batchId: Option[String])(implicit spark: SparkSession, fc: 
FrameworkContext): Array[ArchivalRequest] = { + val encoder = Encoders.product[ArchivalRequest] + val archivalConfigsDf = spark.read.jdbc(url, requestsTable, connProperties) + .where(col("job_id") === jobId && col("iteration") < 3) + + val filteredReportConfigDf = if (batchId.isDefined) { + val filteredArchivalConfig = archivalConfigsDf.filter(col("batch_id").equalTo(batchId.get)) + if (filteredArchivalConfig.count() > 0) filteredArchivalConfig else archivalConfigsDf + } else archivalConfigsDf + + JobLogger.log("fetched records count: " + filteredReportConfigDf.count(), None, INFO) + val requests = filteredReportConfigDf.as[ArchivalRequest](encoder).collect() + requests + } + + def getRequestID(jobId: String, collectionId: String, batchId: String, partitionCols: List[Int]): String = { + val requestComb = s"$jobId:$collectionId:$batchId:" + partitionCols.mkString(":") + MessageDigest.getInstance("MD5").digest(requestComb.getBytes).map("%02X".format(_)).mkString + } + + def getRequest(jobId: String, collectionId: String, batchId: String, partitionCols: List[Int]): ArchivalRequest = { + val requestId = getRequestID(jobId, collectionId, batchId, partitionCols) + val archivalRequest = s"""select * from $requestsTable where request_id = '$requestId' limit 1""" + val pstmt: PreparedStatement = dbc.prepareStatement(archivalRequest); + val resultSet = pstmt.executeQuery() + + if (resultSet.next()) getArchivalRequest(resultSet) else null + } + + private def getArchivalRequest(resultSet: ResultSet): ArchivalRequest = { + ArchivalRequest( + resultSet.getString("request_id"), + resultSet.getString("batch_id"), + resultSet.getString("collection_id"), + Option(resultSet.getString("resource_type")), + resultSet.getString("job_id"), + Option(resultSet.getTimestamp("archival_date").getTime), + if (resultSet.getTimestamp("completion_date") != null) Option(resultSet.getTimestamp("completion_date").getTime) else None, + resultSet.getString("archival_status"), + resultSet.getString("deletion_status"), + if (resultSet.getArray("blob_url") != null) Option(resultSet.getArray("blob_url").getArray().asInstanceOf[Array[String]].toList) else None, + Option(resultSet.getInt("iteration")), + resultSet.getString("request_data"), + Option(resultSet.getString("err_message")) + ) + } + + def markArchivalRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = { + request.archival_status = "FAILED"; + request.archival_date = Option(System.currentTimeMillis()); + request.iteration = Option(request.iteration.getOrElse(0) + 1); + request.err_message = Option(failedMsg); + request + } + + def markDeletionRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = { + request.deletion_status = "FAILED"; + request.archival_date = Option(System.currentTimeMillis()); + request.iteration = Option(request.iteration.getOrElse(0) + 1); + request.err_message = Option(failedMsg); + request + } + + def createRequest(request: ArchivalRequest) = { + val insertQry = s"INSERT INTO $requestsTable (request_id, batch_id, collection_id, resource_type, job_id, archival_date, completion_date, archival_status, " + + s"deletion_status, blob_url, iteration, request_data, err_message) VALUES (?,?,?,?,?,?,?,?,?,?,?,?::json,?)" + val pstmt: PreparedStatement = dbc.prepareStatement(insertQry); + val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data) + val requestId = getRequestID(request.job_id, request.collection_id, request.batch_id, List(request_data("year").asInstanceOf[Int], 
request_data("week").asInstanceOf[Int])) + pstmt.setString(1, requestId); + pstmt.setString(2, request.batch_id); + pstmt.setString(3, request.collection_id); + pstmt.setString(4, request.resource_type.getOrElse("assessment")); + pstmt.setString(5, request.job_id); + pstmt.setTimestamp(6, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null); + pstmt.setTimestamp(7, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null); + pstmt.setString(8, request.archival_status); + pstmt.setString(9, request.deletion_status); + val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]]; + pstmt.setArray(10, dbc.createArrayOf("text", blobURLs)) + pstmt.setInt(11, request.iteration.getOrElse(0)) + pstmt.setString(12, request.request_data) + pstmt.setString(13, StringUtils.abbreviate(request.err_message.getOrElse(""), 300)); + + pstmt.execute() + } + + def upsertRequest(request: ArchivalRequest): Unit = { + if (request.request_id.isEmpty) { + createRequest(request) + } else { + updateRequest(request) + } + } + + def updateRequest(request: ArchivalRequest): Unit = { + val updateQry = s"UPDATE $requestsTable SET blob_url=?, iteration = ?, archival_date=?, completion_date=?, " + + s"archival_status=?, deletion_status=?, err_message=? WHERE request_id=?"; + val pstmt: PreparedStatement = dbc.prepareStatement(updateQry) + + val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]]; + pstmt.setArray(1, dbc.createArrayOf("text", blobURLs)) + pstmt.setInt(2, request.iteration.get); + pstmt.setTimestamp(3, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null); + pstmt.setTimestamp(4, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null); + pstmt.setString(5, request.archival_status); + pstmt.setString(6, request.deletion_status); + pstmt.setString(7, request.err_message.getOrElse("")); + pstmt.setString(8, request.request_id); + + pstmt.execute() + } + + def markArchivalRequestAsSuccess(request: ArchivalRequest, requestConfig: Request): ArchivalRequest = { + request.archival_status = "SUCCESS"; + request.archival_date = Option(System.currentTimeMillis()) + request + } + + def markDeletionRequestAsSuccess(request: ArchivalRequest, requestConfig: Request): ArchivalRequest = { + request.deletion_status = "SUCCESS"; + request.completion_date = Option(System.currentTimeMillis()) + request + } + +} diff --git a/data-products/src/test/resources/application.conf b/data-products/src/test/resources/application.conf index 8f53f38fb..43a6d29a0 100644 --- a/data-products/src/test/resources/application.conf +++ b/data-products/src/test/resources/application.conf @@ -199,3 +199,6 @@ uci.conversation.postgres.pass="postgres" uci.exhaust.store.prefix="src/test/resources/exhaust-reports/" uci.encryption.secret="123443957398423479784298247982789428fldkssd" // END OF UCI Related Job Configs + +//START of Archival Config +postgres.table.archival_request="archival_metadata" \ No newline at end of file diff --git a/data-products/src/test/resources/assessment-archival/data.cql b/data-products/src/test/resources/assessment-archival/data.cql new file mode 100644 index 000000000..93503d2db --- /dev/null +++ b/data-products/src/test/resources/assessment-archival/data.cql @@ -0,0 +1,12 @@ +-- Week 48 +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, 
total_score, last_attempted_on, updated_on) VALUES ('do_1130928636168192001667', 'batch-011', 'user-001', 'do_1128870328040161281204', 'attempt-001', '10', 10, 10, 1638357693200, 1638357693000); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, last_attempted_on, updated_on) VALUES ('do_1130928636168192001667', 'batch-011', 'user-003', 'do_112876961957437440179', 'attempt-001', '10', 10, 10, 1638357693200, 1638357693000); + + +-- Week 49 -- +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-011', 'user-001', 'do_1128870328040161281204', 'attempt-002', '20', 20, 20, [{id: 'do_213019475454476288155', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? 
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], 1639052254823); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-011', 'user-002', 'do_1128870328040161281204', 'attempt-002', '10', 10, 10, 1639052254823); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-011', 'user-003', 'do_112876961957437440179', 'attempt-002', '10', 10, 10, 1639052254823); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_11306040245271756813015', 'batch-021', 'user-008', 'do_112876961957437440179', 'attempt-002', '10', 10, 10, 1639052254823); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_11306040245271756813015', 'batch-021', 'user-010', 'do_11307593493010022418', 'attempt-002', '15', 15, 15, 1639052254823); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_112835334818643968148', 'batch-031', 'user-014', 'do_11307593493010022418', 'attempt-002', '15', 15, 15, 1639052254823); diff --git a/data-products/src/test/scala/org/sunbird/analytics/archival/TestAsssessmentArchivalJob.scala b/data-products/src/test/scala/org/sunbird/analytics/archival/TestAsssessmentArchivalJob.scala new file mode 100644 index 000000000..80269d03e --- /dev/null +++ b/data-products/src/test/scala/org/sunbird/analytics/archival/TestAsssessmentArchivalJob.scala @@ -0,0 +1,288 @@ +package org.sunbird.analytics.archival + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.col +import org.ekstep.analytics.framework.{FrameworkContext, JobConfig} +import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils} +import org.scalamock.scalatest.MockFactory +import org.sunbird.analytics.exhaust.BaseReportsJob +import org.sunbird.analytics.util.{BaseSpec, EmbeddedCassandra, EmbeddedPostgresql} + +class TestAsssessmentArchivalJob extends BaseSpec with MockFactory with BaseReportsJob { + + val outputLocation = "src/test/resources/reports/assessment-archived-data" + implicit var spark: SparkSession = _ + + override def beforeAll(): Unit = { + spark = getSparkSession(); + super.beforeAll() + EmbeddedPostgresql.start() + EmbeddedPostgresql.createArchivalRequestTable() + } + + override def afterEach(): Unit = { + super.afterEach() + EmbeddedCassandra.close() + EmbeddedPostgresql.execute(s"TRUNCATE archival_metadata") + } + + override def beforeEach(): Unit = { + EmbeddedCassandra.loadData("src/test/resources/assessment-archival/data.cql") // Load test data in embedded cassandra server + new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, outputLocation) + } + + override def afterAll() : Unit = { + super.afterAll() + EmbeddedPostgresql.close() + new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, outputLocation) + spark.close() + } + + 
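The fixture rows above fall into two weekly partitions (the "-- Week 48" / "-- Week 49" comments), and the tests below assert on the corresponding 2021-48 and 2021-49 archives. A minimal sketch of that mapping, assuming a UTC session timezone and using plain java.time in place of the job's Spark year()/weekofyear() columns (the helper name is illustrative only, not part of the job):

```scala
import java.time.{Instant, ZoneOffset}
import java.time.temporal.IsoFields

// Hypothetical stand-in for generatePeriodInData: derive (year, ISO week) from an epoch-millis updated_on
def period(updatedOnMillis: Long): (Int, Int) = {
  val date = Instant.ofEpochMilli(updatedOnMillis).atZone(ZoneOffset.UTC).toLocalDate
  (date.getYear, date.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR))
}

period(1638357693000L) // (2021, 48) -> archived under the 2021-48 partition
period(1639052254823L) // (2021, 49) -> archived under the 2021-49 partition
```
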
"AssessmentArchivalJob" should "archive the batch which is not archived in past" in { + implicit val fc = new FrameworkContext() + val batchId = "batch-011" + val courseId = "do_1130928636168192001667" + + val strConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + implicit val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute() + + val batch011Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/${batchId}_${courseId}/2021*.csv.gz") + + batch011Results.count() should be (5) + + val user1 = batch011Results.filter(col("user_id") === "user-001") + user1.count() should be (2) + + val user1attempt1 = user1.filter(col("attempt_id") === "attempt-001").first + user1attempt1.getAs[String]("course_id") should be ("do_1130928636168192001667") + user1attempt1.getAs[String]("content_id") should be ("do_1128870328040161281204") + user1attempt1.getAs[String]("last_attempted_on") should be ("1638357693200") + user1attempt1.getAs[String]("grand_total") should be ("10") + user1attempt1.getAs[String]("total_max_score") should be ("10.0") + user1attempt1.getAs[String]("total_score") should be ("10.0") + user1attempt1.getAs[String]("question") should be ("[]") + user1attempt1.getAs[String]("updated_on") should be ("1638357693000") + + val user1attempt2 = user1.filter(col("attempt_id") === "attempt-002").first + + user1attempt2.getAs[String]("course_id") should be ("do_1130928636168192001667") + user1attempt2.getAs[String]("content_id") should be ("do_1128870328040161281204") + user1attempt2.getAs[String]("last_attempted_on") should be (null) + user1attempt2.getAs[String]("grand_total") should be ("20") + user1attempt2.getAs[String]("total_max_score") should be ("20.0") + user1attempt2.getAs[String]("total_score") should be ("20.0") + val questionsList = JSONUtils.deserialize[List[Map[String, AnyRef]]](user1attempt2.getAs[String]("question")) + questionsList.size should be (4) + + user1attempt2.getAs[String]("updated_on") should be ("1639052254823") + + val user2Result = batch011Results.filter(col("user_id") === "user-002") + user2Result.count() should be (1) + + val user3Result = batch011Results.filter(col("user_id") === "user-003") + user3Result.count() should be (2) + + val archivalRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, None) + archivalRequests.size should be (2) + + archivalRequests.map(ar => ar.request_id).toList should contain allElementsOf List("2A04B5AF40E2E249EBB63530F19656F7", "AC0F439E287263DB49D54004DAA4644B") + archivalRequests.map(ar => ar.batch_id).toList.distinct should contain allElementsOf List("batch-011") + archivalRequests.map(ar => ar.collection_id).toList.distinct should contain allElementsOf List("do_1130928636168192001667") + archivalRequests.map(ar => ar.archival_status).toList.distinct should contain allElementsOf List("SUCCESS") + archivalRequests.map(ar => ar.blob_url.get).toList.head.head should include 
(s"src/test/resources/reports/assessment-archived-data/${batchId}_${courseId}/2021") + archivalRequests.map(ar => ar.iteration.get).toList.distinct should contain allElementsOf List(0) + archivalRequests.map(ar => ar.err_message.get).toList.distinct should contain allElementsOf List("") + } + + it should "archive the multiple batches which is not archived in past" in { + implicit val fc = new FrameworkContext() + val batchId = "batch-011" + val courseId = "do_1130928636168192001667" + + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","batchFilters":["batch-011", "batch-021"],"date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + implicit val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute() + + val archivalRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, None) + archivalRequests.size should be (3) + + archivalRequests.map(ar => ar.batch_id).toList.distinct should contain allElementsOf List("batch-011", "batch-021") + } + + it should "archive the batch which is failed to archive in past" in { + implicit val fc = new FrameworkContext() + val batchId = "batch-011" + val courseId = "do_1130928636168192001667" + + EmbeddedPostgresql.execute("INSERT INTO archival_metadata (request_id, batch_id, collection_id , resource_type , job_id , archival_date, completion_date, archival_status, blob_url, iteration,request_data , err_message ) VALUES ('2A04B5AF40E2E249EBB63530F19656F7', 'batch-011', 'do_1130928636168192001667', 'assessment', 'assessment-archival','2021-12-09 05:58:18.666', null,'FAILED', null, 1,'{\"batchId\": \"batch-011\", \"week\": 48, \"year\": 2021}', NULL);") + + val strConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + implicit val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute() + + val batch011Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/${batchId}_${courseId}/2021*.csv.gz") + + batch011Results.count() should be (5) + + val user1 = batch011Results.filter(col("user_id") === "user-001") + user1.count() should be (2) + + val user2Result = batch011Results.filter(col("user_id") === "user-002") + user2Result.count() should be (1) + + val user3Result = batch011Results.filter(col("user_id") === "user-003") + user3Result.count() should be (2) + + val archivalRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, Option(batchId)) + archivalRequests.size should be (2) + + val failedRequest = AssessmentArchivalJob.getRequest(AssessmentArchivalJob.jobId, "do_1130928636168192001667", batchId, 
List(2021, 48)) + + failedRequest.request_id should be ("AC0F439E287263DB49D54004DAA4644B") + failedRequest.archival_status should be ("SUCCESS") + failedRequest.blob_url.get.head should include (s"src/test/resources/reports/assessment-archived-data/${batchId}_${courseId}/2021") + } + + it should "skip archival for the batch which is archived in past" in { + implicit val fc = new FrameworkContext() + val batchId = "batch-011" + val courseId = "do_1130928636168192001667" + + EmbeddedPostgresql.execute("INSERT INTO archival_metadata (request_id, batch_id, collection_id , resource_type , job_id , archival_date, completion_date, archival_status, blob_url, iteration,request_data , err_message ) VALUES ('949887DE6364A07AE1BB5A04504368F9', 'batch-011', 'do_1130928636168192001667', 'assessment', 'assessment-archival','2021-12-09 05:58:18.666', null,'SUCCESS', '{\"reports/assessment-archival/batch-011/2021-48.csv.gz\"}', 1,'{\"batchId\": \"batch-011\", \"week\": 48, \"year\": 2021}', NULL);") + + val strConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + implicit val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute() + + val batch011Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/${batchId}_${courseId}/2021*.csv.gz") + + batch011Results.count() should be (3) + + val user1 = batch011Results.filter(col("user_id") === "user-001") + user1.count() should be (1) + val user1attempt2 = user1.filter(col("attempt_id") === "attempt-002").first + + user1attempt2.getAs[String]("course_id") should be ("do_1130928636168192001667") + user1attempt2.getAs[String]("content_id") should be ("do_1128870328040161281204") + user1attempt2.getAs[String]("last_attempted_on") should be (null) + user1attempt2.getAs[String]("grand_total") should be ("20") + user1attempt2.getAs[String]("total_max_score") should be ("20.0") + user1attempt2.getAs[String]("total_score") should be ("20.0") + + val user2Result = batch011Results.filter(col("user_id") === "user-002") + user2Result.count() should be (1) + + val user3Result = batch011Results.filter(col("user_id") === "user-003") + user3Result.count() should be (1) + + val archivalRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, Option(batchId)) + archivalRequests.size should be (2) + } + + it should "delete the archived records based on blob files" in { + implicit val fc = new FrameworkContext() + val batchId = "batch-011" + val courseId = "do_1130928636168192001667" + + val strConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date 
--date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute()(spark, fc, jobConfig) + + val batch011Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/${batchId}_${courseId}/2021*.csv.gz") + + batch011Results.count() should be (5) + + val cassData = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "assessment_aggregator", "keyspace" -> "sunbird_courses")).load() + + cassData.filter(col("batch_id") === batchId).count() should be (5) + + val archivalRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, Option(batchId)) + archivalRequests.size should be (2) + + archivalRequests.map(ar => ar.archival_status).toList.distinct should contain allElementsOf List("SUCCESS") + archivalRequests.map(ar => ar.deletion_status).toList.distinct should contain allElementsOf List(null) + + val delStrConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"delete","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"local","blobExt":"csv.gz","reportPath":"src/test/resources/reports/assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + + val delJobConfig = JSONUtils.deserialize[JobConfig](delStrConfig) + + AssessmentArchivalJob.execute()(spark, fc, delJobConfig) + + val delCassData = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "assessment_aggregator", "keyspace" -> "sunbird_courses")).load() + + delCassData.filter(col("batch_id") === batchId).count() should be (0) + + val deletionRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, Option(batchId)) + deletionRequests.map(ar => ar.archival_status).toList.distinct should contain allElementsOf List("SUCCESS") + deletionRequests.map(ar => ar.deletion_status).toList.distinct should contain allElementsOf List("SUCCESS") + } + + it should "not delete the records the if the blob file is not available" in { + implicit val fc = new FrameworkContext() + val batchId = "batch-011" + val courseId = "do_1130928636168192001667" + + // Week 48 records are processed will not be processed for archival again + EmbeddedPostgresql.execute("INSERT INTO archival_metadata (request_id, batch_id, collection_id , resource_type , job_id , archival_date, completion_date, archival_status, blob_url, iteration,request_data , err_message ) VALUES ('AC0F439E287263DB49D54004DAA4644B', 'batch-011', 'do_1130928636168192001667', 'assessment', 'assessment-archival','2021-12-09 05:58:18.666', null,'SUCCESS', '{\"reports/assessment-archival/batch-011/2021-48.csv.gz\"}', 1,'{\"batchId\": \"batch-011\", \"week\": 48, \"year\": 2021}', NULL);") + + val strConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date 
--date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute()(spark, fc, jobConfig) + + val batch011Results = spark.read.format("csv").option("header", "true") + .load(s"$outputLocation/${batchId}_${courseId}/2021*.csv.gz") + + batch011Results.count() should be (3) + + val delStrConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"delete","request":{"archivalTable":"assessment_aggregator","batchId":"batch-011","collectionId": "do_1130928636168192001667","date":"2021-11-01"},"blobConfig":{"store":"local","blobExt":"csv.gz","reportPath":"src/test/resources/reports/assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + + val delJobConfig = JSONUtils.deserialize[JobConfig](delStrConfig) + + AssessmentArchivalJob.execute()(spark, fc, delJobConfig) + + val delCassData = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "assessment_aggregator", "keyspace" -> "sunbird_courses")).load() + + delCassData.filter(col("batch_id") === batchId).count() should be (2) + + val skippedRequest = AssessmentArchivalJob.getRequest(AssessmentArchivalJob.jobId, "do_1130928636168192001667", batchId, List(2021, 48)) + + skippedRequest.request_id should be ("AC0F439E287263DB49D54004DAA4644B") + skippedRequest.archival_status should be ("SUCCESS") + skippedRequest.deletion_status should be ("FAILED") + skippedRequest.err_message.get should include("Path does not exist") + + val deletionRequest = AssessmentArchivalJob.getRequest(AssessmentArchivalJob.jobId, "do_1130928636168192001667", batchId, List(2021, 49)) + + deletionRequest.request_id should be ("2A04B5AF40E2E249EBB63530F19656F7") + deletionRequest.archival_status should be ("SUCCESS") + deletionRequest.deletion_status should be ("SUCCESS") + + } + + it should "Archive all batches if neither batchid nor batchfilters present" in { + implicit val fc = new FrameworkContext() + + val strConfig= """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.$job_name","modelParams":{"mode":"archival","request":{"archivalTable":"assessment_aggregator","date":"2021-11-01"},"blobConfig":{"store":"azure","blobExt":"csv.gz","reportPath":"assessment-archived-data/","container":"reports"},"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"$job_name"}""" + implicit val jobConfig = JSONUtils.deserialize[JobConfig](strConfig) + + AssessmentArchivalJob.execute() + + val archivalRequests = AssessmentArchivalJob.getRequests(AssessmentArchivalJob.jobId, None) + + archivalRequests.map(ar => ar.archival_status).toList.distinct should contain allElementsOf List("SUCCESS") + + archivalRequests.map(ar => ar.batch_id).toList.distinct should contain allElementsOf List("batch-011", "batch-021", "batch-031") + } + +} diff --git a/data-products/src/test/scala/org/sunbird/analytics/audit/TestScoreMetricMigrationJob.scala b/data-products/src/test/scala/org/sunbird/analytics/audit/TestScoreMetricMigrationJob.scala index cc92f9b09..9b139a985 100644 --- 
a/data-products/src/test/scala/org/sunbird/analytics/audit/TestScoreMetricMigrationJob.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/audit/TestScoreMetricMigrationJob.scala @@ -40,16 +40,7 @@ class TestScoreMetricMigrationJob extends BaseSpec with MockFactory { result.head.get(0).asInstanceOf[Map[String, Int]]("max_score:do_112876961957437440179") should be(10) result.head.get(2).asInstanceOf[Seq[String]].size should be (2) - val aggDetail = JSONUtils.deserialize[Map[String, AnyRef]](result.head.get(2).asInstanceOf[Seq[String]].head) - - aggDetail("max_score") should be(10.0) - aggDetail("score") should be(10.0) - aggDetail("type") should be(jobConfig.modelParams.get.get("metricsType").get.toString) - aggDetail("attempt_id") should be("attempat-001") - aggDetail("content_id") should be("do_112876961957437440110") - aggDetail("attempted_on") should be(1634810023) - - result.head.get(2).asInstanceOf[Seq[String]](1) should be("""{"max_score":10.0,"score":10.0,"type":"attempt_metrics","attempt_id":"attempat-001","content_id":"do_112876961957437440179"}""") + result.head.get(2).asInstanceOf[Seq[String]] should contain allElementsOf List("""{"max_score":10.0,"score":10.0,"type":"attempt_metrics","attempt_id":"attempat-001","content_id":"do_112876961957437440110","attempted_on":1634810023}""", """{"max_score":10.0,"score":10.0,"type":"attempt_metrics","attempt_id":"attempat-001","content_id":"do_112876961957437440179"}""") val result2 = res.filter(col("context_id") === "cb:batch-001") .filter(col("activity_id") === "do_11306040245271756813015") @@ -57,8 +48,8 @@ class TestScoreMetricMigrationJob extends BaseSpec with MockFactory { .select("agg_details").collect() result2.head.get(0).asInstanceOf[Seq[String]].size should be (2) - result2.head.get(0).asInstanceOf[Seq[String]].head should be("""{"max_score":15.0,"score":15.0,"type":"attempt_metrics","attempt_id":"attempat-001","content_id":"do_11307593493010022418"}""") - result2.head.get(0).asInstanceOf[Seq[String]](1) should be("""{"max_score":15.0,"score":10.0,"type":"attempt_metrics","attempt_id":"attempat-002","content_id":"do_11307593493010022418"}""") + + result2.head.get(0).asInstanceOf[Seq[String]] should contain allElementsOf List("""{"max_score":15.0,"score":15.0,"type":"attempt_metrics","attempt_id":"attempat-001","content_id":"do_11307593493010022418"}""", """{"max_score":15.0,"score":10.0,"type":"attempt_metrics","attempt_id":"attempat-002","content_id":"do_11307593493010022418"}""") ScoreMetricMigrationJob.updatedTable(res, ScoreMetricMigrationJob.userActivityAggDBSettings) val result3 = res.filter(col("context_id") === "cb:batch-001") diff --git a/data-products/src/test/scala/org/sunbird/analytics/util/EmbeddedPostgres.scala b/data-products/src/test/scala/org/sunbird/analytics/util/EmbeddedPostgres.scala index 3cb39ca9b..516591e3a 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/util/EmbeddedPostgres.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/util/EmbeddedPostgres.scala @@ -69,6 +69,28 @@ object EmbeddedPostgresql { execute(query) } + def createArchivalRequestTable(): Unit = { + val tableName: String = "archival_metadata" + val query = + s""" + |CREATE TABLE IF NOT EXISTS $tableName ( + | request_id TEXT, + | batch_id TEXT, + | collection_id TEXT, + | resource_type TEXT, + | job_id TEXT, + | archival_date TIMESTAMP, + | completion_date TIMESTAMP, + | archival_status TEXT, + | deletion_status TEXT, + | blob_url TEXT[], + | iteration int, + | request_data json, + | err_message TEXT + 
|) + """.stripMargin + execute(query) + } def createConversationTable(): Unit = { val tableName: String = "bot"
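
As an aside for readers of the tests above: the fixed request ids they assert on (e.g. 2A04B5AF40E2E249EBB63530F19656F7) come from ArchivalMetaDataStoreJob.getRequestID, an MD5 over jobId:collectionId:batchId plus the year/week partition columns. A minimal standalone sketch of that derivation (the function name here is illustrative):

```scala
import java.security.MessageDigest

// Mirrors getRequestID: upper-case hex MD5 over "jobId:collectionId:batchId:" + partition columns joined by ":"
def requestId(jobId: String, collectionId: String, batchId: String, partitionCols: List[Int]): String = {
  val key = s"$jobId:$collectionId:$batchId:" + partitionCols.mkString(":")
  MessageDigest.getInstance("MD5").digest(key.getBytes).map("%02X".format(_)).mkString
}

// e.g. requestId("assessment-archival", "do_1130928636168192001667", "batch-011", List(2021, 48))
```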