SB-27408 | Assessment Archival JOB - Deletion function added #504

Open
wants to merge 36 commits into base: release-4.6.0
Changes from all commits (36 commits)
d320064
Issue feat SB-27408: Initial commit of Base Archival Job Implementation
manjudr Nov 7, 2021
9a4cbe2
Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
utk14 Nov 15, 2021
d9c6e57
Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
utk14 Nov 15, 2021
a0cdd69
Issue SB-24793 feat: Assessment archived data implementation
utk14 Dec 7, 2021
029125b
Issue SB-24793 feat: Assessment archived data implementation
utk14 Dec 7, 2021
9139b7c
Merge pull request #497 from manjudr/assessment-archival-refactor-cha…
manjudr Dec 7, 2021
78797d1
Issue #SB-27408 | Assessment archival to update existing requests
kumarks1122 Dec 8, 2021
862e306
Issue #SB-27408 | Assessment archival to create and update requests
kumarks1122 Dec 8, 2021
9f1567e
Issue #SB-27408 | Assessment archival test case and fixes added
kumarks1122 Dec 9, 2021
0de04e7
Issue #SB-27408 | Assessment archival Base and sub class changes
kumarks1122 Dec 10, 2021
4d5cab2
Issue #SB-27408 | Assessment archival changes added
kumarks1122 Dec 10, 2021
a60875f
Issue #SB-27408 | Assessment archival changes added
kumarks1122 Dec 10, 2021
78ee433
Issue SB-24793 feat: Review comments resolved
utk14 Dec 13, 2021
23c1d7b
merge conflicts resolved
utk14 Dec 13, 2021
8c39db1
Issue #SB-27408 | Test case fixes added
kumarks1122 Dec 13, 2021
e9f71dd
Issue #SB-27408 | PR Review changes added
kumarks1122 Dec 13, 2021
970b857
Issue SB-24793 feat: Review comments resolved
utk14 Dec 13, 2021
ff71892
merge conflicts resolved
utk14 Dec 13, 2021
0401987
Issue #SB-27408 | Archival Metrics changes added
kumarks1122 Dec 13, 2021
6b3c627
Issue #SB-27408 | Fixes added
kumarks1122 Dec 13, 2021
6ae0f3f
Issue #SB-27408 | Testcase Fixes added
kumarks1122 Dec 14, 2021
22c88a5
Issue #SB-27408 | Testcase Fixes added
kumarks1122 Dec 14, 2021
c69817e
Issue SB-24793 feat: Added batchfilters and search query support
utk14 Dec 15, 2021
4d3fac1
Merge conflicts resolved
utk14 Dec 15, 2021
0af8df3
Issue SB-24793 feat: Added batchfilters and search query support
utk14 Dec 15, 2021
5c2aa2f
Issue #SB-27408 | Deletion function added
kumarks1122 Dec 21, 2021
8b5a2b5
Issue #SB-27408 | Deleting file from blob changes added
kumarks1122 Dec 22, 2021
97f5cbe
Issue #SB-27408 | Testcases for Deleting files added
kumarks1122 Dec 28, 2021
466584e
Issue #SB-27408 | Testcases fixes added
kumarks1122 Dec 29, 2021
2002694
Issue #SB-27408 | Request ID changes added
kumarks1122 Jan 3, 2022
c456bf3
Issue #SB-27408 | validation changes added
kumarks1122 Jan 3, 2022
a0f555c
Issue #SB-27408 | validation changes added
kumarks1122 Jan 3, 2022
bc75436
Issue #SB-27408 | validation testcases added
kumarks1122 Jan 3, 2022
8ff5c20
Issue #SB-27408 | All batches archival function changes added
kumarks1122 Jan 4, 2022
f41099a
Issue #SB-27408 | Testcases fixes added
kumarks1122 Jan 4, 2022
b298133
Issue #SB-27408 | PR Review changes added
kumarks1122 Jan 4, 2022
@@ -0,0 +1,194 @@
package org.sunbird.analytics.archival

import com.datastax.spark.connector.{SomeColumns, toRDDFunctions}
import org.apache.spark.sql.functions.{col, concat, lit, to_json, to_timestamp, weekofyear, year}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger, RestUtil}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig, Level}
import org.sunbird.analytics.archival.util.ArchivalRequest

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.functions._
import org.sunbird.analytics.exhaust.util.ExhaustUtil
import org.sunbird.analytics.util.Constants

import scala.collection.immutable.List

case class CollectionDetails(result: Result)
case class Result(content: List[CollectionInfo])
case class CollectionInfo(identifier: String)

object AssessmentArchivalJob extends optional.Application with BaseArchivalJob {

case class Period(year: Int, weekOfYear: Int)
case class BatchPartition(collectionId: String, batchId: String, period: Period)
case class ArchivalMetrics(batch: BatchPartition,
totalArchivedRecords: Option[Long],
pendingWeeksOfYears: Option[Long],
totalDeletedRecords: Option[Long]
)

private val partitionCols = List("course_id", "batch_id", "year", "week_of_year")
private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")

override def getClassName = "org.sunbird.analytics.archival.AssessmentArchivalJob"
override def jobName = "AssessmentArchivalJob"
override def jobId: String = "assessment-archival"
override def getReportPath = "assessment-archival/"
override def getReportKey = "assessment"
override def dateColumn = "updated_on"

override def archivalFormat(batch: Map[String,AnyRef]): String = {
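// Builds the blob folder name for a batch partition.
// Hypothetical example: BatchPartition("do_123", "batch_01", Period(2021, 48)) -> "batch_01_do_123/2021-48"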
val formatDetails = JSONUtils.deserialize[BatchPartition](JSONUtils.serialize(batch))
s"${formatDetails.batchId}_${formatDetails.collectionId}/${formatDetails.period.year}-${formatDetails.period.weekOfYear}"
}

override def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame = {
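// Excludes the (batch_id, year, week_of_year) partitions that already have a successful archival request,
// so a re-run only picks up data that has not been archived yet.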
var filteredDF = dataDF
for (request <- requests) {
if (request.archival_status.equals("SUCCESS")) {
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
filteredDF = filteredDF.filter(
!(col("batch_id").equalTo(request.batch_id) &&
concat(col("year"), lit("-"), col("week_of_year")) === lit(request_data("year") + "-" + request_data("week")))
)
}
}
filteredDF
}

override def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest] = {
try {
val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace"))
val date: String = requestConfig.date.getOrElse(null)

var data = loadData(Map("table" -> requestConfig.archivalTable, "keyspace" -> archivalKeyspace, "cluster" -> "LMSCluster"), cassandraUrl, new StructType())
data = validateBatches(data, requestConfig)

val dataDF = generatePeriodInData(data)
val filteredDF = dataFilter(requests, dataDF)

val archiveBatchList = filteredDF.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], f.get(1).asInstanceOf[String], Period(f.get(2).asInstanceOf[Int], f.get(3).asInstanceOf[Int]))).groupBy(_.batchId)

archiveBatches(batchesToArchive, filteredDF, requestConfig)
} catch {
case ex: Exception =>
ex.printStackTrace()
JobLogger.log("archiveData: Exception with error message = " + ex.getMessage, None, Level.ERROR)
List()
}
}

def validateBatches(data: DataFrame, requestConfig: Request)(implicit spark: SparkSession, fc: FrameworkContext): DataFrame ={
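// Narrows the source data to the requested scope: a specific (collectionId, batchId) pair, a batchFilters list
// joined on batch_id, or all batches when neither is given; an optional search query further restricts the
// collections via searchContent.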
implicit val sqlContext = new SQLContext(spark.sparkContext)
import sqlContext.implicits._

val filteredDF = if(requestConfig.batchId.isDefined && requestConfig.collectionId.isDefined) {
data.filter(col("batch_id") === requestConfig.batchId.get && col("course_id") === requestConfig.collectionId.get).persist()
} else if (requestConfig.batchFilters.isDefined) {
val batch = requestConfig.batchFilters.get.toDF("batch_id")
data.join(batch, Seq("batch_id"), "inner")
} else {
JobLogger.log("Neither batchId nor batchFilters present", None, Level.INFO)
data
}
if (requestConfig.query.isDefined) {
val res = searchContent(requestConfig.query.get)
filteredDF.join(res, col("course_id") === col("identifier"), "inner")
} else filteredDF

}

def searchContent(searchFilter: Map[String, AnyRef])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
// TODO: Handle limit and do a recursive search call
val apiURL = Constants.COMPOSITE_SEARCH_URL
val request = JSONUtils.serialize(searchFilter)
val response = RestUtil.post[CollectionDetails](apiURL, request).result.content
spark.createDataFrame(response).select("identifier")
}

def archiveBatches(batchesToArchive: Map[String, Array[BatchPartition]], data: DataFrame, requestConfig: Request)(implicit config: JobConfig): List[ArchivalRequest] = {
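// Uploads one archive file per (course_id, batch_id, year, week_of_year) partition to blob storage and records
// an ArchivalRequest for each partition, marking it as success or failure.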
batchesToArchive.flatMap(batches => {
val processingBatch = new AtomicInteger(batches._2.length)
JobLogger.log(s"Started processing the data to be archived", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> processingBatch)))

// Loop through the week_num & year batch partition
batches._2.map((batch: BatchPartition) => {
val filteredDF = data.filter(
col("course_id") === batch.collectionId &&
col("batch_id") === batch.batchId &&
col("year") === batch.period.year &&
col("week_of_year") === batch.period.weekOfYear
).withColumn("last_attempted_on", tsToLongUdf(col("last_attempted_on")))
.withColumn("updated_on", tsToLongUdf(col("updated_on")))
.select(columnWithOrder.head, columnWithOrder.tail: _*)
var archivalRequest:ArchivalRequest = getRequest(jobId, batch.collectionId, batch.batchId, List(batch.period.year, batch.period.weekOfYear))

if (archivalRequest == null) {
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(requestConfig)) ++ Map[String, Int](
"week" -> batch.period.weekOfYear,
"year"-> batch.period.year
)
archivalRequest = ArchivalRequest("", batch.batchId, batch.collectionId, Option(getReportKey), jobId, None, None, null, null, None, Option(0), JSONUtils.serialize(request_data), None)
}

try {
val urls = upload(filteredDF, JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(batch))) // Upload the archived files into blob store
archivalRequest.blob_url = Option(urls)
val metrics = ArchivalMetrics(batch, pendingWeeksOfYears = Some(processingBatch.getAndDecrement()), totalArchivedRecords = Some(filteredDF.count()), totalDeletedRecords = None)
JobLogger.log(s"Data archived; processing the remaining part files", Some(metrics), Level.INFO)
markArchivalRequestAsSuccess(archivalRequest, requestConfig)
} catch {
case ex: Exception => {
JobLogger.log("archiveBatch: Exception with error message = " + ex.getLocalizedMessage, Some(batch), Level.ERROR)
markArchivalRequestAsFailed(archivalRequest, ex.getLocalizedMessage)
}
}
})
}).toList
}

def loadArchivedData(request: ArchivalRequest)(implicit spark: SparkSession, fc: FrameworkContext, jobConfig: JobConfig): DataFrame = {
ExhaustUtil.fetch(request.blob_url.get.head, "csv")
}

override def deleteArchivedData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest] = {
requests.filter(r => r.archival_status.equals("SUCCESS") && r.deletion_status != "SUCCESS").map((request: ArchivalRequest) => {
deleteBatch(requestConfig, request)
}).toList
}

def deleteBatch(requestConfig: Request, request: ArchivalRequest)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): ArchivalRequest = {
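// Deletes only the rows that were previously archived for this request: the archived CSV is read back and its
// (course_id, batch_id, user_id, content_id, attempt_id) keys are removed from the Cassandra table.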
try {
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
val batchPartition = BatchPartition(request.collection_id, request.batch_id, Period(request_data("year").asInstanceOf[Int], request_data("week").asInstanceOf[Int]))
val archivedData = loadArchivedData(request).select("course_id", "batch_id", "user_id", "content_id", "attempt_id")

val totalArchivedRecords: Long = archivedData.count
JobLogger.log(s"Deleting $totalArchivedRecords archived records only, for the year ${batchPartition.period.year} and week of year ${batchPartition.period.weekOfYear} from the DB ", None, Level.INFO)

val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace"))

archivedData.rdd.deleteFromCassandra(archivalKeyspace, requestConfig.archivalTable, keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id"))
val metrics = ArchivalMetrics(batchPartition, pendingWeeksOfYears = None, totalArchivedRecords = Some(totalArchivedRecords), totalDeletedRecords = Some(totalArchivedRecords))

JobLogger.log(s"Archived data deleted from the DB for the batch partition", Some(metrics), Level.INFO)
markDeletionRequestAsSuccess(request, requestConfig)
} catch {
case ex: Exception => {
JobLogger.log("deleteBatch: Exception with error message = " + ex.getLocalizedMessage, Some(request), Level.ERROR)
markDeletionRequestAsFailed(request, ex.getLocalizedMessage)
}
}
}

def generatePeriodInData(data: DataFrame): DataFrame = {
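// Derives the archival partition columns (year, week_of_year) from the updated_on timestamp and serialises the
// question column to JSON. Hypothetical example: updated_on = 2021-12-28 -> year = 2021, week_of_year = 52 (ISO week).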
data.withColumn("updated_on", to_timestamp(col(dateColumn)))
.withColumn("year", year(col("updated_on")))
.withColumn("week_of_year", weekofyear(col("updated_on")))
.withColumn("question", to_json(col("question")))
}
}
@@ -0,0 +1,92 @@
package org.sunbird.analytics.archival

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.udf
import org.ekstep.analytics.framework.Level.ERROR
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig, Level}
import org.sunbird.analytics.exhaust.BaseReportsJob
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.sunbird.analytics.archival.util.{ArchivalMetaDataStoreJob, ArchivalRequest}

case class Request(archivalTable: String, keyspace: Option[String], query: Option[Map[String, AnyRef]] = Option(Map()),
                   batchId: Option[String] = Option(""), collectionId: Option[String] = Option(""),
                   batchFilters: Option[List[String]] = Option(List()), date: Option[String] = Option(""))

trait BaseArchivalJob extends BaseReportsJob with IJob with ArchivalMetaDataStoreJob with Serializable {

val cassandraUrl = "org.apache.spark.sql.cassandra"
def dateColumn: String

def main(config: String)(implicit sc: Option[SparkContext] = None, fc: Option[FrameworkContext] = None): Unit = {
implicit val className: String = getClassName;
JobLogger.init(jobName)
JobLogger.start(s"$jobName started executing - ver3", Option(Map("config" -> config, "model" -> jobName)))
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
implicit val spark: SparkSession = openSparkSession(jobConfig)
implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()

try {
val res = CommonUtil.time(execute());
JobLogger.end(s"$jobName completed execution", "SUCCESS", None)
} catch {
case ex: Exception => ex.printStackTrace()
JobLogger.log(ex.getMessage, None, ERROR);
JobLogger.end(jobName + " execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage)));
}
finally {
frameworkContext.closeContext();
spark.close()
}
}

def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
}

def execute()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
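// Reads two modelParams keys: "mode" ("archival"/"archive" runs archiveData, "delete" runs deleteArchivedData)
// and "request" (deserialized into Request), then upserts each returned ArchivalRequest.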
val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
val requestConfig = JSONUtils.deserialize[Request](JSONUtils.serialize(modelParams.getOrElse("request", Request).asInstanceOf[Map[String,AnyRef]]))
val mode: String = modelParams.getOrElse("mode","archive").asInstanceOf[String]

val requests = getRequests(jobId, requestConfig.batchId)

val archivalRequests = mode.toLowerCase() match {
case "archival" | "archive" =>
archiveData(requestConfig, requests)
case "delete" =>
deleteArchivedData(requestConfig, requests)
}
for (archivalRequest <- archivalRequests) {
upsertRequest(archivalRequest)
}
}

def upload(archivedData: DataFrame, batch: Map[String,AnyRef])(implicit jobConfig: JobConfig): List[String] = {
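// Writes the partition to blob storage as a gzipped CSV at "<reportPath><archivalFormat(batch)>-<currentTimeMillis>",
// using the storage config resolved from course.metrics.cloud.objectKey.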
val blobConfig = jobConfig.modelParams.get("blobConfig").asInstanceOf[Map[String, AnyRef]]
val reportPath: String = blobConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
val fileName = archivalFormat(batch)
val storageConfig = getStorageConfig(jobConfig, objectKey)
JobLogger.log(s"Uploading reports to blob storage", None, Level.INFO)
archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), None, fileExt=Some("csv.gz"))
}

// Overriding methods START:
def jobId: String;
def jobName: String;
def getReportPath: String;
def getReportKey: String;
def getClassName: String;

def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest];
def deleteArchivedData(archivalRequest: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest];
def archivalFormat(batch: Map[String,AnyRef]): String;
def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame;

// Overriding methods END:

val tsToLongUdf = udf[java.lang.Long, java.sql.Timestamp]((ts: java.sql.Timestamp) => if (ts != null) ts.getTime else null)
}
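
Taken together, the archival and deletion flows are driven entirely by the job's modelParams. A minimal sketch of the shape BaseArchivalJob.execute expects is below; the "mode" and "request" keys come from the code above, while the table, keyspace, and identifier values are assumptions for illustration only.

// Minimal sketch of the modelParams consumed by BaseArchivalJob.execute (values are assumptions):
val modelParams: Map[String, AnyRef] = Map(
  "mode" -> "delete",                              // "archival" archives the data; "delete" removes already-archived rows
  "request" -> Map(
    "archivalTable" -> "assessment_aggregator",    // assumed source table name
    "keyspace" -> "sunbird_courses",               // assumed keyspace; defaults to sunbird.courses.keyspace when omitted
    "batchId" -> "batch_01",                       // hypothetical batch id
    "collectionId" -> "do_1234"                    // hypothetical collection id
  )
  // For "archival" mode, upload() additionally reads a "blobConfig" -> Map("reportPath" -> ...) entry.
)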