Skip to content

Commit

Permalink
Complete support for postbuild substitutions from secrets and configmaps
Browse files Browse the repository at this point in the history
  • Loading branch information
allenporter committed Apr 20, 2024
1 parent e15f536 commit b8d6033
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 81 deletions.
163 changes: 98 additions & 65 deletions flux_local/git_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
import logging
import os
import tempfile
from collections.abc import Callable, Awaitable, Iterable
from collections.abc import Callable, Awaitable
from functools import cache
from pathlib import Path
import queue
from typing import Any, Generator

import git
Expand Down Expand Up @@ -393,6 +392,14 @@ async def build(
cmd = await cmd.stash()
self._cache[key] = cmd
return cmd

def remove(self, kustomization: Kustomization) -> None:
"""Remove the kustomization vlaue from the cache."""

Check failure on line 397 in flux_local/git_repo.py

View workflow job for this annotation

GitHub Actions / build

vlaue ==> value
target_key = f"{kustomization.namespaced_name} @"
for key in list(self._cache.keys()):
if key.startswith(target_key):
_LOGGER.debug("Invalidated cache %s", key)
del self._cache[key]


async def visit_kustomization(
Expand Down Expand Up @@ -458,24 +465,18 @@ async def kustomization_traversal(
visited_paths: set[Path] = set() # Relative paths within the cluster
visited_ks: set[str] = set()

path_queue: queue.Queue[tuple[Path, Kustomization | None]] = queue.Queue()
path_queue.put((selector.relative_path, None))
while not path_queue.empty():
path_queue: list[tuple[Path, Kustomization | None]] = []
path_queue.append((selector.relative_path, None))
while path_queue:
# Fully empty the queue, running all tasks in parallel
tasks = []
while not path_queue.empty():
(path, visit_ks) = path_queue.get()
while path_queue:
(path, visit_ks) = path_queue.pop(0)

if path in visited_paths:
_LOGGER.debug("Already visited %s", path)
continue
visited_paths.add(path)

if visit_ks and visit_ks.postbuild_substitute_from:
visit_ks = values.expand_postbuild_substitute_reference(
visit_ks, values.ks_cluster_config(response_kustomizations)
)

tasks.append(visit_kustomization(selector, builder, path, visit_ks))

# Find new kustomizations
Expand All @@ -499,7 +500,7 @@ async def kustomization_traversal(
if not (ks_path := adjust_ks_path(ks, selector)):
continue
ks.path = str(ks_path)
path_queue.put((ks_path, ks))
path_queue.append((ks_path, ks))
response_kustomizations.append(ks)

response_kustomizations.sort(key=lambda x: (x.namespace, x.name))
Expand All @@ -521,14 +522,8 @@ async def build_kustomization(
selector: ResourceSelector,
kustomize_flags: list[str],
builder: CachableBuilder,
) -> tuple[
Iterable[HelmRepository],
Iterable[HelmRelease],
Iterable[ClusterPolicy],
Iterable[ConfigMap],
Iterable[Secret],
]:
"""Build helm objects for the Kustomization."""
) -> None:
"""Build helm objects for the Kustomization and add them to the Kustomization."""

root: Path = selector.path.root
kustomization_selector: MetadataSelector = selector.kustomization
Expand All @@ -542,7 +537,7 @@ async def build_kustomization(
and not cluster_policy_selector.enabled
and not selector.doc_visitor
):
return ([], [], [], [], [])
return

with trace_context(f"Build '{kustomization.namespaced_name}'"):
cmd = await builder.build(kustomization, root / kustomization.path)
Expand Down Expand Up @@ -581,7 +576,7 @@ async def build_kustomization(
if selector.doc_visitor:
kinds.extend(selector.doc_visitor.kinds)
if not kinds:
return ([], [], [], [], [])
return

regexp = f"kind=^({'|'.join(kinds)})$"
docs = await cmd.grep(regexp).objects(
Expand All @@ -595,38 +590,44 @@ async def build_kustomization(
continue
selector.doc_visitor.func(kustomization.namespaced_name, doc)

return (
kustomization.helm_repos = list(
filter(
helm_repo_selector.predicate,
[
HelmRepository.parse_doc(doc)
for doc in docs
if doc.get("kind") == HELM_REPO_KIND
],
),
)
)
kustomization.helm_releases = list(
filter(
helm_release_selector.predicate,
[
HelmRelease.parse_doc(doc)
for doc in docs
if doc.get("kind") == HELM_RELEASE_KIND
],
),
)
)
kustomization.cluster_policies = list(
filter(
cluster_policy_selector.predicate,
[
ClusterPolicy.parse_doc(doc)
for doc in docs
if doc.get("kind") == CLUSTER_POLICY_KIND
],
),
[
ConfigMap.parse_doc(doc)
for doc in docs
if doc.get("kind") == CONFIG_MAP_KIND
],
[Secret.parse_doc(doc) for doc in docs if doc.get("kind") == SECRET_KIND],
)
)
kustomization.config_maps = [
ConfigMap.parse_doc(doc)
for doc in docs
if doc.get("kind") == CONFIG_MAP_KIND
]
kustomization.secrets = [
Secret.parse_doc(doc) for doc in docs if doc.get("kind") == SECRET_KIND
]


async def build_manifest(
Expand Down Expand Up @@ -663,42 +664,72 @@ async def build_manifest(
]

async def update_kustomization(cluster: Cluster) -> None:
build_tasks = []
for kustomization in cluster.kustomizations:
_LOGGER.debug(
"Processing kustomization '%s': %s",
kustomization.name,
kustomization.path,
)
build_tasks.append(
build_kustomization(
kustomization,
Path(cluster.path),
selector,
options.kustomize_flags,
builder,
queue = [*cluster.kustomizations]
visited: set[str] = set()
# cluster_config = values.ks_cluster_config(cluster.kustomizations)
while queue:
build_tasks = []
in_flight = []
pending = []
while queue:
kustomization = queue.pop(0)
if not_ready := (set(kustomization.depends_on or {}) - visited):
_LOGGER.debug(
"Waiting for %s before building %s",
not_ready,
kustomization.namespaced_name,
)
pending.append(kustomization)
continue
_LOGGER.debug(
"Processing kustomization '%s': %s",
kustomization.name,
kustomization.path,
)
)
results = list(await asyncio.gather(*build_tasks))
for kustomization, (
helm_repos,
helm_releases,
cluster_policies,
config_maps,
secrets,
) in zip(
cluster.kustomizations,
results,
):
kustomization.helm_repos = list(helm_repos)
kustomization.helm_releases = list(helm_releases)
kustomization.cluster_policies = list(cluster_policies)
kustomization.config_maps = list(config_maps)
kustomization.secrets = list(secrets)
if kustomization.postbuild_substitute_from:
values.expand_postbuild_substitute_reference(
kustomization,
values.ks_cluster_config(cluster.kustomizations),
)
# Clear the cache to remove any previous builds that are
# missing the postbuild substitutions.
builder.remove(kustomization)

in_flight.append(kustomization.namespaced_name)
build_tasks.append(
build_kustomization(
kustomization,
Path(cluster.path),
selector,
options.kustomize_flags,
builder,
)
)
if not build_tasks:
raise FluxException(
"Internal error: Unexpected loop without build tasks"
)
await asyncio.gather(*build_tasks)
visited.update(in_flight)

for kustomization in pending:
queue.append(kustomization)

kustomization_tasks = []
# Expand and visit Kustomizations
for cluster in clusters:
all_ks: set[str] = set(
[ks.namespaced_name for ks in cluster.kustomizations]
)
for ks in cluster.kustomizations:
if missing := (set(ks.depends_on or {}) - all_ks):
_LOGGER.warning(
"Kustomization %s has dependsOn with invalid names: %s",
ks.namespaced_name,
missing,
)
ks.depends_on = list(set(ks.depends_on or {}) - missing)

kustomization_tasks.append(update_kustomization(cluster))
await asyncio.gather(*kustomization_tasks)

Expand All @@ -709,6 +740,8 @@ async def update_kustomization(cluster: Cluster) -> None:
values.expand_value_references(helm_release, kustomization)
for helm_release in kustomization.helm_releases
]
if kustomization.name == 'apps':
_LOGGER.warning("apps = %s", kustomization.helm_releases )

# Visit Helm resources
for cluster in clusters:
Expand Down
23 changes: 23 additions & 0 deletions flux_local/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ class Kustomization(BaseManifest):
postbuild_substitute_from: Optional[list[SubstituteReference]] = None
"""A list of substitutions to reference from an ConfigMap or Secret."""

depends_on: list[str] | None = None
"""A list of namespaced names that this Kustomization depends on."""

@classmethod
def parse_doc(cls, doc: dict[str, Any]) -> "Kustomization":
"""Parse a partial Kustomization from a kubernetes resource."""
Expand All @@ -484,6 +487,12 @@ def parse_doc(cls, doc: dict[str, Any]) -> "Kustomization":
substitute_from = [
SubstituteReference(**subdoc) for subdoc in substitute_from_dict
]
depends_on = []
for dependency in spec.get("dependsOn", ()):
if not (dep_name := dependency.get("name")):
raise InputException(f"Invalid {cls} missing dependsOn.name: {doc}")
dep_namespace = dependency.get("namespace", namespace)
depends_on.append(f"{dep_namespace}/{dep_name}")
return Kustomization(
name=name,
namespace=namespace,
Expand All @@ -496,6 +505,7 @@ def parse_doc(cls, doc: dict[str, Any]) -> "Kustomization":
contents=doc,
postbuild_substitute=postbuild.get("substitute"),
postbuild_substitute_from=substitute_from,
depends_on=depends_on,
)

@property
Expand Down Expand Up @@ -532,7 +542,20 @@ def compact_exclude_fields(cls) -> dict[str, Any]:
"contents": True,
"postbuild_substitute": True,
"postbuild_substitute_from": True,
"depends_on": True,
}

def update_postbuild_substitutions(self, substitutions: dict[str, Any]) -> None:
"""Update the postBuild.subtitutions in the extracted values and raw doc contents."""

Check failure on line 549 in flux_local/manifest.py

View workflow job for this annotation

GitHub Actions / build

subtitutions ==> substitutions
if self.postbuild_substitute is None:
self.postbuild_substitute = {}
self.postbuild_substitute.update(substitutions)
if self.contents:
post_build = self.contents["spec"]["postBuild"]
if (substitute := post_build.get("substitute")) is None:
substitute = {}
post_build["substitute"] = substitute
substitute.update(substitutions)


class Cluster(BaseManifest):
Expand Down
8 changes: 4 additions & 4 deletions flux_local/values.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def expand_postbuild_substitute_reference(
if not ks.postbuild_substitute_from:
return ks

values = ks.postbuild_substitute or {}
values: dict[str, Any] = ks.postbuild_substitute or {}
for ref in ks.postbuild_substitute_from:
_LOGGER.debug("Expanding substitute reference %s", ref)
if not ks.namespace:
Expand All @@ -258,7 +258,7 @@ def expand_postbuild_substitute_reference(
continue

if found_data is None:
if not ref.optional:
if not ref.optional and not ref.kind == SECRET_KIND: # Secrets are commonly filtered
_LOGGER.warning(
"Unable to find SubstituteReference for %s: %s",
ks.namespaced_name,
Expand All @@ -267,5 +267,5 @@ def expand_postbuild_substitute_reference(
continue

values.update(found_data)

return ks.model_copy(update={"postbuild_substutite": values})
ks.update_postbuild_substitutions(values)
return ks
Loading

0 comments on commit b8d6033

Please sign in to comment.