From 27f01385bb32c1268af3ab09e6f692d9eae42a6c Mon Sep 17 00:00:00 2001 From: Ali Abrar Date: Wed, 17 Nov 2021 16:45:57 -0500 Subject: [PATCH 1/4] notify-listen: Don't allow channel to be configured --- .../src/Rhyolite/DB/NotifyListen.hs | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs b/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs index 86c4a87e..e788ac3f 100644 --- a/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs +++ b/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs @@ -57,28 +57,26 @@ listenCmd (NotificationChannel chan) = fromString $ "LISTEN " <> chan -- postgres @LISTEN@ mechanism. notificationListener :: (FromJSON notifyMessage) - => NotificationChannel - -- ^ The channel to listen on - -> Pool PG.Connection + => Pool PG.Connection -- ^ Connection pool -> IO (TChan notifyMessage, IO ()) -- ^ @notifyMessage@ is usually a 'DbNotification' -notificationListener notifyChannel db = do +notificationListener db = do nChan <- STM.newBroadcastTChanIO daemonThread <- forkIO $ withResource db $ \conn -> do - let cmd = listenCmd notifyChannel + let cmd = listenCmd defaultNotificationChannel _ <- PG.execute_ conn cmd forever $ do -- Handle notifications PG.Notification _ channel message <- PG.getNotification conn case channel of - _ | channel == channelToByteString notifyChannel -> do + _ | channel == channelToByteString defaultNotificationChannel -> do -- Notification is on the expected NOTIFY channel case decode $ LBS.fromStrict message of Just a -> STM.atomically $ STM.writeTChan nChan a - _ -> putStrLn $ errorMessage notifyChannel $ + _ -> putStrLn $ errorMessage defaultNotificationChannel $ "Could not parse message: " <> show message - _ -> putStrLn $ errorMessage notifyChannel $ + _ -> putStrLn $ errorMessage defaultNotificationChannel $ "Received a message on unexpected channel: " <> show channel return (nChan, killThread daemonThread) where @@ -92,11 +90,10 @@ notificationListener notifyChannel db = do -- 'DbNotification' retrieval function and finalizer startNotificationListener :: FromJSON notifyMessage - => NotificationChannel - -> Pool PG.Connection + => Pool PG.Connection -> IO (IO notifyMessage, IO ()) -startNotificationListener notifyChannel pool = do - (chan, nkill) <- notificationListener notifyChannel pool +startNotificationListener pool = do + (chan, nkill) <- notificationListener pool chan' <- STM.atomically $ STM.dupTChan chan return (STM.atomically $ STM.readTChan chan', nkill) @@ -191,14 +188,13 @@ notify :: , ForallF ToJSON notice , Psql m ) - => NotificationChannel - -> NotificationType + => NotificationType -> notice a -> a -> m () -notify notifyChannel nt n a = do +notify nt n a = do schemaName <- getSchemaName - let cmd = notifyCmd notifyChannel + let cmd = notifyCmd defaultNotificationChannel notifyMsg = DbNotification { _dbNotification_schemaName = SchemaName $ T.pack schemaName , _dbNotification_notificationType = nt From fe71731c136352721a8fab3985fbaabb1cc002dc Mon Sep 17 00:00:00 2001 From: Ali Abrar Date: Wed, 17 Nov 2021 21:39:07 -0500 Subject: [PATCH 2/4] notify-listen: add beam version --- ChangeLog.md | 2 +- .../src/Rhyolite/DB/NotifyListen/Groundhog.hs | 9 +- .../rhyolite-notify-listen-beam.cabal | 8 + .../src/Rhyolite/DB/NotifyListen/Beam.hs | 249 +++++++++++++++++- 4 files changed, 260 insertions(+), 8 deletions(-) diff --git a/ChangeLog.md b/ChangeLog.md index c9291f28..cc69d096 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -12,7 +12,7 @@ This project's release branch is `master`. This log is written from the perspect * Remove `Rhyolite.Map.Monoidal`. For `=:` use `Data.Map.Monoidal.singleton` instead, and for `restrictKeys` use monoidal-containers >= 0.6.1.0. * Rename `PostgresRaw` to `Psql` and move it to `psql-simple-class`. * Move Psql (formerly PostgresRaw) instance for groundhog's `DbPersist` to psql-simple-groundhog. - * Move `Rhyolite.Backend.Listen` to its own project `rhyolite-notify-listen`. The module is now called `Rhyolite.DB.NotifyListen`. `insertAndNotify` and related classes and functions can now be found in the groundhog-legacy package in the `Rhyolite.DB.NotifyListen.Groundhog` module. The various `notify` functions now require `Psql m`. + * Move `Rhyolite.Backend.Listen` to its own project `rhyolite-notify-listen`. The module is now called `Rhyolite.DB.NotifyListen`. `insertAndNotify` and related classes and functions can now be found in the groundhog-legacy package in the `Rhyolite.DB.NotifyListen.Groundhog` module, and in `notify-listen-beam` for beam versions. The various `notify` functions now require `Psql m`. * New: * Add a `Psql` instance for beam's `Pg` * Version bumps: diff --git a/groundhog-legacy/src/Rhyolite/DB/NotifyListen/Groundhog.hs b/groundhog-legacy/src/Rhyolite/DB/NotifyListen/Groundhog.hs index d09ec036..66f1ba28 100644 --- a/groundhog-legacy/src/Rhyolite/DB/NotifyListen/Groundhog.hs +++ b/groundhog-legacy/src/Rhyolite/DB/NotifyListen/Groundhog.hs @@ -42,21 +42,18 @@ import Rhyolite.DB.NotifyListen (NotificationType(..)) import Database.PostgreSQL.Simple.Class import GHC.Generics -notifyChannel :: NL.NotificationChannel -notifyChannel = NL.defaultNotificationChannel - notificationListener :: (FromJSON notifyMessage) => Pool Postgresql -> IO (TChan notifyMessage, IO ()) -notificationListener = NL.notificationListener notifyChannel . coerce +notificationListener = NL.notificationListener . coerce startNotificationListener :: FromJSON notifyMessage => Pool Postgresql -> IO (IO notifyMessage, IO ()) startNotificationListener = - NL.startNotificationListener notifyChannel . coerce + NL.startNotificationListener . coerce notify :: ( Has' ToJSON n Identity @@ -67,7 +64,7 @@ notify -> n a -> a -> m () -notify nt n a = NL.notify notifyChannel nt n a +notify nt n a = NL.notify nt n a -- | Class for relating application-specific db notification types with db -- entity types. diff --git a/notify-listen/notify-listen-beam/rhyolite-notify-listen-beam.cabal b/notify-listen/notify-listen-beam/rhyolite-notify-listen-beam.cabal index 7bafc24e..e5504f3b 100644 --- a/notify-listen/notify-listen-beam/rhyolite-notify-listen-beam.cabal +++ b/notify-listen/notify-listen-beam/rhyolite-notify-listen-beam.cabal @@ -22,6 +22,14 @@ extra-source-files: CHANGELOG.md library exposed-modules: Rhyolite.DB.NotifyListen.Beam build-depends: base + , aeson , beam-core + , beam-postgres + , constraints + , constraints-extras + , psql-simple-class + , psql-simple-beam + , rhyolite-notify-listen + , these hs-source-dirs: src default-language: Haskell2010 diff --git a/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs b/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs index 80278423..3e342a21 100644 --- a/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs +++ b/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs @@ -1 +1,248 @@ -module Rhyolite.DB.NotifyListen.Beam where +{-# Language ConstraintKinds #-} +{-# Language DeriveGeneric #-} +{-# Language FlexibleContexts #-} +{-# Language FunctionalDependencies #-} +{-# Language GADTs #-} +{-# Language RankNTypes #-} +{-# Language ScopedTypeVariables #-} +{-# Language StandaloneDeriving #-} +{-# Language UndecidableInstances #-} +module Rhyolite.DB.NotifyListen.Beam + ( HasNotification(..) + , insertAndNotify + , updateAndNotify + , deleteAndNotify + , HasChangeNotification(..) + , Change(..) + , changeOld + , changeNew + , insertAndNotifyChange + , updateAndNotifyChange + , deleteAndNotifyChange + ) where + +import Data.Aeson +import Data.Constraint.Extras +import Data.Constraint.Forall +import Data.Functor.Identity +import Data.These +import Database.Beam +import Database.Beam.Backend.SQL +import Database.Beam.Backend.SQL.BeamExtensions +import Database.Beam.Postgres +import Database.Beam.Postgres.Syntax +import Database.Beam.Schema.Tables +import Database.PostgreSQL.Simple.Class +import GHC.Generics +import Rhyolite.DB.NotifyListen + +type HasSqlEquality t = + ( Generic (t (HasConstraint (HasSqlEqualityCheck Postgres))) + , Generic (t Identity) + , Generic (t Exposed) + , (Generic (t (HasConstraint (HasSqlValueSyntax PgValueSyntax)))) + , (GFieldsFulfillConstraint + (HasSqlValueSyntax PgValueSyntax) + (Rep (t Exposed)) + (Rep (t (HasConstraint (HasSqlValueSyntax PgValueSyntax))))) + , (GFieldsFulfillConstraint + (HasSqlEqualityCheck Postgres) + (Rep (t Exposed)) + (Rep (t (HasConstraint (HasSqlEqualityCheck Postgres))))) + ) + +class HasNotification n a | a -> n where + notification :: DatabaseEntity be db (TableEntity a) -> n (PrimaryKey a Identity) + +insertAndNotify :: forall be db t n m. + ( be ~ Postgres + , MonadBeamInsertReturning be m + , BeamSqlBackend be + , Table t + , Has' ToJSON n Identity + , ForallF ToJSON n + , HasNotification n t + , Psql m + , FromBackendRow be (t Identity) + ) + => DatabaseEntity be db (TableEntity t) + -> (forall s. t (QExpr be s)) + -> m (Maybe (PrimaryKey t Identity)) +insertAndNotify tbl g = do + xs <- runInsertReturningList $ insert tbl $ insertExpressions [g] + case xs of + [x] -> do + let xid = primaryKey x + notify NotificationType_Insert (notification tbl) xid + pure $ Just $ primaryKey x + _ -> pure Nothing + +updateAndNotify :: forall be db t n m. + ( be ~ Postgres + , MonadBeamUpdateReturning be m + , BeamSqlBackend be + , Table t + , Has' ToJSON n Identity + , ForallF ToJSON n + , HasNotification n t + , Psql m + , FromBackendRow be (t Identity) + , HasSqlEquality (PrimaryKey t) + ) + => DatabaseEntity be db (TableEntity t) + -> PrimaryKey t Identity + -> (forall s. t (QField s) -> QAssignment be s) + -> m (Maybe (PrimaryKey t Identity)) +updateAndNotify tbl k g = do + xs <- runUpdateReturningList $ + update tbl g (\t -> primaryKey t ==. val_ k) + case xs of + [x] -> do + let xid = primaryKey x + notify NotificationType_Update (notification tbl) xid + pure $ Just $ primaryKey x + _ -> pure Nothing + +deleteAndNotify :: forall be db t n m. + ( be ~ Postgres + , MonadBeam be m + , BeamSqlBackend be + , Table t + , Has' ToJSON n Identity + , ForallF ToJSON n + , HasNotification n t + , Psql m + , FromBackendRow be (t Identity) + , HasSqlEquality (PrimaryKey t) + ) + => DatabaseEntity be db (TableEntity t) + -> PrimaryKey t Identity + -> m (PrimaryKey t Identity) +deleteAndNotify tbl k = do + runDelete $ delete tbl (\t -> primaryKey t ==. val_ k) + notify NotificationType_Delete (notification tbl) k + pure k + +class HasChangeNotification n a | a -> n where + changeNotification :: DatabaseEntity be db (TableEntity a) -> n (Change a) + +data Change a = Change + { _change_id :: PrimaryKey a Identity + , _change_oldNew :: These (a Identity) (a Identity) + } deriving (Generic) + +changeOld :: Change a -> Maybe (a Identity) +changeOld = these Just (const Nothing) (\old _ -> Just old) . _change_oldNew + +changeNew :: Change a -> Maybe (a Identity) +changeNew = these (const Nothing) Just (\_ new -> Just new) . _change_oldNew + +deriving instance (Eq (PrimaryKey a Identity), Eq (a Identity)) + => Eq (Change a) +deriving instance (Show (PrimaryKey a Identity), Show (a Identity)) + => Show (Change a) +instance (ToJSON (PrimaryKey a Identity), ToJSON (a Identity)) + => ToJSON (Change a) +instance (FromJSON (PrimaryKey a Identity), FromJSON (a Identity)) + => FromJSON (Change a) + +insertAndNotifyChange :: forall be db t n m. + ( be ~ Postgres + , MonadBeamInsertReturning be m + , BeamSqlBackend be + , Table t + , Has' ToJSON n Identity + , ForallF ToJSON n + , HasChangeNotification n t + , Psql m + , FromBackendRow be (t Identity) + ) + => DatabaseEntity be db (TableEntity t) + -> (forall s. t (QExpr be s)) + -> m (Maybe (PrimaryKey t Identity)) +insertAndNotifyChange tbl g = do + xs <- runInsertReturningList $ insert tbl $ insertExpressions [g] + case xs of + [x] -> do + let xid = primaryKey x + change = Change { _change_id = xid, _change_oldNew = That x } + notify NotificationType_Insert (changeNotification tbl) change + pure $ Just xid + _ -> pure Nothing + +updateAndNotifyChange :: forall be db t n m. + ( be ~ Postgres + , MonadBeamUpdateReturning be m + , BeamSqlBackend be + , Table t + , Has' ToJSON n Identity + , ForallF ToJSON n + , HasChangeNotification n t + , Psql m + , FromBackendRow be (t Identity) + , HasSqlEquality (PrimaryKey t) + , Database be db + ) + => DatabaseEntity be db (TableEntity t) + -> PrimaryKey t Identity + -> (forall s. t (QField s) -> QAssignment be s) + -> m (Maybe (PrimaryKey t Identity)) +updateAndNotifyChange tbl k g = do + mx <- selectByKey tbl k + case mx of + Nothing -> pure Nothing + Just old -> do + xs <- runUpdateReturningList $ + update tbl g (\t -> primaryKey t ==. val_ k) + case xs of + [new] -> do + let change = Change + { _change_id = k + , _change_oldNew = These old new + } + notify NotificationType_Update (changeNotification tbl) change + pure $ Just k + _ -> pure Nothing + +deleteAndNotifyChange :: forall be db t n m. + ( be ~ Postgres + , MonadBeam be m + , Database Postgres db + , BeamSqlBackend be + , Table t + , Has' ToJSON n Identity + , ForallF ToJSON n + , HasChangeNotification n t + , Psql m + , FromBackendRow be (t Identity) + , HasSqlEquality (PrimaryKey t) + ) + => DatabaseEntity be db (TableEntity t) + -> PrimaryKey t Identity + -> m (Maybe (PrimaryKey t Identity)) +deleteAndNotifyChange tbl k = do + mv <- selectByKey tbl k + case mv of + Nothing -> pure Nothing + Just v -> do + let change = Change { _change_id = k, _change_oldNew = This v } + runDelete $ delete tbl (\t -> primaryKey t ==. val_ k) + notify NotificationType_Delete (changeNotification tbl) change + pure $ Just k + +selectByKey :: + ( be ~ Postgres + , Database Postgres db + , MonadBeam be m + , BeamSqlBackend be + , FromBackendRow be (t Identity) + , HasQBuilder be + , Beamable t + , Table t + , HasSqlEquality (PrimaryKey t) + ) + => DatabaseEntity be db (TableEntity t) + -> PrimaryKey t Identity + -> m (Maybe (t Identity)) +selectByKey tbl k = runSelectReturningOne $ + select $ filter_ (\t -> primaryKey t ==. val_ k) (all_ tbl) From 61f5f9e2621a13dcdbdc550876fdbbf3ea9778e8 Mon Sep 17 00:00:00 2001 From: Ali Abrar Date: Wed, 17 Nov 2021 22:21:59 -0500 Subject: [PATCH 3/4] notify-listen: docs --- .../src/Rhyolite/DB/NotifyListen/Beam.hs | 81 +++++++++++++++++++ .../src/Rhyolite/DB/NotifyListen.hs | 20 ++++- 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs b/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs index 3e342a21..b9d603ea 100644 --- a/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs +++ b/notify-listen/notify-listen-beam/src/Rhyolite/DB/NotifyListen/Beam.hs @@ -1,3 +1,7 @@ +{-| +Description: + Notifications about database changes in beam +-} {-# Language ConstraintKinds #-} {-# Language DeriveGeneric #-} {-# Language FlexibleContexts #-} @@ -36,6 +40,12 @@ import Database.PostgreSQL.Simple.Class import GHC.Generics import Rhyolite.DB.NotifyListen +-- | Type synonym for things that can be used in a postgres query equality +-- comparison. If @x@ satisfies these constraints, you should be able to use +-- @x@ in a beam query like this: +-- +-- > (\t -> t ==. val_ x) +-- type HasSqlEquality t = ( Generic (t (HasConstraint (HasSqlEqualityCheck Postgres))) , Generic (t Identity) @@ -51,9 +61,37 @@ type HasSqlEquality t = (Rep (t (HasConstraint (HasSqlEqualityCheck Postgres))))) ) +-- | Class for relating application-specific db notification types with db +-- table entities. The example below shows how this can be used. Here's a +-- table definition: +-- +-- > data Person f = Person { personName :: Columnar f Text } +-- > instance Table Person where +-- > data PrimaryKey Person f = PersonId (Columnar f Text) +-- > deriving (Generic, Beamable) +-- > primaryKey = PersonId . personName +-- +-- You can use a GADT to define the types of notifications you expect to +-- receive. Below, we're saying that we expect a notification payload carrying +-- the primary key of the changed row. +-- +-- > data DbNotify a where +-- > DbNotify_Person :: DbNotify (PrimaryKey Person Identity) +-- +-- We can now implement as 'HasNotification' instance: +-- +-- > instance HasNotification DbNotify Person where +-- > notification _ = DbNotify_Person +-- +-- Now, we can call 'insertAndNotify', 'updateAndNotify', or 'deleteAndNotify' +-- to modify the @Person@ table, and this instance will be used to produce the +-- right type of notification. +-- class HasNotification n a | a -> n where notification :: DatabaseEntity be db (TableEntity a) -> n (PrimaryKey a Identity) +-- | Insert a value into a table and send a db notification with information +-- about the change. insertAndNotify :: forall be db t n m. ( be ~ Postgres , MonadBeamInsertReturning be m @@ -66,8 +104,11 @@ insertAndNotify :: forall be db t n m. , FromBackendRow be (t Identity) ) => DatabaseEntity be db (TableEntity t) + -- ^ The table into which the new row will be inserted -> (forall s. t (QExpr be s)) + -- ^ The value to be inserted -> m (Maybe (PrimaryKey t Identity)) + -- ^ The primary key of the new row, if the insert was successful insertAndNotify tbl g = do xs <- runInsertReturningList $ insert tbl $ insertExpressions [g] case xs of @@ -77,6 +118,9 @@ insertAndNotify tbl g = do pure $ Just $ primaryKey x _ -> pure Nothing +-- | Update a row in a table and send a db notification with information about +-- the change. This function only updates the row specified by the primary key +-- argument. updateAndNotify :: forall be db t n m. ( be ~ Postgres , MonadBeamUpdateReturning be m @@ -90,9 +134,14 @@ updateAndNotify :: forall be db t n m. , HasSqlEquality (PrimaryKey t) ) => DatabaseEntity be db (TableEntity t) + -- ^ The table being updated -> PrimaryKey t Identity + -- ^ The primary key of the row to update -> (forall s. t (QField s) -> QAssignment be s) + -- ^ The actual update to perform + -- (e.g., @(\c -> addressCountry (customerAddress c) <-. val_ (Just "USA"))@) -> m (Maybe (PrimaryKey t Identity)) + -- ^ The primary key of the updated row, if the update was successful updateAndNotify tbl k g = do xs <- runUpdateReturningList $ update tbl g (\t -> primaryKey t ==. val_ k) @@ -103,6 +152,9 @@ updateAndNotify tbl k g = do pure $ Just $ primaryKey x _ -> pure Nothing +-- | Delete a row in a table and send a db notification with information about +-- the change. This function only deletes the row specified by the primary key +-- argument. deleteAndNotify :: forall be db t n m. ( be ~ Postgres , MonadBeam be m @@ -116,24 +168,35 @@ deleteAndNotify :: forall be db t n m. , HasSqlEquality (PrimaryKey t) ) => DatabaseEntity be db (TableEntity t) + -- ^ The table to delete from -> PrimaryKey t Identity + -- ^ The primary key of the row to delete -> m (PrimaryKey t Identity) + -- ^ The primary key that was deleted deleteAndNotify tbl k = do runDelete $ delete tbl (\t -> primaryKey t ==. val_ k) notify NotificationType_Delete (notification tbl) k pure k +-- | Similar to 'HasNotification' except that this class provides more detailed +-- information about what changed, including the old and new values, where +-- applicable. class HasChangeNotification n a | a -> n where changeNotification :: DatabaseEntity be db (TableEntity a) -> n (Change a) +-- | Information about a database change, including both the old and new +-- values, where they exist. data Change a = Change { _change_id :: PrimaryKey a Identity , _change_oldNew :: These (a Identity) (a Identity) + -- ^ @These oldValue newValue@ } deriving (Generic) +-- | Accessor for the old value, if it exists. changeOld :: Change a -> Maybe (a Identity) changeOld = these Just (const Nothing) (\old _ -> Just old) . _change_oldNew +-- | Accessor for the new value, if it exists. changeNew :: Change a -> Maybe (a Identity) changeNew = these (const Nothing) Just (\_ new -> Just new) . _change_oldNew @@ -146,6 +209,7 @@ instance (ToJSON (PrimaryKey a Identity), ToJSON (a Identity)) instance (FromJSON (PrimaryKey a Identity), FromJSON (a Identity)) => FromJSON (Change a) +-- | Insert a row into the database and send a 'Change' notification. insertAndNotifyChange :: forall be db t n m. ( be ~ Postgres , MonadBeamInsertReturning be m @@ -158,8 +222,11 @@ insertAndNotifyChange :: forall be db t n m. , FromBackendRow be (t Identity) ) => DatabaseEntity be db (TableEntity t) + -- ^ The table into which the new row will be inserted -> (forall s. t (QExpr be s)) + -- ^ The value to be inserted -> m (Maybe (PrimaryKey t Identity)) + -- ^ The primary key of the new row, if the insert was successful insertAndNotifyChange tbl g = do xs <- runInsertReturningList $ insert tbl $ insertExpressions [g] case xs of @@ -170,6 +237,10 @@ insertAndNotifyChange tbl g = do pure $ Just xid _ -> pure Nothing +-- | Update a row in the database and send a 'Change' notification about the +-- modification. Note that this function retrieves the entire row before the +-- modification and sends a notification including both the old and new values. +-- See the size limitation note on 'Rhyolite.DB.NotifyListen.notify'. updateAndNotifyChange :: forall be db t n m. ( be ~ Postgres , MonadBeamUpdateReturning be m @@ -184,9 +255,14 @@ updateAndNotifyChange :: forall be db t n m. , Database be db ) => DatabaseEntity be db (TableEntity t) + -- ^ The table being updated -> PrimaryKey t Identity + -- ^ The primary key of the row to update -> (forall s. t (QField s) -> QAssignment be s) + -- ^ The actual update to perform + -- (e.g., @(\c -> addressCountry (customerAddress c) <-. val_ (Just "USA"))@) -> m (Maybe (PrimaryKey t Identity)) + -- ^ The primary key of the updated row, if the update was successful updateAndNotifyChange tbl k g = do mx <- selectByKey tbl k case mx of @@ -204,6 +280,8 @@ updateAndNotifyChange tbl k g = do pure $ Just k _ -> pure Nothing +-- | Delete a row from the database and send a 'Change' notification. Note that +-- this retrieves the row before deleting it. deleteAndNotifyChange :: forall be db t n m. ( be ~ Postgres , MonadBeam be m @@ -218,8 +296,11 @@ deleteAndNotifyChange :: forall be db t n m. , HasSqlEquality (PrimaryKey t) ) => DatabaseEntity be db (TableEntity t) + -- ^ The table to delete from -> PrimaryKey t Identity + -- ^ The primary key of the row to delete -> m (Maybe (PrimaryKey t Identity)) + -- ^ The primary key of the deleted row, if the row existed when we tried deleteAndNotifyChange tbl k = do mv <- selectByKey tbl k case mv of diff --git a/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs b/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs index e788ac3f..07ba713d 100644 --- a/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs +++ b/notify-listen/notify-listen/src/Rhyolite/DB/NotifyListen.hs @@ -12,7 +12,23 @@ Description: {-# Language StandaloneDeriving #-} {-# Language UndecidableInstances #-} -module Rhyolite.DB.NotifyListen where +module Rhyolite.DB.NotifyListen + ( -- * Sending notifications + notify + , NotificationType(..) + , DbNotification(..) + -- * Running a notification listener + , notificationListener + , startNotificationListener + , NotificationChannel(..) + , defaultNotificationChannel + -- * Helpers + , listenCmd + , notifyCmd + , SchemaName(..) + , getSchemaName + , getSearchPath + ) where import Control.Concurrent (forkIO, killThread) import Control.Concurrent.STM (TChan) @@ -163,7 +179,7 @@ notifyCmd (NotificationChannel chan) = fromString $ "NOTIFY " <> chan <> ", ?" -- different types. To use this, construct a GADT that defines your -- notification protocol: -- --- > data Notice a where +-- > data Notify a where -- > Notify_Account :: Notify (Id Account) -- > Notify_Chatroom :: Notify (Id Chatroom) -- > Notify_Message :: Notify (Id Message) From 05cca46b80faf8cdce54dd9cabffd20b41fc2d9c Mon Sep 17 00:00:00 2001 From: Ali Abrar Date: Wed, 17 Nov 2021 22:38:05 -0500 Subject: [PATCH 4/4] notify-listen: Fix call site in rhyolite-backend --- backend/Rhyolite/Backend/App.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/Rhyolite/Backend/App.hs b/backend/Rhyolite/Backend/App.hs index 3c91e30c..6445a3e9 100644 --- a/backend/Rhyolite/Backend/App.hs +++ b/backend/Rhyolite/Backend/App.hs @@ -48,7 +48,7 @@ import Reflex (Group(..), Additive) import Rhyolite.Api import Rhyolite.App -import Rhyolite.DB.NotifyListen (startNotificationListener, defaultNotificationChannel) +import Rhyolite.DB.NotifyListen (startNotificationListener) import Rhyolite.Concurrent import Rhyolite.Sign (Signed) import Rhyolite.Backend.WebSocket (withWebsocketsConnection, getDataMessage, sendEncodedDataMessage) @@ -446,7 +446,7 @@ serveDbOverWebsocketsRaw -> Pipeline IO (MonoidalMap ClientKey q) q' -> IO (m a, IO ()) serveDbOverWebsocketsRaw withWsConn version fromWire db handleApi handleNotify handleQuery pipe = do - (getNextNotification, finalizeListener) <- startNotificationListener defaultNotificationChannel db + (getNextNotification, finalizeListener) <- startNotificationListener db rec (qh, finalizeFeed) <- feedPipeline (handleNotify <$> getNextNotification) handleQuery r (qh', r) <- unPipeline pipe qh r' (r', handleListen) <- connectPipelineToWebsocketsRaw withWsConn version fromWire handleApi qh'