Skip to content

Commit

Permalink
Deprecate everything inside the current library in favor of new `fs2-…
Browse files Browse the repository at this point in the history
…pubsub`
  • Loading branch information
alejandrohdezma committed May 24, 2024
1 parent a54ad7e commit 820249d
Show file tree
Hide file tree
Showing 48 changed files with 364 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ object PubsubGoogleConsumer {
* @param subscription name of the subscription
* @param errorHandler upon failure to decode, an exception is thrown. Allows acknowledging the message.
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final def subscribe[F[_]: Sync, A: MessageDecoder](
projectId: Model.ProjectId,
subscription: Model.Subscription,
Expand Down Expand Up @@ -77,6 +81,10 @@ object PubsubGoogleConsumer {
* @param subscription name of the subscription
* @param errorHandler upon failure to decode, an exception is thrown. Allows acknowledging the message.
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final def subscribeAndAck[F[_]: Sync, A: MessageDecoder](
projectId: Model.ProjectId,
subscription: Model.Subscription,
Expand All @@ -95,6 +103,10 @@ object PubsubGoogleConsumer {
*
* The stream fails with an [[InternalPubSubError]] if the underlying Java consumer fails.
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final def subscribeRaw[F[_]: Sync](
projectId: Model.ProjectId,
subscription: Model.Subscription,
Expand All @@ -106,6 +118,10 @@ object PubsubGoogleConsumer {
ConsumerRecord(msg.value, msg.value.getAttributesMap.asScala.toMap, msg.ack, msg.nack, _ => Applicative[F].unit)
)

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
private def subscribeDecode[F[_]: Sync, A: MessageDecoder, B](
projectId: Model.ProjectId,
subscription: Model.Subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import scala.concurrent.duration._
* @param onFailedTerminate upon failure to terminate, call this function
* @param customizeSubscriber optionally, provide a function that allows full customisation of the underlying Java Subscriber object.
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
case class PubsubGoogleConsumerConfig[F[_]](
maxQueueSize: Int = 1000,
parallelPullCount: Int = 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import scala.collection.mutable.Builder

private[consumer] object PubsubSubscriber {

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def createSubscriber[F[_]: Sync](
projectId: PublicModel.ProjectId,
subscription: PublicModel.Subscription,
Expand Down Expand Up @@ -103,6 +107,10 @@ private[consumer] object PubsubSubscriber {
}
} yield chunk

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def subscribe[F[_]: Sync](
projectId: PublicModel.ProjectId,
subscription: PublicModel.Subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.grpc.internal.{DefaultPublisher, PubsubPublisher}

object GooglePubsubProducer {
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def of[F[_]: Async, A: MessageEncoder](
projectId: ProjectId,
topic: Topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import com.google.cloud.pubsub.v1.Publisher

import scala.concurrent.duration._

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
case class PubsubProducerConfig[F[_]](
batchSize: Long,
delayThreshold: FiniteDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import com.permutive.pubsub.producer.{Model, PubsubProducer}
import java.util.UUID
import scala.jdk.CollectionConverters._

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
private[pubsub] class DefaultPublisher[F[_]: Async, A: MessageEncoder](
publisher: Publisher,
) extends PubsubProducer[F, A] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import org.threeten.bp.Duration

import java.util.concurrent.TimeUnit

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
private[producer] object PubsubPublisher {
def createJavaPublisher[F[_]: Sync](
projectId: ProjectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration._
import scala.annotation.nowarn

@nowarn("cat=deprecation")
class GrpcPingPongSpec extends PubSubSpec with BeforeAndAfterEach {

implicit val logger: Logger[IO] = Slf4jLogger.getLogger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import org.testcontainers.containers.wait.strategy.Wait
import org.typelevel.log4cats.Logger

import scala.concurrent.duration._
import scala.annotation.nowarn

@nowarn("cat=deprecation")
trait PubSubSpec extends AnyFlatSpec with ForAllTestContainer with Matchers with TripleEquals {

implicit val logger: Logger[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package com.permutive.pubsub
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import com.permutive.pubsub.producer.encoder.MessageEncoder
import scala.util.Try
import scala.annotation.nowarn

case class ValueHolder(value: String) extends AnyVal

@nowarn("cat=deprecation")
object ValueHolder {
implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
Try(ValueHolder(new String(bytes))).toEither
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.permutive.pubsub.consumer.grpc
import cats.effect.{ExitCode, IO, IOApp}
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import scala.annotation.nowarn

@nowarn("cat=deprecation")
object SimpleDriver extends IOApp {
case class ValueHolder(value: String) extends AnyVal

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder

import scala.concurrent.duration._
import scala.annotation.nowarn

@nowarn("cat=deprecation")
object PubsubProducerExample extends IOApp {

case class Value(v: Int) extends AnyVal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.http4s.client.middleware.RetryPolicy.{exponentialBackoff, recklesslyR

import java.util.Base64
import scala.concurrent.duration._
import scala.annotation.nowarn

object PubsubHttpConsumer {

Expand All @@ -48,14 +49,18 @@ object PubsubHttpConsumer {
* - https://cloud.google.com/compute/docs/metadata/default-metadata-values
* - https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final def subscribe[F[_]: Async: Logger, A: MessageDecoder](
projectId: ProjectId,
subscription: Subscription,
serviceAccountPath: Option[String],
config: PubsubHttpConsumerConfig[F],
httpClient: Client[F],
errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit],
httpClientRetryPolicy: RetryPolicy[F] = recklesslyRetryPolicy[F]
httpClientRetryPolicy: RetryPolicy[F] = recklesslyRetryPolicy[F]: @nowarn
): Stream[F, ConsumerRecord[F, A]] =
subscribeDecode[F, A, ConsumerRecord[F, A]](
projectId,
Expand All @@ -81,14 +86,18 @@ object PubsubHttpConsumer {
* - https://cloud.google.com/compute/docs/metadata/default-metadata-values
* - https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final def subscribeAndAck[F[_]: Async: Logger, A: MessageDecoder](
projectId: ProjectId,
subscription: Subscription,
serviceAccountPath: Option[String],
config: PubsubHttpConsumerConfig[F],
httpClient: Client[F],
errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit],
httpClientRetryPolicy: RetryPolicy[F] = recklesslyRetryPolicy[F]
httpClientRetryPolicy: RetryPolicy[F] = recklesslyRetryPolicy[F]: @nowarn
): Stream[F, A] =
subscribeDecode[F, A, A](
projectId,
Expand All @@ -113,13 +122,17 @@ object PubsubHttpConsumer {
* - https://cloud.google.com/compute/docs/metadata/default-metadata-values
* - https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final def subscribeRaw[F[_]: Async: Logger](
projectId: ProjectId,
subscription: Subscription,
serviceAccountPath: Option[String],
config: PubsubHttpConsumerConfig[F],
httpClient: Client[F],
httpClientRetryPolicy: RetryPolicy[F] = recklesslyRetryPolicy[F],
httpClientRetryPolicy: RetryPolicy[F] = recklesslyRetryPolicy[F]: @nowarn,
): Stream[F, ConsumerRecord[F, PubsubMessage]] =
PubsubSubscriber
.subscribe(projectId, subscription, serviceAccountPath, config, httpClient, httpClientRetryPolicy)
Expand All @@ -129,9 +142,17 @@ object PubsubHttpConsumer {
Pub/Sub requests are `POST` and thus are not considered idempotent by http4s, therefore we must
use a different retry behaviour than the default.
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def recklesslyRetryPolicy[F[_]]: RetryPolicy[F] =
RetryPolicy(exponentialBackoff(maxWait = 5.seconds, maxRetry = 3), (_, result) => recklesslyRetriable(result))

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
private def subscribeDecode[F[_]: Async: Logger, A: MessageDecoder, B](
projectId: ProjectId,
subscription: Subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import scala.concurrent.duration._
* @param readMaxMessages how many messages to retrieve at once simultaneously
* @param readConcurrency how much parallelism to use when fetching messages from PubSub
*/
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
case class PubsubHttpConsumerConfig[F[_]](
host: String = "pubsub.googleapis.com",
port: Int = 443,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package com.permutive.pubsub.consumer.http
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker}

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
case class PubsubMessage(
data: String,
attributes: Map[String, String],
Expand All @@ -27,6 +31,10 @@ case class PubsubMessage(
)

object PubsubMessage {
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
implicit final val Codec: JsonValueCodec[PubsubMessage] =
JsonCodecMaker.make[PubsubMessage](CodecMakerConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ private[internal] class HttpPubsubReader[F[_]: Async: Logger] private (
final private[this] val acknowledgeEndpoint = appendToUrl("acknowledge")
final private[this] val modifyDeadlineEndpoint = appendToUrl("modifyAckDeadline")

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
final override val read: F[PullResponse] = {
for {
json <- Sync[F].delay(
Expand Down Expand Up @@ -119,6 +123,10 @@ private[internal] class HttpPubsubReader[F[_]: Async: Logger] private (
}

private[internal] object HttpPubsubReader {
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def resource[F[_]: Async: Logger](
projectId: ProjectId,
subscription: Subscription,
Expand Down Expand Up @@ -168,6 +176,10 @@ private[internal] object HttpPubsubReader {
maxMessages = config.readMaxMessages
)

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def createBaseApi[F[_]](config: PubsubHttpConsumerConfig[F], projectNameSubscription: ProjectNameSubscription): Uri =
Uri(
scheme = Option(if (config.port == 443) Uri.Scheme.https else Uri.Scheme.http),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@ import com.permutive.pubsub.consumer.http.PubsubMessage
import com.permutive.pubsub.consumer.{ConsumerRecord, Model => PublicModel}

import scala.concurrent.duration.FiniteDuration
import scala.annotation.nowarn

private[http] object Model {
case class ProjectNameSubscription(value: String) extends AnyVal
object ProjectNameSubscription {
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def of(projectId: PublicModel.ProjectId, subscription: PublicModel.Subscription): ProjectNameSubscription =
ProjectNameSubscription(s"projects/${projectId.value}/subscriptions/${subscription.value}")
}

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
trait InternalRecord[F[_]] { self =>
def value: PubsubMessage
def ack: F[Unit]
Expand Down Expand Up @@ -59,18 +68,30 @@ private[http] object Model {
JsonCodecMaker.make[PullRequest](CodecMakerConfig)
}

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
case class PullResponse(
receivedMessages: List[ReceivedMessage]
)

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
object PullResponse {
implicit final val PullResponseCodec: JsonValueCodec[PullResponse] =
JsonCodecMaker.make[PullResponse](CodecMakerConfig)
}

@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
case class ReceivedMessage(
ackId: AckId,
message: PubsubMessage
@nowarn message: PubsubMessage
)

case class AckRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import com.permutive.pubsub.consumer.http.internal.Model.{AckId, PullResponse}
import scala.concurrent.duration.FiniteDuration

trait PubsubReader[F[_]] {
@deprecated(
"Use `fs2-pubsub` instead. Replace with: `\"com.permutive\" %% \"fs2-pubsub\" % \"1.0.0\"`",
since = "0.22.2"
)
def read: F[PullResponse]

def ack(ackId: List[AckId]): F[Unit]
Expand Down
Loading

0 comments on commit 820249d

Please sign in to comment.