From 8ff03d42c1790756f78e905348aa1381044e2087 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Sat, 12 Oct 2024 22:52:27 +0200 Subject: [PATCH 1/5] Basic websocket implementation with exec and log helper functions TODO: specs, tests, credo etc. --- lib/kubereq.ex | 140 ++++++++++++++ lib/kubereq/application.ex | 3 +- lib/kubereq/step.ex | 2 +- lib/kubereq/step/operation.ex | 8 + lib/kubereq/websocket/adapter.ex | 274 ++++++++++++++++++++++++++++ lib/kubereq/websocket/response.ex | 21 +++ lib/kubereq/websocket/supervisor.ex | 17 ++ mix.exs | 4 + mix.lock | 3 +- 9 files changed, 469 insertions(+), 3 deletions(-) create mode 100644 lib/kubereq/websocket/adapter.ex create mode 100644 lib/kubereq/websocket/response.ex create mode 100644 lib/kubereq/websocket/supervisor.ex diff --git a/lib/kubereq.ex b/lib/kubereq.ex index 87a9870..1c7f007 100644 --- a/lib/kubereq.ex +++ b/lib/kubereq.ex @@ -639,4 +639,144 @@ defmodule Kubereq do {:ok, stream} end end + + @doc """ + + Opens a websocket to the given Pod and streams logs from it. + + ## Options + + * `:params` - Map defining the query parameteres added to the request to the + `pods/log` subresource. The `log` subresource supports the following + paremeters: + + * `container` - (optional) Specifies the container for which to return + logs. If omitted, returns logs for the first container in the pod. + * `follow` - (optional) If set to true, the request will stay open and + continue to return new log entries as they are generated. Default is + false. + * `previous` - (optional) If true, return logs from previous terminated + containers. Default is false. + * `sinceSeconds` - (optional) Returns logs newer than a relative duration in + seconds. Conflicts with sinceTime. + * `sinceTime` - (optional) Returns logs after a specific date (RFC3339 + format). Conflicts with sinceSeconds. + * `timestamps` - (optional) If true, add an RFC3339 timestamp at the + beginning of every line. Default is false. + * `tailLines` - (optional) Specifies the number of lines from the end of the + logs to show. If not specified, logs are shown from the creation of the + container or sinceSeconds/sinceTime. + * `limitBytes` - (optional) The maximum number of bytes to return from the + server. If not specified, no limit is imposed. + * `insecureSkipTLSVerifyBackend` - (optional) If true, bypasses certificate + verification for the kubelet's HTTPS endpoint. This is useful for clusters + with self-signed certificates. Default is false. + * `pretty` - (optional) If true, formats the output in a more readable + format. This is typically used for debugging and not recommended for + programmatic access. + * `prefix` - (optional) [Note: Availability may depend on Kubernetes + version] If true, adds the container name as a prefix to each line. Useful + when requesting logs for multiple containers. + """ + @spec log( + Req.Request.t(), + namespace :: namespace(), + name :: String.t(), + stream_to :: {Process.dest(), reference()} | Process.dest(), + opts :: Keyword.t() | nil + ) :: + response() + def log(req, namespace \\ nil, name, stream_to, opts \\ []) + + def log(req, name, stream_to, opts, []) when is_list(opts), + do: log(req, nil, name, stream_to, opts) + + def log(req, namespace, name, stream_to, opts) do + options = + Keyword.merge(opts, + operation: :connect, + path_params: [namespace: namespace, name: name], + into: stream_to, + subresource: "log" + ) + + Req.request(req, options) + end + + @doc """ + + Opens a websocket to the given Pod and executes a command on it. Can be used + to open a shell. + + ## Examples + + iex> ref = make_ref() + ...> res = + ...> Req.new() + ...> |> Kubereq.attach(api_version: "v1", kind: "ConfigMap") + ...> |> Kubereq.exec("default", "my-pod", {self(), ref}, + params: %{ + "command" => "/bin/bash" + "tty" => true, + "stdin" => true, + "stdout" => true, + "stderr" => true, + } + + Messages are sent to the passed Process with the reference included: + + iex> receive(do: ({^ref, message} -> IO.inspect(message))) + + The `body` of the `Req.Response` is a struct. If `tty` is set to true and + `command` is a shell, you can pass to + `Kubereq.Websocket.Response.send_message/3` in order to send instructions + through the websocket to the shell. + + ...> res.body.() + + ## Options + + * `:params` - Map defining the query parameteres added to the request to the + `pods/exec` subresource. The `exec` subresource supports the following + paremeters: + + * `container` (optional) - Specifies the container in the pod to execute the + command. If omitted, the first container in the pod will be chosen. + * `command` (optional) - The command to execute inside the container. This + parameter can be specified multiple times to represent a command with + multiple arguments. If omitted, the container's default command will be + used. + * `stdin` (optional) - If true, pass stdin to the container. Default is + false. + * `stdout` (optional) - If true, return stdout from the container. Default + is false. + * `stderr` (optional) - If true, return stderr from the container. Default + is false. + * `tty` (optional) - If true, allocate a pseudo-TTY for the container. + Default is false. + """ + @spec exec( + Req.Request.t(), + namespace :: namespace(), + name :: String.t(), + stream_to :: {Process.dest(), reference()} | Process.dest(), + opts :: Keyword.t() | nil + ) :: + response() + def exec(req, namespace \\ nil, name, stream_to, opts \\ []) + + def exec(req, name, stream_to, opts, []) when is_list(opts), + do: exec(req, nil, name, stream_to, opts) + + def exec(req, namespace, name, stream_to, opts) do + options = + Keyword.merge(opts, + operation: :connect, + path_params: [namespace: namespace, name: name], + into: stream_to, + subresource: "exec" + ) + + Req.request(req, options) + end end diff --git a/lib/kubereq/application.ex b/lib/kubereq/application.ex index 985244b..3601df2 100644 --- a/lib/kubereq/application.ex +++ b/lib/kubereq/application.ex @@ -6,7 +6,8 @@ defmodule Kubereq.Application do @impl true def start(_start_type, _start_args) do children = [ - {Registry, keys: :unique, name: Kubereq.Exec} + {Registry, keys: :unique, name: Kubereq.Exec}, + Kubereq.Websocket.Supervisor ] opts = [strategy: :one_for_one, name: Kubereq.Supervisor] diff --git a/lib/kubereq/step.ex b/lib/kubereq/step.ex index d3e74fa..a8d43cc 100644 --- a/lib/kubereq/step.ex +++ b/lib/kubereq/step.ex @@ -46,12 +46,12 @@ defmodule Kubereq.Step do def call(req) do with %Req.Request{} = req <- Step.Context.call(req), %Req.Request{} = req <- Step.Plug.call(req), + %Req.Request{} = req <- Step.BaseURL.call(req), %Req.Request{} = req <- Step.Operation.call(req), %Req.Request{} = req <- Step.Impersonate.call(req), %Req.Request{} = req <- Step.Auth.call(req), %Req.Request{} = req <- Step.TLS.call(req), %Req.Request{} = req <- Step.Compression.call(req), - %Req.Request{} = req <- Step.BaseURL.call(req), %Req.Request{} = req <- Step.FieldSelector.call(req) do Step.LabelSelector.call(req) end diff --git a/lib/kubereq/step/operation.ex b/lib/kubereq/step/operation.ex index dcd4266..dc29d20 100644 --- a/lib/kubereq/step/operation.ex +++ b/lib/kubereq/step/operation.ex @@ -99,4 +99,12 @@ defmodule Kubereq.Step.Operation do headers: [{"Content-Type", "application/merge-patch+json"}] ] end + + defp operation(:connect, request_path, subresource) do + [ + url: "#{request_path}/#{subresource}", + method: :post, + adapter: &Kubereq.Websocket.Adapter.run/1 + ] + end end diff --git a/lib/kubereq/websocket/adapter.ex b/lib/kubereq/websocket/adapter.ex new file mode 100644 index 0000000..076d03c --- /dev/null +++ b/lib/kubereq/websocket/adapter.ex @@ -0,0 +1,274 @@ +defmodule Kubereq.Websocket.Adapter do + use GenServer, restart: :transient + + alias Kubereq.Websocket.Response + + require Mint.HTTP + + @type incoming_frame() :: {:binary, binary} | {:close, any, any} + @type incoming_message() :: + {:close, integer(), binary()} + | {:error, binary} + | {:stderr, binary} + | {:stdout, binary} + | {:binary, binary} + + @type outgoing_frame() :: + {:close, integer(), binary()} + | :close + | {:binary, binary()} + | {:text, binary()} + @type outgoing_message() :: + {:binary, binary()} + | :close + | {:close, any, any} + | :exit + | {:stdin, binary()} + + defstruct [:mint, :websocket, :ref, :into] + + def run(%{into: _stream_to} = req) do + conn_opts = + req.options.connect_options + |> Keyword.put(:mode, :passive) + |> Keyword.put_new(:protocols, [:http1]) + + headers = + for {name, values} <- req.headers, + value <- values do + {name, value} + end + + registry_key = make_ref() + + start_child_resp = + DynamicSupervisor.start_child( + __MODULE__, + {__MODULE__, {req.url, headers, req.into, conn_opts, registry_key}} + ) + + case start_child_resp do + {:ok, _pid} -> + resp = + Req.Response.new( + status: 101, + headers: [], + trailers: [], + body: %Response{registry_key: registry_key} + ) + + {req, resp} + + {:error, error} -> + {req, error} + + {:error, _mint, error} -> + {req, error} + end + end + + def run(req) do + {req, + %ArgumentError{ + message: + ":connect operation requires setting the `:into` operation on the req request to a `{pid, ref}` tuple." + }} + end + + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end + + @impl GenServer + def init({url, headers, into, conn_opts, registry_key}) do + Registry.register(Kubereq.Websocket.Adapter.Registry, registry_key, []) + + with {:ok, mint} <- + Mint.HTTP.connect( + http_scheme(url.scheme), + url.host, + url.port, + conn_opts + ), + {:ok, mint, ref} <- + Mint.WebSocket.upgrade( + ws_scheme(url.scheme), + mint, + "#{url.path}?#{url.query}", + headers + ), + {:ok, mint, upgrade_response} <- receive_upgrade_response(mint, ref), + {:ok, mint, websocket} <- + Mint.WebSocket.new( + mint, + ref, + upgrade_response.status, + upgrade_response.headers + ) do + {:ok, nil, {:continue, {:connect, mint, websocket, ref, into}}} + else + {:error, error} -> + {:stop, error} + + {:error, _mint, error} -> + {:stop, error} + end + end + + @impl GenServer + def handle_continue({:connect, mint, websocket, ref, into}, _state) do + case Mint.HTTP.set_mode(mint, :active) do + {:ok, mint} -> + # Mint.HTTP.controlling_process causes a side-effect, but it doesn't actually + # change the conn, so we can ignore the value returned above. + pid = + case into do + {pid, _} -> pid + pid -> pid + end + + Process.flag(:trap_exit, true) + Process.monitor(pid) + {:noreply, struct(__MODULE__, mint: mint, websocket: websocket, ref: ref, into: into)} + + {:error, error} -> + {:stop, error} + end + end + + def handle_continue({:send, frame}, state) do + case send_frame(frame, state.mint, state.websocket, state.ref) do + {:ok, mint, websocket} -> + {:noreply, %{state | mint: mint, websocket: websocket}} + + {:error, mint, websocket, error} -> + {:stop, error, %{state | mint: mint, websocket: websocket}} + + :closed -> + {:stop, {:shutdown, :closed}, state} + end + end + + @impl GenServer + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + {:stop, :normal, state} + end + + def handle_info(message, state) when Mint.HTTP.is_connection_message(state.mint, message) do + case receive_frames(message, state.mint, state.websocket, state.ref) do + {:ok, mint, websocket, frames} -> + new_state = %{state | mint: mint, websocket: websocket} + + Enum.reduce_while(frames, {:noreply, new_state}, fn + {:binary, message}, acc -> + message + |> map_incoming_message() + |> stream_to(state.into) + + {:cont, acc} + + {:close, 1_000, ""} = message, _acc -> + stream_to(message, state.into) + {:halt, {:stop, :normal, new_state}} + + {:close, code, reason} = message, _acc -> + stream_to(message, state.into) + {:halt, {:stop, {:remote_closed, code, reason}, new_state}} + end) + + {:error, mint, websocket, error} -> + {:stop, error, %{state | mint: mint, websocket: websocket}} + end + end + + @impl GenServer + def handle_call({:send, message}, _from, state) do + case map_outgoing_frame(message) do + {:ok, frame} -> {:reply, :ok, state, {:continue, {:send, frame}}} + {:error, error} -> {:reply, {:error, error}, state} + end + end + + @impl GenServer + def terminate(_reason, state) do + if not is_nil(state.mint) and Mint.HTTP.open?(state.mint) do + {:ok, _websocket, data} = Mint.WebSocket.encode(state.websocket, :close) + Mint.WebSocket.stream_request_body(state.mint, state.ref, data) + end + end + + defp send_frame(frame, mint, websocket, ref) do + with true <- Mint.HTTP.open?(mint), + {:ok, websocket, data} <- Mint.WebSocket.encode(websocket, frame), + {:ok, mint} <- Mint.WebSocket.stream_request_body(mint, ref, data) do + {:ok, mint, websocket} + else + false -> :closed + {:error, %Mint.WebSocket{} = websocket, error} -> {:error, mint, websocket, error} + {:error, mint, error} -> {:error, mint, websocket, error} + end + end + + defp receive_frames(message, mint, websocket, ref) do + with {:ok, mint, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(mint, message), + {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do + {:ok, mint, websocket, frames} + else + {:error, error} -> {:error, mint, websocket, error} + {:error, %Mint.WebSocket{} = websocket, error} -> {:error, mint, websocket, error} + {:error, mint, error} -> {:error, mint, websocket, error} + end + end + + @spec map_incoming_message(binary()) :: incoming_message() + def map_incoming_message(<<1, message::binary>>), do: {:stdout, message} + def map_incoming_message(<<2, message::binary>>), do: {:stderr, message} + def map_incoming_message(<<3, message::binary>>), do: {:error, message} + def map_incoming_message(binary), do: {:binary, binary} + + @spec map_outgoing_frame(outgoing_message()) :: + {:ok, outgoing_frame()} | {:error, %ArgumentError{}} + def map_outgoing_frame({:binary, data}), do: {:ok, {:binary, data}} + def map_outgoing_frame(:close), do: {:ok, :close} + def map_outgoing_frame({:close, code, reason}), do: {:ok, {:close, code, reason}} + def map_outgoing_frame(:exit), do: {:ok, :close} + def map_outgoing_frame({:stdin, data}), do: {:ok, {:text, <<0>> <> data}} + + def map_outgoing_frame(data) do + {:error, + %ArgumentError{ + message: "The given message #{inspect(data)} is not supported to be sent to the Pod." + }} + end + + defp http_scheme("http"), do: :http + defp http_scheme("https"), do: :https + + defp ws_scheme("http"), do: :ws + defp ws_scheme("https"), do: :wss + + defp stream_to(message, {dest, ref}), do: send(dest, {ref, message}) + defp stream_to(message, dest), do: send(dest, message) + + defp receive_upgrade_response(mint, ref) do + Enum.reduce_while(Stream.cycle([:ok]), {mint, %{}}, fn _, {mint, response} -> + case Mint.HTTP.recv(mint, 0, 10000) do + {:ok, mint, parts} -> + response = + parts + |> Map.new(fn + {type, ^ref} -> {type, true} + {type, ^ref, value} -> {type, value} + end) + |> Map.merge(response) + + if response[:done], + do: {:halt, {:ok, mint, response}}, + else: {:cont, {mint, response}} + + {:error, mint, error, _} -> + {:halt, {:error, mint, error}} + end + end) + end +end diff --git a/lib/kubereq/websocket/response.ex b/lib/kubereq/websocket/response.ex new file mode 100644 index 0000000..a2f5e3b --- /dev/null +++ b/lib/kubereq/websocket/response.ex @@ -0,0 +1,21 @@ +defmodule Kubereq.Websocket.Response do + defstruct [:registry_key] + + def send_message(_response, message, timeout \\ 5_000) + + def send_message(_response, message, timeout) when timeout <= 0 do + {:error, %RuntimeError{message: "Could not send message #{inspect(message)} to websocket."}} + end + + def send_message(response, message, timeout) do + with [{server, _}] <- + Registry.lookup(Kubereq.Websocket.Adapter.Registry, response.registry_key), + true <- Process.alive?(server) do + GenServer.call(server, {:send, message}) + else + _ -> + Process.sleep(100) + send_message(response, message, timeout - 100) + end + end +end diff --git a/lib/kubereq/websocket/supervisor.ex b/lib/kubereq/websocket/supervisor.ex new file mode 100644 index 0000000..5b90dd5 --- /dev/null +++ b/lib/kubereq/websocket/supervisor.ex @@ -0,0 +1,17 @@ +defmodule Kubereq.Websocket.Supervisor do + use Supervisor + + def start_link(init_arg) do + Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + @impl Supervisor + def init(_init_arg) do + children = [ + {DynamicSupervisor, name: Kubereq.Websocket.Adapter, strategy: :one_for_one}, + {Registry, name: Kubereq.Websocket.Adapter.Registry, keys: :unique} + ] + + Supervisor.init(children, strategy: :rest_for_one) + end +end diff --git a/mix.exs b/mix.exs index d86a428..5c2c1e6 100644 --- a/mix.exs +++ b/mix.exs @@ -41,6 +41,10 @@ defmodule Kubereq.MixProject do {:req, "~> 0.5.0"}, {:yaml_elixir, "~> 2.0"}, + # Optional deps + {:mint, "~> 1.0"}, + {:mint_web_socket, "~> 1.0"}, + # Test deps {:excoveralls, "~> 0.18", only: :test}, {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 7629c6f..95024b5 100644 --- a/mix.lock +++ b/mix.lock @@ -16,6 +16,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, + "mint_web_socket": {:hex, :mint_web_socket, "1.0.4", "0b539116dbb3d3f861cdf5e15e269a933cb501c113a14db7001a3157d96ffafd", [:mix], [{:mint, ">= 1.4.1 and < 2.0.0-0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "027d4c5529c45a4ba0ce27a01c0f35f284a5468519c045ca15f43decb360a991"}, "mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"}, @@ -27,5 +28,5 @@ "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, - "yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"}, + "yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"}, } From eeeb323a13300042010db85f547ba30192efbda1 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Sun, 13 Oct 2024 22:45:25 +0200 Subject: [PATCH 2/5] specs and docs --- lib/kubereq.ex | 22 +++++---- lib/kubereq/step/operation.ex | 3 +- lib/kubereq/websocket/adapter.ex | 70 ++++++++++++++++++++++------- lib/kubereq/websocket/response.ex | 12 +++++ lib/kubereq/websocket/supervisor.ex | 2 + 5 files changed, 81 insertions(+), 28 deletions(-) diff --git a/lib/kubereq.ex b/lib/kubereq.ex index 1c7f007..997625f 100644 --- a/lib/kubereq.ex +++ b/lib/kubereq.ex @@ -644,6 +644,14 @@ defmodule Kubereq do Opens a websocket to the given Pod and streams logs from it. + ## Examples + + iex> ref = make_ref() + ...> res = + ...> Req.new() + ...> |> Kubereq.attach(api_version: "v1", kind: "ConfigMap") + ...> |> Kubereq.log("default", "my-pod", {self(), ref}, + params: %{"follow" => true} ## Options * `:params` - Map defining the query parameteres added to the request to the @@ -686,12 +694,7 @@ defmodule Kubereq do opts :: Keyword.t() | nil ) :: response() - def log(req, namespace \\ nil, name, stream_to, opts \\ []) - - def log(req, name, stream_to, opts, []) when is_list(opts), - do: log(req, nil, name, stream_to, opts) - - def log(req, namespace, name, stream_to, opts) do + def log(req, namespace, name, stream_to, opts \\ []) do options = Keyword.merge(opts, operation: :connect, @@ -763,12 +766,7 @@ defmodule Kubereq do opts :: Keyword.t() | nil ) :: response() - def exec(req, namespace \\ nil, name, stream_to, opts \\ []) - - def exec(req, name, stream_to, opts, []) when is_list(opts), - do: exec(req, nil, name, stream_to, opts) - - def exec(req, namespace, name, stream_to, opts) do + def exec(req, namespace, name, stream_to, opts \\ []) do options = Keyword.merge(opts, operation: :connect, diff --git a/lib/kubereq/step/operation.ex b/lib/kubereq/step/operation.ex index dc29d20..cf273d0 100644 --- a/lib/kubereq/step/operation.ex +++ b/lib/kubereq/step/operation.ex @@ -2,6 +2,7 @@ defmodule Kubereq.Step.Operation do @moduledoc false alias Kubereq.Error.StepError + alias Kubereq.Websocket.Adapter @spec call(req :: Req.Request.t()) :: Req.Request.t() | {Req.Request.t(), StepError.t()} def call(req) when not is_map_key(req.options, :operation) or is_nil(req.options.operation) do @@ -104,7 +105,7 @@ defmodule Kubereq.Step.Operation do [ url: "#{request_path}/#{subresource}", method: :post, - adapter: &Kubereq.Websocket.Adapter.run/1 + adapter: &Adapter.run/1 ] end end diff --git a/lib/kubereq/websocket/adapter.ex b/lib/kubereq/websocket/adapter.ex index 076d03c..dca0cc3 100644 --- a/lib/kubereq/websocket/adapter.ex +++ b/lib/kubereq/websocket/adapter.ex @@ -1,10 +1,16 @@ defmodule Kubereq.Websocket.Adapter do + @moduledoc false + use GenServer, restart: :transient alias Kubereq.Websocket.Response require Mint.HTTP + @typep start_args :: + {URI.t(), list(), {Process.dest(), reference()} | Process.dest(), + conn_opts :: Keyword.t(), registry_key :: reference()} + @type incoming_frame() :: {:binary, binary} | {:close, any, any} @type incoming_message() :: {:close, integer(), binary()} @@ -27,6 +33,8 @@ defmodule Kubereq.Websocket.Adapter do defstruct [:mint, :websocket, :ref, :into] + @spec run(Req.Request.t()) :: + {Req.Request.t(), Req.Response.t()} | {Req.Request.t(), Mint.WebSocket.error()} def run(%{into: _stream_to} = req) do conn_opts = req.options.connect_options @@ -59,11 +67,14 @@ defmodule Kubereq.Websocket.Adapter do {req, resp} - {:error, error} -> + {:error, error} when is_exception(error) -> {req, error} - {:error, _mint, error} -> - {req, error} + other -> + {req, + %RuntimeError{ + message: "Failed to start the Websocket. start_child() returned #{inspect(other)}." + }} end end @@ -75,9 +86,8 @@ defmodule Kubereq.Websocket.Adapter do }} end - def start_link(args) do - GenServer.start_link(__MODULE__, args) - end + @spec start_link(start_args()) :: GenServer.on_start() + def start_link(args), do: GenServer.start_link(__MODULE__, args) @impl GenServer def init({url, headers, into, conn_opts, registry_key}) do @@ -176,7 +186,17 @@ defmodule Kubereq.Websocket.Adapter do {:halt, {:stop, {:remote_closed, code, reason}, new_state}} end) - {:error, mint, websocket, error} -> + {:error, mint, websocket, error, frames} -> + Enum.each(frames, fn + {:binary, message} -> + message + |> map_incoming_message() + |> stream_to(state.into) + + other -> + stream_to(other, state.into) + end) + {:stop, error, %{state | mint: mint, websocket: websocket}} end end @@ -197,26 +217,39 @@ defmodule Kubereq.Websocket.Adapter do end end + @spec send_frame(outgoing_frame(), Mint.HTTP.t(), Mint.WebSocket.t(), Mint.Types.request_ref()) :: + {:ok, Mint.HTTP.t(), Mint.WebSocket.t()} + | :closed + | {:error, Mint.HTTP.t(), Mint.WebSocket.t(), Mint.WebSocket.error()} defp send_frame(frame, mint, websocket, ref) do with true <- Mint.HTTP.open?(mint), {:ok, websocket, data} <- Mint.WebSocket.encode(websocket, frame), {:ok, mint} <- Mint.WebSocket.stream_request_body(mint, ref, data) do {:ok, mint, websocket} else - false -> :closed - {:error, %Mint.WebSocket{} = websocket, error} -> {:error, mint, websocket, error} - {:error, mint, error} -> {:error, mint, websocket, error} + false -> + :closed + + {:error, mint_or_websocket, error} -> + if is_struct(mint_or_websocket, Mint.WebSocket) do + {:error, mint, mint_or_websocket, error} + else + {:error, mint_or_websocket, websocket, error} + end end end + @spec receive_frames(term(), Mint.HTTP.t(), Mint.WebSocket.t(), Mint.Types.request_ref()) :: + {:ok, Mint.HTTP.t(), Mint.WebSocket.t(), [Mint.Types.response()]} + | {:error, Mint.HTTP.t(), Mint.WebSocket.t(), Mint.WebSocket.error(), + [Mint.Types.response()]} defp receive_frames(message, mint, websocket, ref) do with {:ok, mint, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(mint, message), {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do {:ok, mint, websocket, frames} else - {:error, error} -> {:error, mint, websocket, error} - {:error, %Mint.WebSocket{} = websocket, error} -> {:error, mint, websocket, error} - {:error, mint, error} -> {:error, mint, websocket, error} + {:error, websocket, error, frames} -> {:error, mint, websocket, error, frames} + {:error, mint, error} -> {:error, mint, websocket, error, []} end end @@ -227,7 +260,7 @@ defmodule Kubereq.Websocket.Adapter do def map_incoming_message(binary), do: {:binary, binary} @spec map_outgoing_frame(outgoing_message()) :: - {:ok, outgoing_frame()} | {:error, %ArgumentError{}} + {:ok, outgoing_frame()} | {:error, Exception.t()} def map_outgoing_frame({:binary, data}), do: {:ok, {:binary, data}} def map_outgoing_frame(:close), do: {:ok, :close} def map_outgoing_frame({:close, code, reason}), do: {:ok, {:close, code, reason}} @@ -241,18 +274,24 @@ defmodule Kubereq.Websocket.Adapter do }} end + @spec http_scheme(binary()) :: atom() defp http_scheme("http"), do: :http defp http_scheme("https"), do: :https + @spec ws_scheme(binary()) :: atom() defp ws_scheme("http"), do: :ws defp ws_scheme("https"), do: :wss + @spec stream_to(incoming_message(), {Process.dest(), reference()} | Process.dest()) :: + incoming_message() defp stream_to(message, {dest, ref}), do: send(dest, {ref, message}) defp stream_to(message, dest), do: send(dest, message) + @spec receive_upgrade_response(Mint.HTTP.t(), Mint.Types.request_ref()) :: + {:ok, Mint.HTTP.t(), map()} | {:error, Mint.HTTP.t(), Mint.WebSocket.error()} defp receive_upgrade_response(mint, ref) do Enum.reduce_while(Stream.cycle([:ok]), {mint, %{}}, fn _, {mint, response} -> - case Mint.HTTP.recv(mint, 0, 10000) do + case Mint.HTTP.recv(mint, 0, 10_000) do {:ok, mint, parts} -> response = parts @@ -262,6 +301,7 @@ defmodule Kubereq.Websocket.Adapter do end) |> Map.merge(response) + # credo:disable-for-lines:3 if response[:done], do: {:halt, {:ok, mint, response}}, else: {:cont, {mint, response}} diff --git a/lib/kubereq/websocket/response.ex b/lib/kubereq/websocket/response.ex index a2f5e3b..31125be 100644 --- a/lib/kubereq/websocket/response.ex +++ b/lib/kubereq/websocket/response.ex @@ -1,6 +1,18 @@ defmodule Kubereq.Websocket.Response do + @moduledoc """ + Represents a response of a websocket request. + """ + alias Kubereq.Websocket.Adapter + + @type t :: %__MODULE__{registry_key: reference()} + defstruct [:registry_key] + @spec send_message( + response :: t(), + message :: Adapter.outgoing_message(), + timeout :: non_neg_integer() + ) :: :ok | {:error, term()} def send_message(_response, message, timeout \\ 5_000) def send_message(_response, message, timeout) when timeout <= 0 do diff --git a/lib/kubereq/websocket/supervisor.ex b/lib/kubereq/websocket/supervisor.ex index 5b90dd5..a5772b5 100644 --- a/lib/kubereq/websocket/supervisor.ex +++ b/lib/kubereq/websocket/supervisor.ex @@ -1,4 +1,6 @@ defmodule Kubereq.Websocket.Supervisor do + @moduledoc false + use Supervisor def start_link(init_arg) do From 3ddc138b2d17bb728feac31dd2eed11dba947c06 Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Mon, 14 Oct 2024 21:49:22 +0200 Subject: [PATCH 3/5] add integration tests for log and exec --- test/kubereq_integration_test.exs | 108 ++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/test/kubereq_integration_test.exs b/test/kubereq_integration_test.exs index f804c85..6690603 100644 --- a/test/kubereq_integration_test.exs +++ b/test/kubereq_integration_test.exs @@ -1,9 +1,12 @@ defmodule KubereqIntegrationTest do use ExUnit.Case, async: true + @moduletag :integration import YamlElixir.Sigil + alias Kubereq.Websocket.Response + @cluster_name "kubereq" @kubeconfig_path "test/support/kubeconfig-integration.yaml" @namespace "integrationtest" @@ -30,6 +33,9 @@ defmodule KubereqIntegrationTest do req_ns = Req.new() |> Kubereq.attach(kubeconfig: kubeconf, api_version: "v1", kind: "Namespace") + req_pod = + Req.new() |> Kubereq.attach(kubeconfig: kubeconf, api_version: "v1", kind: "Pod") + req_cm = Req.new() |> Kubereq.attach(kubeconfig: kubeconf, api_version: "v1", kind: "ConfigMap") @@ -43,18 +49,20 @@ defmodule KubereqIntegrationTest do [ req_cm: req_cm, - req_ns: req_ns + req_ns: req_ns, + req_pod: req_pod, + kubeconfig: kubeconf ] end - setup %{req_cm: req_cm} do - test_id = :rand.uniform(10) + setup %{req_cm: req_cm, req_pod: req_pod} do + test_id = :rand.uniform(10_000) example_config_1 = ~y""" apiVersion: v1 kind: ConfigMap metadata: - name: example-config-1-#{:rand.uniform(10000)} + name: example-config-1-#{:rand.uniform(10_000)} namespace: #{@namespace} labels: test: kubereq-#{test_id} @@ -78,6 +86,7 @@ defmodule KubereqIntegrationTest do on_exit(fn -> Kubereq.delete_all(req_cm, @namespace, label_selectors: [{"app", "kubereq"}]) + Kubereq.delete_all(req_pod, @namespace, label_selectors: [{"app", "kubereq"}]) end) [example_config_1: example_config_1, example_config_2: example_config_2, test_id: test_id] @@ -256,4 +265,95 @@ defmodule KubereqIntegrationTest do assert_receive {^ref, %{"type" => "DELETED", "object" => %{"metadata" => %{"name" => ^cm_name}}}} end + + test "streams pod logs to process", %{req_pod: req} do + pod_name = "example-pod-#{:rand.uniform(10_000)}" + log_stmt = "foo bar" + + pod = ~y""" + apiVersion: v1 + kind: Pod + metadata: + namespace: #{@namespace} + name: #{pod_name} + labels: + app: kubereq + spec: + containers: + - name: main + image: busybox + command: + - /bin/sh + - "-c" + - 'echo "#{log_stmt}"' + - "sleep infinity" + """ + + Kubereq.apply(req, pod) + + :ok = + Kubereq.wait_until(req, @namespace, pod_name, &(&1["status"]["phase"] == "Running"), + timeout: :timer.minutes(2) + ) + + ref = make_ref() + Kubereq.log(req, @namespace, pod_name, {self(), ref}, params: %{"follow" => false}) + assert_receive({^ref, {:binary, ^log_stmt <> "\n"}}) + assert_receive({^ref, {:close, 1_000, ""}}) + end + + test "streams exec commands to pod and prompts back to process", %{req_pod: req} do + pod_name = "example-pod-#{:rand.uniform(10_000)}" + + pod = ~y""" + apiVersion: v1 + kind: Pod + metadata: + namespace: #{@namespace} + name: #{pod_name} + labels: + app: kubereq + spec: + containers: + - name: main + image: busybox + command: + - /bin/sh + - "-c" + - "sleep infinity" + """ + + Kubereq.apply(req, pod) + + :ok = + Kubereq.wait_until(req, @namespace, pod_name, &(&1["status"]["phase"] == "Running"), + timeout: :timer.minutes(2) + ) + + ref = make_ref() + + {:ok, resp} = + Kubereq.exec( + req, + @namespace, + pod_name, + {self(), ref}, + params: %{"tty" => true, "stdout" => true, "stdin" => true, "command" => "/bin/sh"} + ) + + Response.send_message(resp.body, {:stdin, ~s(echo "foo bar"\n)}) + + result = receive_loop(ref, resp.body, "") |> IO.iodata_to_binary() + assert result == ~s(/ # echo "foo bar"\r\nfoo bar\r\n/ # ) + end + + defp receive_loop(ref, websocket_response, acc) do + receive do + {^ref, {:stdout, data}} -> receive_loop(ref, websocket_response, [acc, data]) + after + 500 -> + Response.send_message(websocket_response, :close) + acc + end + end end From d90777474fdcbf69d479cc91166366619e878f7b Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Wed, 6 Nov 2024 18:57:37 +0100 Subject: [PATCH 4/5] use Fresh for websockets --- lib/kubereq.ex | 229 ++++++++++---------- lib/kubereq/application.ex | 3 +- lib/kubereq/{ => auth}/exec.ex | 2 +- lib/kubereq/connect.ex | 308 +++++++++++++++++++++++++++ lib/kubereq/kubeconfig/stub.ex | 2 +- lib/kubereq/pod_exec.ex | 127 +++++++++++ lib/kubereq/pod_logs.ex | 106 ++++++++++ lib/kubereq/step/auth.ex | 2 +- lib/kubereq/step/operation.ex | 5 +- lib/kubereq/websocket/adapter.ex | 314 ---------------------------- lib/kubereq/websocket/response.ex | 33 --- lib/kubereq/websocket/supervisor.ex | 19 -- mix.exs | 7 +- mix.lock | 1 + test/kubereq_integration_test.exs | 138 ++++++++++-- 15 files changed, 790 insertions(+), 506 deletions(-) rename lib/kubereq/{ => auth}/exec.ex (99%) create mode 100644 lib/kubereq/connect.ex create mode 100644 lib/kubereq/pod_exec.ex create mode 100644 lib/kubereq/pod_logs.ex delete mode 100644 lib/kubereq/websocket/adapter.ex delete mode 100644 lib/kubereq/websocket/response.ex delete mode 100644 lib/kubereq/websocket/supervisor.ex diff --git a/lib/kubereq.ex b/lib/kubereq.ex index 997625f..7603f35 100644 --- a/lib/kubereq.ex +++ b/lib/kubereq.ex @@ -1,13 +1,17 @@ defmodule Kubereq do @moduledoc ~S""" - Kubereq processes requests to your Kubernetes API Server. + A Kubernetes client for Elixir based on `Req`. ## Usage - This library can used with plan `Req` but the function in this module - provide an easier API to people used to `kubectl` and friends. + First, attach `kubereq` to your `Req` request (see `attach/2` for options): - ### Plain Req + Req.new() |> Kubereq.attach() + + Now you can use plain Req functionality. However, the functions defined in + this module make it much easier to perform the most common operation. + + ### Usage with plain Req functionality Use `Kubereq.Kubeconfig.Default` to create connection to cluster and plain `Req.request()` to make the request @@ -45,7 +49,7 @@ defmodule Kubereq do Req.request!(sa_req, operation: :list, path_params: [namespace: "default"]) ``` - ### Kubectl API + ### Kubereq API While this library can attach to any `Req` struct, it is sometimes easier to prepare `Req` for a specific resource and then use the functions @@ -119,15 +123,20 @@ defmodule Kubereq do end @doc """ - Attaches `kubereq` to a `Req.Request` struct for making HTTP requests to a Kubernetes - cluster. You can optionally pass a Kubernetes configuration or pipeline via - `kubeconfig` option. If it is omitted, the default config + Attaches `kubereq` to a `Req.Request` struct for making HTTP requests to a + Kubernetes cluster. You can optionally pass a Kubernetes configuration or + pipeline via `kubeconfig` option. If it is omitted, the default config `Kubereq.Kubeconfig.Default` is loaded. ### Examples iex> Req.new() |> Kubereq.attach() %Request.Req{...} + + ### Options + + All options (see Options section in module doc) are accepted and merged with + the given req. """ @spec attach(req :: Req.Request.t(), opts :: Keyword.t()) :: Req.Request.t() def attach(req, opts \\ []) do @@ -642,139 +651,137 @@ defmodule Kubereq do @doc """ - Opens a websocket to the given Pod and streams logs from it. + Opens a websocket to the given container and streams logs from it. + + > #### Info {: .tip} + > + > This function blocks the process. It should be used to retrieve a finite + > set of logs from a container. If you want to follow logs, use + > `Kubereq.PodLogs` combined with the `:follow` options instead. ## Examples - iex> ref = make_ref() - ...> res = - ...> Req.new() - ...> |> Kubereq.attach(api_version: "v1", kind: "ConfigMap") - ...> |> Kubereq.log("default", "my-pod", {self(), ref}, - params: %{"follow" => true} + req = Req.new() |> Kubereq.attach() + {:ok, resp} = + Kubereq.logs(req, "default", "my-pod", + container: "main-container", + tailLines: 5 + ) + Enum.each(resp.body, &IO.inspect/1) + ## Options - * `:params` - Map defining the query parameteres added to the request to the - `pods/log` subresource. The `log` subresource supports the following - paremeters: - - * `container` - (optional) Specifies the container for which to return - logs. If omitted, returns logs for the first container in the pod. - * `follow` - (optional) If set to true, the request will stay open and - continue to return new log entries as they are generated. Default is - false. - * `previous` - (optional) If true, return logs from previous terminated - containers. Default is false. - * `sinceSeconds` - (optional) Returns logs newer than a relative duration in - seconds. Conflicts with sinceTime. - * `sinceTime` - (optional) Returns logs after a specific date (RFC3339 - format). Conflicts with sinceSeconds. - * `timestamps` - (optional) If true, add an RFC3339 timestamp at the - beginning of every line. Default is false. - * `tailLines` - (optional) Specifies the number of lines from the end of the - logs to show. If not specified, logs are shown from the creation of the - container or sinceSeconds/sinceTime. - * `limitBytes` - (optional) The maximum number of bytes to return from the - server. If not specified, no limit is imposed. - * `insecureSkipTLSVerifyBackend` - (optional) If true, bypasses certificate - verification for the kubelet's HTTPS endpoint. This is useful for clusters - with self-signed certificates. Default is false. - * `pretty` - (optional) If true, formats the output in a more readable - format. This is typically used for debugging and not recommended for - programmatic access. - * `prefix` - (optional) [Note: Availability may depend on Kubernetes - version] If true, adds the container name as a prefix to each line. Useful - when requesting logs for multiple containers. + * `:container` - The container for which to stream logs. Defaults to only + container if there is one container in the pod. Fails if not defined for + pods with multiple pods. + * `:follow` - Follow the log stream of the pod. If this is set to `true`, + the connection is kept alive which blocks current the process. If you need + this, you probably want to use `Kubereq.PodLogs` instead. Defaults to + `false`. + * `:insecureSkipTLSVerifyBackend` - insecureSkipTLSVerifyBackend indicates + that the apiserver should not confirm the validity of the serving + certificate of the backend it is connecting to. This will make the HTTPS + connection between the apiserver and the backend insecure. This means the + apiserver cannot verify the log data it is receiving came from the real + kubelet. If the kubelet is configured to verify the apiserver's TLS + credentials, it does not mean the connection to the real kubelet is + vulnerable to a man in the middle attack (e.g. an attacker could not + intercept the actual log data coming from the real kubelet). + * `:limitBytes` - If set, the number of bytes to read from the server before + terminating the log output. This may not display a complete final line of + logging, and may return slightly more or slightly less than the specified + limit. + * `:pretty` - If 'true', then the output is pretty printed. + * `:previous` - Return previous t erminated container logs. Defaults to + `false`. + * `:sinceSeconds` - A relative time in seconds before the current time from + which to show logs. If this value precedes the time a pod was started, + only logs since the pod start will be returned. If this value is in the + future, no logs will be returned. Only one of sinceSeconds or sinceTime + may be specified. + * `:tailLines` - If set, the number of lines from the end of the logs to + show. If not specified, logs are shown from the creation of the container + or sinceSeconds or sinceTime + * `:timestamps` - If true, add an RFC3339 or RFC3339Nano timestamp at the + beginning of every line of log output. Defaults to `false`. """ - @spec log( + @spec logs( Req.Request.t(), namespace :: namespace(), name :: String.t(), - stream_to :: {Process.dest(), reference()} | Process.dest(), opts :: Keyword.t() | nil ) :: response() - def log(req, namespace, name, stream_to, opts \\ []) do - options = - Keyword.merge(opts, + def logs(req, namespace, name, opts \\ []) do + opts = + opts + |> Keyword.merge( + namespace: namespace, + name: name, operation: :connect, - path_params: [namespace: namespace, name: name], - into: stream_to, - subresource: "log" + subresource: "log", + adapter: &Kubereq.Connect.run(&1) ) + |> Kubereq.Connect.args_to_opts() - Req.request(req, options) + Req.request(req, opts) end - @doc """ + @doc ~S""" + + Opens a websocket to the given Pod and executes a command on it. - Opens a websocket to the given Pod and executes a command on it. Can be used - to open a shell. + > #### Info {: .tip} + > + > This function blocks the process. It should be used to execute commands + > which terminate eventually. To implement a shell with a long running + > connection, use `Kubereq.PodExec` with `tty: true` instead. ## Examples + {:ok, resp} = + Kubereq.exec(req, "defaault", "my-pod", + container: "main-container", + command: "/bin/sh", + command: "-c", + command: "echo foobar", + stdout: true, + stderr: true + ) + Enum.each(resp.body, &IO.inspect/1) + # {:stdout, ""} + # {:stdout, "foobar\n"} + + ## Options + + * `:container` (optional) - The container to connect to. Defaults to only + container if there is one container in the pod. Fails if not defined for + pods with multiple pods. + * `:command` - Command is the remote command to execute. Not executed within a shell. + * `:stdin` (optional) - Redirect the standard input stream of the pod for this call. Defaults to `true`. + * `:stdin` (optional) - Redirect the standard output stream of the pod for this call. Defaults to `true`. + * `:stderr` (optional) - Redirect the standard error stream of the pod for this call. Defaults to `true`. + * `:tty` (optional) - If `true` indicates that a tty will be allocated for the exec call. Defaults to `false`. - iex> ref = make_ref() - ...> res = - ...> Req.new() - ...> |> Kubereq.attach(api_version: "v1", kind: "ConfigMap") - ...> |> Kubereq.exec("default", "my-pod", {self(), ref}, - params: %{ - "command" => "/bin/bash" - "tty" => true, - "stdin" => true, - "stdout" => true, - "stderr" => true, - } - - Messages are sent to the passed Process with the reference included: - - iex> receive(do: ({^ref, message} -> IO.inspect(message))) - - The `body` of the `Req.Response` is a struct. If `tty` is set to true and - `command` is a shell, you can pass to - `Kubereq.Websocket.Response.send_message/3` in order to send instructions - through the websocket to the shell. - - ...> res.body.() - - ## Options - - * `:params` - Map defining the query parameteres added to the request to the - `pods/exec` subresource. The `exec` subresource supports the following - paremeters: - - * `container` (optional) - Specifies the container in the pod to execute the - command. If omitted, the first container in the pod will be chosen. - * `command` (optional) - The command to execute inside the container. This - parameter can be specified multiple times to represent a command with - multiple arguments. If omitted, the container's default command will be - used. - * `stdin` (optional) - If true, pass stdin to the container. Default is - false. - * `stdout` (optional) - If true, return stdout from the container. Default - is false. - * `stderr` (optional) - If true, return stderr from the container. Default - is false. - * `tty` (optional) - If true, allocate a pseudo-TTY for the container. - Default is false. """ @spec exec( - Req.Request.t(), + req :: Req.Request.t(), namespace :: namespace(), name :: String.t(), - stream_to :: {Process.dest(), reference()} | Process.dest(), opts :: Keyword.t() | nil ) :: response() - def exec(req, namespace, name, stream_to, opts \\ []) do - options = - Keyword.merge(opts, + def exec(req, namespace, name, opts \\ []) do + opts = + opts + |> Keyword.merge( + namespace: namespace, + name: name, operation: :connect, - path_params: [namespace: namespace, name: name], - into: stream_to, - subresource: "exec" + subresource: "exec", + adapter: &Kubereq.Connect.run(&1) ) + |> Kubereq.Connect.args_to_opts() - Req.request(req, options) + Req.request(req, opts) end end diff --git a/lib/kubereq/application.ex b/lib/kubereq/application.ex index 3601df2..024b5b0 100644 --- a/lib/kubereq/application.ex +++ b/lib/kubereq/application.ex @@ -6,8 +6,7 @@ defmodule Kubereq.Application do @impl true def start(_start_type, _start_args) do children = [ - {Registry, keys: :unique, name: Kubereq.Exec}, - Kubereq.Websocket.Supervisor + {Registry, keys: :unique, name: Kubereq.Auth.Exec} ] opts = [strategy: :one_for_one, name: Kubereq.Supervisor] diff --git a/lib/kubereq/exec.ex b/lib/kubereq/auth/exec.ex similarity index 99% rename from lib/kubereq/exec.ex rename to lib/kubereq/auth/exec.ex index d8364dc..04c4fa9 100644 --- a/lib/kubereq/exec.ex +++ b/lib/kubereq/auth/exec.ex @@ -1,4 +1,4 @@ -defmodule Kubereq.Exec do +defmodule Kubereq.Auth.Exec do @moduledoc false alias Kubereq.Error.KubeconfError diff --git a/lib/kubereq/connect.ex b/lib/kubereq/connect.ex new file mode 100644 index 0000000..96307cb --- /dev/null +++ b/lib/kubereq/connect.ex @@ -0,0 +1,308 @@ +defmodule Kubereq.Connect do + @moduledoc false + + @stdout 0x01 + @stderr 0x02 + @err 0x03 + + defmacro __using__(opts) do + quote location: :keep do + @behaviour Fresh + + @doc false + def child_spec(args) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + restart: :transient + } + |> Supervisor.child_spec(unquote(Macro.escape(opts))) + end + + @doc false + def start_link(args) do + Kubereq.Connect.start_link(__MODULE__, args) + end + + @doc false + def start(args) do + Kubereq.Connect.start(__MODULE__, args) + end + + @doc false + def handle_connect(_status, _headers, state), do: {:ok, state} + + @doc false + def handle_control(_message, state), do: {:ok, state} + + @doc false + def handle_in(frame, state), do: Kubereq.Connect.handle_in(__MODULE__, frame, state) + + @doc false + def handle_in_stdout(_message, state), do: {:ok, state} + + @doc false + def handle_in_stderr(_message, state), do: {:ok, state} + + @doc false + def handle_in_error(_message, state), do: {:ok, state} + + @doc false + def handle_binary(_message, state), do: {:ok, state} + + @doc false + def handle_info(_message, state), do: {:ok, state} + + @doc false + def handle_error({error, _reason}, state) + when error in [:encoding_failed, :casting_failed], + do: {:ignore, state} + + def handle_error(_error, _state), do: :reconnect + + @doc false + def handle_disconnect(_code, _reason, _state), do: :close + + @doc false + def handle_terminate(_reason, _state), do: :ok + + defoverridable child_spec: 1, + start_link: 1, + start: 1, + handle_connect: 3, + handle_control: 2, + handle_in: 2, + handle_info: 2, + handle_error: 2, + handle_disconnect: 3, + handle_terminate: 2, + handle_in_stdout: 2, + handle_in_stderr: 2, + handle_in_error: 2 + end + end + + defdelegate close(dest, code, reason), to: Fresh + defdelegate open?(dest), to: Fresh + + def send_stdin(dest, data) do + Fresh.send(dest, {:text, <<0, data::binary>>}) + end + + @spec close(dest :: :gen_statem.server_ref()) :: :ok + def close(dest), do: Fresh.send(dest, {:close, 1000, ""}) + + def handle_in(module, frame, state) do + case map_frame(frame) do + {:stdout, msg} -> module.handle_in_stdout(msg, state) + {:stderr, msg} -> module.handle_in_stderr(msg, state) + {:error, msg} -> module.handle_in_error(msg, state) + end + end + + @doc false + def start_link(module, args) do + do_start(module, args, &Fresh.start_link/4) + end + + @doc false + def start(module, args) do + do_start(module, args, &Fresh.start/4) + end + + defp do_start(module, args, start_callback) do + req = Keyword.fetch!(args, :req) + state = Keyword.fetch!(args, :state) + opts = Keyword.fetch!(args, :opts) + + {:ok, resp} = + req + |> Req.merge( + kind: "Pod", + operation: :connect, + adapter: &run(&1, module, state, start_callback) + ) + |> Req.request(opts) + + {:ok, resp.body} + end + + defp run(req, module, state, start) do + {_, _, scheme} = ws_scheme(req.url.scheme) + + uri = %{req.url | scheme: scheme} + + headers = format_headres(req.headers) + opts = Keyword.merge(req.options.connect_options, headers: headers) + + {:ok, pid} = start.(uri, module, state, opts) + {req, Req.Response.new(status: 101, body: pid)} + end + + def run(req) do + uri = req.url + {http_scheme, ws_scheme, _} = ws_scheme(uri.scheme) + path = uri.path || "/" + + path = + case uri.query do + nil -> path + query -> path <> "?" <> query + end + + conn_opts = + req.options.connect_options + |> Keyword.put(:mode, :passive) + |> Keyword.put_new(:protocols, [:http1]) + + headers = format_headres(req.headers) + + with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port, conn_opts), + {:ok, conn} <- Mint.HTTP.set_mode(conn, :passive), + {:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, headers), + {:ok, conn, upgrade_response} <- receive_upgrade_response(conn, ref), + {:ok, conn, websocket} <- + Mint.WebSocket.new( + conn, + ref, + upgrade_response.status, + upgrade_response.headers + ) do + stream = create_stream(conn, ref, websocket) + {req, Req.Response.new(status: 101, body: stream)} + else + {:error, error} -> + {req, error} + + {:error, _, error} -> + {req, error} + end + end + + defp format_headres(headers) do + for {name, values} <- headers, value <- values, do: {name, value} + end + + defp create_stream(conn, ref, websocket) do + Stream.resource( + fn -> {[], conn, ref, websocket} end, + fn + {[{:close, _, _} | _], conn, ref, websocket} -> + {:halt, {conn, ref, websocket}} + + {[frame | rest], conn, ref, websocket} -> + {[map_frame(frame)], {rest, conn, ref, websocket}} + + {[], conn, ref, websocket} -> + with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.recv(conn, 0, :infinity), + {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do + {[], {frames, conn, ref, websocket}} + else + {:error, _conn, _error} -> + {:halt, :ok} + + {:ok, conn, _other} -> + {[], {[], conn, ref, websocket}} + end + end, + fn _ -> :ok end + ) + end + + defp map_frame(frame) do + case frame do + {:binary, <<@stdout, msg::binary>>} -> {:stdout, msg} + {:binary, <<@stderr, msg::binary>>} -> {:stderr, msg} + {:binary, <<@err, msg::binary>>} -> {:error, msg} + {:binary, msg} -> {:stdout, msg} + other -> other + end + end + + @spec ws_scheme(binary()) :: {:http, :ws, binary()} | {:https, :wss, binary()} + defp ws_scheme("http"), do: {:http, :ws, "ws"} + defp ws_scheme("https"), do: {:https, :wss, "wss"} + + defp receive_upgrade_response(conn, ref) do + Enum.reduce_while(Stream.cycle([:ok]), {conn, %{}}, fn _, {conn, response} -> + case Mint.HTTP.recv(conn, 0, 10_000) do + {:ok, conn, parts} -> + response = + parts + |> Map.new(fn + {type, ^ref} -> {type, true} + {type, ^ref, value} -> {type, value} + end) + |> Map.merge(response) + + # credo:disable-for-lines:3 + if response[:done], + do: {:halt, {:ok, conn, response}}, + else: {:cont, {conn, response}} + + {:error, conn, error, _} -> + {:halt, {:error, conn, error}} + end + end) + end + + @path_params [:name, :namespace] + + @doc """ + Turns arguments passed to a connect function to `Req` options to be passed to + `Req.request()` + """ + @spec args_to_opts(args :: Keyword.t()) :: Keyword.t() + def args_to_opts(args) do + case Keyword.fetch!(args, :subresource) do + "log" -> log_args_to_opts(args) + "exec" -> exec_args_to_opts(args) + end + end + + @log_params [ + :container, + :follow, + :insecureSkipTLSVerifyBackend, + :limitBytes, + :pretty, + :previous, + :sinceSeconds, + :tailLines, + :timestamps + ] + defp log_args_to_opts(args) do + {params, args} = Keyword.split(args, @log_params) + {path_params, args} = Keyword.split(args, @path_params) + + params = Keyword.put_new(params, :follow, true) + Keyword.merge(args, params: params, path_params: path_params) + end + + @exec_params [:container, :command, :stdin, :stdout, :stderr, :tty] + defp exec_args_to_opts(args) do + {params, args} = Keyword.split(args, @exec_params) + {path_params, args} = Keyword.split(args, @path_params) + + params = + params + |> Keyword.get_values(:command) + |> format_commands() + |> Keyword.merge(stdin: true, stdout: true, stderr: true) + |> Keyword.merge(Keyword.delete(params, :command)) + + Keyword.merge(args, params: params, path_params: path_params) + end + + defp format_commands([command]) when is_binary(command) do + [command: command] + end + + defp format_commands([commands]) when is_list(commands) do + format_commands(commands) + end + + defp format_commands(commands) when is_list(commands) do + Enum.map(commands, &{:command, &1}) + end +end diff --git a/lib/kubereq/kubeconfig/stub.ex b/lib/kubereq/kubeconfig/stub.ex index 3f622dc..07cc28d 100644 --- a/lib/kubereq/kubeconfig/stub.ex +++ b/lib/kubereq/kubeconfig/stub.ex @@ -97,7 +97,7 @@ defmodule Kubereq.Kubeconfig.Stub do raise ArgumentError, "You have to pass the :plugs option to use this step." end - Keyword.validate!(opts, [:plugs]) + opts end @impl true diff --git a/lib/kubereq/pod_exec.ex b/lib/kubereq/pod_exec.ex new file mode 100644 index 0000000..c01c87c --- /dev/null +++ b/lib/kubereq/pod_exec.ex @@ -0,0 +1,127 @@ +defmodule Kubereq.PodExec do + @moduledoc ~S""" + Establish a connection to a Pod and execute a command in a container. + + The connection is kept alive until the websocket is closed by the counterpart. + The bytes received from the container are sent to the process passed via the + `:into` option. Bytes sent to this process via `send_stdin/2` are forwarded to + the container. + + ### Examples + + When the command terminates, the websocket is automatically closed and the + process terminates. + + req = Req.new() |> Kubereq.attach() + Kubereq.PodExec.start_link( + req: req, + namespace: "default", + name: "my-pod", + container: "main", + into: self(), + command: ["/bin/sh", "-c", "echo foo"], + stdin: true, + stdout: true, + stderr: true + tty: false, + ) + # Messages in inbox: {:stdout, "foo\n"}, {:close, 1000, ""} + + Passing the path to a shell as command will keep the socket open. Together + with `:stdin`, `:stdout`, `:stderr` and `:tty`, this can be used to implement + an interactive shell: + + req = Req.new() |> Kubereq.attach() + {:ok, dest} = Kubereq.PodExec.start_link( + req: req, + namespace: "default", + name: "my-pod", + container: "main", + into: self(), + command: ["/bin/sh"], + stdin: true, + stdout: true, + stderr: true + tty: false, + ) + # Message in inbox: {:stdout, "sh-5.2# "} + + Kubereq.PodExec.send_stdin(dest, "echo foo") + # Message in inbox: {:stdout, "echo foo\r\nfoo\r\nsh-5.2# "} + + ### Arguments + + * `:req` - A `Req.Request` struct with Kubereq attached. + * `:namespace` - The namespace the Pod runs in + * `:name` - The name of the Pod + * `:container` (optional) - The container for which to stream logs. Defaults to only + container if there is one container in the pod. Fails if not defined for + pods with multiple pods. + * `:into` - Destination for messages received from the pod. Can be a `pid` or + a `{pid, ref}` tuple. + * `:command` - Command is the remote command to execute. Not executed within a shell. + * `:stdin` (optional) - Redirect the standard input stream of the pod for this call. Defaults to `true`. + * `:stdin` (optional) - Redirect the standard output stream of the pod for this call. Defaults to `true`. + * `:stderr` (optional) - Redirect the standard error stream of the pod for this call. Defaults to `true`. + * `:tty` (optional) - If `true` indicates that a tty will be allocated for the exec call. Defaults to `false`. + * `:opts` (optional) - Additional options passed to `Req` + """ + use Kubereq.Connect + + def start_link(args) do + {into, args} = Keyword.pop!(args, :into) + {req, args} = Keyword.pop!(args, :req) + + opts = + args + |> Keyword.put(:subresource, "exec") + |> Kubereq.Connect.args_to_opts() + + Kubereq.Connect.start_link(__MODULE__, req: req, state: %{into: into}, opts: opts) + end + + defdelegate open?(dest), to: Fresh + defdelegate close(dest, code, reason), to: Fresh + + @doc """ + Send the given `data` to the container. + """ + @spec send_stdin(dest :: :gen_statem.server_ref(), data :: binary()) :: :ok + def send_stdin(dest, data) do + Fresh.send(dest, {:text, <<0, data::binary>>}) + end + + @doc """ + Close the connection and terminate the process. + """ + @spec close(dest :: :gen_statem.server_ref()) :: :ok + def close(dest), do: Fresh.send(dest, {:close, 1000, ""}) + + def handle_connect(_status, _headers, state) do + send_frame(state.into, :connected) + {:ok, state} + end + + def handle_disconnect(code, reason, state) do + send_frame(state.into, {:close, code, reason}) + :close + end + + def handle_in_stdout(frame, state) do + send_frame(state.into, {:stdout, frame}) + {:ok, state} + end + + def handle_in_stderr(frame, state) do + send_frame(state.into, {:stderr, frame}) + {:ok, state} + end + + def handle_in_error(frame, state) do + send_frame(state.into, {:error, frame}) + {:ok, state} + end + + defp send_frame({dest, ref}, frame), do: send(dest, {ref, frame}) + defp send_frame(dest, frame), do: send(dest, frame) +end diff --git a/lib/kubereq/pod_logs.ex b/lib/kubereq/pod_logs.ex new file mode 100644 index 0000000..500d479 --- /dev/null +++ b/lib/kubereq/pod_logs.ex @@ -0,0 +1,106 @@ +defmodule Kubereq.PodLogs do + @moduledoc ~S""" + Establish a connection to a Pod and stream logs. + + The connection is kept alive until the websocket is closed via `close/1`. + The bytes received from the container are sent to the process passed via the + `:into` option. + + ### Examples + + req = Req.new() |> Kubereq.attach() + Kubereq.PodLogs.start_link( + req: req, + into: self(), + namespace: "default", + name: "my-pod", + container: "main-container", + ) + # Messages in inbox: {:stdout, "log entries"}, {:stdout, "more log entries"} + + ### Arguments + + * `:req` - A `Req.Request` struct with Kubereq attached. + * `:namespace` - The namespace the Pod runs in + * `:name` - The name of the Pod + * `:container` - The container for which to stream logs. Defaults to only + container if there is one container in the pod. Fails if not defined for + pods with multiple pods. + * `:into` - Destination for messages received from the pod. Can be a `pid` or + a `{pid, ref}` tuple. + * `:follow` - Follow the log stream of the pod. If this is set to `true`, + the connection is kept alive which blocks current the process. Defaults to + `true`. + * `:insecureSkipTLSVerifyBackend` - insecureSkipTLSVerifyBackend indicates + that the apiserver should not confirm the validity of the serving + certificate of the backend it is connecting to. This will make the HTTPS + connection between the apiserver and the backend insecure. This means the + apiserver cannot verify the log data it is receiving came from the real + kubelet. If the kubelet is configured to verify the apiserver's TLS + credentials, it does not mean the connection to the real kubelet is + vulnerable to a man in the middle attack (e.g. an attacker could not + intercept the actual log data coming from the real kubelet). + * `:limitBytes` - If set, the number of bytes to read from the server before + terminating the log output. This may not display a complete final line of + logging, and may return slightly more or slightly less than the specified + limit. + * `:pretty` - If 'true', then the output is pretty printed. + * `:previous` - Return previous terminated container logs. Defaults to `false`. + * `:sinceSeconds` - A relative time in seconds before the current time from + which to show logs. If this value precedes the time a pod was started, + only logs since the pod start will be returned. If this value is in the + future, no logs will be returned. Only one of sinceSeconds or sinceTime + may be specified. + * `:tailLines` - If set, the number of lines from the end of the logs to + show. If not specified, logs are shown from the creation of the container + or sinceSeconds or sinceTime + * `:timestamps` - If true, add an RFC3339 or RFC3339Nano timestamp at the + beginning of every line of log output. Defaults to `false`. + * `:opts` (optional) - Additional options passed to `Req` + """ + use Kubereq.Connect + + def start_link(args) do + {into, args} = Keyword.pop!(args, :into) + {req, args} = Keyword.pop!(args, :req) + + opts = + args + |> Keyword.put(:subresource, "log") + |> Kubereq.Connect.args_to_opts() + + Kubereq.Connect.start_link(__MODULE__, req: req, state: %{into: into}, opts: opts) + end + + defdelegate open?(dest), to: Fresh + defdelegate close(dest, code, reason), to: Fresh + + @doc """ + Close the connection and terminate the process. + """ + @spec close(dest :: :gen_statem.server_ref()) :: :ok + def close(dest), do: Fresh.send(dest, {:close, 1000, ""}) + + def handle_disconnect(code, reason, state) do + send_frame(state.into, {:close, code, reason}) + :close + end + + def handle_in_stdout(frame, state) do + send_frame(state.into, {:stdout, frame}) + {:ok, state} + end + + def handle_in_stderr(frame, state) do + send_frame(state.into, {:stderr, frame}) + {:ok, state} + end + + def handle_in_error(frame, state) do + send_frame(state.into, {:error, frame}) + {:ok, state} + end + + defp send_frame({dest, ref}, frame), do: send(dest, {ref, frame}) + defp send_frame(dest, frame), do: send(dest, frame) +end diff --git a/lib/kubereq/step/auth.ex b/lib/kubereq/step/auth.ex index 676fae0..7d6510d 100644 --- a/lib/kubereq/step/auth.ex +++ b/lib/kubereq/step/auth.ex @@ -1,8 +1,8 @@ defmodule Kubereq.Step.Auth do @moduledoc false + alias Kubereq.Auth.Exec alias Kubereq.Error.StepError - alias Kubereq.Exec @spec call(req :: Req.Request.t()) :: Req.Request.t() | {Req.Request.t(), StepError.t()} def call(req), do: auth(req, req.options.kubeconfig.current_user) diff --git a/lib/kubereq/step/operation.ex b/lib/kubereq/step/operation.ex index cf273d0..5d9b1bf 100644 --- a/lib/kubereq/step/operation.ex +++ b/lib/kubereq/step/operation.ex @@ -2,7 +2,6 @@ defmodule Kubereq.Step.Operation do @moduledoc false alias Kubereq.Error.StepError - alias Kubereq.Websocket.Adapter @spec call(req :: Req.Request.t()) :: Req.Request.t() | {Req.Request.t(), StepError.t()} def call(req) when not is_map_key(req.options, :operation) or is_nil(req.options.operation) do @@ -103,9 +102,7 @@ defmodule Kubereq.Step.Operation do defp operation(:connect, request_path, subresource) do [ - url: "#{request_path}/#{subresource}", - method: :post, - adapter: &Adapter.run/1 + url: "#{request_path}/#{subresource}" ] end end diff --git a/lib/kubereq/websocket/adapter.ex b/lib/kubereq/websocket/adapter.ex deleted file mode 100644 index dca0cc3..0000000 --- a/lib/kubereq/websocket/adapter.ex +++ /dev/null @@ -1,314 +0,0 @@ -defmodule Kubereq.Websocket.Adapter do - @moduledoc false - - use GenServer, restart: :transient - - alias Kubereq.Websocket.Response - - require Mint.HTTP - - @typep start_args :: - {URI.t(), list(), {Process.dest(), reference()} | Process.dest(), - conn_opts :: Keyword.t(), registry_key :: reference()} - - @type incoming_frame() :: {:binary, binary} | {:close, any, any} - @type incoming_message() :: - {:close, integer(), binary()} - | {:error, binary} - | {:stderr, binary} - | {:stdout, binary} - | {:binary, binary} - - @type outgoing_frame() :: - {:close, integer(), binary()} - | :close - | {:binary, binary()} - | {:text, binary()} - @type outgoing_message() :: - {:binary, binary()} - | :close - | {:close, any, any} - | :exit - | {:stdin, binary()} - - defstruct [:mint, :websocket, :ref, :into] - - @spec run(Req.Request.t()) :: - {Req.Request.t(), Req.Response.t()} | {Req.Request.t(), Mint.WebSocket.error()} - def run(%{into: _stream_to} = req) do - conn_opts = - req.options.connect_options - |> Keyword.put(:mode, :passive) - |> Keyword.put_new(:protocols, [:http1]) - - headers = - for {name, values} <- req.headers, - value <- values do - {name, value} - end - - registry_key = make_ref() - - start_child_resp = - DynamicSupervisor.start_child( - __MODULE__, - {__MODULE__, {req.url, headers, req.into, conn_opts, registry_key}} - ) - - case start_child_resp do - {:ok, _pid} -> - resp = - Req.Response.new( - status: 101, - headers: [], - trailers: [], - body: %Response{registry_key: registry_key} - ) - - {req, resp} - - {:error, error} when is_exception(error) -> - {req, error} - - other -> - {req, - %RuntimeError{ - message: "Failed to start the Websocket. start_child() returned #{inspect(other)}." - }} - end - end - - def run(req) do - {req, - %ArgumentError{ - message: - ":connect operation requires setting the `:into` operation on the req request to a `{pid, ref}` tuple." - }} - end - - @spec start_link(start_args()) :: GenServer.on_start() - def start_link(args), do: GenServer.start_link(__MODULE__, args) - - @impl GenServer - def init({url, headers, into, conn_opts, registry_key}) do - Registry.register(Kubereq.Websocket.Adapter.Registry, registry_key, []) - - with {:ok, mint} <- - Mint.HTTP.connect( - http_scheme(url.scheme), - url.host, - url.port, - conn_opts - ), - {:ok, mint, ref} <- - Mint.WebSocket.upgrade( - ws_scheme(url.scheme), - mint, - "#{url.path}?#{url.query}", - headers - ), - {:ok, mint, upgrade_response} <- receive_upgrade_response(mint, ref), - {:ok, mint, websocket} <- - Mint.WebSocket.new( - mint, - ref, - upgrade_response.status, - upgrade_response.headers - ) do - {:ok, nil, {:continue, {:connect, mint, websocket, ref, into}}} - else - {:error, error} -> - {:stop, error} - - {:error, _mint, error} -> - {:stop, error} - end - end - - @impl GenServer - def handle_continue({:connect, mint, websocket, ref, into}, _state) do - case Mint.HTTP.set_mode(mint, :active) do - {:ok, mint} -> - # Mint.HTTP.controlling_process causes a side-effect, but it doesn't actually - # change the conn, so we can ignore the value returned above. - pid = - case into do - {pid, _} -> pid - pid -> pid - end - - Process.flag(:trap_exit, true) - Process.monitor(pid) - {:noreply, struct(__MODULE__, mint: mint, websocket: websocket, ref: ref, into: into)} - - {:error, error} -> - {:stop, error} - end - end - - def handle_continue({:send, frame}, state) do - case send_frame(frame, state.mint, state.websocket, state.ref) do - {:ok, mint, websocket} -> - {:noreply, %{state | mint: mint, websocket: websocket}} - - {:error, mint, websocket, error} -> - {:stop, error, %{state | mint: mint, websocket: websocket}} - - :closed -> - {:stop, {:shutdown, :closed}, state} - end - end - - @impl GenServer - def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do - {:stop, :normal, state} - end - - def handle_info(message, state) when Mint.HTTP.is_connection_message(state.mint, message) do - case receive_frames(message, state.mint, state.websocket, state.ref) do - {:ok, mint, websocket, frames} -> - new_state = %{state | mint: mint, websocket: websocket} - - Enum.reduce_while(frames, {:noreply, new_state}, fn - {:binary, message}, acc -> - message - |> map_incoming_message() - |> stream_to(state.into) - - {:cont, acc} - - {:close, 1_000, ""} = message, _acc -> - stream_to(message, state.into) - {:halt, {:stop, :normal, new_state}} - - {:close, code, reason} = message, _acc -> - stream_to(message, state.into) - {:halt, {:stop, {:remote_closed, code, reason}, new_state}} - end) - - {:error, mint, websocket, error, frames} -> - Enum.each(frames, fn - {:binary, message} -> - message - |> map_incoming_message() - |> stream_to(state.into) - - other -> - stream_to(other, state.into) - end) - - {:stop, error, %{state | mint: mint, websocket: websocket}} - end - end - - @impl GenServer - def handle_call({:send, message}, _from, state) do - case map_outgoing_frame(message) do - {:ok, frame} -> {:reply, :ok, state, {:continue, {:send, frame}}} - {:error, error} -> {:reply, {:error, error}, state} - end - end - - @impl GenServer - def terminate(_reason, state) do - if not is_nil(state.mint) and Mint.HTTP.open?(state.mint) do - {:ok, _websocket, data} = Mint.WebSocket.encode(state.websocket, :close) - Mint.WebSocket.stream_request_body(state.mint, state.ref, data) - end - end - - @spec send_frame(outgoing_frame(), Mint.HTTP.t(), Mint.WebSocket.t(), Mint.Types.request_ref()) :: - {:ok, Mint.HTTP.t(), Mint.WebSocket.t()} - | :closed - | {:error, Mint.HTTP.t(), Mint.WebSocket.t(), Mint.WebSocket.error()} - defp send_frame(frame, mint, websocket, ref) do - with true <- Mint.HTTP.open?(mint), - {:ok, websocket, data} <- Mint.WebSocket.encode(websocket, frame), - {:ok, mint} <- Mint.WebSocket.stream_request_body(mint, ref, data) do - {:ok, mint, websocket} - else - false -> - :closed - - {:error, mint_or_websocket, error} -> - if is_struct(mint_or_websocket, Mint.WebSocket) do - {:error, mint, mint_or_websocket, error} - else - {:error, mint_or_websocket, websocket, error} - end - end - end - - @spec receive_frames(term(), Mint.HTTP.t(), Mint.WebSocket.t(), Mint.Types.request_ref()) :: - {:ok, Mint.HTTP.t(), Mint.WebSocket.t(), [Mint.Types.response()]} - | {:error, Mint.HTTP.t(), Mint.WebSocket.t(), Mint.WebSocket.error(), - [Mint.Types.response()]} - defp receive_frames(message, mint, websocket, ref) do - with {:ok, mint, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(mint, message), - {:ok, websocket, frames} <- Mint.WebSocket.decode(websocket, data) do - {:ok, mint, websocket, frames} - else - {:error, websocket, error, frames} -> {:error, mint, websocket, error, frames} - {:error, mint, error} -> {:error, mint, websocket, error, []} - end - end - - @spec map_incoming_message(binary()) :: incoming_message() - def map_incoming_message(<<1, message::binary>>), do: {:stdout, message} - def map_incoming_message(<<2, message::binary>>), do: {:stderr, message} - def map_incoming_message(<<3, message::binary>>), do: {:error, message} - def map_incoming_message(binary), do: {:binary, binary} - - @spec map_outgoing_frame(outgoing_message()) :: - {:ok, outgoing_frame()} | {:error, Exception.t()} - def map_outgoing_frame({:binary, data}), do: {:ok, {:binary, data}} - def map_outgoing_frame(:close), do: {:ok, :close} - def map_outgoing_frame({:close, code, reason}), do: {:ok, {:close, code, reason}} - def map_outgoing_frame(:exit), do: {:ok, :close} - def map_outgoing_frame({:stdin, data}), do: {:ok, {:text, <<0>> <> data}} - - def map_outgoing_frame(data) do - {:error, - %ArgumentError{ - message: "The given message #{inspect(data)} is not supported to be sent to the Pod." - }} - end - - @spec http_scheme(binary()) :: atom() - defp http_scheme("http"), do: :http - defp http_scheme("https"), do: :https - - @spec ws_scheme(binary()) :: atom() - defp ws_scheme("http"), do: :ws - defp ws_scheme("https"), do: :wss - - @spec stream_to(incoming_message(), {Process.dest(), reference()} | Process.dest()) :: - incoming_message() - defp stream_to(message, {dest, ref}), do: send(dest, {ref, message}) - defp stream_to(message, dest), do: send(dest, message) - - @spec receive_upgrade_response(Mint.HTTP.t(), Mint.Types.request_ref()) :: - {:ok, Mint.HTTP.t(), map()} | {:error, Mint.HTTP.t(), Mint.WebSocket.error()} - defp receive_upgrade_response(mint, ref) do - Enum.reduce_while(Stream.cycle([:ok]), {mint, %{}}, fn _, {mint, response} -> - case Mint.HTTP.recv(mint, 0, 10_000) do - {:ok, mint, parts} -> - response = - parts - |> Map.new(fn - {type, ^ref} -> {type, true} - {type, ^ref, value} -> {type, value} - end) - |> Map.merge(response) - - # credo:disable-for-lines:3 - if response[:done], - do: {:halt, {:ok, mint, response}}, - else: {:cont, {mint, response}} - - {:error, mint, error, _} -> - {:halt, {:error, mint, error}} - end - end) - end -end diff --git a/lib/kubereq/websocket/response.ex b/lib/kubereq/websocket/response.ex deleted file mode 100644 index 31125be..0000000 --- a/lib/kubereq/websocket/response.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Kubereq.Websocket.Response do - @moduledoc """ - Represents a response of a websocket request. - """ - alias Kubereq.Websocket.Adapter - - @type t :: %__MODULE__{registry_key: reference()} - - defstruct [:registry_key] - - @spec send_message( - response :: t(), - message :: Adapter.outgoing_message(), - timeout :: non_neg_integer() - ) :: :ok | {:error, term()} - def send_message(_response, message, timeout \\ 5_000) - - def send_message(_response, message, timeout) when timeout <= 0 do - {:error, %RuntimeError{message: "Could not send message #{inspect(message)} to websocket."}} - end - - def send_message(response, message, timeout) do - with [{server, _}] <- - Registry.lookup(Kubereq.Websocket.Adapter.Registry, response.registry_key), - true <- Process.alive?(server) do - GenServer.call(server, {:send, message}) - else - _ -> - Process.sleep(100) - send_message(response, message, timeout - 100) - end - end -end diff --git a/lib/kubereq/websocket/supervisor.ex b/lib/kubereq/websocket/supervisor.ex deleted file mode 100644 index a5772b5..0000000 --- a/lib/kubereq/websocket/supervisor.ex +++ /dev/null @@ -1,19 +0,0 @@ -defmodule Kubereq.Websocket.Supervisor do - @moduledoc false - - use Supervisor - - def start_link(init_arg) do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) - end - - @impl Supervisor - def init(_init_arg) do - children = [ - {DynamicSupervisor, name: Kubereq.Websocket.Adapter, strategy: :one_for_one}, - {Registry, name: Kubereq.Websocket.Adapter.Registry, keys: :unique} - ] - - Supervisor.init(children, strategy: :rest_for_one) - end -end diff --git a/mix.exs b/mix.exs index 5c2c1e6..4af3714 100644 --- a/mix.exs +++ b/mix.exs @@ -40,10 +40,9 @@ defmodule Kubereq.MixProject do {:pluggable, "~> 1.0"}, {:req, "~> 0.5.0"}, {:yaml_elixir, "~> 2.0"}, - - # Optional deps {:mint, "~> 1.0"}, {:mint_web_socket, "~> 1.0"}, + {:fresh, "~> 0.4.4"}, # Test deps {:excoveralls, "~> 0.18", only: :test}, @@ -67,6 +66,10 @@ defmodule Kubereq.MixProject do "CHANGELOG.md" ], groups_for_modules: [ + "Websocket Connection": [ + Kubereq.PodExec, + Kubereq.PodLogs + ], "Kubeconfig Loading": [ Kubereq.Kubeconfig, Kubereq.Kubeconfig.Default, diff --git a/mix.lock b/mix.lock index 95024b5..8172cda 100644 --- a/mix.lock +++ b/mix.lock @@ -9,6 +9,7 @@ "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, + "fresh": {:hex, :fresh, "0.4.4", "9d67a1d97112e70f4dfabd63b40e4b182ef64dfa84a2d9ee175eb4e34591e9f7", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:mint_web_socket, "~> 1.0", [hex: :mint_web_socket, repo: "hexpm", optional: false]}], "hexpm", "ba21d3fa0aa77bf18ca397e4c851de7432bb3f9c170a1645a16e09e4bba54315"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, diff --git a/test/kubereq_integration_test.exs b/test/kubereq_integration_test.exs index 6690603..ddca065 100644 --- a/test/kubereq_integration_test.exs +++ b/test/kubereq_integration_test.exs @@ -1,12 +1,10 @@ defmodule KubereqIntegrationTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false @moduletag :integration import YamlElixir.Sigil - alias Kubereq.Websocket.Response - @cluster_name "kubereq" @kubeconfig_path "test/support/kubeconfig-integration.yaml" @namespace "integrationtest" @@ -266,6 +264,46 @@ defmodule KubereqIntegrationTest do %{"type" => "DELETED", "object" => %{"metadata" => %{"name" => ^cm_name}}}} end + test "receives pod logs synchronously from pod container", %{req_pod: req} do + pod_name = "example-pod-#{:rand.uniform(10_000)}" + log_stmt = "foo bar" + + pod = ~y""" + apiVersion: v1 + kind: Pod + metadata: + namespace: #{@namespace} + name: #{pod_name} + labels: + app: kubereq + spec: + containers: + - name: main + image: busybox + command: + - /bin/sh + - "-c" + - 'echo "#{log_stmt}"' + - "sleep infinity" + """ + + Kubereq.apply(req, pod) + + :ok = + Kubereq.wait_until(req, @namespace, pod_name, &(&1["status"]["phase"] == "Running"), + timeout: :timer.minutes(2) + ) + + {:ok, resp} = Kubereq.logs(req, @namespace, pod_name) + + stdout = + resp.body + |> Enum.reduce("", fn {:stdout, out}, acc -> [acc, out] end) + |> IO.iodata_to_binary() + + assert stdout =~ "#{log_stmt}\n" + end + test "streams pod logs to process", %{req_pod: req} do pod_name = "example-pod-#{:rand.uniform(10_000)}" log_stmt = "foo bar" @@ -297,9 +335,71 @@ defmodule KubereqIntegrationTest do ) ref = make_ref() - Kubereq.log(req, @namespace, pod_name, {self(), ref}, params: %{"follow" => false}) - assert_receive({^ref, {:binary, ^log_stmt <> "\n"}}) - assert_receive({^ref, {:close, 1_000, ""}}) + + {:ok, _pid} = + Kubereq.PodLogs.start_link( + req: req, + namespace: @namespace, + name: pod_name, + into: {self(), ref} + ) + + logs = + Stream.repeatedly(fn -> :ok end) + |> Enum.reduce_while("", fn _, acc -> + receive do + {^ref, :stdout, text} -> {:cont, [acc, text]} + {:close, 1_000, ""} -> {:halt, acc} + _ -> {:cont, acc} + after + 500 -> {:halt, acc} + end + end) + + assert logs == "" + end + + test "sends exec commands to pod and returns stdout", %{req_pod: req} do + pod_name = "example-pod-#{:rand.uniform(10_000)}" + + pod = ~y""" + apiVersion: v1 + kind: Pod + metadata: + namespace: #{@namespace} + name: #{pod_name} + labels: + app: kubereq + spec: + containers: + - name: main + image: busybox + command: + - /bin/sh + - "-c" + - "sleep infinity" + """ + + Kubereq.apply(req, pod) + + :ok = + Kubereq.wait_until(req, @namespace, pod_name, &(&1["status"]["phase"] == "Running"), + timeout: :timer.minutes(2) + ) + + {:ok, resp} = + Kubereq.exec(req, @namespace, pod_name, + command: "echo", + command: "foo", + stdout: true + ) + + stdout = + resp.body + |> Enum.reduce("", fn {:stdout, out}, acc -> [acc, out] end) + |> IO.iodata_to_binary() + + assert stdout == "foo\n" end test "streams exec commands to pod and prompts back to process", %{req_pod: req} do @@ -332,27 +432,29 @@ defmodule KubereqIntegrationTest do ref = make_ref() - {:ok, resp} = - Kubereq.exec( - req, - @namespace, - pod_name, - {self(), ref}, - params: %{"tty" => true, "stdout" => true, "stdin" => true, "command" => "/bin/sh"} + {:ok, pid} = + Kubereq.PodExec.start_link( + req: req, + namespace: @namespace, + name: pod_name, + into: {self(), ref}, + tty: true, + command: "/bin/sh" ) - Response.send_message(resp.body, {:stdin, ~s(echo "foo bar"\n)}) + assert_receive {^ref, :connected} + Kubereq.PodExec.send_stdin(pid, ~s(echo "foo bar"\n)) - result = receive_loop(ref, resp.body, "") |> IO.iodata_to_binary() + result = receive_loop(ref, pid, "") |> IO.iodata_to_binary() assert result == ~s(/ # echo "foo bar"\r\nfoo bar\r\n/ # ) end - defp receive_loop(ref, websocket_response, acc) do + defp receive_loop(ref, dest, acc) do receive do - {^ref, {:stdout, data}} -> receive_loop(ref, websocket_response, [acc, data]) + {^ref, {:stdout, data}} -> receive_loop(ref, dest, [acc, data]) after 500 -> - Response.send_message(websocket_response, :close) + Kubereq.PodExec.close(dest) acc end end From 9d9429ac21d3334a71c710636aa369ca0323ba5b Mon Sep 17 00:00:00 2001 From: Michael Ruoss Date: Fri, 22 Nov 2024 17:10:59 +0100 Subject: [PATCH 5/5] add changelog --- CHANGELOG.md | 6 ++++++ README.md | 2 +- mix.exs | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f56736..20462cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.3.2 - 2024-11-22 + +### Added + +- Support for websocket connections to `pods/log` and `pods/exec` subresources [#37](https://github.com/mruoss/kubereq/pull/37) + ## 0.3.1 - 2024-10-24 ### Fixed diff --git a/README.md b/README.md index b7559ca..ab1941b 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ The package can be installed by adding `kubereq` to your list of dependencies in ```elixir def deps do [ - {:kubereq, "~> 0.3.0"} + {:kubereq, "~> 0.3.2"} ] end ``` diff --git a/mix.exs b/mix.exs index 4af3714..6b76f36 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule Kubereq.MixProject do @app :kubereq @source_url "https://github.com/mruoss/#{@app}" - @version "0.3.1" + @version "0.3.2" def project do [