Skip to content

Commit

Permalink
Add pod watcher timeout (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Jan 13, 2023
1 parent 90e2f02 commit 5c864ad
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 18 deletions.
39 changes: 23 additions & 16 deletions lib/console/cached/kubernetes.ex
Original file line number Diff line number Diff line change
@@ -1,50 +1,57 @@
defmodule Console.Cached.Kubernetes do
use GenServer
alias Kazan.Watcher
require Logger
alias ETS.KeyValueSet
alias Kazan.Models.Apimachinery.Meta.V1, as: MetaV1
require Logger

defmodule State, do: defstruct [:instances, :pid]
defmodule State, do: defstruct [:table, :pid]

def start_link(name, request) do
GenServer.start_link(__MODULE__, request, name: name)
GenServer.start_link(__MODULE__, {request, name}, name: name)
end

def start(name, request) do
GenServer.start(__MODULE__, {request, name}, name: name)
end

def init(request) do
def init({request, name}) do
if Console.conf(:initialize) do
send self(), {:start, request}
end
{:ok, %State{}}
{:ok, table} = KeyValueSet.new(name: name, read_concurrency: true, ordered: true)
{:ok, %State{table: table}}
end

def fetch(pid \\ __MODULE__), do: GenServer.call(pid, :fetch)

def handle_call(:fetch, _, %State{instances: instances} = state), do: {:reply, Map.values(instances), state}
def fetch(name) do
KeyValueSet.wrap_existing!(name)
|> KeyValueSet.to_list!()
|> Enum.map(fn {_, v} -> v end)
end

def handle_info({:start, request}, state) do
def handle_info({:start, request}, %State{table: table} = state) do
Logger.info "starting namespace watcher"
{:ok, %{items: instances, metadata: %MetaV1.ListMeta{resource_version: vsn}}} = Kazan.run(request)
{:ok, pid} = Watcher.start_link(request, send_to: self(), resource_vsn: vsn)

:timer.send_interval(5000, :watcher_ping)
Process.link(pid)
{:noreply, %{state | pid: pid, instances: as_map(instances)}}
table = Enum.reduce(instances, table, fn inst, table -> KeyValueSet.put!(table, inst.metadata.name, inst) end)
{:noreply, %{state | pid: pid, table: table}}
end

def handle_info(:watcher_ping, %{pid: pid} = state) do
Logger.info "namespace k8s watcher alive at pid=#{inspect(pid)}"
{:noreply, state}
end

def handle_info(%Watcher.Event{object: o, type: event}, %{instances: instances} = state) when event in [:added, :modified] do
{:noreply, %{state | instances: Map.put(instances, o.metadata.name, o)}}
def handle_info(%Watcher.Event{object: o, type: event}, %{table: table} = state) when event in [:added, :modified] do
{:noreply, %{state | table: KeyValueSet.put!(table, o.metadata.name, o)}}
end

def handle_info(%Watcher.Event{object: o, type: :deleted}, %{instances: instances} = state) do
{:noreply, %{state | instances: Map.delete(instances, o.metadata.name)}}
def handle_info(%Watcher.Event{object: o, type: :deleted}, %{table: table} = state) do
{:noreply, %{state | table: KeyValueSet.delete!(table, o.metadata.name)}}
end

def handle_info(_, state), do: {:noreply, state}

defp as_map(instances), do: Enum.into(instances, %{}, & {&1.metadata.name, &1})
end
2 changes: 2 additions & 0 deletions lib/console/cached/namespace.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ defmodule Console.Cached.Namespace do

def start_link(), do: Console.Cached.Kubernetes.start_link(__MODULE__, CoreV1.list_namespace!())

def start(), do: Console.Cached.Kubernetes.start(__MODULE__, CoreV1.list_namespace!())

def fetch(), do: Console.Cached.Kubernetes.fetch(__MODULE__)
end
4 changes: 2 additions & 2 deletions lib/console/watchers/pod.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule Console.Watchers.Pod do

def handle_info(:start, state) do
Logger.info "starting pod watcher"
request = Kazan.Apis.Core.V1.list_pod_for_all_namespaces!(watch: true)
{:ok, pid} = Watcher.start_link(request, send_to: self())
request = Kazan.Apis.Core.V1.list_pod_for_all_namespaces!()
{:ok, pid} = Watcher.start_link(request, send_to: self(), recv_timeout: 15_000)

:timer.send_interval(5000, :watcher_ping)
Process.link(pid)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ defmodule Console.MixProject do
{:yaml_elixir, "~> 2.4"},
{:poison, "~> 3.1"},
{:mojito, "~> 0.3.0"},
{:ets, "~> 0.9"},
{:reverse_proxy_plug, "~> 1.2.1"},
{:kazan, "~> 0.11", github: "michaeljguarino/kazan", branch: "k8s-1.23"},
{:comeonin, "~> 5.1.2"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"ecto_sql": {:hex, :ecto_sql, "3.4.5", "30161f81b167d561a9a2df4329c10ae05ff36eca7ccc84628f2c8b9fa1e43323", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.4.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.3.0 or ~> 0.4.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.0", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "31990c6a3579b36a3c0841d34a94c275e727de8b84f58509da5f1b2032c98ac2"},
"elixir_make": {:hex, :elixir_make, "0.6.1", "8faa29a5597faba999aeeb72bbb9c91694ef8068f0131192fb199f98d32994ef", [:mix], [], "hexpm", "35d33270680f8d839a4003c3e9f43afb595310a592405a00afc12de4c7f55a18"},
"erlsom": {:hex, :erlsom, "1.5.0", "c5a5cdd0ee0e8dca62bcc4b13ff08da24fdefc16ccd8b25282a2fda2ba1be24a", [:rebar3], [], "hexpm", "55a9dbf9cfa77fcfc108bd8e2c4f9f784dea228a8f4b06ea10b684944946955a"},
"ets": {:hex, :ets, "0.9.0", "79c6a6c205436780486f72d84230c6cba2f8a9920456750ddd1e47389107d5fd", [:mix], [], "hexpm", "2861fdfb04bcaeff370f1a5904eec864f0a56dcfebe5921ea9aadf2a481c822b"},
"evel": {:hex, :evel, "0.1.2", "11f232a098f7ed31adb12935e5690819c1617b7d4cb11424b0e554c348c15cab", [:rebar3], [{:hash_ring, "~>0.4.0", [hex: :hash_ring, repo: "hexpm", optional: false]}], "hexpm", "1d6808bb60415572ef13cd085adf2f3266a3eaa2e810d7d3e095d1364b4d8032"},
"ex_crypto": {:hex, :ex_crypto, "0.10.0", "af600a89b784b36613a989da6e998c1b200ff1214c3cfbaf8deca4aa2f0a1739", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "ccc7472cfe8a0f4565f97dce7e9280119bf15a5ea51c6535e5b65f00660cde1c"},
"ex_doc": {:hex, :ex_doc, "0.18.4", "4406b8891cecf1352f49975c6d554e62e4341ceb41b9338949077b0d4a97b949", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm", "9dbe1ce1d711dc5362e3b3280e92989ad61413ce423bc4e9f76d5fcc51ab8d6b"},
Expand Down
19 changes: 19 additions & 0 deletions test/console/cached/namespace_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Console.Cached.NamespaceTest do
use Console.DataCase
alias Console.Cached.Namespace
import KubernetesScaffolds

describe "#fetch/0" do
test "it can properly cache and fetch namespaces" do
pid = Process.whereis(Namespace)

send pid, %Kazan.Watcher.Event{object: namespace_scaffold("name"), type: :added}
:timer.sleep(200)
[%{metadata: %{name: "name"}}] = Namespace.fetch()

send pid, %Kazan.Watcher.Event{object: namespace_scaffold("name"), type: :deleted}
:timer.sleep(200)
[] = Namespace.fetch()
end
end
end

0 comments on commit 5c864ad

Please sign in to comment.