Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize deduplication while processing held tasks #2203

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e62f85b
Stored groups to avoid recomputation. Brings down held task computati…
rahul-privado Jan 18, 2023
31f968e
Code betterment
rahul-privado Jan 18, 2023
ecc40e8
Removed unused function
rahul-privado Jan 18, 2023
9e4042c
Merge branch 'master' into held-tasks-opti
rahul-privado Jan 19, 2023
e6b18a8
More compact code for old
rahul-privado Jan 20, 2023
a7b9aea
Revert "More compact code for old"
rahul-privado Jan 20, 2023
0269925
Cache table entry list
rahul-privado Jan 20, 2023
5378828
Merge branch 'master' into held-tasks-opti
rahul-privado Jan 20, 2023
e9b24da
Formatted the code
rahul-privado Jan 20, 2023
1457a60
Merge branch 'joernio:master' into held-tasks-opti
rahul-privado Jan 24, 2023
42daee6
1. Properly concatenated the lists to avoid missing TableEntries
rahul-privado Jan 25, 2023
214edfc
Removed sorting O(n^2) worst case and made it a linear search O(n) to…
rahul-privado Jan 25, 2023
a252feb
Unique hash for each TableEntry
rahul-privado Jan 27, 2023
22f0cb1
Lazy computation of Sha1 hash
rahul-privado Jan 30, 2023
bbd65f0
Merge branch 'master' into held-tasks-opti
rahul-privado Jan 30, 2023
9dfadd7
Reintroduced caching of table entry hashes
rahul-privado Jan 30, 2023
16323fd
Variable renaming
rahul-privado Jan 30, 2023
34372f4
Formatted the code
rahul-privado Jan 30, 2023
04cc73d
Moved hash computation to a different function
rahul-privado Jan 30, 2023
f1ef699
Merging code improvement
rahul-privado Jan 30, 2023
e5afde1
Filter on max length
rahul-privado Jan 30, 2023
ee78f6b
Filtering while appending new list
rahul-privado Jan 30, 2023
9ebba8f
Accounted for new max entries
rahul-privado Jan 30, 2023
83dc73f
Removed SHA/MD5 usage since it is expensive
rahul-privado Jan 30, 2023
d4eba26
Used minBy() instead of sorting
rahul-privado Jan 30, 2023
b56b25f
Updated group list map that was missed earlier
rahul-privado Jan 31, 2023
a866227
1. Converted merge list to hash map
rahul-privado Jan 31, 2023
125e30b
Better sync of the critical section
rahul-privado Jan 31, 2023
901c50e
Merge branch 'master' into held-tasks-opti
rahul-privado Jan 31, 2023
f614366
Removed merge list map since it is no longer needed
rahul-privado Feb 1, 2023
328c287
Introduced RW lock for proper protection
rahul-privado Feb 1, 2023
8375599
Parallelization in the outer loop
rahul-privado Feb 1, 2023
c4ce927
Removed outer loop parallelism due to race issues
rahul-privado Feb 1, 2023
febe497
Parallel group computation
rahul-privado Feb 1, 2023
39db644
Line formatting
rahul-privado Feb 1, 2023
5ed522d
Made priority computation more accurate
rahul-privado Feb 2, 2023
dba8d1d
Merge branch 'master' into held-tasks-opti
rahul-privado Feb 6, 2023
70a53b8
Merge branch 'joernio:master' into held-tasks-opti
rahul-privado Feb 7, 2023
3a01ade
Merge branch 'joernio:master' into held-tasks-opti
rahul-privado Feb 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package io.joern.dataflowengineoss.queryengine

import io.shiftleft.codepropertygraph.generated.nodes.{Call, CfgNode}

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable
import scala.collection.parallel.CollectionConverters._
import scala.language.postfixOps

/** Complete held tasks using the result table. The result table is modified in the process.
*
Expand Down Expand Up @@ -34,7 +38,6 @@ class HeldTaskCompletion(
* created, `changed` is set to true for the result's table entry and `resultsProductByTask` is updated.
*/
def completeHeldTasks(): Unit = {

deduplicateResultTable()
val toProcess =
heldTasks.distinct.sortBy(x =>
Expand All @@ -46,6 +49,11 @@ class HeldTaskCompletion(
def noneChanged = toProcess.map { t => t.fingerprint -> false }.toMap

var changed: Map[TaskFingerprint, Boolean] = allChanged
val groupMap
: mutable.Map[TaskFingerprint, Map[((CfgNode, List[Call], Boolean), (CfgNode, List[Call], Boolean)), List[
TableEntry
]]] = mutable.Map()
val rwlock = new ReentrantReadWriteLock()

while (changed.values.toList.contains(true)) {
val taskResultsPairs = toProcess
Expand All @@ -61,7 +69,7 @@ class HeldTaskCompletion(

changed = noneChanged
taskResultsPairs.foreach { case (t, resultsForTask, newResults) =>
addCompletedTasksToMainTable(newResults.toList)
addCompletedTasksToMainTable(newResults.toList, groupMap, rwlock)
newResults.foreach { case (fingerprint, _) =>
changed += fingerprint -> true
}
Expand Down Expand Up @@ -117,11 +125,73 @@ class HeldTaskCompletion(
pathSeq.distinct.size == pathSeq.size
}

private def addCompletedTasksToMainTable(results: List[(TaskFingerprint, TableEntry)]): Unit = {
results.groupBy(_._1).foreach { case (fingerprint, resultList) =>
val entries = resultList.map(_._2)
val old = resultTable.getOrElse(fingerprint, Vector()).toList
resultTable.put(fingerprint, deduplicateTableEntries(old ++ entries))
/** Merge newly computed results into `resultTable`, keeping a single entry per group.
  *
  * Results are grouped by fingerprint and, within a fingerprint, by the pair of (node, call site stack, isOutputArg)
  * triples taken from the first and the last element of each entry's path. For every group only the longest paths
  * are considered, and among those the entry with the smallest `computePriority` value is retained.
  *
  * Fingerprints are processed in parallel. Reads of `resultTable`/`groupMap` happen under the read lock and the final
  * update of both happens under the write lock. Locks are released in `finally` blocks so that an exception (for
  * example an entry with an empty path) cannot leave a lock held.
  *
  * @param results
  *   (fingerprint, entry) pairs to merge into the table
  * @param groupMap
  *   cache of the groups computed so far for each fingerprint; consulted before regrouping the table entries and
  *   updated with the merged groups
  * @param rwlock
  *   lock guarding access to `resultTable` and `groupMap`
  * @throws NoSuchElementException
  *   if an entry has an empty path
  */
private def addCompletedTasksToMainTable(
  results: List[(TaskFingerprint, TableEntry)],
  groupMap: mutable.Map[TaskFingerprint, Map[((CfgNode, List[Call], Boolean), (CfgNode, List[Call], Boolean)), List[
    TableEntry
  ]]],
  rwlock: ReentrantReadWriteLock
): Unit = {
  // Grouping key: (node, call site stack, isOutputArg) of the first and of the last path element.
  def endpoints(entry: TableEntry): ((CfgNode, List[Call], Boolean), (CfgNode, List[Call], Boolean)) = {
    val first = entry.path.head
    val last  = entry.path.last
    ((first.node, first.callSiteStack, first.isOutputArg), (last.node, last.callSiteStack, last.isOutputArg))
  }

  results.groupBy(_._1).par.foreach { case (fingerprint, resultList) =>
    val newGroups = resultList.par.map(_._2).groupBy(endpoints)

    val readLock = rwlock.readLock()
    readLock.lock()
    val oldGroups =
      try {
        val old = resultTable.getOrElse(fingerprint, Vector()).toList
        // Reuse the cached groups when present; otherwise group the current table entries.
        groupMap.getOrElse(fingerprint, old.groupBy(endpoints))
      } finally {
        readLock.unlock()
      }

    val mergedGroups = oldGroups ++ newGroups.map { case (key, candidates) =>
      val existing = oldGroups.getOrElse(key, List.empty[TableEntry])
      // The head of the existing group supplies the path length that new entries are compared against.
      val maxLen = existing.headOption.map(_.path.length).getOrElse(0)

      val gtOrEqualMax = candidates.filter(_.path.length >= maxLen)
      val gtMax        = gtOrEqualMax.filter(_.path.length > maxLen)

      val kept: List[TableEntry] =
        if (gtMax.length > 0) {
          // Some new entries are longer than the current maximum: keep only the best of the longest new entries.
          val newMaxLen = gtMax.map(_.path.length).max
          List(gtMax.filter(_.path.length == newMaxLen).par.minBy(computePriority))
        } else if (gtOrEqualMax.length == 0) {
          // Every new entry is shorter than the current maximum: keep the existing entries untouched.
          existing
        } else {
          // New entries tie with the current maximum: pick the best among existing and tying new entries.
          List((existing ++ gtOrEqualMax.par.filter(_.path.length == maxLen)).par.minBy(computePriority))
        }
      key -> kept
    }
    val mergedList = mergedGroups.map { case (_, list) => list.head }.toList

    val writeLock = rwlock.writeLock()
    writeLock.lock()
    try {
      resultTable.put(fingerprint, mergedList)
      groupMap.update(fingerprint, mergedGroups)
    } finally {
      writeLock.unlock()
    }
  }
}

Expand Down Expand Up @@ -168,4 +238,17 @@ class HeldTaskCompletion(
.toList
}

/** Compute a numeric score for a table entry from its path.
  *
  * The score starts at the path length and, for every path element, adds weighted contributions of the call site
  * stack length, the node id and the hash codes of the `isOutputArg` and `visible` flags.
  *
  * @param entry
  *   the table entry to score
  * @return
  *   the accumulated score as a `BigInt`
  */
private def computePriority(entry: TableEntry): BigInt = {
  val multiplier: BigInt = 131072 // 2^17
  val base: BigInt       = entry.path.length

  entry.path.foldLeft(base) { (acc, element) =>
    val stackTerm   = element.callSiteStack.length * multiplier
    val idTerm      = element.node.id() * multiplier * 64
    val outputTerm  = element.isOutputArg.hashCode() * multiplier * multiplier * 64
    val visibleTerm = element.visible.hashCode() * multiplier * 64
    acc + stackTerm + idTerm + outputTerm + visibleTerm
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.joern.dataflowengineoss.semanticsloader.Semantics
import io.shiftleft.codepropertygraph.generated.nodes._
import io.shiftleft.semanticcpg.language.{toCfgNodeMethods, toExpressionMethods}

import java.security.MessageDigest
import java.util.concurrent.Callable
import scala.collection.mutable

Expand Down Expand Up @@ -49,6 +50,7 @@ class TaskSolver(task: ReachableByTask, context: EngineContext, sources: Set[Cfg
val parentTask = r.taskStack(i)
val pathToSink = r.path.slice(0, r.path.map(_.node).indexOf(parentTask.sink))
val newPath = pathToSink :+ PathElement(parentTask.sink, parentTask.callSiteStack)

(parentTask, TableEntry(path = newPath))
}.toList
}
Expand Down