SB-24793 - Assessment data archival data product job #425

Open

wants to merge 34 commits into base: release-4.2.0

Commits (34)
a022ddb
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
fd2ac9c
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
b42df8f
Issue SB-24793 feat: removed the hardcoded column names in groupBy
manjudr Jul 14, 2021
4ca0b4c
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
b6d3733
Revert "Issue SB-24793 feat: Assessment data archival data product j…
manjudr Jul 14, 2021
f4ca1cb
Issue SB-24793 feat: Assessment data archival data product job imple…
manjudr Jul 14, 2021
acd9a11
Issue SB-24793 feat: Assessment data archival cassandra connection …
manjudr Jul 14, 2021
9d75256
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
3bb90fb
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
358b96a
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
e46e0a1
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
423f977
Issue SB-24793 feat: Assessment data archival question data seriali…
manjudr Jul 16, 2021
9cea5eb
Issue SB-24793 feat: removing the unwanted csv files
manjudr Jul 16, 2021
ec710cd
Issue SB-24793 feat: removing the unwanted imports
manjudr Jul 16, 2021
e29b23e
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
7e4ea76
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
b32d1e6
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
22f76e9
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
ffa1c5f
Issue SB-25481 feat: Assessment archival data product archive the da…
manjudr Jul 19, 2021
5db16a1
Issue SB-25481 feat: Assessment archival data product cache issue fix
manjudr Jul 19, 2021
df8cf7e
Issue SB-25481 feat: Assessment archival data product cache issue fix
manjudr Jul 20, 2021
1f19060
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
a3245f8
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
4380e66
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
acff429
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
e96faaf
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
94b6269
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
10c2069
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
0d0ae86
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
753d763
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
2b467d4
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
ce5b0c8
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
00fdc7c
Issue SB-25481 feat: Assessment archival data product persist issue
manjudr Jul 20, 2021
9d0e8b1
Issue SB-24793 feat: Assessment data archival configuration update
manjudr Aug 9, 2021
114 changes: 114 additions & 0 deletions data-products/src/main/scala/org/sunbird/analytics/job/report/AssessmentArchivalJob.scala
@@ -0,0 +1,114 @@
package org.sunbird.analytics.job.report

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.cassandra.CassandraSparkSessionFunctions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}

import java.util.concurrent.atomic.AtomicInteger

object AssessmentArchivalJob extends optional.Application with IJob with BaseReportsJob {
val cassandraUrl = "org.apache.spark.sql.cassandra"
private val assessmentAggDBSettings: Map[String, String] = Map("table" -> AppConf.getConfig("sunbird.courses.assessment.table"), "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster")
implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
private val partitionCols = List("batch_id", "year", "week_of_year")

case class BatchPartition(batch_id: String, year: Int, week_of_year: Int)

// $COVERAGE-OFF$ Disabling scoverage for main and execute method
override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {

implicit val className: String = "org.sunbird.analytics.job.report.AssessmentArchivalJob"
val jobName = "AssessmentArchivalJob"
JobLogger.init(jobName)
JobLogger.start(s"$jobName started executing", Option(Map("config" -> config, "model" -> jobName)))
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
implicit val spark: SparkSession = openSparkSession(jobConfig)

implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()
val modelParams = jobConfig.modelParams.get
// Default to a Boolean (not the string "false") so a missing key does not throw a ClassCastException
val truncateData: Boolean = modelParams.getOrElse("truncateData", false).asInstanceOf[Boolean]
init()
try {
val res = CommonUtil.time(archiveData(spark, fetchData, jobConfig))
val total_archived_files = res._2.length
if (truncateData) deleteRecords(spark, assessmentAggDBSettings.getOrElse("keyspace", "sunbird_courses"), assessmentAggDBSettings.getOrElse("table", "assessment_aggregator"))
else JobLogger.log(s"Skipping the truncate process for the ${assessmentAggDBSettings.getOrElse("table", "assessment_aggregator")} table", None, INFO)
JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("timeTaken" -> res._1, "total_archived_files" -> total_archived_files)))
} catch {
case ex: Exception => {
ex.printStackTrace()
JobLogger.end(s"$jobName completed execution with the error ${ex.getMessage}", "FAILED", None)
}
} finally {
frameworkContext.closeContext()
spark.close()
}
}

def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
}

// $COVERAGE-ON$
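/**
* Archives the assessment data for the configured batches: partitions the records by
* batch_id, year and week_of_year, uploads each partition to cloud storage as a CSV file,
* and returns one metrics map per archived partition.
*/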
def archiveData(sparkSession: SparkSession, fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame, jobConfig: JobConfig): Array[Map[String, Any]] = {
val batches: List[String] = AppConf.getConfig("assessment.batches").split(",").toList
val assessmentDF: DataFrame = getAssessmentData(sparkSession, fetchData, batches)
val assessmentData = assessmentDF.withColumn("updated_on", to_timestamp(col("updated_on")))
.withColumn("year", year(col("updated_on")))
.withColumn("week_of_year", weekofyear(col("updated_on")))
.withColumn("question", to_json(col("question")))
val archivedBatchList = assessmentData.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
val archivedBatchCount = new AtomicInteger(archivedBatchList.length)
JobLogger.log(s"Total number of batch partitions (batch, year, week) to archive: $archivedBatchCount", None, INFO)
val batchesToArchive: Array[BatchPartition] = archivedBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))
val archivalMetrics = for (batch <- batchesToArchive) yield {
val filteredDF = assessmentData
.filter(col("batch_id") === batch.batch_id && col("year") === batch.year && col("week_of_year") === batch.week_of_year)
upload(filteredDF.drop("year", "week_of_year"), batch, jobConfig)
val metrics = Map("batch_id" -> batch.batch_id, "year" -> batch.year, "week_of_year" -> batch.week_of_year, "pending_batches" -> archivedBatchCount.getAndDecrement(), "total_records" -> filteredDF.count())
JobLogger.log("Archived the batch data for the given week", Some(metrics), INFO)
metrics
}
// Unpersist once, after all the batches are archived; calling unpersist inside the loop
// would drop the cache and force a re-read for every remaining batch.
assessmentData.unpersist()
archivalMetrics
}

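/**
* Fetches the assessment data from Cassandra and, when batch ids are configured,
* restricts it to those batches via an inner join before caching the result.
*/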
def getAssessmentData(spark: SparkSession, fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame, batchIds: List[String]): DataFrame = {
import spark.implicits._
val assessmentDF = fetchData(spark, assessmentAggDBSettings, cassandraUrl, new StructType())
if (batchIds.nonEmpty) {
val batchListDF = batchIds.toDF("batch_id")
assessmentDF.join(batchListDF, Seq("batch_id"), "inner").persist()
} else {
assessmentDF
}
}

def deleteRecords(sparkSession: SparkSession, keyspace: String, table: String): Unit = {
// NOTE: The truncate statement is currently commented out, so this method only logs the request.
// sparkSession.sql(s"TRUNCATE TABLE $keyspace.$table")
JobLogger.log(s"Cleared the $keyspace.$table table data successfully. Please execute the compaction.", None, INFO)
}

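/**
* Uploads one (batch_id, year, week_of_year) partition to blob storage as a CSV file
* named <batch_id>-<year>-<week_of_year>-<timestamp> under the configured report path.
*/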
def upload(archivedData: DataFrame,
batch: BatchPartition,
jobConfig: JobConfig): List[String] = {
val modelParams = jobConfig.modelParams.get
val reportPath: String = modelParams.getOrElse("reportPath", "archival-data/").asInstanceOf[String]
val container = AppConf.getConfig("cloud.container.reports")
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
val fileName = s"${batch.batch_id}-${batch.year}-${batch.week_of_year}"
val storageConfig = getStorageConfig(
container,
objectKey,
jobConfig)
JobLogger.log(s"Uploading reports to blob storage", None, INFO)
archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true")), None)
}

}
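
For reference, a minimal sketch of the kind of JobConfig this job can be launched with. It mirrors the config string used in TestAssessmentArchivalJob below; the Cassandra host and date placeholders are substituted at deploy time, and the exact values are illustrative:

{
  "search": { "type": "none" },
  "model": "org.sunbird.analytics.job.report.AssessmentArchivalJob",
  "modelParams": {
    "truncateData": false,
    "store": "local",
    "sparkCassandraConnectionHost": "{{ core_cassandra_host }}",
    "fromDate": "$(date --date yesterday '+%Y-%m-%d')",
    "toDate": "$(date --date yesterday '+%Y-%m-%d')"
  },
  "parallelization": 8,
  "appName": "Assessment Archival Job"
}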
1 change: 1 addition & 0 deletions data-products/src/test/resources/application.conf
@@ -164,6 +164,7 @@ sunbird.report.cluster.host=127.0.0.1
sunbird.user.report.keyspace="sunbird_courses"
collection.exhaust.store.prefix="reports"
postgres.table.job_request="job_request"
sunbird.courses.assessment.table = "assessment_aggregator"

druid.report.default.storage="local"
druid.report.date.format="yyyy-MM-dd"
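
For context, the other configuration keys the job reads, sketched with illustrative values (the key names come from AssessmentArchivalJob above; the values shown are assumptions, not the deployed defaults):

sunbird.courses.keyspace="sunbird_courses"
sunbird.courses.assessment.table="assessment_aggregator"
sunbird.courses.cluster.host="127.0.0.1"
assessment.batches="1010,1001"
cloud.container.reports="reports"
course.metrics.cloud.objectKey="sunbird/"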
9 changes: 9 additions & 0 deletions data-products/src/test/resources/assessment-archival/assessment_aggregator.csv
@@ -0,0 +1,9 @@
content_id,attempt_id,user_id,course_id,batch_id,created_on,last_attempted_on,total_max_score,total_score,updated_on,grand_total,question
do_112835335135993856149,A3,user030,do_1125559882615357441175,1010,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"10/2","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112835335135993856149,A4,user021,do_2123101488779837441168,1001,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,20,4,2019-09-06 09:59:51.000+0000,"2/2","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112835336280596480151,A4,user021,do_2123101488779837441168,1001,1971-09-22 02:10:53.444+0000,2019-09-06 09:58:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112835336280596480151,A4,user021,do_2123101488779837441168,1001,1971-09-22 02:10:53.444+0000,2019-09-06 09:57:51.000+0000,30,8,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112832394979106816112,A1,user015,do_112695422838472704115,1005,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112832394979106816112,A2,user030,do_1125559882615357441175,1010,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,10,5,2019-09-06 09:59:51.000+0000,"4/4","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112832394979106816112,A6,user026,do_1126458775024025601296,1006,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"5/5","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
do_112832394979106816112,A6,user021,do_1126458775024025601296,1006,1971-09-22 02:10:53.444+0000,2019-09-06 09:59:51.000+0000,30,10,2019-09-06 09:59:51.000+0000,"6/6","[{id: 'q3', max_score: 2, score: 1, type: 'mmc', title: 'choose one', resvalues: [{'2': '{\"text\":\"Quantity\\n\"}'}], params: [{'1': '{\"text\":\"Space and Shape\\n\"}'}, {'2': '{\"text\":\"Quantity\\n\"}'}, {'3': '{\"text\":\"Reading Skills\\n\"}'}, {'4': '{\"text\":\"Uncertainity and Data\\n\"}'}, {'answer': '{\"correct\":[\"3\"]}'}], description: 'Description of the question', duration: 10}]"
83 changes: 83 additions & 0 deletions data-products/src/test/scala/org/sunbird/analytics/job/report/TestAssessmentArchivalJob.scala
@@ -0,0 +1,83 @@
package org.sunbird.analytics.job.report


import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
import org.scalamock.scalatest.MockFactory

import scala.collection.mutable


class TestAssessmentArchivalJob extends BaseReportSpec with MockFactory {

var spark: SparkSession = _

var assessmentAggDF: DataFrame = _
var reporterMock: BaseReportsJob = mock[BaseReportsJob]
val sunbirdCoursesKeyspace = "sunbird_courses"

override def beforeAll(): Unit = {
super.beforeAll()
spark = getSparkSession()
assessmentAggDF = spark
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("src/test/resources/assessment-archival/assessment_aggregator.csv")
.cache()
}

override def afterAll(): Unit = {
super.afterAll()
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, objectKey + "assessment-archival")
}

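// Helper UDF to deserialize the serialized question array into maps; it is not used by
// the assertions below.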
val convertMethod = udf((value: mutable.WrappedArray[String]) => {
if (null != value && value.nonEmpty)
value.toList.map(str => JSONUtils.deserialize(str)(manifest[Map[String, String]])).toArray
else null
}, new ArrayType(MapType(StringType, StringType), true))

it should "Should able to archive the batch data" in {
initializeDefaultMockData()
implicit val mockFc: FrameworkContext = mock[FrameworkContext]
val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.AssessmentArchivalJob","modelParams":{"truncateData":false,"store":"local","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Assessment Archival Job"}""".stripMargin
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig)
val reportData = AssessmentArchivalJob.archiveData(spark, reporterMock.fetchData, jobConfig)

val batch_1 = reportData.filter(x => x.getOrElse("batch_id", "") == "1010")
batch_1.foreach { res =>
assert(res("year") === 2019)
assert(res("total_records") === 2L)
assert(res("week_of_year") === 36)
}

val batch_2 = reportData.filter(x => x.getOrElse("batch_id", "") == "1001")
batch_2.foreach { res =>
assert(res("year") === 2019)
assert(res("total_records") === 3L)
assert(res("week_of_year") === 36)
}

val batch_3 = reportData.filter(x => x.getOrElse("batch_id", "") == "1005")
batch_3.foreach { res =>
assert(res("year") === 2019)
assert(res("total_records") === 1L)
assert(res("week_of_year") === 36)
}

val batch_4 = reportData.filter(x => x.getOrElse("batch_id", "") == "1006")
batch_4.foreach { res =>
assert(res("year") === 2019)
assert(res("total_records") === 2L)
assert(res("week_of_year") === 36)
}

}

def initializeDefaultMockData(): Unit = {
(reporterMock.fetchData _)
.expects(spark, Map("table" -> "assessment_aggregator", "keyspace" -> sunbirdCoursesKeyspace, "cluster" -> "LMSCluster"), "org.apache.spark.sql.cassandra", new StructType())
.returning(assessmentAggDF)
}
}