Skip to content

Commit

Permalink
Replace Req with :httpc
Browse files Browse the repository at this point in the history
  • Loading branch information
mruoss committed Jul 7, 2024
1 parent b509a7b commit 860000e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 41 deletions.
14 changes: 7 additions & 7 deletions lib/flame_k8s_backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ defmodule FLAMEK8sBackend do
boot_timeout: nil,
remote_terminator_pid: nil,
log: false,
req: nil
http: nil

@valid_opts ~w(app_container_name runner_pod_tpl terminator_sup log)a
@required_config ~w()a
Expand Down Expand Up @@ -192,13 +192,13 @@ defmodule FLAMEK8sBackend do

parent_ref = make_ref()

req = K8sClient.connect()
http = K8sClient.connect()

case K8sClient.get_pod(req, System.get_env("POD_NAMESPACE"), System.get_env("POD_NAME")) do
case K8sClient.get_pod(http, System.get_env("POD_NAMESPACE"), System.get_env("POD_NAME")) do
{:ok, base_pod} ->
new_state =
struct(state,
req: req,
http: http,
parent_ref: parent_ref,
runner_pod_manifest:
RunnerPodTemplate.manifest(
Expand Down Expand Up @@ -237,10 +237,10 @@ defmodule FLAMEK8sBackend do
@impl true
def system_shutdown() do
# This is not very nice but I don't have the opts on the runner
req = K8sClient.connect()
http = K8sClient.connect()
namespace = System.get_env("POD_NAMESPACE")
name = System.get_env("POD_NAME")
K8sClient.delete_pod!(req, namespace, name)
K8sClient.delete_pod!(http, namespace, name)
System.stop()
end

Expand All @@ -251,7 +251,7 @@ defmodule FLAMEK8sBackend do
{new_state, req_connect_time} =
with_elapsed_ms(fn ->
created_pod =
K8sClient.create_pod!(state.req, state.runner_pod_manifest, state.boot_timeout)
K8sClient.create_pod!(state.http, state.runner_pod_manifest, state.boot_timeout)

case created_pod do
{:ok, pod} ->
Expand Down
60 changes: 60 additions & 0 deletions lib/flame_k8s_backend/http.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule FlameK8sBackend.HTTP do
@moduledoc false

defstruct [:base_url, :token, :cacertfile]

@type t :: %__MODULE__{
base_url: String.t(),
token: String.t(),
cacertfile: String.t()
}

@spec new(fields :: Keyword.t()) :: t()
def new(fields), do: struct!(__MODULE__, fields)

@spec get(t(), path :: String.t()) :: {:ok, map()} | {:error, String.t()}
def get(http, path), do: request(http, :get, path)

@spec get!(t(), path :: String.t()) :: map()
def get!(http, path), do: request!(http, :get, path)

@spec post!(t(), path :: String.t(), body :: String.t()) :: map()
def post!(http, path, body), do: request!(http, :post, path, body)

@spec delete!(t(), path :: String.t()) :: map()
def delete!(http, path), do: request!(http, :delete, path)

@spec request!(http :: t(), verb :: atom(), path :: Stringt.t()) :: map()
defp request!(http, verb, path, body \\ nil) do
case request(http, verb, path, body) do
{:ok, response_body} -> Jason.decode!(response_body)
{:error, reason} -> raise reason
end
end

@spec request(http :: t(), verb :: atom(), path :: String.t(), body :: String.t()) ::
{:ok, String.t()} | {:error, String.t()}
defp request(http, verb, path, body \\ nil) do
headers = [{~c"Authorization", ~c"Bearer #{http.token}"}]
http_opts = [ssl: [verify: :verify_peer, cacertfile: http.cacertfile]]
url = http.base_url <> path

request =
if is_nil(body),
do: {url, headers},
else: {url, headers, ~c"application/json", body}

case :httpc.request(verb, request, http_opts, []) do
{:ok, {{_, status, _}, _, response_body}} when status in 200..299 ->
{:ok, response_body}

{:ok, {{_, status, reason}, _, resp_body}} ->
{:error,
"failed #{String.upcase("#{verb}")} #{url} with #{inspect(status)} (#{inspect(reason)}): #{inspect(resp_body)} #{inspect(headers)}"}

{:error, reason} ->
{:error,
"failed #{String.upcase("#{verb}")} #{url} with #{inspect(reason)} #{inspect(http.headers)}"}
end
end
end
51 changes: 19 additions & 32 deletions lib/flame_k8s_backend/k8s_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule FLAMEK8sBackend.K8sClient do
@moduledoc false

@sa_token_path "/var/run/secrets/kubernetes.io/serviceaccount"
@pod_tpl "/api/v1/namespaces/:namespace/pods/:name"
@pod_list_tpl "/api/v1/namespaces/:namespace/pods"

alias FlameK8sBackend.HTTP

def connect() do
ca_cert_path = Path.join(@sa_token_path, "ca.crt")
Expand All @@ -12,40 +12,31 @@ defmodule FLAMEK8sBackend.K8sClient do
apiserver_port = System.get_env("KUBERNETES_SERVICE_PORT_HTTPS")
token = File.read!(token_path)

req =
Req.new(
base_url: "https://#{apiserver_host}:#{apiserver_port}",
headers: [{:Authorization, "Bearer #{token}"}],
connect_options: [
transport_opts: [
cacertfile: String.to_charlist(ca_cert_path)
]
]
)
|> Req.Request.append_response_steps(verify_2xs: &verify_2xs/1)

req
HTTP.new(
base_url: "https://#{apiserver_host}:#{apiserver_port}",
token: token,
cacertfile: String.to_charlist(ca_cert_path)
)
end

def get_pod!(req, namespace, name) do
Req.get!(req, url: @pod_tpl, path_params: [namespace: namespace, name: name]).body
def get_pod!(http, namespace, name) do
HTTP.get!(http, pod_path(namespace, name))
end

def get_pod(req, namespace, name) do
with {:ok, %{body: body}} <-
Req.get(req, url: @pod_tpl, path_params: [namespace: namespace, name: name]),
do: {:ok, body}
def get_pod(http, namespace, name) do
with {:ok, response_body} <- HTTP.get(http, pod_path(namespace, name)),
do: Jason.decode(response_body)
end

def delete_pod!(req, namespace, name) do
Req.delete!(req, url: @pod_tpl, path_params: [namespace: namespace, name: name])
def delete_pod!(http, namespace, name) do
HTTP.delete!(http, pod_path(namespace, name))
end

def create_pod!(req, pod, timeout) do
def create_pod!(http, pod, timeout) do
name = pod["metadata"]["name"]
namespace = pod["metadata"]["namespace"]
Req.post!(req, url: @pod_list_tpl, path_params: [namespace: namespace], json: pod)
wait_until_scheduled(req, namespace, name, timeout)
HTTP.post!(http, pod_path(namespace, ""), Jason.encode!(pod))
wait_until_scheduled(http, namespace, name, timeout)
end

defp wait_until_scheduled(_req, _namespace, _name, timeout) when timeout <= 0, do: :error
Expand All @@ -61,11 +52,7 @@ defmodule FLAMEK8sBackend.K8sClient do
end
end

defp verify_2xs({request, response}) do
if response.status in 200..299 do
{request, response}
else
{request, RuntimeError.exception(response.body["message"])}
end
defp pod_path(namespace, name) do
"/api/v1/namespaces/#{namespace}/pods/#{name}"
end
end
1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ defmodule FlameK8sBackend.MixProject do
defp deps do
[
{:flame, "~> 0.2.0"},
{:req, "~> 0.5.0"},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
Expand Down
2 changes: 1 addition & 1 deletion test/integration/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule FlameK8sBackend.IntegrationTest do
"kind load docker-image --name flame-integration-test flamek8sbackend:integration"
)

Mix.Shell.IO.cmd("kubectl config set-context kind-flame-integration-test")
Mix.Shell.IO.cmd("kubectl config set-context --current kind-flame-integration-test")
Mix.Shell.IO.cmd("kubectl delete -f test/integration/manifest.yaml")
Mix.Shell.IO.cmd("kubectl apply -f test/integration/manifest.yaml")

Expand Down

0 comments on commit 860000e

Please sign in to comment.