diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/util/ExhaustUtil.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/util/ExhaustUtil.scala
new file mode 100644
index 000000000..09b8a7d5f
--- /dev/null
+++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/util/ExhaustUtil.scala
@@ -0,0 +1,29 @@
+package org.sunbird.analytics.exhaust.util
+
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.ekstep.analytics.framework.FrameworkContext
+import org.ekstep.analytics.framework.conf.AppConf
+import org.ekstep.analytics.framework.util.{CommonUtil, JobLogger}
+
+object ExhaustUtil {
+
+  def getArchivedData(store: String, filePath: String, bucket: String, blobFields: Map[String, Any], fileFormat: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
+    val filteredBlobFields = blobFields.filter(_._2 != null)
+    val format = fileFormat.getOrElse("csv.gz")
+    val batchId = filteredBlobFields.getOrElse("batchId", "*").toString()
+    val year = filteredBlobFields.getOrElse("year", "*")
+    val weekNum = filteredBlobFields.getOrElse("weekNum", "*").toString()
+
+    val file: String = s"${filePath}${batchId}/${year}-${weekNum}-*.${format}"
+    val url = CommonUtil.getBlobUrl(store, file, bucket)
+
+    JobLogger.log(s"Fetching data from ${store}")(new String())
+    fetch(url, "csv")
+  }
+
+  def fetch(url: String, format: String)(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
+    spark.read.format(format).option("header", "true").load(url)
+  }
+
+}
\ No newline at end of file
diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala
new file mode 100644
index 000000000..4ad8e8b13
--- /dev/null
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala
@@ -0,0 +1,214 @@
+package org.sunbird.analytics.job.report
+
+import com.datastax.spark.connector.cql.CassandraConnectorConf
+import com.datastax.spark.connector.{SomeColumns, toRDDFunctions}
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.cassandra.CassandraSparkSessionFunctions
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.ekstep.analytics.framework.Level.INFO
+import org.ekstep.analytics.framework.conf.AppConf
+import org.ekstep.analytics.framework.util.DatasetUtil.extensions
+import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
+import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}
+import org.joda.time.DateTime
+import org.sunbird.analytics.exhaust.util.ExhaustUtil
+
+import java.util.concurrent.atomic.AtomicInteger
+
+object AssessmentArchivalJob extends optional.Application with IJob with BaseReportsJob {
+  val cassandraUrl = "org.apache.spark.sql.cassandra"
+  private val assessmentAggDBSettings: Map[String, String] = Map("table" -> AppConf.getConfig("sunbird.courses.assessment.table"), "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster")
+  implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
+  private val partitionCols = List("batch_id", "year", "week_of_year")
+  private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")
+
+  case class Period(year: Int, weekOfYear: Int)
+
+  case class BatchPartition(batchId: String, period: Period)
+
+
+  case class ArchivalMetrics(batchId: Option[String],
+                             period: Period,
+                             totalArchivedRecords: Option[Long],
+                             pendingWeeksOfYears: Option[Long],
+                             totalDeletedRecords: Option[Long],
+                             totalDistinctBatches: Long
+                            )
+
+
+  // $COVERAGE-OFF$ Disabling scoverage for main and execute method
+  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
+    implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
+    val jobName = "AssessmentArchivalJob"
+    JobLogger.init(jobName)
+    JobLogger.start(s"$jobName started executing", Option(Map("config" -> config, "model" -> jobName)))
+    implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
+    implicit val spark: SparkSession = openSparkSession(jobConfig)
+    implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()
+    val modelParams = jobConfig.modelParams.get
+    val deleteArchivedBatch: Boolean = modelParams.getOrElse("deleteArchivedBatch", false).asInstanceOf[Boolean]
+    init()
+    try {
+
+      /**
+       * The below date and batchId configs are optional. By default,
+       * the job processes the records for the previous week.
+       */
+      val date = modelParams.getOrElse("date", null).asInstanceOf[String]
+      val batchIds = modelParams.getOrElse("batchIds", null).asInstanceOf[List[String]]
+      val archiveForLastWeek: Boolean = modelParams.getOrElse("archiveForLastWeek", true).asInstanceOf[Boolean]
+
+      val res = if (deleteArchivedBatch) CommonUtil.time(removeRecords(date, Option(batchIds), archiveForLastWeek)) else CommonUtil.time(archiveData(date, Option(batchIds), archiveForLastWeek))
+      JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("timeTaken" -> res._1, "archived_details" -> res._2, "total_archived_files" -> res._2.length)))
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        JobLogger.end(s"$jobName completed execution with the error ${ex.getMessage}", "FAILED", None)
+    } finally {
+      frameworkContext.closeContext()
+      spark.close()
+    }
+  }
+
+  def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
+    spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
+  }
+
+  // $COVERAGE-ON$
+  // date - yyyy-mm-dd, in string format
+  def archiveData(date: String, batchIds: Option[List[String]], archiveForLastWeek: Boolean)(implicit spark: SparkSession, config: JobConfig): Array[ArchivalMetrics] = {
+    // Get the assessment data
+    val assessmentDF: DataFrame = getAssessmentData(spark, batchIds)
+    val period = getWeekAndYearVal(date, archiveForLastWeek)
+    // Derive the year and week-of-year values from the updated_on column
+    val assessmentData = assessmentDF.withColumn("updated_on", to_timestamp(col("updated_on")))
+      .withColumn("year", year(col("updated_on")))
+      .withColumn("week_of_year", weekofyear(col("updated_on")))
+      .withColumn("question", to_json(col("question")))
+
+    /**
+     * The below filter is required if we want to archive the data for a specific year and week of year
+     */
+    val filteredAssessmentData = if (!isEmptyPeriod(period)) assessmentData.filter(col("year") === period.year).filter(col("week_of_year") === period.weekOfYear) else assessmentData
+    // Create a map of batchId, year and weekOfYear from the filteredAssessmentData and load it into memory to iterate over the batch IDs
+    // Example - [BatchId, Year, WeekNumOfYear] -> Number of records
+    // 1. "batch-001",2021,42 -> 5
+    val archiveBatchList = filteredAssessmentData.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
+    val totalBatchesToArchive = new AtomicInteger(archiveBatchList.length)
+    JobLogger.log(s"Total batches to archive: $totalBatchesToArchive for the period $period", None, INFO)
+    // Loop through the list of batches to archive
+    // Example Map - {"batch-001":[{"batchId":"batch-001","period":{"year":2021,"weekOfYear":42}}],"batch-004":[{"batchId":"batch-004","period":{"year":2021,"weekOfYear":42}}]}
+    val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], Period(f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))).groupBy(_.batchId)
+    val archivalStatus: Array[ArchivalMetrics] = batchesToArchive.flatMap(batches => {
+      val processingBatch = new AtomicInteger(batches._2.length)
+      JobLogger.log(s"Started processing the data to archive", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> batches._2.length)), INFO)
+      // Loop through the week_num & year partitions of the batch
+      val res = for (batch <- batches._2.asInstanceOf[Array[BatchPartition]]) yield {
+        val filteredDF = assessmentData.filter(col("batch_id") === batch.batchId && col("year") === batch.period.year && col("week_of_year") === batch.period.weekOfYear).select(columnWithOrder.head, columnWithOrder.tail: _*)
+        upload(filteredDF, batch) // Upload the archived files into blob store
+        val metrics = ArchivalMetrics(batchId = Some(batch.batchId), Period(year = batch.period.year, weekOfYear = batch.period.weekOfYear),
+          pendingWeeksOfYears = Some(processingBatch.getAndDecrement()), totalArchivedRecords = Some(filteredDF.count()), totalDeletedRecords = None, totalDistinctBatches = filteredDF.select("batch_id").distinct().count())
+        JobLogger.log(s"Data is archived; processing the remaining part files", Some(metrics), INFO)
+        metrics
+      }
+      JobLogger.log(s"${batches._1} is successfully archived", Some(Map("batch_id" -> batches._1, "pending_batches" -> totalBatchesToArchive.getAndDecrement())), INFO)
+      res
+    }).toArray
+    assessmentData.unpersist()
+    archivalStatus // List of metrics
+  }
+
+  // Delete the records for the archived batch data.
+  // Date - YYYY-MM-DD Format
+  // Batch IDs are optional
+  def removeRecords(date: String, batchIds: Option[List[String]], archiveForLastWeek: Boolean)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Array[ArchivalMetrics] = {
+    val period: Period = getWeekAndYearVal(date, archiveForLastWeek) // Date is optional; by default it resolves to the previous week of the current year
+    val res = if (batchIds.nonEmpty) {
+      for (batchId <- batchIds.getOrElse(List())) yield {
+        remove(period, Option(batchId))
+      }
+    } else {
+      List(remove(period, None))
+    }
+    res.toArray
+  }
+
+  def remove(period: Period, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): ArchivalMetrics = {
+    val archivedDataDF = fetchArchivedBatches(period, batchId)
+    val archivedData = archivedDataDF.select("course_id", "batch_id", "user_id", "content_id", "attempt_id")
+    val totalArchivedRecords: Long = archivedData.count
+    val totalDistinctBatches: Long = archivedData.select("batch_id").distinct().count()
+    JobLogger.log(s"Deleting only the $totalArchivedRecords archived records for the year ${period.year} and week of year ${period.weekOfYear} from the DB", None, INFO)
+    archivedData.rdd.deleteFromCassandra(AppConf.getConfig("sunbird.courses.keyspace"), AppConf.getConfig("sunbird.courses.assessment.table"), keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id"))
+    ArchivalMetrics(batchId = batchId, Period(year = period.year, weekOfYear = period.weekOfYear),
+      pendingWeeksOfYears = None, totalArchivedRecords = Some(totalArchivedRecords), totalDeletedRecords = Some(totalArchivedRecords), totalDistinctBatches = totalDistinctBatches)
+  }
+
+  def fetchArchivedBatches(period: Period, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
+    val modelParams = config.modelParams.get
+    val azureFetcherConfig = modelParams.getOrElse("archivalFetcherConfig", Map()).asInstanceOf[Map[String, AnyRef]]
+    val store = azureFetcherConfig.getOrElse("store", "azure").asInstanceOf[String]
+    val format: String = azureFetcherConfig.getOrElse("blobExt", "csv.gz").asInstanceOf[String]
+    val filePath = azureFetcherConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
+    val container = azureFetcherConfig.getOrElse("container", "reports").asInstanceOf[String]
+    val blobFields = if (!isEmptyPeriod(period)) Map("year" -> period.year, "weekNum" -> period.weekOfYear, "batchId" -> batchId.orNull)
+    else Map("batchId" -> batchId.orNull)
+    JobLogger.log(s"Fetching the archived records from the blob store", Some(Map("reportPath" -> filePath, "container" -> container) ++ blobFields), INFO)
+    ExhaustUtil.getArchivedData(store, filePath, container, blobFields, Some(format))
+  }
+
+  def getAssessmentData(spark: SparkSession, batchIds: Option[List[String]]): DataFrame = {
+    import spark.implicits._
+    val assessmentDF = fetchData(spark, assessmentAggDBSettings, cassandraUrl, new StructType())
+    val batchIdentifiers = batchIds.getOrElse(List())
+    if (batchIdentifiers.nonEmpty) {
+      if (batchIdentifiers.size > 1) {
+        val batchListDF = batchIdentifiers.toDF("batch_id")
+        assessmentDF.join(batchListDF, Seq("batch_id"), "inner").persist()
+      }
+      else {
+        assessmentDF.filter(col("batch_id") === batchIdentifiers.head).persist()
+      }
+    } else {
+      assessmentDF
+    }
+  }
+
+  def upload(archivedData: DataFrame,
+             batch: BatchPartition)(implicit jobConfig: JobConfig): List[String] = {
+    val modelParams = jobConfig.modelParams.get
+    val reportPath: String = modelParams.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
+    val container = AppConf.getConfig("cloud.container.reports")
+    val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
+    val fileName = s"${batch.batchId}/${batch.period.year}-${batch.period.weekOfYear}"
+    val storageConfig = getStorageConfig(
+      container,
+      objectKey,
+      jobConfig)
+    JobLogger.log(s"Uploading reports to blob storage", None, INFO)
+    archivedData.saveToBlobStore(storageConfig = storageConfig, format = "csv", reportId = s"$reportPath$fileName-${System.currentTimeMillis()}", options = Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), partitioningColumns = None, fileExt = Some("csv.gz"))
+  }
+
+  // Date - YYYY-MM-DD Format
+  def getWeekAndYearVal(date: String, archiveForLastWeek: Boolean): Period = {
+    if (archiveForLastWeek) {
+      val today = new DateTime()
+      val lastWeek = today.minusWeeks(1) // Always use the previous week of the current year
+      Period(year = lastWeek.getYear, weekOfYear = lastWeek.getWeekOfWeekyear)
+    } else {
+      if (null != date && date.nonEmpty) {
+        val dt = new DateTime(date)
+        Period(year = dt.getYear, weekOfYear = dt.getWeekOfWeekyear)
+      } else {
+        Period(0, 0)
+      }
+    }
+  }
+
+  def isEmptyPeriod(period: Period): Boolean = {
+    period.year == 0 && period.weekOfYear == 0
+  }
+
+}
diff --git a/data-products/src/test/resources/application.conf b/data-products/src/test/resources/application.conf
index 8f53f38fb..9c544c143 100644
--- a/data-products/src/test/resources/application.conf
+++ b/data-products/src/test/resources/application.conf
@@ -164,6 +164,7 @@ sunbird.report.cluster.host=127.0.0.1
 sunbird.user.report.keyspace="sunbird_courses"
 collection.exhaust.store.prefix="reports"
 postgres.table.job_request="job_request"
+sunbird.courses.assessment.table = "assessment_aggregator"
 druid.report.default.storage="local"
 druid.report.date.format="yyyy-MM-dd"
diff --git a/data-products/src/test/resources/assessment-archival/archival-data/batch-001/2021-33-1629367970070.csv.gz b/data-products/src/test/resources/assessment-archival/archival-data/batch-001/2021-33-1629367970070.csv.gz
new file mode 100644
index 000000000..28898c629
Binary files /dev/null and b/data-products/src/test/resources/assessment-archival/archival-data/batch-001/2021-33-1629367970070.csv.gz differ
diff --git a/data-products/src/test/resources/assessment-archival/archival-data/batch-004/2021-33-1629367971867.csv.gz b/data-products/src/test/resources/assessment-archival/archival-data/batch-004/2021-33-1629367971867.csv.gz
new file mode 100644
index 000000000..b5cc9276d
Binary files /dev/null and b/data-products/src/test/resources/assessment-archival/archival-data/batch-004/2021-33-1629367971867.csv.gz differ
diff --git a/data-products/src/test/resources/assessment-archival/assessment_agg.cql b/data-products/src/test/resources/assessment-archival/assessment_agg.cql
new file mode 100644
index 000000000..88d520260
--- /dev/null
+++ b/data-products/src/test/resources/assessment-archival/assessment_agg.cql
@@ -0,0 +1,24 @@
+CREATE TABLE IF NOT EXISTS sunbird_courses.assessment_aggregator (
+    user_id text,
+    course_id text,
+    batch_id text,
+    content_id text,
+    attempt_id text,
+    created_on timestamp,
+    grand_total text,
+    last_attempted_on timestamp,
+    question list<frozen<question>>,
+    total_max_score double,
+    total_score double,
+    updated_on timestamp,
+    PRIMARY KEY ((user_id, course_id), batch_id, content_id, attempt_id)
+); + +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-001', 'do_1128870328040161281204', 'attempat-001', '20', 20, 20, [{id: 'do_213019475454476288155', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? 
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-001', 'do_1128870328040161281204', 'attempat-001', '20', 20, 20, [{id: 'do_213019475454476288155', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2020-06-18T18:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? 
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-002', 'do_1128870328040161281204', 'attempat-001', '10', 10, 10, toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-003', 'do_112876961957437440179', 'attempat-001', '10', 10, 10, toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_1130928636168192001667', 'batch-001', 'user-003', 'do_112876961957437440179', 'attempat-001', '10', 10, 10, toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_11306040245271756813015', 'batch-001', 'user-008', 'do_112876961957437440179', 'attempat-001', '10', 10, 10, toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_11306040245271756813015', 'batch-001', 'user-010', 'do_11307593493010022418', 'attempat-001', '15', 15, 15, toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, updated_on) VALUES ('do_112835334818643968148', 'batch-004', 'user-014', 'do_11307593493010022418', 'attempat-001', '15', 15, 15, toTimeStamp(toDate(now()))); \ No newline at end of file diff --git a/data-products/src/test/resources/assessment-archival/assessment_aggregator.csv b/data-products/src/test/resources/assessment-archival/assessment_aggregator.csv new file mode 100644 index 000000000..088edc97e --- /dev/null +++ b/data-products/src/test/resources/assessment-archival/assessment_aggregator.csv @@ -0,0 +1,9 @@ +content_id,attempt_id,user_id,course_id,batch_id,created_on,last_attempted_on,total_max_score,total_score,updated_on,grand_total,question +do_112835335135993856149,A3,user030,do_1125559882615357441175,1010,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"10/2","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112835335135993856149,A4,user021,do_2123101488779837441168,1001,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,20,4,2019-09-06 09:59:51.000+0000,"2/2","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': 
'{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112835336280596480151,A4,user021,do_2123101488779837441168,1001,1971-09-22 02:10:53.444+0000,2019-09-06 09:58:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112835336280596480151,A4,user021,do_2123101488779837441168,1001,1971-09-22 02:10:53.444+0000,2019-09-06 09:57:51.000+0000,30,8,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A1,user015,do_112695422838472704115,1005,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A2,user030,do_1125559882615357441175,1010,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A6,user026,do_1126458775024025601296,1006,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"5/5","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" +do_112832394979106816112,A6,user021,do_1126458775024025601296,1006,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"6/6","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': 
'{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]" \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala new file mode 100644 index 000000000..00fbe2fdb --- /dev/null +++ b/data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala @@ -0,0 +1,63 @@ +package org.sunbird.analytics.job.report + + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils} +import org.ekstep.analytics.framework.{FrameworkContext, JobConfig} +import org.scalamock.scalatest.MockFactory +import org.sunbird.analytics.job.report.AssessmentArchivalJob.{Period, getWeekAndYearVal} +import org.sunbird.analytics.util.EmbeddedCassandra + + +class TestAssessmentArchivalJob extends BaseReportSpec with MockFactory { + + implicit var spark: SparkSession = _ + + var assessmentAggDF: DataFrame = _ + var reporterMock: BaseReportsJob = mock[BaseReportsJob] + val sunbirdCoursesKeyspace = "sunbird_courses" + + override def beforeAll(): Unit = { + super.beforeAll() + spark = getSparkSession(); + EmbeddedCassandra.loadData("src/test/resources/assessment-archival/assessment_agg.cql") // Load test data in embedded cassandra server + } + + override def afterAll(): Unit = { + super.afterAll() + val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey") + new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, objectKey + "assessment-archival") + } + + + it should "Should able to archive the batch data for a specific date" in { + implicit val mockFc: FrameworkContext = mock[FrameworkContext] + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.AssessmentArchivalJob","modelParams":{"deleteArchivedBatch":false,"store":"local","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Assessment Archival Job"}""".stripMargin + implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig) + val todayDate: String = java.time.LocalDate.now.toString + val reportData = AssessmentArchivalJob.archiveData(date = todayDate, batchIds = None, archiveForLastWeek = false) + val period: Period = getWeekAndYearVal(todayDate, archiveForLastWeek = false) + + val batch_1 = reportData.filter(x => x.batchId.getOrElse("") === "batch-001") + batch_1.foreach(res => res.period.year should be(period.year)) + batch_1.foreach(res => res.totalArchivedRecords.get should be(5)) + batch_1.foreach(res => res.period.weekOfYear should be(period.weekOfYear)) + + val batch_2 = reportData.filter(x => x.batchId.getOrElse("") === "batch-004") + batch_2.foreach(res => res.period.year should be(period.year)) + batch_2.foreach(res => res.totalArchivedRecords.get should be (1)) + batch_2.foreach(res => res.period.weekOfYear should be(period.weekOfYear)) + } + + it should "Should able to fetch the archived records from the azure and delete the records" in { + implicit val mockFc: FrameworkContext = mock[FrameworkContext] + val strConfig = 
"""{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.AssessmentArchivalJob","modelParams":{"archivalFetcherConfig":{"store":"local","format":"csv.gz","reportPath":"src/test/resources/assessment-archival/archival-data/","container":""},"deleteArchivedBatch":true,"sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Assessment Archival Job"}""".stripMargin + implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig) + val reportData = AssessmentArchivalJob.removeRecords(date = "2021-08-18", None, archiveForLastWeek = false) + reportData.head.totalDeletedRecords.get should be(6) + } + + + +} \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/analytics/util/BaseSpec.scala b/data-products/src/test/scala/org/sunbird/analytics/util/BaseSpec.scala index 65ea98b09..a6bcc2c91 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/util/BaseSpec.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/util/BaseSpec.scala @@ -26,6 +26,7 @@ class BaseSpec extends FlatSpec with Matchers with BeforeAndAfterAll { conf.set("spark.redis.host", "localhost") conf.set("spark.redis.port", "6341") conf.set("spark.redis.db", "0") + conf.set("spark.cassandra.output.concurrent.writes", "12") conf; }