Skip to content

Commit

Permalink
feat: update to for more flexible errors
Browse files Browse the repository at this point in the history
  • Loading branch information
leostera committed Nov 6, 2024
1 parent 310a486 commit db9eb30
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 161 deletions.
6 changes: 4 additions & 2 deletions bench/http_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ let main () =
let acceptor () =
match accept_loop () with
| Ok () -> ()
| Error err ->
Logger.error (fun f -> f "error: %a" IO.pp_err (Obj.magic err))
| Error (`Process_down | `Timeout) ->
Logger.error (fun f -> f "error: process down or timeout")
| Error (`Net err) ->
Logger.error (fun f -> f "error: %a" Gluon.pp_err (Obj.magic err))
in

let _ = List.init 99 (fun _ -> spawn_link acceptor) in
Expand Down
14 changes: 10 additions & 4 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
(license MIT)

(pin
(url "git+https://github.com/riot-ml/rio.git")
(package (name rio)))
(package
(name gluon))
(url "git+https://github.com/riot-ml/gluon#leostera/expose-bad-uri-without-host"))

(pin
(url "git+https://github.com/riot-ml/bytestring.git")
(package (name bytestring)))
(package
(name rio))
(url "git+https://github.com/riot-ml/rio#leostera/remove-gluon-errors"))

(pin
(package (name bytestring))
(url "git+https://github.com/riot-ml/bytestring.git"))

(package
(name riot)
Expand Down
9 changes: 5 additions & 4 deletions packages/riot-runtime/Config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ type t = {
supervisor_restart_period : int;
}

let pp fmt t =
let pp fmt t =
Format.fprintf fmt "== RIOT CONFIG ==\n";
Format.fprintf fmt "* max_wokers=%d\n" t.max_workers;
Format.fprintf fmt "* workers=%d\n" t.workers;
Format.fprintf fmt "* supervisor_restart_limit=%d\n" t.supervisor_restart_limit;
Format.fprintf fmt "* supervisor_restart_period=%d\n" t.supervisor_restart_period;
Format.fprintf fmt "* supervisor_restart_limit=%d\n"
t.supervisor_restart_limit;
Format.fprintf fmt "* supervisor_restart_period=%d\n"
t.supervisor_restart_period;
Format.fprintf fmt "\n%!"
;;

let make ?(supervisor_restart_limit = 1) ?(supervisor_restart_period = 0)
?workers () =
Expand Down
24 changes: 16 additions & 8 deletions packages/riot-stdlib/SSL.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ module IO = Rio

let ( let* ) = Result.bind

type error = [ `Net of Gluon.error | `Process_down | `Timeout ]

type 'src t = {
writer : 'src IO.Writer.t;
reader : 'src IO.Reader.t;
writer : ('src, error) IO.Writer.t;
reader : ('src, error) IO.Reader.t;
mutable state : [ `Active of Tls.Engine.state | `Eof | `Error of exn ];
mutable linger : string option;
recv_buf : bytes;
Expand All @@ -52,10 +54,14 @@ exception Tls_alert of Tls.Packet.alert_type
exception Tls_failure of Tls.Engine.failure

module Tls_unix = struct
exception Read_error of Rio.io_error
exception Write_error of Rio.io_error
exception Read_error of error
exception Write_error of error

let err_to_str err = Format.asprintf "%a" Rio.pp_err err
let err_to_str err =
match err with
| `Process_down -> "Process down"
| `Timeout -> "Process timeout"
| `Net err -> Format.asprintf "%a" Gluon.pp_err err

let read_t t dst =
let src = IO.Bytes.with_capacity (Bytes.length dst) in
Expand All @@ -64,7 +70,7 @@ module Tls_unix = struct
trace (fun f -> f "read_t: %d/%d" len (Bytes.length dst));
BytesLabels.blit ~src ~src_pos:0 ~dst ~dst_pos:0 ~len;
len
| Error (`Closed | `Eof) ->
| Error (`Net Connection_closed) ->
trace (fun f -> f "read_t: 0/%d" (Bytes.length dst));
raise End_of_file
| Error err ->
Expand Down Expand Up @@ -224,10 +230,11 @@ module Tls_unix = struct
in
drain_handshake t

let to_reader : type src. src t -> src t IO.Reader.t =
let to_reader : type src. src t -> (src t, error) IO.Reader.t =
fun t ->
let module Read = struct
type nonrec t = src t
type nonrec error = error

let read t ?timeout:_ dst =
match single_read t dst with
Expand All @@ -238,10 +245,11 @@ module Tls_unix = struct
end in
IO.Reader.of_read_src (module Read) t

let to_writer : type src. src t -> src t IO.Writer.t =
let to_writer : type src. src t -> (src t, error) IO.Writer.t =
fun t ->
let module Write = struct
type nonrec t = src t
type nonrec error = error

let write t ~buf = single_write t buf

Expand Down
21 changes: 15 additions & 6 deletions packages/riot-stdlib/file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,25 @@ let exists path =

module Read = struct
type t = read_file
type error = [ `Gluon of Gluon.error ]

let rec read t ?timeout buf =
match File.read t.fd buf ~pos:0 ~len:(Rio.Bytes.length buf) with
match
File.read t.fd buf ~pos:0 ~len:(Rio.Bytes.length buf)
|> Result.map_error (fun e -> `Gluon e)
with
| Ok n -> Ok n
| Error `Would_block ->
| Error (`Gluon (Syscall_would_block _)) ->
syscall ?timeout "File.read" Interest.readable (File.to_source t.fd)
@@ fun _ -> read t ?timeout buf
| Error err -> Error err

let rec read_vectored t bufs =
match File.read_vectored t.fd bufs with
match
File.read_vectored t.fd bufs |> Result.map_error (fun e -> `Gluon e)
with
| Ok n -> Ok n
| Error `Would_block ->
| Error (`Gluon (Syscall_would_block _)) ->
syscall "File.read_vectored" Interest.readable (File.to_source t.fd)
@@ fun _ -> read_vectored t bufs
| Error err -> Error err
Expand All @@ -52,13 +58,16 @@ let to_reader t = Rio.Reader.of_read_src (module Read) t

module Write = struct
type t = write_file
type error = [ `Gluon of Gluon.error ]

let size t = (stat t).st_size

let rec write_owned_vectored t ~bufs =
match File.write_vectored t.fd bufs with
match
File.write_vectored t.fd bufs |> Result.map_error (fun e -> `Gluon e)
with
| Ok n -> Ok n
| Error `Would_block ->
| Error (`Gluon (Syscall_would_block _)) ->
syscall "File.write_vectored" Interest.writable (File.to_source t.fd)
@@ fun _ -> write_owned_vectored t ~bufs
| Error err -> Error err
Expand Down
9 changes: 0 additions & 9 deletions packages/riot-stdlib/lib_io.ml

This file was deleted.

39 changes: 28 additions & 11 deletions packages/riot-stdlib/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,24 @@ open Logger.Make (struct
let namespace = [ "riot"; "net" ]
end)

let map_err e = Result.map_error (fun e -> `Net e) e

module Socket = Gluon.Net.Socket
module Addr = Gluon.Net.Addr

module Addr = struct
include Gluon.Net.Addr

let get_info addr = get_info addr |> map_err
let of_uri uri = of_uri uri |> map_err
let parse str = parse str |> map_err
end

module Tcp_listener = struct
include Gluon.Net.Tcp_listener

let bind ?reuse_addr ?reuse_port ?backlog stream_addr =
bind ?reuse_addr ?reuse_port ?backlog stream_addr |> map_err

type listen_opts = {
reuse_addr : bool;
reuse_port : bool;
Expand All @@ -36,12 +48,12 @@ module Tcp_listener = struct
let this = self () in
let rec accept_loop t =
trace (fun f -> f "Socket is Accepting client at fd=%a" Fd.pp t);
match accept t with
match accept t |> map_err with
| Ok (conn, addr) ->
trace (fun f ->
f "Accepted client %a / %a" Addr.pp addr Socket.pp conn);
Ok (conn, addr)
| Error `Would_block ->
| Error (`Net (Syscall_would_block _)) ->
trace (fun f ->
f "Socket not ready, %a is retrying at fd=%a" Pid.pp this Fd.pp t);
syscall "accept" Interest.(add readable writable) (to_source t)
Expand All @@ -66,6 +78,12 @@ end
module Tcp_stream = struct
include Gluon.Net.Tcp_stream

let read t ?pos ?len bytes = read t ?pos ?len bytes |> map_err
let read_vectored t iovec = read_vectored t iovec |> map_err
let write t ?pos ?len bytes = write t ?pos ?len bytes |> map_err
let write_vectored t iovec = write_vectored t iovec |> map_err
let sendfile t ~file ~off ~len = sendfile t ~file ~off ~len |> map_err

let close t =
let this = self () in
trace (fun f -> f "Process %a: Closing socket fd=%a" Pid.pp this Fd.pp t);
Expand All @@ -82,14 +100,14 @@ module Tcp_stream = struct
let connect ?timeout addr =
let rec connect_loop addr =
trace (fun f -> f "Attempting to connect to %a" Addr.pp addr);
match connect addr with
match connect addr |> map_err with
| Ok (`Connected t) ->
trace (fun f -> f "Connected to %a" Addr.pp addr);
Ok t
| Ok (`In_progress t) ->
trace (fun f -> f "In_progress %a" Addr.pp addr);
syscall "connect" Interest.(writable) (to_source t) @@ fun () -> Ok t
| Error `Would_block ->
| Error (`Net (Syscall_would_block _)) ->
yield ();
connect_loop addr
| Error err -> Error err
Expand All @@ -104,7 +122,7 @@ module Tcp_stream = struct
| Ok len ->
trace (fun f -> f "received: %d octets from %a" len Socket.pp t);
Ok len
| Error `Would_block ->
| Error (`Net (Syscall_would_block _)) ->
trace (fun f -> f "waiting on %a to receive" Socket.pp t);
syscall ?timeout "receive" Interest.readable (to_source t) @@ fun () ->
receive ?timeout ~bufs t
Expand All @@ -116,23 +134,21 @@ module Tcp_stream = struct
| Ok bytes ->
trace (fun f -> f "sent: %d" (Rio.Iovec.length bufs));
Ok bytes
| Error `Would_block ->
| Error (`Net (Syscall_would_block _)) ->
trace (fun f -> f "retrying");
syscall ?timeout "send" Interest.writable (to_source t) @@ fun () ->
send ?timeout ~bufs t
| Error err -> Error err

let pp_err fmt = function
| `Net err -> Gluon.pp_err fmt err
| `Timeout -> Format.fprintf fmt "Timeout"
| `Process_down -> Format.fprintf fmt "Process_down"
| `System_limit -> Format.fprintf fmt "System_limit"
| `Closed -> Format.fprintf fmt "Closed"
| `Unix_error err ->
Format.fprintf fmt "Unix_error(%s)" (Unix.error_message err)

let to_reader ?timeout:global_timeout t =
let module Read = struct
type nonrec t = t
type error = [ `Net of Gluon.error | `Process_down | `Timeout ]

let read t ?timeout buf =
let timeout =
Expand All @@ -147,6 +163,7 @@ module Tcp_stream = struct
let to_writer ?timeout t =
let module Write = struct
type nonrec t = t
type error = [ `Net of Gluon.error | `Process_down | `Timeout ]

let write_owned_vectored t ~bufs = send ?timeout ~bufs t

Expand Down
4 changes: 2 additions & 2 deletions packages/riot-stdlib/riot_stdlib.ml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module Application = Application
module Bytestring = Bytestring
module Dynamic_supervisor = Dynamic_supervisor
module Crypto = Crypto
module Dynamic_supervisor = Dynamic_supervisor
module Fd = Fd
module File = File
module Gen_server = Gen_server
module Hashmap = Hashmap
module IO = Lib_io
module IO = Rio
module Logger = Logger_app
module Message = Message
module Net = Net
Expand Down
13 changes: 1 addition & 12 deletions packages/riot/riot.ml
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
include Riot_stdlib

open struct
open Riot_runtime
module Config = Config
module Log = Log
module Core = Core
module Import = Import
module Util = Util
module Scheduler = Scheduler
module Time = Time
end

open Riot_runtime
module Config = Config

open Logger.Make (struct
Expand Down
Loading

0 comments on commit db9eb30

Please sign in to comment.