Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sbt-assembly is using lots of native memory #517

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 104 additions & 106 deletions src/main/scala/sbtassembly/Assembly.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,136 +287,134 @@ object Assembly {
),
log
)
val (jarFiles, jarFileEntries) = timed(Level.Debug, "Collect and shade dependency entries") {
val jarFileEntries = timed(Level.Debug, "Collect and shade dependency entries") {
filteredJars.par.map { jar =>
val module = jar.metadata
.get(moduleID.key)
.map(m => ModuleCoordinate(m.organization, m.name, m.revision))
.getOrElse(ModuleCoordinate("", jar.data.name.replaceAll(".jar", ""), ""))
val jarFile = new JarFile(jar.data)
jarFile -> jarFile
val jarFileEntriesRes = jarFile
.entries()
.asScala
.filterNot(_.isDirectory)
.toVector
.par
.flatMap { entry =>
jarShader(module)(entry.getName, () => jarFile.getInputStream(entry))
jarShader(module)(entry.getName, () => new ByteArrayInputStream(jarFile.getInputStream(entry).readAllBytes()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is a good idea. Maybe it would be better to create a setting to tweak the parallelism of .par to reduce the concurrent calls on very big JARs?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating parallelism of .par should not change the native memory usage as we will still keep all those jar handler

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're right. The PR as it stands requires heap equivalent to all JAR file size right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it will store the inflated data for each entries of each jar. From heapdump on our build job it represent around 5GB of byte[] while using the "stream" version it use around 3.5GB of byte[]. Stream version also used lots of byte[] because even if it is a stream the stream as already load/initiate a chunk of bytes per entry which seems to represent a big percent of an entry (probably because each of them are mostly only little text file).
Another approach would be to not load the stream in https://github.com/sbt/sbt-assembly/blob/develop/src/main/scala/sbtassembly/Assembly.scala#L304. Just get needed information for each entry and close the jar file handler immediately
And load the stream only in createJar.
This would reduce the heap used (even with current release) and reduce off heap size
Do you think it is possible or there are some other usage of the stream that would be uneasy to extract?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jar Jar Abrams needs to read the streams to process various rules against the stream of bytecode I think. Similar to what it's doing () => jarFile.getInputStream(entry) a lazy function that creates a stream on demand, I guess we can further make it lazier to provide:

name => {
  val jarFile = lookupJarFile(jar.data)
  val entry = jarFile.getEntry(name)
  jarFile.getInputStream(entry)
}

where we can keep a LRU cache of JAR files in lookupJarFile(...)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shader that rely on this jarjarabrams Shader seems to already load the whole stream into a ByteArrayInputStream
From jarjar-abrams bytecodeShader signature: https://github.com/eed3si9n/jarjar-abrams/blob/develop/core/src/main/scala/com/eed3si9n/jarjarabrams/Shader.scala#L71

I'm not sure to see the gain in keeping the stream as we will load all the bytes in the shader. Also wondering if we rely on such mechanism with LRU cache if we could not reach some bugs with LRU cache closing the file handler while still having an input stream open from it

.map { case (shadedName, stream) =>
Library(module, entry.getName, shadedName, stream)
}
}
}.unzip
jarFile.close()
jarFileEntriesRes
}
}

val (mappingsToRename, others) = timed(Level.Debug, "Collect renames") {
(classMappings ++ jarFileEntries.flatten)
.partition(mapping => ao.mergeStrategy(mapping.target).name == MergeStrategy.rename.name)
}
try {
val (mappingsToRename, others) = timed(Level.Debug, "Collect renames") {
(classMappings ++ jarFileEntries.flatten)
.partition(mapping => ao.mergeStrategy(mapping.target).name == MergeStrategy.rename.name)
val renamedEntries = timed(Level.Debug, "Process renames") {
merge(mappingsToRename, path => Option(ao.mergeStrategy(path)), log)
}
// convert renames back to `Dependency`s for second-pass merge and cache-invalidation
val renamedDependencies = convertToDependency(renamedEntries)
val (jarManifest, timestamp) = createManifest(po, log)
// exclude renames from the second pass
val secondPassMergeStrategy = (path: String) => {
val mergeStrategy = ao.mergeStrategy(path)
if (mergeStrategy.name == MergeStrategy.rename.name) Option.empty
else Option(mergeStrategy)
}
val buildAssembly = () => {
val mergedEntries = timed(Level.Debug, "Merge all conflicting jar entries (including those renamed)") {
merge(renamedDependencies ++ others, secondPassMergeStrategy, log)
}
val renamedEntries = timed(Level.Debug, "Process renames") {
merge(mappingsToRename, path => Option(ao.mergeStrategy(path)), log)
timed(Level.Debug, "Report merge results") {
reportMergeResults(renamedEntries, log)
reportMergeResults(mergedEntries, log)
}
// convert renames back to `Dependency`s for second-pass merge and cache-invalidation
val renamedDependencies = convertToDependency(renamedEntries)
val (jarManifest, timestamp) = createManifest(po, log)
// exclude renames from the second pass
val secondPassMergeStrategy = (path: String) => {
val mergeStrategy = ao.mergeStrategy(path)
if (mergeStrategy.name == MergeStrategy.rename.name) Option.empty
else Option(mergeStrategy)
timed(Level.Debug, "Finding remaining conflicts that were not merged") {
reportConflictsMissedByTheMerge(mergedEntries, log)
}
val buildAssembly = () => {
val mergedEntries = timed(Level.Debug, "Merge all conflicting jar entries (including those renamed)") {
merge(renamedDependencies ++ others, secondPassMergeStrategy, log)
}
timed(Level.Debug, "Report merge results") {
reportMergeResults(renamedEntries, log)
reportMergeResults(mergedEntries, log)
}
timed(Level.Debug, "Finding remaining conflicts that were not merged") {
reportConflictsMissedByTheMerge(mergedEntries, log)
}
val jarEntriesToWrite = timed(Level.Debug, "Sort/Parallelize merged entries") {
if (ao.repeatableBuild) // we need the jars in a specific order to have a consistent hash
mergedEntries.flatMap(_.entries).seq.sortBy(_.target)
else // we actually gain performance when creating the jar in parallel, but we won't have a consistent hash
mergedEntries.flatMap(_.entries).par
}
val localTime = timestamp
.map(t => t - java.util.TimeZone.getDefault.getOffset(t))
.getOrElse(System.currentTimeMillis())
val jarEntriesToWrite = timed(Level.Debug, "Sort/Parallelize merged entries") {
if (ao.repeatableBuild) // we need the jars in a specific order to have a consistent hash
mergedEntries.flatMap(_.entries).seq.sortBy(_.target)
else // we actually gain performance when creating the jar in parallel, but we won't have a consistent hash
mergedEntries.flatMap(_.entries).par
}
val localTime = timestamp
.map(t => t - java.util.TimeZone.getDefault.getOffset(t))
.getOrElse(System.currentTimeMillis())

timed(Level.Debug, "Create jar") {
IO.delete(output)
createJar(output, jarEntriesToWrite, jarManifest, localTime)
}
val fullSha1 = timed(Level.Debug, "Hash newly-built Jar") {
hash(output)
}
val builtAssemblyJar =
if (ao.appendContentHash) {
val sha1 = ao.maxHashLength.fold(fullSha1)(length => fullSha1.take(length))
val newName = output.getName.replaceAll("\\.[^.]*$", "") + "-" + sha1 + ".jar"
val outputWithHash = new File(output.getParentFile, newName)
IO.delete(outputWithHash)
Files.move(output.toPath, outputWithHash.toPath, StandardCopyOption.REPLACE_EXISTING)
outputWithHash
} else output
ao.prependShellScript
.foreach { shellScript =>
timed(Level.Info, "Prepend shell script") {
val tmp = cacheDir / "assemblyExec.tmp"
if (tmp.exists) IO.delete(tmp)
Files.move(builtAssemblyJar.toPath, tmp.toPath)
IO.write(builtAssemblyJar, shellScript.map(_ + "\n").mkString, append = false)
Using.fileOutputStream(true)(builtAssemblyJar)(out => IO.transfer(tmp, out))
IO.delete(tmp)
if (!scala.util.Properties.isWin) {
val posixPermissions = Files.getPosixFilePermissions(builtAssemblyJar.toPath)
posixPermissions.add(PosixFilePermission.OWNER_EXECUTE)
posixPermissions.add(PosixFilePermission.GROUP_EXECUTE)
posixPermissions.add(PosixFilePermission.OTHERS_EXECUTE)
Files.setPosixFilePermissions(builtAssemblyJar.toPath, posixPermissions)
}
}
}
log.info("Built: " + builtAssemblyJar.toPath)
log.info("Jar hash: " + fullSha1)
builtAssemblyJar
timed(Level.Debug, "Create jar") {
IO.delete(output)
createJar(output, jarEntriesToWrite, jarManifest, localTime)
}
val mergeStrategiesByPathList = timed(Level.Debug, "Collect all merge strategies for cache check") {
// collect all
(renamedDependencies ++ others)
.groupBy(_.target)
.map { case (target, _) =>
val mergeStrategy = secondPassMergeStrategy(target).getOrElse(MergeStrategy.deduplicate)
target -> (mergeStrategy.isBuiltIn -> mergeStrategy.name)
}
val fullSha1 = timed(Level.Debug, "Hash newly-built Jar") {
hash(output)
}
if (
ao.cacheOutput &&
!mergeStrategiesByPathList.values
.exists { case (isBuiltIn, name) =>
// if there is at least one custom merge strategy, we cannot predict what it does so we cannot use caching
if (!isBuiltIn) log.warn(s"Caching disabled because of a custom merge strategy: '$name'")
!isBuiltIn
val builtAssemblyJar =
if (ao.appendContentHash) {
val sha1 = ao.maxHashLength.fold(fullSha1)(length => fullSha1.take(length))
val newName = output.getName.replaceAll("\\.[^.]*$", "") + "-" + sha1 + ".jar"
val outputWithHash = new File(output.getParentFile, newName)
IO.delete(outputWithHash)
Files.move(output.toPath, outputWithHash.toPath, StandardCopyOption.REPLACE_EXISTING)
outputWithHash
} else output
ao.prependShellScript
.foreach { shellScript =>
timed(Level.Info, "Prepend shell script") {
val tmp = cacheDir / "assemblyExec.tmp"
if (tmp.exists) IO.delete(tmp)
Files.move(builtAssemblyJar.toPath, tmp.toPath)
IO.write(builtAssemblyJar, shellScript.map(_ + "\n").mkString, append = false)
Using.fileOutputStream(true)(builtAssemblyJar)(out => IO.transfer(tmp, out))
IO.delete(tmp)
if (!scala.util.Properties.isWin) {
val posixPermissions = Files.getPosixFilePermissions(builtAssemblyJar.toPath)
posixPermissions.add(PosixFilePermission.OWNER_EXECUTE)
posixPermissions.add(PosixFilePermission.GROUP_EXECUTE)
posixPermissions.add(PosixFilePermission.OTHERS_EXECUTE)
Files.setPosixFilePermissions(builtAssemblyJar.toPath, posixPermissions)
}
}
) {
val (_, classes) = classByParentDir.unzip
val cacheKey = lastModified(classes.toSet ++ filteredJars.map(_.data).toSet) :+:
mergeStrategiesByPathList :+:
jarManifest :+:
ao.repeatableBuild :+:
ao.prependShellScript :+:
ao.maxHashLength :+:
ao.appendContentHash :+:
HNil
cachedAssembly(cacheKey, cacheDir, ao.scalaVersion, log)(buildAssembly)
} else buildAssembly()
} finally
timed(Level.Debug, "Close library jar references") {
jarFiles.foreach(_.close())
}
}
log.info("Built: " + builtAssemblyJar.toPath)
log.info("Jar hash: " + fullSha1)
builtAssemblyJar
}
val mergeStrategiesByPathList = timed(Level.Debug, "Collect all merge strategies for cache check") {
// collect all
(renamedDependencies ++ others)
.groupBy(_.target)
.map { case (target, _) =>
val mergeStrategy = secondPassMergeStrategy(target).getOrElse(MergeStrategy.deduplicate)
target -> (mergeStrategy.isBuiltIn -> mergeStrategy.name)
}
}
if (
ao.cacheOutput &&
!mergeStrategiesByPathList.values
.exists { case (isBuiltIn, name) =>
// if there is at least one custom merge strategy, we cannot predict what it does so we cannot use caching
if (!isBuiltIn) log.warn(s"Caching disabled because of a custom merge strategy: '$name'")
!isBuiltIn
}
) {
val (_, classes) = classByParentDir.unzip
val cacheKey = lastModified(classes.toSet ++ filteredJars.map(_.data).toSet) :+:
mergeStrategiesByPathList :+:
jarManifest :+:
ao.repeatableBuild :+:
ao.prependShellScript :+:
ao.maxHashLength :+:
ao.appendContentHash :+:
HNil
cachedAssembly(cacheKey, cacheDir, ao.scalaVersion, log)(buildAssembly)
} else buildAssembly()
}

/**
Expand Down
Loading