Skip to content

Commit

Permalink
Remove in-pull direction from event processing and transport layers
Browse files Browse the repository at this point in the history
* New direction names:

    out     -> out
    in-push -> in

    in-pull removed

* Remove `rsb.event-processing:pull-processor'

* Remove in-pull- connector variants from all transports

fixes #3
  • Loading branch information
scymtym committed Jan 12, 2020
1 parent 196736b commit ed9ff5b
Show file tree
Hide file tree
Showing 33 changed files with 118 additions and 468 deletions.
2 changes: 0 additions & 2 deletions rsb-transport-spread.asd
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@

(:file "connector")
(:file "in-connector")
(:file "in-push-connector")
(:file "in-pull-connector")
(:file "out-connector"))))

:in-order-to ((test-op (test-op "rsb-transport-spread/test"))))
Expand Down
1 change: 0 additions & 1 deletion rsb.asd
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@
"filter" "transform") ; for {filter,transform}-mixin
:serial t
:components ((:file "broadcast-processor")
(:file "pull-processor")

(:file "processor-mixins")

Expand Down
8 changes: 3 additions & 5 deletions src/event-processing/in-route-configurator.lisp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;;;; in-route-configurator.lisp --- Configurator for incoming event route.
;;;;
;;;; Copyright (C) 2011-2017 Jan Moringen
;;;; Copyright (C) 2011-2018 Jan Moringen
;;;;
;;;; Author: Jan Moringen <[email protected]>

Expand All @@ -18,14 +18,12 @@
The client generally is an event receiving participant."))

(defmethod collect-processor-mixins append ((configurator in-route-configurator))
`(error-policy-handler-mixin
'(error-policy-handler-mixin
restart-handler-mixin
restart-dispatcher-mixin
filtering-processor-mixin
deliver-timestamp-mixin
,(ecase (configurator-direction configurator)
(:in-push 'broadcast-processor)
(:in-pull 'pull-processor))))
broadcast-processor))

;;; Connectors

Expand Down
70 changes: 0 additions & 70 deletions src/event-processing/pull-processor.lisp

This file was deleted.

4 changes: 2 additions & 2 deletions src/listener.lisp
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
;;;; listener.lisp --- Listeners receive events that are broadcast on a bus.
;;;;
;;;; Copyright (C) 2011, 2012, 2013, 2014, 2015 Jan Moringen
;;;; Copyright (C) 2011-2018 Jan Moringen
;;;;
;;;; Author: Jan Moringen <[email protected]>

(cl:in-package #:rsb)

(defclass listener (receiving-client)
((direction :allocation :class
:initform :in-push)
:initform :in)
(handlers :type list
:accessor rsb.ep:handlers
:initform '()
Expand Down
8 changes: 5 additions & 3 deletions src/mixins.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@
:reader participant-direction))
(:documentation
"This mixin class is intended to be mixed into classes to which a
communication direction can be associated. Examples are
communication direction can be associated.
`reader' -> :in-pull
`listener' -> :in-push
Examples are
`reader' -> :in
`listener' -> :in
`informer' -> :out"))

;;; meta-data and timestamp plist mixins
Expand Down
6 changes: 3 additions & 3 deletions src/participant.lisp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;;;; participant.lisp --- A superclass for participant classes.
;;;;
;;;; Copyright (C) 2011-2017 Jan Moringen
;;;; Copyright (C) 2011-2018 Jan Moringen
;;;;
;;;; Author: Jan Moringen <[email protected]>

Expand Down Expand Up @@ -74,8 +74,8 @@
(list (cons t converters))))
(configurator (make-instance
(ecase direction
((:in-push :in-pull) 'rsb.ep:in-route-configurator)
(:out 'rsb.ep:out-route-configurator))
(:in 'rsb.ep:in-route-configurator)
(:out 'rsb.ep:out-route-configurator))
:scope scope
:direction direction
:transform transform))
Expand Down
33 changes: 8 additions & 25 deletions src/transport/connector-mixins.lisp
Original file line number Diff line number Diff line change
@@ -1,36 +1,21 @@
;;;; connector-mixins.lisp --- Mixin for connector classes.
;;;;
;;;; Copyright (C) 2011-2017 Jan Moringen
;;;; Copyright (C) 2011-2018 Jan Moringen
;;;;
;;;; Author: Jan Moringen <[email protected]>

(cl:in-package #:rsb.transport)

;;; Mixin class `error-handling-pull-receiver-mixin'
;;; Mixin class `error-handling-receiver-mixin'

(defclass error-handling-pull-receiver-mixin (error-policy-mixin)
(defclass error-handling-receiver-mixin (error-policy-mixin)
()
(:documentation
"This class is intended to be mixed into in-direction, pull-style
connector classes to provide client-supplied error handling policies
for the `emit' method."))

(defmethod emit :around ((connector error-handling-pull-receiver-mixin)
(block? t))
;; Call the actual `emit' method with a condition handler that
;; applies the error policy of CONNECTOR.
(with-error-policy (connector) (call-next-method)))

;;; Mixin class `error-handling-push-receiver-mixin'

(defclass error-handling-push-receiver-mixin (error-policy-mixin)
()
(:documentation
"This class is intended to be mixed into in-direction, push-style
connector classes to provide client-supplied error handling policies
for the `receive-messages' method."))
"This class is intended to be mixed into in-direction connector
classes to provide client-supplied error handling policies for the
`receive-messages' method."))

(defmethod receive-messages :around ((connector error-handling-push-receiver-mixin))
(defmethod receive-messages :around ((connector error-handling-receiver-mixin))
;; Call the actual `receive-messages' method with a condition
;; handler that applies the error policy of CONNECTOR.
(with-error-policy (connector) (call-next-method)))
Expand Down Expand Up @@ -172,9 +157,7 @@ converter."))
"This class is intended to be mixed into connector classes that
perform two tasks:
1) receive notifications
2) decode received notifications
The associated protocol is designed to be
direction-agnostic (i.e. should work for both push and pull)."))
2) decode received notifications"))

(defmethod notification->event :around ((connector timestamping-receiver-mixin)
(notification t)
Expand Down
87 changes: 12 additions & 75 deletions src/transport/inprocess/connectors.lisp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;;;; connectors.lisp --- Connectors of the inprocess transport.
;;;;
;;;; Copyright (C) 2011-2016 Jan Moringen
;;;; Copyright (C) 2011-2018 Jan Moringen
;;;;
;;;; Author: Jan Moringen <[email protected]>

Expand Down Expand Up @@ -28,12 +28,18 @@

;;; `in-connector'

(defclass in-connector (connector)
(defclass in-connector (broadcast-processor
error-policy-handler-mixin
restart-handler-mixin
restart-dispatcher-mixin
connector)
()
(:metaclass connector-class)
(:direction :in)
(:documentation
"Superclass for in-direction connector classes of the inprocess
transport."))
"Receives events from the in-process bus."))

(register-connector :inprocess :in 'in-connector)

(defmethod notify ((connector in-connector)
(scope scope)
Expand All @@ -55,75 +61,7 @@
(log:debug "~@<Scope trie after removing ~A:~@:_~/rsb.ep::print-trie/~@:>"
connector scope-sinks)))

;;; `in-pull-connector' class

(defclass in-pull-connector (broadcast-processor
error-handling-pull-receiver-mixin
restart-dispatcher-mixin
in-connector)
((queue :type lparallel.queue:queue
:reader connector-queue
:initform (lparallel.queue:make-queue)
:documentation
"Stores events as they arrive via the message bus."))
(:metaclass connector-class)
(:direction :in-pull)
(:documentation
"Instances of this connector class deliver RSB events within a
process."))

(register-connector :inprocess :in-pull 'in-pull-connector)

(defmethod handle ((connector in-pull-connector)
(event event))
;; Put EVENT into the queue maintained by CONNECTOR.
(lparallel.queue:push-queue event (connector-queue connector)))

(defmethod receive-notification ((connector in-pull-connector)
(block? (eql nil)))
;; Extract and return one event from the queue maintained by
;; CONNECTOR, if there are any. If there are no queued events,
;; return nil.
(lparallel.queue:try-pop-queue (connector-queue connector)))

(defmethod receive-notification ((connector in-pull-connector)
(block? t))
;; Extract and return one event from the queue maintained by
;; CONNECTOR, if there are any. If there are no queued events,
;; block.
(lparallel.queue:pop-queue (connector-queue connector)))

(defmethod emit ((connector in-pull-connector) (block? t))
(when-let ((event (receive-notification connector block?)))
(setf (timestamp event :receive) (local-time:now))
(dispatch connector event)
t))

(defmethod print-object ((object in-pull-connector) stream)
(print-unreadable-object (object stream :identity t)
(format stream "~A ~A (~D)"
(connector-direction object)
(connector-relative-url object "/")
(lparallel.queue:queue-count
(connector-queue object)))))

;;; `in-push-connector' class

(defclass in-push-connector (broadcast-processor
error-policy-handler-mixin
restart-handler-mixin
restart-dispatcher-mixin
in-connector)
()
(:metaclass connector-class)
(:direction :in-push)
(:documentation
"Instances of this connector class deliver RSB events within a
process."))

(register-connector :inprocess :in-push 'in-push-connector)

(defmethod handle :before ((connector in-push-connector)
(defmethod handle :before ((connector in-connector)
(event event))
(setf (timestamp event :receive) (local-time:now)))

Expand All @@ -138,8 +76,7 @@
(:metaclass connector-class)
(:direction :out)
(:documentation
"Instances of this connector class deliver RSB events within a
process."))
"Send events to the in-process bus."))

(register-connector :inprocess :out 'out-connector)

Expand Down
6 changes: 2 additions & 4 deletions src/transport/package.lisp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;;;; package.lisp --- Package definition for the transport module.
;;;;
;;;; Copyright (C) 2011-2016 Jan Moringen
;;;; Copyright (C) 2011-2016, 2018 Jan Moringen
;;;;
;;;; Author: Jan Moringen <[email protected]>

Expand Down Expand Up @@ -134,9 +134,7 @@

;; Error handling mixin classes
(:export
#:error-handling-push-receiver-mixin

#:error-handling-pull-receiver-mixin
#:error-handling-receiver-mixin

#:error-handling-sender-mixin)

Expand Down
10 changes: 5 additions & 5 deletions src/transport/protocol.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
Each transport is implemented by associated connector classes for
incoming and outgoing communication. The \"directions\" of
connector classes are :in-push, :in-pull and :out. Instances of
connector classes of a particular transport are created to
actually perform communication."))
connector classes are :in and :out. Instances of connector classes
of a particular transport are created to actually perform
communication."))

(defmethod service-provider:find-provider
((service (eql (service-provider:find-service 'transport)))
Expand All @@ -32,7 +32,7 @@
(provider cons)
&key if-does-not-exist)
(let+ (((schema . direction) provider))
(check-type direction direction "one of :IN-PUSH, :IN-PULL, :OUT")
(check-type direction direction "one of :IN, :OUT")
(when-let ((provider (service-provider:find-provider
service schema
:if-does-not-exist if-does-not-exist)))
Expand All @@ -44,7 +44,7 @@
(provider cons)
&rest args &key)
(let+ (((schema . direction) provider))
(check-type direction direction "one of :IN-PUSH, :IN-PULL, :OUT")
(check-type direction direction "one of :IN, :OUT")
(apply #'call-next-method service provider :schema schema args)))

;;; Transport protocol
Expand Down
Loading

0 comments on commit ed9ff5b

Please sign in to comment.