Skip to content

Commit

Permalink
Merge pull request #36 from Chymyst/feature/more-examples
Browse files Browse the repository at this point in the history
Feature/more examples
  • Loading branch information
winitzki authored Dec 26, 2016
2 parents be12e3e + 91af296 commit 9fd7335
Show file tree
Hide file tree
Showing 24 changed files with 2,036 additions and 268 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The initial code of `JoinRun` was based on previous work by Jiansen He (https://
The current implementation is tested under Oracle JDK 8 with Scala 2.11 and 2.12.
It also works with Scala 2.10 and with OpenJDK 7 (except for the new `LocalDateTime` functions used in tests, and some performance issues).

# Overview of `JoinRun`
# Overview of `JoinRun`/`Chymyst`

To get started, begin with this [tutorial introduction](https://chymyst.github.io/joinrun-scala/chymyst00.html).

Expand Down Expand Up @@ -141,7 +141,7 @@ Other than that, `JoinRun`'s syntax is closely modeled on that of `ScalaJoin` an

# Status

Current version is `0.1.3`.
Current released version is `0.1.3`.
The semantics of the chemical machine (restricted to single-host, multicore computations) is fully implemented and tested.

Unit tests include examples such as concurrent counters, parallel “or”, concurrent merge-sort, and “dining philosophers”.
Expand Down
45 changes: 29 additions & 16 deletions benchmark/src/main/scala/code/chymyst/benchmark/JiansenJoin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ class AsyName[Arg](implicit owner: Join, argT:ClassTag[Arg]) extends NameBase{

override def pendArg(arg:Any):Unit = {
argQ += arg.asInstanceOf[Arg]
()
}

override def popArg():Unit = {
argQ.dequeue()
()
}

var argQ = new Queue[Arg] //queue of arguments pending on this name
Expand All @@ -71,13 +73,18 @@ class AsyName[Arg](implicit owner: Join, argT:ClassTag[Arg]) extends NameBase{
*
* @param a the message pending on this channel
*/
def apply(a:Arg) :Unit = new Thread {synchronized{
if(argQ.contains(a)){
argQ += a
}else {
owner.trymatch(AsyName.this, a)// see if the new message will trigger any pattern
def apply(a:Arg) :Unit = {
new Thread {
synchronized {
if (argQ.contains(a)) {
argQ += a
} else {
owner.trymatch(AsyName.this, a) // see if the new message will trigger any pattern
}
}
}
}}
()
}

def unapply(attr:Any) : Option[Arg]= attr match { // for pattern matching
case (ch:NameBase, arg:Any) => {
Expand Down Expand Up @@ -143,6 +150,7 @@ class SynName[Arg, R](implicit owner: Join, argT:ClassTag[Arg], resT:ClassTag[R]

override def pendArg(arg:Any):Unit = {
argQ += arg.asInstanceOf[(Int,Arg)]
()
}

override def popArg():Unit = synchronized {
Expand Down Expand Up @@ -202,11 +210,16 @@ class SynName[Arg, R](implicit owner: Join, argT:ClassTag[Arg], resT:ClassTag[R]
*
* @param r the reply value
*/
def reply(r:R):Unit = new Thread {resultQ.synchronized {
// println("reply "+r)
resultQ.enqueue((msgTags.pop, r))
resultQ.notifyAll()
}}
def reply(r:R):Unit = {
new Thread {
resultQ.synchronized {
// println("reply "+r)
resultQ.enqueue((msgTags.pop, r))
resultQ.notifyAll()
}
}
()
}

private def fetch(a:(Int,Arg)):R = resultQ.synchronized {
if (resultQ.isEmpty || resultQ.front._1 != a){
Expand Down Expand Up @@ -284,7 +297,7 @@ class Join {
private var joinPat: PartialFunction[Any, Any] = _
// private var joinPat: PartialFunction[(Set[NameBase], Queue[(NameBase, Any)], PartialFunction[Any, Any], Int), Unit] = _
// def join(joinPat: PartialFunction[(Set[NameBase], Queue[(NameBase, Any)], PartialFunction[Any, Any], Int), Unit]) {
def join(joinPat: PartialFunction[Any, Any]) {
def join(joinPat: PartialFunction[Any, Any]): Unit = {
if(!hasDefined){
this.joinPat = joinPat
hasDefined = true
Expand All @@ -297,13 +310,13 @@ class Join {
val names: Set[NameBase] = new HashSet
//println(ch +" "+ arg)
try{
if(ch.isInstanceOf[SynName[Any, Any]]) {ch.asInstanceOf[SynName[Any,Any]].pushMsgTag(arg)}
if(ch.isInstanceOf[SynName[_, _]]) {ch.asInstanceOf[SynName[_,_]].pushMsgTag(arg)}
if(joinPat.isDefinedAt((ch, arg))){// optimization for singleton pattern
joinPat((ch,arg))
}else{
if(ch.isInstanceOf[SynName[Any, Any]]){
if(ch.isInstanceOf[SynName[_, _]]){
joinPat((names, this.joinPat, (new HashMap[NameBase, Any]+((ch, arg))), 1, new SynName))
ch.asInstanceOf[SynName[Any,Any]].pushMsgTag(arg)
ch.asInstanceOf[SynName[_,_]].pushMsgTag(arg)
}else{
joinPat((names, this.joinPat, (new HashMap[NameBase, Any]+((ch, arg))), 1, new AsyName))
}
Expand All @@ -313,7 +326,7 @@ class Join {
}
}catch{
case e:MatchError => {// no pattern is matched
if(ch.isInstanceOf[SynName[Any, Any]]) {ch.asInstanceOf[SynName[Any,Any]].popMsgTag}
if(ch.isInstanceOf[SynName[_, _]]) {ch.asInstanceOf[SynName[_,_]].popMsgTag}
ch.pendArg(arg)
}
}
Expand Down
43 changes: 22 additions & 21 deletions benchmark/src/test/scala/code/chymyst/benchmark/MergesortSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import org.scalatest.{FlatSpec, Matchers}

import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.ClassTag

import Common._

Expand All @@ -23,7 +22,9 @@ class MergesortSpec extends FlatSpec with Matchers {
}
}

def arrayMerge[T : Ordering : ClassTag](arr1: Array[T], arr2: Array[T]): Array[T] = {
type Coll[T] = IndexedSeq[T]

def arrayMerge[T : Ordering](arr1: Coll[T], arr2: Coll[T]): Coll[T] = {
val id = amCounter.c
// amCounter.inc() // avoid this for now - this is a debugging tool
val wantToLog = false // (arr1.length > 20000 && arr1.length < 41000)
Expand All @@ -46,54 +47,54 @@ class MergesortSpec extends FlatSpec with Matchers {
}
mergeRec(0,0,0)
if (wantToLog) println(s"${System.currentTimeMillis} finished merging #$id")
result.toArray
result.toIndexedSeq
}

def performMergeSort[T : Ordering : ClassTag](array: Array[T], threads: Int = 8): Array[T] = {
def performMergeSort[T : Ordering](array: Coll[T], threads: Int = 8): Coll[T] = {

val finalResult = m[Array[T]]
val getFinalResult = b[Unit, Array[T]]
val tp = new FixedPool(threads)
val jp = new FixedPool(3)
val finalResult = m[Coll[T]]
val getFinalResult = b[Unit, Coll[T]]
val reactionPool = new FixedPool(threads)
val sitePool = new FixedPool(3)

site(jp,jp)(
site(sitePool,sitePool)(
go { case finalResult(arr) + getFinalResult(_, r) => r(arr) }
)

// recursive molecule that will define the reactions at one level
// recursive molecule that will define the reactions at one level lower

val mergesort = m[(Array[T], M[Array[T]])]
val mergesort = m[(Coll[T], M[Coll[T]])]

site(tp,jp)(
site(reactionPool, sitePool)(
go {
case mergesort((arr, resultToYield)) =>
if (arr.length <= 1) resultToYield(arr)
else {
val (part1, part2) = arr.splitAt(arr.length/2)
// "sorted1" and "sorted2" will be the sorted results from lower level
val sorted1 = m[Array[T]]
val sorted2 = m[Array[T]]
site(tp,jp)(
// "sorted1" and "sorted2" will be the sorted results from the lower level
val sorted1 = m[Coll[T]]
val sorted2 = m[Coll[T]]
site(reactionPool, sitePool)(
go { case sorted1(x) + sorted2(y) =>
resultToYield(arrayMerge(x,y)) }
)

// emit lower-level mergesort
// emit `mergesort` with the lower-level `sorted` result molecules
mergesort((part1, sorted1)) + mergesort((part2, sorted2))
}
}
)
// sort our array at top level
// sort our array: emit `mergesort` at top level
mergesort((array, finalResult))

val result = getFinalResult()
tp.shutdownNow()
jp.shutdownNow()
reactionPool.shutdownNow()
sitePool.shutdownNow()
result
}

it should "merge arrays correctly" in {
arrayMerge(Array(1,2,5), Array(3,6)) shouldEqual Array(1,2,3,5,6)
arrayMerge(IndexedSeq(1,2,5), IndexedSeq(3,6)) shouldEqual IndexedSeq(1,2,3,5,6)
}

it should "sort an array using concurrent merge-sort correctly with one thread" in {
Expand Down
Loading

0 comments on commit 9fd7335

Please sign in to comment.