SB-24793 - Assessment archival Job Implementation #452
base: release-4.4.0
Changes from 53 commits
a022ddb
fd2ac9c
b42df8f
4ca0b4c
b6d3733
f4ca1cb
acd9a11
9d75256
3bb90fb
358b96a
e46e0a1
423f977
9cea5eb
ec710cd
e29b23e
7e4ea76
b32d1e6
22f76e9
ffa1c5f
5db16a1
df8cf7e
1f19060
a3245f8
4380e66
acff429
e96faaf
94b6269
10c2069
0d0ae86
753d763
2b467d4
ce5b0c8
00fdc7c
9d0e8b1
41b1d72
0c4b0f7
d156bea
76254c6
f7b74cd
d2a8573
020874f
0425f1c
ca2d590
acbbd19
114f7fe
74aa110
5a0d0d6
542df51
4567cc4
ba954e5
bcd9c6e
c2456ee
700d3fc
d04fd0d
d143578
d87113c
04d027e
@@ -0,0 +1,41 @@
package org.sunbird.analytics.exhaust.util

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.FrameworkContext
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.JobLogger

object ExhaustUtil {

  def getArchivedData(store: String, filePath: String, bucket: String, blobFields: Map[String, Any], fileFormat: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
    val filteredBlobFields = blobFields.filter(_._2 != null)
    val format = fileFormat.getOrElse("csv.gz")
    val batchId = filteredBlobFields.getOrElse("batchId", "*")
    val year = filteredBlobFields.getOrElse("year", "*")
    val weekNumb = filteredBlobFields.getOrElse("weekNum", "*")

    val url = store match {
      case "local" =>
        filePath + s"${batchId}/${year}-${weekNumb}-*.${format}"
      // $COVERAGE-OFF$ for azure testing
      case "azure" =>
        val key = AppConf.getConfig("azure_storage_key")
        val file = s"${filePath}${batchId}/${year}-${weekNumb}-*.${format}"
        s"wasb://$bucket@$key.blob.core.windows.net/$file"
      case "s3" =>
        // TODO - Need to support S3 as well.
        throw new Exception("s3 is currently not supported.")
      // $COVERAGE-ON$
    }
Review comment: Move this to a CommonUtil method and add support for S3 and GCP.
Reply: We have added the support for S3. Can we take up GCP support as an enhancement in the next release? Dependent Core PR: https://github.com/project-sunbird/sunbird-analytics-core/pull/116/files

    JobLogger.log(s"Fetching data from ${store} ")(new String())
    fetch(url, "csv")
  }

  def fetch(url: String, format: String)(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
    spark.read.format(format).option("header", "true").load(url)
  }

}
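
The review thread above suggests moving this store-specific URL construction into a shared CommonUtil-style helper and extending it beyond Azure. Below is a minimal, self-contained sketch of what such a helper could look like; the object name, the "s3"/"gcloud" branches and the s3n:// and gs:// schemes are illustrative assumptions and not part of this PR (actual S3 support is delegated to the linked sunbird-analytics-core change).

// Hypothetical sketch only - not the PR's implementation. The "s3"/"gcloud"
// branches, their URL schemes and all names are assumptions made for illustration.
object ArchivalUrlSketch {

  def buildUrl(store: String, filePath: String, bucket: String, azureAccount: String,
               batchId: String = "*", year: String = "*", weekNum: String = "*",
               format: String = "csv.gz"): String = {
    // Same wildcard file pattern that getArchivedData builds above
    val file = s"$filePath$batchId/$year-$weekNum-*.$format"
    store match {
      case "local"  => file
      case "azure"  => s"wasb://$bucket@$azureAccount.blob.core.windows.net/$file"
      case "s3"     => s"s3n://$bucket/$file"   // assumed scheme
      case "gcloud" => s"gs://$bucket/$file"    // assumed scheme
      case other    => throw new IllegalArgumentException(s"Unsupported store: $other")
    }
  }

  // Usage example, mirroring the wildcard defaults used by getArchivedData
  def main(args: Array[String]): Unit = {
    println(buildUrl("azure", "archived-data/", "reports", "myaccount", batchId = "batch-001"))
  }
}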
@@ -0,0 +1,214 @@
package org.sunbird.analytics.job.report

import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.{SomeColumns, toRDDFunctions}
import org.apache.spark.SparkContext
import org.apache.spark.sql.cassandra.CassandraSparkSessionFunctions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}
import org.joda.time.DateTime
import org.sunbird.analytics.exhaust.util.ExhaustUtil

import java.util.concurrent.atomic.AtomicInteger

object AssessmentArchivalJob extends optional.Application with IJob with BaseReportsJob {
  val cassandraUrl = "org.apache.spark.sql.cassandra"
  private val assessmentAggDBSettings: Map[String, String] = Map("table" -> AppConf.getConfig("sunbird.courses.assessment.table"), "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster")
  implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
  private val partitionCols = List("batch_id", "year", "week_of_year")
  private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")

  case class Period(year: Int, weekOfYear: Int)

  case class BatchPartition(batchId: String, period: Period)

  case class ArchivalMetrics(batchId: Option[String],
                             period: Period,
                             totalArchivedRecords: Option[Long],
                             pendingWeeksOfYears: Option[Long],
                             totalDeletedRecords: Option[Long],
                             totalDistinctBatches: Long
                            )

  // $COVERAGE-OFF$ Disabling scoverage for main and execute method
  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
    implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
    val jobName = "AssessmentArchivalJob"
    JobLogger.init(jobName)
    JobLogger.start(s"$jobName started executing", Option(Map("config" -> config, "model" -> jobName)))
    implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
    implicit val spark: SparkSession = openSparkSession(jobConfig)
    implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()
    val modelParams = jobConfig.modelParams.get
    val deleteArchivedBatch: Boolean = modelParams.getOrElse("deleteArchivedBatch", false).asInstanceOf[Boolean]
    init()
    try {

      /**
       * The date and batchIds configs below are optional. By default,
       * the job processes the records of the previous week.
       */
      val date = modelParams.getOrElse("date", null).asInstanceOf[String]
      val batchIds = modelParams.getOrElse("batchIds", null).asInstanceOf[List[String]]
      val archiveForLastWeek: Boolean = modelParams.getOrElse("archiveForLastWeek", true).asInstanceOf[Boolean]

      val res = if (deleteArchivedBatch) CommonUtil.time(removeRecords(date, Option(batchIds), archiveForLastWeek)) else CommonUtil.time(archiveData(date, Option(batchIds), archiveForLastWeek))
      JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("timeTaken" -> res._1, "archived_details" -> res._2, "total_archived_files" -> res._2.length)))
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        JobLogger.end(s"$jobName completed execution with the error ${ex.getMessage}", "FAILED", None)
    } finally {
      frameworkContext.closeContext()
      spark.close()
    }
  }

  def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
    spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
  }

  // $COVERAGE-ON$
  // date - yyyy-mm-dd, in string format
  def archiveData(date: String, batchIds: Option[List[String]], archiveForLastWeek: Boolean)(implicit spark: SparkSession, config: JobConfig): Array[ArchivalMetrics] = {
    // Get the assessment data
    val assessmentDF: DataFrame = getAssessmentData(spark, batchIds)
    val period = getWeekAndYearVal(date, archiveForLastWeek)
    // Derive the week num & year values from the updated_on column
    val assessmentData = assessmentDF.withColumn("updated_on", to_timestamp(col("updated_on")))
      .withColumn("year", year(col("updated_on")))
      .withColumn("week_of_year", weekofyear(col("updated_on")))
      .withColumn("question", to_json(col("question")))

    /**
     * The filter below is required when the data should be archived only for a specific week of a specific year.
     */
    val filteredAssessmentData = if (!isEmptyPeriod(period)) assessmentData.filter(col("year") === period.year).filter(col("week_of_year") === period.weekOfYear) else assessmentData
    // Create a (batchId, year, weekOfYear) -> count map from filteredAssessmentData and load it into memory to iterate over the batch IDs
    // Example - [BatchId, Year, WeekNumOfYear] -> Number of records
    // 1. "batch-001",2021,42 -> 5
    val archiveBatchList = filteredAssessmentData.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()

    val totalBatchesToArchive = new AtomicInteger(archiveBatchList.length)
    JobLogger.log(s"Total batches to archive is $totalBatchesToArchive for the period $period", None, INFO)
    // Loop through the batches-to-archive list
    // Example Map - {"batch-001":[{"batchId":"batch-001","period":{"year":2021,"weekOfYear":42}}],"batch-004":[{"batchId":"batch-004","period":{"year":2021,"weekOfYear":42}}]}
    val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], Period(f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))).groupBy(_.batchId)
    val archivalStatus: Array[ArchivalMetrics] = batchesToArchive.flatMap(batches => {
      val processingBatch = new AtomicInteger(batches._2.length)
      JobLogger.log(s"Started processing to archive the data", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> batches._2.length)), INFO)
      // Loop through the week_num & year batch partitions
      val res = for (batch <- batches._2.asInstanceOf[Array[BatchPartition]]) yield {
        val filteredDF = assessmentData.filter(col("batch_id") === batch.batchId && col("year") === batch.period.year && col("week_of_year") === batch.period.weekOfYear).select(columnWithOrder.head, columnWithOrder.tail: _*)
        upload(filteredDF, batch) // Upload the archived files into the blob store
        val metrics = ArchivalMetrics(batchId = Some(batch.batchId), period = Period(year = batch.period.year, weekOfYear = batch.period.weekOfYear),
          pendingWeeksOfYears = Some(processingBatch.getAndDecrement()), totalArchivedRecords = Some(filteredDF.count()), totalDeletedRecords = None, totalDistinctBatches = filteredDF.select("batch_id").distinct().count())
        JobLogger.log(s"Data is archived, processing the remaining part files", Some(metrics), INFO)
        metrics
      }
      JobLogger.log(s"${batches._1} is successfully archived", Some(Map("batch_id" -> batches._1, "pending_batches" -> totalBatchesToArchive.getAndDecrement())), INFO)
      res
    }).toArray
    assessmentData.unpersist()
    archivalStatus // List of metrics
  }

  // Delete the records for the archived batch data.
  // Date - YYYY-MM-DD format
  // Batch IDs are optional
  def removeRecords(date: String, batchIds: Option[List[String]], archiveForLastWeek: Boolean)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Array[ArchivalMetrics] = {
    val period: Period = getWeekAndYearVal(date, archiveForLastWeek) // Date is optional; by default it resolves to the previous week of the current year
    val res = if (batchIds.nonEmpty) {
      for (batchId <- batchIds.getOrElse(List())) yield {
        remove(period, Option(batchId))
      }
    } else {
      List(remove(period, None))
    }
    res.toArray
  }

  def remove(period: Period, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): ArchivalMetrics = {
    val archivedDataDF = fetchArchivedBatches(period, batchId)
    val archivedData = archivedDataDF.select("course_id", "batch_id", "user_id", "content_id", "attempt_id")
    val totalArchivedRecords: Long = archivedData.count
    val totalDistinctBatches: Long = archivedData.select("batch_id").distinct().count()
    JobLogger.log(s"Deleting $totalArchivedRecords archived records for the year ${period.year} and week of year ${period.weekOfYear} from the DB", None, INFO)
    archivedData.rdd.deleteFromCassandra(AppConf.getConfig("sunbird.courses.keyspace"), AppConf.getConfig("sunbird.courses.assessment.table"), keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id"))
    ArchivalMetrics(batchId = batchId, period = Period(year = period.year, weekOfYear = period.weekOfYear),
      pendingWeeksOfYears = None, totalArchivedRecords = Some(totalArchivedRecords), totalDeletedRecords = Some(totalArchivedRecords), totalDistinctBatches = totalDistinctBatches)
  }

  def fetchArchivedBatches(period: Period, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
    val modelParams = config.modelParams.get
    val azureFetcherConfig = modelParams.getOrElse("archivalFetcherConfig", Map()).asInstanceOf[Map[String, AnyRef]]
    val store = azureFetcherConfig.getOrElse("store", "azure").asInstanceOf[String]
    val format: String = azureFetcherConfig.getOrElse("blobExt", "csv.gz").asInstanceOf[String]
    val filePath = azureFetcherConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
    val container = azureFetcherConfig.getOrElse("container", "reports").asInstanceOf[String]
    val blobFields = if (!isEmptyPeriod(period)) Map("year" -> period.year, "weekNum" -> period.weekOfYear, "batchId" -> batchId.orNull)
    else Map("batchId" -> batchId.orNull)
    JobLogger.log(s"Fetching archived records from the blob store", Some(Map("reportPath" -> filePath, "container" -> container) ++ blobFields), INFO)
    ExhaustUtil.getArchivedData(store, filePath, container, blobFields, Some(format))
  }

  def getAssessmentData(spark: SparkSession, batchIds: Option[List[String]]): DataFrame = {
    import spark.implicits._
    val assessmentDF = fetchData(spark, assessmentAggDBSettings, cassandraUrl, new StructType())
    val batchIdentifiers = batchIds.getOrElse(List())
    if (batchIdentifiers.nonEmpty) {
      if (batchIdentifiers.size > 1) {
        val batchListDF = batchIdentifiers.toDF("batch_id")
        assessmentDF.join(batchListDF, Seq("batch_id"), "inner").persist()
      }
      else {
        assessmentDF.filter(col("batch_id") === batchIdentifiers.head).persist()
      }
    } else {
      assessmentDF
    }
  }

  def upload(archivedData: DataFrame,
             batch: BatchPartition)(implicit jobConfig: JobConfig): List[String] = {
    val modelParams = jobConfig.modelParams.get
    val reportPath: String = modelParams.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
    val container = AppConf.getConfig("cloud.container.reports")
    val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
    val fileName = s"${batch.batchId}/${batch.period.year}-${batch.period.weekOfYear}"
    val storageConfig = getStorageConfig(
      container,
      objectKey,
      jobConfig)
    JobLogger.log(s"Uploading reports to blob storage", None, INFO)
    archivedData.saveToBlobStore(storageConfig = storageConfig, format = "csv", reportId = s"$reportPath$fileName-${System.currentTimeMillis()}", options = Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), partitioningColumns = None, fileExt = Some("csv.gz"))
  }

  // Date - YYYY-MM-DD format
  def getWeekAndYearVal(date: String, archiveForLastWeek: Boolean): Period = {
    if (archiveForLastWeek) {
      val today = new DateTime()
      val lastWeek = today.minusWeeks(1) // Always use the previous week of the current date
      Period(year = lastWeek.getYear, weekOfYear = lastWeek.getWeekOfWeekyear)
    } else {
      if (null != date && date.nonEmpty) {
        val dt = new DateTime(date)
        Period(year = dt.getYear, weekOfYear = dt.getWeekOfWeekyear)
      } else {
        Period(0, 0)
      }
    }
  }

  def isEmptyPeriod(period: Period): Boolean = {
    period.year == 0 && period.weekOfYear == 0
  }

}
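
As a quick illustration of how getWeekAndYearVal resolves the archival period, here is a standalone sketch (assuming only joda-time on the classpath, which the job already uses); the Period case class is re-declared locally so the snippet compiles on its own, and the printed values are examples rather than fixed outputs.

import org.joda.time.DateTime

// Standalone sketch of the week/year resolution above; local Period is only for compilation.
object PeriodSketch {
  case class Period(year: Int, weekOfYear: Int)

  def weekAndYear(date: String, archiveForLastWeek: Boolean): Period = {
    if (archiveForLastWeek) {
      val lastWeek = new DateTime().minusWeeks(1)      // default: previous week of the current date
      Period(lastWeek.getYear, lastWeek.getWeekOfWeekyear)
    } else if (date != null && date.nonEmpty) {
      val dt = new DateTime(date)                      // explicit YYYY-MM-DD date
      Period(dt.getYear, dt.getWeekOfWeekyear)
    } else {
      Period(0, 0)                                     // empty period => no week/year filter applied
    }
  }

  def main(args: Array[String]): Unit = {
    println(weekAndYear(null, archiveForLastWeek = true))          // previous week, e.g. Period(2021,41)
    println(weekAndYear("2021-10-20", archiveForLastWeek = false)) // Period(2021,42)
    println(weekAndYear(null, archiveForLastWeek = false))         // Period(0,0) -> isEmptyPeriod == true
  }
}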
Review comment: Numb? Keep it Num or Number
Reply: Done
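
For reference, here is a hypothetical sketch of the modelParams this job consumes, expressed as a Scala Map. Only the key names are taken from the code above (main, upload and fetchArchivedBatches read them); the values and the overall shape are illustrative assumptions, not a documented configuration.

// Hypothetical modelParams sketch; values are examples only.
object AssessmentArchivalConfigSketch {
  def main(args: Array[String]): Unit = {
    val modelParams: Map[String, Any] = Map(
      "deleteArchivedBatch" -> false,               // false => archive, true => delete already-archived rows
      "archiveForLastWeek"  -> true,                // default behaviour: previous week of the current date
      "date"                -> "2021-10-20",        // optional yyyy-mm-dd, used only when archiveForLastWeek is false
      "batchIds"            -> List("batch-001"),   // optional; omit to process all batches
      "reportPath"          -> "archived-data/",    // blob-store prefix used by upload()
      "archivalFetcherConfig" -> Map(               // used when deleting, to locate already-archived files
        "store"      -> "azure",
        "blobExt"    -> "csv.gz",
        "reportPath" -> "archived-data/",
        "container"  -> "reports"
      )
    )
    modelParams.foreach { case (k, v) => println(s"$k -> $v") }
  }
}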