Add suspend and resume functionality #75

Merged: 5 commits from add-suspend-resume into main, Jan 18, 2022

Conversation


@mdedetrich mdedetrich commented Oct 25, 2021

About this change - What it does

This is currently an in-progress PR for the suspend/resume functionality. It is currently blocked because of the following issues:

@mdedetrich mdedetrich marked this pull request as draft October 25, 2021 10:08
kafkaClientInterface.batchCursorContext(cursors)
}

val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
Contributor Author (@mdedetrich):

This is how you make a sink behave differently depending on its input: if we get a SuccessfulUploadPart we want to commit the cursors, and if we get a FailedUploadPart we just log it (note that Alpakka S3's client will automatically retry uploading a failed chunk).
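A rough sketch of the idea (it assumes `kafkaClientInterface` and a `logger` in scope, as in the PR's trait; the wiring is illustrative, not the PR's exact code):

```scala
import akka.NotUsed
import akka.stream.alpakka.s3.{FailedUploadPart, SuccessfulUploadPart, UploadPartResponse}
import akka.stream.scaladsl.{Flow, Sink}

import scala.collection.immutable

// Commit cursors only for successful parts; failed parts are just logged because
// Alpakka S3 retries failed chunks on its own.
val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] =
  Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
    .mapConcat {
      case (_: SuccessfulUploadPart, cursors) =>
        List(kafkaClientInterface.batchCursorContext(cursors))
      case (failed: FailedUploadPart, _) =>
        logger.warn(s"Failed to upload part: $failed")
        Nil
    }
    .to(kafkaClientInterface.commitCursor)
```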

@@ -49,7 +49,9 @@ trait BackupClientSpec
with StrictLogging {

implicit val ec: ExecutionContext = system.dispatcher
implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
Contributor Author (@mdedetrich):

This was causing issues in our tests. Property-driven tests normally run a certain number of times, however in our case, with a real integration test against Kafka, we only want the test to run once (and to succeed once).
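For reference, a hedged sketch of limiting the property check to a single run with ScalaTest (the exact configuration value used in the PR is not shown in the diff above):

```scala
import org.scalactic.anyvals.PosInt
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

trait SingleRunPropertySpec extends ScalaCheckPropertyChecks {
  // Run the (expensive) Kafka integration property exactly once instead of the
  // default number of ScalaCheck runs.
  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
    PropertyCheckConfiguration(minSuccessful = PosInt(1))
}
```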

def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]]
def backupToStorageSink(key: String,
currentState: Option[CurrentState]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]
Contributor Author (@mdedetrich):

So now, instead of having a Sink that just takes a generic ByteString, we also have to pass the cursor context into the Sink (and it's now the sink's responsibility to decide how to handle each outgoing element).

},
empty
)
val subFlowSink = substreams.to(
Contributor Author (@mdedetrich):

The main change here is that we no longer commit the cursors after a timeslice since it makes no sense to do so.

@@ -51,7 +53,7 @@ class KafkaClient(
/** @return
* A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
*/
override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
override def getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
Contributor Author (@mdedetrich, Nov 7, 2021):

This is being changed from a val to a def because you cannot make super calls in subclasses on a field that is defined as a val (it has to be a method). This is relevant for KafkaClientWithKillSwitch, which extends KafkaClient.
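A minimal, self-contained illustration of the Scala restriction being worked around here (names are simplified, not the actual classes):

```scala
class Base {
  // If this were `val source: String = "base"`, the subclass below could not
  // call `super.source`: super may only be used on methods, not on value fields.
  def source: String = "base"
}

class WithKillSwitch extends Base {
  override def source: String = s"${super.source} + kill switch"
}
```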

@@ -70,5 +72,14 @@ class KafkaClient(
/** @return
* A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
*/
override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings)
override def commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings)
Contributor Author (@mdedetrich, Nov 7, 2021):

Same as above: this is being changed from a val to a def because you cannot make super calls in subclasses on a field that is defined as a val (it has to be a method). This is relevant for KafkaClientWithKillSwitch, which extends KafkaClient.

* @return
* A collection data structure that represents the batched cursors
*/
override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
Contributor Author (@mdedetrich):

Given a list of cursors, we need to define how to "batch" these together so that they can be committed to Kafka (which is done with commitCursor). For the actual batching we use CommittableOffsetBatch, which is Alpakka's own internal mechanism for batching Kafka cursors (internally it uses a Map[GroupPartitionOffset, Long] to continuously maintain a set of cursors for each given group/partition/offset).

Note that CommittableOffsetBatch is also global.
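A hedged sketch of what such a batching implementation can look like with Alpakka Kafka's CommittableOffsetBatch (not necessarily the PR's exact code):

```scala
import akka.kafka.ConsumerMessage.{CommittableOffset, CommittableOffsetBatch}

import scala.collection.immutable

// Fold the individual cursors into a single batch that can later be handed to
// commitCursor (i.e. Committer.sink) in one go.
def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
  cursors.foldLeft(CommittableOffsetBatch.empty)((batch, offset) => batch.updated(offset))
```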

@@ -18,6 +19,10 @@ trait KafkaClientInterface {
*/
type Control

/** The type that represents the result of batching a `CursorContext`
*/
type BatchedCursorContext
Contributor Author (@mdedetrich):

This is needed because there is a difference between a single cursor and a batch of cursors.
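Condensed, the shape of the interface after this change is roughly the following (member names follow the surrounding diffs; the real trait has further members such as getSource and commitCursor):

```scala
import scala.collection.immutable

trait KafkaClientInterface {
  /** The type that represents a single cursor */
  type CursorContext

  /** The type that represents the result of batching a `CursorContext` */
  type BatchedCursorContext

  /** Combine a collection of single cursors into one batched cursor */
  def batchCursorContext(cursors: immutable.Iterable[CursorContext]): BatchedCursorContext
}
```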

* @return
* A collection data structure that represents the batched cursors
*/
override def batchCursorContext(cursors: immutable.Iterable[Long]): Long = cursors.max
Contributor Author (@mdedetrich):

In the case of the dummy mock, we just take the largest value out of all of the currently submitted cursors.

@mdedetrich mdedetrich force-pushed the add-suspend-resume branch 2 times, most recently from df6e4fc to 1526f2e Compare November 7, 2021 11:11

mdedetrich commented Nov 7, 2021

So I just updated this PR (note that it's not compiling because it requires a locally published Alpakka with akka/alpakka#2770; since that PR is in flux I will create a snapshot of it when I get feedback from the Alpakka maintainers). Here are some major notes:

  • Previously the tests were not working correctly for a number of reasons, which are detailed below:
    • Originally we were using .stop()/.shutdown() on the control returned from the Alpakka Kafka client to shut down the client in between the suspend/resume. What this ended up doing was shutting down the entire test suite, because these methods are designed for a controlled shutdown when your app is terminating. To fix this I ended up using a kill switch (hence KafkaClientWithKillSwitch). There is an argument that this should be in the main KafkaClient, however since KafkaClient is a class any user can just extend it and change it as they want (as is evidenced by the existence of KafkaClientWithKillSwitch).
    • Due to us using property tests there was an issue where the tests were looping, because property-based tests typically do a number of runs with randomly generated data to try and find corner cases. However, since RealS3BackupClientSpec is more of an end-to-end test that takes a while to run, this is not what we want, so I set the test to run just once.
      • This brings us to another ongoing problem: currently, when we try to compare the maps of received/input data at the end of RealS3BackupClientSpec using Scala's/Java's equality, it takes around 5 minutes to do so. The current suspicion is that this is due to a bottleneck in hashCode(), where a lot of collisions occur on the datasets (which are very large, ~10-20 MB in memory, because of how large S3 chunks are, i.e. 5 MB). This is likely a result of the type of data that ScalaCheck generates, i.e. it doesn't generate a uniform distribution of data but rather data that helps test corner cases. It's likely that I will need to modify the tests so that the input data for Kafka's actual message content is not random but just an incrementing integer, which shouldn't have this hashCode problem (the same approach as the tests I wrote in Alpakka, where this problem doesn't exist).
  • I hit a corner case which I previously missed: killing the backup client inside the time window. Because of how JSON works and the fact that we are outputting the data as a JSON array, at the end of every chunk you will have something like ..}, (i.e. the ending character will always be a JSON ,). If we try to complete the file here you end up with ..},], which is not valid JSON since trailing commas are not supported. Since we cannot modify already uploaded chunks, to solve this problem we append null at the end so you end up with ..},null]. This means that:
    • Whenever you try to decode/stream a backed up JSON file from Guardian, you have to account for possible null values in the JSON array. This is why the decoders have been changed from CirceStreamSupport.decode[List[ReducedConsumerRecord]] to CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]] (see the sketch after this list).
    • As a bonus, this is a nice way of detecting whether a JSON file backed up by Guardian is the result of the backup tool being killed/closed, versus a JSON file that has been created naturally by hitting a time slice boundary (which terminates without a null, since we know that case is coming up).
    • We also need to create such a tool/project in Guardian to do this (i.e. converting the chunks into a proper S3 object by adding null] at the end).
    • Of course, you are not forced to complete the chunks into a file if you don't want to (I just needed to do so in this test because I actually have to download the S3 object to check that it was created correctly).
  • Even with the previously mentioned problem of equals/hashCode bottlenecking the CPU, I can tell that the current implementation is not working by comparing the sizes of the data structures: what is being downloaded from S3 is smaller than the input data that is being sent to Kafka and then streamed into our backup client. This could be any number of things (the good news is that the data is always well formed, i.e. chunks are always created with a properly terminated ..},).
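To make the trailing-null trick concrete, here is a small hedged example with plain Circe (the ReducedConsumerRecord fields are invented for illustration; the real decoding goes through CirceStreamSupport as described above):

```scala
import io.circe.generic.auto._
import io.circe.parser.decode

// Hypothetical record shape, for illustration only.
final case class ReducedConsumerRecord(topic: String, offset: Long, value: String)

// A chunk terminated after a kill/suspend: the "," that ends every element is
// completed with "null]" so the array stays valid JSON.
val terminatedChunk = """[{"topic":"t","offset":1,"value":"v"},null]"""

// Decoding as List[Option[...]] tolerates the trailing null, which is then dropped.
val records: Either[io.circe.Error, List[ReducedConsumerRecord]] =
  decode[List[Option[ReducedConsumerRecord]]](terminatedChunk).map(_.flatten)
```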

@mdedetrich mdedetrich force-pushed the add-suspend-resume branch 3 times, most recently from 0915de4 to b27a14e Compare November 7, 2021 12:06
@mdedetrich mdedetrich force-pushed the add-suspend-resume branch 11 times, most recently from 94d83fa to c39b723 Compare January 10, 2022 09:40
@mdedetrich mdedetrich force-pushed the add-suspend-resume branch 2 times, most recently from c275bfb to 6210752 Compare January 13, 2022 08:41
@@ -247,18 +271,37 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
}
}

/** Prepares the sink before it gets handed to `backupToStorageSink`
*/
private[backup] def prepareStartOfStream(state: Option[CurrentState],
Contributor Author (@mdedetrich):

This function handles the case where there is current state (i.e. you are resuming from a previous upload): we need to drop the first [ character from the stream, since there is an already existing, half-complete upload which is in the middle of an array.
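A minimal sketch of that idea (simplified signature; the PR's actual helper differs):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// When resuming (state is defined), drop the leading '[' so the bytes continue the
// half-written JSON array of the existing upload instead of opening a new array.
def dropLeadingBracketIfResuming[S](state: Option[S],
                                    source: Source[ByteString, NotUsed]
): Source[ByteString, NotUsed] =
  if (state.isDefined)
    source.statefulMapConcat { () =>
      var seenFirst = false
      bytes =>
        if (seenFirst) List(bytes)
        else {
          seenFirst = true
          List(if (bytes.startsWith(ByteString("["))) bytes.drop(1) else bytes)
        }
    }
  else source
```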

configureConsumer
.fold(base)(block => block(base))
.withProperties(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
Contributor Author (@mdedetrich, Jan 13, 2022):

This is the magic configuration without which the suspend/resume case does not work. It basically prevents Kafka from pushing the commit to an offset that is after the one we manually/explicitly committed.

Another thing to note here is that this configuration is applied last, which essentially means it's hardcoded/impossible to override. This is done deliberately because otherwise Guardian just won't work correctly.

@mdedetrich mdedetrich marked this pull request as ready for review January 13, 2022 08:53

mdedetrich commented Jan 13, 2022

The PR is ready to review. The main notable change since the last comment is that the configuration for the time slice has been changed (see TimeConfiguration). I am not completely happy with the terminology/nomenclature used for TimeConfiguration, so feel free to come up with better suggestions.

On top of this, the testing strategy for the end-to-end tests has changed: rather than manually terminating incomplete uploads, we send all of the data we care about in a single time window (i.e. a MINUTE) and then send a dummy message in the next time window. This forces the previous time window, with all of the test data that we care about, to complete as a full object in S3.

): Long = {
val (period, finalInitialTime) = timeConfiguration match {
case PeriodFromFirst(duration) => (duration, initialTime)
case ChronoUnitSlice(chronoUnit) =>
Contributor Author (@mdedetrich, Jan 13, 2022):

If it's configured for the suspend/resume case then we also need to set the initial time to the start of the time unit for things to work correctly.
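A hedged sketch of what that alignment can look like (types are assumed for illustration; the PR's code uses its own TimeConfiguration ADT):

```scala
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit

import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._

// For a ChronoUnitSlice the slice length comes from the unit itself, and the
// initial time is truncated to the start of that unit (truncatedTo supports
// units up to DAYS), so a resumed run lands in the same slice as the suspended one.
def sliceFor(initialTime: OffsetDateTime, chronoUnit: ChronoUnit): (FiniteDuration, OffsetDateTime) =
  (chronoUnit.getDuration.toScala, initialTime.truncatedTo(chronoUnit))
```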

max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL}
parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM}
delivery = ${?AKKA_KAFKA_COMMITTER_DELIVERY}
Contributor Author (@mdedetrich):

These configurations are being removed because they are hardcoded by the Kafka client itself, so they are impossible to override.

@@ -47,6 +51,14 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUse
/** @return
* A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
*/
override def commitCursor: Sink[Long, Future[Done]] = Sink.foreach(cursor => committedOffsets ++ Iterable(cursor))
Contributor Author (@mdedetrich):

This change is the same as what was done in #76: committedOffsets ++ Iterable(cursor) doesn't actually do anything, because committedOffsets is immutable, so we are just throwing away the resulting collection.
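For comparison, a sketch of the corrected shape (mirroring the fix referenced from #76): accumulate into a mutable buffer so the appended cursor is actually kept.

```scala
import akka.Done
import akka.stream.scaladsl.Sink

import scala.collection.mutable
import scala.concurrent.Future

// Mutable buffer: `committedOffsets += cursor` records the commit, whereas
// `immutableCollection ++ Iterable(cursor)` only builds a new collection that
// is immediately discarded.
val committedOffsets: mutable.ListBuffer[Long] = mutable.ListBuffer.empty[Long]

def commitCursor: Sink[Long, Future[Done]] =
  Sink.foreach[Long](cursor => committedOffsets += cursor)
```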


reta commented Jan 14, 2022

LGTM but certainly @jlprat is needed here :-)


mdedetrich commented Jan 17, 2022

I just rebased the original commit; I had forgotten to change the reference.conf for the new Backup configuration. On top of this I also added tests (ConfigSpec) for this new configuration to make sure that the reference.conf is working.

Contributor (@jlprat) left a comment:

I have a couple of minor comments, but LGTM

gcsConfig: GCSConfig
) extends BackupClientInterface[T] {

override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)

override type BackupResult = Option[StorageObject]

override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
override type CurrentState = Nothing
Contributor (@jlprat):

I'm guessing this Nothing is purely a placeholder

Contributor Author (@mdedetrich):

Correct, GCS support is not fully implemented yet.

override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
override type CurrentState = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
Contributor (@jlprat):

Same here

Contributor Author (@mdedetrich):

Same comment as above

override type CurrentState = CurrentS3State

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
Contributor (@jlprat):

It would probably be better to use a dedicated EC in this case. This one is the internal one used by Akka, and usually you don't want to clog that one.

Contributor Author (@mdedetrich, Jan 18, 2022):

I also thought the same, however for this case I believe it's overcomplicated, because this EC will only be exercised once per time slice, which in the grand scheme of things is nothing (unless you do something silly and configure a time slice of 1 nanosecond).
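For reference, a dedicated execution context along the lines the reviewer suggests could be as small as this (a sketch, not what the PR ships):

```scala
import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

// A tiny single-threaded EC reserved for the once-per-time-slice state lookup,
// keeping that work off Akka's internal dispatcher.
val uploadStateEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
```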

case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
Contributor (@jlprat):

Maybe add a call to action to clean the other ones up?

Contributor Author (@mdedetrich, Jan 18, 2022):

Yeah, I want to resolve this comprehensively with #82; it may be that this case won't actually occur once that ticket is resolved.

parts
case _ =>
// We drop the last part here since its broken
parts.dropRight(1)
Contributor (@jlprat):

Why? And what happens to this last part? Is it corrupt? Is it simply useless data?

Contributor Author (@mdedetrich, Jan 18, 2022):

So this was added when I was figuring out what was causing corrupted data in the S3 buckets. I don't think this case can occur in practice and I just left it there as a safeguard, but I can remove it if requested (I think it can theoretically occur, but I would need to simulate it via a test case).

@@ -23,6 +23,7 @@ val akkaStreamsJson = "0.8.0"
val diffxVersion = "0.5.6"
val testContainersVersion = "0.39.8"
val testContainersJavaVersion = "1.16.2"
val scalaCheckVersion = "1.15.5-1-SNAPSHOT"
Contributor (@jlprat):

why a snapshot version?

Contributor Author (@mdedetrich, Jan 18, 2022):

It contains typelevel/scalacheck#849, which is required to generate data that is approximately the size of S3 chunks.

Contributor (@jlprat) left a comment:

LGTM

@mdedetrich mdedetrich merged commit d453fb9 into main Jan 18, 2022
@mdedetrich mdedetrich deleted the add-suspend-resume branch January 18, 2022 14:28