responding to Philippe’s comments
Sergei Winitzki committed Dec 26, 2016
1 parent 01a8881 commit 91af296
Showing 3 changed files with 101 additions and 57 deletions.
8 changes: 4 additions & 4 deletions docs/chymyst02.md
@@ -291,11 +291,11 @@ def makeFirstResult[T](f: B[Unit, T], g: B[Unit, T]): B[Unit, T] = {

```

The main advantage of this code is to encapsulate all the auxiliary molecule emitters within the local scope of the method `makeFirstResult`.
The main advantage of this code is to encapsulate all the auxiliary molecule emitters within the local scope of the method `makeFirstResult()`.
Only the `firstResult` emitter is returned:
This is the only emitter that the user of this library needs.
This is the only emitter that the user of `makeFirstResult()` will need.
The other emitters (`c`, `d`, and `done`) are invisible to the user since they are local variables in the scope of `makeFirstResult`.
The user of the library cannot break the chemistry by inadvertently emitting some further copies of `c`, `d`, or `done`.
The user of `makeFirstResult()` cannot break the chemistry by inadvertently emitting some further copies of `c`, `d`, or `done`.
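
The same encapsulation idea can be illustrated without any chemistry. The following is only a rough plain-Scala analogue, not `JoinRun` code: the object name `FirstResultSketch`, the helper `race`, and the use of `Promise` and raw threads are all assumptions made for this sketch. The auxiliary state is local to the function, and only one function value is returned to the caller.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object FirstResultSketch {
  // Hypothetical analogue of makeFirstResult(): takes two blocking computations
  // and returns a single blocking function that yields whichever result comes first.
  def makeFirstResult[T](f: () => T, g: () => T): () => T = { () =>
    // Local, hidden state: plays the role of the auxiliary emitters `c`, `d`, `done`.
    val result = Promise[T]()

    // Runs `h` on its own daemon thread; only the first completion is recorded.
    def race(h: () => T): Unit = {
      val t = new Thread(new Runnable {
        def run(): Unit = { result.trySuccess(h()); () }
      })
      t.setDaemon(true)
      t.start()
    }

    race(f)
    race(g)
    Await.result(result.future, Duration.Inf) // block until the first result arrives
  }
}
```

The caller only ever sees the returned function, so there is no way to complete the hidden `Promise` from outside, just as the user of `makeFirstResult()` cannot emit extra copies of `c`, `d`, or `done`.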


#### Refactoring as a molecule
@@ -426,7 +426,7 @@ site(
c() + d() + result(0)
```

As an exercise, the reader should now try to encapsulate the `parallelOr` operation into a library function.
As an exercise, the reader should now try to encapsulate the `parallelOr` operation into a function.
(This is done in the tests in `ParallelOrSpec.scala`.)

As another exercise: Revise these reactions to incorporate more than two blocking emitters (say, `f`, `g`, `h`, `i`, `j`).
2 changes: 1 addition & 1 deletion docs/chymyst06.md
@@ -8,7 +8,7 @@ Here are previous implementations of Join Calculus that I was able to find.
- _Join Java_: [von Itzstein et al., 2001-2005](http://www.vonitzstein.com/Project_JoinJava.html). This was a modified Java language compiler, with support for certain Join Calculus constructions. The project is not maintained.
- The `JoCaml` language: [Official site](http://jocaml.inria.fr) and a publication about JoCaml: [Fournet et al. 2003](http://research.microsoft.com/en-us/um/people/fournet/papers/jocaml-afp4-summer-school-02.pdf). This is a dialect of OCaml implemented as a patch to the OCaml compiler. The project is still supported.
- “Join in Scala” compiler patch: [V. Cremet 2003](http://lampwww.epfl.ch/~cremet/misc/join_in_scala/index.html). The project is discontinued.
- Joins library for .NET: [P. Crusso 2006](http://research.microsoft.com/en-us/um/people/crusso/joins/). The project is available as a binary .NET download from Microsoft Research.
- Joins library for .NET: [C. Russo 2006](http://research.microsoft.com/en-us/um/people/crusso/joins/). The project is available as a .NET binary download from Microsoft Research.
- `ScalaJoins`, a prototype implementation in Scala: [P. Haller 2008](http://lampwww.epfl.ch/~phaller/joins/index.html). The project is not maintained.
- `ScalaJoin`: an improvement over `ScalaJoins`, [J. He 2011](https://github.com/Jiansen/ScalaJoin). The project is not maintained.
- Joinads, a not-quite-Join-Calculus implementation in F# and Haskell: [Petricek and Syme 2011](https://www.microsoft.com/en-us/research/publication/joinads-a-retargetable-control-flow-construct-for-reactive-parallel-and-concurrent-programming/). The project is not maintained.
148 changes: 96 additions & 52 deletions docs/chymyst07.md
@@ -198,22 +198,22 @@ all_done() // This will block until all reactions are finished.
### Refactoring into a function

Consider what is required in order to refactor this functionality into a reusable library function.
We would like to have a library function call such as `make_all_done()` that creates the `all_done()` molecule for us.
We would like to have a function call such as `make_all_done()` that creates the `all_done()` molecule for us.

The library function `make_all_done()` will need to declare a new reaction site.
The function `make_all_done()` will need to declare a new reaction site.
The `done()` molecule is an input molecule at the new reaction site.
Therefore, this molecule cannot be already defined before we perform the library call.
Therefore, this molecule cannot be already defined before we perform the call to `make_all_done()`.

We see that the result of the library call `make_all_done()` must be the creation of _two_ new molecules: a new `done()` molecule and a new `all_done()` molecule.
We see that the result of the call `make_all_done()` must be the creation of _two_ new molecules: a new `done()` molecule and a new `all_done()` molecule.

The user of the library will then need to make sure that every job emits the `done()` molecule at the end of the job.
The user can then arrange for all the jobs to start (in whatever way necessary).
The user will then need to make sure that every job emits the `done()` molecule at the end of the job,
and arrange for all the jobs to start (in whatever way necessary).
When it becomes necessary to wait until the completion of all jobs, the user code will simply emit the `all_done()` blocking molecule and wait until it returns.
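
For readers who know the standard JVM concurrency utilities, the usage pattern just described resembles a `CountDownLatch`. The sketch below is not `JoinRun` code; the names `AllDoneSketch`, `makeAllDone`, and `runJobs` are hypothetical and exist only to illustrate the shape of the API: one call creates two handles, a non-blocking `done` and a blocking `allDone`, while the counting logic stays hidden.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

object AllDoneSketch {
  // Hypothetical plain-JVM analogue of make_all_done(): returns (done, allDone).
  def makeAllDone(n: Int): (() => Unit, () => Unit) = {
    val latch = new CountDownLatch(n) // hidden from the caller
    val done: () => Unit = () => latch.countDown() // non-blocking signal
    val allDone: () => Unit = () => latch.await()  // blocks until n signals arrived
    (done, allDone)
  }

  // Starts n jobs, waits for all of them, and returns how many had finished
  // by the time allDone() returned.
  def runJobs(n: Int): Int = {
    val finished = new AtomicInteger(0)
    val (done, allDone) = makeAllDone(n)
    (1 to n).foreach { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          finished.incrementAndGet() // stands in for the actual work of the job
          done()                     // every job signals at the end
        }
      }).start()
    }
    allDone() // wait here until every job has called done()
    finished.get()
  }
}
```

As in the chemistry version, the caller is responsible for making every job signal `done()` exactly once; the function itself only provides the two handles.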

Refactoring an inline piece of chemistry into a reusable library function is generally done in two steps:
Refactoring an inline piece of chemistry into a reusable function is generally done in two steps:

- declare the same molecules and reactions as in the previous code, but now everything is within the scope of a function
- the function should then return just the new molecule emitters that the library user will need to use, but no other molecule emitters
- the function should then return just the new molecule emitters that the user will need to call, but no other molecule emitters

Here is the result of this procedure for our previous code:

@@ -263,7 +263,7 @@ all_done() // This will block until all reactions are finished.

A variation on the same theme is to detect when `n` jobs are finished, given that each job will emit a _blocking_ molecule `done()` at the end.

In the previous section, we implemented waiting for `n` non-blocking molecules as a library call `make_all_done()`.
In the previous section, we encapsulated the functionality of waiting for `n` non-blocking molecules into a function `make_all_done()`.
Let us take the code we just saw for `make_all_done()` and try to modify it for the case when `done()` is a blocking molecule.
The key reaction

@@ -582,7 +582,8 @@ val beginCritical2 = newCriticalSectionMarker()

The result should be that we can create any number of critical sections that work independently of each other.

In order to package the implementation of the critical section into a library function `newCriticalSectionMarker`, we simply declare the chemistry in the local scope of that function and return the molecule emitter:
In order to package the implementation of the critical section into a function `newCriticalSectionMarker()`,
we simply declare the chemistry in the local scope of that function and return the molecule emitter:

```scala
def newCriticalSectionMarker(): B[Unit, M[Unit]] = {
@@ -802,30 +803,30 @@ The count value (showing the number of `barrier()` molecules already consumed) m
Therefore, we need a reaction like this:

```scala
go { case barrier(_, reply) + counter(k) => ???; if (k+1<n) counter(k+1); ???; reply() }
go { case barrier(_, reply) + counter(k) => ???; if (k + 1 < n) counter(k + 1); ???; reply() }

```

This reaction should reply to the `barrier()` molecule (as should any reaction that consumes a blocking molecule).
It is clear that `replyB()` should come last: this is the reply to the `barrier()` molecule, which should be performed only after we counted up to `n`.
It is clear that `reply()` should come last: this is the reply to the `barrier()` molecule, which should be performed only after we counted up to `n`.
Until then, this reaction must be blocked.

The only way to block in the middle of a reaction is to emit a blocking molecule there.
Therefore, some blocking molecule must be emitted in the reaction before replying to `barrier()`.
Let us make `counter()` a blocking molecule:

```scala
go { case barrier(_, replyB) + counter(k, replyC) => ???; if (k+1<n) counter(k+1); ???; replyB() }
go { case barrier(_, reply) + counter(k, replyCounter) => ???; if (k + 1 < n) counter(k + 1); ???; reply() }

```

What is still missing is the reply to the `counter(k)` molecule.
Now we are faced with a question: at what point should we perform that reply, before emitting `counter(k+1)` or after?
Now we are faced with a question: at what point should we perform that reply, before emitting `counter(k + 1)` or after?
We could write one of the two reactions:

```scala
val reaction1 = go { case barrier(_, replyB) + counter(k, replyC) => replyC(); if (k+1<n) counter(k+1); replyB() }
val reaction2 = go { case barrier(_, replyB) + counter(k, replyC) => if (k+1<n) counter(k+1); replyC(); replyB() }
val reaction1 = go { case barrier(_, reply) + counter(k, replyCounter) => replyCounter(); if (k + 1 < n) counter(k + 1); reply() }
val reaction2 = go { case barrier(_, reply) + counter(k, replyCounter) => if (k + 1 < n) counter(k + 1); replyCounter(); reply() }

```

@@ -850,83 +851,101 @@ However, in `reaction1` the reply is already sent to `counter(7)` before emittin
This will unblock the previous reaction,

```scala
{ case barrier(_, replyB) + counter(6, replyC) => replyC(); counter(7); replyB() }
{ case barrier(_, reply) + counter(6, replyCounter) => replyCounter(); counter(7); reply() }

```

which was blocked at the call to `counter(7)`.
Now this reaction can proceed to reply to `barrier()` using `replyB()`.
Now this reaction can proceed to reply to `barrier()` using `reply()`.
However, at this point it is too early to send a reply to `barrier()`, because we still have two `barrier()` molecules that we have not yet consumed!

Therefore, `reaction1` is incorrect.
On the other hand, `reaction2` works correctly.
It will block at each emission of `counter(k+1)` and unblock only when `k=n-1`.
At that time, `reaction2` will reply to the `counter(n-1)` molecule and to the last consumed `barrier()` molecule.
It will block at each emission of `counter(k + 1)` and unblock only when `k=n-1`.
At that time, `reaction2` will reply to the `counter(n-1)` molecule and to the just consumed `barrier()` molecule.
The reply to the `counter(n-1)` molecule will unblock the copy of `reaction2` that emitted it; that copy will then send its own replies, unblocking the copy before it, and so on down the chain.

In this way, the `n`-rendezvous will require all `n` reactions to wait at the "barrier" until all participants reach the barrier, and then unblock all of them.

It remains to see how the first reaction will start.
We could emit a `counter(0)` molecule at the initial time.
This molecule is blocking, so emitting it will block a thread until the very end of the `n`-rendezvous.
We could avoid blocking a thread by writing a special initial reaction such as
This molecule is blocking, so emitting it will block a reaction until the very end of the `n`-rendezvous.
Alternatively, we can write a special initial reaction such as

```scala
go { case barrier(_, replyB) + counterInit(_) => counter(1); replyB() }
go { case barrier(_, reply) + counterInit(_) => counter(1); reply() }

```

The complete code looks like this:
Then the complete code looks like this:

```scala
val barrier = b[Unit,Unit]
val counterInit = m[Unit]
val counter = b[Int,Unit]

site(
go { case barrier(_, replyB) + counterInit(_) => // this reaction will consume the very first barrier molecule emitted
go { case barrier(_, reply) + counterInit(_) => // this reaction will consume the very first barrier molecule emitted
counter(1) // one reaction has reached the rendezvous point
replyB()
reply()
},
go { case barrier(_, replyB) + counter(k, replyC) =>
if (k + 1 < n) counter(k+1)
replyC()
replyB()
go { case barrier(_, reply) + counter(k, replyCounter) =>
if (k + 1 < n) counter(k + 1)
replyCounter()
reply()
}
)
counterInit() // This needs to be emitted initially.

```

Note that this chemistry will block `n` threads that emit `barrier()` and another set of `n` threads that run `reaction2`.
Unless (in a future version of `JoinRun`) the blocking is optimized away, the current implementation of the `n`-rendezvous will require `2*n` threads and block them until the rendezvous is passed.
Note that this chemistry will block `n` reactions that emit `barrier()` and another set of `n` copies of `reaction2`.
In the current implementation of `JoinRun`, a blocked reaction will block a thread, so the `n`-rendezvous will require `2*n` new threads that remain blocked until the rendezvous is passed.
(A future implementation of `JoinRun` might be able to perform a code transformation that never blocks any threads.)

How do we use this `n`-rendezvous?
Suppose we have a reaction where a certain processing step needs to wait for all other reactions to reach the same step.
Then we emit the `barrier()` molecule at that step:

```scala
site (
go { case begin(_) =>
work()
barrier() // need to wait here for other reactions
more_work()
}
)

(1 to 1000).foreach (begin) // start 1000 copies of this reaction

```

### Refactoring into a function

As usual when encapsulating some piece of chemistry into a reusable library function, we first figure out what new molecule emitters are necessary for the users of the library to be able to use the encapsulated chemistry.
These new emitters will be returned by the library function; all the other chemistry will remain encapsulated within the local scope of the library function.
As usual when encapsulating some piece of chemistry into a reusable function, we first figure out what new molecule emitters are necessary for the users of the function to be able to run the encapsulated chemistry.
These new emitters will be returned (say, as a tuple) by the function; all the other chemistry will remain encapsulated within the local scope of the function.

In our case, the users of the library only need the `barrier()` molecule emitter.
In fact, users should _not_ have access to `counter` or `counterInit` emitters: if users emit some more of these molecules, the logic of the reactions will be disrupted.
In our case, the users of the function will only need the `barrier()` molecule emitter.
In fact, users should _not_ have access to `counter` or `counterInit` emitters: if users emit more copies of these molecules, the encapsulated reactions will work incorrectly.

We also note that the number `n` needs to be given in advance, before creating the reaction site for the `barrier` molecule.
Therefore, we write the library function with an integer argument `n`:
Therefore, we write a function with an integer argument `n`:

```scala
def makeRendezvous(n: Int): EE = { // returning EE <: B[Unit,Unit]
def makeRendezvous(n: Int): EE = { // We are returning EE, which is a subclass of B[Unit,Unit]
val barrier = b[Unit,Unit]
val counterInit = m[Unit]
val counter = b[Int,Unit]

site(
go { case barrier(_, replyB) + counterInit(_) => // this reaction will consume the very first barrier molecule emitted
go { case barrier(_, reply) + counterInit(_) => // this reaction will consume the very first barrier molecule emitted
counter(1) // one reaction has reached the rendezvous point
replyB()
reply()
},
go { case barrier(_, replyB) + counter(k, replyC) =>
if (k + 1 < n) counter(k+1)
replyC()
replyB()
go { case barrier(_, reply) + counter(k, replyCounter) =>
if (k + 1 < n) counter(k + 1)
replyCounter()
reply()
}
)
counterInit() // This needs to be emitted initially.
@@ -936,10 +955,34 @@ def makeRendezvous(n: Int): EE = { // returning EE <: B[Unit,Unit]

```

The usage example will then look like this:

```scala
val barrier = makeRendezvous(1000)

site (
go { case begin(_) =>
work()
barrier() // need to wait here for other reactions
more_work()
}
)

(1 to 1000).foreach (begin) // start 1000 copies of this reaction

```


### Reusable `n`-rendezvous

Reusable `n`-rendezvous is a small variation on the previous pattern:
Namely, once `n` participant reactions have passed the rendezvous point, the `barrier` molecule should automatically become ready to be used again by some other `n` reactions.
The `n`-rendezvous as implemented in the previous section has a drawback:
After `n` reactions have passed the rendezvous point, emitting more `barrier()` molecules will have no effect.
If another set of `n` reactions needs to participate in an `n`-rendezvous, a new call to `makeRendezvous()` needs to be made in order to produce a whole new reaction site with a new `barrier()` molecule.

In a _reusable_ `n`-rendezvous,
once a set of `n` participant reactions have passed the rendezvous point, the same `barrier` molecule should automatically become ready to be used again by another set of `n` reactions.
This can be seen as a batching functionality:
If reactions emit a large number of `barrier()` molecules, these molecules are split into batches of `n` and an `n`-rendezvous procedure is automatically performed for one batch after another.
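
The batching behavior is the same as that of the standard `java.util.concurrent.CyclicBarrier`, which resets itself after every `n` arrivals. The sketch below is plain JVM code rather than `JoinRun` chemistry; the names `ReusableRendezvousSketch` and `runBatches` are hypothetical. It starts `n * batches` participant threads against one shared barrier and counts how many times the barrier trips.

```scala
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicInteger

object ReusableRendezvousSketch {
  // Returns the number of completed rendezvous, which should equal `batches`.
  def runBatches(n: Int, batches: Int): Int = {
    val trips = new AtomicInteger(0)
    // The barrier action runs once each time a batch of n participants has arrived.
    val barrier = new CyclicBarrier(n, new Runnable {
      def run(): Unit = { trips.incrementAndGet(); () }
    })
    val participants = (1 to n * batches).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          // work() would go here
          barrier.await() // wait until n participants have reached this point
          // more_work() would go here
          ()
        }
      })
    }
    participants.foreach(_.start())
    participants.foreach(_.join()) // terminates because n * batches is a multiple of n
    trips.get()
  }
}
```

Which participants end up in the same batch is not determined in advance; the only guarantee is that each batch contains exactly `n` of them.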

Let us see how we can revise the code of `makeRendezvous()` to implement this new requirement.
We need to consider what molecules will be present after one rendezvous is complete.
@@ -950,15 +993,15 @@ In order to allow starting the `n`-rendezvous again, we need to make sure that `
Therefore, the only needed change is to emit `counterInit()` when the `n`-rendezvous is complete:

```scala
if (k + 1 < n) counter(k+1) else counterInit()
if (k + 1 < n) counter(k + 1) else counterInit()

```

After this single change, the `n`-rendezvous function becomes a reusable `n`-rendezvous.

## Pair up for dance

There are two doors that open to the dance floor where men must pair up with women to dance.
In 18th-century Paris, there are two doors that open to the dance floor where men must pair up with women to dance.
At random intervals, men and women arrive at the dancing place.
Men queue up at door A, women at door B.

@@ -981,14 +1024,15 @@ go { case man(_) + woman(_) => begin_dancing() }

To simplify this example, let us assume that some other reactions will randomly emit `man()` and `woman()` molecules.

The problem with the above reaction is that it does not respect the order of the queue.
If processes emit several `man()` and `woman()` molecules, they will be paired up in random order, rather than in the order of arrival in the queue.

The problem with the above reaction is that it does not respect the linear nature of the queue.
If processes emit several `man()` and `woman()` molecules quickly enough, they will be paired up in random order, rather than in the order of arrival in the queue.
Also, nothing prevents several pairs from beginning to dance at once, regardless of the dancers' positions in the queues.
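
To make the ordering requirement concrete, here is what "pairing in order of arrival" means when stated as an ordinary sequential function. This is only a specification sketch in plain Scala, not a chemistry solution; the names `DancePairingSketch`, `Arrival`, `Man`, `Woman`, and `pairUp` are hypothetical.

```scala
import scala.collection.immutable.Queue

object DancePairingSketch {
  sealed trait Arrival
  final case class Man(id: Int) extends Arrival
  final case class Woman(id: Int) extends Arrival

  // Processes arrivals one by one and pairs dancers strictly in queue order.
  // Returns the pairs in the order they were formed, plus whoever is still waiting.
  def pairUp(arrivals: Seq[Arrival]): (Vector[(Man, Woman)], Queue[Man], Queue[Woman]) =
    arrivals.foldLeft((Vector.empty[(Man, Woman)], Queue.empty[Man], Queue.empty[Woman])) {
      case ((pairs, men, women), m: Man) =>
        women.dequeueOption match {
          case Some((w, rest)) => (pairs :+ ((m, w)), men, rest)    // first waiting woman
          case None            => (pairs, men.enqueue(m), women)    // nobody waiting at door B
        }
      case ((pairs, men, women), w: Woman) =>
        men.dequeueOption match {
          case Some((m, rest)) => (pairs :+ ((m, w)), rest, women)  // first waiting man
          case None            => (pairs, men, women.enqueue(w))    // nobody waiting at door A
        }
    }
}
```

A correct concurrent solution should produce the same pairs as this function would for the same arrival order, which the single reaction `man(_) + woman(_)` does not guarantee.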

TODO: expand

## Choose and reply to one of many blocking calls (Unix `select`, Actor Model's `receive`)

The task is to organize the processing of several blocking calls emitted at the same time.
The task is to organize the processing of several blocking calls emitted by different concurrent reactions.
One receiver is available to process one of these calls at a time.

TODO