From cbf1a9296d6fac01d9e64dd98dc5b44538060963 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:08:25 -0300 Subject: [PATCH 1/8] feat: store generated changeset into ingestion log --- diode-proto/diode/v1/reconciler.proto | 7 + .../diode/v1/reconcilerpb/reconciler.pb.go | 346 +++++++++++------- .../v1/reconcilerpb/reconciler.pb.validate.go | 132 +++++++ .../reconciler/ingestion_processor.go | 29 +- 4 files changed, 382 insertions(+), 132 deletions(-) diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index bc9f327a..4af3f85b 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -70,6 +70,12 @@ message IngestionMetrics { int32 no_changes = 5; } +// A change set +message ChangeSet { + string id = 1; // A change set ID + string data = 2; // An arbitrary JSON object representing the change set +} + // An ingestion log message IngestionLog { string id = 1; @@ -83,6 +89,7 @@ message IngestionLog { string sdk_version = 9; diode.v1.Entity entity = 10; IngestionError error = 11; + ChangeSet change_set = 12; } // The request to retrieve ingestion logs diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go index 520b3557..2bf954c5 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go @@ -389,6 +389,62 @@ func (x *IngestionMetrics) GetNoChanges() int32 { return 0 } +// A change set +type ChangeSet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // A change set ID + Data string `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // An arbitrary JSON object representing the change set +} + +func (x *ChangeSet) Reset() { + *x = ChangeSet{} + if protoimpl.UnsafeEnabled { + mi := &file_diode_v1_reconciler_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChangeSet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChangeSet) ProtoMessage() {} + +func (x *ChangeSet) ProtoReflect() protoreflect.Message { + mi := &file_diode_v1_reconciler_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChangeSet.ProtoReflect.Descriptor instead. +func (*ChangeSet) Descriptor() ([]byte, []int) { + return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{5} +} + +func (x *ChangeSet) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ChangeSet) GetData() string { + if x != nil { + return x.Data + } + return "" +} + // An ingestion log type IngestionLog struct { state protoimpl.MessageState @@ -406,12 +462,13 @@ type IngestionLog struct { SdkVersion string `protobuf:"bytes,9,opt,name=sdk_version,json=sdkVersion,proto3" json:"sdk_version,omitempty"` Entity *diodepb.Entity `protobuf:"bytes,10,opt,name=entity,proto3" json:"entity,omitempty"` Error *IngestionError `protobuf:"bytes,11,opt,name=error,proto3" json:"error,omitempty"` + ChangeSet *ChangeSet `protobuf:"bytes,12,opt,name=change_set,json=changeSet,proto3" json:"change_set,omitempty"` } func (x *IngestionLog) Reset() { *x = IngestionLog{} if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[5] + mi := &file_diode_v1_reconciler_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -424,7 +481,7 @@ func (x *IngestionLog) String() string { func (*IngestionLog) ProtoMessage() {} func (x *IngestionLog) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[5] + mi := &file_diode_v1_reconciler_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -437,7 +494,7 @@ func (x *IngestionLog) ProtoReflect() protoreflect.Message { // Deprecated: Use IngestionLog.ProtoReflect.Descriptor instead. func (*IngestionLog) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{5} + return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{6} } func (x *IngestionLog) GetId() string { @@ -517,6 +574,13 @@ func (x *IngestionLog) GetError() *IngestionError { return nil } +func (x *IngestionLog) GetChangeSet() *ChangeSet { + if x != nil { + return x.ChangeSet + } + return nil +} + // The request to retrieve ingestion logs type RetrieveIngestionLogsRequest struct { state protoimpl.MessageState @@ -536,7 +600,7 @@ type RetrieveIngestionLogsRequest struct { func (x *RetrieveIngestionLogsRequest) Reset() { *x = RetrieveIngestionLogsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[6] + mi := &file_diode_v1_reconciler_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -549,7 +613,7 @@ func (x *RetrieveIngestionLogsRequest) String() string { func (*RetrieveIngestionLogsRequest) ProtoMessage() {} func (x *RetrieveIngestionLogsRequest) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[6] + mi := &file_diode_v1_reconciler_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -562,7 +626,7 @@ func (x *RetrieveIngestionLogsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RetrieveIngestionLogsRequest.ProtoReflect.Descriptor instead. func (*RetrieveIngestionLogsRequest) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{6} + return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{7} } func (x *RetrieveIngestionLogsRequest) GetPageSize() int32 { @@ -635,7 +699,7 @@ type RetrieveIngestionLogsResponse struct { func (x *RetrieveIngestionLogsResponse) Reset() { *x = RetrieveIngestionLogsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[7] + mi := &file_diode_v1_reconciler_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -648,7 +712,7 @@ func (x *RetrieveIngestionLogsResponse) String() string { func (*RetrieveIngestionLogsResponse) ProtoMessage() {} func (x *RetrieveIngestionLogsResponse) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[7] + mi := &file_diode_v1_reconciler_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -661,7 +725,7 @@ func (x *RetrieveIngestionLogsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RetrieveIngestionLogsResponse.ProtoReflect.Descriptor instead. func (*RetrieveIngestionLogsResponse) Descriptor() ([]byte, []int) { - return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{7} + return file_diode_v1_reconciler_proto_rawDescGZIP(), []int{8} } func (x *RetrieveIngestionLogsResponse) GetLogs() []*IngestionLog { @@ -698,7 +762,7 @@ type IngestionError_Details struct { func (x *IngestionError_Details) Reset() { *x = IngestionError_Details{} if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[8] + mi := &file_diode_v1_reconciler_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -711,7 +775,7 @@ func (x *IngestionError_Details) String() string { func (*IngestionError_Details) ProtoMessage() {} func (x *IngestionError_Details) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[8] + mi := &file_diode_v1_reconciler_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -760,7 +824,7 @@ type IngestionError_Details_Error struct { func (x *IngestionError_Details_Error) Reset() { *x = IngestionError_Details_Error{} if protoimpl.UnsafeEnabled { - mi := &file_diode_v1_reconciler_proto_msgTypes[9] + mi := &file_diode_v1_reconciler_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -773,7 +837,7 @@ func (x *IngestionError_Details_Error) String() string { func (*IngestionError_Details_Error) ProtoMessage() {} func (x *IngestionError_Details_Error) ProtoReflect() protoreflect.Message { - mi := &file_diode_v1_reconciler_proto_msgTypes[9] + mi := &file_diode_v1_reconciler_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -865,97 +929,103 @@ var file_diode_v1_reconciler_proto_rawDesc = []byte{ 0x61, 0x69, 0x6c, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x6f, 0x5f, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6e, 0x6f, 0x43, 0x68, 0x61, 0x6e, 0x67, - 0x65, 0x73, 0x22, 0x98, 0x03, 0x0a, 0x0c, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, - 0x4c, 0x6f, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x25, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, - 0x0f, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, - 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x69, 0x6e, - 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x73, 0x12, 0x2a, 0x0a, 0x11, 0x70, 0x72, 0x6f, - 0x64, 0x75, 0x63, 0x65, 0x72, 0x5f, 0x61, 0x70, 0x70, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x06, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x41, 0x70, - 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, - 0x72, 0x5f, 0x61, 0x70, 0x70, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x07, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x12, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x41, 0x70, 0x70, - 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x64, 0x6b, 0x5f, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x64, 0x6b, 0x4e, 0x61, - 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x64, 0x6b, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x64, 0x6b, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x0a, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x45, - 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x2e, 0x0a, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x64, - 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, - 0x6e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xda, 0x02, - 0x0a, 0x1c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, - 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, - 0x0a, 0x09, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x05, 0x48, 0x00, 0x52, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x88, 0x01, 0x01, - 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, - 0x0f, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x48, 0x01, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x88, 0x01, 0x01, 0x12, 0x1b, 0x0a, 0x09, - 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x64, 0x61, 0x74, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x12, 0x69, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x73, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x54, - 0x73, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, - 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x0e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x73, 0x45, 0x6e, 0x64, - 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x07, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, - 0x21, 0x0a, 0x0c, 0x6f, 0x6e, 0x6c, 0x79, 0x5f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, - 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x6f, 0x6e, 0x6c, 0x79, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, - 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0xa9, 0x01, 0x0a, 0x1d, 0x52, - 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, - 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x04, - 0x6c, 0x6f, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x64, 0x69, 0x6f, - 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, - 0x6f, 0x67, 0x52, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x12, 0x34, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x64, 0x69, 0x6f, 0x64, - 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x26, - 0x0a, 0x0f, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, 0x6b, 0x65, - 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6e, 0x65, 0x78, 0x74, 0x50, 0x61, 0x67, - 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x2a, 0x4d, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, - 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, - 0x12, 0x07, 0x0a, 0x03, 0x4e, 0x45, 0x57, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x45, 0x43, - 0x4f, 0x4e, 0x43, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, - 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x4f, 0x5f, 0x43, 0x48, 0x41, 0x4e, - 0x47, 0x45, 0x53, 0x10, 0x04, 0x32, 0xfe, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, - 0x69, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7f, 0x0a, 0x1c, 0x52, - 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, - 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2d, 0x2e, 0x64, 0x69, - 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, - 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x64, 0x69, 0x6f, + 0x65, 0x73, 0x22, 0x2f, 0x0a, 0x09, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x22, 0xcc, 0x03, 0x0a, 0x0c, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, + 0x6e, 0x4c, 0x6f, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x25, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x0f, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x67, 0x65, 0x73, + 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x69, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x73, 0x12, 0x2a, 0x0a, 0x11, 0x70, 0x72, + 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x5f, 0x61, 0x70, 0x70, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x41, + 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x30, 0x0a, 0x14, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, + 0x65, 0x72, 0x5f, 0x61, 0x70, 0x70, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x07, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x12, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x72, 0x41, 0x70, + 0x70, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x64, 0x6b, 0x5f, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x64, 0x6b, 0x4e, + 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x64, 0x6b, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x64, 0x6b, 0x56, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x0a, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x2e, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, + 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x32, + 0x0a, 0x0a, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x65, 0x74, 0x18, 0x0c, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, + 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x52, 0x09, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, + 0x65, 0x74, 0x22, 0xda, 0x02, 0x0a, 0x1c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x09, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, + 0x7a, 0x65, 0x88, 0x01, 0x01, 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x48, 0x01, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x88, 0x01, + 0x01, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1d, + 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x2c, 0x0a, + 0x12, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x73, 0x5f, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x69, 0x6e, 0x67, 0x65, 0x73, + 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x73, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x69, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, + 0x54, 0x73, 0x45, 0x6e, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, + 0x6b, 0x65, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x67, 0x65, 0x54, + 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x6f, 0x6e, 0x6c, 0x79, 0x5f, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x6f, 0x6e, 0x6c, 0x79, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x70, 0x61, 0x67, 0x65, + 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, + 0xa9, 0x01, 0x0a, 0x1d, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, + 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x2a, 0x0a, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x16, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, + 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x52, 0x04, 0x6c, 0x6f, 0x67, 0x73, 0x12, 0x34, 0x0a, + 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, + 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x12, 0x26, 0x0a, 0x0f, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x70, 0x61, 0x67, 0x65, + 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6e, 0x65, + 0x78, 0x74, 0x50, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x2a, 0x4d, 0x0a, 0x05, 0x53, + 0x74, 0x61, 0x74, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, + 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x4e, 0x45, 0x57, 0x10, 0x01, 0x12, 0x0e, + 0x0a, 0x0a, 0x52, 0x45, 0x43, 0x4f, 0x4e, 0x43, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x12, 0x0a, + 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x4f, + 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x53, 0x10, 0x04, 0x32, 0xfe, 0x01, 0x0a, 0x11, 0x52, + 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x7f, 0x0a, 0x1c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, + 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, + 0x12, 0x2d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, + 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, + 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x2e, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, + 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, + 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x68, 0x0a, 0x15, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, - 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x68, 0x0a, 0x15, - 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, - 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, - 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, - 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, - 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0xa4, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x64, - 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0f, 0x52, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, - 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x62, 0x6f, 0x78, 0x6c, 0x61, 0x62, - 0x73, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2d, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x76, - 0x31, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x70, 0x62, 0xa2, 0x02, - 0x03, 0x44, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x56, 0x31, 0xca, - 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x44, 0x69, 0x6f, - 0x64, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0xea, 0x02, 0x09, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, + 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0xa4, 0x01, 0x0a, 0x0c, + 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0f, 0x52, 0x65, + 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, + 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x65, 0x74, 0x62, + 0x6f, 0x78, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x6f, 0x64, 0x65, 0x2f, 0x64, 0x69, 0x6f, + 0x64, 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x64, 0x69, + 0x6f, 0x64, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x65, 0x63, 0x6f, 0x6e, 0x63, 0x69, 0x6c, 0x65, + 0x72, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x44, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, + 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0xe2, + 0x02, 0x14, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x44, 0x69, 0x6f, 0x64, 0x65, 0x3a, 0x3a, + 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -971,7 +1041,7 @@ func file_diode_v1_reconciler_proto_rawDescGZIP() []byte { } var file_diode_v1_reconciler_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_diode_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_diode_v1_reconciler_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_diode_v1_reconciler_proto_goTypes = []any{ (State)(0), // 0: diode.v1.State (*IngestionDataSource)(nil), // 1: diode.v1.IngestionDataSource @@ -979,32 +1049,34 @@ var file_diode_v1_reconciler_proto_goTypes = []any{ (*RetrieveIngestionDataSourcesResponse)(nil), // 3: diode.v1.RetrieveIngestionDataSourcesResponse (*IngestionError)(nil), // 4: diode.v1.IngestionError (*IngestionMetrics)(nil), // 5: diode.v1.IngestionMetrics - (*IngestionLog)(nil), // 6: diode.v1.IngestionLog - (*RetrieveIngestionLogsRequest)(nil), // 7: diode.v1.RetrieveIngestionLogsRequest - (*RetrieveIngestionLogsResponse)(nil), // 8: diode.v1.RetrieveIngestionLogsResponse - (*IngestionError_Details)(nil), // 9: diode.v1.IngestionError.Details - (*IngestionError_Details_Error)(nil), // 10: diode.v1.IngestionError.Details.Error - (*diodepb.Entity)(nil), // 11: diode.v1.Entity + (*ChangeSet)(nil), // 6: diode.v1.ChangeSet + (*IngestionLog)(nil), // 7: diode.v1.IngestionLog + (*RetrieveIngestionLogsRequest)(nil), // 8: diode.v1.RetrieveIngestionLogsRequest + (*RetrieveIngestionLogsResponse)(nil), // 9: diode.v1.RetrieveIngestionLogsResponse + (*IngestionError_Details)(nil), // 10: diode.v1.IngestionError.Details + (*IngestionError_Details_Error)(nil), // 11: diode.v1.IngestionError.Details.Error + (*diodepb.Entity)(nil), // 12: diode.v1.Entity } var file_diode_v1_reconciler_proto_depIdxs = []int32{ 1, // 0: diode.v1.RetrieveIngestionDataSourcesResponse.ingestion_data_sources:type_name -> diode.v1.IngestionDataSource - 9, // 1: diode.v1.IngestionError.details:type_name -> diode.v1.IngestionError.Details + 10, // 1: diode.v1.IngestionError.details:type_name -> diode.v1.IngestionError.Details 0, // 2: diode.v1.IngestionLog.state:type_name -> diode.v1.State - 11, // 3: diode.v1.IngestionLog.entity:type_name -> diode.v1.Entity + 12, // 3: diode.v1.IngestionLog.entity:type_name -> diode.v1.Entity 4, // 4: diode.v1.IngestionLog.error:type_name -> diode.v1.IngestionError - 0, // 5: diode.v1.RetrieveIngestionLogsRequest.state:type_name -> diode.v1.State - 6, // 6: diode.v1.RetrieveIngestionLogsResponse.logs:type_name -> diode.v1.IngestionLog - 5, // 7: diode.v1.RetrieveIngestionLogsResponse.metrics:type_name -> diode.v1.IngestionMetrics - 10, // 8: diode.v1.IngestionError.Details.errors:type_name -> diode.v1.IngestionError.Details.Error - 2, // 9: diode.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> diode.v1.RetrieveIngestionDataSourcesRequest - 7, // 10: diode.v1.ReconcilerService.RetrieveIngestionLogs:input_type -> diode.v1.RetrieveIngestionLogsRequest - 3, // 11: diode.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> diode.v1.RetrieveIngestionDataSourcesResponse - 8, // 12: diode.v1.ReconcilerService.RetrieveIngestionLogs:output_type -> diode.v1.RetrieveIngestionLogsResponse - 11, // [11:13] is the sub-list for method output_type - 9, // [9:11] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 6, // 5: diode.v1.IngestionLog.change_set:type_name -> diode.v1.ChangeSet + 0, // 6: diode.v1.RetrieveIngestionLogsRequest.state:type_name -> diode.v1.State + 7, // 7: diode.v1.RetrieveIngestionLogsResponse.logs:type_name -> diode.v1.IngestionLog + 5, // 8: diode.v1.RetrieveIngestionLogsResponse.metrics:type_name -> diode.v1.IngestionMetrics + 11, // 9: diode.v1.IngestionError.Details.errors:type_name -> diode.v1.IngestionError.Details.Error + 2, // 10: diode.v1.ReconcilerService.RetrieveIngestionDataSources:input_type -> diode.v1.RetrieveIngestionDataSourcesRequest + 8, // 11: diode.v1.ReconcilerService.RetrieveIngestionLogs:input_type -> diode.v1.RetrieveIngestionLogsRequest + 3, // 12: diode.v1.ReconcilerService.RetrieveIngestionDataSources:output_type -> diode.v1.RetrieveIngestionDataSourcesResponse + 9, // 13: diode.v1.ReconcilerService.RetrieveIngestionLogs:output_type -> diode.v1.RetrieveIngestionLogsResponse + 12, // [12:14] is the sub-list for method output_type + 10, // [10:12] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_diode_v1_reconciler_proto_init() } @@ -1074,7 +1146,7 @@ func file_diode_v1_reconciler_proto_init() { } } file_diode_v1_reconciler_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*IngestionLog); i { + switch v := v.(*ChangeSet); i { case 0: return &v.state case 1: @@ -1086,7 +1158,7 @@ func file_diode_v1_reconciler_proto_init() { } } file_diode_v1_reconciler_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*RetrieveIngestionLogsRequest); i { + switch v := v.(*IngestionLog); i { case 0: return &v.state case 1: @@ -1098,7 +1170,7 @@ func file_diode_v1_reconciler_proto_init() { } } file_diode_v1_reconciler_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*RetrieveIngestionLogsResponse); i { + switch v := v.(*RetrieveIngestionLogsRequest); i { case 0: return &v.state case 1: @@ -1110,7 +1182,7 @@ func file_diode_v1_reconciler_proto_init() { } } file_diode_v1_reconciler_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*IngestionError_Details); i { + switch v := v.(*RetrieveIngestionLogsResponse); i { case 0: return &v.state case 1: @@ -1122,6 +1194,18 @@ func file_diode_v1_reconciler_proto_init() { } } file_diode_v1_reconciler_proto_msgTypes[9].Exporter = func(v any, i int) any { + switch v := v.(*IngestionError_Details); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_diode_v1_reconciler_proto_msgTypes[10].Exporter = func(v any, i int) any { switch v := v.(*IngestionError_Details_Error); i { case 0: return &v.state @@ -1134,14 +1218,14 @@ func file_diode_v1_reconciler_proto_init() { } } } - file_diode_v1_reconciler_proto_msgTypes[6].OneofWrappers = []any{} + file_diode_v1_reconciler_proto_msgTypes[7].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_diode_v1_reconciler_proto_rawDesc, NumEnums: 1, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go index 8f2e960c..5b60f64c 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.validate.go @@ -682,6 +682,109 @@ var _ interface { ErrorName() string } = IngestionMetricsValidationError{} +// Validate checks the field values on ChangeSet with the rules defined in the +// proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. +func (m *ChangeSet) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on ChangeSet with the rules defined in +// the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in ChangeSetMultiError, or nil +// if none found. +func (m *ChangeSet) ValidateAll() error { + return m.validate(true) +} + +func (m *ChangeSet) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + // no validation rules for Id + + // no validation rules for Data + + if len(errors) > 0 { + return ChangeSetMultiError(errors) + } + + return nil +} + +// ChangeSetMultiError is an error wrapping multiple validation errors returned +// by ChangeSet.ValidateAll() if the designated constraints aren't met. +type ChangeSetMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m ChangeSetMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m ChangeSetMultiError) AllErrors() []error { return m } + +// ChangeSetValidationError is the validation error returned by +// ChangeSet.Validate if the designated constraints aren't met. +type ChangeSetValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e ChangeSetValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e ChangeSetValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e ChangeSetValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e ChangeSetValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e ChangeSetValidationError) ErrorName() string { return "ChangeSetValidationError" } + +// Error satisfies the builtin error interface +func (e ChangeSetValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sChangeSet.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = ChangeSetValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = ChangeSetValidationError{} + // Validate checks the field values on IngestionLog with the rules defined in // the proto definition for this message. If any rules are violated, the first // error encountered is returned, or nil if there are no violations. @@ -780,6 +883,35 @@ func (m *IngestionLog) validate(all bool) error { } } + if all { + switch v := interface{}(m.GetChangeSet()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, IngestionLogValidationError{ + field: "ChangeSet", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, IngestionLogValidationError{ + field: "ChangeSet", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetChangeSet()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return IngestionLogValidationError{ + field: "ChangeSet", + reason: "embedded message failed validation", + cause: err, + } + } + } + if len(errors) > 0 { return IngestionLogMultiError(errors) } diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 82574cc7..cc99c967 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -2,6 +2,8 @@ package reconciler import ( "context" + "encoding/base64" + "encoding/json" "errors" "fmt" "log/slog" @@ -232,6 +234,16 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. ingestionLog.State = reconcilerpb.State_FAILED ingestionLog.Error = extractIngestionError(err) + if changeSet != nil { + ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} + csBase64, err := changeSetToBase64(changeSet) + if err != nil { + errs = append(errs, err) + } else { + ingestionLog.ChangeSet.Data = csBase64 + } + } + if _, err = p.writeIngestionLog(ctx, key, ingestionLog); err != nil { errs = append(errs, err) } @@ -240,7 +252,13 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. if changeSet != nil { ingestionLog.State = reconcilerpb.State_RECONCILED - //TODO: add change set ID to ingestion log + ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} + csBase64, err := changeSetToBase64(changeSet) + if err != nil { + errs = append(errs, err) + } else { + ingestionLog.ChangeSet.Data = csBase64 + } } else { ingestionLog.State = reconcilerpb.State_NO_CHANGES } @@ -356,6 +374,15 @@ func normalizeIngestionLog(l []byte) []byte { return re.ReplaceAll(l, []byte(`"ingestionTs":$1`)) } +func changeSetToBase64(cs *changeset.ChangeSet) (string, error) { + csJSON, err := json.Marshal(cs) + if err != nil { + return "", fmt.Errorf("failed to marshal JSON: %v", err) + } + + return base64.StdEncoding.EncodeToString(csJSON), nil +} + func extractObjectType(in *diodepb.Entity) (string, error) { switch in.GetEntity().(type) { case *diodepb.Entity_Device: From 6aeb808abb3600690e3e944490be7d6ae6049b86 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:10:07 -0300 Subject: [PATCH 2/8] change comment --- diode-proto/diode/v1/reconciler.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index 4af3f85b..908ec358 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -73,7 +73,7 @@ message IngestionMetrics { // A change set message ChangeSet { string id = 1; // A change set ID - string data = 2; // An arbitrary JSON object representing the change set + string data = 2; // Base64 string representing the change set } // An ingestion log From 0875b9bcce2df016b1a50e3f28fef1783d4c82b8 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Thu, 19 Sep 2024 14:19:17 -0300 Subject: [PATCH 3/8] add unit tests --- .../reconciler/ingestion_processor.go | 2 +- .../ingestion_processor_internal_test.go | 57 +++++++++++++++---- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index cc99c967..16d317ac 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -346,7 +346,7 @@ func (p *IngestionProcessor) reconcileEntity(ctx context.Context, ingestEntity c resp, err := p.nbClient.ApplyChangeSet(ctx, req) if err != nil { - return nil, err + return cs, err } p.logger.Debug("apply change set response", "response", resp) diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index 35dc5266..b8a4a965 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -212,12 +212,14 @@ func TestReconcileEntity(t *testing.T) { func TestHandleStreamMessage(t *testing.T) { tests := []struct { - name string - validMsg bool - entities []*diodepb.Entity - mockChangeSet *changeset.ChangeSet - reconcilerError bool - expectedError bool + name string + validMsg bool + entities []*diodepb.Entity + mockChangeSet *changeset.ChangeSet + changeSetResponse *netboxdiodeplugin.ChangeSetResponse + changeSetError error + reconcilerError bool + expectedError bool }{ { name: "successful processing", @@ -231,8 +233,9 @@ func TestHandleStreamMessage(t *testing.T) { }, }, }, - reconcilerError: false, - expectedError: false, + changeSetResponse: &netboxdiodeplugin.ChangeSetResponse{}, + reconcilerError: false, + expectedError: false, }, { name: "unmarshal error", @@ -257,8 +260,9 @@ func TestHandleStreamMessage(t *testing.T) { }, }, }, - reconcilerError: true, - expectedError: false, + changeSetResponse: &netboxdiodeplugin.ChangeSetResponse{}, + reconcilerError: true, + expectedError: false, }, { name: "no entities", @@ -268,11 +272,35 @@ func TestHandleStreamMessage(t *testing.T) { Entity: nil, }, }, + changeSetResponse: &netboxdiodeplugin.ChangeSetResponse{}, + reconcilerError: false, + expectedError: false, + }, + { + name: "change set available", + validMsg: true, + entities: []*diodepb.Entity{ + { + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-name", + }, + }, + }, + }, + mockChangeSet: &changeset.ChangeSet{ + ChangeSetID: "cs123", + ChangeSet: []changeset.Change{}, + }, + changeSetResponse: &netboxdiodeplugin.ChangeSetResponse{ + ChangeSetID: "cs123", + Result: "changed", + }, reconcilerError: false, expectedError: false, }, { - name: "change set available", + name: "change set apply error", validMsg: true, entities: []*diodepb.Entity{ { @@ -287,6 +315,11 @@ func TestHandleStreamMessage(t *testing.T) { ChangeSetID: "cs123", ChangeSet: []changeset.Change{}, }, + changeSetResponse: &netboxdiodeplugin.ChangeSetResponse{ + ChangeSetID: "cs123", + Result: "changed", + }, + changeSetError: errors.New("apply error"), reconcilerError: false, expectedError: false, }, @@ -341,7 +374,7 @@ func TestHandleStreamMessage(t *testing.T) { Site: nil, }}, nil) } - mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(&netboxdiodeplugin.ChangeSetResponse{}, nil) + mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(tt.changeSetResponse, tt.changeSetError) if tt.entities[0].Entity != nil { mockRedisClient.On("Do", ctx, "JSON.SET", mock.Anything, "$", mock.Anything).Return(redis.NewCmd(ctx)) } From 6e831af8d078695794c6e20f01a0215b4bc356b5 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Fri, 20 Sep 2024 09:04:27 -0300 Subject: [PATCH 4/8] use brotli --- diode-server/go.mod | 1 + diode-server/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/diode-server/go.mod b/diode-server/go.mod index c272b13b..dc9d99e6 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -23,6 +23,7 @@ require ( require ( github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/andybalholm/brotli v1.1.0 github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/diode-server/go.sum b/diode-server/go.sum index deaa110a..bd0b02f0 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -2,6 +2,8 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= From ed2824e279a53581a3da6fc348447678f06f3409 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Fri, 20 Sep 2024 09:29:36 -0300 Subject: [PATCH 5/8] Add brotli support --- .../reconciler/ingestion_processor.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 16d317ac..48ca3961 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -1,6 +1,7 @@ package reconciler import ( + "bytes" "context" "encoding/base64" "encoding/json" @@ -11,6 +12,7 @@ import ( "regexp" "strconv" + "github.com/andybalholm/brotli" "github.com/kelseyhightower/envconfig" "github.com/redis/go-redis/v9" "github.com/segmentio/ksuid" @@ -236,11 +238,11 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. if changeSet != nil { ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} - csBase64, err := changeSetToBase64(changeSet) + csCompressed, err := compressChangeSet(changeSet) if err != nil { errs = append(errs, err) } else { - ingestionLog.ChangeSet.Data = csBase64 + ingestionLog.ChangeSet.Data = csCompressed } } @@ -253,11 +255,11 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. if changeSet != nil { ingestionLog.State = reconcilerpb.State_RECONCILED ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} - csBase64, err := changeSetToBase64(changeSet) + csCompressed, err := compressChangeSet(changeSet) if err != nil { errs = append(errs, err) } else { - ingestionLog.ChangeSet.Data = csBase64 + ingestionLog.ChangeSet.Data = csCompressed } } else { ingestionLog.State = reconcilerpb.State_NO_CHANGES @@ -374,13 +376,18 @@ func normalizeIngestionLog(l []byte) []byte { return re.ReplaceAll(l, []byte(`"ingestionTs":$1`)) } -func changeSetToBase64(cs *changeset.ChangeSet) (string, error) { +func compressChangeSet(cs *changeset.ChangeSet) (string, error) { csJSON, err := json.Marshal(cs) if err != nil { return "", fmt.Errorf("failed to marshal JSON: %v", err) } - return base64.StdEncoding.EncodeToString(csJSON), nil + var brotliBuf bytes.Buffer + brotliWriter := brotli.NewWriter(&brotliBuf) + brotliWriter.Write(csJSON) + brotliWriter.Close() + + return base64.StdEncoding.EncodeToString(brotliBuf.Bytes()), nil } func extractObjectType(in *diodepb.Entity) (string, error) { From 62712f09e6f64c4c3a17a69303ab0d145254a5c9 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Fri, 20 Sep 2024 09:37:20 -0300 Subject: [PATCH 6/8] fix lint --- diode-server/reconciler/ingestion_processor.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 48ca3961..1b012b0f 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -379,13 +379,17 @@ func normalizeIngestionLog(l []byte) []byte { func compressChangeSet(cs *changeset.ChangeSet) (string, error) { csJSON, err := json.Marshal(cs) if err != nil { - return "", fmt.Errorf("failed to marshal JSON: %v", err) + return "", fmt.Errorf("failed to marshal changeset JSON: %v", err) } var brotliBuf bytes.Buffer brotliWriter := brotli.NewWriter(&brotliBuf) - brotliWriter.Write(csJSON) - brotliWriter.Close() + if _, err = brotliWriter.Write(csJSON); err != nil { + return "", fmt.Errorf("failed to compress changeset: %v", err) + } + if err = brotliWriter.Close(); err != nil { + return "", fmt.Errorf("failed to compress changeset: %v", err) + } return base64.StdEncoding.EncodeToString(brotliBuf.Bytes()), nil } From 71c0e8448c2accccd479fa54afd57e616014a534 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Fri, 20 Sep 2024 11:10:33 -0300 Subject: [PATCH 7/8] Replace with bytes --- diode-proto/diode/v1/reconciler.proto | 2 +- .../gen/diode/v1/reconcilerpb/reconciler.pb.go | 8 ++++---- diode-server/reconciler/ingestion_processor.go | 11 +++++------ 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index 908ec358..71ea6d67 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -73,7 +73,7 @@ message IngestionMetrics { // A change set message ChangeSet { string id = 1; // A change set ID - string data = 2; // Base64 string representing the change set + bytes data = 2; // Binary data representing the change set } // An ingestion log diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go index 2bf954c5..3e51691f 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go @@ -396,7 +396,7 @@ type ChangeSet struct { unknownFields protoimpl.UnknownFields Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // A change set ID - Data string `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // An arbitrary JSON object representing the change set + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // Binary data representing the change set } func (x *ChangeSet) Reset() { @@ -438,11 +438,11 @@ func (x *ChangeSet) GetId() string { return "" } -func (x *ChangeSet) GetData() string { +func (x *ChangeSet) GetData() []byte { if x != nil { return x.Data } - return "" + return nil } // An ingestion log @@ -931,7 +931,7 @@ var file_diode_v1_reconciler_proto_rawDesc = []byte{ 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6e, 0x6f, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x22, 0x2f, 0x0a, 0x09, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, - 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0xcc, 0x03, 0x0a, 0x0c, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x6f, 0x67, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 1b012b0f..e30e25fd 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -3,7 +3,6 @@ package reconciler import ( "bytes" "context" - "encoding/base64" "encoding/json" "errors" "fmt" @@ -376,22 +375,22 @@ func normalizeIngestionLog(l []byte) []byte { return re.ReplaceAll(l, []byte(`"ingestionTs":$1`)) } -func compressChangeSet(cs *changeset.ChangeSet) (string, error) { +func compressChangeSet(cs *changeset.ChangeSet) ([]byte, error) { csJSON, err := json.Marshal(cs) if err != nil { - return "", fmt.Errorf("failed to marshal changeset JSON: %v", err) + return nil, fmt.Errorf("failed to marshal changeset JSON: %v", err) } var brotliBuf bytes.Buffer brotliWriter := brotli.NewWriter(&brotliBuf) if _, err = brotliWriter.Write(csJSON); err != nil { - return "", fmt.Errorf("failed to compress changeset: %v", err) + return nil, fmt.Errorf("failed to compress changeset: %v", err) } if err = brotliWriter.Close(); err != nil { - return "", fmt.Errorf("failed to compress changeset: %v", err) + return nil, fmt.Errorf("failed to compress changeset: %v", err) } - return base64.StdEncoding.EncodeToString(brotliBuf.Bytes()), nil + return brotliBuf.Bytes(), nil } func extractObjectType(in *diodepb.Entity) (string, error) { From a81d156b0452b9788e2a5cffe117cd62c206d924 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Fri, 20 Sep 2024 14:30:56 -0300 Subject: [PATCH 8/8] Add unit tests for compress method --- .../ingestion_processor_internal_test.go | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index ec0e26c2..82331c19 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -1,13 +1,17 @@ package reconciler import ( + "bytes" "context" + "encoding/json" "errors" + "io" "log/slog" "os" "testing" "time" + "github.com/andybalholm/brotli" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -459,3 +463,66 @@ func TestConsumeIngestionStream(t *testing.T) { }) } } + +func TestCompressChangeSet(t *testing.T) { + cs := changeset.ChangeSet{ + ChangeSetID: "5663a77e-9bad-4981-afe9-77d8a9f2b8b5", + ChangeSet: []changeset.Change{ + { + ChangeID: "5663a77e-9bad-4981-afe9-77d8a9f2b8b6", + ChangeType: changeset.ChangeTypeCreate, + ObjectType: "extras.tag", + ObjectID: nil, + ObjectVersion: nil, + Data: &netbox.Tag{ + Name: "tag 2", + Slug: "tag-2", + }, + }, + { + ChangeID: "5663a77e-9bad-4981-afe9-77d8a9f2b8b5", + ChangeType: changeset.ChangeTypeUpdate, + ObjectType: "dcim.site", + ObjectVersion: nil, + Data: &netbox.DcimSite{ + ID: 1, + Name: "Site A", + Slug: "site-a", + Status: (*netbox.DcimSiteStatus)(strPtr(string(netbox.DcimSiteStatusActive))), + Tags: []*netbox.Tag{ + { + ID: 1, + Name: "tag 1", + Slug: "tag-1", + }, + { + ID: 3, + Name: "tag 3", + Slug: "tag-3", + }, + { + Name: "tag 2", + Slug: "tag-2", + }, + }, + }, + }, + }, + } + compressed, err := compressChangeSet(&cs) + require.NoError(t, err) + + // Decompress the compressed data + r := brotli.NewReader(bytes.NewReader(compressed)) + var decodedOutput bytes.Buffer + n, err := io.Copy(&decodedOutput, r) + require.NoError(t, err) + + csJSON, err := json.Marshal(cs) + require.NoError(t, err) + + // Assert the decompressed data is the same as the original data + require.Equal(t, int64(len(csJSON)), n) + require.Equal(t, csJSON, decodedOutput.Bytes()) + require.Contains(t, decodedOutput.String(), "5663a77e-9bad-4981-afe9-77d8a9f2b8b5") +}