forked from scalacenter/bloop
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathParallelOps.scala
285 lines (259 loc) · 10.1 KB
/
ParallelOps.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package bloop.io
import java.io.IOException
import java.nio.file.FileVisitResult
import java.nio.file.FileVisitor
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Promise
import scala.util.control.NonFatal
import bloop.logging.Logger
import bloop.task.Task
import monix.eval.{Task => MonixTask}
import monix.execution.Cancelable
import monix.execution.Scheduler
import monix.execution.atomic.AtomicBoolean
import monix.execution.cancelables.AssignableCancelable
import monix.execution.cancelables.CompositeCancelable
import monix.reactive.Consumer
import monix.reactive.MulticastStrategy
import monix.reactive.Observable
object ParallelOps {

  /** How a copy treats a target file that may already exist. */
  sealed trait CopyMode
  object CopyMode {

    /** Copy only if the target file does not exist; existing target files are left untouched. */
    final case object NoReplace extends CopyMode

    /** Always copy, replacing any existing target file. */
    final case object ReplaceExisting extends CopyMode

    /**
     * Copy (replacing the target) unless the target file's last-modified time and
     * size both match the origin file's. If the target's attributes cannot be read
     * (e.g. the target does not exist yet), the file is copied.
     */
    final case object ReplaceIfMetadataMismatch extends CopyMode
  }

  /**
   * A configuration for a copy process.
   *
   * @param parallelUnits Number of consumers the copy work is load-balanced across.
   * @param mode How files that already exist in the target are handled, see [[CopyMode]].
   * @param denylist Origin and target file paths that, if matched, skip the copy.
   *   Only files are checked against it; directories are always recreated.
   */
  case class CopyConfiguration private (
      parallelUnits: Int,
      mode: CopyMode,
      denylist: Set[Path]
  )

  /**
   * Outcome of walking the origin of a copy.
   *
   * @param visited Origin files discovered and handed to the copy, in discovery order.
   * @param target Target path of each file in `visited`, index by index.
   */
  case class FileWalk(visited: List[Path], target: List[Path])

  /**
   * Origin files currently being copied by any copy process, mapped to a promise
   * that is completed once that copy finishes. Shared across all calls so that
   * two concurrent copy processes never copy the same origin file at once.
   */
  private[this] val takenByOtherCopyProcess = new ConcurrentHashMap[Path, Promise[Unit]]()

  /**
   * Copies files from `origin` to `target` with the provided copy configuration,
   * running the copies in parallel on `scheduler`.
   *
   * Discovery walks `origin`, recreating its directory structure under `target`
   * as it goes. Each discovered file not in the denylist is streamed to
   * `configuration.parallelUnits` consumers that perform the copies according to
   * `configuration.mode`. If `origin` does not exist, nothing is copied and an
   * empty [[FileWalk]] is returned.
   *
   * @param configuration The copy configuration (parallelism, copy mode, denylist).
   * @param origin The file or directory to copy from.
   * @param target The directory under which the contents of `origin` are recreated.
   * @param scheduler The scheduler on which discovery and copies are executed.
   * @param enableCancellation A flag to control whether the task should be
   *   cancelled or not. For semantics-preserving copy tasks, we might want to
   *   disable cancellation. Otherwise, it's possible that, for example,
   *   `BspServer` calls `cancel` on the post-compilation task even though the
   *   compilation was cancelled, because the order in which cancel finalizers
   *   are done is arbitrary. This value is usually `false` because most of the
   *   copies are key for compilation semantics. When `true`, cancelling the
   *   returned task stops discovery and skips copies that have not started yet.
   * @param logger Logger used to report unexpected errors while copying a file.
   *
   * @return The origin files that were discovered for copying and their target
   *   paths. Some of them may not have been copied, depending on `configuration.mode`.
   */
  def copyDirectories(configuration: CopyConfiguration)(
      origin: Path,
      target: Path,
      scheduler: Scheduler,
      enableCancellation: Boolean,
      logger: Logger
  ): Task[FileWalk] = Task.defer {
    val isCancelled = AtomicBoolean(false)

    import scala.collection.mutable
    val visitedPaths = new mutable.ListBuffer[Path]()
    val targetPaths = new mutable.ListBuffer[Path]()

    // Discovery publishes ((origin file, origin attributes), target file) pairs here;
    // the parallel copy consumer subscribes to `observable`.
    val (observer, observable) = Observable.multicast[((Path, BasicFileAttributes), Path)](
      MulticastStrategy.publish
    )(scheduler)

    val discovery = new FileVisitor[Path] {
      // The walk root maps to `target` itself; every nested directory maps to a
      // same-named child of the current target directory.
      var firstVisit: Boolean = true
      var currentTargetDirectory: Path = target

      def visitFile(file: Path, attributes: BasicFileAttributes): FileVisitResult = {
        if (isCancelled.get) FileVisitResult.TERMINATE
        else {
          if (attributes.isDirectory || configuration.denylist.contains(file)) ()
          else {
            val rebasedFile = currentTargetDirectory.resolve(file.getFileName)
            if (configuration.denylist.contains(rebasedFile)) ()
            else {
              visitedPaths += file
              targetPaths += rebasedFile
              observer.onNext((file -> attributes, rebasedFile))
            }
          }
          FileVisitResult.CONTINUE
        }
      }

      // Entries that cannot be visited are skipped and the walk continues.
      def visitFileFailed(
          t: Path,
          e: IOException
      ): FileVisitResult = FileVisitResult.CONTINUE

      def preVisitDirectory(
          directory: Path,
          attributes: BasicFileAttributes
      ): FileVisitResult = {
        if (isCancelled.get) FileVisitResult.TERMINATE
        else {
          if (firstVisit) {
            firstVisit = false
          } else {
            currentTargetDirectory = currentTargetDirectory.resolve(directory.getFileName)
          }
          // Mirror the directory in the target before any of its files are copied.
          Files.createDirectories(currentTargetDirectory)
          FileVisitResult.CONTINUE
        }
      }

      def postVisitDirectory(
          directory: Path,
          exception: IOException
      ): FileVisitResult = {
        // Leaving a directory: step the target back up to its parent.
        currentTargetDirectory = currentTargetDirectory.getParent()
        FileVisitResult.CONTINUE
      }
    }

    // Walks `origin` (if it exists) and then signals the end of discovery to the
    // consumer by completing, or failing, the observer.
    val discoverFileTree = Task {
      if (!Files.exists(origin)) {
        FileWalk(Nil, Nil)
      } else {
        Files.walkFileTree(origin, discovery)
        FileWalk(visitedPaths.toList, targetPaths.toList)
      }
    }.doOnFinish {
      case Some(t) => Task(observer.onError(t))
      case None => Task(observer.onComplete())
    }

    // Completed once the consumer has subscribed to `observable`; discovery waits on it.
    val subscribed = Promise[Unit]()
    // We set the value of this cancelable when we start consuming task
    var completeSubscribers: Cancelable = Cancelable.empty
    // Waits on files held by another copy process; all of them are cancelled on cancellation.
    val cancelables = new mutable.ListBuffer[Cancelable]()
    val cancelable = AssignableCancelable.multi { () =>
      val tasksToCancel = cancelables.synchronized { cancelables.toList }
      Cancelable.cancelAll(completeSubscribers :: tasksToCancel)
    }

    val copyFileSequentially = Consumer.foreachTask[((Path, BasicFileAttributes), Path)] {
      case ((originFile, originAttrs), targetFile) =>
        // Copy errors are logged rather than propagated so one file does not stop the rest.
        def copy(replaceExisting: Boolean): Unit = try {
          if (replaceExisting) {
            Files.copy(
              originFile,
              targetFile,
              StandardCopyOption.COPY_ATTRIBUTES,
              StandardCopyOption.REPLACE_EXISTING
            )
          } else {
            Files.copy(
              originFile,
              targetFile,
              StandardCopyOption.COPY_ATTRIBUTES
            )
          }
          ()
        } catch {
          case NonFatal(t) =>
            logger.error(
              s"Unexpected error when copying $originFile to $targetFile, you might need to restart the build server.",
              t
            )
        }

        // It's important that this task is not forked for performance reasons
        def triggerCopy(p: Promise[Unit]) = MonixTask.eval {
          try {
            // Skip work if cancellation is on and complete promise in finalizer
            if (isCancelled.get) ()
            else {
              configuration.mode match {
                case CopyMode.ReplaceExisting => copy(replaceExisting = true)
                case CopyMode.ReplaceIfMetadataMismatch =>
                  import scala.util.{Try, Success, Failure}
                  Try(Files.readAttributes(targetFile, classOf[BasicFileAttributes])) match {
                    case Success(targetAttrs) =>
                      val changedMetadata = {
                        originAttrs.lastModifiedTime
                          .compareTo(targetAttrs.lastModifiedTime) != 0 ||
                        originAttrs.size() != targetAttrs.size()
                      }
                      if (!changedMetadata) ()
                      else copy(replaceExisting = true)
                    // Can happen when the file does not exist, replace in that case
                    case Failure(_: IOException) => copy(replaceExisting = true)
                    case Failure(t) => throw t
                  }
                case CopyMode.NoReplace =>
                  if (Files.exists(targetFile)) ()
                  else copy(replaceExisting = false)
              }
            }
          } finally {
            // Release only the entry this copy installed, never someone else's.
            takenByOtherCopyProcess.remove(originFile, p)
            // Complete successfully to unblock other tasks
            p.success(())
          }
          ()
        }

        // Claims `originFile` for this copy process; if another process holds it,
        // waits until that copy finishes and then tries again.
        def acquireFile: MonixTask[Unit] = {
          val currentPromise = Promise[Unit]()
          val promiseInMap = takenByOtherCopyProcess.putIfAbsent(originFile, currentPromise)
          if (promiseInMap == null) {
            triggerCopy(currentPromise)
          } else {
            MonixTask.fromFuture(promiseInMap.future).flatMap(_ => acquireFile)
          }
        }

        acquireFile.coeval(scheduler).value match {
          // The happy path is that we evaluate the task and return
          case Right(()) => MonixTask.now(())
          case Left(pendingCopy) =>
            // Blocked on another process to finish the copy of a file, when it's done we restart
            cancelables.synchronized { cancelables += pendingCopy }
            MonixTask
              .fromFuture(pendingCopy)
              .doOnFinish(_ =>
                MonixTask { cancelables.synchronized { cancelables -= pendingCopy }; () }
              )
        }
    }

    /**
     * Make manual subscription to consumer so that we can control the
     * cancellation for both the source and the consumer. Otherwise, there is
     * no way to call the cancelable produced by the consumer.
     */
    val copyFilesInParallel = Task.create[List[Unit]] { (createScheduler, cb) =>
      if (isCancelled.get) {
        cb.onSuccess(Nil)
        subscribed.success(())
        Cancelable.empty
      } else {
        val parallelConsumer =
          Consumer.loadBalance(configuration.parallelUnits, copyFileSequentially)
        val (out, consumerSubscription) = parallelConsumer.createSubscriber(cb, createScheduler)
        val cancelOut = Cancelable(() => out.onComplete())
        completeSubscribers = CompositeCancelable(cancelOut)
        val sourceSubscription = observable.subscribe(out)
        subscribed.success(())
        val subscriptions = CompositeCancelable(sourceSubscription, consumerSubscription)
        if (!enableCancellation) Cancelable.empty
        else {
          if (isCancelled.get) {
            subscriptions.cancel()
          }
          subscriptions
        }
      }
    }

    // Discovery only starts once the consumer is subscribed, so that no discovered
    // file is published before the consumer listens.
    val orderlyDiscovery = Task.fromFuture(subscribed.future).flatMap(_ => discoverFileTree)
    val aggregatedCopyTask = Task {
      Task.mapBoth(orderlyDiscovery, copyFilesInParallel) { case (fileWalk, _) => fileWalk }
    }.flatten.executeOn(scheduler)

    aggregatedCopyTask.doOnCancel(Task {
      if (enableCancellation) {
        isCancelled.compareAndSet(false, true)
        observer.onComplete()
        cancelable.cancel()
      }
    })
  }
}