Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SB-24793 - Assessment archival Job Implementation #452

Open
wants to merge 57 commits into
base: release-4.4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
a022ddb
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
fd2ac9c
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
b42df8f
Issue SB-24793 feat: removed the hardcoded column names in groupBy
manjudr Jul 14, 2021
4ca0b4c
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
b6d3733
Revert "Issue SB-24793 feat: Assessment data archival data product j…
manjudr Jul 14, 2021
f4ca1cb
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
acd9a11
Issue SB-24793 feat: Assessment data archival cassandra connection …
manjudr Jul 14, 2021
9d75256
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
3bb90fb
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
358b96a
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
e46e0a1
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
423f977
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
9cea5eb
Issue SB-24793 feat: removing the unwanted csv files
manjudr Jul 16, 2021
ec710cd
Issue SB-24793 feat: removing the unwanted imports
manjudr Jul 16, 2021
e29b23e
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
7e4ea76
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
b32d1e6
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
22f76e9
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
ffa1c5f
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
5db16a1
Issue SB-25481 feat: Assessment archival data product cache issue fix
manjudr Jul 19, 2021
df8cf7e
Issue SB-25481 feat: Assessment archival data product cache issue fix
manjudr Jul 20, 2021
1f19060
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
a3245f8
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
4380e66
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
acff429
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
e96faaf
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
94b6269
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
10c2069
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
0d0ae86
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
753d763
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
2b467d4
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
ce5b0c8
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
00fdc7c
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
9d0e8b1
Issue SB-24793 feat: Assessment data archival configuration update
manjudr Aug 9, 2021
41b1d72
Issue SB-24793 feat: Assessment data archival configuration update
manjudr Aug 9, 2021
0c4b0f7
Issue SB-24793 feat: Assessment data archival configuration update
manjudr Aug 11, 2021
d156bea
Issue SB-24793 feat: Assessment data archival configuration update
manjudr Aug 11, 2021
76254c6
Issue SB-24793 feat: deleting the records after archival
manjudr Aug 11, 2021
f7b74cd
Issue SB-24793 feat: deleting the records after archival
manjudr Aug 12, 2021
d2a8573
Issue SB-24793 feat: Assessment archival compress the report as csv.…
manjudr Aug 13, 2021
020874f
Issue SB-24793 feat: Assessment archival to remove the archived records
manjudr Aug 16, 2021
0425f1c
Issue SB-24793 feat: Assessment archived data removal logic
manjudr Aug 19, 2021
ca2d590
Issue SB-24793 feat: Assessment archived data removal config update
manjudr Aug 22, 2021
acbbd19
Issue SB-24793 feat:
manjudr Sep 22, 2021
114f7fe
Issue SB-24793 feat: Updated the testcase and renamed the variables
manjudr Sep 22, 2021
74aa110
Issue SB-24793 feat: Enabled the archival job to run for a specific b…
manjudr Sep 23, 2021
5a0d0d6
Issue SB-24793 feat: Enabled the archival job to run for a specific b…
manjudr Sep 23, 2021
542df51
Merge remote-tracking branch 'upstream/release-4.3.0' into assessment…
manjudr Oct 7, 2021
4567cc4
Merge branch 'release-4.4.0' of github.com:Sunbird-Ed/sunbird-data-pr…
manjudr Oct 8, 2021
ba954e5
Issue SB-24793 feat: fixed the review comments changes - added sepera…
manjudr Oct 22, 2021
bcd9c6e
Issue SB-24793 feat: Assessment archival, fixed the review comments c…
manjudr Oct 22, 2021
c2456ee
Issue SB-24793 feat: Assessment archival, fixed the review comments c…
manjudr Oct 22, 2021
700d3fc
Issue SB-24793 feat: Assessment archival fixed the review comments
manjudr Oct 22, 2021
d04fd0d
Issue SB-24793 feat: Adding the test csv files
manjudr Oct 25, 2021
d143578
Issue SB-24793 feat: Assessment archived data:: Review comments resolved
utk14 Oct 25, 2021
d87113c
Issue SB-24793 feat: Assessment archived data:: Review comments resolved
utk14 Oct 25, 2021
04d027e
Issue SB-24793 feat: Assessment archived data:: Review comments resolved
utk14 Oct 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.sunbird.analytics.exhaust.util


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.FrameworkContext
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JobLogger}

object ExhaustUtil {

def getArchivedData(store: String, filePath: String, bucket: String, blobFields: Map[String, Any], fileFormat: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
val filteredBlobFields = blobFields.filter(_._2 != null)
val format = fileFormat.getOrElse("csv.gz")
val batchId = filteredBlobFields.getOrElse("batchId", "*").toString()
val year = filteredBlobFields.getOrElse("year", "*")
val weekNum = filteredBlobFields.getOrElse("weekNum", "*").toString()

val file: String = s"${filePath}${batchId}/${year}-${weekNum}-*.${format}"
val url = CommonUtil.getBlobUrl(store, file, bucket)

JobLogger.log(s"Fetching data from ${store} ")(new String())
fetch(url, "csv")
}

def fetch(url: String, format: String)(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
spark.read.format(format).option("header", "true").load(url)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package org.sunbird.analytics.job.report

import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.{SomeColumns, toRDDFunctions}
import org.apache.spark.SparkContext
import org.apache.spark.sql.cassandra.CassandraSparkSessionFunctions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}
import org.joda.time.DateTime
import org.sunbird.analytics.exhaust.util.ExhaustUtil

import java.util.concurrent.atomic.AtomicInteger

object AssessmentArchivalJob extends optional.Application with IJob with BaseReportsJob {
val cassandraUrl = "org.apache.spark.sql.cassandra"
private val assessmentAggDBSettings: Map[String, String] = Map("table" -> AppConf.getConfig("sunbird.courses.assessment.table"), "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster")
implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
private val partitionCols = List("batch_id", "year", "week_of_year")
private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")

case class Period(year: Int, weekOfYear: Int)

case class BatchPartition(batchId: String, period: Period)


case class ArchivalMetrics(batchId: Option[String],
period: Period,
totalArchivedRecords: Option[Long],
pendingWeeksOfYears: Option[Long],
totalDeletedRecords: Option[Long],
totalDistinctBatches: Long
)


// $COVERAGE-OFF$ Disabling scoverage for main and execute method
override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
val jobName = "AssessmentArchivalJob"
JobLogger.init(jobName)
JobLogger.start(s"$jobName started executing", Option(Map("config" -> config, "model" -> jobName)))
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
implicit val spark: SparkSession = openSparkSession(jobConfig)
implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()
val modelParams = jobConfig.modelParams.get
val deleteArchivedBatch: Boolean = modelParams.getOrElse("deleteArchivedBatch", false).asInstanceOf[Boolean]
init()
try {

/**
* Below date and batchId configs are optional. By default,
* The Job will remove the records for the last week.
*/
val date = modelParams.getOrElse("date", null).asInstanceOf[String]
val batchIds = modelParams.getOrElse("batchIds", null).asInstanceOf[List[String]]
val archiveForLastWeek: Boolean = modelParams.getOrElse("archiveForLastWeek", true).asInstanceOf[Boolean]

val res = if (deleteArchivedBatch) CommonUtil.time(removeRecords(date, Some(batchIds), archiveForLastWeek)) else CommonUtil.time(archiveData(date, Option(batchIds), archiveForLastWeek))
JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("timeTaken" -> res._1, "archived_details" -> res._2, "total_archived_files" -> res._2.length)))
} catch {
case ex: Exception =>
ex.printStackTrace()
JobLogger.end(s"$jobName completed execution with the error ${ex.getMessage}", "FAILED", None)
} finally {
frameworkContext.closeContext()
spark.close()
}
}

def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
}

// $COVERAGE-ON$
// date - yyyy-mm-dd, in string format
def archiveData(date: String, batchIds: Option[List[String]], archiveForLastWeek: Boolean)(implicit spark: SparkSession, config: JobConfig): Array[ArchivalMetrics] = {
// Get the assessment Data
val assessmentDF: DataFrame = getAssessmentData(spark, batchIds)
val period = getWeekAndYearVal(date, archiveForLastWeek)
//Get the Week Num & Year Value for Based on the updated_on value column
val assessmentData = assessmentDF.withColumn("updated_on", to_timestamp(col("updated_on")))
.withColumn("year", year(col("updated_on")))
.withColumn("week_of_year", weekofyear(col("updated_on")))
.withColumn("question", to_json(col("question")))

/**
* The below filter is required, If we want to archive the data for a specific week of year and year
*/
val filteredAssessmentData = if (!isEmptyPeriod(period)) assessmentData.filter(col("year") === period.year).filter(col("week_of_year") === period.weekOfYear) else assessmentData
// Creating a batchId, year and weekOfYear Map from the filteredAssessmentData and loading into memory to iterate over the BatchId's
// Example - [BatchId, Year, WeekNumOfYear] -> Number of records
// 1. batch-001",2021,42 -> 5
val archiveBatchList = filteredAssessmentData.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
manjudr marked this conversation as resolved.
Show resolved Hide resolved
val totalBatchesToArchive = new AtomicInteger(archiveBatchList.length)
JobLogger.log(s"Total Batches to Archive is $totalBatchesToArchive for a period $period", None, INFO)
// Loop through the batches to archive list
// Example Map - {"batch-001":[{"batchId":"batch-001","period":{"year":2021,"weekOfYear":42}}],"batch-004":[{"batchId":"batch-004","period":{"year":2021,"weekOfYear":42}}]}
val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], Period(f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))).groupBy(_.batchId)
val archivalStatus: Array[ArchivalMetrics] = batchesToArchive.flatMap(batches => {
val processingBatch = new AtomicInteger(batches._2.length)
JobLogger.log(s"Started Processing to archive the data", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> batches._2.length)), INFO)
// Loop through the week_num & year batch partition
val res = for (batch <- batches._2.asInstanceOf[Array[BatchPartition]]) yield {
val filteredDF = assessmentData.filter(col("batch_id") === batch.batchId && col("year") === batch.period.year && col("week_of_year") === batch.period.weekOfYear).select(columnWithOrder.head, columnWithOrder.tail: _*)
upload(filteredDF, batch) // Upload the archived files into blob store
val metrics = ArchivalMetrics(batchId = Some(batch.batchId), Period(year = batch.period.year, weekOfYear = batch.period.weekOfYear),
pendingWeeksOfYears = Some(processingBatch.getAndDecrement()), totalArchivedRecords = Some(filteredDF.count()), totalDeletedRecords = None, totalDistinctBatches = filteredDF.select("batch_id").distinct().count())
JobLogger.log(s"Data is archived and Processing the remaining part files ", Some(metrics), INFO)
metrics
}
JobLogger.log(s"${batches._1} is successfully archived", Some(Map("batch_id" -> batches._1, "pending_batches" -> totalBatchesToArchive.getAndDecrement())), INFO)
res
}).toArray
assessmentData.unpersist()
archivalStatus // List of metrics
}

// Delete the records for the archived batch data.
// Date - YYYY-MM-DD Format
// Batch IDs are optional
def removeRecords(date: String, batchIds: Option[List[String]], archiveForLastWeek: Boolean)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Array[ArchivalMetrics] = {
val period: Period = getWeekAndYearVal(date, archiveForLastWeek) // Date is optional, By default it will provide the previous week num of current year
val res = if (batchIds.nonEmpty) {
for (batchId <- batchIds.getOrElse(List())) yield {
remove(period, Option(batchId))
}
} else {
List(remove(period, None))
}
res.toArray
}

def remove(period: Period, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): ArchivalMetrics = {
val archivedDataDF = fetchArchivedBatches(period, batchId)
val archivedData = archivedDataDF.select("course_id", "batch_id", "user_id", "content_id", "attempt_id")
val totalArchivedRecords: Long = archivedData.count
val totalDistinctBatches: Long = archivedData.select("batch_id").distinct().count()
JobLogger.log(s"Deleting $totalArchivedRecords archived records only, for the year ${period.year} and week of year ${period.weekOfYear} from the DB ", None, INFO)
archivedData.rdd.deleteFromCassandra(AppConf.getConfig("sunbird.courses.keyspace"), AppConf.getConfig("sunbird.courses.assessment.table"), keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id"))
ArchivalMetrics(batchId = batchId, Period(year = period.year, weekOfYear = period.weekOfYear),
pendingWeeksOfYears = None, totalArchivedRecords = Some(totalArchivedRecords), totalDeletedRecords = Some(totalArchivedRecords), totalDistinctBatches = totalDistinctBatches)
}

def fetchArchivedBatches(period: Period, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
val modelParams = config.modelParams.get
val azureFetcherConfig = modelParams.getOrElse("archivalFetcherConfig", Map()).asInstanceOf[Map[String, AnyRef]]
val store = azureFetcherConfig.getOrElse("store", "azure").asInstanceOf[String]
val format: String = azureFetcherConfig.getOrElse("blobExt", "csv.gz").asInstanceOf[String]
val filePath = azureFetcherConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
val container = azureFetcherConfig.getOrElse("container", "reports").asInstanceOf[String]
val blobFields = if (!isEmptyPeriod(period)) Map("year" -> period.year, "weekNum" -> period.weekOfYear, "batchId" -> batchId.orNull)
else Map("batchId" -> batchId.orNull)
JobLogger.log(s"Fetching a archived records only from the blob store", Some(Map("reportPath" -> filePath, "container" -> container) ++ blobFields), INFO)
ExhaustUtil.getArchivedData(store, filePath, container, blobFields, Some(format))
}

def getAssessmentData(spark: SparkSession, batchIds: Option[List[String]]): DataFrame = {
import spark.implicits._
val assessmentDF = fetchData(spark, assessmentAggDBSettings, cassandraUrl, new StructType())
val batchIdentifiers = batchIds.getOrElse(List())
if (batchIdentifiers.nonEmpty) {
if (batchIdentifiers.size > 1) {
val batchListDF = batchIdentifiers.toDF("batch_id")
assessmentDF.join(batchListDF, Seq("batch_id"), "inner").persist()
}
else {
assessmentDF.filter(col("batch_id") === batchIdentifiers.head).persist()
}
} else {
assessmentDF
}
}

def upload(archivedData: DataFrame,
batch: BatchPartition)(implicit jobConfig: JobConfig): List[String] = {
val modelParams = jobConfig.modelParams.get
val reportPath: String = modelParams.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
val container = AppConf.getConfig("cloud.container.reports")
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
val fileName = s"${batch.batchId}/${batch.period.year}-${batch.period.weekOfYear}"
val storageConfig = getStorageConfig(
container,
objectKey,
jobConfig)
JobLogger.log(s"Uploading reports to blob storage", None, INFO)
archivedData.saveToBlobStore(storageConfig = storageConfig, format = "csv", reportId = s"$reportPath$fileName-${System.currentTimeMillis()}", options = Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), partitioningColumns = None, fileExt = Some("csv.gz"))
}

// Date - YYYY-MM-DD Format
def getWeekAndYearVal(date: String, archiveForLastWeek: Boolean): Period = {
if (archiveForLastWeek) {
val today = new DateTime()
val lastWeek = today.minusWeeks(1) // Get always for the previous week of the current
Period(year = lastWeek.getYear, weekOfYear = lastWeek.getWeekOfWeekyear)
} else {
if (null != date && date.nonEmpty) {
val dt = new DateTime(date)
Period(year = dt.getYear, weekOfYear = dt.getWeekOfWeekyear)
} else {
Period(0, 0)
}
}
}

def isEmptyPeriod(period: Period): Boolean = {
if (period.year == 0 && period.weekOfYear == 0) true else false
}

}
1 change: 1 addition & 0 deletions data-products/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ sunbird.report.cluster.host=127.0.0.1
sunbird.user.report.keyspace="sunbird_courses"
collection.exhaust.store.prefix="reports"
postgres.table.job_request="job_request"
sunbird.courses.assessment.table = "assessment_aggregator"

druid.report.default.storage="local"
druid.report.date.format="yyyy-MM-dd"
Expand Down
Binary file not shown.
Binary file not shown.
Loading