From 75370c0bd8b05599db80d644d8b0f861731dcc6f Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 23 Feb 2018 16:01:42 +0100 Subject: [PATCH 1/3] ARUHA-1520: added possibility to read both versions of Session from ZK; --- .../nakadi/service/subscription/model/Session.java | 7 ++++++- .../zk/AbstractZkSubscriptionClient.java | 10 +++++++++- .../subscription/zk/NewZkSubscriptionClient.java | 12 ++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java b/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java index 25a7ab4d46..d369205a5b 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/model/Session.java @@ -1,12 +1,17 @@ package org.zalando.nakadi.service.subscription.model; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.util.UUID; public class Session { private final String id; private final int weight; - public Session(final String id, final int weight) { + @JsonCreator + public Session(@JsonProperty("id") final String id, + @JsonProperty("weight") final int weight) { this.id = id; this.weight = weight; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 75fd4ab359..54fe153f6d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -257,7 +257,13 @@ public final Collection listSessions() return loadDataAsync( zkSessions, key -> getSubscriptionPath("/sessions/" + key), - (key, data) -> new Session(key, Integer.parseInt(new String(data, UTF_8)))).values(); + (key, data) -> { + try { + return deserializeSession(key, data); + } catch (final IOException e) { + throw new NakadiRuntimeException(e); + } + }).values(); } @Override @@ -462,4 +468,6 @@ protected abstract byte[] createTopologyAndOffsets(Collection getOffsets( new SubscriptionCursorWithoutToken(etp.getEventType(), etp.getPartition(), new String(value, UTF_8))); } + protected Session deserializeSession(final String sessionId, final byte[] sessionZkData) throws IOException { + try { + // old version of session: zkNode data is session weight + final int weight = Integer.parseInt(new String(sessionZkData, UTF_8)); + return new Session(sessionId, weight); + } catch (final NumberFormatException nfe) { + // new version of session: zkNode data is session object as json + return objectMapper.readValue(sessionZkData, Session.class); + } + } + @Override public void transfer(final String sessionId, final Collection partitions) throws NakadiRuntimeException, SubscriptionNotInitializedException { From d999d6792ede3b343e7f606cef580f3d414a9495 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 23 Feb 2018 16:35:49 +0100 Subject: [PATCH 2/3] ARUHA-1520: updated changelog; --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8b86c22aa..69e54b4dd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- Added support of future format of session in ZK + ## [2.5.8] - 2018-02-22 ### Added From 9ad5ae0a9478388d1332c926376ef599a0745707 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Mon, 26 Mar 2018 13:16:27 +0200 Subject: [PATCH 3/3] ARUHA-1520: added release version; --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 985316c77f..9013803cac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.5.10] - 2018-03-26 + ### Added - Added support of future format of session in ZK