Skip to content

Commit

Permalink
version changes to run on aws
Browse files Browse the repository at this point in the history
  • Loading branch information
Kaushal1011 committed Oct 31, 2023
1 parent 92d3fde commit 20a2a56
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
# CS441ManInMiddleAttackSimulator


run instructions:

```bash
spark-submit --class Main ./target/scala-2.12/CS441HW2MitM-assembly-0.1.0-SNAPSHOT.jar ./input/NodesOut.txt ./input/NodesPerturbedOut.txt ./input/EdgesPerturbedOut.txt ./output/sparkS
```
9 changes: 4 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.15"
ThisBuild / scalaVersion := "2.12.17"

lazy val root = (project in file("."))
.settings(
name := "CS441HW2MitM",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.2.4" excludeAll(ExclusionRule(organization = "org.apache.hadoop")),
"org.apache.spark" %% "spark-sql" % "3.2.4" excludeAll(ExclusionRule(organization = "org.apache.hadoop")),
"org.apache.hadoop" % "hadoop-client" % "3.2.1",
"org.apache.spark" %% "spark-core" % "3.4.1" ,
"org.apache.spark" %% "spark-sql" % "3.4.1" ,
"org.scalatest" %% "scalatest" % "3.0.8" % "test",
"org.apache.spark" %% "spark-graphx" % "3.2.4" excludeAll(ExclusionRule(organization = "org.apache.hadoop")),
"org.apache.spark" %% "spark-graphx" % "3.4.1",
"com.typesafe" % "config" % "1.4.1",
)
)
Expand Down
29 changes: 16 additions & 13 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.graphx._
import helpers.{ComparableEdge, ComparableNode, NodeDataParser}

import scala.util.Random
import org.apache.log4j.Logger
import RandomWalk.RandomWalk.{vertexProgram, sendMessage, mergeMessage}
import RandomWalk.RandomWalk.{mergeMessage, sendMessage, vertexProgram}
import com.typesafe.config.ConfigFactory
import Utilz.ConfigReader
import org.apache.spark.broadcast.Broadcast
object Main {
val logger: Logger = Logger.getLogger("CS441HW2MitM")

Expand All @@ -18,7 +20,7 @@ object Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("GraphSim")
.master("local[4]") // Set master to local with 4 cores. Adjust as needed.
// .master("local[4]") // Set master to local with 4 cores. Adjust as needed. // comment this if using spark-submit
.getOrCreate()

val sc = spark.sparkContext
Expand Down Expand Up @@ -54,7 +56,7 @@ object Main {
val nodesOGRDD: RDD[ComparableNode] = sc.textFile(nodeFileOG)
.map(line => NodeDataParser.parseNodeData(line))

val originalGraph: Array[ComparableNode] = nodesOGRDD.collect()
val originalGraph: Broadcast[Array[ComparableNode]] = sc.broadcast(nodesOGRDD.collect())

val graph = Graph(nodesRDD, edgeRDD)

Expand All @@ -72,7 +74,8 @@ object Main {


// Convert it to a map for easy lookup
val neighborsMap: Map[VertexId, Array[ComparableNode]] = neighborsWithAttrs.collect().toMap
val neighborsMap = sc.broadcast(neighborsWithAttrs.collect().toMap)


// Define accumulators for successful and failed attacks
val successfulAttacks: LongAccumulator = sc.longAccumulator("Successful Attacks")
Expand All @@ -84,9 +87,9 @@ object Main {

logger.info(s"Starting iteration, ${i}")

val initialNodes = nodesRDD.takeSample(withReplacement = false, num = initialNodeCount).map(_._1)
val initialNodes = sc.broadcast(nodesRDD.takeSample(withReplacement = false, num = initialNodeCount).map(_._1))

logger.info(s"Initial nodes, ${initialNodes.mkString(",")}")
logger.info(s"Initial nodes, ${initialNodes.value.mkString(",")}")

// Pregel simulation
val pregelGraph = runPregelSimulation(graph, neighborsMap, originalGraph, initialNodes)
Expand All @@ -111,7 +114,7 @@ object Main {

val stats = Array(
s"Total Successful Attacks: ${successfulAttacks.value}",
s"Total Failed Attacks: ${successfulAttacks.value}",
s"Total Failed Attacks: ${failedAttacks.value}",
s"Total Missidentified Attacks: ${missidentifiedAttacks.value}",
s"Total Uneventful Attacks: ${uneventfulAttacks.value}"
)
Expand All @@ -123,17 +126,17 @@ object Main {
}

private def runPregelSimulation(
graph: Graph[ComparableNode, ComparableEdge],
neighborsMap: Map[VertexId, Array[ComparableNode]],
originalGraph: Array[ComparableNode],
initialNodes: Array[VertexId]
graph: Graph[ComparableNode, ComparableEdge],
neighborsMap: Broadcast[Map[VertexId, Array[ComparableNode]]],
originalGraph: Broadcast[Array[ComparableNode]],
initialNodes: Broadcast[Array[VertexId]]
): Graph[(Long, ComparableNode, Long, Long, Long, Long, Long), ComparableEdge] = {

val initialGraph: Graph[(Long, ComparableNode, Long, Long, Long, Long, Long), ComparableEdge] = graph.mapVertices((id, e) => {


if (initialNodes.contains(id)) {
val nbrs = neighborsMap.getOrElse(id, Array.empty[ComparableNode])
if (initialNodes.value.contains(id)) {
val nbrs = neighborsMap.value.getOrElse(id, Array.empty[ComparableNode])
if (nbrs.nonEmpty) {
(nbrs(Random.nextInt(nbrs.length)).id, e, 0L, 0L,0L,0L,0L )
} else {
Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/RandomWalk/RandomWalk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import scala.util.Random
import helpers.ComparableNode
import MitMSimulator.MitMSimulator.attackingOriginalGraph
import org.apache.log4j.Logger
import org.apache.spark.broadcast.Broadcast

object RandomWalk {
val logger: Logger = Logger.getLogger("CS441HW2MitM")

def vertexProgram(originalGraph: Array[ComparableNode]): (VertexId, (Long, ComparableNode, Long, Long, Long, Long, Long), (Long, ComparableNode, Long, Long, Long, Long, Long)) => (Long, ComparableNode, Long, Long, Long, Long, Long) = {
def vertexProgram(originalGraph: Broadcast[Array[ComparableNode]]): (VertexId, (Long, ComparableNode, Long, Long, Long, Long, Long), (Long, ComparableNode, Long, Long, Long, Long, Long)) => (Long, ComparableNode, Long, Long, Long, Long, Long) = {
(id, oldValue, newValue) => {
// attr always stays the same
// number of sucessful attacks and failed attacks are updated
Expand All @@ -24,16 +25,16 @@ object RandomWalk {
// _6 is uneventful attacks

if (newValue._1 != Long.MaxValue) {
val (newSuccessful, newFailed, newMissidentified , newUneventful) = attackingOriginalGraph(newValue._2, originalGraph, oldValue._3, oldValue._4, oldValue._5, oldValue._6)
val (newSuccessful, newFailed, newMissidentified , newUneventful) = attackingOriginalGraph(newValue._2, originalGraph.value, oldValue._3, oldValue._4, oldValue._5, oldValue._6)
(newValue._1, oldValue._2, newSuccessful, newFailed, newMissidentified, newUneventful, newValue._7)

} else oldValue
}
}

def sendMessage(triplet: EdgeTriplet[(Long, ComparableNode, Long, Long, Long, Long, Long), _], neighborsMap: Map[VertexId, Array[ComparableNode]]): Iterator[(VertexId, (Long, ComparableNode, Long, Long, Long, Long, Long))] = {
def sendMessage(triplet: EdgeTriplet[(Long, ComparableNode, Long, Long, Long, Long, Long), _], neighborsMap: Broadcast[Map[VertexId, Array[ComparableNode]]]): Iterator[(VertexId, (Long, ComparableNode, Long, Long, Long, Long, Long))] = {
if (triplet.srcAttr._1 != Long.MaxValue && triplet.srcAttr._1 == triplet.dstId) {
val neighbours = neighborsMap.getOrElse(triplet.dstId, Array.empty[ComparableNode])
val neighbours = neighborsMap.value.getOrElse(triplet.dstId, Array.empty[ComparableNode])
if (neighbours.nonEmpty) {
val randomNeighbour = neighbours(Random.nextInt(neighbours.length)).id
logger.info(s"Message Passed,${triplet.srcId},${triplet.dstId},${triplet.srcAttr._7}")
Expand Down

0 comments on commit 20a2a56

Please sign in to comment.