diff --git a/CHANGELOG.md b/CHANGELOG.md index aa199a477b..e9cf88e55c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.6.0] - 2018-03-26 + ### Added - Allow to select partitions to read from a subscription +## [2.5.10] - 2018-03-26 + +### Added +- Added support of future format of session in ZK + ## [2.5.9] - 2018-03-06 ### Changed diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index d68882c3ed..2ae06ff5b3 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -174,6 +174,17 @@ public Map getOffsets( new SubscriptionCursorWithoutToken(etp.getEventType(), etp.getPartition(), new String(value, UTF_8))); } + protected Session deserializeSession(final String sessionId, final byte[] sessionZkData) throws IOException { + try { + // old version of session: zkNode data is session weight + final int weight = Integer.parseInt(new String(sessionZkData, UTF_8)); + return new Session(sessionId, weight); + } catch (final NumberFormatException nfe) { + // new version of session: zkNode data is session object as json + return objectMapper.readValue(sessionZkData, Session.class); + } + } + @Override public void transfer(final String sessionId, final Collection partitions) throws NakadiRuntimeException, SubscriptionNotInitializedException {