From 6819a48797864b56b0d0dfaf37174e532ebdc67c Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 5 Jul 2024 23:04:55 +0200 Subject: [PATCH] Add cache of background polling threads and use from kafka SDF --- .../sdk/io/kafka/KafkaConsumerPollThread.java | 14 +- .../kafka/KafkaConsumerPollThreadCache.java | 207 ++++++++++++++++++ .../sdk/io/kafka/KafkaUnboundedReader.java | 10 +- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 94 +++----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 23 +- .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 2 + 6 files changed, 268 insertions(+), 82 deletions(-) create mode 100644 sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThreadCache.java diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThread.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThread.java index 4dbf8a9ed50f3..175c5672f3dc1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThread.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThread.java @@ -19,7 +19,12 @@ import java.io.IOException; import java.time.Duration; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -168,13 +173,14 @@ ConsumerRecords readRecords() throws IOException { recordsDequeuePollTimeout = recordsDequeuePollTimeout.minus(Duration.ofMillis(1)); LOG.debug("Reducing poll timeout for reader to " + recordsDequeuePollTimeout.toMillis()); } - } else if (recordsDequeuePollTimeout.compareTo(RECORDS_DEQUEUE_POLL_TIMEOUT_MAX) < 0) { + return ConsumerRecords.empty(); + } + if (recordsDequeuePollTimeout.compareTo(RECORDS_DEQUEUE_POLL_TIMEOUT_MAX) < 0) { recordsDequeuePollTimeout = recordsDequeuePollTimeout.plus(Duration.ofMillis(1)); LOG.debug("Increasing poll timeout for reader to " + recordsDequeuePollTimeout.toMillis()); LOG.debug("Record count: " + records.count()); } - - return records == null ? ConsumerRecords.empty() : records; + return records; } /** diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThreadCache.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThreadCache.java new file mode 100644 index 0000000000000..8828cc9e5f440 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaConsumerPollThreadCache.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalCause; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerPollThreadCache { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerPollThreadCache.class); + private final ExecutorService invalidationExecutor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("KafkaConsumerPollCache-invalidation-%d") + .build()); + private final ExecutorService backgroundThreads = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("KafkaConsumerPollCache-poll-%d") + .build()); + + // Note on thread safety. This class is thread safe because: + // - Guava Cache is thread safe. + // - There is no state other than Cache. + // - API is strictly a 1:1 wrapper over Cache API (not counting cache.cleanUp() calls). + // - i.e. it does not invoke more than one call, which could make it inconsistent. + // If any of these conditions changes, please test ensure and test thread safety. 
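To make the intended lifecycle concrete, a rough usage sketch follows (illustrative only, not part of the patch; it assumes the caller already holds the consumer config, consumer factory function, source descriptor, and start offset, and it mirrors how ReadFromKafkaDoFn.processElement uses this cache later in the patch):

  private static void exampleAcquireRelease(
      KafkaConsumerPollThreadCache cache,
      Map<String, Object> consumerConfig,
      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
      KafkaSourceDescriptor descriptor,
      long startOffset)
      throws IOException {
    // Acquire either a cached poll thread (same descriptor and matching offset) or a fresh one.
    KafkaConsumerPollThread pollThread =
        cache.acquireConsumer(consumerConfig, consumerFactoryFn, descriptor, startOffset);
    long resumeOffset = startOffset;
    try {
      for (ConsumerRecord<byte[], byte[]> record : pollThread.readRecords()) {
        // ... claim the offset and emit the record ...
        resumeOffset = record.offset() + 1;
      }
    } finally {
      // Hand the poll thread back, keyed by the offset where a resumed restriction would
      // continue; a later acquire for the same descriptor and offset reuses it.
      cache.releaseConsumer(consumerConfig, consumerFactoryFn, descriptor, pollThread, resumeOffset);
    }
  }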
+ + private static class CacheKey { + final Map consumerConfig; + final SerializableFunction, Consumer> consumerFactoryFn; + final KafkaSourceDescriptor descriptor; + + CacheKey( + Map consumerConfig, + SerializableFunction, Consumer> consumerFactoryFn, + KafkaSourceDescriptor descriptor) { + this.consumerConfig = consumerConfig; + this.consumerFactoryFn = consumerFactoryFn; + this.descriptor = descriptor; + } + + @Override + public boolean equals(@Nullable Object other) { + if (other == null) { + return false; + } + if (!(other instanceof CacheKey)) { + return false; + } + CacheKey otherKey = (CacheKey) other; + return descriptor.equals(otherKey.descriptor) + && consumerFactoryFn.equals(otherKey.consumerFactoryFn) + && consumerConfig.equals(otherKey.consumerConfig); + } + + @Override + public int hashCode() { + return Objects.hash(descriptor, consumerFactoryFn, consumerConfig); + } + } + + private static class CacheEntry { + + final KafkaConsumerPollThread pollThread; + final long offset; + + CacheEntry(KafkaConsumerPollThread pollThread, long offset) { + this.pollThread = pollThread; + this.offset = offset; + } + } + + private final Duration cacheDuration = Duration.ofMinutes(1); + private final Cache cache; + + @SuppressWarnings("method.invocation") + KafkaConsumerPollThreadCache() { + this.cache = + CacheBuilder.newBuilder() + .expireAfterWrite(cacheDuration.toMillis(), TimeUnit.MILLISECONDS) + .removalListener( + (RemovalNotification notification) -> { + if (notification.getCause() != RemovalCause.EXPLICIT) { + LOG.info( + "Asynchronously closing reader for {} as it has been idle for over {}", + notification.getKey(), + cacheDuration); + asyncCloseConsumer( + checkNotNull(notification.getKey()), checkNotNull(notification.getValue())); + } + }) + .build(); + } + + KafkaConsumerPollThread acquireConsumer( + Map consumerConfig, + SerializableFunction, Consumer> consumerFactoryFn, + KafkaSourceDescriptor kafkaSourceDescriptor, + long offset) { + CacheKey key = new CacheKey(consumerConfig, consumerFactoryFn, kafkaSourceDescriptor); + CacheEntry entry = cache.asMap().remove(key); + cache.cleanUp(); + if (entry != null) { + if (entry.offset == offset) { + return entry.pollThread; + } else { + // Offset doesn't match, close. + LOG.info("Closing consumer as it is no longer valid {}", kafkaSourceDescriptor); + asyncCloseConsumer(key, entry); + } + } + + Map updatedConsumerConfig = + overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor); + LOG.info( + "Creating Kafka consumer for process continuation for {}", + kafkaSourceDescriptor.getTopicPartition()); + Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig); + ConsumerSpEL.evaluateAssign( + consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition())); + consumer.seek(kafkaSourceDescriptor.getTopicPartition(), offset); + KafkaConsumerPollThread pollThread = new KafkaConsumerPollThread(); + pollThread.startOnExecutor(backgroundThreads, consumer); + return pollThread; + } + + /** Close the reader and log a warning if close fails. 
*/ + private void asyncCloseConsumer(CacheKey key, CacheEntry entry) { + invalidationExecutor.execute( + () -> { + try { + entry.pollThread.close(); + LOG.info("Finished closing consumer for {}", key); + } catch (IOException e) { + LOG.warn("Failed to close consumer for {}", key, e); + } + }); + } + + void releaseConsumer( + Map consumerConfig, + SerializableFunction, Consumer> consumerFactoryFn, + KafkaSourceDescriptor kafkaSourceDescriptor, + KafkaConsumerPollThread pollThread, + long offset) { + CacheKey key = new CacheKey(consumerConfig, consumerFactoryFn, kafkaSourceDescriptor); + CacheEntry existing = cache.asMap().putIfAbsent(key, new CacheEntry(pollThread, offset)); + if (existing != null) { + LOG.warn("Unexpected collision of topic and partition"); + asyncCloseConsumer(key, existing); + } + cache.cleanUp(); + } + + private Map overrideBootstrapServersConfig( + Map currentConfig, KafkaSourceDescriptor description) { + checkState( + currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + || description.getBootStrapServers() != null); + Map config = new HashMap<>(currentConfig); + if (description.getBootStrapServers() != null && !description.getBootStrapServers().isEmpty()) { + config.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + String.join(",", description.getBootStrapServers())); + } + return config; + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 7aa9df199f919..5f1d621605f6b 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -348,7 +348,7 @@ public long getSplitBacklogBytes() { private final Counter checkpointMarkCommitsSkipped = Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC); - private final KafkaConsumerPollThread pollThread; + private final transient KafkaConsumerPollThread pollThread; // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout @@ -555,7 +555,7 @@ private void nextBatch() throws IOException { curBatch = Collections.emptyIterator(); ConsumerRecords records = pollThread.readRecords(); - if (records != null) { + if (!records.isEmpty()) { partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator()); // cycle through the partitions in order to interleave records from each. 
curBatch = Iterators.cycle(new ArrayList<>(partitionStates)); @@ -641,7 +641,11 @@ private long getSplitBacklogMessageCount() { @Override public void close() throws IOException { closed.set(true); - pollThread.close(); + try { + pollThread.close(); + } catch (IOException e) { + LOG.warn("Error shutting down poll thread", e); + } offsetFetcherThread.shutdown(); boolean isShutdown = false; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 46599bf1bf8f1..ae02221e87067 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -21,14 +21,10 @@ import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -59,12 +55,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; @@ -104,11 +98,10 @@ *
 * <h4>Checkpoint and Resume Processing</h4>
 *
 * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by the DoFn and
- * system-checkpoint which is issued by the runner via {@link
- * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
- * consumer gets empty response from {@link Consumer#poll(Duration)}, {@link ReadFromKafkaDoFn} will
- * checkpoint the current {@link KafkaSourceDescriptor} and move to process the next element. These
- * deferred elements will be resumed by the runner as soon as possible.
+ * system-checkpoint which is issued by the runner via {@link BeamFnApi.ProcessBundleSplitRequest}.
+ * Every time the consumer gets empty response from {@link Consumer#poll(Duration)}, {@link
+ * ReadFromKafkaDoFn} will checkpoint the current {@link KafkaSourceDescriptor} and move to process
+ * the next element. These deferred elements will be resumed by the runner as soon as possible.
 *
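 * <p>As a simplified sketch (condensed from the {@code processElement} loop later in this file;
 * not an exact copy of it), the self-checkpoint on an empty poll looks roughly like:
 *
 * <pre>{@code
 * ConsumerRecords<byte[], byte[]> rawRecords = pollThread.readRecords();
 * if (rawRecords.isEmpty()) {
 *   // Defer the current element; the runner resumes the deferred restriction as soon as possible.
 *   return ProcessContinuation.resume();
 * }
 * }</pre>
 *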
 * <h4>Progress and Size</h4>
* @@ -220,6 +213,9 @@ private ReadFromKafkaDoFn( private final TupleTag>> recordTag; + private static final Supplier cache = + Suppliers.memoize(KafkaConsumerPollThreadCache::new); + // Valid between bundle start and bundle finish. private transient @Nullable Deserializer keyDeserializerInstance = null; private transient @Nullable Deserializer valueDeserializerInstance = null; @@ -435,27 +431,15 @@ public ProcessContinuation processElement( kafkaSourceDescriptor.getTopicPartition(), Optional.ofNullable(watermarkEstimator.currentWatermark())); } - - LOG.info( - "Creating Kafka consumer for process continuation for {}", - kafkaSourceDescriptor.getTopicPartition()); - try (Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { - ConsumerSpEL.evaluateAssign( - consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition())); - long startOffset = tracker.currentRestriction().getFrom(); - - long expectedOffset = startOffset; - consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset); - - KafkaConsumerPollThread pollThread = new KafkaConsumerPollThread(); - ExecutorService backgroundThread = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("KafkaConsumerPoll-thread") - .build()); - pollThread.startOnExecutor(backgroundThread, consumer); - + final long startOffset = tracker.currentRestriction().getFrom(); + long resumeOffset = startOffset; + @Nullable KafkaConsumerPollThread pollThread = null; + try { + pollThread = + cache + .get() + .acquireConsumer( + updatedConsumerConfig, consumerFactoryFn, kafkaSourceDescriptor, startOffset); while (true) { ConsumerRecords rawRecords = pollThread.readRecords(); // When there are no records available for the current TopicPartition, self-checkpoint @@ -472,6 +456,7 @@ public ProcessContinuation processElement( } for (ConsumerRecord rawRecord : rawRecords) { if (!tracker.tryClaim(rawRecord.offset())) { + // XXX need to add unconsumed records back. return ProcessContinuation.stop(); } try { @@ -490,9 +475,9 @@ public ProcessContinuation processElement( + (rawRecord.value() == null ? 0 : rawRecord.value().length); avgRecordSize .getUnchecked(kafkaSourceDescriptor.getTopicPartition()) - .update(recordSize, rawRecord.offset() - expectedOffset); + .update(recordSize, rawRecord.offset() - resumeOffset); rawSizes.update(recordSize); - expectedOffset = rawRecord.offset() + 1; + resumeOffset = rawRecord.offset() + 1; Instant outputTimestamp; // The outputTimestamp and watermark will be computed by timestampPolicy, where the // WatermarkEstimator should be a manual one. 
@@ -522,6 +507,17 @@ public ProcessContinuation processElement( } } } + } finally { + if (pollThread != null) { + cache + .get() + .releaseConsumer( + updatedConsumerConfig, + consumerFactoryFn, + kafkaSourceDescriptor, + pollThread, + resumeOffset); + } } } @@ -542,34 +538,6 @@ private boolean topicPartitionExists( return true; } - // see https://github.com/apache/beam/issues/25962 - private ConsumerRecords poll( - Consumer consumer, TopicPartition topicPartition) { - final Stopwatch sw = Stopwatch.createStarted(); - long previousPosition = -1; - java.time.Duration elapsed = java.time.Duration.ZERO; - java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout); - while (true) { - final ConsumerRecords rawRecords = consumer.poll(timeout.minus(elapsed)); - if (!rawRecords.isEmpty()) { - // return as we have found some entries - return rawRecords; - } - if (previousPosition == (previousPosition = consumer.position(topicPartition))) { - // there was no progress on the offset/position, which indicates end of stream - return rawRecords; - } - elapsed = sw.elapsed(); - if (elapsed.toMillis() >= timeout.toMillis()) { - // timeout is over - LOG.warn( - "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", - consumerPollingTimeout); - return rawRecords; - } - } - } - private TimestampPolicyContext updateWatermarkManually( TimestampPolicy timestampPolicy, WatermarkEstimator watermarkEstimator, diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 73aee5aeeef0f..247b39e691fc8 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -114,12 +114,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -151,6 +146,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.slf4j.Logger; @@ -162,7 +158,7 @@ */ @RunWith(JUnit4.class) public class KafkaIOTest { - + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private static final Logger LOG = LoggerFactory.getLogger(KafkaIOTest.class); /* @@ -178,7 +174,7 @@ public class KafkaIOTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Rule - public ExpectedLogs unboundedReaderExpectedLogs = ExpectedLogs.none(KafkaUnboundedReader.class); + public ExpectedLogs pollThreadExpectedLogs = ExpectedLogs.none(KafkaConsumerPollThread.class); @Rule public 
ExpectedLogs kafkaIOExpectedLogs = ExpectedLogs.none(KafkaIO.class); @@ -1398,7 +1394,7 @@ public void testUnboundedSourceMetrics() { } @Test - public void testUnboundedReaderLogsCommitFailure() throws Exception { + public void testUnboundedReaderLogsFetchFailure() throws Exception { List topics = ImmutableList.of("topic_a"); @@ -1415,9 +1411,12 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception { UnboundedReader> reader = source.createReader(null, null); - reader.start(); - - unboundedReaderExpectedLogs.verifyWarn("exception while fetching latest offset for partition"); + try { + reader.start(); + } catch (Exception e) { + // Racy if we observe the exception on initial advance. + } + pollThreadExpectedLogs.verifyError("Exception while reading from Kafka"); reader.close(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 612b20393d789..cc8546e50a3c6 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -83,8 +83,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; public class ReadFromKafkaDoFnTest { + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private final TopicPartition topicPartition = new TopicPartition("topic", 0);