From 698f305c7e8026e6f4f13ff7c68773bd057d6c7a Mon Sep 17 00:00:00 2001 From: Tim Cuthbertson Date: Wed, 5 Jul 2023 19:13:45 +1000 Subject: [PATCH 1/2] add optional resourceVersion to watch API --- .../client/operation/Watchable.scala | 7 ++- .../client/operation/WatchableTests.scala | 45 ++++++++++++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/kubernetes-client/src/com/goyeau/kubernetes/client/operation/Watchable.scala b/kubernetes-client/src/com/goyeau/kubernetes/client/operation/Watchable.scala index 10d2bbfd..0318e042 100644 --- a/kubernetes-client/src/com/goyeau/kubernetes/client/operation/Watchable.scala +++ b/kubernetes-client/src/com/goyeau/kubernetes/client/operation/Watchable.scala @@ -25,8 +25,11 @@ private[client] trait Watchable[F[_], Resource] { implicit val parserFacade: Facade[Json] = new CirceSupportParser(None, false).facade - def watch(labels: Map[String, String] = Map.empty): Stream[F, Either[String, WatchEvent[Resource]]] = { - val uri = addLabels(labels, config.server.resolve(watchResourceUri)) + def watch( + labels: Map[String, String] = Map.empty, + resourceVersion: Option[String] = None + ): Stream[F, Either[String, WatchEvent[Resource]]] = { + val uri = addLabels(labels, config.server.resolve(watchResourceUri)).+??("resourceVersion" -> resourceVersion) val req = Request[F](GET, uri.withQueryParam("watch", "1")) .withOptionalAuthorization(authorization) jsonStream(req).map(_.as[WatchEvent[Resource]].leftMap(_.getMessage)) diff --git a/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala b/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala index 25712783..03958c68 100644 --- a/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala +++ b/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala @@ -55,30 +55,26 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] private def sendEvents(namespace: String, resourceName: String)(implicit client: KubernetesClient[F]) = for { - _ <- retry( - create(namespace, resourceName), - maxRetries = 30, - actionClue = Some(s"Creating $resourceName in $namespace ns") - ) _ <- retry(update(namespace, resourceName), actionClue = Some(s"Updating $resourceName")) status <- deleteResource(namespace, resourceName) _ = assertEquals(status, Status.Ok, status.sanitizedReason) } yield () private def create(namespace: String, resourceName: String)(implicit client: KubernetesClient[F]) = - for { + retry(for { ns <- client.namespaces.get(namespace) _ <- logger.info( s"creating in namespace: ${ns.metadata.flatMap(_.name).getOrElse("n/a/")}, status: ${ns.status.flatMap(_.phase)}" ) status <- namespacedApi(namespace).create(sampleResource(resourceName, Map.empty)) _ = assertEquals(status, Status.Created, status.sanitizedReason) - } yield () + } yield ()) private def watchEvents( expected: Map[String, Set[EventType]], resourceName: String, - watchingNamespace: Option[String] + watchingNamespace: Option[String], + resourceVersion: Option[String] = None )(implicit client: KubernetesClient[F] ) = { @@ -120,7 +116,7 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] watchStream = watchingNamespace .map(watchApi) .getOrElse(api) - .watch() + .watch(resourceVersion = resourceVersion) .through(processEvent(receivedEvents, signal)) .evalMap(_ => receivedEvents.get) .interruptWhen(signal) @@ -145,13 +141,16 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] val expectedEvents = Set[EventType](EventType.ADDED, EventType.MODIFIED, EventType.DELETED) val expected = if (watchIsNamespaced) - (defaultNamespace +: extraNamespace.toList).map(_ -> expectedEvents).toMap + (defaultNamespace +: extraNamespace).map(_ -> expectedEvents).toMap else Map(defaultNamespace -> expectedEvents) ( watchEvents(expected, name, None), - F.sleep(100.millis) *> sendEvents(defaultNamespace, name) *> sendToAnotherNamespace(name) + F.sleep(100.millis) *> + create(defaultNamespace, name) *> + sendEvents(defaultNamespace, name) *> + sendToAnotherNamespace(name) ).parTupled } } @@ -164,8 +163,30 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] ( watchEvents(Map(defaultNamespace -> expected), name, Some(defaultNamespace)), - F.sleep(100.millis) *> sendEvents(defaultNamespace, name) *> sendToAnotherNamespace(name) + F.sleep(100.millis) *> + create(defaultNamespace, name) *> + sendEvents(defaultNamespace, name) *> + sendToAnotherNamespace(name) ).parTupled } } + + test(s"watch $resourceName events from a given resourceVersion") { + usingMinikube { implicit client => + val name = s"${resourceName.toLowerCase}-watch-resource-version" + val expected = Set[EventType](EventType.MODIFIED, EventType.DELETED) + + for { + resourceVersion <- create(defaultNamespace, name) + resource <- retry(getChecked(defaultNamespace, resourceName)) + resourceVersion = resource.metadata.flatMap(_.resourceVersion).get + _ <- ( + watchEvents(Map(defaultNamespace -> expected), name, Some(defaultNamespace), Some(resourceVersion)), + F.sleep(100.millis) *> + sendEvents(defaultNamespace, name) *> + sendToAnotherNamespace(name) + ).parTupled + } yield () + } + } } From 8b1821715e882a40cc7c16f5acf3c3e60cb0b22d Mon Sep 17 00:00:00 2001 From: Jonas Chapuis Date: Fri, 10 Nov 2023 10:37:49 +0100 Subject: [PATCH 2/2] Simplify changes and correct WatchableTests --- .../client/operation/WatchableTests.scala | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala b/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala index 03958c68..6dd263e6 100644 --- a/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala +++ b/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala @@ -55,20 +55,25 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] private def sendEvents(namespace: String, resourceName: String)(implicit client: KubernetesClient[F]) = for { + _ <- retry( + create(namespace, resourceName), + maxRetries = 30, + actionClue = Some(s"Creating $resourceName in $namespace ns") + ) _ <- retry(update(namespace, resourceName), actionClue = Some(s"Updating $resourceName")) status <- deleteResource(namespace, resourceName) _ = assertEquals(status, Status.Ok, status.sanitizedReason) } yield () private def create(namespace: String, resourceName: String)(implicit client: KubernetesClient[F]) = - retry(for { + for { ns <- client.namespaces.get(namespace) _ <- logger.info( s"creating in namespace: ${ns.metadata.flatMap(_.name).getOrElse("n/a/")}, status: ${ns.status.flatMap(_.phase)}" ) status <- namespacedApi(namespace).create(sampleResource(resourceName, Map.empty)) _ = assertEquals(status, Status.Created, status.sanitizedReason) - } yield ()) + } yield () private def watchEvents( expected: Map[String, Set[EventType]], @@ -141,16 +146,13 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] val expectedEvents = Set[EventType](EventType.ADDED, EventType.MODIFIED, EventType.DELETED) val expected = if (watchIsNamespaced) - (defaultNamespace +: extraNamespace).map(_ -> expectedEvents).toMap + (defaultNamespace +: extraNamespace.toList).map(_ -> expectedEvents).toMap else Map(defaultNamespace -> expectedEvents) ( watchEvents(expected, name, None), - F.sleep(100.millis) *> - create(defaultNamespace, name) *> - sendEvents(defaultNamespace, name) *> - sendToAnotherNamespace(name) + F.sleep(100.millis) *> sendEvents(defaultNamespace, name) *> sendToAnotherNamespace(name) ).parTupled } } @@ -163,28 +165,24 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }] ( watchEvents(Map(defaultNamespace -> expected), name, Some(defaultNamespace)), - F.sleep(100.millis) *> - create(defaultNamespace, name) *> - sendEvents(defaultNamespace, name) *> - sendToAnotherNamespace(name) + F.sleep(100.millis) *> sendEvents(defaultNamespace, name) *> sendToAnotherNamespace(name) ).parTupled } } test(s"watch $resourceName events from a given resourceVersion") { usingMinikube { implicit client => - val name = s"${resourceName.toLowerCase}-watch-resource-version" - val expected = Set[EventType](EventType.MODIFIED, EventType.DELETED) + val bootstrapName = s"${resourceName.toLowerCase}-watch-resource-version-test-bootstrap" + val name = s"${resourceName.toLowerCase}-watch-resource-version" + val expected = Set[EventType](EventType.ADDED, EventType.MODIFIED, EventType.DELETED) for { - resourceVersion <- create(defaultNamespace, name) - resource <- retry(getChecked(defaultNamespace, resourceName)) + _ <- create(defaultNamespace, bootstrapName) + resource <- retry(getChecked(defaultNamespace, bootstrapName)) resourceVersion = resource.metadata.flatMap(_.resourceVersion).get _ <- ( watchEvents(Map(defaultNamespace -> expected), name, Some(defaultNamespace), Some(resourceVersion)), - F.sleep(100.millis) *> - sendEvents(defaultNamespace, name) *> - sendToAnotherNamespace(name) + F.sleep(100.millis) *> sendEvents(defaultNamespace, name) *> sendToAnotherNamespace(name) ).parTupled } yield () }