From 23bd132e22353a25d53797ebbae074ae9cc23a14 Mon Sep 17 00:00:00 2001 From: antban Date: Fri, 24 Feb 2017 10:15:11 +0100 Subject: [PATCH 1/4] ARUHA-657 Fix committing to subscription for closed stream It is possible to have situation, when stream is closing, and at the same time there is a task for streaming data to client in subscription queue. In this case streaming session was closed without waiting for uncommitted events. This fix moves calculation of uncommitted events to the very end of StreamingState (so no further writes may occur after this calculation). --- .../nakadi/webservice/hila/HilaAT.java | 18 +++++++++ .../subscription/state/ClosingState.java | 37 +++++++++++-------- .../subscription/state/StreamingState.java | 24 +++++++----- .../zk/CuratorZkSubscriptionClient.java | 4 -- 4 files changed, 54 insertions(+), 29 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index fcce83a03c..44e2a38bfb 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -73,6 +73,24 @@ public void before() throws IOException { this.subscription = createSubscription(subscription); } + @Test(timeout = 10000) + public void whenStreamTimeoutReachedPossibleToCommit() throws Exception { + publishEvent(eventType.getName(),"{\"foo\":\"bar\"}"); + publishEvent(eventType.getName(),"{\"foo\":\"bar\"}"); + final TestStreamingClient client = TestStreamingClient + .create(URL, subscription.getId(), "stream_limit=1&stream_timeout=1") + .start(); + waitFor(() -> Assert.assertFalse(client.getBatches().isEmpty()), 100L, 100); + final SubscriptionCursor toCommit = client.getBatches().get(0).getCursor(); + client.close(); // connection is closed, and stream as well + + final int statusCode = NakadiTestUtils.commitCursors( + subscription.getId(), + Collections.singletonList(toCommit), + client.getSessionId()); + Assert.assertEquals(HttpStatus.SC_NO_CONTENT, statusCode); + } + @Test(timeout = 30000) public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() throws Exception { // write 4 events to event-type diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java index d1b369d93b..c672c17f33 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java @@ -1,7 +1,5 @@ package org.zalando.nakadi.service.subscription.state; -import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZKSubscription; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -9,17 +7,23 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Stream; +import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.service.subscription.zk.ZKSubscription; class ClosingState extends State { - private final Map uncommitedOffsets; + private final Supplier> uncommittedOffsetsSupplier; + private final LongSupplier lastCommitSupplier; + private Map uncommittedOffsets; private final Map listeners = new HashMap<>(); - private final long lastCommitMillis; private ZKSubscription topologyListener; - ClosingState(final Map uncommitedOffsets, final long lastCommitMillis) { - this.uncommitedOffsets = uncommitedOffsets; - this.lastCommitMillis = lastCommitMillis; + ClosingState(final Supplier> uncommittedOffsetsSupplier, + final LongSupplier lastCommitSupplier) { + this.uncommittedOffsetsSupplier = uncommittedOffsetsSupplier; + this.lastCommitSupplier = lastCommitSupplier; } @Override @@ -39,9 +43,10 @@ public void onExit() { @Override public void onEnter() { - final long timeToWaitMillis = getParameters().commitTimeoutMillis - (System.currentTimeMillis() - - lastCommitMillis); - if (timeToWaitMillis > 0) { + final long timeToWaitMillis = getParameters().commitTimeoutMillis - + (System.currentTimeMillis() - lastCommitSupplier.getAsLong()); + uncommittedOffsets = uncommittedOffsetsSupplier.get(); + if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) { scheduleTask(() -> switchState(new CleanupState()), timeToWaitMillis, TimeUnit.MILLISECONDS); topologyListener = getZk().subscribeForTopologyChanges(() -> addTask(this::onTopologyChanged)); reactOnTopologyChange(); @@ -72,7 +77,7 @@ private void reactOnTopologyChange() { final Set addListeners = new HashSet<>(); for (final Partition p : partitions.values()) { if (Partition.State.REASSIGNING.equals(p.getState())) { - if (!uncommitedOffsets.containsKey(p.getKey())) { + if (!uncommittedOffsets.containsKey(p.getKey())) { freeRightNow.add(p.getKey()); } else { if (!listeners.containsKey(p.getKey())) { @@ -80,12 +85,12 @@ private void reactOnTopologyChange() { } } } else { // ASSIGNED - if (uncommitedOffsets.containsKey(p.getKey()) && !listeners.containsKey(p.getKey())) { + if (uncommittedOffsets.containsKey(p.getKey()) && !listeners.containsKey(p.getKey())) { addListeners.add(p.getKey()); } } } - uncommitedOffsets.keySet().stream().filter(p -> !partitions.containsKey(p)).forEach(freeRightNow::add); + uncommittedOffsets.keySet().stream().filter(p -> !partitions.containsKey(p)).forEach(freeRightNow::add); freePartitions(freeRightNow); addListeners.forEach(this::registerListener); tryCompleteState(); @@ -108,14 +113,14 @@ private void offsetChanged(final Partition.PartitionKey key) { private void reactOnOffset(final Partition.PartitionKey key) { final long newOffset = getZk().getOffset(key); - if (uncommitedOffsets.containsKey(key) && uncommitedOffsets.get(key) <= newOffset) { + if (uncommittedOffsets.containsKey(key) && uncommittedOffsets.get(key) <= newOffset) { freePartitions(Collections.singletonList(key)); } tryCompleteState(); } private void tryCompleteState() { - if (uncommitedOffsets.isEmpty()) { + if (uncommittedOffsets.isEmpty()) { switchState(new CleanupState()); } } @@ -123,7 +128,7 @@ private void tryCompleteState() { private void freePartitions(final Collection keys) { RuntimeException exceptionCaught = null; for (final Partition.PartitionKey partitionKey : keys) { - uncommitedOffsets.remove(partitionKey); + uncommittedOffsets.remove(partitionKey); final ZKSubscription listener = listeners.remove(partitionKey); if (null != listener) { try { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index bbf800ad41..1774400dd1 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -38,6 +38,8 @@ class StreamingState extends State { private long committedEvents; private long sentEvents; private long batchesSent; + // Uncommitted offsets are calculated right on exiting from Streaming state. + private Map uncommittedOffsets; @Override public void onEnter() { @@ -85,17 +87,17 @@ private void sendMetadata(final String metadata) { .ifPresent(pk -> flushData(pk.getKey(), new TreeMap<>(), Optional.of(metadata))); } + private long getLastCommitMillis() { + return lastCommitMillis; + } + + private Map getUncommittedOffsets() { + return uncommittedOffsets; + } + private void shutdownGracefully(final String reason) { getLog().info("Shutting down gracefully. Reason: {}", reason); - - final Map uncommitted = offsets.entrySet().stream() - .filter(e -> !e.getValue().isCommitted()) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSentOffset())); - if (uncommitted.isEmpty()) { - switchState(new CleanupState()); - } else { - switchState(new ClosingState(uncommitted, lastCommitMillis)); - } + switchState(new ClosingState(this::getUncommittedOffsets, this::getLastCommitMillis)); } private void pollDataFromKafka() { @@ -222,6 +224,10 @@ private String serializeBatch(final Partition.PartitionKey partitionKey, final l @Override public void onExit() { + uncommittedOffsets = offsets.entrySet().stream() + .filter(e -> !e.getValue().isCommitted()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSentOffset())); + if (null != topologyChangeSubscription) { try { topologyChangeSubscription.cancel(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/CuratorZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/CuratorZkSubscriptionClient.java index 3931498a46..11b02fc0f9 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/CuratorZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/CuratorZkSubscriptionClient.java @@ -44,23 +44,19 @@ public CuratorZkSubscriptionClient(final String subscriptionId, final CuratorFra @Override public void runLocked(final Runnable function) { - log.info("Taking lock for " + function.hashCode()); try { Exception releaseException = null; lock.acquire(); - log.debug("Lock taken " + function.hashCode()); try { function.run(); } finally { - log.info("Releasing lock for " + function.hashCode()); try { lock.release(); } catch (final Exception e) { log.error("Failed to release lock", e); releaseException = e; } - log.debug("Lock released " + function.hashCode()); } if (releaseException != null) { throw releaseException; From 6a01571b8880e211a74c3b7a4827bbe967fb40e5 Mon Sep 17 00:00:00 2001 From: antban Date: Fri, 24 Feb 2017 14:08:13 +0100 Subject: [PATCH 2/4] ARUHA-657 Upgrade gradlew, optimize tests --- gradle/wrapper/gradle-wrapper.jar | Bin 53638 -> 53324 bytes gradle/wrapper/gradle-wrapper.properties | 4 +- gradlew | 46 ++++++++++-------- gradlew.bat | 6 +-- .../webservice/SettingsControllerAT.java | 3 ++ .../nakadi/webservice/hila/HilaAT.java | 7 +-- 6 files changed, 37 insertions(+), 29 deletions(-) diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 5ccda13e9cb94678ba179b32452cf3d60dc36353..3baa851b28c65f87dd36a6748e1a85cf360c1301 100644 GIT binary patch delta 5305 zcmZWs2|QF^*uI8AF%-r+8vDLSk)-SjA=%d&Tb67^qzNS%qHeN;RD>4mphU9FVCk~O|NS6}^o-~Ih&&Y9;v@AJOryyu)bZ}el@)+}0-=?Pl8-2lMI2+-+%zl{<@ zL5+dBn(C?+0036Ek6Li1P4j#YSPY;6{!y28zTeIhq@^~k5^%GuKy5%Nno(+7%7O?) zz`!-SDH!lYX2U=!nvtCzMxX{ut*hfkBTBPFr33N!T= zjHFW*YlY*_uVy)0nh2S{euy9OK?d%g4ELzZ<}y>0GbolI8DlME3`3spZ3)TUD{Y-L zc5h(f376NA0rG3}Dsj>}+uAYV`4MeoTI7X@@;+PdOQD zWlMMEqHO$S%N~7*kvuXmi)=acQB7oQ&X`I5UTIDH80jh*{T z(+n%xIQrQuPtBlJu@c!{)(?4vb8ax~e*gSzw$=~Zw7IJ2=CjlN_pa73^C`Z!$sG|g zQYda0#^YQ<#S3!z>y>5#2l4NNc??R~KgtdUQPga$hd+$7m|Y<2M%=pdTlki**DX;{ zGwBG$#@AT;Qa>VIw?VG-o{{Cg`eZq^r5UzmMLw4qiSe+5zp+z~KhuamCD{ACB40%* z3cH*$>2ZsjaSse1OL>!$R5l(O5tEr~aabbx>B|@{I@^Th)NUoi`5Xa!!5LcnVKc(6 zh=N=TTfM4hLJ`@!waLDPW&siN;|<9rPtJPJh_)9{P8qw!e~9*FvQH`ZX_+51nRPo) zd3RuXPNTr?o5N(lX_DBy;cfF`8cWp;uA}j4@QClqxQom#5NW-^F2AZzn zL(}?K&1C_FtH*AkxbbxgX7(~~qV5}vRA_&A@wHy%U`o`wen|8_Y@N3D?+wAbDtf>4 zZs-SijPzD8To{l=*Q_`13w?aUw3nqz=VyNTB*vEek;;wo+u=k_p7w|1Kn9}gHf=g* z$hA3(N;>l3m0%Y4{tdMt&ANrIiC36Vvc?tGvIm`N6U!40?lT(rw`yFL%bRN4q0C&X z)B1@w8T{2W*A%1umEQ@oVs+I&+l2VDv`rKzF|Rc{VQuE7++&DzYPZbz#ARLBN@Mr- z5swHwX60F8*mPM%7LHiOO&Z#tAsz{0Lr=)L{M-|q+OHkN{!u^Frz$piK}oXPFG^Z} zMzoB0%m3$?yM3*fgv@p9aA2>(2?aA5liJ{<{<6z7PPxDI9K(J%?KlWa4{;`LA7p1% zJf)}U0f3hU0K@<=f-D4Xa__=B@%8TAk~6TidF~@~R^8da)reh?=IJ2W$9U|XbU7Q{ zU1^eu7AKK&E+(nub;(1k&{xLaq&41EE{B6Xw+A{;{s`O3DcGT-ptDS7?HL5Vlw;!c)} z;b=`$KkrIn0hhROV_92StQY>C4AQozggNX{p8Bm(3Dpaiu6PH8p1a`ba_-6!#oPDX z1sDG#>VY=LF0VyjIn}{&;aQ?h7R@t-ry~nG%>nj<0m(~eNTz-`-z&dv=HDc(9?-2nc=n!B!hPK+F`>#!SUCWfs&fUWi7@wh@E!1S+4XljG!c^nNJM5%@3W zHz;`Hb0}DQCTsUy>C`x7E?Dwd4GQrAaKVS3x@>sk_hy$BKtV2&G4!?OOGABtPLx&VP zBsD0M(qLz1@F3pVt=E@(u4*r9Ogy@^GRplvuP>8!%Vvg4!gob_CTzWRi)B^Ki;C|>6}4Vr`DT7=TR(Yf z#BtTj-Wk^I{Fv2<_^0^NRk~fbGe;^4O#@7Mt+UPB=j1f5fxToTP6P|Yh?H)&-^7)> zmYBa-3pwlVUoSU!C%KDI9hGosyqV9gw3mb<_!%?{=9XNn@yaxEvs+KvhvkS~)Y6qd zWc+KRgrC2v@o1TZ-*Rv05Q}u#d7=kFO|@RS6M`Z{Lo8;22Ed zHpp?(wyUS?twU($XkEL+_&YNPd~?y+P$WWBKum$BZ$n-qKmB~}yVJzpJ(Qg2bBrC? z(eG7sPKUPG>D&4gfyO`uRL&clj{$e z6)TfVymzW-1#l2hVbA`)%Y4t1jQWr}=GW3Lrz4ao_SaSMtl2R|)v@d<>FJ(2V^P>& zwcnIQ7clI>L-qH*`+O)yg{X3zO#6N(Zocn|;Y@Fq z6*n0}6>lUg=-?SvQyUS*={Z#~qVj3C53TK$37h^jARqam-Qqq|&MP#B|~D(#)-E`11LOOxUHvc^|B6PZu_=&g@w!$(Owt+SI81 zO3<+OZO)NHn0}sFCJif6Jt{$*hll$_ioZu(HrO{#9BRTJKRM{dn(2b^_E(KKU3f;> zQD{-#W;EDvWJ@%YT~F-&A`Tr@D3Df3D9;Ew{Q1S9oaynIFlV{wU=i74)eZXA;*MRo zd8_rXArGxJ<+&xE(C~X|)~*+(!wSCQ_-90VBAK6uP5o%ivfy{pv*dqG+1gnsl!^&h zvcori{C;L6+_|O7$pm+DwU>9porfla+f_ltX@>1Ofnar-5k)Nz#;Q~U#A&(S`Xm_Q!~-DSc^_V-rJAKqfaBj8WFpUC;o zS|XA;k{_ljqB0jxu<*N%TIJ`nKDRSvizAL?cbR-59A+=>wk2oKu4eqY@&H6gZ# zG;{olYVDZC^tXg}52Ss659&3&{3&w8^IOh~gI^@`b``L1%=lXzUuUiP-)jMq5rBdM zFS*FjLTzcDOALbA0r%r(U;yj71p}pm9y8k=lovN0J0RQ5GDXF1ksm9B?Pm3f<YopAm;bL8WC8Nl@~PMUdFLMBg7bF$zmy%K?Ek9? zP({B`FKwJs{|S#8ghwRK?J>mbhCaf7`2awSz=B1!N?0m~G<3a(1B%oSic)6>hZdx% zJ0(EnOntEqdaff**Zq8#;ZL&~erEp?}|01He$$-)a3)^C${ z*5&`Ci4hxn{#5Cb)XfWG8u$d6Fqy+%g9X@_pKw;~_+7vP$0N~4khb#weXqW}OJ!rzA&QQ-vMXcVJvK(MHh4}o(A+Zy@d zpI@P$pfjk@&I5M3@PZmm%rI=!#D~~-7IaAv0trnVu)4X{F8(|yv&+y35C7GsF4EmN zm}u&X0e}CoXKv<0FuTL>RSE-WyfF1Igtpi!nn!SCFe1CRg4BALT1^WQU!x>7>K#w?tnz0b=8 z0P6n+4CS)s3A7dr;TSHr^C3c>LBY%Y9enxQqQDf$;ZtZ0>`cj(g1-(i9YWh_>EJ{7 z7eg=xWG1juM+*n7-NiXM3r*f-$oI~5)Kvm&^ma(VL=aFWg4h=Xl=|8Pgk&%nDJhIqLnE1z zhY}n#TpA9&uE1b0ljJML46NULcYNWne~Hu}@zptk(_s(;4M(D<10V>WI~}WN5Pk>< zh$UQx0B@o^2&iHrr6-0EA*yi%Ey&d{2t&Iq62f3CU=X5!m@XqAM9YC)_oFvq_;oYW zu`RIgg}#fRaEHJB(vII#>q$> za_^=RBOShp3J1~i^O$5|O?T2`O%n)x)Jp%*9MtmFjY=%y9CQyxjlADbCZBHUV6kzZau*dnuW8sO7;XI2U5U&MSrFGphEOSR!l@%A)p3Okhl!I_$}O>CBXsmJ z8wuWntP(7~6P+7*$&ba^jGp#g%gL19CA-z-!Hd+s*y*9xzJxxH&7jv6taucpx?kB7;PzFQZ6ALI9})6eoao=-uY@ny}w|cGmo_A7gr|9*U{^w=ZiUw z^W|(ycUaDsU`bT{+@uj!Md$vHIS{&Q6g`K*bH&w|Ruj~Cq9H18D?*>S5z zv6-s5p@kRpNXoqiQv>!3+3Me3{KU?3(>&%ujr@EHt!b~$ZAuxfCn-l;CppI=%Lec) znefcT{>kREKt*-T;a0M#LqOBfAf}eTvhJ>P%nlr#`Fk3LWf@ ztn(!G-zSV(#Xx< z-^#9cxSpKk7t}gQ`HSSt#j24XB8oGu+Cs+Rug5{(^*AI64CYA%gYf{gCg%aqHezg= z>5&7#}2h-HAGP_w!!-Te$k-_E=wf z%t06-BK12l)1Q3p)$luCk`PVf)|R#WBl{$cGFxub{W20WnScHIIdmce&+2TRmyK&> z&+0H_Blc|PYvfchh+V{78?SY&pzFZK`?MQ8L|b0qB{o|!WHXpb{c0&*qE^A!r4)K0 zF{u~pvSUcW5kJ4+6N<=5m^1PTWz9+Ma8bpfJC+@PhO%{SS5nyAn~ygL<%<>R8jdMz z-!?J$`GmrTDRYKqHB&59d&#S7JDa%))pf8P*5bn(%hA5AaxsKAuAYo4!8{)Or%$)H z)ucJ*O{^2YzF$GnWR&WFkH30LvNmv{a=vX>!l(_{&FE6UyIfen>*tWca6!$}Ri?(n zK!SXnk5!a>8M%S3zSi~eT9Bzgx{WXln2ivJB<%YBH!g=(+Vek(zbo*(&{6}DZCb92-p#sb&;BU7*NjQn?dn> zldWPyc{C`k(aBvv0GFNpZYuwdTFo_`%tWfZKpdmjcXC|41jlaCt=gwME^4?UyspyY1V!5@To@<1q^%$c*$9td-3=Y&;5i&w^61{k3M_m+UB@k4y+Z?d!&PE zOYqw3kGGk8MvFqL9hpSj2kE$%-es_tF63{{HTK~aT#3TEF)NA1xou{?e|MzZtaBe9 z{)#oa+)%zmCWk7e!ss(EJiBRn*NkZ|jq@^XmtRZk#BpVS5y$m_fRUZ|Y~qa@nEu!l zhr!6+?-$o89%kY)vmf_=rK`l|`E?3=CI=g3cAA+768W&AZDSfXf;OK%;Qy{-sL!5N zskQ%HIMeZ%ifXUt45ujU+|wDM$8MU)1DR`dfRbvM1tS6 z#-1}{SUMU!iw}!deo$O?QLE&?bFP^DFh9=b{@?Bm&u9DVUFtp^>M%5$V8tQ|EpUMD z{srwZD^;;3ZxtK=7l+w#GYixWhc8r;)OOU7*w!4Ak_8haw#5cn9juT+nlc6z{XIGj!m8X}jSYd6a zhM8)cF`nv*jce9c+l;TrE=>>@%~LjL`VwX*F;%rYk7dYgVAx+C-ep)p{M7U$lNjXI zY|NXxpuI$15D;&h;+0`fR5(4S!;Lcz5|W}Gt*(P7c?%Wc7|KkZZo8ss&77|*ZOP$g z;mvv0Z$^#1&(+_+d7)LReiGTYeG?-l=TO0ZZfdCN8rv;zoGhNi8dXK5WMa;b31vnb zZAV;6zK$+h^gdDCdH2iB`OZR5NYWCQgM8y{wVGH?+zWWM?8b~SRh!^NQ`}z78Xr(!EI4?KX;_c|I-+i;C<|ca>91en@7^WwDk-I z{~CEL&MH9qpx`z?X;S`e3FOtm7-^bppxGX|F@7bqG?J(EgZMtrzK<)V55@sYG_c&7 zi6aSI;R#&1WvI8O??^zh{QmXas0NYHy5(H;xmK|beOv{H8=GA^$#9g3vzc=JM$|Of zS7~9NB&!zf?g~aMrF;Orp5cuz7J%M_!?gjw`jd=~(Kfq;_DZO4`1j*RxAY=k(<@x25zYB7CS# zq8eNxX6og6dB4-$JlKAR0Y()^M=aI4h!Y25?ky@HFJ$VaVtqox{0Xf6?$ehH_mDhY zTo4RgPAu8;W4l5xCJ9{177S$S6FM9eZz7!#yU+L3)!w;4OhwQ-)nzAvNoR)fSTXdI z$_%Fs5&XUT2gc_$j&#%+K5NR~t)nH@A9wDH8-8rK(BS&i{(+inNa`E*gCD#nRhI03 zdUx#7w_(lF5qRuFK#qApvO0$ zX`-R2)7Rf8*wa6>V7NapWaO=_V<=j#FIrwLE?O=wt?wO3X&@>tD!IT9Jh4Hc77_my zCjqFMG9fKC1gf&Zfgu}qtU{8KR~siq%+`;zBGM7!6oQOt9-T&yi2%exfiawmbNJ5B zl$uPht*V@yuj!&keROCf;LEiXxJ}FT(Rslm55}Fh60=FHFJdncd$IG~^~Tj>ASj%b zWUhfkZx*iiDdI!8R={T5AQjH#FDuK+QJxwGVB-cFY~z{?eea2b7pEc(M#`Y*b?RBROA~TDdax; z9E&R7tN7v}z|u3vA&-_jhX4Di18x@Zu@xi$$EsZ0dx+BsAmFeH0Zfj+AfSrHWfeam zx&gqUrA&7YQfSiX$3Xy1W&-8#6ZEYSA_&>V9^=W|5g$n)r2GCb9?6O?#tUjCzNCYe z_HuXd$m)w({G`92mJnKsDF%g41B@@-#xLEeu*U05S1RE-Tz|d8BYSnmc+RzYfPfOT z((uSKNL2&R@+z>=#AEXRs}ND6e=7vjJEWx}%vZa}&p#m0)2~wlRSO)!K=8X?2c()M z&o=BpV6)U|Sp%ibD5gx%yO*zvh`#o}R;O#T%_w*?@S~X(VzRtN!Pme~OD5pjTj-Sl z{(OrLUu$D zl+(h)dHT+<&09})5W!%_;07eX*#7fWN05?2L^)DUfVCAb$=8ZPuv0z}#oQhuH zfA(?qe+T~&1IOx!V<5NnGUOBbo<@Km=$GRRM~D#;r(~o6INK;7@|mygGvv3-5IL8b z9w6<dz)qAr?C{m-;%n#V;UwbV;dZ7lPmLdT|D7%mS{s3fCHC9+ z^guK1fOnmjp)ka4oRSK_#TD=ebB4s@2+a0z0agyAfJGMtL^f3Z1N{lX zCy1=!$OiQG;K>aoI^xM7ISO>IaE8p_3XwPcNPwn(Vt~Dy?et;*6uMCem^-+{28iBZ z0v>czKyVfar+Yva{qATWdOZ9E6!~LmoA5Q2^|{cvz?xh1d-LM}!0r2&et+)iT)gQ7YI$cF}?zf$Rok18C0)+6$i9O?U~oWiH^qQLd2H+mTfC#{GB>p zLJbNQFe@&nfuGR{sRRw!fGdM2D2D5UD8zUbq(NQ92D}->A2 \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" @@ -30,6 +48,7 @@ die ( ) { cygwin=false msys=false darwin=false +nonstop=false case "`uname`" in CYGWIN* ) cygwin=true @@ -40,26 +59,11 @@ case "`uname`" in MINGW* ) msys=true ;; + NONSTOP* ) + nonstop=true + ;; esac -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null - CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. @@ -85,7 +89,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? -eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then diff --git a/gradlew.bat b/gradlew.bat index 72d362dafd..f6d5974e72 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -8,14 +8,14 @@ @rem Set local scope for the variables with windows NT shell if "%OS%"=="Windows_NT" setlocal -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - set DIRNAME=%~dp0 if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java index ce27df226c..3c70c979cd 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.jayway.restassured.http.ContentType; +import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.http.HttpStatus; import org.junit.After; @@ -58,6 +59,8 @@ public void testGetFlooders() throws Exception { final EventType eventType = NakadiTestUtils.createEventType(); blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET); + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // give time for TreeCache refresh + given() .contentType(ContentType.JSON) .get(BLACKLIST_URL) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 44e2a38bfb..abf5fc3a53 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.http.HttpStatus; @@ -80,15 +81,15 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception { final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "stream_limit=1&stream_timeout=1") .start(); - waitFor(() -> Assert.assertFalse(client.getBatches().isEmpty()), 100L, 100); + waitFor(() -> Assert.assertFalse(client.getBatches().isEmpty()), TimeUnit.SECONDS.toMillis(1), 100); final SubscriptionCursor toCommit = client.getBatches().get(0).getCursor(); client.close(); // connection is closed, and stream as well - final int statusCode = NakadiTestUtils.commitCursors( + final int statusCode = commitCursors( subscription.getId(), Collections.singletonList(toCommit), client.getSessionId()); - Assert.assertEquals(HttpStatus.SC_NO_CONTENT, statusCode); + Assert.assertEquals(SC_NO_CONTENT, statusCode); } @Test(timeout = 30000) From e407aeb6ffed0a56b888c3ee829b305c9bff4666 Mon Sep 17 00:00:00 2001 From: antban Date: Mon, 27 Feb 2017 15:41:31 +0100 Subject: [PATCH 3/4] ARUHA-657 Fixes after review --- .../zalando/nakadi/webservice/SettingsControllerAT.java | 9 ++------- .../service/subscription/state/StreamingState.java | 2 ++ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java index 3c70c979cd..eeec46b67f 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/SettingsControllerAT.java @@ -4,7 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.jayway.restassured.http.ContentType; -import java.util.concurrent.TimeUnit; +import java.io.IOException; +import java.util.Collections; import org.apache.curator.framework.CuratorFramework; import org.apache.http.HttpStatus; import org.junit.After; @@ -16,10 +17,6 @@ import org.zalando.nakadi.utils.JsonTestHelper; import org.zalando.nakadi.webservice.utils.NakadiTestUtils; import org.zalando.nakadi.webservice.utils.ZookeeperTestUtils; - -import java.io.IOException; -import java.util.Collections; - import static com.jayway.restassured.RestAssured.given; public class SettingsControllerAT extends BaseAT { @@ -59,8 +56,6 @@ public void testGetFlooders() throws Exception { final EventType eventType = NakadiTestUtils.createEventType(); blacklist(eventType.getName(), BlacklistService.Type.CONSUMER_ET); - Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // give time for TreeCache refresh - given() .contentType(ContentType.JSON) .get(BLACKLIST_URL) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 1774400dd1..29b734779f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -1,6 +1,7 @@ package org.zalando.nakadi.service.subscription.state; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -92,6 +93,7 @@ private long getLastCommitMillis() { } private Map getUncommittedOffsets() { + Preconditions.checkNotNull(uncommittedOffsets, "uncommittedOffsets should not be null on time of call"); return uncommittedOffsets; } From 0d45111477e9f845b5eeda87a45686d10f349e91 Mon Sep 17 00:00:00 2001 From: antban Date: Fri, 3 Mar 2017 11:34:54 +0100 Subject: [PATCH 4/4] ARUHA-657 Fix test subscription client --- .../zalando/nakadi/webservice/hila/HilaAT.java | 11 ++++++----- .../webservice/utils/TestStreamingClient.java | 18 ++++++++---------- .../nakadi/service/EventStreamTest.java | 10 ++++++---- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index abf5fc3a53..f66635a724 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -76,15 +76,16 @@ public void before() throws IOException { @Test(timeout = 10000) public void whenStreamTimeoutReachedPossibleToCommit() throws Exception { - publishEvent(eventType.getName(),"{\"foo\":\"bar\"}"); - publishEvent(eventType.getName(),"{\"foo\":\"bar\"}"); final TestStreamingClient client = TestStreamingClient - .create(URL, subscription.getId(), "stream_limit=1&stream_timeout=1") + .create(URL, subscription.getId(), "batch_limit=1&stream_limit=2&stream_timeout=1") .start(); - waitFor(() -> Assert.assertFalse(client.getBatches().isEmpty()), TimeUnit.SECONDS.toMillis(1), 100); + waitFor(() -> assertThat(client.getSessionId(), not(equalTo(SESSION_ID_UNKNOWN)))); + + publishEvent(eventType.getName(),"{\"foo\":\"bar\"}"); + waitFor(() -> Assert.assertFalse(client.getBatches().isEmpty()), TimeUnit.SECONDS.toMillis(2), 100); final SubscriptionCursor toCommit = client.getBatches().get(0).getCursor(); client.close(); // connection is closed, and stream as well - + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); final int statusCode = commitCursors( subscription.getId(), Collections.singletonList(toCommit), diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java index 3067919c23..678897399a 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java @@ -29,7 +29,7 @@ public class TestStreamingClient implements Runnable { private volatile boolean running; private final List batches; - private InputStream inputStream; + private HttpURLConnection connection; private String sessionId; private Optional token; private volatile int responseCode; @@ -60,17 +60,17 @@ public static TestStreamingClient create(final String baseUrl, final String subs public void run() { try { final String url = format("{0}/subscriptions/{1}/events?{2}", baseUrl, subscriptionId, params); - final HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); - token.ifPresent(token -> conn.setRequestProperty("Authorization", "Bearer " + token)); - responseCode = conn.getResponseCode(); - conn.getHeaderFields().entrySet().stream() + connection = (HttpURLConnection) new URL(url).openConnection(); + token.ifPresent(token -> connection.setRequestProperty("Authorization", "Bearer " + token)); + responseCode = connection.getResponseCode(); + connection.getHeaderFields().entrySet().stream() .filter(entry -> entry.getKey() != null) .forEach(entry -> headers.put(entry.getKey(), entry.getValue())); if (responseCode != HttpURLConnection.HTTP_OK) { throw new IOException("Response code is " + responseCode); } - sessionId = conn.getHeaderField("X-Nakadi-StreamId"); - inputStream = conn.getInputStream(); + sessionId = connection.getHeaderField("X-Nakadi-StreamId"); + final InputStream inputStream = connection.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); running = true; @@ -118,9 +118,7 @@ public TestStreamingClient start() { public boolean close() { if (running) { try { - inputStream.close(); - } catch (IOException e) { - e.printStackTrace(); + connection.disconnect(); } finally { running = false; } diff --git a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java index 64fe589ffc..8328c4e789 100644 --- a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java @@ -10,10 +10,12 @@ import java.util.List; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.zalando.nakadi.domain.ConsumedEvent; @@ -32,6 +34,7 @@ import static org.mockito.Mockito.when; import static org.zalando.nakadi.service.EventStream.BATCH_SEPARATOR; import static org.zalando.nakadi.utils.TestUtils.randomString; +import static org.zalando.nakadi.utils.TestUtils.waitFor; import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; public class EventStreamTest { @@ -91,14 +94,13 @@ public void whenCrutchWorkedThenStreamIsClosed() throws NakadiException, Interru final Thread thread = new Thread(() -> eventStream.streamEvents(streamOpen)); thread.start(); - Thread.sleep(3000); - assertThat("As there are no exit conditions in config - the thread should be running", - thread.isAlive(), is(true)); + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + waitFor(()-> Assert.assertTrue(thread.isAlive())); // simulation of client closing the connection using crutch streamOpen.set(false); - Thread.sleep(3000); + waitFor(() -> Assert.assertFalse(thread.isAlive()), TimeUnit.SECONDS.toMillis(3)); assertThat("The thread should be dead now, as we simulated that client closed connection", thread.isAlive(), is(false)); thread.join();