We've managed to get nicely typed arbitrary messages into our web handlers and gen servers; now it's time to look at monitors.
A reasonably common pattern for monitors in some of our code is
In this case sometimes
This isn't always the case: sometimes a static message bus is a better option, and sometimes pids can be checked ad hoc. For the purpose of this example, though, we'll assume that this is exactly what we want, as it makes a nice end-to-end example of message passing and monitoring in Purerl.
So we'll define a basic gen server whose state is a map of pids to functions that receive data (Binary -> Effect Unit), and set up a timer to send us a Tick message after 500ms. Our message type will therefore be either that Tick message or a message telling us that a client has disconnected. We'll configure the gen server to call a handleInfo function when these messages come in (explored further below).
type State = {
  handlers :: Map.Map Pid MessageHandler
}

type MessageHandler = (Binary -> Effect Unit)

data Msg = ClientDisconnected Pid
         | Tick

startLink :: BookWatchingStartArgs -> Effect StartLinkResult
startLink args =
  Gen.buildStartLink serverName (init args) $ Gen.defaultStartLink { handleInfo = handleInfo }

init :: BookWatchingStartArgs -> Gen.Init State Msg
init args = do
  self <- Gen.self
  void $ Gen.lift $ Timer.sendAfter 500 Tick self
  pure $ {
    handlers: Map.empty
  }
We can export a function registerClient for clients to invoke in order to start receiving data. While we're still in the process that called us, we can get its pid by calling 'Pinto.self'; then, in the context of the gen server, we get our own pid so we can add the monitor in the next function, addHandler.
As we have the pid of our calling process, we can invoke Monitor.pid, and pass in a callback that disregards the message given to us when the monitor pops and just sends a message with the handler pid back to our handleInfo. Once we're monitoring the handler, we can add it to our map using the pid as a key so we can easily remove it later when we get the message telling us it went down.
registerClient :: MessageHandler -> Effect Unit
registerClient handler = do
  handlerPid <- Pinto.self
  Gen.doCall serverName \state -> do
    self <- Gen.self
    newState <- Gen.lift $ addHandler handler self handlerPid state
    pure $ CallReply unit newState

addHandler :: MessageHandler -> Process Msg -> Pid -> State -> Effect State
addHandler handler self handlerPid state@{ handlers } = do
  void $ Logger.info1 "Adding handler ~p as it has connected" handlerPid
  void $ Monitor.pid handlerPid (\_ -> self ! ClientDisconnected handlerPid)
  pure $ state { handlers = Map.insert handlerPid handler handlers }
All that is left, therefore, is to handle the messages we might receive in the handleInfo we configured earlier as part of startLink.
handleInfo :: Msg -> State -> Gen.HandleInfo State Msg
handleInfo msg state@{ handlers } = do
  case msg of
    ClientDisconnected handlerPid -> do
      void $ Gen.lift $ Logger.info1 "Removing ~p as it disconnected" handlerPid
      pure $ CastNoReply $ state { handlers = Map.delete handlerPid handlers }
    Tick -> do
      Gen.lift $ sendData handlers
      self <- Gen.self
      void $ Gen.lift $ Timer.sendAfter 500 Tick self
      pure $ CastNoReply $ state
Sending data is easy, seeing as the values in the handlers map are just effectful callbacks of type (Binary -> Effect Unit).
sendData :: Map.Map Pid MessageHandler -> Effect Unit
sendData handlers = do
  freshData <- getDataFromSomeNativeCode
  void $ traverse (\handler -> do handler freshData) $ Map.values handlers
  pure unit
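To see the register / remove / broadcast cycle in isolation, here is a minimal sketch that runs without a gen server at all. It is not the code from the example above: ClientId (an Int) and Payload (a String) are hypothetical stand-ins for Pid and Binary, the module and function names are made up for illustration, and it swaps in traverse_ (relying on the Foldable instance of Map) in place of void $ traverse over Map.values.

```purescript
module HandlerRegistrySketch where

import Prelude

import Data.Foldable (traverse_)
import Data.Map as Map
import Effect (Effect)
import Effect.Console (log)

-- Hypothetical stand-ins: the article keys on an Erlang Pid and sends a Binary.
type ClientId = Int
type Payload = String
type Handler = Payload -> Effect Unit

type Registry = Map.Map ClientId Handler

-- Store a handler under the client's identifier (what addHandler does with the pid).
register :: ClientId -> Handler -> Registry -> Registry
register = Map.insert

-- Forget a client (what happens when ClientDisconnected arrives).
unregister :: ClientId -> Registry -> Registry
unregister = Map.delete

-- Run every remaining handler with the same payload, discarding the results.
broadcast :: Payload -> Registry -> Effect Unit
broadcast payload registry = traverse_ (\handler -> handler payload) registry

main :: Effect Unit
main = do
  let
    registry =
      unregister 1
        $ register 2 (\p -> log ("client 2 got " <> p))
        $ register 1 (\p -> log ("client 1 got " <> p))
        $ Map.empty
  broadcast "tick" registry
```

Client 1 is removed before the broadcast, so only client 2's handler runs. Keeping the registry as a plain Map keyed by the client's identity is what makes the clean-up a single Map.delete.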
So that's an entire gen server, which
Note: Because of the callback/pids there is nothing stopping us using this code from any other Purerl (or indeed Erlang). None of this is Pinto specific and this is very much by design.
Speaking of other Purerl, a lot of Purerl gets written using Stetson to support Rest/Websockets/Streams/etc; so that's where we'll subscribe to this data. We'll also then add a monitor to that streaming process that closes the connection when it goes away. (We could also just block while we wait for it to restart for example).
So, we'll define a data type for our messages: either we're receiving data that needs to be streamed, our data source died, or our data source was already down when we tried to connect to it.
data DataStreamMessage = Data Binary
                       | DataSourceDied
                       | DataSourceAlreadyDown
We'll just kick off our handler with Loop.handler, start a streamed reply with a status code of 200, and make sure that Stetson knows we're doing a Loop. We're typed as a StetsonHandler DataStreamMessage Unit because we receive DataStreamMessage and don't store any state of our own.
dataStream :: StetsonHandler DataStreamMessage Unit
dataStream =
  Loop.handler (\req -> do
    req2 <- streamReply (StatusCode 200) Map.empty req
    Loop.initResult req2 unit)
In our Loop.init, we'll get our own typed process (Process DataStreamMessage). Invoking 'Process.send' on this gives us a function of type (DataStreamMessage -> Effect Unit), so we'll compose that with a constructor for our own data type (Data), giving us the correct function type of (Binary -> Effect Unit).
Using Gen.monitor with the server name of MonitorExample, we can detect when that process dies - there are two effectful callbacks for this, one for when the process dies and one for if the process is already down (there is no pid to monitor).
    # Loop.init (\req state -> do
      self <- Loop.self
      void $ Loop.lift $ MonitorExample.registerClient $ send self <<< Data
      void $ Loop.lift $ Gen.monitor MonitorExample.serverName (\_ -> send self DataSourceDied) (send self DataSourceAlreadyDown)
      pure unit)
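The composition with the Data constructor is the part that makes the types line up, so here is a small standalone sketch of just that step. It assumes nothing from Stetson or Pinto: deliver is a hypothetical stand-in for send self, Payload (a String) stands in for Binary, and the module name is made up.

```purescript
module ComposeSketch where

import Prelude

import Effect (Effect)
import Effect.Console (log)

-- Hypothetical stand-in for Binary.
type Payload = String

data DataStreamMessage
  = Data Payload
  | DataSourceDied
  | DataSourceAlreadyDown

-- Hypothetical stand-in for `send self`: any function that accepts the full message type.
deliver :: DataStreamMessage -> Effect Unit
deliver msg = case msg of
  Data payload -> log ("data: " <> payload)
  DataSourceDied -> log "source died"
  DataSourceAlreadyDown -> log "source already down"

-- Data :: Payload -> DataStreamMessage, so composing it in front of deliver
-- yields a callback that only needs the payload.
onData :: Payload -> Effect Unit
onData = deliver <<< Data

main :: Effect Unit
main = onData "hello"
```

The publisher only ever sees a (Payload -> Effect Unit) callback, so it never needs to know about the subscriber's message type.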
We receive those messages in the Loop.info callback:
    # Loop.info (\msg req state -> do
      case msg of
        Data binary -> do
          _ <- Loop.lift $ streamBody binary req
          pure $ LoopOk req state
        DataSourceDied -> do
          pure $ LoopStop req state
        DataSourceAlreadyDown -> do
          pure $ LoopStop req state
And that's that: we have a gen server running which will send data to any subscribers and clean up when those subscribers terminate, and we have a loop handler that'll subscribe to that data source and clean up if that data source dies. There is a lot going on here, but it's worth unpicking as there are a lot of useful concepts neatly packaged into a single example.
By sticking to plain ol' pids and callbacks, all of this code remains portable and not tied to either of these libraries, which is handy because at some point somebody smarter than I will write something more Purerl idiomatic for web serving and OTP wrapping, and we'll probably want to switch to those things.
2020 © Rob Ashton. ALL Rights Reserved.