Skip to content

Commit

Permalink
Update 2023-08-15.23 (#107)
Browse files Browse the repository at this point in the history
Reference commit: 6fb7b89ad6

Co-authored-by: Canton <[email protected]>
  • Loading branch information
canton-machine and Canton authored Aug 16, 2023
1 parent e296d31 commit 7005ee0
Show file tree
Hide file tree
Showing 118 changed files with 2,232 additions and 897 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import com.digitalasset.canton.config.NonNegativeDuration
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.console.ConsoleEnvironment.Implicits.*
import com.digitalasset.canton.logging.{LastErrorsAppender, NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.participant.admin.{RepairService, SyncStateInspection}
import com.digitalasset.canton.participant.admin.RepairService
import com.digitalasset.canton.participant.admin.inspection.SyncStateInspection
import com.digitalasset.canton.participant.config.{AuthServiceConfig, BaseParticipantConfig}
import com.digitalasset.canton.participant.ledger.api.JwtTokenUtilities
import com.digitalasset.canton.protocol.{LfContractId, SerializableContract}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ import com.digitalasset.canton.health.admin.data.ParticipantStatus
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger}
import com.digitalasset.canton.participant.ParticipantNode
import com.digitalasset.canton.participant.admin.grpc.TransferSearchResult
import com.digitalasset.canton.participant.admin.inspection.SyncStateInspection
import com.digitalasset.canton.participant.admin.v0.PruningServiceGrpc
import com.digitalasset.canton.participant.admin.v0.PruningServiceGrpc.PruningServiceStub
import com.digitalasset.canton.participant.admin.{ResourceLimits, SyncStateInspection, v0}
import com.digitalasset.canton.participant.admin.{ResourceLimits, v0}
import com.digitalasset.canton.participant.domain.DomainConnectionConfig
import com.digitalasset.canton.participant.sync.TimestampedEvent
import com.digitalasset.canton.protocol.messages.{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
sdk-version: 2.8.0-snapshot.20230807.12017.0.v7ba1e675
sdk-version: 2.8.0-snapshot.20230811.12033.0.v2aad9b4e
sandbox-options:
- --wall-clock-time
name: contact
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
sdk-version: 2.8.0-snapshot.20230807.12017.0.v7ba1e675
sdk-version: 2.8.0-snapshot.20230811.12033.0.v2aad9b4e
sandbox-options:
- --wall-clock-time
name: message
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";

package com.digitalasset.canton.protocol.v0;

import "google/rpc/status.proto";
import "scalapb/scalapb.proto";

/* DA's wrapper around `google.rpc.Status` for the proper versioning and code-gen
*/
message VersionedStatus {
option (scalapb.message).companion_extends = "com.digitalasset.canton.version.StorageProtoVersion";

google.rpc.Status status = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import "com/digitalasset/canton/protocol/v0/sequencing.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "google/rpc/status.proto";
import "scalapb/scalapb.proto";

message StaticDomainParameters {
Expand Down Expand Up @@ -72,7 +73,7 @@ message SequencedEvent {
google.protobuf.StringValue message_id = 4; // Optional: Present for DeliverError, present for the sender of a Deliver.
// Changed in v1 from v0.CompressedBatch to v1.CompressedBatch
CompressedBatch batch = 5; // Optional: Present for Deliver, absent for DeliverError
v0.DeliverErrorReason deliver_error_reason = 6; // Optional: Present for DeliverError, absent for other events
google.rpc.Status deliver_error_reason = 6; // Optional: Present for DeliverError, absent for other events
}

message SignedContent {
Expand Down
12 changes: 12 additions & 0 deletions community/base/src/main/protobuf/google/rpc/package.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";
package google.rpc;

import "scalapb/scalapb.proto";

option (scalapb.options) = {
scope: PACKAGE
flat_package: false
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.digitalasset.canton.data
import cats.syntax.either.*
import com.digitalasset.canton.*
import com.digitalasset.canton.crypto.*
import com.digitalasset.canton.data.CommonMetadata.singleMediatorError
import com.digitalasset.canton.logging.pretty.Pretty
import com.digitalasset.canton.protocol.{v0, *}
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
Expand Down Expand Up @@ -49,8 +50,7 @@ final case class CommonMetadata private (

@transient override protected lazy val companionObj: CommonMetadata.type = CommonMetadata

private[CommonMetadata] def toProtoV0: v0.CommonMetadata = {
require(isEquivalentTo(ProtocolVersion.v3))
private def toProtoV0: v0.CommonMetadata = {
mediator match {
case MediatorRef.Single(mediatorId) =>
v0.CommonMetadata(
Expand All @@ -61,15 +61,12 @@ final case class CommonMetadata private (
mediatorId = mediatorId.toProtoPrimitive,
)
case _ =>
throw new IllegalStateException(
s"Only single mediator exist in for the representative protocol version $representativeProtocolVersion"
)
throw new IllegalStateException(singleMediatorError(representativeProtocolVersion))
}
}

private[CommonMetadata] def toProtoV1: v1.CommonMetadata = {
private def toProtoV1: v1.CommonMetadata = {
// TODO(#12373) Adapt when releasing BFT
require(isEquivalentTo(ProtocolVersion.dev))
v1.CommonMetadata(
confirmationPolicy = confirmationPolicy.toProtoPrimitive,
domainId = domainId.toProtoPrimitive,
Expand Down Expand Up @@ -99,20 +96,53 @@ object CommonMetadata
),
)

def apply(
hashOps: HashOps
private def singleMediatorError(
rpv: RepresentativeProtocolVersion[CommonMetadata.type]
): String = s"Only single mediator exist in for the representative protocol version $rpv"

private[data] def shouldHaveSingleMediator(
rpv: RepresentativeProtocolVersion[CommonMetadata.type]
): Boolean = rpv == protocolVersionRepresentativeFor(ProtocolVersion.v3)

def create(
hashOps: HashOps,
protocolVersion: ProtocolVersion,
)(
confirmationPolicy: ConfirmationPolicy,
domain: DomainId,
mediator: MediatorRef,
salt: Salt,
uuid: UUID,
protocolVersion: ProtocolVersion,
): CommonMetadata = CommonMetadata(confirmationPolicy, domain, mediator, salt, uuid)(
): Either[String, CommonMetadata] = create(
hashOps,
protocolVersionRepresentativeFor(protocolVersion),
None,
)
)(confirmationPolicy, domain, mediator, salt, uuid)

def create(
hashOps: HashOps,
protocolVersion: RepresentativeProtocolVersion[CommonMetadata.type],
)(
confirmationPolicy: ConfirmationPolicy,
domain: DomainId,
mediator: MediatorRef,
salt: Salt,
uuid: UUID,
): Either[String, CommonMetadata] = {

mediator match {
case MediatorRef.Group(_) if shouldHaveSingleMediator(protocolVersion) =>
Left(singleMediatorError(protocolVersion))

case _ =>
Right(
CommonMetadata(confirmationPolicy, domain, mediator, salt, uuid)(
hashOps,
protocolVersion,
None,
)
)
}
}

private def fromProtoV0(hashOps: HashOps, metaDataP: v0.CommonMetadata)(
bytes: ByteString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ trait BaseCantonError extends BaseError {
/** The error code, usually passed in as implicit where the error class is defined */
def code: ErrorCode

def rpcStatusWithoutLoggingContext(): com.google.rpc.status.Status = rpcStatus()(NoLogging)

def rpcStatus(
overrideCode: Option[Status.Code] = None
)(implicit loggingContext: ErrorLoggingContext): com.google.rpc.status.Status = {
)(implicit loggingContext: ContextualizedErrorLogger): com.google.rpc.status.Status = {
import scala.jdk.CollectionConverters.*
val status0: com.google.rpc.Status = code.asGrpcStatus(this)

val details: Seq[com.google.protobuf.Any] = status0.getDetailsList.asScala.toSeq
val detailsScalapb = details.map(com.google.protobuf.any.Any.fromJavaProto)

Expand Down Expand Up @@ -169,17 +172,24 @@ object BaseCantonError {
)(implicit override val code: ErrorCode)
extends BaseCantonError {}

def isStatusErrorCode(errorCode: ErrorCode, status: com.google.rpc.status.Status): Boolean = {
def isStatusErrorCode(errorCode: ErrorCode, status: com.google.rpc.status.Status): Boolean =
extractStatusErrorCodeMessage(errorCode, status).isDefined

def extractStatusErrorCodeMessage(
errorCode: ErrorCode,
status: com.google.rpc.status.Status,
): Option[String] = {
val code = errorCode.category.grpcCode.getOrElse(
throw new IllegalArgumentException(s"Error code $errorCode does not have a gRPC code")
)
if (status.code == code.value()) {
status.details.exists { any =>
if (any.is(ErrorInfo.messageCompanion)) {
Try(any.unpack(ErrorInfo.messageCompanion)).toOption.exists(_.reason == errorCode.id)
} else false
status.details.collectFirst {
case any
if (any.is(ErrorInfo) && Try(any.unpack(ErrorInfo))
.fold(_ => false, _.reason == errorCode.id)) =>
status.message
}
} else false
} else None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,7 @@ object ConfirmationPolicy {
override def requiredTrustLevel: TrustLevel = TrustLevel.Ordinary
}

case object Full extends ConfirmationPolicy {
override val name = "Full"
protected override val index: Int = 2

override def informeesAndThreshold(node: LfActionNode, topologySnapshot: TopologySnapshot)(
implicit ec: ExecutionContext
): Future[(Set[Informee], NonNegativeInt)] = {
val informees = node.informeesOfNode
require(
informees.nonEmpty,
"There must be at least one informee as every node must have at least one signatory.",
)
Future.successful(toInformeesAndThreshold(informees, Set.empty, TrustLevel.Ordinary))
}

override def requiredTrustLevel: TrustLevel = TrustLevel.Ordinary
}

val values: Seq[ConfirmationPolicy] = Seq[ConfirmationPolicy](Vip, Signatory, Full)
val values: Seq[ConfirmationPolicy] = Seq[ConfirmationPolicy](Vip, Signatory)

require(
values.zipWithIndex.forall { case (policy, index) => policy.index == index },
Expand All @@ -190,7 +172,6 @@ object ConfirmationPolicy {
/** Chooses appropriate confirmation policies for a transaction.
* It chooses [[Vip]] if every node has at least one VIP who knows the state
* It chooses [[Signatory]] if every node has a Participant that can confirm.
* It never chooses [[Full]].
*/
def choose(transaction: LfVersionedTransaction, topologySnapshot: TopologySnapshot)(implicit
ec: ExecutionContext
Expand Down Expand Up @@ -235,7 +216,7 @@ object ConfirmationPolicy {
DeterministicEncoding.decodeString(encodedName).flatMap {
case (Vip.name, _) => Right(Vip)
case (Signatory.name, _) => Right(Signatory)
case (badName, badBytes) =>
case (badName, _) =>
Left(DefaultDeserializationError(s"Invalid confirmation policy $badName"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,7 @@ import com.digitalasset.canton.crypto.HashOps
import com.digitalasset.canton.data.{FullInformeeTree, Informee, ViewPosition, ViewType}
import com.digitalasset.canton.logging.pretty.Pretty
import com.digitalasset.canton.protocol.messages.ProtocolMessage.ProtocolMessageContentCast
import com.digitalasset.canton.protocol.{
ConfirmationPolicy,
RequestId,
RootHash,
ViewHash,
v0,
v1,
v2,
v3,
}
import com.digitalasset.canton.protocol.{RequestId, RootHash, ViewHash, v0, v1, v2, v3}
import com.digitalasset.canton.serialization.ProtoConverter
import com.digitalasset.canton.serialization.ProtoConverter.ParsingResult
import com.digitalasset.canton.topology.{DomainId, MediatorRef}
Expand Down Expand Up @@ -127,7 +118,8 @@ case class InformeeMessage(fullInformeeTree: FullInformeeTree)(
override def toProtoSomeEnvelopeContentV3: v3.EnvelopeContent.SomeEnvelopeContent =
v3.EnvelopeContent.SomeEnvelopeContent.InformeeMessage(toProtoV1)

override def confirmationPolicy: ConfirmationPolicy = fullInformeeTree.confirmationPolicy
override def minimumThreshold(informees: Set[Informee]): NonNegativeInt =
fullInformeeTree.confirmationPolicy.minimumThreshold(informees)

override def rootHash: Option[RootHash] = Some(fullInformeeTree.transactionId.toRootHash)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ abstract class MalformedErrorCode(id: String, override val v0CodeP: v0.LocalReje
implicit override val code: MalformedErrorCode = this
}

trait LocalReject extends LocalVerdict with TransactionError {
sealed trait LocalReject extends LocalVerdict with TransactionError {

/** The first part of the cause. Typically the same for all instances of the particular type.
*/
Expand Down Expand Up @@ -192,7 +192,7 @@ trait LocalReject extends LocalVerdict with TransactionError {

/** Base class for LocalReject errors, if the rejection does not (necessarily) occur due to malicious behavior.
*/
abstract class LocalRejectImpl(
sealed abstract class LocalRejectImpl(
override val _causePrefix: String,
override val _details: String = "",
override val throwableO: Option[Throwable] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.digitalasset.canton.LfPartyId
import com.digitalasset.canton.config.RequireTypes.NonNegativeInt
import com.digitalasset.canton.data.{Informee, ViewPosition, ViewType}
import com.digitalasset.canton.protocol.messages.ProtocolMessage.ProtocolMessageContentCast
import com.digitalasset.canton.protocol.{ConfirmationPolicy, RequestId, RootHash, ViewHash}
import com.digitalasset.canton.protocol.{RequestId, RootHash, ViewHash}
import com.digitalasset.canton.topology.MediatorRef

import java.util.UUID
Expand Down Expand Up @@ -35,7 +35,7 @@ trait MediatorRequest extends ProtocolMessage with UnsignedProtocolMessage {
recipientParties: Set[LfPartyId],
): MediatorResult with SignedProtocolMessageContent

def confirmationPolicy: ConfirmationPolicy
def minimumThreshold(informees: Set[Informee]): NonNegativeInt

/** Returns the hash that all [[com.digitalasset.canton.protocol.messages.RootHashMessage]]s of the request batch should contain.
* [[scala.None$]] indicates that no [[com.digitalasset.canton.protocol.messages.RootHashMessage]] should be in the batch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ final case class TransferInMediatorMessage(tree: TransferInViewTree)
Map(tree.viewPosition -> ((confirmingParties, threshold)))
}

override def confirmationPolicy: ConfirmationPolicy = ConfirmationPolicy.Full
override def minimumThreshold(informees: Set[Informee]): NonNegativeInt = NonNegativeInt.one

override def createMediatorResult(
requestId: RequestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ final case class TransferOutMediatorMessage(
Map(tree.viewPosition -> ((confirmingParties, threshold)))
}

override def confirmationPolicy: ConfirmationPolicy = ConfirmationPolicy.Full
override def minimumThreshold(informees: Set[Informee]): NonNegativeInt = NonNegativeInt.one

override def createMediatorResult(
requestId: RequestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import com.digitalasset.canton.logging.TracedLogger
import com.digitalasset.canton.sequencing.protocol.{
Deliver,
DeliverError,
DeliverErrorReason,
Envelope,
SequencerErrors,
}
import com.digitalasset.canton.tracing.TraceContext

Expand Down Expand Up @@ -60,9 +60,9 @@ object SendResult {
result match {
case SendResult.Success(_) =>
FutureUnlessShutdown.pure(())
// TODO(i13155): Use a dedicated signalling mechanism for this case
case SendResult.Error(DeliverError(_, _, _, _, DeliverErrorReason.BatchRefused(message)))
if message.contains("was previously delivered at") =>
case SendResult.Error(
DeliverError(_, _, _, _, SequencerErrors.AggregateSubmissionAlreadySent(_))
) =>
// Stop retrying
FutureUnlessShutdown.unit
case SendResult.Error(error) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,6 @@ class SequencerClientImpl(
SyncCloseable("sequencer-client-subscription", sequencersTransportState.close()),
SyncCloseable("handler-becomes-idle", waitForHandlerToComplete()),
SyncCloseable("sequencer-client-recorder", recorderO.foreach(_.close())),
SyncCloseable("sequenced-event-store", sequencedEventStore.close()),
SyncCloseable("deferred-subscription-health", deferredSubscriptionHealth.close()),
)
}
Expand Down
Loading

0 comments on commit 7005ee0

Please sign in to comment.