@@ -455,20 +455,167 @@ points: in this case, when :func:`remotecall_fetch` is called.
455
455
456
456
Channels
457
457
--------
458
- Channels provide for a fast means of inter-task communication. A
459
- ``Channel{T}(n::Int) `` is a shared queue of maximum length ``n ``
460
- holding objects of type ``T ``. Multiple readers can read off the :class: `Channel `
461
- via :func: `fetch ` and :func: `take! `. Multiple writers can add to the :class: `Channel ` via
462
- :func: `put! `. :func: `isready ` tests for the presence of any object in
463
- the channel, while :func: `wait ` waits for an object to become available.
464
- :func: `close ` closes a :class: `Channel `. On a closed :class: `Channel `\, :func: `put! ` will fail,
465
- while :func: `take! ` and :func: `fetch ` successfully return any existing values
466
- until it is emptied.
458
+ The section on Tasks in :ref: `man-control-flow ` discussed the execution of
459
+ multiple functions in a co-operative manner. :class: `Channels ` can be quite useful
460
+ to pass data between running tasks, particularly those involving I/O operations.
461
+
462
+ Examples of operations involving I/O include reading/writing to files,
463
+ accessing web services, executing external programs, etc. In all
464
+ these cases, overall execution time can be improved if other tasks can be run
465
+ while a file is being read, or while waiting for an external service/program to complete.
466
+
467
+ A channel can be visualized as a pipe, i.e., it has a write end and read end.
468
+
469
+ - Multiple writers in different tasks can write to the same channel concurrently via :func: `put! `
470
+ calls.
471
+ - Multiple readers in different tasks can read data concurrently via :func: `take! ` calls.
472
+ - As an example::
473
+
474
+ # Given Channels c1 and c2,
475
+ c1 = Channel(32)
476
+ c2 = Channel(32)
477
+
478
+ # and a function `foo()` which reads items from from c1, processes the item read
479
+ # and writes a result to c2,
480
+ function foo()
481
+ while true
482
+ data = take!(c1)
483
+ ....... # process data
484
+ put!(c2, result) # write out result
485
+ end
486
+ end
487
+
488
+ # we can schedule `n` instances of `foo()` to be active concurrently.
489
+ for _ in 1:n
490
+ @schedule foo()
491
+ end
492
+
493
+ - Channels are created via the :class: `Channel{T}(sz) ` constructor. The channel will only hold
494
+ objects of type ``T ``. If the type is not specified, the channel can hold objects of
495
+ any type. ``sz `` refers to the maximum number of elements that can be held in the channel
496
+ at any time. For example, ``Channel(32) `` creates a channel that can hold a maximum of 32 objects
497
+ of any type. A ``Channel{MyType}(64) `` can hold up to 64 objects of ``MyType `` at any time.
498
+ - If a :class: `Channel ` is empty, readers (on a :func: `take! ` call) will block until data is available.
499
+ - If a :class: `Channel ` is full, writers (on a :func: `put! ` call) will block until space becomes available.
500
+ - :func: `isready ` tests for the presence of any object in the channel, while :func: `wait `
501
+ waits for an object to become available.
502
+ - A :class: `Channel ` is in an open state initially. This means that it can be
503
+ read from and written to freely via :func: `take! ` and :func: `put! ` calls. :func: `close ` closes a :class: `Channel `.
504
+ On a closed :class: `Channel `, :func: `put! ` will fail. For example:
505
+
506
+ .. doctest ::
507
+
508
+ julia> c=Channel(2);
509
+
510
+ julia> put!(c,1) # `put! ` on an open channel succeeds
511
+ 1
512
+
513
+ julia> close(c);
514
+
515
+ julia> put!(c,2) # `put! ` on a closed channel throws an exception.
516
+ ERROR: InvalidStateException("Channel is closed.",:closed)
517
+ ...
518
+
519
+
520
+ - :func: `take! ` and :func: `fetch ` (which retrieves but does not remove the value) on a closed channel
521
+ successfully return any existing values until it is emptied. Continuing the above example:
522
+
523
+ .. doctest ::
524
+
525
+ julia> fetch(c) # Any number of `fetch ` calls succeed.
526
+ 1
527
+
528
+ julia> fetch(c)
529
+ 1
530
+
531
+ julia> take!(c) # The first `take! ` removes the value.
532
+ 1
533
+
534
+ julia> take!(c) # No more data available on a closed channel.
535
+ ERROR: InvalidStateException("Channel is closed.",:closed)
536
+ ...
537
+
467
538
468
539
A :class: `Channel ` can be used as an iterable object in a ``for `` loop, in which
469
540
case the loop runs as long as the :class: `Channel ` has data or is open. The loop
470
- variable takes on all values added to the :class: `Channel `. An empty, closed :class: `Channel `
471
- causes the ``for `` loop to terminate.
541
+ variable takes on all values added to the :class: `Channel `. The ``for `` loop is
542
+ terminated once the :class: `Channel ` is closed and emptied.
543
+
544
+
545
+ For example, the following would cause the ``for `` loop to wait for more data::
546
+
547
+ c=Channel{Int}(10)
548
+ foreach(i->put!(c, i), 1:3) # add a few entries
549
+ data = [i for i in c]
550
+
551
+ while this will return after reading all data::
552
+
553
+ .. doctest ::
554
+
555
+ julia> c=Channel{Int}(10);
556
+
557
+ julia> foreach(i->put!(c, i), 1:3); # add a few entries
558
+
559
+ julia> close(c); # `for ` loops can exit
560
+
561
+ julia> data = [i for i in c]
562
+ 3-element Array{Int64,1}:
563
+ 1
564
+ 2
565
+ 3
566
+
567
+
568
+ .. _man-channels-example :
569
+
570
+ Consider a simple example using channels for inter-task communication.
571
+ We start 4 tasks to process data from a single ``jobs `` channel.
572
+ Jobs, identified by an id (``job_id ``), are written to the channel.
573
+ Each task in this simulation reads a ``job_id ``,
574
+ waits for a random amout of time and writes back a tuple of ``job_id `` and the simulated time to
575
+ the results channel. Finally all the ``results `` are printed out.
576
+
577
+ ::
578
+
579
+ const jobs = Channel{Int}(32)
580
+ const results = Channel{Tuple}(32)
581
+
582
+ function do_work()
583
+ for job_id in jobs
584
+ exec_time = rand()
585
+ sleep(exec_time) # simulates elapsed time doing actual work
586
+ # typically performed externally.
587
+ put!(results, (job_id, exec_time))
588
+ end
589
+ end
590
+
591
+ function make_jobs(n)
592
+ for i in 1:n
593
+ put!(jobs, i)
594
+ end
595
+ end
596
+
597
+ # feed the jobs channel with "n" jobs
598
+ n = 12
599
+ @schedule make_jobs(n)
600
+
601
+ # start 4 tasks to process requests in parallel
602
+ for i in 1:4
603
+ @schedule do_work()
604
+ end
605
+
606
+ # print out results
607
+ @elapsed while n > 0
608
+ job_id, exec_time = take!(results)
609
+ println("$job_id finished in $(round(exec_time,2)) seconds")
610
+ n = n - 1
611
+ end
612
+
613
+
614
+ The current version of Julia multiplexes all tasks onto a single OS thread. Thus, while tasks
615
+ involving I/O operations benefit from parallel execution, compute bound tasks are effectively
616
+ executed sequentially on a single OS thread. Future versions of Julia may support scheduling
617
+ of tasks on multiple threads, in which case compute bound tasks will see benefits of parallel
618
+ execution too.
472
619
473
620
474
621
Remote References and AbstractChannels
@@ -498,6 +645,71 @@ the backing store on the remote process.
498
645
example of this is provided in ``examples/dictchannel.jl `` which uses a dictionary as its remote store.
499
646
500
647
648
+ Channels and RemoteChannels
649
+ ---------------------------
650
+ - A :class: `Channel ` is local to a process. Worker 2 cannot directly refer to a :class: `Channel `
651
+ on worker 3 and vice-versa. A :class: `RemoteChannel `, however, can put and take values across
652
+ workers.
653
+ - A :class: `RemoteChannel ` can be thought of as a *handle * to a :class: `Channel `.
654
+ - The process id, ``pid ``, associated with a :class: `RemoteChannel ` identifies the process where
655
+ the backing store, i.e., the backing :class: `Channel ` exists.
656
+ - Any process with a reference to a :class: `RemoteChannel ` can put and take items from the channel.
657
+ Data is automatically sent to (or retrieved from) the process a :class: `RemoteChannel ` is associated with.
658
+ - Serializing a :class: `Channel ` also serializes any data present in the channel. Deserializing
659
+ it therefore effectively makes a copy of the original object.
660
+ - On the other hand, serializing a :class: `RemoteChannel ` only involves the serialization
661
+ of an identifier that identifies the location and instance of :class: `Channel ` referred to
662
+ by the handle. A deserialized :class: `RemoteChannel ` object (on any worker), therefore
663
+ also points to the same backing store as the original.
664
+
665
+ The channels example from above :ref: `man-channels-example ` can be modified for
666
+ interprocess communication, as shown below.
667
+
668
+ We start 4 workers to process a single ``jobs `` remote channel. Jobs, identified by an id (``job_id ``),
669
+ are written to the channel. Each remotely executing task in this simulation reads a ``job_id ``,
670
+ waits for a random amout of time and writes back a tuple of ``job_id ``, time taken and its own ``pid `` to
671
+ the results channel. Finally all the ``results `` are printed out on the master process.
672
+
673
+ ::
674
+
675
+ addprocs(4) # add worker processes
676
+
677
+ const jobs = RemoteChannel(()->Channel{Int}(32))
678
+ const results = RemoteChannel(()->Channel{Tuple}(32))
679
+
680
+ # define work function everywhere
681
+ @everywhere function do_work(jobs, results)
682
+ while true
683
+ job_id = take!(jobs)
684
+ exec_time = rand()
685
+ sleep(exec_time) # simulates elapsed time doing actual work
686
+ put!(results, (job_id, exec_time, myid()))
687
+ end
688
+ end
689
+
690
+ function make_jobs(n)
691
+ for i in 1:n
692
+ put!(jobs, i)
693
+ end
694
+ end
695
+
696
+ # feed the jobs channel with "n" jobs
697
+ n = 12
698
+ @schedule make_jobs(n)
699
+
700
+ # start tasks on the workers to process requests in parallel
701
+ for p in workers()
702
+ @async remote_do(do_work, p, jobs, results)
703
+ end
704
+
705
+ # print out results
706
+ @elapsed while n > 0
707
+ job_id, exec_time, where = take!(results)
708
+ println("$job_id finished in $(round(exec_time,2)) seconds on worker $where")
709
+ n = n - 1
710
+ end
711
+
712
+
501
713
Remote References and Distributed Garbage Collection
502
714
----------------------------------------------------
503
715
0 commit comments