From e1ba52ff1fe276d1f90b97c143855ab4b2770e1c Mon Sep 17 00:00:00 2001 From: Derrick Oswald Date: Tue, 12 May 2020 22:37:14 +0200 Subject: [PATCH] some codebase improvements #23 --- .../main/scala/ch/ninecode/cim/CIMAbout.scala | 6 +-- .../main/scala/ch/ninecode/cim/CIMDeDup.scala | 21 ++++---- .../main/scala/ch/ninecode/cim/CIMEdges.scala | 2 +- .../ch/ninecode/cim/CIMIntegrityChecker.scala | 52 ++++++------------- .../main/scala/ch/ninecode/cim/CIMJoin.scala | 5 +- .../cim/CIMNetworkTopologyProcessor.scala | 48 +++++++++++++---- .../scala/ch/ninecode/cim/CIMNormalize.scala | 6 +-- .../main/scala/ch/ninecode/cim/CIMRDD.scala | 18 +++---- .../scala/ch/ninecode/cim/CIMRelation.scala | 11 ++-- .../scala/ch/ninecode/cim/CIMSubsetter.scala | 2 +- .../main/scala/ch/ninecode/cim/Context.scala | 13 +++-- .../scala/ch/ninecode/cim/DefaultSource.scala | 11 ++-- .../scala/ch/ninecode/cim/tool/JavaDoc.scala | 4 +- 13 files changed, 110 insertions(+), 89 deletions(-) diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMAbout.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMAbout.scala index 84f9b970c..07ff986a0 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMAbout.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMAbout.scala @@ -24,6 +24,7 @@ with Serializable { implicit val session: SparkSession = spark + implicit val storage_level: StorageLevel = storage // for put() implicit val log: Logger = LoggerFactory.getLogger (getClass) /** @@ -102,10 +103,7 @@ with val new_elements = element_groups.map (merge) // swap the old Elements RDD for the new one - elements.name = "about_Elements" - new_elements.name = "Elements" - new_elements.persist (storage) - if (spark.sparkContext.getCheckpointDir.isDefined) new_elements.checkpoint () + put (new_elements, "Elements") new_elements } diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMDeDup.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMDeDup.scala index 47daa244b..773cf87f2 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMDeDup.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMDeDup.scala @@ -39,6 +39,7 @@ import ch.ninecode.model._ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER) extends CIMRDD with Serializable { implicit val session: SparkSession = spark + implicit val storage_level: StorageLevel = storage // for put() implicit val log: Logger = LoggerFactory.getLogger (getClass) /** @@ -72,11 +73,16 @@ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY */ def deduplicate (elements: Iterable[Element]): Element = { - val ret = elements.head - if (1 != elements.size) - // check for equality - check (ret, elements.tail) - ret + elements.toList match + { + case head :: Nil => + head + case head :: tail => + // check for equality + check (head, tail) + head + case _ => BasicElement () + } } /** @@ -101,10 +107,7 @@ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY val new_elements = elements.keyBy (_.id).groupByKey ().values.map (deduplicate) // swap the old Elements RDD for the new one - elements.name = "duplicate_Elements" - new_elements.name = "Elements" - new_elements.persist (storage) - if (spark.sparkContext.getCheckpointDir.isDefined) new_elements.checkpoint () + put (new_elements, "Elements") new_elements } diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMEdges.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMEdges.scala index 79a77a8f1..1cddaab1d 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMEdges.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMEdges.scala @@ -411,7 +411,7 @@ with Serializable val asseted_edges = located_edges.keyBy (_._1.id_equ).leftOuterJoin (assets.flatMap (psr)).map (x => (x._2._1._1, x._2._1._2, x._2._2)) // join with topological nodes if requested - if (topological_nodes) + val _ = if (topological_nodes) { val topologicals = getOrElse[TopologicalNode].keyBy (_.id) val topo1 = asseted_edges.keyBy (_._1.cn_1).leftOuterJoin (topologicals).values.map (x => (x._1._1, x._1._2, x._1._3, x._2)) diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala index b476c4ffb..7c10ca08e 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala @@ -52,20 +52,8 @@ class Worker[C <: Product, P <: Product] (relation: Relationship, child: String, def message (child: String, field: String, parent: String) (problem: (String, Product)): String = { - val (key, obj) = problem - new StringBuilder () - .append (child) - .append (" ") - .append (primary_key (obj)) // equipment.id - .append (" field ") - .append (field) - .append (" ") - .append (" references ") - .append (parent) - .append (" ") - .append (key) - .append (" that is not present") - .toString + val (key: String, obj: Product) = problem + s"$child ${primary_key (obj)} field $field references $parent $key that is not present" } def run (): String = @@ -104,33 +92,27 @@ class CIMIntegrityChecker (spark: SparkSession) extends CIMRDD with Serializable // val equipment: RDD[Equipment] = spark.sparkContext.getPersistentRDDs.filter (_._2.name == "Equipment").head._2.asInstanceOf[RDD[Equipment]] type childrdd = info.subsetter.rddtype - val companion: ClassInfo = classes.filter (_.name == relation.clazz).head + val companion: ClassInfo = classes.find (_.name == relation.clazz).getOrElse (ch.ninecode.model.Unknown.register) type parentrdd = companion.subsetter.rddtype if (log.isDebugEnabled) log.debug (s"${info.name}.${relation.field} => ${relation.clazz}") - val cc: collection.Map[Int, RDD[_]] = spark.sparkContext.getPersistentRDDs.filter (_._2.name == info.name) - if (cc.nonEmpty) - { - val ccc: childrdd = cc.head._2.asInstanceOf[childrdd] - - //val container: RDD[EquipmentContainer] = spark.sparkContext.getPersistentRDDs.filter (_._2.name == "EquipmentContainer").head._2.asInstanceOf[RDD[EquipmentContainer]] - val pc: collection.Map[Int, RDD[_]] = spark.sparkContext.getPersistentRDDs.filter (_._2.name == companion.name) - if (pc.nonEmpty) - { - val pcc: parentrdd = pc.head._2.asInstanceOf[parentrdd] - val worker = new Worker (relation, info.name, ccc, companion.name, pcc) - worker.run () - } - else + spark.sparkContext.getPersistentRDDs.find (_._2.name == info.name).map (x => x._2) + .fold ("") { - // every instance is an error - val worker = new Worker (relation, info.name, ccc, companion.name, null) - worker.run () + case rdd: childrdd => + //val container: RDD[EquipmentContainer] = spark.sparkContext.getPersistentRDDs.filter (_._2.name == "EquipmentContainer").head._2.asInstanceOf[RDD[EquipmentContainer]] + spark.sparkContext.getPersistentRDDs.find (_._2.name == companion.name).map (x => x._2) + .fold ( + // every instance is an error + new Worker (relation, info.name, rdd, companion.name, null).run () + ) + { + case pcc: parentrdd => + new Worker (relation, info.name, rdd, companion.name, pcc).run () + } } - } - else - "" + } def checkClass (classes: List[ClassInfo]) (info: ClassInfo): Option[String] = diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMJoin.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMJoin.scala index d8cf91267..1eaa6ee3d 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMJoin.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMJoin.scala @@ -299,10 +299,7 @@ class CIMJoin (spark: SparkSession, storage: StorageLevel) extends CIMRDD with S ) // swap the old Elements RDD for the new one - old_elements.name = "unjoined_Elements" - new_elements.name = "Elements" - new_elements.persist (storage) - if (spark.sparkContext.getCheckpointDir.isDefined) new_elements.checkpoint () + put (new_elements, "Elements") new_elements } diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala index c5664de4f..4a20bdf08 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala @@ -356,14 +356,30 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession) extends CIMRDD { // check for uniqueness of VertexId val duplicates = vertices.groupByKey.filter (_._2.size > 1) - if (!duplicates.isEmpty ()) - duplicates.collect.map (x => { log.error (s"VertexId clash (${x._1}) for ${x._2.head.node_label} and ${x._2.tail.head.node_label}"); 1 }) + duplicates.collect.foreach ( + x => + { + val (vertex, data) = x + data.toList match + { + case head :: tail :: Nil => + log.error (s"VertexId clash ($vertex) for ${head.node_label} and ${tail.node_label}") + case _ => + } + } + ) // check for missing vertices val cn = edges.flatMap (x => List ((x.attr.id_cn_1, x.attr.id_equ), (x.attr.id_cn_2, x.attr.id_equ))) - val missing = cn.leftOuterJoin (vertices.keyBy (_._2.node_label)).filter (_._2._2 match { case None => true case _ => false } ).map (x => (x._2._1, x._1)) - if (!missing.isEmpty) - missing.collect.map (x => { log.error (s"${x._1} missing ConnectivityNode ${x._2}"); 1 }) + val missing = cn.leftOuterJoin (vertices.keyBy (_._2.node_label)) + .filter (_._2._2 match { case None => true case _ => false } ).map (x => (x._2._1, x._1)) + missing.collect.foreach ( + x => + { + val (id, node) = x + log.error (s"$id missing ConnectivityNode $node") + } + ) } // construct the initial graph from the edges @@ -512,7 +528,7 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession) extends CIMRDD island.bitfields = Array (0) ( - nodes.head._1._1.island, + nodes.headOption.fold (0L)(x => x._1._1.island), island ) } @@ -712,8 +728,8 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession) extends CIMRDD val ret: Graph[CIMVertexData, CIMEdgeData] = Graph.apply (v, graph.edges, CIMVertexData (), options.storage, options.storage) // persist the RDD to avoid recomputation - ret.vertices.persist (options.storage) - ret.edges.persist (options.storage) + { val _ = ret.vertices.persist (options.storage) } + { val _ = ret.edges.persist (options.storage) } ret } @@ -775,14 +791,26 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession) extends CIMRDD put (new_ti) val nodes_with_islands = graph.vertices.values.keyBy (_.island).join (islands).values - val nodes = nodes_with_islands.groupBy (_._1.node).map (x => (x._1, x._2.head._1, Some (x._2.head._2))).map (to_nodes) + val nodes = nodes_with_islands.groupBy (_._1.node) + // keep the head of the Iterable + // .map (x => (x._1, x._2.head)) + .flatMapValues (_.toList match { case head :: _ => Some (head) case _ => None }) + .map (x => (x._1, x._2._1, Some (x._2._2))).map (to_nodes) if (options.debug && log.isDebugEnabled) log.debug (s"${nodes.count} nodes") + + // unpersist the graph nodes and edges + { val _ = graph.vertices.unpersist (false) } + { val _ = graph.edges.unpersist (false) } (nodes, new_ti) } else { - val nodes = graph.vertices.values.groupBy (_.node).map (x => (x._1, x._2.head, None)).map (to_nodes) + val nodes = graph.vertices.values.groupBy (_.node) + // keep the head of the Iterable + // .map (x => (x._1, x._2.head)) + .flatMapValues (_.toList match { case head :: _ => Some (head) case _ => None }) + .map (x => (x._1, x._2, None)).map (to_nodes) if (options.debug && log.isDebugEnabled) log.debug (s"${nodes.count} nodes") (nodes, spark.sparkContext.emptyRDD[TopologicalIsland]) diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMNormalize.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMNormalize.scala index 3b2629b20..ea1e3bdb4 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMNormalize.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMNormalize.scala @@ -24,6 +24,7 @@ with Serializable { implicit val session: SparkSession = spark + implicit val level: StorageLevel = storage // for put() implicit val log: Logger = LoggerFactory.getLogger (getClass) /** @@ -196,10 +197,7 @@ with val new_elements: RDD[Element] = cleaned.subtractByKey (fixed2).union (fixed2).values // swap the old Elements RDD for the new one - old_elements.name = "denormalized_Elements" - new_elements.name = "Elements" - new_elements.persist (storage) - if (spark.sparkContext.getCheckpointDir.isDefined) new_elements.checkpoint () + put (new_elements, "Elements") new_elements } diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMRDD.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMRDD.scala index d0669b12c..e0f058227 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMRDD.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMRDD.scala @@ -154,15 +154,16 @@ trait CIMRDD * @return The named, viewed and possibly checkpointed original RDD. * @tparam T The type of RDD. */ - def put[T <: Product : ClassTag : TypeTag](rdd: RDD[T], name: String)(implicit spark: SparkSession, storage: StorageLevel): RDD[T] = + def put[T <: Product : ClassTag : TypeTag](rdd: RDD[T], name: String)(implicit spark: SparkSession, storage: StorageLevel): Unit = { - spark.sparkContext.getPersistentRDDs.find (_._2.name == name) match - { - case Some ((_: Int, old: RDD[_])) => + spark.sparkContext.getPersistentRDDs.find (_._2.name == name).foreach ( + x => + { + val (_, old) = x old.setName (null).unpersist (true) - case Some (_) | None => - } - rdd.setName (name).persist (storage) + } + ) + val _ = rdd.setName (name).persist (storage) if (spark.sparkContext.getCheckpointDir.isDefined) rdd.checkpoint () val tag: universe.TypeTag[T] = typeTag[T] val runtime_class: Class[_] = classTag[T].runtimeClass @@ -174,7 +175,6 @@ trait CIMRDD } else spark.createDataFrame (rdd).createOrReplaceTempView (name) - rdd } /** @@ -236,7 +236,7 @@ trait CIMRDD * @param storage The storage level for persistence. * @tparam T The type of RDD. */ - def put[T <: Product : ClassTag : TypeTag](rdd: RDD[T])(implicit spark: SparkSession, storage: StorageLevel): RDD[T] = put (rdd, nameOf[T]) + def put[T <: Product : ClassTag : TypeTag](rdd: RDD[T])(implicit spark: SparkSession, storage: StorageLevel): Unit = put (rdd, nameOf[T]) /** * Get a typed DataSet for the given class. diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala index d80422e82..72b4806d7 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala @@ -171,9 +171,10 @@ class CIMRelation ( var ret: RDD[Row] = null // remove any existing RDD created by this relation - spark.sparkContext.getPersistentRDDs.find (_._2.name == "Elements") match - { - case Some ((_: Int, old: RDD[_])) => + spark.sparkContext.getPersistentRDDs.find (_._2.name == "Elements").foreach ( + x => + { + val (_, old) = x // aggregate the set of subclass names val names = old.asInstanceOf[RDD[Element]] .aggregate (Set[String]()) ( @@ -190,8 +191,8 @@ class CIMRelation ( } // remove the Element rdd old.setName (null).unpersist (true) - case Some (_) | None => - } + } + ) if (_Cache != "") { diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/CIMSubsetter.scala b/CIMReader/src/main/scala/ch/ninecode/cim/CIMSubsetter.scala index 351c439a6..e90fad9ba 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/CIMSubsetter.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/CIMSubsetter.scala @@ -66,7 +66,7 @@ class CIMSubsetter[A <: Product : ClassTag : TypeTag] () extends Serializable def save (context: SQLContext, rdd: rddtype, storage: StorageLevel): Unit = { rdd.name = cls - rdd.persist (storage) + val _ = rdd.persist (storage) if (context.sparkSession.sparkContext.getCheckpointDir.isDefined) rdd.checkpoint () val df = context.sparkSession.createDataFrame (rdd)(tag) val altered_schema = modify_schema (runtime_class, df.schema) diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/Context.scala b/CIMReader/src/main/scala/ch/ninecode/cim/Context.scala index 13e6c20e3..39e98332a 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/Context.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/Context.scala @@ -55,7 +55,9 @@ class Context (var xml: String, val start: Long, var end: Long, var first_byte: { val matcher = lines.matcher (string) while (matcher.find ()) - n += (matcher.start () + offset) + { + val _ = n += (matcher.start () + offset) + } n } @@ -66,6 +68,7 @@ class Context (var xml: String, val start: Long, var end: Long, var first_byte: * @param offset the character position in the stream * @return the line number (1 + how many newlines precede the offset) */ + @SuppressWarnings (Array ("org.wartremover.warts.Return")) def line_number (offset: Long = end): Int = { var min = 0 @@ -106,7 +109,9 @@ class Context (var xml: String, val start: Long, var end: Long, var first_byte: { ret &&= subxml.charAt (index).isWhitespace if (!ret && errors.size < MAXERRORS) - errors += """Unknown content "%s" at line %d""".format (subxml.substring (index, pair._1).trim (), line_number ()) + { + val _ = errors += """Unknown content "%s" at line %d""".format (subxml.substring (index, pair._1).trim (), line_number ()) + } index += 1 } index = pair._2 @@ -115,7 +120,9 @@ class Context (var xml: String, val start: Long, var end: Long, var first_byte: { ret &&= subxml.charAt (index).isWhitespace if (!ret && errors.size < MAXERRORS) - errors += """Unknown content "%s" at line %d""".format (subxml.substring (index, subxml.length ()).trim (), line_number ()) + { + val _ = errors += """Unknown content "%s" at line %d""".format (subxml.substring (index, subxml.length ()).trim (), line_number ()) + } index += 1 } diff --git a/CIMReader/src/main/scala/ch/ninecode/cim/DefaultSource.scala b/CIMReader/src/main/scala/ch/ninecode/cim/DefaultSource.scala index 9b21386ec..d5f68233a 100644 --- a/CIMReader/src/main/scala/ch/ninecode/cim/DefaultSource.scala +++ b/CIMReader/src/main/scala/ch/ninecode/cim/DefaultSource.scala @@ -33,14 +33,19 @@ extends val globPath = SparkHadoopUtil.get.globPathIfNecessary (qualified) if (globPath.isEmpty) throw new java.io.FileNotFoundException (s"Path does not exist: $qualified") - if (!fs.exists(globPath.head)) - throw new java.io.FileNotFoundException (s"Path does not exist: ${globPath.head}") + globPath.foreach ( + p => + { + if (!fs.exists (p)) + throw new java.io.FileNotFoundException (s"Path does not exist: $p") + } + ) globPath } val fileCatalog = new InMemoryFileIndex (session, globbedPaths, parameters, None) val partitionSchema = fileCatalog.partitionSpec().partitionColumns val format = new CIMFileFormat () - val dataSchema = format.inferSchema (session, parameters, fileCatalog.allFiles ()).get + val dataSchema = format.inferSchema (session, parameters, fileCatalog.allFiles ()).orNull new CIMRelation (fileCatalog, partitionSchema, dataSchema, format, parameters) (session) } } diff --git a/CIMTool/src/main/scala/ch/ninecode/cim/tool/JavaDoc.scala b/CIMTool/src/main/scala/ch/ninecode/cim/tool/JavaDoc.scala index ba099fa96..f43c4fcee 100644 --- a/CIMTool/src/main/scala/ch/ninecode/cim/tool/JavaDoc.scala +++ b/CIMTool/src/main/scala/ch/ninecode/cim/tool/JavaDoc.scala @@ -68,6 +68,8 @@ case class JavaDoc ( strings.flatten } + def asterisk (s: String): String = if ("" == s) " *" else s" * $s" + def contents: String = { val text: List[String] = if ((null != note) && (note != "")) @@ -83,7 +85,7 @@ case class JavaDoc ( } else List () - (text :: groupStuff :: Nil).flatten .map (l => if ("" == l) " *" else s" * $l").mkString ("\n") + (text :: groupStuff :: Nil).flatten.map (asterisk).mkString ("\n") } def asText: String =