diff --git a/lib/dune b/lib/dune index 4d91bea..0f25c32 100644 --- a/lib/dune +++ b/lib/dune @@ -8,7 +8,7 @@ (name ptt_sendmail) (public_name ptt.sendmail) (modules ptt_sendmail) - (libraries emile colombe.emile mrmime sendmail-mirage ptt.common)) + (libraries hxd.core hxd.string emile colombe.emile mrmime sendmail-mirage ptt.common)) (library (name ptt_aggregate) @@ -20,7 +20,7 @@ (name ptt_flow) (public_name ptt.flow) (modules ptt_flow) - (libraries ke colombe tcpip)) + (libraries hxd.core hxd.string logs ke colombe tcpip)) (library (name ptt_map) @@ -31,7 +31,7 @@ (library (name ptt) (public_name ptt) - (modules ptt authentication logic mechanism messaged relay sMTP sSMTP submission) + (modules ptt authentication logic mechanism msgd relay sMTP sSMTP submission) (libraries ptt.common ptt.flow ptt.aggregate digestif mrmime colombe.emile domain-name dns sendmail.starttls logs ipaddr) (preprocess future_syntax)) @@ -72,6 +72,12 @@ (modules hm) (libraries mirage-time mirage-clock mirage-random ptt ptt.map ptt.server uspf-mirage)) +(library + (name elit) + (public_name ptt.elit) + (modules elit) + (libraries mirage-time mirage-clock mirage-random ptt ptt.map ptt.server dns-client-mirage uspf-mirage)) + (library (name ptt_value) (public_name ptt.value) diff --git a/lib/elit.ml b/lib/elit.ml new file mode 100644 index 0000000..95bc4c0 --- /dev/null +++ b/lib/elit.ml @@ -0,0 +1,318 @@ +open Rresult +open Lwt.Infix + +let src = Logs.Src.create "ptt.elit" + +module Log : Logs.LOG = (val Logs.src_log src) + +let ( $ ) f g = fun x -> f (g x) + +module Make + (Time : Mirage_time.S) + (Mclock : Mirage_clock.MCLOCK) + (Pclock : Mirage_clock.PCLOCK) + (Stack : Tcpip.Stack.V4V6) + (Dns_client : Dns_client_mirage.S) + (Happy_eyeballs : Happy_eyeballs_mirage.S with type flow = Stack.TCP.flow) = +struct + module Server = Ptt_server.Make (Time) (Stack) + module Sendmail = Ptt_sendmail.Make (Pclock) (Stack) (Happy_eyeballs) + module Nss = Ca_certs_nss.Make (Pclock) + module Uspf_client = Uspf_mirage.Make (Dns_client) + + module Local = struct + module Submission = Ptt.Submission.Make (Stack) + + let submission_resolver = + let open Ptt_common in + let getmxbyname _ipaddr mail_exchange = + Dns.Rr_map.Mx_set.(singleton { Dns.Mx.preference= 0; mail_exchange }) + |> Lwt.return_ok in + let gethostbyname ipaddr _domain_name = + Lwt.return_ok ipaddr in + { getmxbyname; gethostbyname } + + let submission_job ~pool ?stop ?(port= 465) ~destination + random hash stack server close = + let handler flow = + let ipaddr, port = Stack.TCP.dst flow in + Lwt.finalize + (fun () -> + Lwt_pool.use pool @@ fun (encoder, decoder, _) -> + Submission.accept_without_starttls + ~encoder:(Fun.const encoder) ~decoder:(Fun.const decoder) ~ipaddr + flow destination submission_resolver + random hash server + >|= R.reword_error (R.msgf "%a" Submission.pp_error)) + (fun () -> Stack.TCP.close flow) + >>= function + | Ok () -> Lwt.return () + | Error (`Msg err) -> + Log.err (fun m -> m "<%a:%d> raised an error: %s" Ipaddr.pp ipaddr port err); + Lwt.return () in + Server.init ~port stack >>= fun service -> + Server.serve_when_ready ?stop ~handler service + |> fun (`Initialized job) -> + let job = job >|= close in job + + let submission_logic_job ~info map (ic, oc) = + let rec go () = + Lwt_stream.get ic >>= function + | None -> oc None; Lwt.return_unit + | Some (key, stream, wk) -> + Lwt.catch + (fun () -> + let sender = fst (Ptt.Msgd.from key) in + let recipients = Ptt.Msgd.recipients key in + let recipients = List.map fst recipients in + let recipients = Ptt_map.expand ~info map recipients in + let recipients = Ptt_aggregate.to_recipients ~info recipients in + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in + Log.debug (fun m -> m "%a submitted a new email %a." + Colombe.Reverse_path.pp sender Mrmime.MessageID.pp id); + let elts = List.map (fun recipients -> + { Ptt_sendmail.sender + ; recipients + ; data= Lwt_stream.clone stream + ; policies= [] + ; id }) recipients in + Log.debug (fun m -> m "Notice the SMTP server that everything is ok for %a from %a." + Colombe.Reverse_path.pp sender Mrmime.MessageID.pp id); + Lwt.wakeup_later wk `Ok; + Log.debug (fun m -> m "Send the incoming email %a to our destination." + Mrmime.MessageID.pp id); + List.iter (oc $ Option.some) elts; + Lwt.return_unit) + (fun exn -> + Log.err (fun m -> m "Got an error into the submission logic: %S" (Printexc.to_string exn)); + Lwt.return_unit) + >>= Lwt.pause >>= go in + go () + + let job ?(limit = 20) ?stop ~locals ?port ~tls ~info ~destination + stack he random hash authenticator mechanisms = + let pool0 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + { Ptt_sendmail.pool= fun fn -> Lwt_pool.use pool1 fn } in + let ic_server, stream0, close0 = Submission.create ~info ~authenticator mechanisms in + let oc_server, push0 = Sendmail.v ~resolver:submission_resolver ~pool:pool1 ~info tls in + Lwt.join + [ submission_job ~pool:pool0 ?stop ?port ~destination random hash stack ic_server close0 + ; submission_logic_job ~info locals (stream0, push0) + ; Sendmail.job destination he oc_server ] + end + + module Out = struct + module Relay = Ptt.Relay.Make (Stack) + + let mail_exchange_resolver = + let open Ptt_common in + let getmxbyname dns domain_name = + Dns_client.getaddrinfo dns Dns.Rr_map.Mx domain_name + >|= Result.map snd in + let gethostbyname dns domain_name = + let ipv4 = + Dns_client.gethostbyname dns domain_name + >|= Result.map (fun ipv4 -> Ipaddr.V4 ipv4) in + let ipv6 = + Dns_client.gethostbyname6 dns domain_name + >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in + Lwt.all [ ipv4; ipv6 ] >|= function + | [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ] + | [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ] + | [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ] + | [ (Error _ as err); _ ] -> err + | [] | [_] | _ :: _ :: _ -> assert false in + { getmxbyname; gethostbyname } + + let mail_exchange_job ~pool ?stop ?(port= 25) stack dns server close = + let handler flow = + let ipaddr, port = Stack.TCP.dst flow in + Lwt.finalize + (fun () -> + Lwt_pool.use pool @@ fun (encoder, decoder, queue) -> + Relay.accept ~encoder:(Fun.const encoder) ~decoder:(Fun.const decoder) + ~queue:(Fun.const queue) ~ipaddr flow dns mail_exchange_resolver server + >|= R.reword_error (R.msgf "%a" Relay.pp_error)) + (fun () -> Stack.TCP.close flow) + >>= function + | Ok () -> Lwt.return () + | Error (`Msg err) -> + Log.err (fun m -> m "<%a:%d> raised an error: %s" Ipaddr.pp ipaddr port err); + Lwt.return () in + Server.init ~port stack >>= fun service -> + Server.serve_when_ready ?stop ~handler service + |> fun (`Initialized job) -> + let job = job >|= close in job + + let stream_of_field (field_name : Mrmime.Field_name.t) unstrctrd = + Lwt_stream.of_list + [ (field_name :> string) + ; ": " + ; Unstrctrd.to_utf_8_string unstrctrd; "\r\n" ] + + let forward_granted ipaddr allowed_to_forward = + List.exists (fun prefix -> Ipaddr.Prefix.mem ipaddr prefix) allowed_to_forward + + let only_registered_recipients ~info map recipients = + let for_all = function + | Colombe.Forward_path.Postmaster -> true + | Domain domain' -> Colombe.Domain.equal info.Ptt_common.domain domain' + | Forward_path { Colombe.Path.local; domain= domain'; _ } -> + Colombe.Domain.equal info.Ptt_common.domain domain' + && Ptt_map.exists ~local map in + List.for_all for_all recipients + + let verify ~info ~sender ~ipaddr dns stream = + let ctx = + Uspf.empty + |> Uspf.with_ip ipaddr + |> fun ctx -> Option.fold ~none:ctx + ~some:(fun v -> Uspf.with_sender (`MAILFROM v) ctx) + sender in + Uspf_client.get ~ctx dns >>= function + | Error _ -> Lwt.return (`Requested_action_not_taken `Permanent) + | Ok record -> + Uspf_client.check ~ctx dns record >>= fun result -> + let receiver = match info.Ptt_common.domain with + | Colombe.Domain.Domain ds -> `Domain ds + | IPv4 ipv4 -> `Addr (Emile.IPv4 ipv4) + | IPv6 ipv6 -> `Addr (Emile.IPv6 ipv6) + | Extension (k, v) -> `Addr (Emile.Ext (k, v)) in + let field_name, unstrctrd = Uspf.to_field ~ctx ~receiver result in + let stream = Lwt_stream.append (stream_of_field field_name unstrctrd) stream in + Lwt.return (`Ok stream) + + let mail_exchange_logic_job ~info ~map ~allowed_to_forward dns (ic, oc) = + let sender = + let local = `Dot_string [ "ptt"; "elit" ] in + Some (Colombe.Path.{ local; domain= info.Ptt_common.domain; rest= [] }) in + let rec go () = + Lwt_stream.get ic >>= function + | None -> oc None; Lwt.return_unit + | Some (key, stream, wk) -> + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in + Log.debug (fun m -> m "%a sent a new email %a to: @[%a@]." + Colombe.Reverse_path.pp sender Mrmime.MessageID.pp id + Fmt.(Dump.list Colombe.Forward_path.pp) (List.map fst (Ptt.Msgd.recipients key))); + let fake_recipients = Ptt.Msgd.recipients key in + let fake_recipients = List.map fst fake_recipients in + let real_recipients = Ptt_map.expand ~info map fake_recipients in + Log.debug (fun m -> m "real recipients of %a: @[%a@]" + Mrmime.MessageID.pp id + Fmt.(Dump.list Colombe.Forward_path.pp) real_recipients); + let real_recipients = Ptt_aggregate.to_recipients ~info real_recipients in + begin + if forward_granted (Ptt.Msgd.ipaddr key) allowed_to_forward + then Lwt.return (`Ok stream) + else verify ~info + ~sender:(fst (Ptt.Msgd.from key)) + ~ipaddr:(Ptt.Msgd.ipaddr key) dns stream end >>= function + | #Ptt.Msgd.error as err -> + Log.warn (fun m -> m "Can not verify SPF informations from %a for %a, discard it!" + Colombe.Reverse_path.pp (fst (Ptt.Msgd.from key)) + Mrmime.MessageID.pp id); + Lwt.wakeup_later wk err; + Lwt.pause () >>= go + | `Ok stream -> + let elts = List.map (fun recipients -> + { Ptt_sendmail.sender + ; recipients + ; data= Lwt_stream.clone stream + ; policies= [] + ; id }) real_recipients in + let src = Ptt.Msgd.ipaddr key in + if forward_granted src allowed_to_forward + || only_registered_recipients ~info map fake_recipients + then begin + List.iter (oc $ Option.some) elts; + Log.debug (fun m -> m "Notice the SMTP server that everything is ok for %a from %a (%a)." + Mrmime.MessageID.pp id + Colombe.Reverse_path.pp (fst (Ptt.Msgd.from key)) + Ipaddr.pp (Ptt.Msgd.ipaddr key)); + Lwt.wakeup_later wk `Ok + end else begin + Log.warn (fun m -> m "Email %a to unknown users (%a), discard it!" + Mrmime.MessageID.pp id + Fmt.(Dump.list Colombe.Forward_path.pp) fake_recipients); + Lwt.wakeup_later wk (`Requested_action_not_taken `Permanent) + end; + Lwt.pause () >>= go in + go () + + let job ?(limit = 20) ?stop ~locals ?port ~tls ~info ?(forward_granted= []) stack dns he = + let pool0 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + Lwt_pool.create limit @@ fun () -> + let encoder = Bytes.create 0x7ff in + let decoder = Bytes.create 0x7ff in + let queue = Ke.Rke.create ~capacity:0x800 Bigarray.char in + Lwt.return (encoder, decoder, queue) in + let pool1 = + { Ptt_sendmail.pool= fun fn -> Lwt_pool.use pool1 fn } in + let ic_server, stream0, close0 = Relay.create ~info in + let oc_server, push0 = Sendmail.v ~resolver:mail_exchange_resolver ~pool:pool1 ~info tls in + let allowed_to_forward = forward_granted in + Lwt.join + [ mail_exchange_job ~pool:pool0 ?stop ?port stack dns ic_server close0 + ; mail_exchange_logic_job ~info ~map:locals ~allowed_to_forward dns (stream0, push0) + ; Sendmail.job dns he oc_server ] + end + + type 'k t = + { locals : Ptt_map.t + ; tls : Tls.Config.client + ; random : Mirage_crypto_rng.g option + ; hash : 'k Digestif.hash + ; authentication : 'k Ptt.Authentication.t + ; mechanisms : Ptt.Mechanism.t list + ; destination : Ipaddr.t + ; forward_granted : Ipaddr.Prefix.t list } + + type 'k iter = (Ptt_map.local -> 'k Digestif.t -> Emile.mailbox list -> unit Lwt.t) -> unit Lwt.t + + let v ?g ?(mechanisms= [ Ptt.Mechanism.PLAIN ]) ~postmaster ?(forward_granted= []) hash iter destination = + let authenticator = R.failwith_error_msg (Nss.authenticator ()) in + let tls = Rresult.R.failwith_error_msg (Tls.Config.client ~authenticator ()) in + let locals = Ptt_map.empty ~postmaster in + let passwds = Hashtbl.create 0x100 in + let add local passwd dsts = + List.iter (fun dst -> Ptt_map.add ~local dst locals) dsts; + Hashtbl.add passwds local passwd; + Lwt.return_unit in + iter add >|= fun () -> + let authentication local passwd' = + match Hashtbl.find_opt passwds local with + | Some passwd -> Lwt.return (Digestif.equal hash passwd passwd') + | None -> Lwt.return false in + let authentication = Ptt.Authentication.v authentication in + { locals; tls; random= g; hash; authentication; mechanisms; destination + ; forward_granted } + + let job ?stop t ~info ?submission ?relay stack dns he = + if Option.is_some info.Ptt_common.tls + then Log.warn (fun m -> m "Discard the TLS server configuration from the [info] value"); + let submission = { info with Ptt_common.tls= submission } in + let relay = { info with Ptt_common.tls= relay } in + Lwt.join + [ Local.job ?stop ~locals:t.locals ~tls:t.tls ~info:submission ~destination:[ t.destination ] + stack he t.random t.hash t.authentication t.mechanisms + ; Out.job ?stop ~locals:t.locals ~tls:t.tls ~info:relay ~forward_granted:t.forward_granted + stack dns he ] +end diff --git a/lib/elit.mli b/lib/elit.mli new file mode 100644 index 0000000..59ec360 --- /dev/null +++ b/lib/elit.mli @@ -0,0 +1,32 @@ +module Make + (Time : Mirage_time.S) + (Mclock : Mirage_clock.MCLOCK) + (Pclock : Mirage_clock.PCLOCK) + (Stack : Tcpip.Stack.V4V6) + (Dns_client : Dns_client_mirage.S) + (Happy_eyeballs : Happy_eyeballs_mirage.S with type flow = Stack.TCP.flow) : +sig + type 'k t + type 'k iter = (Ptt_map.local -> 'k Digestif.t -> Emile.mailbox list -> unit Lwt.t) -> unit Lwt.t + + val v : + ?g:Mirage_crypto_rng.g + -> ?mechanisms:Ptt.Mechanism.t list + -> postmaster:Emile.mailbox + -> ?forward_granted:Ipaddr.Prefix.t list + -> 'k Digestif.hash + -> 'k iter + -> Ipaddr.t + -> 'k t Lwt.t + + val job : + ?stop:Lwt_switch.t + -> 'k t + -> info:Ptt_common.info + -> ?submission:Tls.Config.server + -> ?relay:Tls.Config.server + -> Stack.TCP.t + -> Dns_client.t + -> Happy_eyeballs.t + -> unit Lwt.t +end diff --git a/lib/hm.ml b/lib/hm.ml index 037a4ea..cdc85dc 100644 --- a/lib/hm.ml +++ b/lib/hm.ml @@ -33,8 +33,9 @@ struct Dns_client.gethostbyname6 dns domain_name >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in Lwt.all [ ipv4; ipv6 ] >|= function - | [ _; (Ok _ as ipv6) ] -> ipv6 - | [ (Ok _ as ipv4); Error _ ] -> ipv4 + | [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ] + | [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ] + | [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ] | [ (Error _ as err); _ ] -> err | [] | [_] | _ :: _ :: _ -> assert false in { getmxbyname; gethostbyname } @@ -72,11 +73,11 @@ struct let rec go () = Lwt_stream.get ic >>= function | None -> oc None; Lwt.return_unit - | Some (key, stream) -> - let sender, _ = Ptt.Messaged.from key in + | Some (key, stream, wk) -> + let sender, _ = Ptt.Msgd.from key in let ctx = Uspf.empty - |> Uspf.with_ip (Ptt.Messaged.ipaddr key) + |> Uspf.with_ip (Ptt.Msgd.ipaddr key) |> fun ctx -> Option.fold ~none:ctx ~some:(fun sender -> Uspf.with_sender (`MAILFROM sender) ctx) sender in @@ -84,8 +85,8 @@ struct Uspf_client.get ~ctx dns >>= function | Error (`Msg msg) -> Log.err (fun m -> m "Got an error from SPF: %s" msg); - (* TODO(dinosaure): add a new field into the incoming email. *) - Lwt.return stream + Lwt.wakeup_later wk (`Requested_action_not_taken `Temporary); + Lwt.return_unit | Ok record -> Uspf_client.check ~ctx dns record >>= fun result -> let receiver = match info.Ptt_common.domain with @@ -95,21 +96,25 @@ struct | Extension (k, v) -> `Addr (Emile.Ext (k, v)) in let field_name, unstrctrd = Uspf.to_field ~ctx ~receiver result in let stream = Lwt_stream.append (stream_of_field field_name unstrctrd) stream in - Lwt.return stream in - verify () >>= fun stream -> - let recipients = Ptt.Messaged.recipients key in - let recipients = List.map fst recipients in - let recipients = Ptt_map.expand ~info map recipients in - let recipients = Ptt_aggregate.to_recipients ~info recipients in - let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in - let elts = List.map (fun recipients -> - { Ptt_sendmail.sender - ; recipients - ; data= Lwt_stream.clone stream - ; policies= [] - ; id }) recipients in - List.iter (oc $ Option.some) elts; - Lwt.pause () >>= go in + let result = match result with + | `Pass _ | `Neutral | `None -> `Ok + | `Permerror | `Fail -> `Requested_action_not_taken `Permanent + | `Temperror | `Softfail -> `Requested_action_not_taken `Temporary in + Lwt.wakeup_later wk result; + let recipients = Ptt.Msgd.recipients key in + let recipients = List.map fst recipients in + let recipients = Ptt_map.expand ~info map recipients in + let recipients = Ptt_aggregate.to_recipients ~info recipients in + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in + let elts = List.map (fun recipients -> + { Ptt_sendmail.sender + ; recipients + ; data= Lwt_stream.clone stream + ; policies= [] + ; id }) recipients in + List.iter (oc $ Option.some) elts; + Lwt.return_unit in + verify () >>= Lwt.pause >>= go in go () let job ?(limit = 20) ?stop ~locals ~port ~tls ~info stack dns he = diff --git a/lib/lipap.ml b/lib/lipap.ml index 0d7420c..78e3f2e 100644 --- a/lib/lipap.ml +++ b/lib/lipap.ml @@ -32,8 +32,9 @@ struct Dns_client.gethostbyname6 dns domain_name >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in Lwt.all [ ipv4; ipv6 ] >|= function - | [ _; (Ok _ as ipv6) ] -> ipv6 - | [ (Ok _ as ipv4); Error _ ] -> ipv4 + | [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ] + | [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ] + | [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ] | [ (Error _ as err); _ ] -> err | [] | [_] | _ :: _ :: _ -> assert false in { getmxbyname; gethostbyname } @@ -44,7 +45,7 @@ struct Lwt.finalize (fun () -> Lwt_pool.use pool @@ fun (encoder, decoder, _) -> - Submission.accept ~encoder:(Fun.const encoder) + Submission.accept_without_starttls ~encoder:(Fun.const encoder) ~decoder:(Fun.const decoder) ~ipaddr flow dns resolver random hash server >|= R.reword_error (R.msgf "%a" Submission.pp_error)) @@ -63,13 +64,13 @@ struct let rec go () = Lwt_stream.get ic >>= function | None -> oc None; Lwt.return_unit - | Some (key, stream) -> - let sender = fst (Ptt.Messaged.from key) in - let recipients = Ptt.Messaged.recipients key in + | Some (key, stream, wk) -> + let sender = fst (Ptt.Msgd.from key) in + let recipients = Ptt.Msgd.recipients key in let recipients = List.map fst recipients in let recipients = Ptt_map.expand ~info map recipients in let recipients = Ptt_aggregate.to_recipients ~info recipients in - let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in let elts = List.map (fun recipients -> (* TODO(dinosaure): Can we use multiple MAIL FROM to keep the original sender? We actually force to be @@ -82,10 +83,11 @@ struct ; policies= [] ; id }) recipients in List.iter (oc $ Option.some) elts; - Lwt.pause () >>= go in + Lwt.wakeup_later wk `Ok; + Lwt.pause () >>= go in go () - let job ?(limit = 20) ?stop ~locals ~port ~tls ~info + let job ?(limit = 20) ?stop ~locals ~port ~tls ~info random hash stack dns he authenticator mechanisms = let pool0 = diff --git a/lib/logic.ml b/lib/logic.ml index 9990909..78793a4 100644 --- a/lib/logic.ml +++ b/lib/logic.ml @@ -128,7 +128,7 @@ let () = Colombe.Request.Decoder.add_extension "STARTTLS" let () = Colombe.Request.Decoder.add_extension "AUTH" type email = { - from: Messaged.from + from: Msgd.from ; recipients: (Forward_path.t * (string * string option) list) list ; domain_from: Domain.t } @@ -268,9 +268,22 @@ module Make (Monad : MONAD) = struct let open Monad in send ctx Value.TP_354 ["Ok buddy! Finish it with ."] - let m_end ctx = + let m_end result ctx = let open Monad in - let* () = send ctx Value.PP_250 ["Mail sended, buddy!"] in + let* () = match result with + | `Ok -> send ctx Value.PP_250 ["Mail sended, buddy!"] + | `Aborted -> + send ctx Value.Code (451, ["Requested action aborted: local error in processing"]) + | `Not_enough_memory -> + send ctx Value.Code (452, ["Requested action not taken: insufficient system storage"]) + | `Too_big -> + send ctx Value.Code (552, ["Requested mail action aborted: exceeded storage allocation"]) + | `Failed -> + send ctx Value.Code (554, ["Transaction failed"]) + | `Requested_action_not_taken `Temporary -> + send ctx Value.Code (450, ["Requested mail action not taken"]) + | `Requested_action_not_taken `Permanent -> + send ctx Value.Code (550, ["Requested mail action not taken"]) in let* () = recv ctx Value.Quit in m_politely_close ctx @@ -280,8 +293,10 @@ module Make (Monad : MONAD) = struct let* () = send ctx Value.PP_250 [ - politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr; "8BITMIME" - ; "SMTPUTF8"; Fmt.str "SIZE %Ld" info.Ptt_common.size + politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr + ; "8BITMIME" + ; "SMTPUTF8" + ; Fmt.str "SIZE %Ld" info.Ptt_common.size ] in m_relay ctx ~domain_from @@ -291,8 +306,10 @@ module Make (Monad : MONAD) = struct let* () = send ctx Value.PP_250 [ - politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr; "8BITMIME" - ; "SMTPUTF8"; Fmt.str "SIZE %Ld" info.Ptt_common.size + politely ~domain:info.Ptt_common.domain ~ipaddr:info.Ptt_common.ipaddr + ; "8BITMIME" + ; "SMTPUTF8" + ; Fmt.str "SIZE %Ld" info.Ptt_common.size ; Fmt.str "AUTH %a" Fmt.(list ~sep:(const string " ") Mechanism.pp) ms ] in m_submission ctx ~domain_from ms diff --git a/lib/messaged.ml b/lib/messaged.ml deleted file mode 100644 index 02a8ef5..0000000 --- a/lib/messaged.ml +++ /dev/null @@ -1,180 +0,0 @@ -open Colombe - -type from = Reverse_path.t * (string * string option) list -type recipient = Forward_path.t * (string * string option) list - -type key = - { domain_from: Domain.t - ; from: from - ; recipients: recipient list - ; id: int64 - ; ip: Ipaddr.t } - -let domain_from {domain_from; _} = domain_from -let from {from; _} = from -let recipients {recipients; _} = recipients -let id {id; _} = id -let ipaddr {ip; _} = ip - -let key ~domain_from ~from ~recipients ~ipaddr:ip id = - {domain_from; from; recipients; id; ip} - -let pp ppf key = - Fmt.pf ppf - "{ @[domain_from= %a;@ from= %a;@ recipients= @[%a@];@ id= \ - %Ld;@] }" - Domain.pp key.domain_from Reverse_path.pp (fst key.from) - Fmt.(Dump.list Forward_path.pp) - (List.map fst key.recipients) - key.id - -let equal_recipients a b = - let a = List.sort Forward_path.compare a in - let b = List.sort Forward_path.compare b in - let rec go a b = - match a, b with - | _ :: _, [] -> false - | [], _ :: _ -> false - | a :: ar, b :: br -> - let res = Forward_path.equal a b in - if res then go ar br else false - | [], [] -> true in - go a b - -let equal a b = - Domain.equal a.domain_from b.domain_from - && Reverse_path.equal (fst a.from) (fst b.from) - && equal_recipients (List.map fst a.recipients) (List.map fst b.recipients) - && a.id = b.id - && Ipaddr.compare a.ip b.ip = 0 - -(* -module type S = sig - type +'a s - type queue - type 'a producer = 'a option -> unit s - type 'a consumer = unit -> 'a option s - type chunk = string * int * int - type t - - val create : unit -> t - val close : queue -> unit s - val push : ?chunk:int -> t -> key -> chunk producer s - val await : t -> unit s - val pop : t -> (key * queue * chunk consumer) option s - val broadcast : t -> unit -end -*) - -let src = Logs.Src.create "ptt.messaged" - -module Log = (val Logs.src_log src) - -type t = (key * string Lwt_stream.t) Lwt_stream.t - -(* -let blit_to_bytes src src_off dst dst_off len = - Bigstringaf.blit_to_bytes src ~src_off dst ~dst_off ~len - -let blit_of_string src src_off dst dst_off len = - Bigstringaf.blit_from_string src ~src_off dst ~dst_off ~len - -(* XXX(dinosaure): preferred one writer / one reader *) -let pipe_of_queue ?(chunk = 0x1000) queue = - if chunk <= 0 then - Fmt.invalid_arg "stream_of_queue: chunk must be bigger than 0"; - - let close = ref false in - let mutex = Mutex.create () in - let condition = Condition.create () in - - let consumer () = - let rec wait () = - if Ke.is_empty queue && not !close - then Lwt_condition.wait ~mutex condition >>= wait - else Lwt.return_unit in - Lwt_mutex.with_lock mutex @@ fun () -> - let* () = wait () in - let len = min (Ke.length queue) chunk in - - if len = 0 && !close then Lwt.return_none - else - let buf = Bytes.create chunk in - Log.debug (fun m -> m "Transmit %d byte(s) from the client." len); - Ke.N.keep_exn queue ~blit:blit_to_bytes ~length:Bytes.length ~off:0 ~len - buf; - Ke.N.shift_exn queue len; - Lwt.return_some (Bytes.unsafe_to_string buf, 0, len) in - - let rec producer = function - | None -> - Log.debug (fun m -> - m "The client finished the transmission of the message."); - Lwt_mutex.with_lock mutex @@ fun () -> - close := true; - Condition.broadcast condition (); - Lwt.return_unit - | Some (buf, off, len) as v -> - let* next = Lwt_mutex.with_lock mutex @@ fun () -> - if !close then Lwt.return_unit - else - let length = String.length in - match Ke.N.push queue ~blit:blit_of_string ~length ~off ~len buf with - | None -> - Lwt_condition.signal condition (); - Lwt.pause () >>= fun () -> Lwt.return `Retry - | Some _ -> - Lwt_condition.signal condition (); - Lwt.return `Stop in - match next with - | `Retry -> producer v - | `Stop -> Lwt.return_unit - in - {q= queue; m= mutex; c= condition; f= close}, producer, consumer - -let close queue = - Mutex.lock queue.m >>= fun () -> - queue.f := true; - Mutex.unlock queue.m; - return () - -type 'a producer = 'a option -> unit IO.t -type 'a consumer = unit -> 'a option IO.t -type chunk = string * int * int - -type t = - { q: (key * queue * chunk consumer) Queue.t - ; m: Lwt_mutex.t - ; c: unit Lwt_condition.t } - -let create () = - {q= Queue.create (); m= Lwt_mutex.create (); c= Lwt_condition.create ()} - -let push ?chunk t key = - let queue, _ = Ke.create ~capacity:0x1000 Bigarray.Char in - let queue, producer, consumer = pipe_of_queue ?chunk queue in - let ( let* ) = Lwt.bind in - let* () = Lwt_mutex.with_lock t.m @@ fun () -> - Queue.push (key, queue, consumer) t.q; - Lwt_condition.signal t.c (); - Lwt.return_unit in - return producer - -let await t = - let rec await () = - if Queue.is_empty t.q - then Lwt_condition.wait ~mutex:t.m t.c >>= await - else Lwt.return_unit in - Lwt_mutex.lock t.m await - -let pop t = - Lwt_mutex.with_lock t.m @@ fun () -> - match Queue.pop t.q with - | key, queue, consumer -> Lwt.return_some (key, queue, consumer) - | exception _ -> Lwt.return_none - -let broadcast t = - Lwt_mutex.with_lock t.m @@ fun () -> - Lwt_condition.broadcast t.c (); - Lwt.return_unit -*) diff --git a/lib/messaged.mli b/lib/messaged.mli deleted file mode 100644 index 03439ea..0000000 --- a/lib/messaged.mli +++ /dev/null @@ -1,64 +0,0 @@ -open Colombe - -type from = Reverse_path.t * (string * string option) list -type recipient = Forward_path.t * (string * string option) list -type key - -val domain_from : key -> Domain.t -val from : key -> from -val recipients : key -> recipient list -val id : key -> int64 -val ipaddr : key -> Ipaddr.t -val pp : key Fmt.t -val equal : key -> key -> bool - -val key : - domain_from:Domain.t - -> from:from - -> recipients:recipient list - -> ipaddr:Ipaddr.t - -> int64 - -> key - -type t = (key * string Lwt_stream.t) Lwt_stream.t - -(* -module type S = sig - type +'a s - type queue - type 'a producer = 'a option -> unit s - type 'a consumer = unit -> 'a option s - type chunk = string * int * int - - type t - (** The type of the dispatcher. *) - - val create : unit -> t - (** [create ()] creates a message dispatcher. *) - - val close : queue -> unit s - (** [close queue] notifies [queue] to be closed. Then, {!producer} will - {i eat} anything an {!consumer} will return as soon as possible [None]. *) - - val push : ?chunk:int -> t -> key -> chunk producer s - (** [push ?chunk messaged key] adds a new message into the dispatcher and give - to the caller the {!producer} to fill the message asynchronously. *) - - val await : t -> unit s - (** [await t] awaits a new message from the dispatcher. - - {b Note.} by design, a server should never stop, [await] can infinitely - blocks the process as long as there is no message available in [t]. *) - - val pop : t -> (key * queue * chunk consumer) option s - (** [pop t] returns next message available in [t]. It gives the identifier, - the queue which contains the mail and a safe-threaded consumer. *) - - (** / **) - - val broadcast : t -> unit -end - -module Make (Scheduler : SCHEDULER) (IO : IO with type 'a t = 'a Scheduler.s) : - S with type +'a s = 'a IO.t -*) diff --git a/lib/msgd.ml b/lib/msgd.ml new file mode 100644 index 0000000..89f9ab2 --- /dev/null +++ b/lib/msgd.ml @@ -0,0 +1,74 @@ +open Colombe + +type from = Reverse_path.t * (string * string option) list +type recipient = Forward_path.t * (string * string option) list + +type key = + { domain_from: Domain.t + ; from: from + ; recipients: recipient list + ; id: int64 + ; ip: Ipaddr.t } + +let domain_from {domain_from; _} = domain_from +let from {from; _} = from +let recipients {recipients; _} = recipients +let id {id; _} = id +let ipaddr {ip; _} = ip + +let key ~domain_from ~from ~recipients ~ipaddr:ip id = + {domain_from; from; recipients; id; ip} + +let pp ppf key = + Fmt.pf ppf + "{ @[domain_from= %a;@ from= %a;@ recipients= @[%a@];@ id= \ + %Ld;@] }" + Domain.pp key.domain_from Reverse_path.pp (fst key.from) + Fmt.(Dump.list Forward_path.pp) + (List.map fst key.recipients) + key.id + +let equal_recipients a b = + let a = List.sort Forward_path.compare a in + let b = List.sort Forward_path.compare b in + let rec go a b = + match a, b with + | _ :: _, [] -> false + | [], _ :: _ -> false + | a :: ar, b :: br -> + let res = Forward_path.equal a b in + if res then go ar br else false + | [], [] -> true in + go a b + +let equal a b = + Domain.equal a.domain_from b.domain_from + && Reverse_path.equal (fst a.from) (fst b.from) + && equal_recipients (List.map fst a.recipients) (List.map fst b.recipients) + && a.id = b.id + && Ipaddr.compare a.ip b.ip = 0 + +let src = Logs.Src.create "ptt.messaged" + +module Log = (val Logs.src_log src) + +type error = + [ `Aborted + | `Not_enough_memory + | `Too_big + | `Failed + | `Requested_action_not_taken of [ `Temporary | `Permanent ] ] + +type result = [ error | `Ok ] + +let pp_error ppf = function + | `Aborted -> Fmt.string ppf "Aborted" + | `Not_enough_memory -> Fmt.string ppf "Not enough memory" + | `Too_big -> Fmt.string ppf "Email too big" + | `Failed -> Fmt.string ppf "Failed" + | `Requested_action_not_taken `Temporary -> + Fmt.string ppf "Requested action not taken (temporary)" + | `Requested_action_not_taken `Permanent -> + Fmt.string ppf "Requested action not taken (permanent)" + +type t = (key * string Lwt_stream.t * result Lwt.u) Lwt_stream.t diff --git a/lib/msgd.mli b/lib/msgd.mli new file mode 100644 index 0000000..2be1913 --- /dev/null +++ b/lib/msgd.mli @@ -0,0 +1,34 @@ +open Colombe + +type from = Reverse_path.t * (string * string option) list +type recipient = Forward_path.t * (string * string option) list +type key + +val domain_from : key -> Domain.t +val from : key -> from +val recipients : key -> recipient list +val id : key -> int64 +val ipaddr : key -> Ipaddr.t +val pp : key Fmt.t +val equal : key -> key -> bool + +val key : + domain_from:Domain.t + -> from:from + -> recipients:recipient list + -> ipaddr:Ipaddr.t + -> int64 + -> key + +type error = + [ `Aborted + | `Not_enough_memory + | `Too_big + | `Failed + | `Requested_action_not_taken of [ `Temporary | `Permanent ] ] + +type result = [ error | `Ok ] + +type t = (key * string Lwt_stream.t * result Lwt.u) Lwt_stream.t + +val pp_error : error Fmt.t diff --git a/lib/mti_gf.ml b/lib/mti_gf.ml index 6c06547..d66b438 100644 --- a/lib/mti_gf.ml +++ b/lib/mti_gf.ml @@ -32,8 +32,9 @@ struct Dns_client.gethostbyname6 dns domain_name >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in Lwt.all [ ipv4; ipv6 ] >|= function - | [ _; (Ok _ as ipv6) ] -> ipv6 - | [ (Ok _ as ipv4); Error _ ] -> ipv4 + | [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ] + | [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ] + | [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ] | [ (Error _ as err); _ ] -> err | [] | [_] | _ :: _ :: _ -> assert false in { getmxbyname; gethostbyname } @@ -65,12 +66,12 @@ struct let rec go () = Lwt_stream.get ic >>= function | None -> oc None; Lwt.return_unit - | Some (key, stream) -> - let recipients = Ptt.Messaged.recipients key in + | Some (key, stream, wk) -> + let recipients = Ptt.Msgd.recipients key in let recipients = List.map fst recipients in let recipients = Ptt_map.expand ~info map recipients in let recipients = Ptt_aggregate.to_recipients ~info recipients in - let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in let elts = List.map (fun recipients -> (* TODO(dinosaure): Can we use multiple MAIL FROM to keep the original sender? We actually force to be @@ -83,6 +84,7 @@ struct ; policies= [] ; id }) recipients in List.iter (oc $ Option.some) elts; + Lwt.wakeup_later wk `Ok; Lwt.pause () >>= go in go () diff --git a/lib/mxs.mli b/lib/mxs.mli index 809ca8b..5d932a8 100644 --- a/lib/mxs.mli +++ b/lib/mxs.mli @@ -4,5 +4,5 @@ val pp_key : key Fmt.t include Map.S with type key := key -val v : preference:int -> domain:[ `host ] Domain_name.t -> Ipaddr.t -> Ipaddr.t t -val vs : (Dns.Mx.t * Ipaddr.t) list -> Ipaddr.t t +val v : preference:int -> domain:[ `host ] Domain_name.t -> Ipaddr.t list -> Ipaddr.t list t +val vs : (Dns.Mx.t * Ipaddr.t list) list -> Ipaddr.t list t diff --git a/lib/nec.ml b/lib/nec.ml index a64a644..113be48 100644 --- a/lib/nec.ml +++ b/lib/nec.ml @@ -32,8 +32,9 @@ struct Dns_client.gethostbyname6 dns domain_name >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in Lwt.all [ ipv4; ipv6 ] >|= function - | [ _; (Ok _ as ipv6) ] -> ipv6 - | [ (Ok _ as ipv4); Error _ ] -> ipv4 + | [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ] + | [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ] + | [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ] | [ (Error _ as err); _ ] -> err | [] | [_] | _ :: _ :: _ -> assert false in { getmxbyname; gethostbyname } @@ -63,7 +64,7 @@ struct let rec go () = Lwt_stream.get ic >>= function | None -> oc None; Lwt.return_unit - | Some (key, stream) -> + | Some (key, stream, wk) -> let sign_and_transmit () = Lwt.catch (fun () -> let consumer = @@ -73,12 +74,12 @@ struct >>= fun (_signed, consumer) -> let stream = Lwt_stream.from consumer in let stream = Lwt_stream.map (fun (str, off, len) -> String.sub str off len) stream in - let sender, _ = Ptt.Messaged.from key in - let recipients = Ptt.Messaged.recipients key in + let sender, _ = Ptt.Msgd.from key in + let recipients = Ptt.Msgd.recipients key in let recipients = List.map fst recipients in let recipients = Ptt_map.expand ~info map recipients in let recipients = Ptt_aggregate.to_recipients ~info recipients in - let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in let elts = List.map (fun recipients -> { Ptt_sendmail.sender ; recipients @@ -86,6 +87,7 @@ struct ; policies= [] ; id }) recipients in List.iter (oc $ Option.some) elts; + Lwt.wakeup_later wk `Ok; Lwt.return_unit) @@ fun exn -> Log.err (fun m -> m "Impossible to sign the incoming email: %S" (Printexc.to_string exn)); diff --git a/lib/ptt.ml b/lib/ptt.ml index 9c994b8..c2ba1c8 100644 --- a/lib/ptt.ml +++ b/lib/ptt.ml @@ -2,7 +2,7 @@ module SMTP = SMTP module SSMTP = SSMTP module Submission = Submission module Relay = Relay -module Messaged = Messaged +module Msgd = Msgd module Authentication = Authentication module Mechanism = Mechanism module Logic = Logic diff --git a/lib/ptt_common.ml b/lib/ptt_common.ml index 0e3598c..8b6b87d 100644 --- a/lib/ptt_common.ml +++ b/lib/ptt_common.ml @@ -15,7 +15,7 @@ type ('dns, 'a) getmxbyname = type ('dns, 'a) gethostbyname = 'dns -> [ `host ] Domain_name.t - -> (Ipaddr.t, [> `Msg of string ] as 'a) result Lwt.t + -> (Ipaddr.t list, [> `Msg of string ] as 'a) result Lwt.t type 'dns resolver = { getmxbyname : 'a. ('dns, 'a) getmxbyname diff --git a/lib/ptt_fake_dns.ml b/lib/ptt_fake_dns.ml index 7584713..514355a 100644 --- a/lib/ptt_fake_dns.ml +++ b/lib/ptt_fake_dns.ml @@ -10,7 +10,7 @@ module Make (Destination : sig val ipaddr : Ipaddr.t end) = struct [ `Plaintext of Ipaddr.t * int | `Tls of Tls.Config.client * Ipaddr.t * int ] - let create ?nameservers:_ ~timeout:_ _ = assert false + let create ?nameservers:_ ~timeout:_ _ = () let nameservers _ = `Tcp, [] let rng _ = String.empty let clock _ = 0L diff --git a/lib/ptt_flow.ml b/lib/ptt_flow.ml index 076a2ea..63af715 100644 --- a/lib/ptt_flow.ml +++ b/lib/ptt_flow.ml @@ -1,3 +1,7 @@ +let src = Logs.Src.create "ptt.flow" + +module Log = (val Logs.src_log src) + open Colombe.Sigs open Colombe.State open Colombe @@ -73,6 +77,10 @@ module Make (Flow : Mirage_flow.S) = struct and wr flow buf off len = inj (Flow'.send flow buf off len) in { Colombe.Sigs.rd; wr } + let pp_data buffer off _ ppf = function + | `End -> Fmt.string ppf "" + | `Len len -> Fmt.pf ppf "@[%a@]" (Hxd_string.pp Hxd.default) (Bytes.sub_string buffer off len) + let run : type s flow. s impl -> @@ -85,8 +93,12 @@ module Make (Flow : Mirage_flow.S) = struct let rec go = function | Read { buffer; off; len; k } -> - rdwr.rd flow buffer off len >>= fun v -> go (k v) + rdwr.rd flow buffer off len >>= fun v -> + Log.debug (fun m -> m "-> %a" (pp_data buffer off len) v); + go (k v) | Write { buffer; off; len; k } -> + Log.debug (fun m -> m "<- @[%a@]" + (Hxd_string.pp Hxd.default) (String.sub buffer off len)); rdwr.wr flow buffer off len >>= fun () -> go (k len) | Return v -> return (Ok v) | Error err -> return (Error err : ('a, 'err) result) in diff --git a/lib/ptt_map.ml b/lib/ptt_map.ml index 2a7e2be..8bd0b36 100644 --- a/lib/ptt_map.ml +++ b/lib/ptt_map.ml @@ -33,6 +33,8 @@ let exists_as_sender sender ~info t = Colombe.Domain.equal domain info.Ptt_common.domain && Hashtbl.mem t.map local +let exists ~local t = Hashtbl.mem t.map local + let recipients ~local {map; _} = Hashtbl.find_opt map local |> Option.value ~default:[] diff --git a/lib/ptt_map.mli b/lib/ptt_map.mli index 8918807..b0fcad7 100644 --- a/lib/ptt_map.mli +++ b/lib/ptt_map.mli @@ -6,6 +6,7 @@ val empty : postmaster:Emile.mailbox -> t val add : local:local -> Emile.mailbox -> t -> unit val exists_as_sender : Colombe.Reverse_path.t -> info:Ptt_common.info -> t -> bool val recipients : local:local -> t -> Colombe.Forward_path.t list +val exists : local:local -> t -> bool val all : t -> Colombe.Forward_path.t list val expand : diff --git a/lib/ptt_sendmail.ml b/lib/ptt_sendmail.ml index ee14759..3884fb4 100644 --- a/lib/ptt_sendmail.ml +++ b/lib/ptt_sendmail.ml @@ -32,6 +32,16 @@ and policy = [ `Ignore ] [@@@warning "+30"] +let pp_recipients ppf { domain; locals } = + let pp_domain ppf = function + | `Ipaddr (Ipaddr.V4 ipv4) -> Fmt.pf ppf "[%a]" Ipaddr.V4.pp ipv4 + | `Ipaddr (Ipaddr.V6 ipv6) -> Fmt.pf ppf "[IPv6:%a]" Ipaddr.V6.pp ipv6 + | `Domain domain_name -> Domain_name.pp ppf domain_name in + match locals with + | `All -> Fmt.pf ppf "<%a>" pp_domain domain + | `Postmaster -> Fmt.pf ppf "Postmaster@%a" pp_domain domain + | `Some locals -> Fmt.pf ppf "%a@%a" Fmt.(Dump.list Emile.pp_local) locals pp_domain domain + let warn_about_an_unreachable_mail_exchange ~domain ~mail_exchange msg = Log.warn @@ fun m -> m "Impossible to resolve %a, a mail exchange server for %a: %s" Domain_name.pp mail_exchange Domain_name.pp domain msg @@ -93,9 +103,9 @@ module Make we must use an IP address as a destination to avoid the resolution mechanism of happy-eyeballs! *) - let sendmail ?(last_option= false) he t ~ipaddr elt = + let sendmail ?(last_option= false) he t ~ipaddrs elt = let ( let* ) = Lwt.bind in - let destination = Ipaddr.to_string ipaddr in + let destination = `Ipaddrs ipaddrs in let backup = Lwt_stream.clone elt.data in let consumed, stream = to_stream elt.data in let recipients = recipients_to_forward_paths elt.recipients in @@ -115,12 +125,22 @@ module Make | Ok (), _ -> Lwt.return `Ok | Error _, false -> Lwt.return `Retry | Error err, true -> + let debug = Lwt_stream.clone backup in + let* debug = Lwt_stream.to_list debug in + let debug = String.concat "" debug in + Log.debug (fun m -> m "Incoming bad email:"); + Log.debug (fun m -> m "@[%a@]" (Hxd_string.pp Hxd.default) debug); let* forward_path = guess_return_path backup in Lwt.return (`Errored (forward_path, err)) else match result with | Ok () -> Lwt.return `Ok | Error _ when List.exists ((=) `Ignore) elt.policies -> Lwt.return `Ok | Error err -> + let debug = Lwt_stream.clone backup in + let* debug = Lwt_stream.to_list debug in + let debug = String.concat "" debug in + Log.debug (fun m -> m "Incoming bad email:"); + Log.debug (fun m -> m "@[%a@]" (Hxd_string.pp Hxd.default) debug); let* forward_path = guess_return_path backup in Lwt.return (`Errored (forward_path, err)) @@ -136,7 +156,7 @@ module Make Fmt.(list ~sep:(any ",") Colombe.Forward_path.pp) recipients Colombe.Reverse_path.pp elt.sender); Lwt.return_unit - | Some _forward_path -> assert false (* TODO *) + | Some _forward_path -> Lwt.return_unit (* TODO *) let pp_error ppf = function | #Sendmail_with_starttls.error as err -> @@ -154,7 +174,7 @@ module Make Colombe.Reverse_path.pp elt.sender pp_error err); Lwt.return_unit - | Some _forward_path -> assert false (* TODO *) + | Some _forward_path -> Lwt.return_unit (* TODO *) let sendmail dns he t elt = let ( let* ) = Lwt.bind in @@ -162,7 +182,7 @@ module Make begin match elt.recipients.domain with | `Ipaddr ipaddr -> let domain = Ipaddr.to_domain_name ipaddr in - Lwt.return_ok Mxs.(v ~preference:0 ~domain ipaddr) + Lwt.return_ok Mxs.(v ~preference:0 ~domain [ ipaddr ]) | `Domain domain -> let* r = t.resolver.getmxbyname dns domain in match r with @@ -171,7 +191,7 @@ module Make begin fun acc ({ Dns.Mx.mail_exchange; _ } as mx) -> let* r = t.resolver.gethostbyname dns mail_exchange in match r with - | Ok ipaddr -> Lwt.return ((mx, ipaddr) :: acc) + | Ok ipaddrs -> Lwt.return ((mx, ipaddrs) :: acc) | Error (`Msg msg) -> warn_about_an_unreachable_mail_exchange ~domain ~mail_exchange msg; Lwt.return acc end in @@ -190,13 +210,13 @@ module Make of [mxs] which does not do the recursion. This case should never occur. *) assert false - | [ _mx, ipaddr ] -> - let* result = sendmail ~last_option:true he t ~ipaddr elt in + | [ _mx, ipaddrs ] -> + let* result = sendmail ~last_option:true he t ~ipaddrs elt in begin match result with | `Retry | `Ok -> Lwt.return_unit | `Errored value -> error_while_sending_email elt value end - | (_mx, ipaddr) :: mxs -> - let* result = sendmail he t ~ipaddr elt in + | (_mx, ipaddrs) :: mxs -> + let* result = sendmail he t ~ipaddrs elt in match result with | `Ok -> Lwt.return_unit | `Retry -> go mxs diff --git a/lib/ptt_sendmail.mli b/lib/ptt_sendmail.mli index c920110..18253bc 100644 --- a/lib/ptt_sendmail.mli +++ b/lib/ptt_sendmail.mli @@ -36,6 +36,8 @@ and resource = bytes * bytes * (char, Bigarray.int8_unsigned_elt) Ke.Rke.t and 'a push = 'a option -> unit and policy = [ `Ignore ] +val pp_recipients : recipients Fmt.t + module Make (Clock : Mirage_clock.PCLOCK) (Stack : Tcpip.Stack.V4V6) diff --git a/lib/relay.ml b/lib/relay.ml index fd652d9..18ac0a9 100644 --- a/lib/relay.ml +++ b/lib/relay.ml @@ -12,8 +12,8 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct type server = { info: info - ; messaged: Messaged.t - ; push: ((Messaged.key * string Lwt_stream.t) option -> unit) + ; msgd: Msgd.t + ; push: ((Msgd.key * string Lwt_stream.t * Msgd.result Lwt.u) option -> unit) ; mutable count: int64} and info = Ptt_common.info = @@ -26,20 +26,20 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct let info {info; _} = info let create ~info = - let messaged, push = Lwt_stream.create () in + let msgd, push = Lwt_stream.create () in let close () = push None in - {info; messaged; push; count= 0L}, messaged, close + {info; msgd; push; count= 0L}, msgd, close let succ server = let v = server.count in server.count <- Int64.succ server.count; v - type error = [ SMTP.error | `Too_big_data | `Flow of string | `Invalid_recipients ] + type error = [ SMTP.error | Msgd.error | `Flow of string | `Invalid_recipients ] let pp_error ppf = function | #SMTP.error as err -> SMTP.pp_error ppf err - | `Too_big_data -> Fmt.pf ppf "Too big data" + | #Msgd.error as err -> Msgd.pp_error ppf err | `Flow msg -> Fmt.pf ppf "Error at the protocol level: %s" msg | `Invalid_recipients -> Fmt.string ppf "Invalid recipients" @@ -52,20 +52,30 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct let dot = ".\r\n" - let receive_mail ?(limit = 0x100000) flow ctx m bounded_stream = + let receive_mail ?(limit = 0x100000) flow ctx m push = let rec go count () = - if count >= limit then Lwt.return_error `Too_big_data + if count >= limit + then begin push None; Lwt.return_error `Too_big_data end + (* NOTE(dinosaure): [552] will be returned later. *) else run flow (m ctx) >>? function - | ".." -> bounded_stream#push dot >>= go (count + 3) - | "." -> bounded_stream#close; Lwt.return_ok () + | ".." -> push (Some dot); go (count + 3) () + | "." -> push None; Lwt.return_ok () | str -> let len = String.length str in let str = str ^ "\r\n" in - bounded_stream#push str >>= - go (count + len + 2) + push (Some str); + go (count + len + 2) () in go 0 () + + let merge from_protocol from_logic = + match from_protocol, from_logic with + | Error `Too_big_data, _ -> `Too_big + | Error `Not_enough_memory, _ -> `Not_enough_memory + | Error `End_of_input, _ -> `Aborted + | Error _, _ -> `Requested_action_not_taken `Temporary + | Ok (), value -> value let accept : ?encoder:(unit -> bytes) @@ -89,20 +99,26 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct >>= function | true -> let id = succ server in - let key = Messaged.key ~domain_from ~from ~recipients ~ipaddr id in - let stream, bounded_stream = Lwt_stream.create_bounded 0x7ff in - server.push (Some (key, stream)); + let key = Msgd.key ~domain_from ~from ~recipients ~ipaddr id in + let stream, push = Lwt_stream.create () in + let th, wk = Lwt.task () in + server.push (Some (key, stream, wk)); let m = SMTP.m_mail ctx in run flow m >>? fun () -> receive_mail ~limit:(Int64.to_int server.info.size) flow ctx SMTP.(fun ctx -> Monad.recv ctx Value.Payload) - bounded_stream - >>? fun () -> - let m = SMTP.m_end ctx in + push + >>= fun result -> + th >>= fun result' -> + let m = SMTP.m_end (merge result result') ctx in run flow m >>? fun `Quit -> - properly_close_tls flow ctx >>? fun () -> Lwt.return_ok () + properly_close_tls flow ctx >>? fun () -> + let result = match merge result result' with + | `Ok -> Ok () + | #Msgd.error as err -> Error err in + Lwt.return result | false -> let e = `Invalid_recipients in let m = SMTP.m_properly_close_and_fail ctx ~message:"No valid recipients" e in diff --git a/lib/relay.mli b/lib/relay.mli index 4ab99b0..363bd67 100644 --- a/lib/relay.mli +++ b/lib/relay.mli @@ -19,7 +19,7 @@ module Make val create : info:info - -> server * (Messaged.key * string Lwt_stream.t) Lwt_stream.t * (unit -> unit) + -> server * (Msgd.key * string Lwt_stream.t * Msgd.result Lwt.u) Lwt_stream.t * (unit -> unit) val accept : ?encoder:(unit -> bytes) diff --git a/lib/sMTP.ml b/lib/sMTP.ml index 86eb4e9..400dc97 100644 --- a/lib/sMTP.ml +++ b/lib/sMTP.ml @@ -73,7 +73,7 @@ type info = Ptt_common.info = { } type email = Logic.email = { - from: Messaged.from + from: Msgd.from ; recipients: (Forward_path.t * (string * string option) list) list ; domain_from: Domain.t } @@ -117,6 +117,12 @@ let m_relay_init ctx info = let* () = send ctx Value.PP_250 ["Yes buddy!"] in go () | `Quit -> m_politely_close ctx + | `Hello _from_domain -> + (* NOTE(dinosaure): [nstools.fr] asks [EHLO]/[HELO] two times. We must + handle it correctly. *) + incr bad; + let* () = send ctx Value.PP_250 capabilities in + go () | _ -> incr bad; let* () = diff --git a/lib/sMTP.mli b/lib/sMTP.mli index 73c2b03..f46fb32 100644 --- a/lib/sMTP.mli +++ b/lib/sMTP.mli @@ -41,7 +41,7 @@ type info = Ptt_common.info = { } type email = Logic.email = { - from: Messaged.from + from: Msgd.from ; recipients: (Forward_path.t * (string * string option) list) list ; domain_from: Domain.t } @@ -71,7 +71,16 @@ val m_relay : -> ([> `Quit | `Send of email ], [> error ]) Colombe.State.t val m_mail : context -> (unit, [> error ]) Colombe.State.t -val m_end : context -> ([> `Quit ], [> error ]) Colombe.State.t + +val m_end : + [ `Aborted + | `Not_enough_memory + | `Too_big + | `Failed + | `Requested_action_not_taken of [ `Temporary | `Permanent ] + | `Ok ] + -> context + -> ([> `Quit ], [> error ]) Colombe.State.t val m_relay_init : context diff --git a/lib/sSMTP.ml b/lib/sSMTP.ml index 2b46868..f272791 100644 --- a/lib/sSMTP.ml +++ b/lib/sSMTP.ml @@ -98,7 +98,7 @@ type info = Ptt_common.info = { } type email = Logic.email = { - from: Messaged.from + from: Msgd.from ; recipients: (Forward_path.t * (string * string option) list) list ; domain_from: Domain.t } diff --git a/lib/sSMTP.mli b/lib/sSMTP.mli index 4c8f0d4..a245c6c 100644 --- a/lib/sSMTP.mli +++ b/lib/sSMTP.mli @@ -16,9 +16,9 @@ module Monad : module type of State.Scheduler (Context) (Value) type context = Context.t type error = - [ `Protocol of Value.error + [ `No_recipients + | `Protocol of Value.error | `Too_many_bad_commands - | `No_recipients | `Too_many_recipients ] val pp_error : error Fmt.t @@ -32,7 +32,7 @@ type info = Ptt_common.info = { } type email = Logic.email = { - from: Messaged.from + from: Msgd.from ; recipients: (Forward_path.t * (string * string option) list) list ; domain_from: Domain.t } @@ -62,7 +62,16 @@ val m_relay : -> ([> `Quit | `Send of email ], [> error ]) Colombe.State.t val m_mail : context -> (unit, [> error ]) Colombe.State.t -val m_end : context -> ([> `Quit ], [> error ]) Colombe.State.t + +val m_end : + [ `Aborted + | `Not_enough_memory + | `Too_big + | `Failed + | `Requested_action_not_taken of [ `Temporary | `Permanent ] + | `Ok ] + -> context + -> ([> `Quit ], [> error ]) Colombe.State.t val m_relay_init : context diff --git a/lib/spartacus.ml b/lib/spartacus.ml index 983adba..334456b 100644 --- a/lib/spartacus.ml +++ b/lib/spartacus.ml @@ -32,8 +32,9 @@ struct Dns_client.gethostbyname6 dns domain_name >|= Result.map (fun ipv6 -> Ipaddr.V6 ipv6) in Lwt.all [ ipv4; ipv6 ] >|= function - | [ _; (Ok _ as ipv6) ] -> ipv6 - | [ (Ok _ as ipv4); Error _ ] -> ipv4 + | [ Ok ipv4; Ok ipv6 ] -> Ok [ ipv4; ipv6 ] + | [ (Ok ipv4); Error _ ] -> Ok [ ipv4 ] + | [ Error _; (Ok ipv6) ] -> Ok [ ipv6 ] | [ (Error _ as err); _ ] -> err | [] | [_] | _ :: _ :: _ -> assert false in { getmxbyname; gethostbyname } @@ -65,7 +66,7 @@ struct let rec go () = Lwt_stream.get ic >>= function | None -> oc None; Lwt.return_unit - | Some (key, stream) -> + | Some (key, stream, wk) -> let filter () = let backup = Lwt_stream.clone stream in Spamtacus_mirage.rank stream >>= function @@ -75,12 +76,12 @@ struct | Ok (_label, stream) -> Lwt.return stream in filter () >>= fun stream -> - let sender, _ = Ptt.Messaged.from key in - let recipients = Ptt.Messaged.recipients key in + let sender, _ = Ptt.Msgd.from key in + let recipients = Ptt.Msgd.recipients key in let recipients = List.map fst recipients in let recipients = Ptt_map.expand ~info map recipients in let recipients = Ptt_aggregate.to_recipients ~info recipients in - let id = Ptt_common.id_to_messageID ~info (Ptt.Messaged.id key) in + let id = Ptt_common.id_to_messageID ~info (Ptt.Msgd.id key) in let elts = List.map (fun recipients -> { Ptt_sendmail.sender ; recipients @@ -88,6 +89,7 @@ struct ; policies= [] ; id }) recipients in List.iter (oc $ Option.some) elts; + Lwt.wakeup_later wk `Ok; Lwt.pause () >>= go in go () diff --git a/lib/submission.ml b/lib/submission.ml index cebc75f..71beff2 100644 --- a/lib/submission.ml +++ b/lib/submission.ml @@ -14,8 +14,8 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct type 'k server = { info: info - ; messaged: Messaged.t - ; push: ((Messaged.key * string Lwt_stream.t) option -> unit) + ; msgd: Msgd.t + ; push: ((Msgd.key * string Lwt_stream.t * Msgd.result Lwt.u) option -> unit) ; mechanisms: Mechanism.t list ; authenticator: 'k Authentication.t ; mutable count: int64 } @@ -30,9 +30,9 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct let info {info; _} = info let create ~info ~authenticator mechanisms = - let messaged, push = Lwt_stream.create () in + let msgd, push = Lwt_stream.create () in let close () = push None in - {info; messaged; push; mechanisms; authenticator; count= 0L}, messaged, close + {info; msgd; push; mechanisms; authenticator; count= 0L}, msgd, close let succ server = let v = server.count in @@ -41,10 +41,11 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct type error = [ SSMTP.error - | `Too_big_data + | Msgd.error | `Too_many_tries | `Flow of string - | `Invalid_recipients ] + | `Invalid_recipients + | `End_of_input ] type 'err runner = Runner : { run : 'a. 'flow -> ('a, 'err) Colombe.State.t -> ('a, 'err) result Lwt.t @@ -54,10 +55,11 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct let pp_error ppf = function | #SSMTP.error as err -> SSMTP.pp_error ppf err - | `Too_big_data -> Fmt.pf ppf "Too big data" + | #Msgd.error as err -> Msgd.pp_error ppf err | `Too_many_tries -> Fmt.pf ppf "Too many tries" | `Flow msg -> Fmt.pf ppf "Error at the protocol level: %s" msg | `Invalid_recipients -> Fmt.string ppf "Invalid recipients" + | `End_of_input -> Fmt.string ppf "End of input" let authentication ctx ~domain_from (Runner { run; flow; }) random hash server ?payload mechanism = @@ -140,22 +142,31 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct let dot = ".\r\n" - let receive_mail ?(limit = 0x100000) (Runner { run; flow}) ctx m bounded_stream = + let receive_mail ?(limit = 0x100000) (Runner { run; flow}) ctx m push = let rec go count () = - if count >= limit then Lwt.return_error `Too_big_data + if count >= limit + then begin push None; Lwt.return_error `Too_big end else run flow (m ctx) >>? function - | ".." -> bounded_stream#push dot >>= go (count + 3) - | "." -> bounded_stream#close; Lwt.return_ok () + | ".." -> push (Some dot); go (count + 3) () + | "." -> push None; Lwt.return_ok () | str -> let len = String.length str in let str = str ^ "\r\n" in - bounded_stream#push str >>= - go (count + len + 2) + push (Some str); + go (count + len + 2) () in go 0 () - let accept : + let merge from_protocol from_logic = + match from_protocol, from_logic with + | Error `Too_big, _ -> `Too_big + | Error `Not_enough_memory, _ -> `Not_enough_memory + | Error `End_of_input, _ -> `Aborted + | Error _, _ -> `Requested_action_not_taken `Temporary + | Ok (), value -> value + + let accept_without_starttls : ?encoder:(unit -> bytes) -> ?decoder:(unit -> bytes) -> ipaddr:Ipaddr.t @@ -201,20 +212,26 @@ module Make (Stack : Tcpip.Stack.V4V6) = struct let from = let sender = Colombe.Path.{ local= user; domain= server.info.SSMTP.domain; rest= [] } in Some sender, snd from in - let key = Messaged.key ~domain_from ~from ~recipients ~ipaddr id in - let stream, bounded_stream = Lwt_stream.create_bounded 0x7ff in - server.push (Some (key, stream)); + let key = Msgd.key ~domain_from ~from ~recipients ~ipaddr id in + let stream, push = Lwt_stream.create () in + let th, wk = Lwt.task () in + server.push (Some (key, stream, wk)); let m = SSMTP.m_mail ctx in run flow m >>? fun () -> - Log.debug (fun m -> m "Start to receive the incoming email."); receive_mail ~limit:(Int64.to_int server.info.size) runner ctx SSMTP.(fun ctx -> Monad.recv ctx Value.Payload) - bounded_stream - >>? fun () -> - let m = SSMTP.m_end ctx in - run flow m >>? fun `Quit -> Lwt.return_ok () end + push + >>= fun result -> + Log.debug (fun m -> m "Email received, waiting result from the logic"); + th >>= fun result' -> + let m = SSMTP.m_end (merge result result') ctx in + run flow m >>? fun `Quit -> + let result = match merge result result' with + | `Ok -> Ok () + | #Msgd.error as err -> Error err in + Lwt.return result end | false -> let e = `Invalid_recipients in let m = SSMTP.m_properly_close_and_fail ctx ~message:"No valid recipients" e in diff --git a/lib/submission.mli b/lib/submission.mli index ccb9716..5526645 100644 --- a/lib/submission.mli +++ b/lib/submission.mli @@ -20,9 +20,9 @@ module Make (Stack : Tcpip.Stack.V4V6) : sig info:info -> authenticator:'k Authentication.t -> Mechanism.t list - -> 'k server * (Messaged.key * string Lwt_stream.t) Lwt_stream.t * (unit -> unit) + -> 'k server * (Msgd.key * string Lwt_stream.t * Msgd.result Lwt.u) Lwt_stream.t * (unit -> unit) - val accept : + val accept_without_starttls : ?encoder:(unit -> bytes) -> ?decoder:(unit -> bytes) -> ipaddr:Ipaddr.t diff --git a/test/test.ml b/test/test.ml index ebe47b6..bf1ab6c 100644 --- a/test/test.ml +++ b/test/test.ml @@ -539,7 +539,7 @@ module Sendmail = Sendmail_mirage.Make let sendmail he ipaddr port ~domain sender recipients contents = let open Lwt.Infix in - let destination = Fmt.str "%a" Ipaddr.pp ipaddr in + let destination = `Ipaddrs [ ipaddr ] in let stream = Lwt_stream.of_list contents in let stream = Lwt_stream.map (fun str -> str ^ "\r\n") stream in let mail () = @@ -553,7 +553,7 @@ let sendmail he ipaddr port ~domain sender recipients contents = | Error (#Sendmail_with_starttls.error as err) -> Fmt.failwith "%a" Sendmail_with_starttls.pp_error err -let key = Alcotest.testable Ptt.Messaged.pp Ptt.Messaged.equal +let key = Alcotest.testable Ptt.Msgd.pp Ptt.Msgd.equal let full_test_0 = Alcotest_lwt.test_case "Receive one email from Anil" `Quick @@ fun _sw () -> @@ -592,13 +592,18 @@ let full_test_0 = [ "From: anil@recoil.org" ; "Subject: SMTP server, PLZ!" ; "" - ; "Hello World!" ] >>= fun () -> - Logs.debug (fun m -> m "Close the SMTP server"); - Lwt_switch.turn_off stop in - Lwt.join [ sendmail; th ] >>= fun () -> - Lwt_stream.to_list stream >|= List.map fst >>= fun inbox -> + ; "Hello World!" ] + >>= fun () -> Lwt_switch.turn_off stop + >|= fun () -> `Done in + let fold (key, _, wk) acc = + let acc = match acc with `Done -> [] | `Inbox acc -> acc in + Lwt.wakeup_later wk `Ok; + Lwt.return (`Inbox (key :: acc)) in + Lwt.all [ sendmail; (th >|= fun () -> `Done) + ; Lwt_stream.fold_s fold stream (`Inbox []) ] >>= fun results -> + let[@warning "-8"] [ `Done; `Done; `Inbox inbox ] = results in Alcotest.(check (list key)) "inbox" inbox - [ Ptt.Messaged.key ~domain_from:recoil ~from:(anil, []) + [ Ptt.Msgd.key ~domain_from:recoil ~from:(anil, []) ~recipients:[romain_calascibetta, []] ~ipaddr:(Ipaddr.V4 Ipaddr.V4.localhost) 0L ]; Lwt.return_unit @@ -652,15 +657,21 @@ let full_test_1 = ; "Subject: SMTP server, PLZ!" ; "" ; "Hello World!" ] - >>= fun () -> Lwt_switch.turn_off stop in - Lwt.join [ sendmail; th ] >>= fun () -> - Lwt_stream.to_list stream >|= List.map fst >|= List.rev >>= fun inbox -> + >>= fun () -> Lwt_switch.turn_off stop + >|= fun () -> `Done in + let fold (key, _, wk) acc = + let acc = match acc with `Done -> [] | `Inbox acc -> acc in + Lwt.wakeup_later wk `Ok; + Lwt.return (`Inbox (key :: acc)) in + Lwt.all [ sendmail; (th >|= fun () -> `Done) + ; Lwt_stream.fold_s fold stream `Done ] >>= fun results -> + let[@warning "-8"] [ `Done; `Done; `Inbox inbox ] = results in Alcotest.(check (list key)) "inbox" inbox - [ Ptt.Messaged.key ~domain_from:gazagnaire ~from:(thomas, []) + [ Ptt.Msgd.key ~domain_from:gazagnaire ~from:(thomas, []) ~recipients:[romain_calascibetta, []] ~ipaddr:(Ipaddr.V4 Ipaddr.V4.localhost) 1L - ; Ptt.Messaged.key ~domain_from:recoil ~from:(anil, []) + ; Ptt.Msgd.key ~domain_from:recoil ~from:(anil, []) ~recipients:[romain_calascibetta, []] ~ipaddr:(Ipaddr.V4 Ipaddr.V4.localhost) 0L ]; Lwt.return_unit