Add suspend and resume functionality #75
Conversation
kafkaClientInterface.batchCursorContext(cursors)
}

val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
This is how you make a sink do something different depending on its input: in our case, if we have a SuccessfulUploadPart we want to commit the cursors, and if we have a FailedUploadPart we just log it (note that Alpakka S3's client will automatically retry uploading a failed chunk).
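A minimal sketch of that pattern (illustrative only, not the PR's exact code; it assumes the surrounding class's kafkaClientInterface and a StrictLogging-style logger, plus Alpakka S3's SuccessfulUploadPart/FailedUploadPart types):

import akka.NotUsed
import akka.stream.alpakka.s3.{FailedUploadPart, SuccessfulUploadPart, UploadPartResponse}
import akka.stream.scaladsl.{Flow, Sink}
import scala.collection.immutable

val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] =
  Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
    .map {
      case (_: SuccessfulUploadPart, cursors) =>
        // successful part: batch the cursors so they can be committed downstream
        Some(kafkaClientInterface.batchCursorContext(cursors))
      case (failed: FailedUploadPart, _) =>
        // failed part: only log it, Alpakka S3's client retries the chunk itself
        logger.warn(s"Failed to upload part: $failed")
        None
    }
    .collect { case Some(batch) => batch }
    .to(kafkaClientInterface.commitCursor)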
@@ -49,7 +49,9 @@ trait BackupClientSpec
with StrictLogging {

implicit val ec: ExecutionContext = system.dispatcher
- implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
+ implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
+ implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
This was causing issues in our tests. Normally property driven tests run a certain number of times, however in our case, with a real integration test against Kafka, we only want each property to run once (and to succeed once).
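For reference, a hedged sketch of pinning the property checks to a single run with ScalaTest's ScalaCheck integration (the trait name and exact values are illustrative):

import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

trait SingleRunPropertySpec extends ScalaCheckPropertyChecks {
  // run each property exactly once instead of the ScalaCheck default,
  // since every run is a full Kafka-backed integration test
  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
    PropertyCheckConfiguration(minSuccessful = 1)
}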
- def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]]
+ def backupToStorageSink(key: String,
+                         currentState: Option[CurrentState]
+ ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]
So now, instead of just having a Sink over a generic ByteString, we also have to pass the cursor context into the Sink (and it is now the sink's responsibility to decide how to handle each element it receives).
},
empty
)
val subFlowSink = substreams.to(
The main change here is that we no longer commit the cursors after a timeslice since it makes no sense to do so.
@@ -51,7 +53,7 @@ class KafkaClient(
/** @return
  * A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
  */
- override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
+ override def getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
This is being changed from a val to a def because you cannot make super calls in subclasses on a field that is defined as a val (it has to be a method). This is relevant for KafkaClientWithKillSwitch, which extends KafkaClient.
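A small standalone illustration of the restriction (simplified types, deliberately unrelated to the real KafkaClient):

import akka.NotUsed
import akka.stream.scaladsl.Source

class Base {
  // declared as a def so that subclasses can refer to it via super
  def numbers: Source[Int, NotUsed] = Source(1 to 10)
}

class Evens extends Base {
  // this compiles only because Base.numbers is a def;
  // had it been a val, the compiler would reject the super.numbers call
  override def numbers: Source[Int, NotUsed] = super.numbers.filter(_ % 2 == 0)
}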
@@ -70,5 +72,14 @@ class KafkaClient(
/** @return
  * A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
  */
- override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings)
+ override def commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings)
As above, this is being changed from a val to a def because you cannot make super calls in subclasses on a field that is defined as a val (it has to be a method); again this is relevant for KafkaClientWithKillSwitch, which extends KafkaClient.
* @return
* A collection data structure that represents the batched cursors
*/
override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
Given a list of cursors, we need to define how to "batch" them together so that they can be committed to Kafka (which is done with commitCursor). For the actual batching we use CommittableOffsetBatch, which is Alpakka's own internal mechanism for batching Kafka cursors (internally it uses a Map[GroupPartitionOffset, Long] to continuously maintain a set of cursors for each given group/partition/offset). Note that CommittableOffsetBatch is also global.
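A sketch of what such a batching implementation can look like (not necessarily the PR's exact expression):

import akka.kafka.ConsumerMessage.{CommittableOffset, CommittableOffsetBatch}
import scala.collection.immutable

// fold the individual offsets into a single batch that commitCursor can then commit
def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
  cursors.foldLeft(CommittableOffsetBatch.empty)((batch, offset) => batch.updated(offset))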
@@ -18,6 +19,10 @@ trait KafkaClientInterface {
*/
type Control

/** The type that represents the result of batching a `CursorContext`
  */
type BatchedCursorContext
This is needed because there is a difference between a single cursor and a batch of cursors.
* @return
* A collection data structure that represents the batched cursors
*/
override def batchCursorContext(cursors: immutable.Iterable[Long]): Long = cursors.max
In the case of this dummy mock, we just take the largest value out of all of the currently submitted cursors.
So I just updated this PR (note that it's not compiling because it requires a locally published Alpakka with akka/alpakka#2770; since that PR is in flux I will create a snapshot of it once I get feedback from the Alpakka maintainers). Here are some major notes.
@@ -247,18 +271,37 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
}
}

/** Prepares the sink before it gets handed to `backupToStorageSink`
  */
private[backup] def prepareStartOfStream(state: Option[CurrentState],
This function handles the case where, if there is current state (i.e. you are resuming from a previous upload), we need to drop the first [ character from the stream, since the already existing half-complete upload stops in the middle of an array.
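A hypothetical sketch of that idea as a plain ByteString flow (the name and signature are illustrative, not the PR's actual prepareStartOfStream):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// When resuming, the storage already contains the opening '[' of the JSON array,
// so the '[' produced by the fresh stream has to be dropped to keep the result valid.
def maybeDropLeadingBracket(resuming: Boolean): Flow[ByteString, ByteString, NotUsed] =
  if (!resuming) Flow[ByteString]
  else
    Flow[ByteString].statefulMapConcat { () =>
      var dropped = false
      bytes =>
        if (!dropped && bytes.nonEmpty) {
          dropped = true
          // only the very first non-empty chunk can start with the array's '['
          List(if (bytes.head == '['.toByte) bytes.drop(1) else bytes)
        } else List(bytes)
    }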
configureConsumer
  .fold(base)(block => block(base))
  .withProperties(
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
This is the magic configuration without which the suspend/resume case does not work: it basically prevents Kafka from pushing the commit to an offset that is after the one we manually/explicitly committed.
Another thing to note is that this configuration is applied last, which essentially means it is hardcoded and impossible to override. This is deliberate, because otherwise Guardian just won't work correctly.
The PR is ready to review. The main notable change since the last comment is that the timeslice configuration has been changed (see ...). On top of this, the testing strategy for the end-to-end tests has changed: rather than manually terminating incomplete uploads, we instead send all of the data we care about within a single time window (i.e. a ...).
): Long = {
val (period, finalInitialTime) = timeConfiguration match {
case PeriodFromFirst(duration) => (duration, initialTime)
case ChronoUnitSlice(chronoUnit) =>
If it's configured for the suspend/resume case then we need to set the initial time to also be the start of the time unit for things to work correctly.
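For illustration, aligning a timestamp to the start of its chrono unit could look something like this (a hypothetical helper, not the PR's code; truncatedTo only supports units up to DAYS):

import java.time.temporal.ChronoUnit
import java.time.{Instant, ZoneOffset}

// align the initial time to the start of the configured unit (e.g. the top of the hour)
// so that time slices line up consistently across suspend and resume
def startOfChronoUnit(initialTime: Instant, chronoUnit: ChronoUnit): Instant =
  initialTime.atOffset(ZoneOffset.UTC).truncatedTo(chronoUnit).toInstant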
max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL}
parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM}
delivery = ${?AKKA_KAFKA_COMMITTER_DELIVERY}
These configurations are being removed because they are hardcoded by the Kafka client itself, so it's impossible to override them.
@@ -47,6 +51,14 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUsed]
/** @return
  * A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
  */
override def commitCursor: Sink[Long, Future[Done]] = Sink.foreach(cursor => committedOffsets ++ Iterable(cursor))
This change is the same as what was done in #76, i.e. committedOffsets ++ Iterable(cursor) doesn't actually end up doing anything, because committedOffsets is immutable and we are just throwing away the resulting reference.
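For example, one way the mock could actually record commits is with a mutable buffer (illustrative only, assuming the rest of the mock stays as is):

import akka.Done
import akka.stream.scaladsl.Sink
import scala.collection.mutable
import scala.concurrent.Future

// a mutable buffer so that += actually persists the committed cursor,
// unlike ++ on an immutable collection whose result was being discarded
val committedOffsets: mutable.ListBuffer[Long] = mutable.ListBuffer.empty

def commitCursor: Sink[Long, Future[Done]] =
  Sink.foreach[Long](cursor => committedOffsets += cursor)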
LGTM but certainly @jlprat is needed here :-)
I just rebased the original commit; I forgot to change the ...
I have a couple of minor comments, but LGTM
gcsConfig: GCSConfig
) extends BackupClientInterface[T] {

override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)

override type BackupResult = Option[StorageObject]

override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
override type CurrentState = Nothing
I'm guessing this Nothing is purely a placeholder.
Correct, GCS is not fully implemented.
override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
override type CurrentState = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
Same here
Same comment as above
override type CurrentState = CurrentS3State

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
Probably it would be better to use a dedicated EC in this case. This one is the internal one used by Akka, and usually you don't want to clog that one.
I also thought the same; however, in this case I believe it's overcomplicated, because this EC only gets used once per time slice, which in the grand scheme of things is nothing (unless you do something silly like configuring a timeslice of 1 nanosecond).
case rest =>
  val last = rest.maxBy(_.initiated)(Ordering[Instant])
  logger.warn(
    s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
Maybe add a call to action to clean the other ones up?
Yeah, I want to resolve this comprehensively with #82; it may be that this case won't actually occur once that ticket is resolved.
  parts
case _ =>
  // We drop the last part here since its broken
  parts.dropRight(1)
Why? And what happens to this last part? Is it corrupt? Is it simply useless data?
So this was added when I was figuring out what was causing corrupted data in the S3 buckets. I don't think this case can occur in practice and I just left it there as a safeguard, but I can remove it if requested (it could theoretically occur, but I would need to simulate it via a test case).
@@ -23,6 +23,7 @@ val akkaStreamsJson = "0.8.0"
val diffxVersion = "0.5.6"
val testContainersVersion = "0.39.8"
val testContainersJavaVersion = "1.16.2"
val scalaCheckVersion = "1.15.5-1-SNAPSHOT"
why a snapshot version?
It contains typelevel/scalacheck#849 which is required to generate data that is approximately the size of S3 chunks
LGTM
About this change - What it does
This is currently a work-in-progress PR for the suspend/resume functionality. It's currently blocked because of the following issues