From 19926fe75ea1c252773ea37100e78e781af648a7 Mon Sep 17 00:00:00 2001 From: Andreas Triantafyllos Date: Mon, 21 Oct 2024 15:06:30 +0200 Subject: [PATCH 1/6] GetActiveContractsResponse --- .../IdentityProvidingServiceClient.scala | 8 +- .../data/GetActiveContractsResponse.java | 28 +----- .../daml/ledger/javaapi/data/Generators.scala | 2 - .../ExtractSingleMaterializedValueTest.scala | 76 --------------- .../block/update/BlockChunkProcessor.scala | 5 +- .../daml/ledger/api/v2/state_service.proto | 7 -- .../admin/PackageUpgradeValidator.scala | 69 ++++++------- .../platform/index/IndexServiceImpl.scala | 5 - .../platform/store/dao/events/ACSReader.scala | 13 +-- .../JdbcLedgerDaoActiveContractsSpec.scala | 3 - .../services/state/StateServiceClient.scala | 48 +++------ .../ledger/error/PackageServiceErrors.scala | 23 ++--- .../canton/fetchcontracts/AcsTxStreams.scala | 38 ++++---- .../canton/http/ContractsService.scala | 36 ++++--- .../canton/http/HttpService.scala | 1 + .../canton/http/LedgerClientJwt.scala | 39 +++++--- .../canton/http/json/v2/JsStateService.scala | 1 - .../http/json/v2/ProtocolConverters.scala | 1 - .../fetchcontracts/AcsTxStreamsTest.scala | 3 +- .../admin/AdminWorkflowServices.scala | 3 +- .../error/SerializableErrorComponents.scala | 2 +- .../ExtractMaterializedValue.scala | 97 ------------------- 22 files changed, 147 insertions(+), 361 deletions(-) delete mode 100644 sdk/canton/community/common/src/test/scala/com/digitalasset/canton/pekkostreams/ExtractSingleMaterializedValueTest.scala delete mode 100644 sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/pekkostreams/ExtractMaterializedValue.scala diff --git a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/topology/client/IdentityProvidingServiceClient.scala b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/topology/client/IdentityProvidingServiceClient.scala index 6aa99a7153a6..c9b5e49e4fa9 100644 --- a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/topology/client/IdentityProvidingServiceClient.scala +++ b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/topology/client/IdentityProvidingServiceClient.scala @@ -180,9 +180,11 @@ trait TopologyClientApi[+T] { this: HasFutureSupervision => timestamp: CantonTimestamp )(implicit traceContext: TraceContext): Option[FutureUnlessShutdown[Unit]] - /** Finds the transaction with maximum effective time that has been sequenced before `sequencedTime` and - * yields the sequenced and effective time of that transaction, if necessary after waiting for a timestamp - * at or after `sequencedTime.immediatePredecessor` that the topology processor has fully processed. + /** Finds the topology transaction with maximum effective time whose effects would be visible, + * at earliest i.e. if delay is 0, in a topology snapshot at `effectiveTime`, and yields + * the sequenced and actual effective time of that topology transaction, if necessary after waiting + * to observe a timestamp at or after sequencing time `effectiveTime.immediatePredecessor` + * that the topology processor has fully processed. */ def awaitMaxTimestampUS(sequencedTime: CantonTimestamp)(implicit traceContext: TraceContext diff --git a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetActiveContractsResponse.java b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetActiveContractsResponse.java index da4a24acb042..1c9a2d370ab2 100644 --- a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetActiveContractsResponse.java +++ b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetActiveContractsResponse.java @@ -11,15 +11,12 @@ public final class GetActiveContractsResponse implements WorkflowEvent { - private final String offset; - private final Optional contractEntry; private final String workflowId; public GetActiveContractsResponse( - @NonNull String offset, @NonNull Optional contractEntry, String workflowId) { - this.offset = offset; + @NonNull Optional contractEntry, String workflowId) { this.contractEntry = contractEntry; this.workflowId = workflowId; } @@ -29,22 +26,18 @@ public static GetActiveContractsResponse fromProto( switch (response.getContractEntryCase()) { case ACTIVE_CONTRACT: return new GetActiveContractsResponse( - response.getOffset(), Optional.of(ActiveContract.fromProto(response.getActiveContract())), response.getWorkflowId()); case INCOMPLETE_UNASSIGNED: return new GetActiveContractsResponse( - response.getOffset(), Optional.of(IncompleteUnassigned.fromProto(response.getIncompleteUnassigned())), response.getWorkflowId()); case INCOMPLETE_ASSIGNED: return new GetActiveContractsResponse( - response.getOffset(), Optional.of(IncompleteAssigned.fromProto(response.getIncompleteAssigned())), response.getWorkflowId()); case CONTRACTENTRY_NOT_SET: - return new GetActiveContractsResponse( - response.getOffset(), Optional.empty(), response.getWorkflowId()); + return new GetActiveContractsResponse(Optional.empty(), response.getWorkflowId()); default: throw new ProtoContractEntryUnknown(response); } @@ -53,7 +46,6 @@ public static GetActiveContractsResponse fromProto( public StateServiceOuterClass.GetActiveContractsResponse toProto() { var builder = StateServiceOuterClass.GetActiveContractsResponse.newBuilder() - .setOffset(this.offset) .setWorkflowId(this.workflowId); if (contractEntry.isPresent()) { ContractEntry ce = contractEntry.get(); @@ -66,12 +58,6 @@ else if (ce instanceof IncompleteAssigned) return builder.build(); } - @NonNull - public Optional getOffset() { - // Empty string indicates that the field is not present in the protobuf. - return Optional.of(offset).filter(off -> !offset.equals("")); - } - public Optional getContractEntry() { return contractEntry; } @@ -84,10 +70,7 @@ public String getWorkflowId() { @Override public String toString() { return "GetActiveContractsResponse{" - + "offset='" - + offset - + '\'' - + ", contractEntry=" + + "contractEntry=" + contractEntry + ", workflowId=" + workflowId @@ -99,14 +82,13 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GetActiveContractsResponse that = (GetActiveContractsResponse) o; - return Objects.equals(offset, that.offset) - && Objects.equals(contractEntry, that.contractEntry) + return Objects.equals(contractEntry, that.contractEntry) && Objects.equals(workflowId, that.workflowId); } @Override public int hashCode() { - return Objects.hash(offset, contractEntry, workflowId); + return Objects.hash(contractEntry, workflowId); } } diff --git a/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala b/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala index da5bc26d6e26..9140dddcfd81 100644 --- a/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala +++ b/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala @@ -507,12 +507,10 @@ object Generators { def getActiveContractResponseGen: Gen[v2.StateServiceOuterClass.GetActiveContractsResponse] = for { - offset <- Arbitrary.arbString.arbitrary workflowId <- Arbitrary.arbString.arbitrary entryGen <- contractEntryBuilderGen } yield v2.StateServiceOuterClass.GetActiveContractsResponse .newBuilder() - .setOffset(offset) .setWorkflowId(workflowId) .pipe(entryGen) .build() diff --git a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/pekkostreams/ExtractSingleMaterializedValueTest.scala b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/pekkostreams/ExtractSingleMaterializedValueTest.scala deleted file mode 100644 index c06d8d3da742..000000000000 --- a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/pekkostreams/ExtractSingleMaterializedValueTest.scala +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.digitalasset.canton.pekkostreams - -import com.daml.ledger.api.testing.utils.PekkoBeforeAndAfterAll -import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source} -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{Millis, Span} -import org.scalatest.wordspec.AnyWordSpec - -import scala.util.Random - -class ExtractSingleMaterializedValueTest - extends AnyWordSpec - with Matchers - with ScalaFutures - with PekkoBeforeAndAfterAll { - - implicit val config: PatienceConfig = PatienceConfig().copy(timeout = scaled(Span(1500, Millis))) - - private val discriminator = { (i: Int) => - if (i < 0) Some(i) else None - } - - private val elemsThatPassThrough = 0.to(10).toVector - - ExtractMaterializedValue.getClass.getSimpleName when { - - "there's a single valid value" should { - "extract it" in { - val elemToExtract = -1 - - val elements = elemToExtract +: elemsThatPassThrough - val (extractedF, restF) = processElements(Random.shuffle(elements)) - - whenReady(extractedF)(_ shouldEqual elemToExtract) - whenReady(restF)(_ should contain theSameElementsAs elements) - } - } - - "there are multiple valid values" should { - "extract the first matching element" in { - val elemToExtract = -1 - val otherCandidateShuffledIn = -2 - - val elements = - elemToExtract +: Random.shuffle(otherCandidateShuffledIn +: elemsThatPassThrough) - val (extractedF, restF) = processElements(elements) - - whenReady(extractedF)(_ shouldEqual elemToExtract) - whenReady(restF)(_ should contain theSameElementsAs elements) - } - } - - "there are no valid values" should { - "fail the materialized future, but let the stream continue otherwise" in { - - val (extractedF, restF) = - processElements(Random.shuffle(elemsThatPassThrough)) - - whenReady(extractedF.failed)(_ shouldBe a[RuntimeException]) - whenReady(restF)(_.sorted shouldEqual elemsThatPassThrough) - } - } - - } - - private def processElements(elements: Iterable[Int]) = - Source - .fromIterator(() => elements.iterator) - .viaMat(ExtractMaterializedValue(discriminator))(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() -} diff --git a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/BlockChunkProcessor.scala b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/BlockChunkProcessor.scala index c8fb87605e4f..4e87cfa8c5fe 100644 --- a/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/BlockChunkProcessor.scala +++ b/sdk/canton/community/domain/src/main/scala/com/digitalasset/canton/domain/block/update/BlockChunkProcessor.scala @@ -186,8 +186,9 @@ private[update] final class BlockChunkProcessor( // and we don't need to add a `Deliver` for the tick. logger.debug( - s"Block $height: emitting a topology tick at least at $tickAtLeastAt (actually at $tickSequencingTimestamp) " + - s"as requested by the block orderer" + s"Emitting topology tick: after processing block $height, the last sequenced timestamp is ${state.lastChunkTs} and " + + s"the block orderer requested to tick at least at $tickAtLeastAt, so " + + s"ticking topology at $tickSequencingTimestamp" ) // We bypass validation here to make sure that the topology tick is always received by the sequencer runtime. for { diff --git a/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/state_service.proto b/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/state_service.proto index ace6f210c542..9f1e3bfcd4b6 100644 --- a/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/state_service.proto +++ b/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/state_service.proto @@ -55,13 +55,6 @@ message GetActiveContractsRequest { } message GetActiveContractsResponse { - // Included only in the last message. - // The client should start consuming the transactions endpoint with this offset. - // On a participant that is not aware of any events in the ledger the offset returned could be the empty string. - // The empty string signifies that the offset returned is before the first transaction. - // The details of the offset field are described in ``community/ledger-api/README.md``. - string offset = 1; - // The workflow ID used in command submission which corresponds to the contract_entry. Only set if // the ``workflow_id`` for the command was set. // Must be a valid LedgerString (as described in ``value.proto``). diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala index 594c13ebe301..15850fac93b4 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala @@ -15,7 +15,7 @@ import com.digitalasset.canton.util.EitherTUtil import com.digitalasset.daml.lf.archive.DamlLf.Archive import com.digitalasset.daml.lf.archive.Decode import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.language.Util.{dependenciesInTopologicalOrder, PkgIdWithNameAndVersion} +import com.digitalasset.daml.lf.language.Util.dependenciesInTopologicalOrder import com.digitalasset.daml.lf.language.{Ast, LanguageVersion} import com.digitalasset.daml.lf.validation.{TypecheckUpgrades, UpgradeError} import scalaz.std.either.* @@ -80,22 +80,21 @@ class PackageUpgradeValidator( ): EitherT[Future, DamlError, Unit] = { val (uploadedPackageId, uploadedPackageAst) = uploadedPackage val optUpgradingDar = Some(uploadedPackage) - val uploadedPackageIdWithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(uploadedPackage) logger.info( - s"Uploading DAR file for ${uploadedPackageIdWithMeta} in submission ID ${loggingContext.serializeFiltered("submissionId")}." + s"Uploading DAR file for $uploadedPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}." ) existingVersionedPackageId(uploadedPackageAst, packageMap) match { case Some(existingPackageId) => if (existingPackageId == uploadedPackageId) { logger.info( - s"Ignoring upload of package ${uploadedPackageIdWithMeta} as it has been previously uploaded" + s"Ignoring upload of package $uploadedPackageId as it has been previously uploaded" ) EitherT.rightT[Future, DamlError](()) } else { EitherT.leftT[Future, Unit]( Validation.UpgradeVersion .Error( - uploadedPackage = uploadedPackageIdWithMeta, + uploadedPackageId = uploadedPackageId, existingPackage = existingPackageId, packageVersion = uploadedPackageAst.metadata.version, ): DamlError @@ -126,7 +125,7 @@ class PackageUpgradeValidator( optMinimalDar, optUpgradingDar, ) - _ = logger.info(s"Typechecking upgrades for ${uploadedPackageIdWithMeta} succeeded.") + _ = logger.info(s"Typechecking upgrades for $uploadedPackageId succeeded.") } yield () } } @@ -214,38 +213,39 @@ class PackageUpgradeValidator( private def strictTypecheckUpgrades( phase: TypecheckUpgrades.UploadPhaseCheck, packageMap: PackageMap, - newDar1: (Ref.PackageId, Ast.Package), - oldDar2: (Ref.PackageId, Ast.Package), + optNewDar1: Option[(Ref.PackageId, Ast.Package)], + oldPkgId2: Ref.PackageId, + optOldPkg2: Option[Ast.Package], )(implicit loggingContext: LoggingContextWithTrace ): EitherT[Future, DamlError, Unit] = LoggingContextWithTrace .withEnrichedLoggingContext("upgradeTypecheckPhase" -> OfString(phase.toString)) { implicit loggingContext => - val (newPkgId1, newPkg1) = newDar1 - val newPkgId1WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(newDar1) - val (oldPkgId2, oldPkg2) = oldDar2 - val oldPkgId2WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(oldDar2) - logger.info(s"Package ${newPkgId1WithMeta} claims to upgrade package id ${oldPkgId2WithMeta}") - EitherT( - Future( - TypecheckUpgrades - .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, Some(oldPkg2)) - .toEither - ) - ).leftMap[DamlError] { - case err: UpgradeError => - Validation.Upgradeability.Error( - newPackage = newPkgId1WithMeta, - oldPackage = oldPkgId2WithMeta, - upgradeError = err, - phase = phase, - ) - case unhandledErr => - InternalError.Unhandled( - unhandledErr, - Some(s"Typechecking upgrades for ${oldPkgId2WithMeta} failed with unknown error."), - ) + optNewDar1 match { + case None => EitherT.rightT(()) + + case Some((newPkgId1, newPkg1)) => + logger.info(s"Package $newPkgId1 claims to upgrade package id $oldPkgId2") + EitherT( + Future( + TypecheckUpgrades + .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, optOldPkg2) + .toEither + ) + ).leftMap[DamlError] { + case err: UpgradeError => + Validation.Upgradeability.Error( + upgradingPackage = newPkgId1, + upgradedPackage = oldPkgId2, + upgradeError = err, + ) + case unhandledErr => + InternalError.Unhandled( + unhandledErr, + Some(s"Typechecking upgrades for $oldPkgId2 failed with unknown error."), + ) + } } } @@ -265,8 +265,9 @@ class PackageUpgradeValidator( strictTypecheckUpgrades( typecheckPhase, packageMap, - (newPkgId1, newPkg1), - (oldPkgId2, oldPkg2), + Some((newPkgId1, newPkg1)), + oldPkgId2, + Some(oldPkg2), ) } } diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/index/IndexServiceImpl.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/index/IndexServiceImpl.scala index c5a76bd02d4b..83bb1910e3f0 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/index/IndexServiceImpl.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/index/IndexServiceImpl.scala @@ -331,11 +331,6 @@ private[index] class IndexServiceImpl( ) } activeContractsSource - .concat( - Source.single( - GetActiveContractsResponse(offset = ApiOffset.toApiString(activeAt)) - ) - ) .buffered(metrics.index.activeContractsBufferSize, LedgerApiStreamsBufferSize) } } diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/store/dao/events/ACSReader.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/store/dao/events/ACSReader.scala index 75ba2d197fa8..0af222cb7794 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/store/dao/events/ACSReader.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/store/dao/events/ACSReader.scala @@ -94,9 +94,12 @@ class ACSReader( )(implicit loggingContext: LoggingContextWithTrace ): Source[GetActiveContractsResponse, NotUsed] = { - val (activeAtOffset, _) = activeAt + val (activeAtOffset, activeAtLong) = activeAt val span = Telemetry.Transactions.createSpan(tracer, activeAtOffset)(qualifiedNameOfCurrentFunc) + val event = + tracing.Event("contract", Map((SpanAttribute.Offset, activeAtLong.toString))) + Spans.addEventToSpan(event, span) logger.debug( s"getActiveContracts($activeAtOffset, $filteringConstraints, $eventProjectionProperties)" ) @@ -105,11 +108,6 @@ class ACSReader( activeAt, eventProjectionProperties, ) - .wireTap { getActiveContractsResponse => - val event = - tracing.Event("contract", Map((SpanAttribute.Offset, getActiveContractsResponse.offset))) - Spans.addEventToSpan(event, span) - } .watchTermination()(endSpanOnTermination(span)) } @@ -572,7 +570,6 @@ class ACSReader( ) .map(createdEvent => GetActiveContractsResponse( - offset = "", // empty for all data entries workflowId = rawActiveContract.workflowId.getOrElse(""), contractEntry = GetActiveContractsResponse.ContractEntry.ActiveContract( ActiveContract( @@ -598,7 +595,6 @@ class ACSReader( ) .map(createdEvent => rawAssignEntry.offset -> GetActiveContractsResponse( - offset = "", // empty for all data entries workflowId = rawAssignEntry.workflowId.getOrElse(""), contractEntry = GetActiveContractsResponse.ContractEntry.IncompleteAssigned( IncompleteAssigned( @@ -622,7 +618,6 @@ class ACSReader( .deserializeRaw(eventProjectionProperties)(rawCreate) .map(createdEvent => rawUnassignEntry.offset -> GetActiveContractsResponse( - offset = "", workflowId = rawUnassignEntry.workflowId.getOrElse(""), contractEntry = GetActiveContractsResponse.ContractEntry.IncompleteUnassigned( IncompleteUnassigned( diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala index 291eef782d9d..4623e0155628 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala @@ -614,9 +614,6 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec } yield { activeContracts should not be empty - forAll(activeContracts) { ac => - ac.offset shouldBe empty - } } } diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/state/StateServiceClient.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/state/StateServiceClient.scala index 298b8db4cf89..41be9ffbc3a4 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/state/StateServiceClient.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/state/StateServiceClient.scala @@ -16,10 +16,10 @@ import com.daml.ledger.api.v2.state_service.{ } import com.daml.ledger.api.v2.transaction_filter.TransactionFilter import com.digitalasset.canton.ledger.client.LedgerClient -import com.digitalasset.canton.pekkostreams.ExtractMaterializedValue import com.digitalasset.canton.tracing.TraceContext +import org.apache.pekko.NotUsed import org.apache.pekko.stream.Materializer -import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source} +import org.apache.pekko.stream.scaladsl.{Sink, Source} import scala.concurrent.{ExecutionContext, Future} @@ -28,18 +28,13 @@ class StateServiceClient(service: StateServiceStub)(implicit esf: ExecutionSequencerFactory, ) { - /** Returns a stream of GetActiveContractsResponse messages. The materialized value will - * be resolved to the offset that can be used as a starting offset for streaming transactions - * via the transaction service. - * If the stream completes before the offset can be set, the materialized future will - * be failed with an exception. - */ + /** Returns a stream of GetActiveContractsResponse messages. */ def getActiveContractsSource( filter: TransactionFilter, validAtOffset: Long, verbose: Boolean = false, token: Option[String] = None, - )(implicit traceContext: TraceContext): Source[GetActiveContractsResponse, Future[String]] = + )(implicit traceContext: TraceContext): Source[GetActiveContractsResponse, NotUsed] = ClientAdapter .serverStreaming( GetActiveContractsRequest( @@ -49,9 +44,6 @@ class StateServiceClient(service: StateServiceStub)(implicit ), LedgerClient.stubWithTracing(service, token).getActiveContracts, ) - .viaMat(StateServiceClient.extractOffset)( - Keep.right - ) /** Returns the resulting active contract set */ def getActiveContracts( @@ -62,23 +54,15 @@ class StateServiceClient(service: StateServiceStub)(implicit )(implicit materializer: Materializer, traceContext: TraceContext, - ): Future[(Seq[ActiveContract], String)] = { - val (offsetF, contractsF) = - getActiveContractsSource(filter, validAtOffset, verbose, token) - .toMat(Sink.seq)(Keep.both) - .run() - val activeF = contractsF - .map( - _.map(_.contractEntry) - .collect { case ContractEntry.ActiveContract(value) => - value - } - ) + ): Future[(Seq[ActiveContract], Long)] = for { - active <- activeF - offset <- offsetF - } yield (active, offset) - } + contracts <- getActiveContractsSource(filter, validAtOffset, verbose, token).runWith(Sink.seq) + active = contracts + .map(_.contractEntry) + .collect { case ContractEntry.ActiveContract(value) => + value + } + } yield (active, validAtOffset) def getLedgerEnd( token: Option[String] = None @@ -96,11 +80,3 @@ class StateServiceClient(service: StateServiceStub)(implicit } } - -object StateServiceClient { - private val extractOffset = - new ExtractMaterializedValue[GetActiveContractsResponse, String](r => - if (r.offset.nonEmpty) Some(r.offset) else None - ) - -} diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala index 6c2d0aa0d31c..d714f3b98763 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala @@ -7,10 +7,8 @@ import com.daml.error.* import com.digitalasset.canton.ledger.error.groups.CommandExecutionErrors import com.digitalasset.daml.lf.archive.Error as LfArchiveError import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.language.Ast -import com.digitalasset.daml.lf.language.Util import com.digitalasset.daml.lf.engine.Error -import com.digitalasset.daml.lf.validation.{UpgradeError, TypecheckUpgrades} +import com.digitalasset.daml.lf.validation.UpgradeError import com.digitalasset.daml.lf.{VersionRange, language, validation} import ParticipantErrorGroup.LedgerApiErrorGroup.PackageServiceErrorGroup @@ -268,18 +266,15 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - oldPackage: Util.PkgIdWithNameAndVersion, - newPackage: Util.PkgIdWithNameAndVersion, + upgradingPackage: Ref.PackageId, + upgradedPackage: Ref.PackageId, upgradeError: UpgradeError, - phase: TypecheckUpgrades.UploadPhaseCheck )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = - phase match { - case TypecheckUpgrades.MaximalDarCheck => s"The uploaded DAR contains a package $newPackage, but upgrade checks indicate that new package $newPackage cannot be an upgrade of existing package $oldPackage. Reason: ${upgradeError.prettyInternal}" - case TypecheckUpgrades.MinimalDarCheck => s"The uploaded DAR contains a package $oldPackage, but upgrade checks indicate that existing package $newPackage cannot be an upgrade of new package $oldPackage. Reason: ${upgradeError.prettyInternal}" - }) + cause = + s"The DAR contains a package which claims to upgrade another package, but basic checks indicate the package is not a valid upgrade. Upgrading package: $upgradingPackage; Upgraded package: $upgradedPackage; Reason: ${upgradeError.prettyInternal}" + ) } @Explanation( @@ -292,15 +287,15 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - uploadedPackage: Util.PkgIdWithNameAndVersion, + uploadedPackageId: Ref.PackageId, existingPackage: Ref.PackageId, packageVersion: Ref.PackageVersion, )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = s"Tried to upload package $uploadedPackage, but a different package $existingPackage with the same name and version has previously been uploaded.", + cause = "A DAR with the same version number has previously been uploaded.", extraContext = Map( - "uploadedPackageId" -> uploadedPackage, + "uploadedPackageId" -> uploadedPackageId, "existingPackage" -> existingPackage, "packageVersion" -> packageVersion.toString, ), diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreams.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreams.scala index 6477f6ab2aae..4a4571e6b78d 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreams.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreams.scala @@ -55,7 +55,7 @@ object AcsTxStreams extends NoTracing { ec: concurrent.ExecutionContext, lc: com.daml.logging.LoggingContextOf[Any], ): Graph[FanOutShape2[ - lav2.state_service.GetActiveContractsResponse, + Either[Long, lav2.state_service.GetActiveContractsResponse], ContractStreamStep.LAV1, BeginBookmark[domain.Offset], ], NotUsed] = @@ -71,11 +71,11 @@ object AcsTxStreams extends NoTracing { val txns = b add transactionsFollowingBoundary(transactionsSince, logger) val allSteps = b add Concat[ContractStreamStep.LAV1](3) // format: off - discard { dupOff <~ acs.out1 } - discard { acs.out0.map(ces => Acs(ces.toVector)) ~> allSteps } - discard { dupOff ~> liveStart ~> allSteps } - discard { txns.out0 ~> allSteps } - discard { dupOff ~> txns.in } + discard {dupOff <~ acs.out1} + discard {acs.out0.map(ces => Acs(ces.toVector)) ~> allSteps} + discard {dupOff ~> liveStart ~> allSteps} + discard {txns.out0 ~> allSteps} + discard {dupOff ~> txns.in} // format: on new FanOutShape2(acs.in, allSteps.out, txns.out1) } @@ -84,19 +84,21 @@ object AcsTxStreams extends NoTracing { * other with a single result, the last offset. */ private[this] def acsAndBoundary - : Graph[FanOutShape2[lav2.state_service.GetActiveContractsResponse, Seq[ + : Graph[FanOutShape2[Either[Long, lav2.state_service.GetActiveContractsResponse], Seq[ lav2.event.CreatedEvent, ], BeginBookmark[domain.Offset]], NotUsed] = GraphDSL.create() { implicit b => import GraphDSL.Implicits.* import lav2.state_service.GetActiveContractsResponse as GACR - val dup = b add Broadcast[GACR](2, eagerCancel = true) - val acs = b add (Flow fromFunction ((_: GACR).contractEntry.activeContract - .flatMap(_.createdEvent) - .toSeq)) - val off = b add Flow[GACR] - .collect { - case gacr if gacr.offset.nonEmpty => AbsoluteBookmark(domain.Offset(gacr.offset)) + val dup = b add Broadcast[Either[Long, GACR]](2, eagerCancel = true) + val acs = b add (Flow fromFunction ((_: Either[Long, GACR]).toSeq.flatMap( + _.contractEntry.activeContract + .flatMap(_.createdEvent) + .toSeq + ))) + val off = b add Flow[Either[Long, GACR]] + .collect { case Left(offset) => + AbsoluteBookmark(domain.Offset(offset)) } .via(last(ParticipantBegin: BeginBookmark[domain.Offset])) discard(dup ~> acs) @@ -134,10 +136,10 @@ object AcsTxStreams extends NoTracing { val logTxnOut = b add logTermination[ContractStreamStep.Txn.LAV1](logger, "first branch of tx stream split") // format: off - discard { txnSplit.in <~ txns <~ dupOff } - discard { dupOff ~> mergeOff ~> maxOff } - discard { txnSplit.out1.map(off => AbsoluteBookmark(off)) ~> lastTxOff ~> mergeOff } - discard { txnSplit.out0 ~> logTxnOut } + discard {txnSplit.in <~ txns <~ dupOff} + discard {dupOff ~> mergeOff ~> maxOff} + discard {txnSplit.out1.map(off => AbsoluteBookmark(off)) ~> lastTxOff ~> mergeOff} + discard {txnSplit.out0 ~> logTxnOut} // format: on new FanOutShape2(dupOff.in, logTxnOut.out, maxOff.out) } diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala index 0eca7c09f228..0dde0a3f36af 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala @@ -58,6 +58,7 @@ class ContractsService( getContractByContractId: LedgerClientJwt.GetContractByContractId, getActiveContracts: LedgerClientJwt.GetActiveContracts, getCreatesAndArchivesSince: LedgerClientJwt.GetCreatesAndArchivesSince, + getLedgerEnd: LedgerClientJwt.GetLedgerEnd, val loggerFactory: NamedLoggerFactory, )(implicit ec: ExecutionContext) extends NamedLogging @@ -396,18 +397,22 @@ class ContractsService( jwt: Jwt, txnFilter: TransactionFilter, )(implicit lc: LoggingContextOf[InstanceUUID]): Source[ContractStreamStep.LAV1, NotUsed] = - getActiveContracts(jwt, txnFilter, true)(lc) - .map { case GetActiveContractsResponse(offset, _, contractEntry) => - if (contractEntry.isActiveContract) { - val createdEvent = contractEntry.activeContract - .getOrElse( - throw new RuntimeException( - "unreachable, activeContract should not have been empty since contract is checked to be an active contract" - ) - ) - .createdEvent - Acs(createdEvent.toVector) - } else LiveBegin(AbsoluteBookmark(domain.Offset(offset))) + getLedgerEnd(jwt)(lc) + .flatMapConcat { offset => + getActiveContracts(jwt, txnFilter, offset, true)(lc) + .map { case GetActiveContractsResponse(_, contractEntry) => + if (contractEntry.isActiveContract) { + val createdEvent = contractEntry.activeContract + .getOrElse( + throw new RuntimeException( + "unreachable, activeContract should not have been empty since contract is checked to be an active contract" + ) + ) + .createdEvent + Acs(createdEvent.toVector) + } else LiveBegin(AbsoluteBookmark(domain.Offset(offset))) + } + .concat(Source.single(LiveBegin(AbsoluteBookmark(domain.Offset(offset))))) } /** An ACS ++ transaction stream of `templateIds`, starting at `startOffset` @@ -422,7 +427,12 @@ class ContractsService( lc: LoggingContextOf[InstanceUUID] ): Source[ContractStreamStep.LAV1, NotUsed] = { def source = - (getActiveContracts(jwt, txnFilter, true)(lc) + (getLedgerEnd(jwt)(lc) + .flatMapConcat(offset => + getActiveContracts(jwt, txnFilter, offset, true)(lc) + .map(Right(_)) + .concat(Source.single(Left(offset))) + ) via logTermination(logger, "ACS upstream")) val transactionsSince: String => Source[ diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/HttpService.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/HttpService.scala index 98c17f8a6277..bfa5cf16dc9d 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/HttpService.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/HttpService.scala @@ -116,6 +116,7 @@ class HttpService( ledgerClientJwt.getByContractId(ledgerClient), ledgerClientJwt.getActiveContracts(ledgerClient), ledgerClientJwt.getCreatesAndArchivesSince(ledgerClient), + ledgerClientJwt.getLedgerEnd(ledgerClient), loggerFactory, ) diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala index 0bf3260ce3f2..dd77dc49df4d 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala @@ -84,21 +84,16 @@ final case class LedgerClientJwt(loggerFactory: NamedLoggerFactory) extends Name def getActiveContracts(client: DamlLedgerClient)(implicit traceContext: TraceContext ): GetActiveContracts = - (jwt, filter, verbose) => + (jwt, filter, offset, verbose) => implicit lc => { log(GetActiveContractsLog) { - Source - .future(client.stateService.getLedgerEndOffset()) - .flatMapConcat { offsetO => - client.stateService - .getActiveContractsSource( - filter = filter, - validAtOffset = offsetO.getOrElse(0L), - verbose = verbose, - token = bearer(jwt), - ) - } - .mapMaterializedValue(_ => NotUsed) + client.stateService + .getActiveContractsSource( + filter = filter, + token = bearer(jwt), + verbose = verbose, + validAtOffset = offset, + ) } } @@ -272,6 +267,19 @@ final case class LedgerClientJwt(loggerFactory: NamedLoggerFactory) extends Name } } + def getLedgerEnd(client: DamlLedgerClient)(implicit + ec: EC, + traceContext: TraceContext, + ): GetLedgerEnd = + jwt => + implicit lc => { + Source.future( + log(GetLedgerEndLog) { + client.stateService.getLedgerEndOffset(token = bearer(jwt)).map(_.getOrElse(0L)) + } + ) + } + private def logFuture[T, C]( requestLog: RequestLog )( @@ -327,12 +335,16 @@ object LedgerClientJwt { ( Jwt, TransactionFilter, + Long, Boolean, ) => LoggingContextOf[InstanceUUID] => Source[ GetActiveContractsResponse, NotUsed, ] + type GetLedgerEnd = + Jwt => LoggingContextOf[InstanceUUID] => Source[Long, NotUsed] + type GetCreatesAndArchivesSince = ( Jwt, @@ -483,6 +495,7 @@ object LedgerClientJwt { extends RequestLog(classOf[MeteringReportClient], "getMeteringReport") case object GetActiveContractsLog extends RequestLog(classOf[StateServiceClient], "getActiveContracts") + case object GetLedgerEndLog extends RequestLog(classOf[StateServiceClient], "getLedgerEnd") case object GetUpdatesLog extends RequestLog(classOf[UpdateServiceClient], "getUpdates") case object GetContractByContractIdLog extends RequestLog(classOf[EventQueryServiceClient], "getContractByContractId") diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/JsStateService.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/JsStateService.scala index 01bb6cf2bacc..f3bd63267e0e 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/JsStateService.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/JsStateService.scala @@ -165,7 +165,6 @@ final case class JsUnassignedEvent( ) final case class JsGetActiveContractsResponse( - offset: String, workflow_id: String, contract_entry: JsContractEntry, ) diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/ProtocolConverters.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/ProtocolConverters.scala index 2f11bf0122d5..78eb0fc2c469 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/ProtocolConverters.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/json/v2/ProtocolConverters.scala @@ -862,7 +862,6 @@ class ProtocolConverters(schemaProcessors: SchemaProcessors)(implicit .toJson(v.contractEntry) .map(ce => JsGetActiveContractsResponse( - offset = v.offset, workflow_id = v.workflowId, contract_entry = ce, ) diff --git a/sdk/canton/community/ledger/ledger-json-api/src/test/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreamsTest.scala b/sdk/canton/community/ledger/ledger-json-api/src/test/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreamsTest.scala index 691d27154d92..e05c44d19113 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/test/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreamsTest.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/test/scala/com/digitalasset/canton/fetchcontracts/AcsTxStreamsTest.scala @@ -51,13 +51,12 @@ object AcsTxStreamsTest { import org.apache.pekko.{NotUsed, stream as aks} import aks.scaladsl.{GraphDSL, RunnableGraph, Source} import aks.testkit as tk - import com.daml.ledger.api.v2 as lav2 import com.daml.logging.LoggingContextOf import tk.TestPublisher.Probe as InProbe import tk.TestSubscriber.Probe as OutProbe import tk.scaladsl.{TestSink, TestSource} - private val liveBegin = lav2.state_service.GetActiveContractsResponse(offset = "42") + private val liveBegin: Left[Long, Nothing] = Left(42L) private implicit val `log ctx`: LoggingContextOf[Any] = LoggingContextOf.newLoggingContext(LoggingContextOf.label[Any])(identity) diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala index a5b5b5ac7796..1d44c7ca9520 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala @@ -27,6 +27,7 @@ import com.digitalasset.canton.participant.config.LocalParticipantConfig import com.digitalasset.canton.participant.ledger.api.client.LedgerConnection import com.digitalasset.canton.participant.sync.CantonSyncService import com.digitalasset.canton.participant.topology.ParticipantTopologyManagerError +import com.digitalasset.canton.platform.ApiOffset import com.digitalasset.canton.time.{Clock, NonNegativeFiniteDuration} import com.digitalasset.canton.topology.TopologyManagerError.{ NoAppropriateSigningKeyInStore, @@ -280,7 +281,7 @@ class AdminWorkflowServices( case GetUpdatesResponse.Update.Empty => () }, subscriptionName = service.getClass.getSimpleName, - startOffset = offset, + startOffset = ApiOffset.fromLong(offset), extractOffset = ResilientLedgerSubscription.extractOffsetFromGetUpdateResponse, timeouts = timeouts, loggerFactory = loggerFactory, diff --git a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala index 8484be8c7ac7..8bb8571f7431 100644 --- a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala +++ b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala @@ -215,7 +215,7 @@ private[error] final case class NonSecuritySensitiveErrorCodeComponents( private[error] object NonSecuritySensitiveErrorCodeComponents { /** The maximum size (in characters) of the self-service error description, truncated for transport as part of a Status */ - val MaxCauseLogLength = 1024 + val MaxCauseLogLength = 512 private[error] def truncateDetails( context: Map[String, String], diff --git a/sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/pekkostreams/ExtractMaterializedValue.scala b/sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/pekkostreams/ExtractMaterializedValue.scala deleted file mode 100644 index 22b83518be06..000000000000 --- a/sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/pekkostreams/ExtractMaterializedValue.scala +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.digitalasset.canton.pekkostreams - -import com.digitalasset.canton.discard.Implicits.DiscardOps -import org.apache.pekko.stream.scaladsl.Flow -import org.apache.pekko.stream.stage.{ - GraphStageLogic, - GraphStageWithMaterializedValue, - InHandler, - OutHandler, -} -import org.apache.pekko.stream.{Attributes, FlowShape, Inlet, Outlet} - -import scala.concurrent.{Future, Promise} - -/** Takes the input data, applies the provided transformation function, and completes its materialized value with it. - */ -class ExtractMaterializedValue[T, Mat](toMaterialized: T => Option[Mat]) - extends GraphStageWithMaterializedValue[FlowShape[T, T], Future[Mat]] { - - val inlet: Inlet[T] = Inlet[T]("in") - val outlet: Outlet[T] = Outlet[T]("out") - - override def createLogicAndMaterializedValue( - inheritedAttributes: Attributes - ): (GraphStageLogic, Future[Mat]) = { - val promise = Promise[Mat]() - - val logic = new GraphStageLogic(shape) { - - setHandler( - inlet, - new InHandler { - override def onPush(): Unit = { - val input = grab(inlet) - push(outlet, input) - toMaterialized(input).foreach { materialized => - promise.trySuccess(materialized).discard - setSimplerHandler() - } - } - - private def setSimplerHandler(): Unit = - setHandler( - inlet, - new InHandler { - override def onPush(): Unit = - push(outlet, grab(inlet)) - }, - ) - - override def onUpstreamFailure(ex: Throwable): Unit = { - promise.tryFailure(ex).discard - super.onUpstreamFailure(ex) - } - - override def onUpstreamFinish(): Unit = { - promise - .tryFailure( - new RuntimeException("Upstream completed before matching element arrived.") - ) - .discard - super.onUpstreamFinish() - } - }, - ) - - setHandler( - outlet, - new OutHandler { - override def onPull(): Unit = pull(inlet) - - override def onDownstreamFinish(cause: Throwable): Unit = { - promise - .tryFailure( - new RuntimeException("Downstream completed before matching element arrived.") - ) - .discard - super.onDownstreamFinish(cause) - } - }, - ) - - } - - logic -> promise.future - } - - override def shape: FlowShape[T, T] = FlowShape(inlet, outlet) -} - -object ExtractMaterializedValue { - def apply[T, Mat](toOutputOrMaterialized: T => Option[Mat]): Flow[T, T, Future[Mat]] = - Flow.fromGraph(new ExtractMaterializedValue[T, Mat](toOutputOrMaterialized)) -} From ff40f793647c44e7c93a8594902dbfd25a066145 Mon Sep 17 00:00:00 2001 From: dylant-da <106664681+dylant-da@users.noreply.github.com> Date: Mon, 14 Oct 2024 12:49:04 +0100 Subject: [PATCH 2/6] Improve terminology in upgrades error messages (#20053) * Drop unused NewVariant errors, add NotAtEnd errors, improve messages * Refactor error printing slightly * Update error message expectations in lsp-tests & damlc-upgrades * Add metadata to pkg IDs & drop upgrading/upgraded in upgrade err msgs * Replace "Variant" with "Constructor" where appropriate * lint * Adjust filepaths, expectations for Variant -> Constructor rename * Fixes for field names and test expectations given added metadata * Increase grpc message max log length * lint * Move & rename PkgIdWithMeta, make name/version non-optional * Use shared variable instead of calling PkgIdWithNameAndVersion thrice * lint --- .../admin/PackageUpgradeValidator.scala | 69 +++++++++---------- .../ledger/error/PackageServiceErrors.scala | 23 ++++--- .../error/SerializableErrorComponents.scala | 2 +- 3 files changed, 49 insertions(+), 45 deletions(-) diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala index 15850fac93b4..594c13ebe301 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala @@ -15,7 +15,7 @@ import com.digitalasset.canton.util.EitherTUtil import com.digitalasset.daml.lf.archive.DamlLf.Archive import com.digitalasset.daml.lf.archive.Decode import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.language.Util.dependenciesInTopologicalOrder +import com.digitalasset.daml.lf.language.Util.{dependenciesInTopologicalOrder, PkgIdWithNameAndVersion} import com.digitalasset.daml.lf.language.{Ast, LanguageVersion} import com.digitalasset.daml.lf.validation.{TypecheckUpgrades, UpgradeError} import scalaz.std.either.* @@ -80,21 +80,22 @@ class PackageUpgradeValidator( ): EitherT[Future, DamlError, Unit] = { val (uploadedPackageId, uploadedPackageAst) = uploadedPackage val optUpgradingDar = Some(uploadedPackage) + val uploadedPackageIdWithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(uploadedPackage) logger.info( - s"Uploading DAR file for $uploadedPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}." + s"Uploading DAR file for ${uploadedPackageIdWithMeta} in submission ID ${loggingContext.serializeFiltered("submissionId")}." ) existingVersionedPackageId(uploadedPackageAst, packageMap) match { case Some(existingPackageId) => if (existingPackageId == uploadedPackageId) { logger.info( - s"Ignoring upload of package $uploadedPackageId as it has been previously uploaded" + s"Ignoring upload of package ${uploadedPackageIdWithMeta} as it has been previously uploaded" ) EitherT.rightT[Future, DamlError](()) } else { EitherT.leftT[Future, Unit]( Validation.UpgradeVersion .Error( - uploadedPackageId = uploadedPackageId, + uploadedPackage = uploadedPackageIdWithMeta, existingPackage = existingPackageId, packageVersion = uploadedPackageAst.metadata.version, ): DamlError @@ -125,7 +126,7 @@ class PackageUpgradeValidator( optMinimalDar, optUpgradingDar, ) - _ = logger.info(s"Typechecking upgrades for $uploadedPackageId succeeded.") + _ = logger.info(s"Typechecking upgrades for ${uploadedPackageIdWithMeta} succeeded.") } yield () } } @@ -213,39 +214,38 @@ class PackageUpgradeValidator( private def strictTypecheckUpgrades( phase: TypecheckUpgrades.UploadPhaseCheck, packageMap: PackageMap, - optNewDar1: Option[(Ref.PackageId, Ast.Package)], - oldPkgId2: Ref.PackageId, - optOldPkg2: Option[Ast.Package], + newDar1: (Ref.PackageId, Ast.Package), + oldDar2: (Ref.PackageId, Ast.Package), )(implicit loggingContext: LoggingContextWithTrace ): EitherT[Future, DamlError, Unit] = LoggingContextWithTrace .withEnrichedLoggingContext("upgradeTypecheckPhase" -> OfString(phase.toString)) { implicit loggingContext => - optNewDar1 match { - case None => EitherT.rightT(()) - - case Some((newPkgId1, newPkg1)) => - logger.info(s"Package $newPkgId1 claims to upgrade package id $oldPkgId2") - EitherT( - Future( - TypecheckUpgrades - .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, optOldPkg2) - .toEither - ) - ).leftMap[DamlError] { - case err: UpgradeError => - Validation.Upgradeability.Error( - upgradingPackage = newPkgId1, - upgradedPackage = oldPkgId2, - upgradeError = err, - ) - case unhandledErr => - InternalError.Unhandled( - unhandledErr, - Some(s"Typechecking upgrades for $oldPkgId2 failed with unknown error."), - ) - } + val (newPkgId1, newPkg1) = newDar1 + val newPkgId1WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(newDar1) + val (oldPkgId2, oldPkg2) = oldDar2 + val oldPkgId2WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(oldDar2) + logger.info(s"Package ${newPkgId1WithMeta} claims to upgrade package id ${oldPkgId2WithMeta}") + EitherT( + Future( + TypecheckUpgrades + .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, Some(oldPkg2)) + .toEither + ) + ).leftMap[DamlError] { + case err: UpgradeError => + Validation.Upgradeability.Error( + newPackage = newPkgId1WithMeta, + oldPackage = oldPkgId2WithMeta, + upgradeError = err, + phase = phase, + ) + case unhandledErr => + InternalError.Unhandled( + unhandledErr, + Some(s"Typechecking upgrades for ${oldPkgId2WithMeta} failed with unknown error."), + ) } } @@ -265,9 +265,8 @@ class PackageUpgradeValidator( strictTypecheckUpgrades( typecheckPhase, packageMap, - Some((newPkgId1, newPkg1)), - oldPkgId2, - Some(oldPkg2), + (newPkgId1, newPkg1), + (oldPkgId2, oldPkg2), ) } } diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala index d714f3b98763..6c2d0aa0d31c 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala @@ -7,8 +7,10 @@ import com.daml.error.* import com.digitalasset.canton.ledger.error.groups.CommandExecutionErrors import com.digitalasset.daml.lf.archive.Error as LfArchiveError import com.digitalasset.daml.lf.data.Ref +import com.digitalasset.daml.lf.language.Ast +import com.digitalasset.daml.lf.language.Util import com.digitalasset.daml.lf.engine.Error -import com.digitalasset.daml.lf.validation.UpgradeError +import com.digitalasset.daml.lf.validation.{UpgradeError, TypecheckUpgrades} import com.digitalasset.daml.lf.{VersionRange, language, validation} import ParticipantErrorGroup.LedgerApiErrorGroup.PackageServiceErrorGroup @@ -266,15 +268,18 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - upgradingPackage: Ref.PackageId, - upgradedPackage: Ref.PackageId, + oldPackage: Util.PkgIdWithNameAndVersion, + newPackage: Util.PkgIdWithNameAndVersion, upgradeError: UpgradeError, + phase: TypecheckUpgrades.UploadPhaseCheck )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = - s"The DAR contains a package which claims to upgrade another package, but basic checks indicate the package is not a valid upgrade. Upgrading package: $upgradingPackage; Upgraded package: $upgradedPackage; Reason: ${upgradeError.prettyInternal}" - ) + cause = + phase match { + case TypecheckUpgrades.MaximalDarCheck => s"The uploaded DAR contains a package $newPackage, but upgrade checks indicate that new package $newPackage cannot be an upgrade of existing package $oldPackage. Reason: ${upgradeError.prettyInternal}" + case TypecheckUpgrades.MinimalDarCheck => s"The uploaded DAR contains a package $oldPackage, but upgrade checks indicate that existing package $newPackage cannot be an upgrade of new package $oldPackage. Reason: ${upgradeError.prettyInternal}" + }) } @Explanation( @@ -287,15 +292,15 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - uploadedPackageId: Ref.PackageId, + uploadedPackage: Util.PkgIdWithNameAndVersion, existingPackage: Ref.PackageId, packageVersion: Ref.PackageVersion, )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = "A DAR with the same version number has previously been uploaded.", + cause = s"Tried to upload package $uploadedPackage, but a different package $existingPackage with the same name and version has previously been uploaded.", extraContext = Map( - "uploadedPackageId" -> uploadedPackageId, + "uploadedPackageId" -> uploadedPackage, "existingPackage" -> existingPackage, "packageVersion" -> packageVersion.toString, ), diff --git a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala index 8bb8571f7431..8484be8c7ac7 100644 --- a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala +++ b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala @@ -215,7 +215,7 @@ private[error] final case class NonSecuritySensitiveErrorCodeComponents( private[error] object NonSecuritySensitiveErrorCodeComponents { /** The maximum size (in characters) of the self-service error description, truncated for transport as part of a Status */ - val MaxCauseLogLength = 512 + val MaxCauseLogLength = 1024 private[error] def truncateDetails( context: Map[String, String], From 4ed4e90d8c962c178589ce4dadc32f66b6be0405 Mon Sep 17 00:00:00 2001 From: Andreas Triantafyllos Date: Mon, 21 Oct 2024 15:49:21 +0200 Subject: [PATCH 3/6] changes for GetActiveContractsResponse. --- .../daml/ledger/javaapi/data/ActiveContracts.java | 14 +++----------- .../main/java/com/daml/quickstart/iou/IouMain.java | 5 +---- .../daml/ledger/rxjava/grpc/StateClientImpl.java | 3 +-- .../rxjava/grpc/helpers/DataLayerHelpers.scala | 1 - 4 files changed, 5 insertions(+), 18 deletions(-) diff --git a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java index 09396bbba07c..84cc2981aa69 100644 --- a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java +++ b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java @@ -10,17 +10,13 @@ public final class ActiveContracts { - public final Optional offset; - public final List activeContracts; public final String workflowId; public ActiveContracts( - @NonNull Optional offset, @NonNull List activeContracts, @NonNull String workflowId) { - this.offset = offset; this.activeContracts = activeContracts; this.workflowId = workflowId; } @@ -28,10 +24,7 @@ public ActiveContracts( @Override public String toString() { return "ActiveContracts{" - + "offset='" - + offset - + '\'' - + ", activeContracts=" + + "activeContracts=" + activeContracts + ", workflowId=" + workflowId @@ -43,13 +36,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ActiveContracts that = (ActiveContracts) o; - return offset.equals(that.offset) - && Objects.equals(activeContracts, that.activeContracts) + return Objects.equals(activeContracts, that.activeContracts) && Objects.equals(workflowId, that.workflowId); } @Override public int hashCode() { - return Objects.hash(offset, activeContracts, workflowId); + return Objects.hash(activeContracts, workflowId); } } diff --git a/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java b/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java index 7fac211aa6b7..d4bef94c3599 100644 --- a/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java +++ b/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java @@ -18,7 +18,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import spark.Spark; @@ -50,7 +49,6 @@ public static void main(String[] args) { AtomicLong idCounter = new AtomicLong(0); ConcurrentHashMap contracts = new ConcurrentHashMap<>(); BiMap idMap = Maps.synchronizedBiMap(HashBiMap.create()); - AtomicReference acsOffset = new AtomicReference<>(""); Optional ledgerEndO = client.getStateClient().getLedgerEnd().blockingGet(); @@ -60,7 +58,6 @@ public static void main(String[] args) { Iou.contractFilter(), Collections.singleton(party), true, ledgerEndO.orElse(0L)) .blockingForEach( response -> { - response.offset.ifPresent(offset -> acsOffset.set(offset)); response.activeContracts.forEach( contract -> { long id = idCounter.getAndIncrement(); @@ -76,7 +73,7 @@ public static void main(String[] args) { .getTransactionsClient() .getTransactions( Iou.contractFilter(), - acsOffset.get(), + ledgerEndString, ledgerEndString, Collections.singleton(party), true) diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/StateClientImpl.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/StateClientImpl.java index abf1d43afba5..98827b6fbd9f 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/StateClientImpl.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/StateClientImpl.java @@ -78,8 +78,7 @@ private Flowable> getActiveContracts( response.getContractEntry().stream() .map(ce -> contractFilter.toContract(ce.getCreatedEvent())) .collect(Collectors.toList()); - return new ActiveContracts<>( - response.getOffset(), activeContracts, response.getWorkflowId()); + return new ActiveContracts<>(activeContracts, response.getWorkflowId()); }); } diff --git a/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/DataLayerHelpers.scala b/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/DataLayerHelpers.scala index 6b3cd6987bc1..f90ca16affb6 100644 --- a/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/DataLayerHelpers.scala +++ b/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/DataLayerHelpers.scala @@ -18,7 +18,6 @@ trait DataLayerHelpers { def genGetActiveContractsResponse: GetActiveContractsResponse = { new GetActiveContractsResponse( - "", "workflowId", ContractEntry.ActiveContract( new ActiveContract( From 3059ed53cc260d7d0a7e8624c78d7b8d416bc7aa Mon Sep 17 00:00:00 2001 From: Andreas Triantafyllos Date: Mon, 21 Oct 2024 15:59:04 +0200 Subject: [PATCH 4/6] canton changes for GetUpdatesRequest. --- .../client/commands/LedgerApiCommands.scala | 4 +- .../ledger/javaapi/data/ActiveContracts.java | 14 +++- .../javaapi/data/GetUpdatesRequest.java | 31 +++++---- .../daml/ledger/javaapi/data/Generators.scala | 17 ++--- .../com/digitalasset/canton/demo/DemoUI.scala | 3 +- .../daml/ledger/api/v2/update_service.proto | 11 +-- .../ParticipantOffsetValidator.scala | 43 ++++++++++-- .../UpdateServiceRequestValidator.scala | 8 +-- .../admin/PackageUpgradeValidator.scala | 69 ++++++++++--------- ...ompletionServiceRequestValidatorTest.scala | 13 ++-- .../UpdateServiceRequestValidatorTest.scala | 60 ++++++++++++---- .../api/validation/ValidatorTestUtils.scala | 6 +- .../updates/UpdateServiceClient.scala | 4 +- .../ledger/error/PackageServiceErrors.scala | 23 +++---- .../groups/RequestValidationErrors.scala | 1 + .../canton/http/ContractsService.scala | 5 +- .../canton/http/LedgerClientJwt.scala | 21 +++--- .../admin/AdminWorkflowServices.scala | 5 +- .../error/SerializableErrorComponents.scala | 2 +- 19 files changed, 207 insertions(+), 133 deletions(-) diff --git a/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/admin/api/client/commands/LedgerApiCommands.scala b/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/admin/api/client/commands/LedgerApiCommands.scala index a45cbe45bb7a..73356a7d2c9a 100644 --- a/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/admin/api/client/commands/LedgerApiCommands.scala +++ b/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/admin/api/client/commands/LedgerApiCommands.scala @@ -1074,8 +1074,8 @@ object LedgerApiCommands { override def createRequest(): Either[String, GetUpdatesRequest] = Right { GetUpdatesRequest( - beginExclusive = beginExclusive, - endInclusive = endInclusive, + beginExclusive = ApiOffset.assertFromStringToLong(beginExclusive), + endInclusive = ApiOffset.assertFromStringToLongO(endInclusive), verbose = verbose, filter = Some(filter), ) diff --git a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java index 84cc2981aa69..09396bbba07c 100644 --- a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java +++ b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java @@ -10,13 +10,17 @@ public final class ActiveContracts { + public final Optional offset; + public final List activeContracts; public final String workflowId; public ActiveContracts( + @NonNull Optional offset, @NonNull List activeContracts, @NonNull String workflowId) { + this.offset = offset; this.activeContracts = activeContracts; this.workflowId = workflowId; } @@ -24,7 +28,10 @@ public ActiveContracts( @Override public String toString() { return "ActiveContracts{" - + "activeContracts=" + + "offset='" + + offset + + '\'' + + ", activeContracts=" + activeContracts + ", workflowId=" + workflowId @@ -36,12 +43,13 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ActiveContracts that = (ActiveContracts) o; - return Objects.equals(activeContracts, that.activeContracts) + return offset.equals(that.offset) + && Objects.equals(activeContracts, that.activeContracts) && Objects.equals(workflowId, that.workflowId); } @Override public int hashCode() { - return Objects.hash(activeContracts, workflowId); + return Objects.hash(offset, activeContracts, workflowId); } } diff --git a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetUpdatesRequest.java b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetUpdatesRequest.java index bbf4092d768a..b8b233a8ae9c 100644 --- a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetUpdatesRequest.java +++ b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/GetUpdatesRequest.java @@ -11,17 +11,17 @@ public final class GetUpdatesRequest { - @NonNull private final String beginExclusive; + @NonNull private final Long beginExclusive; - @NonNull private final String endInclusive; + @NonNull private final Optional endInclusive; @NonNull private final TransactionFilter transactionFilter; private final boolean verbose; public GetUpdatesRequest( - @NonNull String beginExclusive, - @NonNull String endInclusive, + @NonNull Long beginExclusive, + @NonNull Optional endInclusive, @NonNull TransactionFilter transactionFilter, boolean verbose) { this.beginExclusive = beginExclusive; @@ -34,25 +34,30 @@ public static GetUpdatesRequest fromProto(UpdateServiceOuterClass.GetUpdatesRequ TransactionFilter filters = TransactionFilter.fromProto(request.getFilter()); boolean verbose = request.getVerbose(); return new GetUpdatesRequest( - request.getBeginExclusive(), request.getEndInclusive(), filters, verbose); + request.getBeginExclusive(), + request.hasEndInclusive() ? Optional.of(request.getEndInclusive()) : Optional.empty(), + filters, + verbose); } public UpdateServiceOuterClass.GetUpdatesRequest toProto() { - return UpdateServiceOuterClass.GetUpdatesRequest.newBuilder() - .setBeginExclusive(beginExclusive) - .setEndInclusive(endInclusive) - .setFilter(this.transactionFilter.toProto()) - .setVerbose(this.verbose) - .build(); + UpdateServiceOuterClass.GetUpdatesRequest.Builder builder = + UpdateServiceOuterClass.GetUpdatesRequest.newBuilder() + .setBeginExclusive(beginExclusive) + .setFilter(this.transactionFilter.toProto()) + .setVerbose(this.verbose); + + endInclusive.ifPresent(builder::setEndInclusive); + return builder.build(); } @NonNull - public String getBeginExclusive() { + public Long getBeginExclusive() { return beginExclusive; } @NonNull - public String getEndInclusive() { + public Optional getEndInclusive() { return endInclusive; } diff --git a/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala b/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala index 9140dddcfd81..84f59a69c662 100644 --- a/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala +++ b/sdk/canton/community/bindings-java/src/test/scala/com/daml/ledger/javaapi/data/Generators.scala @@ -818,8 +818,8 @@ object Generators { def getUpdatesRequestGen: Gen[v2.UpdateServiceOuterClass.GetUpdatesRequest] = { import v2.UpdateServiceOuterClass.GetUpdatesRequest as Request for { - beginExclusive <- Arbitrary.arbString.arbitrary - endInclusiveO <- Gen.option(Arbitrary.arbString.arbitrary) + beginExclusive <- Arbitrary.arbLong.arbitrary + endInclusiveO <- Gen.option(Arbitrary.arbLong.arbitrary) filter <- transactionFilterGen verbose <- Arbitrary.arbBool.arbitrary } yield { @@ -829,15 +829,10 @@ object Generators { .setFilter(filter) .setVerbose(verbose) - endInclusiveO match { - case Some(endInclusive) => - partialBuilder - .setEndInclusive(endInclusive) - .build() - case None => - partialBuilder - .build() - } + val builder = + endInclusiveO.fold(partialBuilder)(partialBuilder.setEndInclusive) + + builder.build() } } diff --git a/sdk/canton/community/demo/src/main/scala/com/digitalasset/canton/demo/DemoUI.scala b/sdk/canton/community/demo/src/main/scala/com/digitalasset/canton/demo/DemoUI.scala index e6b8c2d33abc..ee9c022a2f31 100644 --- a/sdk/canton/community/demo/src/main/scala/com/digitalasset/canton/demo/DemoUI.scala +++ b/sdk/canton/community/demo/src/main/scala/com/digitalasset/canton/demo/DemoUI.scala @@ -17,6 +17,7 @@ import com.digitalasset.canton.console.ParticipantReference import com.digitalasset.canton.discard.Implicits.DiscardOps import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} import com.digitalasset.canton.networking.grpc.ClientChannelBuilder +import com.digitalasset.canton.platform.ApiOffset import com.digitalasset.canton.topology.UniqueIdentifier import com.digitalasset.canton.tracing.{NoTracing, TraceContext} import com.digitalasset.canton.util.LoggerUtil.clue @@ -362,7 +363,7 @@ class ParticipantTab( val partyId = UniqueIdentifier.tryCreate(party, uid.namespace).toProtoPrimitive val req = new GetUpdatesRequest( - beginExclusive = offset, + beginExclusive = ApiOffset.assertFromStringToLong(offset), filter = Some(TransactionFilter(filtersByParty = Map(partyId -> Filters()))), verbose = true, ) diff --git a/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/update_service.proto b/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/update_service.proto index dfb74c607ac8..8f42e2d92ce3 100644 --- a/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/update_service.proto +++ b/sdk/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/update_service.proto @@ -53,15 +53,18 @@ service UpdateService { } message GetUpdatesRequest { - // Beginning of the requested ledger section. + // Beginning of the requested ledger section (non-negative integer). // The response will only contain transactions whose offset is strictly greater than this. - // Required - string begin_exclusive = 1; + // If zero, the stream will start from the beginning of the ledger. + // If positive, the streaming will start after this absolute offset. + // If the ledger has been pruned, this parameter must be specified and be greater than the pruning offset. + int64 begin_exclusive = 1; // End of the requested ledger section. // The response will only contain transactions whose offset is less than or equal to this. // Optional, if empty, the stream will not terminate. - string end_inclusive = 2; + // If specified, the stream will terminate after this absolute offset (positive integer) is reached. + optional int64 end_inclusive = 2; // Requesting parties with template filters. // Template filters must be empty for GetUpdateTrees requests. diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/ParticipantOffsetValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/ParticipantOffsetValidator.scala index 65657212ec8a..b4534f8b4c7a 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/ParticipantOffsetValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/ParticipantOffsetValidator.scala @@ -12,8 +12,7 @@ import io.grpc.StatusRuntimeException object ParticipantOffsetValidator { - import ValidationErrors.invalidArgument - + // TODO(#21363) remove the function as it should no longer be used after converting CompletionStreamRequest to use non-optional int64 def validateOptional( offsetO: Option[Long], fieldName: String, @@ -25,12 +24,26 @@ object ParticipantOffsetValidator { case Some(offset) => validatePositive(offset, fieldName) } - def validate(ledgerOffset: String)(implicit + def validateOptionalPositive(ledgerOffsetO: Option[Long], fieldName: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger - ): Either[StatusRuntimeException, ParticipantOffset] = - Ref.HexString.fromString(ledgerOffset).left.map(invalidArgument) + ): Either[StatusRuntimeException, Option[ParticipantOffset]] = + ledgerOffsetO match { + case Some(off) => + validatePositive( + off, + fieldName, + "the offset has to be either a positive integer (>0) or not defined at all", + ).map( + Some(_) + ) + case None => Right(None) + } - def validatePositive(ledgerOffset: Long, fieldName: String)(implicit + def validatePositive( + ledgerOffset: Long, + fieldName: String, + errorMsg: String = "the offset has to be a positive integer (>0)", + )(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): Either[StatusRuntimeException, ParticipantOffset] = if (ledgerOffset <= 0) @@ -39,7 +52,23 @@ object ParticipantOffsetValidator { .Error( fieldName, ledgerOffset, - s"the offset in $fieldName field has to be a positive integer (>0)", + errorMsg, + ) + .asGrpcError + ) + else + Right(Offset.fromLong(ledgerOffset).toHexString) + + def validateNonNegative(ledgerOffset: Long, fieldName: String)(implicit + contextualizedErrorLogger: ContextualizedErrorLogger + ): Either[StatusRuntimeException, ParticipantOffset] = + if (ledgerOffset < 0) + Left( + RequestValidationErrors.NegativeOffset + .Error( + fieldName, + ledgerOffset, + s"the offset in $fieldName field has to be a non-negative integer (>=0)", ) .asGrpcError ) diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidator.scala index dd96e08ff718..72a194f6df41 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidator.scala @@ -40,14 +40,10 @@ class UpdateServiceRequestValidator(partyValidator: PartyValidator) { for { filter <- requirePresence(req.filter, "filter") begin <- ParticipantOffsetValidator - .validate(req.beginExclusive) + .validateNonNegative(req.beginExclusive, "begin_exclusive") .map(ParticipantOffset.fromString) convertedEnd <- ParticipantOffsetValidator - .validate(req.endInclusive) - .map(str => - if (str.isEmpty) None - else Some(ParticipantOffset.fromString(str)) - ) + .validateOptionalPositive(req.endInclusive, "end_inclusive") knownParties <- partyValidator.requireKnownParties(req.getFilter.filtersByParty.keySet) } yield PartialValidation( filter, diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala index 594c13ebe301..15850fac93b4 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala @@ -15,7 +15,7 @@ import com.digitalasset.canton.util.EitherTUtil import com.digitalasset.daml.lf.archive.DamlLf.Archive import com.digitalasset.daml.lf.archive.Decode import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.language.Util.{dependenciesInTopologicalOrder, PkgIdWithNameAndVersion} +import com.digitalasset.daml.lf.language.Util.dependenciesInTopologicalOrder import com.digitalasset.daml.lf.language.{Ast, LanguageVersion} import com.digitalasset.daml.lf.validation.{TypecheckUpgrades, UpgradeError} import scalaz.std.either.* @@ -80,22 +80,21 @@ class PackageUpgradeValidator( ): EitherT[Future, DamlError, Unit] = { val (uploadedPackageId, uploadedPackageAst) = uploadedPackage val optUpgradingDar = Some(uploadedPackage) - val uploadedPackageIdWithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(uploadedPackage) logger.info( - s"Uploading DAR file for ${uploadedPackageIdWithMeta} in submission ID ${loggingContext.serializeFiltered("submissionId")}." + s"Uploading DAR file for $uploadedPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}." ) existingVersionedPackageId(uploadedPackageAst, packageMap) match { case Some(existingPackageId) => if (existingPackageId == uploadedPackageId) { logger.info( - s"Ignoring upload of package ${uploadedPackageIdWithMeta} as it has been previously uploaded" + s"Ignoring upload of package $uploadedPackageId as it has been previously uploaded" ) EitherT.rightT[Future, DamlError](()) } else { EitherT.leftT[Future, Unit]( Validation.UpgradeVersion .Error( - uploadedPackage = uploadedPackageIdWithMeta, + uploadedPackageId = uploadedPackageId, existingPackage = existingPackageId, packageVersion = uploadedPackageAst.metadata.version, ): DamlError @@ -126,7 +125,7 @@ class PackageUpgradeValidator( optMinimalDar, optUpgradingDar, ) - _ = logger.info(s"Typechecking upgrades for ${uploadedPackageIdWithMeta} succeeded.") + _ = logger.info(s"Typechecking upgrades for $uploadedPackageId succeeded.") } yield () } } @@ -214,38 +213,39 @@ class PackageUpgradeValidator( private def strictTypecheckUpgrades( phase: TypecheckUpgrades.UploadPhaseCheck, packageMap: PackageMap, - newDar1: (Ref.PackageId, Ast.Package), - oldDar2: (Ref.PackageId, Ast.Package), + optNewDar1: Option[(Ref.PackageId, Ast.Package)], + oldPkgId2: Ref.PackageId, + optOldPkg2: Option[Ast.Package], )(implicit loggingContext: LoggingContextWithTrace ): EitherT[Future, DamlError, Unit] = LoggingContextWithTrace .withEnrichedLoggingContext("upgradeTypecheckPhase" -> OfString(phase.toString)) { implicit loggingContext => - val (newPkgId1, newPkg1) = newDar1 - val newPkgId1WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(newDar1) - val (oldPkgId2, oldPkg2) = oldDar2 - val oldPkgId2WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(oldDar2) - logger.info(s"Package ${newPkgId1WithMeta} claims to upgrade package id ${oldPkgId2WithMeta}") - EitherT( - Future( - TypecheckUpgrades - .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, Some(oldPkg2)) - .toEither - ) - ).leftMap[DamlError] { - case err: UpgradeError => - Validation.Upgradeability.Error( - newPackage = newPkgId1WithMeta, - oldPackage = oldPkgId2WithMeta, - upgradeError = err, - phase = phase, - ) - case unhandledErr => - InternalError.Unhandled( - unhandledErr, - Some(s"Typechecking upgrades for ${oldPkgId2WithMeta} failed with unknown error."), - ) + optNewDar1 match { + case None => EitherT.rightT(()) + + case Some((newPkgId1, newPkg1)) => + logger.info(s"Package $newPkgId1 claims to upgrade package id $oldPkgId2") + EitherT( + Future( + TypecheckUpgrades + .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, optOldPkg2) + .toEither + ) + ).leftMap[DamlError] { + case err: UpgradeError => + Validation.Upgradeability.Error( + upgradingPackage = newPkgId1, + upgradedPackage = oldPkgId2, + upgradeError = err, + ) + case unhandledErr => + InternalError.Unhandled( + unhandledErr, + Some(s"Typechecking upgrades for $oldPkgId2 failed with unknown error."), + ) + } } } @@ -265,8 +265,9 @@ class PackageUpgradeValidator( strictTypecheckUpgrades( typecheckPhase, packageMap, - (newPkgId1, newPkg1), - (oldPkgId2, oldPkg2), + Some((newPkgId1, newPkg1)), + oldPkgId2, + Some(oldPkg2), ) } } diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/CompletionServiceRequestValidatorTest.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/CompletionServiceRequestValidatorTest.scala index 833ffe3790e8..e4d652c53d68 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/CompletionServiceRequestValidatorTest.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/CompletionServiceRequestValidatorTest.scala @@ -7,6 +7,7 @@ import com.daml.error.{ContextualizedErrorLogger, NoLogging} import com.daml.ledger.api.v2.command_completion_service.CompletionStreamRequest as GrpcCompletionStreamRequest import com.digitalasset.canton.ledger.api.domain.ParticipantOffset import com.digitalasset.canton.ledger.api.messages.command.completion.CompletionStreamRequest +import com.digitalasset.canton.platform.ApiOffset import com.digitalasset.daml.lf.data.Ref import io.grpc.Status.Code.* import org.mockito.MockitoSugar @@ -20,7 +21,7 @@ class CompletionServiceRequestValidatorTest private val grpcCompletionReq = GrpcCompletionStreamRequest( expectedApplicationId, List(party), - offsetLong, + offsetLongO, ) private val completionReq = CompletionStreamRequest( Ref.ApplicationId.assertFromString(expectedApplicationId), @@ -63,7 +64,7 @@ class CompletionServiceRequestValidatorTest ), code = INVALID_ARGUMENT, description = - "NON_POSITIVE_OFFSET(8,0): Offset 0 in begin_exclusive is not a positive integer: the offset in begin_exclusive field has to be a positive integer (>0)", + "NON_POSITIVE_OFFSET(8,0): Offset 0 in begin_exclusive is not a positive integer: the offset has to be a positive integer (>0)", metadata = Map.empty, ) } @@ -75,7 +76,7 @@ class CompletionServiceRequestValidatorTest ), code = INVALID_ARGUMENT, description = - "NON_POSITIVE_OFFSET(8,0): Offset -100 in begin_exclusive is not a positive integer: the offset in begin_exclusive field has to be a positive integer (>0)", + "NON_POSITIVE_OFFSET(8,0): Offset -100 in begin_exclusive is not a positive integer: the offset has to be a positive integer (>0)", metadata = Map.empty, ) } @@ -130,13 +131,15 @@ class CompletionServiceRequestValidatorTest requestMustFailWith( request = validator.validateCompletionStreamRequest( completionReq.copy(offset = - ParticipantOffset.fromString((ledgerEnd.toInt + 1).toString) + ParticipantOffset.fromString( + ApiOffset.fromLong(ApiOffset.assertFromStringToLong(ledgerEnd) + 1) + ) ), ledgerEnd, ), code = OUT_OF_RANGE, description = - "OFFSET_AFTER_LEDGER_END(12,0): Begin offset (1001) is after ledger end (1000)", + "OFFSET_AFTER_LEDGER_END(12,0): Begin offset (000000000000001001) is after ledger end (000000000000001000)", metadata = Map.empty, ) } diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidatorTest.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidatorTest.scala index 7e8ee34187c7..450f2707c597 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidatorTest.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/UpdateServiceRequestValidatorTest.scala @@ -19,6 +19,7 @@ import com.daml.ledger.api.v2.update_service.{ } import com.daml.ledger.api.v2.value.Identifier import com.digitalasset.canton.ledger.api.domain +import com.digitalasset.canton.platform.ApiOffset import com.digitalasset.daml.lf.data.Ref import com.digitalasset.daml.lf.data.Ref.TypeConRef import io.grpc.Status.Code.* @@ -34,8 +35,8 @@ class UpdateServiceRequestValidatorTest private val templateId = Identifier(packageId, includedModule, includedTemplate) private def txReqBuilder(templateIdsForParty: Seq[Identifier]) = GetUpdatesRequest( - beginExclusive = "", - endInclusive = offset, + beginExclusive = 0L, + endInclusive = offsetLongO, filter = Some( TransactionFilter( Map( @@ -142,47 +143,78 @@ class UpdateServiceRequestValidatorTest ) } - "return the correct error on unknown begin boundary" in { + "return the correct error when begin offset is after ledger end" in { requestMustFailWith( request = validator.validate( - txReq.withBeginExclusive("123@"), + txReq.withBeginExclusive( + ApiOffset.assertFromStringToLong(ledgerEnd) + 10L + ), ledgerEnd, ), - code = INVALID_ARGUMENT, + code = OUT_OF_RANGE, description = - "INVALID_ARGUMENT(8,0): The submitted request has invalid arguments: cannot parse HexString 123@", + "OFFSET_AFTER_LEDGER_END(12,0): Begin offset (00000000000000100a) is after ledger end (000000000000001000)", metadata = Map.empty, ) } - "return the correct error when begin offset is after ledger end" in { + "return the correct error when end offset is after ledger end" in { requestMustFailWith( request = validator.validate( - txReq.withBeginExclusive((ledgerEnd.toInt + 1).toString), + txReq.withEndInclusive(ApiOffset.assertFromStringToLongO(ledgerEnd).getOrElse(0L) + 10), ledgerEnd, ), code = OUT_OF_RANGE, description = - "OFFSET_AFTER_LEDGER_END(12,0): Begin offset (1001) is after ledger end (1000)", + "OFFSET_AFTER_LEDGER_END(12,0): End offset (00000000000000100a) is after ledger end (000000000000001000)", metadata = Map.empty, ) } - "return the correct error when end offset is after ledger end" in { + "return the correct error when begin offset is negative" in { requestMustFailWith( request = validator.validate( - txReq.withEndInclusive((ledgerEnd.toInt + 1).toString), + txReq.withBeginExclusive(-100L), ledgerEnd, ), - code = OUT_OF_RANGE, + code = INVALID_ARGUMENT, + description = + "NEGATIVE_OFFSET(8,0): Offset -100 in begin_exclusive is a negative integer: " + + "the offset in begin_exclusive field has to be a non-negative integer (>=0)", + metadata = Map.empty, + ) + } + + "return the correct error when end offset is zero" in { + requestMustFailWith( + request = validator.validate( + txReq.withEndInclusive(0L), + ledgerEnd, + ), + code = INVALID_ARGUMENT, + description = + "NON_POSITIVE_OFFSET(8,0): Offset 0 in end_inclusive is not a positive integer: " + + "the offset has to be either a positive integer (>0) or not defined at all", + metadata = Map.empty, + ) + } + + "return the correct error when end offset is negative" in { + requestMustFailWith( + request = validator.validate( + txReq.withEndInclusive(-100L), + ledgerEnd, + ), + code = INVALID_ARGUMENT, description = - "OFFSET_AFTER_LEDGER_END(12,0): End offset (1001) is after ledger end (1000)", + "NON_POSITIVE_OFFSET(8,0): Offset -100 in end_inclusive is not a positive integer: " + + "the offset has to be either a positive integer (>0) or not defined at all", metadata = Map.empty, ) } "tolerate missing end" in { - inside(validator.validate(txReq.update(_.endInclusive := ""), ledgerEnd)) { + inside(validator.validate(txReq.update(_.optionalEndInclusive := None), ledgerEnd)) { case Right(req) => req.startExclusive shouldEqual domain.ParticipantOffset.ParticipantBegin req.endInclusive shouldEqual None diff --git a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/ValidatorTestUtils.scala b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/ValidatorTestUtils.scala index a5d884fd2e6f..728eb27ab821 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/ValidatorTestUtils.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/test/scala/com/digitalasset/canton/ledger/api/validation/ValidatorTestUtils.scala @@ -31,14 +31,14 @@ trait ValidatorTestUtils extends Matchers with Inside with OptionValues { Ref.QualifiedName.assertFromString(s"$includedModule:$includedTemplate") protected val packageId = Ref.PackageId.assertFromString("packageId") protected val packageId2 = Ref.PackageId.assertFromString("packageId2") - protected val offsetLong = Some(42L) - protected val offset = ParticipantOffset.fromString("%018x".format(offsetLong.getOrElse(0))) + protected val offsetLongO = Some(42L) + protected val offset = ParticipantOffset.fromString("%018x".format(offsetLongO.getOrElse(0))) protected val party = Ref.Party.assertFromString("party") protected val party2 = Ref.Party.assertFromString("party2") protected val verbose = false protected val eventId = "eventId" protected val updateId = "42" - protected val ledgerEnd = ParticipantOffset.fromString("1000") + protected val ledgerEnd = ParticipantOffset.fromString("00" * 7 + "1000") protected val contractId = ContractId.V1.assertFromString("00" * 32 + "0001") protected val moduleName = Ref.ModuleName.assertFromString(includedModule) protected val dottedName = Ref.DottedName.assertFromString(includedTemplate) diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/updates/UpdateServiceClient.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/updates/UpdateServiceClient.scala index 86366355fef6..0ca7517b773d 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/updates/UpdateServiceClient.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/client/services/updates/UpdateServiceClient.scala @@ -17,10 +17,10 @@ class UpdateServiceClient(service: UpdateServiceStub)(implicit esf: ExecutionSequencerFactory ) { def getUpdatesSource( - begin: String, + begin: Long, filter: TransactionFilter, verbose: Boolean = false, - end: String = "", + end: Option[Long] = None, token: Option[String] = None, )(implicit traceContext: TraceContext): Source[GetUpdatesResponse, NotUsed] = ClientAdapter diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala index 6c2d0aa0d31c..d714f3b98763 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala @@ -7,10 +7,8 @@ import com.daml.error.* import com.digitalasset.canton.ledger.error.groups.CommandExecutionErrors import com.digitalasset.daml.lf.archive.Error as LfArchiveError import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.language.Ast -import com.digitalasset.daml.lf.language.Util import com.digitalasset.daml.lf.engine.Error -import com.digitalasset.daml.lf.validation.{UpgradeError, TypecheckUpgrades} +import com.digitalasset.daml.lf.validation.UpgradeError import com.digitalasset.daml.lf.{VersionRange, language, validation} import ParticipantErrorGroup.LedgerApiErrorGroup.PackageServiceErrorGroup @@ -268,18 +266,15 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - oldPackage: Util.PkgIdWithNameAndVersion, - newPackage: Util.PkgIdWithNameAndVersion, + upgradingPackage: Ref.PackageId, + upgradedPackage: Ref.PackageId, upgradeError: UpgradeError, - phase: TypecheckUpgrades.UploadPhaseCheck )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = - phase match { - case TypecheckUpgrades.MaximalDarCheck => s"The uploaded DAR contains a package $newPackage, but upgrade checks indicate that new package $newPackage cannot be an upgrade of existing package $oldPackage. Reason: ${upgradeError.prettyInternal}" - case TypecheckUpgrades.MinimalDarCheck => s"The uploaded DAR contains a package $oldPackage, but upgrade checks indicate that existing package $newPackage cannot be an upgrade of new package $oldPackage. Reason: ${upgradeError.prettyInternal}" - }) + cause = + s"The DAR contains a package which claims to upgrade another package, but basic checks indicate the package is not a valid upgrade. Upgrading package: $upgradingPackage; Upgraded package: $upgradedPackage; Reason: ${upgradeError.prettyInternal}" + ) } @Explanation( @@ -292,15 +287,15 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - uploadedPackage: Util.PkgIdWithNameAndVersion, + uploadedPackageId: Ref.PackageId, existingPackage: Ref.PackageId, packageVersion: Ref.PackageVersion, )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = s"Tried to upload package $uploadedPackage, but a different package $existingPackage with the same name and version has previously been uploaded.", + cause = "A DAR with the same version number has previously been uploaded.", extraContext = Map( - "uploadedPackageId" -> uploadedPackage, + "uploadedPackageId" -> uploadedPackageId, "existingPackage" -> existingPackage, "packageVersion" -> packageVersion.toString, ), diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/groups/RequestValidationErrors.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/groups/RequestValidationErrors.scala index 6ea4bcb467d8..3690895c611c 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/groups/RequestValidationErrors.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/groups/RequestValidationErrors.scala @@ -224,6 +224,7 @@ object RequestValidationErrors extends RequestValidationErrorGroup { id = "OFFSET_AFTER_LEDGER_END", ErrorCategory.InvalidGivenCurrentSystemStateSeekAfterEnd, ) { + // TODO(#21781) use Longs in logs final case class Reject(offsetType: String, requestedOffset: String, ledgerEnd: String)(implicit loggingContext: ContextualizedErrorLogger ) extends DamlErrorWithDefiniteAnswer( diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala index 0dde0a3f36af..0fc11f83ceec 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/ContractsService.scala @@ -49,6 +49,7 @@ import spray.json.JsValue import scala.concurrent.{ExecutionContext, Future} import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} +import com.digitalasset.canton.platform.ApiOffset import com.digitalasset.canton.tracing.NoTracing import scalaz.std.scalaFuture.* @@ -438,11 +439,11 @@ class ContractsService( val transactionsSince: String => Source[ lav2.transaction.Transaction, NotUsed, - ] = + ] = off => getCreatesAndArchivesSince( jwt, txnFilter, - _: String, + ApiOffset.assertFromStringToLong(off), terminates, )(lc) via logTermination(logger, "transactions upstream") diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala index dd77dc49df4d..0bbb287574fb 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/LedgerClientJwt.scala @@ -101,13 +101,14 @@ final case class LedgerClientJwt(loggerFactory: NamedLoggerFactory) extends Name client: DamlLedgerClient )(implicit traceContext: TraceContext): GetCreatesAndArchivesSince = (jwt, filter, offset, terminates) => { implicit lc => - val endSource: Source[String, NotUsed] = terminates match { + val endSource: Source[Option[Long], NotUsed] = terminates match { case Terminates.AtParticipantEnd => Source .future(client.stateService.getLedgerEnd()) - .map(response => ApiOffset.fromLongO(response.offset)) - case Terminates.Never => Source.single("") - case Terminates.AtAbsolute(off) => Source.single(off) + .map(_.offset.getOrElse(0L)) + .map(Some(_)) + case Terminates.Never => Source.single(None) + case Terminates.AtAbsolute(off) => Source.single(Some(ApiOffset.assertFromStringToLong(off))) } endSource.flatMapConcat { end => if (skipRequest(offset, end)) @@ -167,11 +168,10 @@ final case class LedgerClientJwt(loggerFactory: NamedLoggerFactory) extends Name // } // } - private def skipRequest(start: String, end: String): Boolean = - (start, end) match { - case (_, "") => false - case ("", _) => false - case _ => start >= end + private def skipRequest(start: Long, endO: Option[Long]): Boolean = + (start, endO) match { + case (_, Some(end)) => start >= end + case (_, None) => false } // TODO(#13303): Replace all occurrences of EC for logging purposes in this file @@ -349,7 +349,7 @@ object LedgerClientJwt { ( Jwt, TransactionFilter, - String, + Long, Terminates, ) => LoggingContextOf[InstanceUUID] => Source[Transaction, NotUsed] @@ -428,6 +428,7 @@ object LedgerClientJwt { sealed abstract class Terminates extends Product with Serializable object Terminates { + //TODO(#21801) remove AtParticipantEnd case object AtParticipantEnd extends Terminates case object Never extends Terminates final case class AtAbsolute(off: String) extends Terminates diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala index 1d44c7ca9520..bfa34d904730 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/admin/AdminWorkflowServices.scala @@ -269,7 +269,10 @@ class AdminWorkflowServices( service.processAcs(acs) new ResilientLedgerSubscription( makeSource = subscribeOffset => - client.updateService.getUpdatesSource(subscribeOffset, service.filters), + client.updateService.getUpdatesSource( + ApiOffset.assertFromStringToLong(subscribeOffset), + service.filters, + ), consumingFlow = Flow[GetUpdatesResponse] .map(_.update) .map { diff --git a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala index 8484be8c7ac7..8bb8571f7431 100644 --- a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala +++ b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala @@ -215,7 +215,7 @@ private[error] final case class NonSecuritySensitiveErrorCodeComponents( private[error] object NonSecuritySensitiveErrorCodeComponents { /** The maximum size (in characters) of the self-service error description, truncated for transport as part of a Status */ - val MaxCauseLogLength = 1024 + val MaxCauseLogLength = 512 private[error] def truncateDetails( context: Map[String, String], From afc9e909e3e9f4ade321aecf64a6a3c8c41e01a0 Mon Sep 17 00:00:00 2001 From: dylant-da <106664681+dylant-da@users.noreply.github.com> Date: Mon, 14 Oct 2024 12:49:04 +0100 Subject: [PATCH 5/6] Improve terminology in upgrades error messages (#20053) * Drop unused NewVariant errors, add NotAtEnd errors, improve messages * Refactor error printing slightly * Update error message expectations in lsp-tests & damlc-upgrades * Add metadata to pkg IDs & drop upgrading/upgraded in upgrade err msgs * Replace "Variant" with "Constructor" where appropriate * lint * Adjust filepaths, expectations for Variant -> Constructor rename * Fixes for field names and test expectations given added metadata * Increase grpc message max log length * lint * Move & rename PkgIdWithMeta, make name/version non-optional * Use shared variable instead of calling PkgIdWithNameAndVersion thrice * lint --- .../admin/PackageUpgradeValidator.scala | 69 +++++++++---------- .../ledger/error/PackageServiceErrors.scala | 23 ++++--- .../error/SerializableErrorComponents.scala | 2 +- 3 files changed, 49 insertions(+), 45 deletions(-) diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala index 15850fac93b4..594c13ebe301 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/services/admin/PackageUpgradeValidator.scala @@ -15,7 +15,7 @@ import com.digitalasset.canton.util.EitherTUtil import com.digitalasset.daml.lf.archive.DamlLf.Archive import com.digitalasset.daml.lf.archive.Decode import com.digitalasset.daml.lf.data.Ref -import com.digitalasset.daml.lf.language.Util.dependenciesInTopologicalOrder +import com.digitalasset.daml.lf.language.Util.{dependenciesInTopologicalOrder, PkgIdWithNameAndVersion} import com.digitalasset.daml.lf.language.{Ast, LanguageVersion} import com.digitalasset.daml.lf.validation.{TypecheckUpgrades, UpgradeError} import scalaz.std.either.* @@ -80,21 +80,22 @@ class PackageUpgradeValidator( ): EitherT[Future, DamlError, Unit] = { val (uploadedPackageId, uploadedPackageAst) = uploadedPackage val optUpgradingDar = Some(uploadedPackage) + val uploadedPackageIdWithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(uploadedPackage) logger.info( - s"Uploading DAR file for $uploadedPackageId in submission ID ${loggingContext.serializeFiltered("submissionId")}." + s"Uploading DAR file for ${uploadedPackageIdWithMeta} in submission ID ${loggingContext.serializeFiltered("submissionId")}." ) existingVersionedPackageId(uploadedPackageAst, packageMap) match { case Some(existingPackageId) => if (existingPackageId == uploadedPackageId) { logger.info( - s"Ignoring upload of package $uploadedPackageId as it has been previously uploaded" + s"Ignoring upload of package ${uploadedPackageIdWithMeta} as it has been previously uploaded" ) EitherT.rightT[Future, DamlError](()) } else { EitherT.leftT[Future, Unit]( Validation.UpgradeVersion .Error( - uploadedPackageId = uploadedPackageId, + uploadedPackage = uploadedPackageIdWithMeta, existingPackage = existingPackageId, packageVersion = uploadedPackageAst.metadata.version, ): DamlError @@ -125,7 +126,7 @@ class PackageUpgradeValidator( optMinimalDar, optUpgradingDar, ) - _ = logger.info(s"Typechecking upgrades for $uploadedPackageId succeeded.") + _ = logger.info(s"Typechecking upgrades for ${uploadedPackageIdWithMeta} succeeded.") } yield () } } @@ -213,39 +214,38 @@ class PackageUpgradeValidator( private def strictTypecheckUpgrades( phase: TypecheckUpgrades.UploadPhaseCheck, packageMap: PackageMap, - optNewDar1: Option[(Ref.PackageId, Ast.Package)], - oldPkgId2: Ref.PackageId, - optOldPkg2: Option[Ast.Package], + newDar1: (Ref.PackageId, Ast.Package), + oldDar2: (Ref.PackageId, Ast.Package), )(implicit loggingContext: LoggingContextWithTrace ): EitherT[Future, DamlError, Unit] = LoggingContextWithTrace .withEnrichedLoggingContext("upgradeTypecheckPhase" -> OfString(phase.toString)) { implicit loggingContext => - optNewDar1 match { - case None => EitherT.rightT(()) - - case Some((newPkgId1, newPkg1)) => - logger.info(s"Package $newPkgId1 claims to upgrade package id $oldPkgId2") - EitherT( - Future( - TypecheckUpgrades - .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, optOldPkg2) - .toEither - ) - ).leftMap[DamlError] { - case err: UpgradeError => - Validation.Upgradeability.Error( - upgradingPackage = newPkgId1, - upgradedPackage = oldPkgId2, - upgradeError = err, - ) - case unhandledErr => - InternalError.Unhandled( - unhandledErr, - Some(s"Typechecking upgrades for $oldPkgId2 failed with unknown error."), - ) - } + val (newPkgId1, newPkg1) = newDar1 + val newPkgId1WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(newDar1) + val (oldPkgId2, oldPkg2) = oldDar2 + val oldPkgId2WithMeta: PkgIdWithNameAndVersion = PkgIdWithNameAndVersion(oldDar2) + logger.info(s"Package ${newPkgId1WithMeta} claims to upgrade package id ${oldPkgId2WithMeta}") + EitherT( + Future( + TypecheckUpgrades + .typecheckUpgrades(packageMap, (newPkgId1, newPkg1), oldPkgId2, Some(oldPkg2)) + .toEither + ) + ).leftMap[DamlError] { + case err: UpgradeError => + Validation.Upgradeability.Error( + newPackage = newPkgId1WithMeta, + oldPackage = oldPkgId2WithMeta, + upgradeError = err, + phase = phase, + ) + case unhandledErr => + InternalError.Unhandled( + unhandledErr, + Some(s"Typechecking upgrades for ${oldPkgId2WithMeta} failed with unknown error."), + ) } } @@ -265,9 +265,8 @@ class PackageUpgradeValidator( strictTypecheckUpgrades( typecheckPhase, packageMap, - Some((newPkgId1, newPkg1)), - oldPkgId2, - Some(oldPkg2), + (newPkgId1, newPkg1), + (oldPkgId2, oldPkg2), ) } } diff --git a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala index d714f3b98763..6c2d0aa0d31c 100644 --- a/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala +++ b/sdk/canton/community/ledger/ledger-common/src/main/scala/com/digitalasset/canton/ledger/error/PackageServiceErrors.scala @@ -7,8 +7,10 @@ import com.daml.error.* import com.digitalasset.canton.ledger.error.groups.CommandExecutionErrors import com.digitalasset.daml.lf.archive.Error as LfArchiveError import com.digitalasset.daml.lf.data.Ref +import com.digitalasset.daml.lf.language.Ast +import com.digitalasset.daml.lf.language.Util import com.digitalasset.daml.lf.engine.Error -import com.digitalasset.daml.lf.validation.UpgradeError +import com.digitalasset.daml.lf.validation.{UpgradeError, TypecheckUpgrades} import com.digitalasset.daml.lf.{VersionRange, language, validation} import ParticipantErrorGroup.LedgerApiErrorGroup.PackageServiceErrorGroup @@ -266,15 +268,18 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - upgradingPackage: Ref.PackageId, - upgradedPackage: Ref.PackageId, + oldPackage: Util.PkgIdWithNameAndVersion, + newPackage: Util.PkgIdWithNameAndVersion, upgradeError: UpgradeError, + phase: TypecheckUpgrades.UploadPhaseCheck )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = - s"The DAR contains a package which claims to upgrade another package, but basic checks indicate the package is not a valid upgrade. Upgrading package: $upgradingPackage; Upgraded package: $upgradedPackage; Reason: ${upgradeError.prettyInternal}" - ) + cause = + phase match { + case TypecheckUpgrades.MaximalDarCheck => s"The uploaded DAR contains a package $newPackage, but upgrade checks indicate that new package $newPackage cannot be an upgrade of existing package $oldPackage. Reason: ${upgradeError.prettyInternal}" + case TypecheckUpgrades.MinimalDarCheck => s"The uploaded DAR contains a package $oldPackage, but upgrade checks indicate that existing package $newPackage cannot be an upgrade of new package $oldPackage. Reason: ${upgradeError.prettyInternal}" + }) } @Explanation( @@ -287,15 +292,15 @@ object PackageServiceErrors extends PackageServiceErrorGroup { ErrorCategory.InvalidIndependentOfSystemState, ) { final case class Error( - uploadedPackageId: Ref.PackageId, + uploadedPackage: Util.PkgIdWithNameAndVersion, existingPackage: Ref.PackageId, packageVersion: Ref.PackageVersion, )(implicit val loggingContext: ContextualizedErrorLogger ) extends DamlError( - cause = "A DAR with the same version number has previously been uploaded.", + cause = s"Tried to upload package $uploadedPackage, but a different package $existingPackage with the same name and version has previously been uploaded.", extraContext = Map( - "uploadedPackageId" -> uploadedPackageId, + "uploadedPackageId" -> uploadedPackage, "existingPackage" -> existingPackage, "packageVersion" -> packageVersion.toString, ), diff --git a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala index 8bb8571f7431..8484be8c7ac7 100644 --- a/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala +++ b/sdk/canton/daml-common-staging/daml-errors/src/main/scala/com/daml/error/SerializableErrorComponents.scala @@ -215,7 +215,7 @@ private[error] final case class NonSecuritySensitiveErrorCodeComponents( private[error] object NonSecuritySensitiveErrorCodeComponents { /** The maximum size (in characters) of the self-service error description, truncated for transport as part of a Status */ - val MaxCauseLogLength = 512 + val MaxCauseLogLength = 1024 private[error] def truncateDetails( context: Map[String, String], From d015b22215ad65bd22ad45f3b0297754f1b72295 Mon Sep 17 00:00:00 2001 From: Andreas Triantafyllos Date: Mon, 21 Oct 2024 17:37:15 +0200 Subject: [PATCH 6/6] [CHERRY-PICK ME] for GetUpdatesRequest. --- .../ledger/javaapi/data/ActiveContracts.java | 14 ++----- .../java/com/daml/quickstart/iou/IouMain.java | 11 ++--- .../daml/ledger/rxjava/EventQueryClient.java | 2 +- .../com/daml/ledger/rxjava/PackageClient.java | 2 +- .../com/daml/ledger/rxjava/StateClient.java | 2 +- .../com/daml/ledger/rxjava/UpdateClient.java | 21 +++++++--- .../ledger/rxjava/UserManagementClient.java | 2 +- .../ledger/rxjava/grpc/UpdateClientImpl.java | 32 ++++++++------ .../rxjava/grpc/UpdateClientImplTest.scala | 42 +++++++++---------- .../grpc/helpers/UpdateServiceImpl.scala | 35 ++++++++-------- .../build-and-lint-test/src/__tests__/test.ts | 6 +-- 11 files changed, 87 insertions(+), 82 deletions(-) diff --git a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java index 09396bbba07c..84cc2981aa69 100644 --- a/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java +++ b/sdk/canton/community/bindings-java/src/main/java/com/daml/ledger/javaapi/data/ActiveContracts.java @@ -10,17 +10,13 @@ public final class ActiveContracts { - public final Optional offset; - public final List activeContracts; public final String workflowId; public ActiveContracts( - @NonNull Optional offset, @NonNull List activeContracts, @NonNull String workflowId) { - this.offset = offset; this.activeContracts = activeContracts; this.workflowId = workflowId; } @@ -28,10 +24,7 @@ public ActiveContracts( @Override public String toString() { return "ActiveContracts{" - + "offset='" - + offset - + '\'' - + ", activeContracts=" + + "activeContracts=" + activeContracts + ", workflowId=" + workflowId @@ -43,13 +36,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ActiveContracts that = (ActiveContracts) o; - return offset.equals(that.offset) - && Objects.equals(activeContracts, that.activeContracts) + return Objects.equals(activeContracts, that.activeContracts) && Objects.equals(workflowId, that.workflowId); } @Override public int hashCode() { - return Objects.hash(offset, activeContracts, workflowId); + return Objects.hash(activeContracts, workflowId); } } diff --git a/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java b/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java index d4bef94c3599..80e7b0d77c3b 100644 --- a/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java +++ b/sdk/docs/source/app-dev/bindings-java/quickstart/template-root/src/main/java/com/daml/quickstart/iou/IouMain.java @@ -50,12 +50,11 @@ public static void main(String[] args) { ConcurrentHashMap contracts = new ConcurrentHashMap<>(); BiMap idMap = Maps.synchronizedBiMap(HashBiMap.create()); - Optional ledgerEndO = client.getStateClient().getLedgerEnd().blockingGet(); + Long ledgerEnd = client.getStateClient().getLedgerEnd().blockingGet().orElse(0L); client .getStateClient() - .getActiveContracts( - Iou.contractFilter(), Collections.singleton(party), true, ledgerEndO.orElse(0L)) + .getActiveContracts(Iou.contractFilter(), Collections.singleton(party), true, ledgerEnd) .blockingForEach( response -> { response.activeContracts.forEach( @@ -66,15 +65,13 @@ public static void main(String[] args) { }); }); - String ledgerEndString = ledgerEndO.map(num -> String.format("%018x", num)).orElse(""); - Disposable ignore = client .getTransactionsClient() .getTransactions( Iou.contractFilter(), - ledgerEndString, - ledgerEndString, + ledgerEnd, + Optional.of(ledgerEnd), Collections.singleton(party), true) .forEach( diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/EventQueryClient.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/EventQueryClient.java index db2bea25925a..88a1d42aa761 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/EventQueryClient.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/EventQueryClient.java @@ -7,7 +7,7 @@ import io.reactivex.Single; import java.util.Set; -/** An RxJava version of {@link com.daml.ledger.api.v1.PackageServiceGrpc} */ +/** An RxJava version of {@link com.daml.ledger.api.v2.PackageServiceGrpc} */ public interface EventQueryClient { Single getEventsByContractId( String contractId, Set requestingParties); diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/PackageClient.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/PackageClient.java index bd35efd0a4cc..845f15e5114d 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/PackageClient.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/PackageClient.java @@ -8,7 +8,7 @@ import io.reactivex.Flowable; import io.reactivex.Single; -/** An RxJava version of {@link com.daml.ledger.api.v1.PackageServiceGrpc} */ +/** An RxJava version of {@link com.daml.ledger.api.v2.PackageServiceGrpc} */ public interface PackageClient { Flowable listPackages(); diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/StateClient.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/StateClient.java index 0522cc2a4e4c..02772f5bc349 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/StateClient.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/StateClient.java @@ -9,7 +9,7 @@ import java.util.Optional; import java.util.Set; -/** An RxJava version of {@link com.daml.ledger.api.v1.ActiveContractsServiceGrpc} */ +/** An RxJava version of {@link com.daml.ledger.api.v2.StateServiceGrpc} */ public interface StateClient { Flowable getActiveContracts( diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UpdateClient.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UpdateClient.java index a645c9028860..bae790530ea6 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UpdateClient.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UpdateClient.java @@ -6,29 +6,38 @@ import com.daml.ledger.javaapi.data.*; import io.reactivex.Flowable; import io.reactivex.Single; +import java.util.Optional; import java.util.Set; /** An RxJava version of {@link com.daml.ledger.api.v2.UpdateServiceGrpc} */ public interface UpdateClient { Flowable getTransactions( - String begin, String end, TransactionFilter filter, boolean verbose); + Long begin, Optional end, TransactionFilter filter, boolean verbose); Flowable getTransactions( ContractFilter contractFilter, - String begin, - String end, + Long begin, + Optional end, Set parties, boolean verbose); Flowable getTransactions( - String begin, String end, TransactionFilter filter, boolean verbose, String accessToken); + Long begin, + Optional end, + TransactionFilter filter, + boolean verbose, + String accessToken); Flowable getTransactionsTrees( - String begin, String end, TransactionFilter filter, boolean verbose); + Long begin, Optional end, TransactionFilter filter, boolean verbose); Flowable getTransactionsTrees( - String begin, String end, TransactionFilter filter, boolean verbose, String accessToken); + Long begin, + Optional end, + TransactionFilter filter, + boolean verbose, + String accessToken); Single getTransactionTreeByEventId( String eventId, Set requestingParties); diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UserManagementClient.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UserManagementClient.java index 95222d394915..24f00aa2e998 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UserManagementClient.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/UserManagementClient.java @@ -7,7 +7,7 @@ import io.reactivex.Single; import org.checkerframework.checker.nullness.qual.NonNull; -/** An RxJava version of {@link com.daml.ledger.api.v1.admin.UserManagementServiceGrpc} */ +/** An RxJava version of {@link com.daml.ledger.api.v2.admin.UserManagementServiceGrpc} */ public interface UserManagementClient { Single createUser(@NonNull CreateUserRequest request); diff --git a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/UpdateClientImpl.java b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/UpdateClientImpl.java index f16cdfa42d54..736d14c58636 100644 --- a/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/UpdateClientImpl.java +++ b/sdk/language-support/java/bindings-rxjava/src/main/java/com/daml/ledger/rxjava/grpc/UpdateClientImpl.java @@ -47,8 +47,8 @@ private Flowable extractTransactions( } private Flowable getTransactions( - String begin, - String end, + Long begin, + Optional end, TransactionFilter filter, boolean verbose, Optional accessToken) { @@ -59,20 +59,24 @@ private Flowable getTransactions( @Override public Flowable getTransactions( - String begin, String end, TransactionFilter filter, boolean verbose) { + Long begin, Optional end, TransactionFilter filter, boolean verbose) { return getTransactions(begin, end, filter, verbose, Optional.empty()); } @Override public Flowable getTransactions( - String begin, String end, TransactionFilter filter, boolean verbose, String accessToken) { + Long begin, + Optional end, + TransactionFilter filter, + boolean verbose, + String accessToken) { return getTransactions(begin, end, filter, verbose, Optional.of(accessToken)); } private Flowable getTransactions( ContractFilter contractFilter, - String begin, - String end, + Long begin, + Optional end, Set parties, boolean verbose, Optional accessToken) { @@ -82,8 +86,8 @@ private Flowable getTransactions( public Flowable getTransactions( ContractFilter contractFilter, - String begin, - String end, + Long begin, + Optional end, Set parties, boolean verbose) { return getTransactions(contractFilter, begin, end, parties, verbose, Optional.empty()); @@ -101,8 +105,8 @@ private Flowable extractTransactionTrees( } private Flowable getTransactionsTrees( - String begin, - String end, + Long begin, + Optional end, TransactionFilter filter, boolean verbose, Optional accessToken) { @@ -113,13 +117,17 @@ private Flowable getTransactionsTrees( @Override public Flowable getTransactionsTrees( - String begin, String end, TransactionFilter filter, boolean verbose) { + Long begin, Optional end, TransactionFilter filter, boolean verbose) { return getTransactionsTrees(begin, end, filter, verbose, Optional.empty()); } @Override public Flowable getTransactionsTrees( - String begin, String end, TransactionFilter filter, boolean verbose, String accessToken) { + Long begin, + Optional end, + TransactionFilter filter, + boolean verbose, + String accessToken) { return getTransactionsTrees(begin, end, filter, verbose, Optional.of(accessToken)); } diff --git a/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/UpdateClientImplTest.scala b/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/UpdateClientImplTest.scala index a79d76970501..6835a066454c 100644 --- a/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/UpdateClientImplTest.scala +++ b/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/UpdateClientImplTest.scala @@ -5,6 +5,7 @@ package com.daml.ledger.rxjava.grpc import java.util.concurrent.TimeUnit import java.time.Instant +import java.util.Optional import com.daml.ledger.javaapi.data._ import com.daml.ledger.api.v2.TraceContextOuterClass.TraceContext @@ -15,7 +16,6 @@ import com.daml.ledger.rxjava.grpc.helpers.{DataLayerHelpers, LedgerServices, Te import com.daml.ledger.api.v2.transaction_filter.TemplateFilter import com.daml.ledger.api.v2.value.Identifier import com.daml.ledger.javaapi.data.TransactionFilter -import com.digitalasset.canton.platform.ApiOffset import io.reactivex.Observable import org.scalacheck.Shrink import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks @@ -61,7 +61,7 @@ final class UpdateClientImplTest implicit def tupleNoShrink[A, B]: Shrink[(A, B)] = Shrink.shrinkAny - private val ledgerBegin = "" + private val ledgerBegin: java.lang.Long = 0L private val emptyFilter = new TransactionFilter(Map.empty[String, data.Filter].asJava, None.toJava) @@ -69,13 +69,13 @@ final class UpdateClientImplTest it should "return transactions from the ledger" in forAll(ledgerContentGen) { case (ledgerContent, expectedTransactions) => - val ledgerEnd = ApiOffset.fromLong(expectedTransactions.lastOption.fold(0L)(_.getOffset)) + val ledgerEnd = expectedTransactions.lastOption.fold(0L)(_.getOffset) ledgerServices.withUpdateClient(Observable.fromIterable(ledgerContent.asJava)) { (transactionClient, _) => { val result: List[TransactionWithoutRecordTime] = transactionClient - .getTransactions(ledgerBegin, ledgerEnd, emptyFilter, false) + .getTransactions(ledgerBegin, Optional.of(long2Long(ledgerEnd)), emptyFilter, false) .timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS) .blockingIterable() .asScala @@ -92,8 +92,8 @@ final class UpdateClientImplTest it should "pass start offset, end offset, transaction filter and verbose flag with the request" in { ledgerServices.withUpdateClient(Observable.empty()) { (transactionClient, transactionService) => - val begin = "1" - val end = "2" + val begin = 1L + val end = 2L val transactionFilter = new TransactionFilter( Map[String, data.Filter]( @@ -110,13 +110,13 @@ final class UpdateClientImplTest ) transactionClient - .getTransactions(begin, end, transactionFilter, true) + .getTransactions(begin, Optional.of(end), transactionFilter, true) .toList() .blockingGet() val request = transactionService.lastUpdatesRequest.get() - request.beginExclusive shouldBe "1" - request.endInclusive shouldBe "2" + request.beginExclusive shouldBe 1L + request.endInclusive shouldBe Some(2L) val filterByParty = request.filter.get.filtersByParty filterByParty.keySet shouldBe Set("Alice") filterByParty("Alice").cumulative @@ -139,11 +139,11 @@ final class UpdateClientImplTest it should "return transaction trees from the ledger" ignore forAll(ledgerContentTreeGen) { case (ledgerContent, expectedTransactionsTrees) => - val ledgerEnd = ApiOffset.fromLong(expectedTransactionsTrees.last.getOffset) + val ledgerEnd = expectedTransactionsTrees.last.getOffset ledgerServices.withUpdateClient(Observable.fromIterable(ledgerContent.asJava)) { (transactionClient, _) => transactionClient - .getTransactionsTrees(ledgerBegin, ledgerEnd, emptyFilter, false) + .getTransactionsTrees(ledgerBegin, Optional.of(ledgerEnd), emptyFilter, false) .blockingIterable() .asScala .toList shouldBe expectedTransactionsTrees @@ -154,8 +154,8 @@ final class UpdateClientImplTest it should "pass start offset, end offset, transaction filter and verbose flag with the request" in { ledgerServices.withUpdateClient(Observable.empty()) { (transactionClient, transactionService) => - val begin = "1" - val end = "2" + val begin = 1L + val end: Optional[java.lang.Long] = Optional.of(2L) val transactionFilter = new TransactionFilter( Map[String, data.Filter]( @@ -177,8 +177,8 @@ final class UpdateClientImplTest .blockingGet() val request = transactionService.lastUpdatesTreesRequest.get() - request.beginExclusive shouldBe "1" - request.endInclusive shouldBe "2" + request.beginExclusive shouldBe 1L + request.endInclusive shouldBe Some(2L) val filterByParty = request.filter.get.filtersByParty filterByParty.keySet shouldBe Set("Alice") filterByParty("Alice").cumulative @@ -271,7 +271,7 @@ final class UpdateClientImplTest withClue("getTransactions specifying end") { expectUnauthenticated { toAuthenticatedServer( - _.getTransactions(ledgerBegin, "1", filterFor(someParty), false) + _.getTransactions(ledgerBegin, Optional.of(1L), filterFor(someParty), false) .timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS) .blockingIterable() .asScala @@ -282,7 +282,7 @@ final class UpdateClientImplTest withClue("getTransactionsTree specifying end") { expectUnauthenticated { toAuthenticatedServer( - _.getTransactionsTrees(ledgerBegin, "1", filterFor(someParty), false) + _.getTransactionsTrees(ledgerBegin, Optional.of(1L), filterFor(someParty), false) .timeout(TestConfiguration.timeoutInSeconds, TimeUnit.SECONDS) .blockingIterable() .asScala @@ -322,7 +322,7 @@ final class UpdateClientImplTest toAuthenticatedServer( _.getTransactions( ledgerBegin, - "1", + Optional.of(1L: java.lang.Long), filterFor(someParty), false, someOtherPartyReadWriteToken, @@ -339,7 +339,7 @@ final class UpdateClientImplTest toAuthenticatedServer( _.getTransactionsTrees( ledgerBegin, - "1", + Optional.of(1L), filterFor(someParty), false, someOtherPartyReadWriteToken, @@ -390,7 +390,7 @@ final class UpdateClientImplTest toAuthenticatedServer( _.getTransactions( ledgerBegin, - "1", + Optional.of(1L: java.lang.Long), filterFor(someParty), false, somePartyReadWriteToken, @@ -405,7 +405,7 @@ final class UpdateClientImplTest toAuthenticatedServer( _.getTransactionsTrees( ledgerBegin, - "1", + Optional.of(1L), filterFor(someParty), false, somePartyReadWriteToken, diff --git a/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/UpdateServiceImpl.scala b/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/UpdateServiceImpl.scala index be6534077559..b901081dc0a8 100644 --- a/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/UpdateServiceImpl.scala +++ b/sdk/language-support/java/bindings-rxjava/src/test/scala/com/daml/ledger/rxjava/grpc/helpers/UpdateServiceImpl.scala @@ -39,23 +39,24 @@ final class UpdateServiceImpl(ledgerContent: Observable[LedgerItem]) ): Unit = { lastUpdatesRequest.set(request) - if (request.beginExclusive > request.endInclusive) { - val metadata = new Metadata() - metadata.put( - Metadata.Key.of("cause", Metadata.ASCII_STRING_MARSHALLER), - s"BEGIN should be strictly smaller than END. Found BEGIN '${request.beginExclusive}' and END '${request.endInclusive}'", - ) - responseObserver.onError(Status.INVALID_ARGUMENT.asRuntimeException(metadata)) - } else { - ledgerContent.subscribe(new Observer[LedgerItem] { - override def onSubscribe(d: Disposable): Unit = () - override def onNext(t: LedgerItem): Unit = - responseObserver.onNext( - GetUpdatesResponse(GetUpdatesResponse.Update.Transaction(t.toTransaction)) - ) - override def onError(t: Throwable): Unit = responseObserver.onError(t) - override def onComplete(): Unit = responseObserver.onCompleted() - }) + request.endInclusive match { + case Some(endInclusive) if request.beginExclusive > endInclusive => + val metadata = new Metadata() + metadata.put( + Metadata.Key.of("cause", Metadata.ASCII_STRING_MARSHALLER), + s"BEGIN should be strictly smaller than END. Found BEGIN '${request.beginExclusive}' and END '${request.endInclusive}'", + ) + responseObserver.onError(Status.INVALID_ARGUMENT.asRuntimeException(metadata)) + case _ => + ledgerContent.subscribe(new Observer[LedgerItem] { + override def onSubscribe(d: Disposable): Unit = () + override def onNext(t: LedgerItem): Unit = + responseObserver.onNext( + GetUpdatesResponse(GetUpdatesResponse.Update.Transaction(t.toTransaction)) + ) + override def onError(t: Throwable): Unit = responseObserver.onError(t) + override def onComplete(): Unit = responseObserver.onCompleted() + }) } } diff --git a/sdk/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts b/sdk/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts index 1a2c5fec4f46..5a2e320a6861 100644 --- a/sdk/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts +++ b/sdk/language-support/ts/codegen/tests/ts/build-and-lint-test/src/__tests__/test.ts @@ -672,9 +672,7 @@ test("exercise using explicit disclosure", async () => { { encoding: "utf8" }, ); - const ledgerEnd = Number(JSON.parse(ledgerEndResponse).offset) - .toString(16) - .padStart(18, "0"); + const ledgerEnd = Number(JSON.parse(ledgerEndResponse).offset); // TODO(https://digitalasset.atlassian.net/browse/LT-5) // The JSON API does not expose created_event_blob so we read it directly through the gRPC API. @@ -704,7 +702,7 @@ test("exercise using explicit disclosure", async () => { }, }, }, - beginExclusive: "", + beginExclusive: 0, endInclusive: ledgerEnd, }), "localhost:5011",