Skip to content

Commit

Permalink
[main] Update 2024-07-02.23 (#191)
Browse files Browse the repository at this point in the history
Reference commit: 19985bc98f

Co-authored-by: Canton <[email protected]>
  • Loading branch information
canton-machine and Canton authored Jul 3, 2024
1 parent 8068166 commit 85b4647
Show file tree
Hide file tree
Showing 543 changed files with 4,126 additions and 2,726 deletions.
1 change: 1 addition & 0 deletions community-build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ lazy val `sequencer-driver-lib` = CommunityProjects.`sequencer-driver-lib`
lazy val `community-reference-driver` = CommunityProjects.`community-reference-driver`
lazy val `wartremover-extension` = CommunityProjects.`wartremover-extension`
lazy val `google-common-protos-scala` = DamlProjects.`google-common-protos-scala`
lazy val `ledger-api-value` = DamlProjects.`ledger-api-value`
lazy val `ledger-api` = DamlProjects.`ledger-api`
lazy val `bindings-java` = DamlProjects.`bindings-java`
lazy val `ledger-common` = CommunityProjects.`ledger-common`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import com.daml.ledger.api.v2.state_service.{
IncompleteUnassigned,
}
import com.daml.ledger.api.v2.value.{Record, RecordField, Value}
import com.daml.lf.data.Time
import com.digitalasset.canton.admin.api.client.data.TemplateId
import com.digitalasset.canton.crypto.Salt
import com.digitalasset.canton.protocol.LfContractId
import com.digitalasset.canton.topology.DomainId
import com.digitalasset.canton.{LfPackageName, LfPackageVersion}
import com.digitalasset.daml.lf.data.Time
import com.google.protobuf.timestamp.Timestamp

/** Wrapper class to make scalapb LedgerApi classes more convenient to access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ object ParticipantAdminCommands {
response: CancellableContext
): Either[String, CancellableContext] = Right(response)

override def timeoutType: GrpcAdminCommand.TimeoutType =
GrpcAdminCommand.DefaultUnboundedTimeout
}

final case class ImportAcs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import com.digitalasset.canton.topology.transaction.{
import com.digitalasset.canton.version.ProtocolVersionValidation
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import io.grpc.ManagedChannel
import io.grpc.Context.CancellableContext
import io.grpc.stub.StreamObserver
import io.grpc.{Context, ManagedChannel}

import java.time.Instant
import scala.concurrent.Future
Expand Down Expand Up @@ -571,12 +573,13 @@ object TopologyAdminCommands {
}

final case class GenesisState(
observer: StreamObserver[GenesisStateResponse],
filterDomainStore: Option[String],
timestamp: Option[CantonTimestamp],
) extends BaseCommand[
v30.GenesisStateRequest,
v30.GenesisStateResponse,
ByteString,
CancellableContext,
CancellableContext,
] {
override def createRequest(): Either[String, v30.GenesisStateRequest] = {
val domainStore = filterDomainStore.traverse(DomainId.fromString)
Expand All @@ -593,13 +596,17 @@ object TopologyAdminCommands {
override def submitRequest(
service: TopologyManagerReadServiceStub,
request: v30.GenesisStateRequest,
): Future[v30.GenesisStateResponse] = service.genesisState(request)
): Future[CancellableContext] = {
val context = Context.current().withCancellation()
context.run(() => service.genesisState(request, observer))
Future.successful(context)
}

override def handleResponse(
response: v30.GenesisStateResponse
): Either[String, ByteString] =
Right(response.genesisStateForSequencer)
// command will potentially take a long time
response: CancellableContext
): Either[String, CancellableContext] =
Right(response)

override def timeoutType: TimeoutType = DefaultUnboundedTimeout
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.daml.ledger.api.v2.value.{
RecordField,
Value,
}
import com.daml.lf.value.Value.ContractId
import com.daml.nonempty.NonEmpty
import com.daml.nonempty.NonEmptyReturningOps.*
import com.digitalasset.canton.admin.api.client.commands.LedgerApiTypeWrappers.ContractData
Expand Down Expand Up @@ -65,6 +64,7 @@ import com.digitalasset.canton.tracing.{NoTracing, TraceContext}
import com.digitalasset.canton.util.{BinaryFileUtil, EitherUtil}
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{DomainAlias, SequencerAlias}
import com.digitalasset.daml.lf.value.Value.ContractId
import com.google.protobuf.ByteString
import com.typesafe.scalalogging.LazyLogging
import io.circe.Encoder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.canton.console.commands

import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, Promise}
import scala.language.reflectiveCalls

private[commands] class ByteStringStreamObserver[
T <: ByteStringStreamObserver.ByteStringChunk
] extends StreamObserver[T] {
private val byteBuffer = new AtomicReference(Vector.empty[Byte])
private val requestComplete: Promise[ByteString] = Promise[ByteString]()

def result: Future[ByteString] =
requestComplete.future

override def onNext(value: T): Unit =
byteBuffer.getAndUpdate(_ ++ value.chunk.toByteArray).discard

override def onError(t: Throwable): Unit = {
requestComplete.tryFailure(t).discard
}

override def onCompleted(): Unit = {
val finalByteString = ByteString.copyFrom(byteBuffer.get().toArray)
requestComplete.trySuccess(finalByteString).discard
}
}

private[commands] object ByteStringStreamObserver {
type ByteStringChunk = { val chunk: ByteString }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ import com.digitalasset.canton.util.ResourceUtil
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver

import java.io.FileOutputStream
import scala.concurrent.Promise
import scala.language.reflectiveCalls
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

private[commands] class GrpcByteChunksToFileObserver[
T <: GrpcByteChunksToFileObserver.ByteStringChunk
](
private[commands] class FileStreamObserver[T](
inputFile: File,
requestComplete: Promise[String],
converter: T => ByteString,
) extends StreamObserver[T] {
private val os: FileOutputStream = inputFile.newFileOutputStream(append = false)
private val os = inputFile.newFileOutputStream(append = false)
private val requestComplete: Promise[Unit] = Promise[Unit]()

def result: Future[Unit] = requestComplete.future
override def onNext(value: T): Unit = {
Try(os.write(value.chunk.toByteArray)) match {
Try(os.write(converter(value).toByteArray)) match {
case Failure(exception) =>
ResourceUtil.closeAndAddSuppressed(Some(exception), os)
throw exception
Expand All @@ -37,11 +35,7 @@ private[commands] class GrpcByteChunksToFileObserver[
}

override def onCompleted(): Unit = {
requestComplete.trySuccess(inputFile.pathAsString).discard
requestComplete.trySuccess(()).discard
ResourceUtil.closeAndAddSuppressed(None, os)
}
}

private[commands] object GrpcByteChunksToFileObserver {
type ByteStringChunk = { val chunk: ByteString }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.digitalasset.canton.util.ResourceUtil
import io.grpc.StatusRuntimeException

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Promise, TimeoutException}
import scala.concurrent.{Await, TimeoutException}

class HealthAdministration[S <: data.NodeStatus.Status](
runner: AdminCommandRunner,
Expand Down Expand Up @@ -70,9 +70,8 @@ class HealthAdministration[S <: data.NodeStatus.Status](
timeout: NonNegativeDuration = timeouts.unbounded,
chunkSize: Option[Int] = None,
): String = consoleEnvironment.run {
val requestComplete = Promise[String]()
val responseObserver =
new GrpcByteChunksToFileObserver[v30.HealthDumpResponse](outputFile, requestComplete)
new FileStreamObserver[v30.HealthDumpResponse](outputFile, _.chunk)

def call = consoleEnvironment.run {
adminCommand(new StatusAdminCommands.GetHealthDump(responseObserver, chunkSize))
Expand All @@ -81,8 +80,8 @@ class HealthAdministration[S <: data.NodeStatus.Status](
try {
ResourceUtil.withResource(call) { _ =>
CommandSuccessful(
Await.result(requestComplete.future, timeout.duration)
)
Await.result(responseObserver.result, timeout.duration)
).map(_ => outputFile.pathAsString)
}
} catch {
case sre: StatusRuntimeException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import com.daml.ledger.javaapi.data.{
TransactionTree,
}
import com.daml.ledger.javaapi as javab
import com.daml.lf.data.Ref
import com.daml.metrics.api.MetricsContext
import com.daml.scalautil.Statement.discard
import com.digitalasset.canton.admin.api.client.commands.LedgerApiCommands
Expand Down Expand Up @@ -87,6 +86,7 @@ import com.digitalasset.canton.topology.{DomainId, ParticipantId, PartyId}
import com.digitalasset.canton.tracing.NoTracing
import com.digitalasset.canton.util.ResourceUtil
import com.digitalasset.canton.{LfPackageId, LfPartyId, config}
import com.digitalasset.daml.lf.data.Ref
import com.google.protobuf.field_mask.FieldMask
import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,12 +1515,12 @@ trait ParticipantAdministration extends FeatureFlagFilter {
if (!config.manualConnect) {
reconnect(config.domain.unwrap, retry = false).discard
// now update the domain settings to auto-connect
modify(config.domain.unwrap, _.copy(manualConnect = false))
modify(config.domain.unwrap, _.copy(manualConnect = false), validation)
}
// architecture-handbook-entry-end: OnboardParticipantConnect
} else if (!config.manualConnect) {
reconnect(config.domain, retry = false).discard
modify(config.domain.unwrap, _.copy(manualConnect = false))
modify(config.domain.unwrap, _.copy(manualConnect = false), validation)
}
synchronize.foreach { timeout =>
ConsoleMacros.utils.synchronize_topology(Some(timeout))(consoleEnvironment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ package com.digitalasset.canton.console.commands
import better.files.File
import cats.syntax.either.*
import cats.syntax.foldable.*
import com.digitalasset.canton.admin.api.client.commands.{
GrpcAdminCommand,
ParticipantAdminCommands,
}
import com.digitalasset.canton.admin.participant.v30.{ExportAcsRequest, ExportAcsResponse}
import com.digitalasset.canton.admin.api.client.commands.ParticipantAdminCommands
import com.digitalasset.canton.admin.participant.v30.ExportAcsResponse
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.config.{ConsoleCommandTimeout, NonNegativeDuration}
import com.digitalasset.canton.console.CommandErrors.GenericCommandError
import com.digitalasset.canton.console.{
AdminCommandRunner,
Expand All @@ -25,7 +23,6 @@ import com.digitalasset.canton.console.{
Helpful,
}
import com.digitalasset.canton.data.RepairContract
import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.networking.grpc.GrpcError
import com.digitalasset.canton.participant.ParticipantNode
Expand All @@ -38,12 +35,11 @@ import com.digitalasset.canton.util.ResourceUtil
import com.digitalasset.canton.version.ProtocolVersion
import com.digitalasset.canton.{DomainAlias, SequencerCounter}
import com.google.protobuf.ByteString
import io.grpc.Context.CancellableContext
import io.grpc.StatusRuntimeException

import java.time.Instant
import java.util.UUID
import scala.concurrent.{Await, Promise, TimeoutException}
import scala.concurrent.{Await, TimeoutException}

class ParticipantRepairAdministration(
val consoleEnvironment: ConsoleEnvironment,
Expand All @@ -52,6 +48,7 @@ class ParticipantRepairAdministration(
) extends FeatureFlagFilter
with NoTracing
with Helpful {
private def timeouts: ConsoleCommandTimeout = consoleEnvironment.commandTimeouts

@Help.Summary("Purge contracts with specified Contract IDs from local participant.")
@Help.Description(
Expand Down Expand Up @@ -145,59 +142,32 @@ class ParticipantRepairAdministration(
timestamp: Option[Instant] = None,
contractDomainRenames: Map[DomainId, (DomainId, ProtocolVersion)] = Map.empty,
force: Boolean = false,
timeout: NonNegativeDuration = timeouts.unbounded,
): Unit = {
check(FeatureFlag.Repair) {
val collector = AcsSnapshotFileCollector[ExportAcsRequest, ExportAcsResponse](outputFile)
val command = ParticipantAdminCommands.ParticipantRepairManagement
.ExportAcs(
parties,
partiesOffboarding = partiesOffboarding,
filterDomainId,
timestamp,
collector.observer,
contractDomainRenames,
force = force,
)
collector.materializeFile(command)
}
}

private case class AcsSnapshotFileCollector[
Req,
Resp <: GrpcByteChunksToFileObserver.ByteStringChunk,
](outputFile: String) {
private val target = File(outputFile)
private val requestComplete = Promise[String]()
val observer = new GrpcByteChunksToFileObserver[Resp](
target,
requestComplete,
)
private val timeout = consoleEnvironment.commandTimeouts.ledgerCommand

def materializeFile(
command: GrpcAdminCommand[
Req,
CancellableContext,
CancellableContext,
]
): Unit = {
consoleEnvironment.run {
val file = File(outputFile)
val responseObserver = new FileStreamObserver[ExportAcsResponse](file, _.chunk)

def call = consoleEnvironment.run {
runner.adminCommand(
command
ParticipantAdminCommands.ParticipantRepairManagement
.ExportAcs(
parties,
partiesOffboarding = partiesOffboarding,
filterDomainId,
timestamp,
responseObserver,
contractDomainRenames,
force = force,
)
)
}

try {
ResourceUtil.withResource(call) { _ =>
CommandSuccessful(
Await
.result(
requestComplete.future,
timeout.duration,
)
.discard
Await.result(responseObserver.result, timeout.duration)
)
}
} catch {
Expand All @@ -206,7 +176,7 @@ class ParticipantRepairAdministration(
GrpcError("Generating acs snapshot file", "download_acs_snapshot", sre).toString
)
case _: TimeoutException =>
target.delete(swallowIOExceptions = true)
file.delete(swallowIOExceptions = true)
CommandErrors.ConsoleTimeout.Error(timeout.asJavaApproximation)
}
}
Expand Down
Loading

0 comments on commit 85b4647

Please sign in to comment.