diff --git a/src/core/builtins/builtins_ffmpeg_encoder.ml b/src/core/builtins/builtins_ffmpeg_encoder.ml index 70f92cb4f6..c4af6e06b3 100644 --- a/src/core/builtins/builtins_ffmpeg_encoder.ml +++ b/src/core/builtins/builtins_ffmpeg_encoder.ml @@ -35,12 +35,17 @@ module InternalScaler = Swscale.Make (Swscale.BigArray) (Swscale.Frame) type source_idx = { source : Source.source; idx : int64 } -module SourceIdx = Weak.Make (struct - type t = source_idx +module SourceIdx = struct + include Weak.Make (struct + type t = source_idx - let equal x y = x.source == y.source - let hash x = Obj.magic x.source -end) + let equal x y = x.source == y.source + let hash x = Oo.id x.source + end) + + let create n = (create n, Mutex.create ()) + let merge (c, m) v = Mutex.mutexify m (fun () -> merge c v) () +end let source_idx_map = SourceIdx.create 0 diff --git a/src/core/converters/video/ffmpeg_video_converter.ml b/src/core/converters/video/ffmpeg_video_converter.ml index 4a1d9ef823..3073a502a1 100644 --- a/src/core/converters/video/ffmpeg_video_converter.ml +++ b/src/core/converters/video/ffmpeg_video_converter.ml @@ -76,16 +76,21 @@ module WH = struct (* Number of converters to always keep in memory. *) let n = 2 let keep = Array.make n None - - let add h fmt conv = - let conv = (fmt, Some conv) in - for i = 1 to n - 1 do - keep.(i - 1) <- keep.(i) - done; - keep.(n - 1) <- Some conv; - add h conv - - let assoc h fmt = Option.get (snd (find h (fmt, None))) + let create n = (create n, Mutex.create ()) + + let add (h, m) fmt conv = + Mutex.mutexify m + (fun () -> + let conv = (fmt, Some conv) in + for i = 1 to n - 1 do + keep.(i - 1) <- keep.(i) + done; + keep.(n - 1) <- Some conv; + add h conv) + () + + let assoc (h, m) fmt = + Mutex.mutexify m (fun () -> Option.get (snd (find h (fmt, None)))) () end (* Weak hashtable containing converters already created. *) diff --git a/src/core/encoder/encoders/ffmpeg_encoder_common.ml b/src/core/encoder/encoders/ffmpeg_encoder_common.ml index e31a84acd5..33e67feb74 100644 --- a/src/core/encoder/encoders/ffmpeg_encoder_common.ml +++ b/src/core/encoder/encoders/ffmpeg_encoder_common.ml @@ -53,12 +53,18 @@ type stream_data = { mutable ready : bool; } -module Stream = Weak.Make (struct - type t = stream_data +module Stream = struct + include Weak.Make (struct + type t = stream_data - let equal x y = x.idx = y.idx - let hash x = Int64.to_int x.idx -end) + let equal x y = x.idx = y.idx + let hash x = Int64.to_int x.idx + end) + + let create n = (create n, Mutex.create ()) + let merge (c, m) v = Mutex.mutexify m (fun () -> merge c v) () + let remove (c, m) v = Mutex.mutexify m (fun () -> remove c v) () +end (* We lazily store last_start when concatenating streams. The idea is to always have the greatest diff --git a/src/core/io/srt_io.ml b/src/core/io/srt_io.ml index b6e9996632..7582bc7d8d 100644 --- a/src/core/io/srt_io.ml +++ b/src/core/io/srt_io.ml @@ -23,6 +23,7 @@ (** SRT input *) module Pcre = Re.Pcre +module WeakQueue = Liquidsoap_lang.Queues.WeakQueue exception Done exception Not_connected @@ -449,17 +450,11 @@ class virtual output_networking_agent = : Srt.socket -> exn -> Printexc.raw_backtrace -> unit end -module ToDisconnect = Liquidsoap_lang.Active_value.Make (struct - type t = < disconnect : unit ; srt_id : int > - - let id t = t#srt_id -end) - -let to_disconnect = ToDisconnect.create 10 +let to_disconnect = WeakQueue.create () let () = Lifecycle.on_core_shutdown ~name:"Srt disconnect" (fun () -> - ToDisconnect.iter (fun s -> s#disconnect) to_disconnect) + WeakQueue.iter to_disconnect (fun s -> s#disconnect)) class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid ~polling_delay ~payload_size ~messageapi ~hostname ~port ~connection_timeout @@ -470,7 +465,7 @@ class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid val mutable connect_task = None val task_should_stop = Atomic.make false val socket = Atomic.make None - initializer ToDisconnect.add to_disconnect (self :> ToDisconnect.data) + initializer WeakQueue.push to_disconnect (self :> < disconnect : unit >) method private get_socket = match Atomic.get socket with Some s -> s | None -> raise Not_connected @@ -557,7 +552,7 @@ class virtual listener ~enforced_encryption ~pbkeylen ~passphrase ~max_clients method virtual should_stop : bool method virtual mutexify : 'a 'b. ('a -> 'b) -> 'a -> 'b val listening_socket = Atomic.make None - initializer ToDisconnect.add to_disconnect (self :> ToDisconnect.data) + initializer WeakQueue.push to_disconnect (self :> < disconnect : unit >) method private is_connected = self#mutexify (fun () -> client_sockets <> []) () diff --git a/src/core/lang_source.ml b/src/core/lang_source.ml index 4190696d2c..0f3cb00e1d 100644 --- a/src/core/lang_source.ml +++ b/src/core/lang_source.ml @@ -21,14 +21,9 @@ *****************************************************************************) module Lang = Liquidsoap_lang.Lang +module WeakQueue = Liquidsoap_lang.Queues.WeakQueue open Lang -module Alive_values_map = Liquidsoap_lang.Active_value.Make (struct - type t = Value.t - - let id v = v.Value.id -end) - module ClockValue = struct include Value.MkAbstract (struct type content = Clock.t @@ -389,16 +384,16 @@ let to_track = Track.of_value the currently defined source as argument). *) type 'a operator_method = string * scheme * string * ('a -> value) -let checked_values = Alive_values_map.create 10 +let checked_values = WeakQueue.create () (** Ensure that the frame contents of all the sources occurring in the value agree with [t]. *) let check_content v t = let check t t' = Typing.(t <: t') in let rec check_value v t = - if not (Alive_values_map.mem checked_values v) then ( + if not (WeakQueue.exists checked_values (fun v' -> v' == v)) then ( (* We need to avoid checking the same value multiple times, otherwise we get an exponential blowup, see #1247. *) - Alive_values_map.add checked_values v; + WeakQueue.push checked_values v; match (v.Value.value, (Type.deref t).Type.descr) with | _, Type.Var _ -> () | _ when Source_val.is_value v -> @@ -656,7 +651,7 @@ let add_track_operator ~(category : Doc.Value.source) ~descr ?(flags = []) let category = `Track category in add_builtin ~category ~descr ~flags ?base name arguments return_t f -let itered_values = Alive_values_map.create 10 +let itered_values = WeakQueue.create () let iter_sources ?(on_imprecise = fun () -> ()) f v = let rec iter_term env v = @@ -700,10 +695,10 @@ let iter_sources ?(on_imprecise = fun () -> ()) f v = v.Term.methods; iter_base_term env v and iter_value v = - if not (Alive_values_map.mem itered_values v) then ( + if not (WeakQueue.exists itered_values (fun v' -> v == v')) then ( (* We need to avoid checking the same value multiple times, otherwise we get an exponential blowup, see #1247. *) - Alive_values_map.add itered_values v; + WeakQueue.push itered_values v; Value.Methods.iter (fun _ v -> iter_value v) v.Value.methods; match v.value with | _ when Source_val.is_value v -> f (Source_val.of_value v) diff --git a/src/core/tools/pool.ml b/src/core/tools/pool.ml index 2e388c8d9a..846649f48a 100644 --- a/src/core/tools/pool.ml +++ b/src/core/tools/pool.ml @@ -44,44 +44,38 @@ module type S = sig val clear : unit -> unit end +module WeakQueue = Liquidsoap_lang.Queues.WeakQueue + module Make (P : T) : S with type t = P.t = struct type t = P.t - module WeakHash = Weak.Make (struct - type t = P.t - - let equal t t' = P.id t = P.id t' - let hash = P.id - end) + let q = WeakQueue.create () - let h = WeakHash.create 10 + exception Found of t let find id = - match WeakHash.find_opt h (P.destroyed id) with - | Some v when not (P.is_destroyed v) -> Some v - | _ -> None + try + WeakQueue.iter q (fun r -> + if P.id r = id && not (P.is_destroyed r) then raise (Found r)); + None + with Found r -> Some r let fold f = - WeakHash.fold - (fun v cur -> if P.is_destroyed v then cur else f (P.id v) v cur) - h + WeakQueue.fold q (fun v cur -> + if P.is_destroyed v then cur else f (P.id v) v cur) let iter f = - WeakHash.iter - (fun entry -> if not (P.is_destroyed entry) then f (P.id entry) entry) - h + WeakQueue.iter q (fun entry -> + if not (P.is_destroyed entry) then f (P.id entry) entry) - let remove id = WeakHash.remove h (P.destroyed id) + let remove id = WeakQueue.filter q (fun r -> P.id r = id) let current_id = Atomic.make 0 let add fn = - let rec f () = - let id = Atomic.fetch_and_add current_id 1 in - let v = fn id in - match WeakHash.merge h v with v' when v == v' -> v | _ -> f () - in - f () - - let size () = WeakHash.count h - let clear () = WeakHash.clear h + let v = fn (Atomic.fetch_and_add current_id 1) in + WeakQueue.push q v; + v + + let size () = WeakQueue.length q + let clear () = WeakQueue.flush q (fun _ -> ()) end diff --git a/src/lang/active_value.ml b/src/lang/active_value.ml deleted file mode 100644 index 6198c1f8f8..0000000000 --- a/src/lang/active_value.ml +++ /dev/null @@ -1,38 +0,0 @@ -(***************************************************************************** - - Liquidsoap, a programmable stream generator. - Copyright 2003-2024 Savonet team - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details, fully stated in the COPYING - file at the root of the liquidsoap distribution. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - - *****************************************************************************) - -(* Weak hashes to track values that have not been collected. *) - -module type T = sig - type t - - val id : t -> int -end - -module Make (T : T) = struct - include Weak.Make (struct - type t = T.t - - let equal t t' = T.id t = T.id t' - let hash t = T.id t - end) -end diff --git a/src/lang/active_value.mli b/src/lang/active_value.mli deleted file mode 100644 index dfd25ebef2..0000000000 --- a/src/lang/active_value.mli +++ /dev/null @@ -1,33 +0,0 @@ -(***************************************************************************** - - Liquidsoap, a programmable stream generator. - Copyright 2003-2024 Savonet team - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details, fully stated in the COPYING - file at the root of the liquidsoap distribution. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - - *****************************************************************************) - -(* Weak hashes to track values that have not been collected. *) - -module type T = sig - type t - - val id : t -> int -end - -module Make (T : T) : sig - include Weak.S with type data = T.t -end diff --git a/src/lang/dune b/src/lang/dune index 726f19a0d5..5692604c20 100644 --- a/src/lang/dune +++ b/src/lang/dune @@ -84,7 +84,6 @@ (pps sedlex.ppx ppx_string)) (libraries liquidsoap-lang.console dune-site re str unix menhirLib) (modules - active_value backoff build_config builtins_bool diff --git a/src/lang/term/runtime_term.ml b/src/lang/term/runtime_term.ml index f9a9782a81..b9e150531a 100644 --- a/src/lang/term/runtime_term.ml +++ b/src/lang/term/runtime_term.ml @@ -19,12 +19,7 @@ type pattern = and meth_term_default = [ `Nullable | `Pattern of pattern | `None ] -type 'a term = { - mutable t : Type.t; - term : 'a; - methods : 'a term Methods.t; - id : int; -} +type 'a term = { mutable t : Type.t; term : 'a; methods : 'a term Methods.t } (* ~l1:x1 .. ?li:(xi=defi) .. *) type ('a, 'b) func_argument = { diff --git a/src/lang/term/term_base.ml b/src/lang/term/term_base.ml index 05462811c8..7de8053cda 100644 --- a/src/lang/term/term_base.ml +++ b/src/lang/term/term_base.ml @@ -22,6 +22,7 @@ (** Terms and values in the Liquidsoap language. *) +module WeakQueue = Queues.WeakQueue include Runtime_term (** An internal error. Those should not happen in theory... *) @@ -326,13 +327,9 @@ let rec to_string (v : t) = ^ "}") (** Create a new value. *) -let id = - let counter = Atomic.make 0 in - fun () -> Atomic.fetch_and_add counter 1 - let make ?pos ?t ?(methods = Methods.empty) e = let t = match t with Some t -> t | None -> Type.var ?pos () in - { t; term = e; methods; id = id () } + { t; term = e; methods } let rec free_vars_pat = function | `PVar [] -> assert false @@ -625,27 +622,15 @@ module MkAbstract (Def : AbstractDef) = struct let of_term t = match t.term with `Ground (Value c) -> c | _ -> assert false let to_term c = - { - t = Type.make T.descr; - term = `Ground (Value c); - methods = Methods.empty; - id = id (); - } + { t = Type.make T.descr; term = `Ground (Value c); methods = Methods.empty } let is_term t = match t.term with `Ground (Value _) -> true | _ -> false end -module ActiveTerm = Active_value.Make (struct - type typ = t - type t = typ - - let id { id } = id -end) - -let active_terms = ActiveTerm.create 1024 +let active_terms = WeakQueue.create () let trim_runtime_types () = - ActiveTerm.iter (fun term -> term.t <- Type.deep_demeth term.t) active_terms + WeakQueue.iter active_terms (fun term -> term.t <- Type.deep_demeth term.t) (** Create a new value. *) let make ?pos ?t ?methods e = @@ -656,7 +641,7 @@ let make ?pos ?t ?methods e = (Pos.Option.to_string t.Type.pos) (try to_string term with _ -> "") (Repr.string_of_type t); - ActiveTerm.add active_terms term; + WeakQueue.push active_terms term; term let rec fresh ~handler { t; term; methods } = @@ -725,8 +710,7 @@ let rec fresh ~handler { t; term; methods } = t = Type.Fresh.make handler t; term; methods = Methods.map (fresh ~handler) methods; - id = id (); } in - ActiveTerm.add active_terms term; + WeakQueue.push active_terms term; term diff --git a/src/lang/term/term_reducer.ml b/src/lang/term/term_reducer.ml index bad277ea61..6540119065 100644 --- a/src/lang/term/term_reducer.ml +++ b/src/lang/term/term_reducer.ml @@ -945,14 +945,9 @@ and to_term_base (tm : Parsed_term.t) : Term.t = { p with doc = Doc.parse_doc ~pos (String.concat "\n" doc) } | ast, _ -> ast in - { - t = Type.var ~pos:tm.pos (); - term; - methods = Methods.empty; - id = Term_base.id (); - } + { t = Type.var ~pos:tm.pos (); term; methods = Methods.empty } and to_term parsed_term = let term = to_term_base parsed_term in - Term_base.ActiveTerm.add Term_base.active_terms term; + Queues.WeakQueue.push Term_base.active_terms term; term