diff --git a/api/types/constants.go b/api/types/constants.go index 87c0335586bf..09416999a3a3 100644 --- a/api/types/constants.go +++ b/api/types/constants.go @@ -1054,10 +1054,10 @@ const ( // group they should attempt to be connected to. ProxyGroupGenerationLabel = TeleportInternalLabelPrefix + "proxygroup-gen" - // ProxyPeerQUICLabel is the internal-user label for proxy heartbeats that's - // used to signal that the proxy supports receiving proxy peering - // connections over QUIC. - ProxyPeerQUICLabel = TeleportInternalLabelPrefix + "proxy-peer-quic" + // UnstableProxyPeerQUICLabel is the internal-use label for proxy heartbeats + // that's used to signal that the proxy supports receiving proxy peering + // connections over QUIC. The value should be "yes". + UnstableProxyPeerQUICLabel = TeleportInternalLabelPrefix + "proxy-peer-quic" // OktaAppNameLabel is the individual app name label. OktaAppNameLabel = TeleportInternalLabelPrefix + "okta-app-name" diff --git a/gen/proto/go/teleport/quicpeering/v1alpha/dial.pb.go b/gen/proto/go/teleport/quicpeering/v1alpha/dial.pb.go new file mode 100644 index 000000000000..9ca59303533b --- /dev/null +++ b/gen/proto/go/teleport/quicpeering/v1alpha/dial.pb.go @@ -0,0 +1,348 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Code generated by protoc-gen-go. DO NOT EDIT. 
+// versions: +// protoc-gen-go v1.35.1 +// protoc (unknown) +// source: teleport/quicpeering/v1alpha/dial.proto + +package quicpeeringv1alpha + +import ( + status "google.golang.org/genproto/googleapis/rpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Sent from a proxy to a peer proxy in a fresh QUIC stream to dial a Teleport +// resource through a QUIC proxy peering connection. The message is sent in +// protobuf binary format, prefixed by its length encoded as a little endian +// 32-bit unsigned integer. +type DialRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The target agent for the connection attempt; should be ".". + TargetHostId string `protobuf:"bytes,1,opt,name=target_host_id,json=targetHostId,proto3" json:"target_host_id,omitempty"` + // The type of the connection as defined by api/types.TunnelType ("node", + // "app", "kube"...). + ConnectionType string `protobuf:"bytes,2,opt,name=connection_type,json=connectionType,proto3" json:"connection_type,omitempty"` + // The source of the connection, the network address of the user for whom the + // connection is being tunneled, as seen from the proxy sending the request. + Source *Addr `protobuf:"bytes,3,opt,name=source,proto3" json:"source,omitempty"` + // The destination of the connection, used as a weak hint and as something to + // put in the "local address" of the connection object handled by the agent. 
+ Destination *Addr `protobuf:"bytes,4,opt,name=destination,proto3" json:"destination,omitempty"` + // The time of the client, must be provided and within 5 minutes of the local + // server time for 0-RTT requests. + Timestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // A random id for each dial request, must be provided and unique among dial + // requests recently received by the server. + Nonce uint64 `protobuf:"fixed64,6,opt,name=nonce,proto3" json:"nonce,omitempty"` +} + +func (x *DialRequest) Reset() { + *x = DialRequest{} + mi := &file_teleport_quicpeering_v1alpha_dial_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DialRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DialRequest) ProtoMessage() {} + +func (x *DialRequest) ProtoReflect() protoreflect.Message { + mi := &file_teleport_quicpeering_v1alpha_dial_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DialRequest.ProtoReflect.Descriptor instead. 
+func (*DialRequest) Descriptor() ([]byte, []int) { + return file_teleport_quicpeering_v1alpha_dial_proto_rawDescGZIP(), []int{0} +} + +func (x *DialRequest) GetTargetHostId() string { + if x != nil { + return x.TargetHostId + } + return "" +} + +func (x *DialRequest) GetConnectionType() string { + if x != nil { + return x.ConnectionType + } + return "" +} + +func (x *DialRequest) GetSource() *Addr { + if x != nil { + return x.Source + } + return nil +} + +func (x *DialRequest) GetDestination() *Addr { + if x != nil { + return x.Destination + } + return nil +} + +func (x *DialRequest) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + +func (x *DialRequest) GetNonce() uint64 { + if x != nil { + return x.Nonce + } + return 0 +} + +// A stringy Go net.Addr. Can be converted to and from a lib/utils.NetAddr. +type Addr struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Likely always "tcp". + Network string `protobuf:"bytes,1,opt,name=network,proto3" json:"network,omitempty"` + // Depending on the network, likely ":". + Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"` +} + +func (x *Addr) Reset() { + *x = Addr{} + mi := &file_teleport_quicpeering_v1alpha_dial_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Addr) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Addr) ProtoMessage() {} + +func (x *Addr) ProtoReflect() protoreflect.Message { + mi := &file_teleport_quicpeering_v1alpha_dial_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Addr.ProtoReflect.Descriptor instead. 
+func (*Addr) Descriptor() ([]byte, []int) { + return file_teleport_quicpeering_v1alpha_dial_proto_rawDescGZIP(), []int{1} +} + +func (x *Addr) GetNetwork() string { + if x != nil { + return x.Network + } + return "" +} + +func (x *Addr) GetAddr() string { + if x != nil { + return x.Addr + } + return "" +} + +// Sent from the server to the client as a response to a DialRequest. The +// message is likewise sent in protobuf binary format, prefixed by its length +// encoded as a little endian uint32. +type DialResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The success or failure of the dial. If the dial is successful, the stream + // will continue with the data of the connection. + Status *status.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` +} + +func (x *DialResponse) Reset() { + *x = DialResponse{} + mi := &file_teleport_quicpeering_v1alpha_dial_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DialResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DialResponse) ProtoMessage() {} + +func (x *DialResponse) ProtoReflect() protoreflect.Message { + mi := &file_teleport_quicpeering_v1alpha_dial_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DialResponse.ProtoReflect.Descriptor instead. 
+func (*DialResponse) Descriptor() ([]byte, []int) { + return file_teleport_quicpeering_v1alpha_dial_proto_rawDescGZIP(), []int{2} +} + +func (x *DialResponse) GetStatus() *status.Status { + if x != nil { + return x.Status + } + return nil +} + +var File_teleport_quicpeering_v1alpha_dial_proto protoreflect.FileDescriptor + +var file_teleport_quicpeering_v1alpha_dial_proto_rawDesc = []byte{ + 0x0a, 0x27, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x71, 0x75, 0x69, 0x63, 0x70, + 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2f, 0x64, + 0x69, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x74, 0x65, 0x6c, 0x65, 0x70, + 0x6f, 0x72, 0x74, 0x2e, 0x71, 0x75, 0x69, 0x63, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, + 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0xae, 0x02, 0x0a, 0x0b, 0x44, 0x69, 0x61, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x24, 0x0a, 0x0e, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x68, 0x6f, 0x73, 0x74, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x74, 0x61, 0x72, 0x67, 0x65, + 0x74, 0x48, 0x6f, 0x73, 0x74, 0x49, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x3a, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x22, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x71, 0x75, 0x69, 0x63, + 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 
0x67, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, + 0x41, 0x64, 0x64, 0x72, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x44, 0x0a, 0x0b, + 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x22, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x71, 0x75, 0x69, + 0x63, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, + 0x2e, 0x41, 0x64, 0x64, 0x72, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, + 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x06, 0x52, 0x05, 0x6e, 0x6f, 0x6e, + 0x63, 0x65, 0x22, 0x34, 0x0a, 0x04, 0x41, 0x64, 0x64, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x6e, 0x65, + 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6e, 0x65, 0x74, + 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x22, 0x3a, 0x0a, 0x0c, 0x44, 0x69, 0x61, 0x6c, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x42, 0x60, 0x5a, 0x5e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, + 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x67, 0x65, 
0x6e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, + 0x71, 0x75, 0x69, 0x63, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x3b, 0x71, 0x75, 0x69, 0x63, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_teleport_quicpeering_v1alpha_dial_proto_rawDescOnce sync.Once + file_teleport_quicpeering_v1alpha_dial_proto_rawDescData = file_teleport_quicpeering_v1alpha_dial_proto_rawDesc +) + +func file_teleport_quicpeering_v1alpha_dial_proto_rawDescGZIP() []byte { + file_teleport_quicpeering_v1alpha_dial_proto_rawDescOnce.Do(func() { + file_teleport_quicpeering_v1alpha_dial_proto_rawDescData = protoimpl.X.CompressGZIP(file_teleport_quicpeering_v1alpha_dial_proto_rawDescData) + }) + return file_teleport_quicpeering_v1alpha_dial_proto_rawDescData +} + +var file_teleport_quicpeering_v1alpha_dial_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_teleport_quicpeering_v1alpha_dial_proto_goTypes = []any{ + (*DialRequest)(nil), // 0: teleport.quicpeering.v1alpha.DialRequest + (*Addr)(nil), // 1: teleport.quicpeering.v1alpha.Addr + (*DialResponse)(nil), // 2: teleport.quicpeering.v1alpha.DialResponse + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp + (*status.Status)(nil), // 4: google.rpc.Status +} +var file_teleport_quicpeering_v1alpha_dial_proto_depIdxs = []int32{ + 1, // 0: teleport.quicpeering.v1alpha.DialRequest.source:type_name -> teleport.quicpeering.v1alpha.Addr + 1, // 1: teleport.quicpeering.v1alpha.DialRequest.destination:type_name -> teleport.quicpeering.v1alpha.Addr + 3, // 2: teleport.quicpeering.v1alpha.DialRequest.timestamp:type_name -> google.protobuf.Timestamp + 4, // 3: teleport.quicpeering.v1alpha.DialResponse.status:type_name -> google.rpc.Status + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the 
sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_teleport_quicpeering_v1alpha_dial_proto_init() } +func file_teleport_quicpeering_v1alpha_dial_proto_init() { + if File_teleport_quicpeering_v1alpha_dial_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_teleport_quicpeering_v1alpha_dial_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_teleport_quicpeering_v1alpha_dial_proto_goTypes, + DependencyIndexes: file_teleport_quicpeering_v1alpha_dial_proto_depIdxs, + MessageInfos: file_teleport_quicpeering_v1alpha_dial_proto_msgTypes, + }.Build() + File_teleport_quicpeering_v1alpha_dial_proto = out.File + file_teleport_quicpeering_v1alpha_dial_proto_rawDesc = nil + file_teleport_quicpeering_v1alpha_dial_proto_goTypes = nil + file_teleport_quicpeering_v1alpha_dial_proto_depIdxs = nil +} diff --git a/go.mod b/go.mod index b75dec86e94a..1e83490b5f2f 100644 --- a/go.mod +++ b/go.mod @@ -163,7 +163,7 @@ require ( github.com/prometheus/client_golang v1.20.4 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.55.0 - github.com/quic-go/quic-go v0.47.0 + github.com/quic-go/quic-go v0.48.0 github.com/redis/go-redis/v9 v9.5.1 // replaced github.com/russellhaering/gosaml2 v0.9.1 github.com/russellhaering/goxmldsig v1.4.0 diff --git a/go.sum b/go.sum index f66f4a8af0e7..69298be05a41 100644 --- a/go.sum +++ b/go.sum @@ -1993,8 +1993,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/protocolbuffers/txtpbfmt v0.0.0-20231025115547-084445ff1adf h1:014O62zIzQwvoD7Ekj3ePDF5bv9Xxy0w6AZk0qYbjUk= 
github.com/protocolbuffers/txtpbfmt v0.0.0-20231025115547-084445ff1adf/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c= -github.com/quic-go/quic-go v0.47.0 h1:yXs3v7r2bm1wmPTYNLKAAJTHMYkPEsfYJmTazXrCZ7Y= -github.com/quic-go/quic-go v0.47.0/go.mod h1:3bCapYsJvXGZcipOHuu7plYtaV6tnF+z7wIFsU0WK9E= +github.com/quic-go/quic-go v0.48.0 h1:2TCyvBrMu1Z25rvIAlnp2dPT4lgh/uTqLqiXVpp5AeU= +github.com/quic-go/quic-go v0.48.0/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= diff --git a/lib/config/configuration.go b/lib/config/configuration.go index 0f23047f64f7..1dbba94a79b3 100644 --- a/lib/config/configuration.go +++ b/lib/config/configuration.go @@ -2722,6 +2722,10 @@ func Configure(clf *CommandLineFlags, cfg *servicecfg.Config, legacyAppFlags boo cfg.DebugService.Enabled = false } + if os.Getenv("TELEPORT_UNSTABLE_QUIC_PROXY_PEERING") == "yes" { + cfg.Proxy.QUICProxyPeering = true + } + return nil } diff --git a/lib/proxy/peer/client.go b/lib/proxy/peer/client.go index c00b67006875..4912a997ba9f 100644 --- a/lib/proxy/peer/client.go +++ b/lib/proxy/peer/client.go @@ -483,8 +483,8 @@ func (c *Client) updateConnections(proxies []types.Server) error { } // establish new connections - _, supportsQuic := proxy.GetLabel(types.ProxyPeerQUICLabel) - conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic) + supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel) + conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes") if err != nil { c.metrics.reportTunnelError(errorProxyPeerTunnelDial) c.config.Log.DebugContext(c.ctx, "error dialing peer proxy", "peer_id", id, "peer_addr", 
proxy.GetPeerAddr()) @@ -684,8 +684,8 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) { continue } - _, supportsQuic := proxy.GetLabel(types.ProxyPeerQUICLabel) - conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic) + supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel) + conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes") if err != nil { c.metrics.reportTunnelError(errorProxyPeerTunnelDirectDial) c.config.Log.DebugContext(c.ctx, "error direct dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr()) @@ -715,7 +715,11 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) { // connect dials a new connection to proxyAddr. func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (clientConn, error) { if supportsQUIC && c.config.QUICTransport != nil { - panic("QUIC proxy peering is not implemented") + conn, err := c.connectQUIC(peerID, peerAddr) + if err != nil { + return nil, trace.Wrap(err) + } + return conn, nil } tlsConfig := utils.TLSConfig(c.config.TLSCipherSuites) tlsConfig.ServerName = apiutils.EncodeClusterName(c.config.ClusterName) diff --git a/lib/proxy/peer/credentials.go b/lib/proxy/peer/credentials.go index 5ed14d22ef6a..b4dcce84b55b 100644 --- a/lib/proxy/peer/credentials.go +++ b/lib/proxy/peer/credentials.go @@ -121,5 +121,5 @@ func validatePeer(peerID string, identity *tlsca.Identity) error { return nil } - return trace.AccessDenied("connected to unexpected proxy") + return trace.Wrap(wrongProxyError{}) } diff --git a/lib/proxy/peer/quic.go b/lib/proxy/peer/quic.go new file mode 100644 index 000000000000..61eaac632a81 --- /dev/null +++ b/lib/proxy/peer/quic.go @@ -0,0 +1,151 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. 
+// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package peer + +import ( + "time" + + "github.com/quic-go/quic-go/quicvarint" +) + +const ( + // quicMaxIdleTimeout is the arbitrary timeout after which a QUIC connection + // that hasn't received data is presumed to be lost to the aether. + quicMaxIdleTimeout = 30 * time.Second + // quicKeepAlivePeriod is the interval of QUIC keepalive packets sent if the + // connection is otherwise idle. + quicKeepAlivePeriod = 5 * time.Second + + quicMaxReceiveWindow = quicvarint.Max + quicMaxIncomingStreams = 1 << 60 // maximum allowed value as per the quic-go docs + + // quicNextProto is the ALPN indicator for the current version of the QUIC + // proxy peering protocol. + quicNextProto = "teleport-peer-v1alpha" + + // quicMaxMessageSize is the maximum accepted size (in protobuf binary + // format) for the request and response messages exchanged as part of the + // dialing. + quicMaxMessageSize = 128 * 1024 + + // quicTimestampGraceWindow is the maximum time difference between local + // time and reported time in a 0-RTT request. Clients should not keep trying + // to use a request after this much time has passed. + quicTimestampGraceWindow = time.Minute + // quicNoncePersistence is the shortest time for which a nonce will be kept + // in memory to prevent 0-RTT replay attacks. 
Should be significantly longer + // than [quicTimestampGraceWindow]. In the current implementation, nonces + // can be kept for at most twice this value. + quicNoncePersistence = 5 * time.Minute + + quicDialTimeout = 30 * time.Second + quicRequestTimeout = 10 * time.Second + quicErrorResponseTimeout = 10 * time.Second +) + +/* + +# QUIC proxy peering + +QUIC proxy peering uses QUIC connections to the same address and port as the +regular proxy peering listener (port 3021, by default) and uses the same routing +logic as the existing proxy peering. + +Until the feature is stabilized, a proxy can advertise support for receiving +QUIC proxy peering connections through a label in its heartbeat. In the current +implementation a proxy will only use outbound QUIC connections if it's also +accepting QUIC connections, and all connections will use the same socket; this +has the effect of taking up half as many conntrack entries as TCP proxy peering +(as each outbound TCP connection to a given destination needs to use a different +ephemeral port). + +## Protocol + +The server will accept connections from any Proxy of the cluster; in the +connection, the client can open a bidirectional stream for each dial attempt +through the server. The connections use ALPN with a protocol name that contains +the "version" of the protocol (currently v1alpha, matching the protobuf package +version). + +The client opens each stream by sending the protobuf binary encoding of a +DialRequest message (see proto/teleport/quicpeering/v1alpha/dial.proto), +length-prefixed by a little endian 32-bit integer. 
The server will check that +the request is valid (see the "0-RTT considerations" section), attempt to dial +the agent through a reverse tunnel, then report back the outcome in a DialResponse +message that contains a google.rpc.Status (by taking the error and passing it +through trace/trail, which is conveniently how we transfer errors on the wire +with gRPC); the response is, like the request, encoded in binary protobuf format +and length-prefixed. + +If the status is ok (signifying no error) then the stream will stay open, +carrying the data for the connection between the user and the agent, otherwise +the stream will be closed. For sanity's sake, the size of both messages is +limited and any oversized message is treated as a protocol violation. + +## Multiplexing (or lack thereof) + +While the current server implementation imposes no limit on the number of streams +in a single connection, real-world tests have shown that the best performance in +terms of throughput and isolation between different user connections is achieved +with individual QUIC connections between proxies. This would be very impractical +with TCP, as that would result in significantly heavier load on the network +infrastructure from having to keep track of all the individual TCP connections, +but QUIC can use its own internal connection IDs as it sends and receives UDP +packets over the same (source address, source port, destination address, +destination port) 4-tuple. + +As such, the current client implementation opens a new QUIC connection for each +user connection, with a single stream in it. This could be changed without +breaking compatibility in the future. + +## 0-RTT considerations + +QUIC can make use of TLS session resumption, to make TLS handshakes +computationally cheaper. When resuming a session, the client can send data in +the very first packets, without waiting for any data from the server. 
This +"0-RTT" data is authentic and protected against eavesdropping, and any response +from the server would likewise be protected against eavesdropping, but an +attacker with the ability to sniff and inject data can blindly replay the +initial resumed handshake including the 0-RTT data. + +Since proxy peering is at its most useful when dealing with connections across +regions, it's very beneficial to take advantage of the latency reduction +offered by 0-RTT; to prevent any problems caused by replay attacks, the client +must include a timestamp and a nonce in the request: the server will require a +full handshake if the timestamp is too far off the local time (to prevent +complete cluster outages in case of clock drift), and then will check the nonce +against a list of nonces received recently. If the nonce was already used, the +dial request is rejected (potentially rejecting the legitimate dial attempt if +it happens to be processed after a replay, which will increase the chance that +someone will notice that something is wrong), otherwise the nonce is added to +the list and the dial request is accepted. This makes it so that each dial +request sent as 0-RTT can result in at most one connection opened through an +agent tunnel. + +The client should make sure to not send data belonging to the connection as part +of the early data, as an additional layer of protection against replay attacks; this will +cause no further delays if the client intends to wait for the server to reply to +the dial request. A client that wants to make use of multiplexing should take +care to not accidentally send more than one dial request as 0-RTT in a single +connection, to keep the effort needed to handle potential replays at a minimum. 
+ +The protocol doesn't currently take advantage of early server-side data for +non-resumed connections, so considerations around the security of "0.5-RTT" data +are not relevant; data sent by the server as a response to the client is either +using 0-RTT keys or is sent after the handshake is completed. + +*/ diff --git a/lib/proxy/peer/quicclient.go b/lib/proxy/peer/quicclient.go new file mode 100644 index 000000000000..f6ea5d74e223 --- /dev/null +++ b/lib/proxy/peer/quicclient.go @@ -0,0 +1,457 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package peer + +import ( + "context" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "encoding/binary" + "errors" + "io" + "log/slog" + "net" + "slices" + "sync" + "time" + + "github.com/gravitational/trace" + "github.com/gravitational/trace/trail" + "github.com/quic-go/quic-go" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/gravitational/teleport/api/types" + apiutils "github.com/gravitational/teleport/api/utils" + quicpeeringv1a "github.com/gravitational/teleport/gen/proto/go/teleport/quicpeering/v1alpha" + "github.com/gravitational/teleport/lib/tlsca" + "github.com/gravitational/teleport/lib/utils" +) + +func (c *Client) connectQUIC(peerID string, peerAddr string) (*quicClientConn, error) { + log := c.config.Log.With( + "peer_id", peerID, + "peer_addr", peerAddr, + ) + log.InfoContext(c.ctx, "setting up a QUIC client conn") + + udpAddr, err := net.ResolveUDPAddr("udp", peerAddr) + if err != nil { + return nil, trace.Wrap(err) + } + + getCertificate := c.config.GetTLSCertificate + // crypto/tls doesn't allow us to configure TLS 1.3 ciphersuites, and the + // only other effect of [utils.TLSConfig] is to require at least TLS 1.2, + // but QUIC requires at least TLS 1.3 anyway + tlsConfig := &tls.Config{ + GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert, err := getCertificate() + if err != nil { + return nil, trace.Wrap(err) + } + return cert, nil + }, + VerifyPeerCertificate: verifyPeerCertificateIsSpecificProxy(peerID + "." 
+ c.config.ClusterName), + NextProtos: []string{quicNextProto}, + ServerName: apiutils.EncodeClusterName(c.config.ClusterName), + ClientSessionCache: tls.NewLRUClientSessionCache(0), + MinVersion: tls.VersionTLS13, + } + + quicConfig := &quic.Config{ + MaxStreamReceiveWindow: quicMaxReceiveWindow, + MaxConnectionReceiveWindow: quicMaxReceiveWindow, + + MaxIncomingStreams: -1, + MaxIncomingUniStreams: -1, + + MaxIdleTimeout: quicMaxIdleTimeout, + KeepAlivePeriod: quicKeepAlivePeriod, + + TokenStore: quic.NewLRUTokenStore(10, 10), + } + + runCtx, runCancel := context.WithCancel(context.Background()) + return &quicClientConn{ + id: peerID, + addr: udpAddr, + + log: log, + + transport: c.config.QUICTransport, + + tlsConfig: tlsConfig, + getRootCAs: c.config.GetTLSRoots, + quicConfig: quicConfig, + + runCtx: runCtx, + runCancel: runCancel, + }, nil +} + +type quicClientConn struct { + id string + addr *net.UDPAddr + + log *slog.Logger + + transport *quic.Transport + + tlsConfig *tls.Config + getRootCAs utils.GetRootsFunc + quicConfig *quic.Config + + // runCtx is canceled to signal that all connections should be closed + // (ungracefully). + runCtx context.Context + // runCancel cancels runCtx. + runCancel context.CancelFunc + + mu sync.Mutex + // closed is set at the beginning of a shutdown to signify that this client + // conn should not be opening any new connections. + closed bool + // wg counts the active connections. Must only be waited on after setting the + // closed flag. Must only be potentially increased from 0 while the closed + // flag is not set. + wg sync.WaitGroup +} + +var _ clientConn = (*quicClientConn)(nil) + +// peerID implements [clientConn]. +func (c *quicClientConn) peerID() string { + return c.id +} + +// peerAddr implements [clientConn]. +func (c *quicClientConn) peerAddr() string { + return c.addr.String() +} + +// close implements [clientConn]. 
+func (c *quicClientConn) close() error {
+	c.mu.Lock()
+	c.closed = true
+	c.mu.Unlock()
+	c.runCancel()
+	c.wg.Wait()
+	return nil
+}
+
+// shutdown implements [clientConn]. Connections still open when ctx is done are closed forcibly.
+func (c *quicClientConn) shutdown(ctx context.Context) {
+	c.mu.Lock()
+	c.closed = true
+	c.mu.Unlock()
+	defer context.AfterFunc(ctx, c.runCancel)() // the deferred call detaches runCancel from ctx again
+	c.wg.Wait()
+}
+
+// dial implements [clientConn]. Every dial opens its own QUIC connection, attempting 0-RTT first.
+func (c *quicClientConn) dial(nodeID string, src net.Addr, dst net.Addr, tunnelType types.TunnelType) (_ net.Conn, err error) {
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil, trace.ConnectionProblem(net.ErrClosed, "client is shut down")
+	}
+	c.wg.Add(1) // held for the duration of this call
+	defer c.wg.Done()
+	c.mu.Unlock()
+
+	var nonce uint64
+	if err := binary.Read(rand.Reader, binary.NativeEndian, &nonce); err != nil {
+		return nil, trace.Wrap(err)
+	}
+
+	log := c.log.With("conn_nonce", nonce)
+
+	req := &quicpeeringv1a.DialRequest{
+		TargetHostId:   nodeID,
+		ConnectionType: string(tunnelType),
+		Source: &quicpeeringv1a.Addr{
+			Network: src.Network(),
+			Addr:    src.String(),
+		},
+		Destination: &quicpeeringv1a.Addr{
+			Network: dst.Network(),
+			Addr:    dst.String(),
+		},
+		Timestamp: timestamppb.Now(),
+		Nonce:     nonce,
+	}
+	sizedReqBuf := make([]byte, 4, 4+proto.MarshalOptions{}.Size(req)) // the first 4 bytes are reserved for the length prefix
+	sizedReqBuf, err = proto.MarshalOptions{UseCachedSize: true}.MarshalAppend(sizedReqBuf, req)
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	if len(sizedReqBuf)-4 > quicMaxMessageSize {
+		log.WarnContext(context.Background(), "refusing to send oversized dial request (this is a bug)")
+		return nil, trace.LimitExceeded("oversized dial request")
+	}
+	binary.LittleEndian.PutUint32(sizedReqBuf, uint32(len(sizedReqBuf)-4))
+
+	rootCAs, err := c.getRootCAs()
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	utils.RefreshTLSConfigTickets(c.tlsConfig)
+	tlsConfig := c.tlsConfig.Clone()
+	tlsConfig.RootCAs = rootCAs
+
+	deadline := time.Now().Add(quicDialTimeout)
+	dialCtx, cancel := context.WithDeadline(context.Background(), deadline)
+	defer cancel()
+
+	log.DebugContext(dialCtx, "dialing peer proxy")
+	earlyConn, err := c.transport.DialEarly(dialCtx, c.addr, tlsConfig, c.quicConfig)
+	if err != nil {
+		if errors.Is(err, wrongProxyError{}) {
+			const duplicatePeerMsg = duplicatePeerMsg // to appease sloglint
+			log.ErrorContext(dialCtx, duplicatePeerMsg)
+		}
+		return nil, trace.Wrap(err)
+	}
+
+	var conn quic.Connection = earlyConn
+	defer func() {
+		if err == nil {
+			return
+		}
+		conn.CloseWithError(0, "")
+	}()
+
+	log.DebugContext(conn.Context(),
+		"opened connection",
+		"gso", conn.ConnectionState().GSO,
+	)
+
+	respBuf, stream, err := quicSendUnary(deadline, sizedReqBuf, conn)
+	if err != nil {
+		if !errors.Is(err, quic.Err0RTTRejected) {
+			return nil, trace.Wrap(err)
+		}
+
+		log.InfoContext(dialCtx, "0-RTT attempt rejected, retrying with a full handshake")
+		nextConn, err := earlyConn.NextConnection(dialCtx)
+		if err != nil {
+			if errors.Is(err, wrongProxyError{}) {
+				// if we are hitting a QUIC-aware load balancer(?) it's possible
+				// to reach an unexpected peer proxy after a failed 0-RTT
+				// (failed because we got sent to the "wrong" peer)
+				const duplicatePeerMsg = duplicatePeerMsg // to appease sloglint
+				log.ErrorContext(dialCtx, duplicatePeerMsg)
+			}
+			return nil, trace.Wrap(err)
+		}
+		conn, earlyConn = nextConn, nil // a nil earlyConn skips the handshake wait further down
+		// NextConnection can return a valid, closed connection and no error; if
+		// the connection is not closed then we completed a full handshake
+		if conn.Context().Err() != nil {
+			return nil, trace.Wrap(context.Cause(conn.Context()))
+		}
+		log.DebugContext(conn.Context(), "full handshake completed after 0-RTT rejection")
+		respBuf, stream, err = quicSendUnary(deadline, sizedReqBuf, conn)
+		if err != nil {
+			log.DebugContext(conn.Context(),
+				"failed to exchange dial request after 0-RTT rejection and handshake",
+				"error", err,
+			)
+			return nil, trace.Wrap(err)
+		}
+	}
+
+	log.DebugContext(conn.Context(),
+		"exchanged dial request and response",
+		"used_0rtt", conn.ConnectionState().Used0RTT,
+	)
+
+	resp := new(quicpeeringv1a.DialResponse)
+	if err := proto.Unmarshal(respBuf, resp); err != nil {
+		return nil, trace.Wrap(err)
+	}
+	if err := trail.FromGRPC(status.FromProto(resp.GetStatus()).Err()); err != nil {
+		return nil, trace.Wrap(err)
+	}
+
+	if earlyConn != nil {
+		// avoid sending connection data as part of the early data; I'm not sure
+		// if the client is guaranteed to be protected against replays
+		// immediately after successfully receiving server data, but if it is
+		// then it means that the handshake is already complete and this will
+		// not block anyway
+		select {
+		case <-earlyConn.HandshakeComplete():
+		case <-earlyConn.Context().Done():
+			log.DebugContext(conn.Context(),
+				"failed to complete handshake after exchanging 0-RTT dial request",
+				"error", context.Cause(earlyConn.Context()), // err is always nil on this path
+			)
+			return nil, trace.Wrap(context.Cause(earlyConn.Context()))
+		}
+	}
+
+	sc := &streamConn{
+		st:   stream,
+		conn: conn,
+
+		src: src,
+		dst: dst,
+	}
+
+	detach := context.AfterFunc(c.runCtx, func() { _ = sc.Close() })
+	// conn.Context() is canceled when the connection is closed; wg is currently
+	// at least at 1 because we add one count for the duration of this function,
+	// so we're always allowed to add another one here
+	c.wg.Add(1)
+	context.AfterFunc(conn.Context(), func() {
+		log.DebugContext(conn.Context(), "connection closed")
+		c.wg.Done()
+		// remove the connection from the runCtx cancellation tree
+		detach()
+	})
+
+	return sc, nil
+}
+
+// quicSendUnary opens a stream, sends the given request buffer and then reads a
+// response buffer. Request and response are length-prefixed by a 32 bit little
+// endian integer, but the buffer size is also limited by [quicMaxMessageSize].
+// The given request buffer should already be length-prefixed.
+func quicSendUnary(deadline time.Time, sizedReqBuf []byte, conn quic.Connection) (_ []byte, _ quic.Stream, err error) {
+	stream, err := conn.OpenStream()
+	if err != nil {
+		return nil, nil, trace.Wrap(err)
+	}
+	defer func() {
+		if err == nil {
+			return
+		}
+		stream.CancelRead(0)
+		stream.CancelWrite(0)
+	}()
+
+	stream.SetDeadline(deadline)
+	if _, err := stream.Write(sizedReqBuf); err != nil {
+		return nil, nil, trace.Wrap(err)
+	}
+	var respSize uint32
+	if err := binary.Read(stream, binary.LittleEndian, &respSize); err != nil {
+		return nil, nil, trace.Wrap(err)
+	}
+	if respSize > quicMaxMessageSize {
+		return nil, nil, trace.LimitExceeded("oversized response message")
+	}
+	respBuf := make([]byte, respSize)
+	if _, err := io.ReadFull(stream, respBuf); err != nil {
+		return nil, nil, trace.Wrap(err)
+	}
+	stream.SetDeadline(time.Time{}) // clear the deadline before handing the stream to the caller
+	return respBuf, stream, nil
+}
+
+// streamConn is a [net.Conn] using a single [quic.Stream] in a dedicated
+// [quic.Connection].
+type streamConn struct {
+	st   quic.Stream
+	conn quic.Connection
+
+	src net.Addr
+	dst net.Addr
+}
+
+var _ net.Conn = (*streamConn)(nil)
+
+// Read implements [net.Conn].
+func (c *streamConn) Read(b []byte) (n int, err error) {
+	return c.st.Read(b)
+}
+
+// Write implements [net.Conn].
+func (c *streamConn) Write(b []byte) (n int, err error) {
+	return c.st.Write(b)
+}
+
+// Close implements [net.Conn]. It closes the whole QUIC connection, not just the stream.
+func (c *streamConn) Close() error {
+	return trace.Wrap(c.conn.CloseWithError(0, ""))
+}
+
+// SetDeadline implements [net.Conn].
+func (c *streamConn) SetDeadline(t time.Time) error {
+	return c.st.SetDeadline(t)
+}
+
+// SetReadDeadline implements [net.Conn].
+func (c *streamConn) SetReadDeadline(t time.Time) error {
+	return c.st.SetReadDeadline(t)
+}
+
+// SetWriteDeadline implements [net.Conn].
+func (c *streamConn) SetWriteDeadline(t time.Time) error {
+	return c.st.SetWriteDeadline(t)
+}
+
+// LocalAddr implements [net.Conn]. It returns the source address given at dial time.
+func (c *streamConn) LocalAddr() net.Addr {
+	return c.src
+}
+
+// RemoteAddr implements [net.Conn]. It returns the destination address given at dial time.
+func (c *streamConn) RemoteAddr() net.Addr {
+	return c.dst
+}
+
+type verifyPeerCertificateFunc = func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
+
+func verifyPeerCertificateIsSpecificProxy(peerID string) verifyPeerCertificateFunc {
+	return func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
+		if len(verifiedChains) < 1 {
+			return trace.AccessDenied("missing server certificate (this is a bug)")
+		}
+
+		clientCert := verifiedChains[0][0] // leaf of the first verified chain; despite the name, this is the peer's server certificate
+		clientIdentity, err := tlsca.FromSubject(clientCert.Subject, clientCert.NotAfter)
+		if err != nil {
+			return trace.Wrap(err)
+		}
+
+		if !slices.Contains(clientIdentity.Groups, string(types.RoleProxy)) {
+			return trace.AccessDenied("expected Proxy server credentials")
+		}
+
+		if clientIdentity.Username != peerID { // peerID is compared against the identity's username
+			return trace.Wrap(wrongProxyError{})
+		}
+		return nil
+	}
+}
+
+type wrongProxyError struct{}
+
+func (wrongProxyError) Error() string {
+	return "connected to unexpected proxy"
+}
+
+func (e wrongProxyError) Unwrap() error { // also unwraps to an access denied error
+	return &trace.AccessDeniedError{
+		Message: e.Error(),
+	}
+}
diff --git
a/lib/proxy/peer/quicserver.go b/lib/proxy/peer/quicserver.go
index 0877d47e5ed9..1a472e8c10a4 100644
--- a/lib/proxy/peer/quicserver.go
+++ b/lib/proxy/peer/quicserver.go
@@ -22,12 +22,25 @@ import (
 	"context"
 	"crypto/tls"
 	"crypto/x509"
+	"encoding/binary"
+	"io"
 	"log/slog"
+	"strings"
+	"sync"
+	"time"
 
+	"github.com/google/uuid"
 	"github.com/gravitational/trace"
+	"github.com/gravitational/trace/trail"
 	"github.com/quic-go/quic-go"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/gravitational/teleport"
+	"github.com/gravitational/teleport/api/types"
+	quicpeeringv1a "github.com/gravitational/teleport/gen/proto/go/teleport/quicpeering/v1alpha"
+	"github.com/gravitational/teleport/lib/utils"
 )
 
 // QUICServerConfig holds the parameters for [NewQUICServer].
@@ -37,13 +50,6 @@ type QUICServerConfig struct {
 	// of the peer proxies. Required.
 	ClusterDialer ClusterDialer
 
-	// CipherSuites is the set of TLS ciphersuites to be used by the server.
-	//
-	// Note: it won't actually have an effect, since QUIC always uses (the DTLS
-	// equivalent of) TLS 1.3, and TLS 1.3 ciphersuites can't be configured in
-	// crypto/tls, but for consistency's sake this should be passed along from
-	// the agent configuration.
-	CipherSuites []uint16
 	// GetCertificate should return the server certificate at time of use. It
 	// should be a certificate with the Proxy host role. Required.
 	GetCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error)
@@ -78,31 +84,366 @@ func (c *QUICServerConfig) checkAndSetDefaults() error {
 }
 
 // QUICServer is a proxy peering server that uses the QUIC protocol.
-type QUICServer struct{}
+type QUICServer struct {
+	log           *slog.Logger
+	clusterDialer ClusterDialer
+	tlsConfig     *tls.Config
+	quicConfig    *quic.Config
+
+	replayStore replayStore
+
+	// runCtx is a context that gets canceled when all connections should be
+	// ungracefully terminated.
+	runCtx context.Context
+	// runCancel cancels runCtx.
+	runCancel context.CancelFunc
+	// serveCtx is a context that gets canceled when all listeners should stop
+	// accepting new connections.
+	serveCtx context.Context
+	// serveCancel cancels serveCtx.
+	serveCancel context.CancelFunc
+
+	// mu protects everything further in the struct.
+	mu sync.Mutex
+	// closed is set at the beginning of shutdown. When set, nothing is allowed
+	// to increase the waitgroup count past 0.
+	closed bool
+	// wg counts any active listener and connection. Should only be increased
+	// from 0 while holding mu, if the closed flag is not set. Should only be
+	// waited after setting the closed flag.
+	wg sync.WaitGroup
+}
 
 // NewQUICServer returns a [QUICServer] with the given config.
 func NewQUICServer(cfg QUICServerConfig) (*QUICServer, error) {
 	if err := cfg.checkAndSetDefaults(); err != nil {
 		return nil, trace.Wrap(err)
 	}
-	panic("QUIC proxy peering is not implemented")
+
+	// crypto/tls doesn't allow us to configure TLS 1.3 ciphersuites, and the
+	// only other effect of [utils.TLSConfig] is to require at least TLS 1.2,
+	// but QUIC requires at least TLS 1.3 anyway
+	tlsConfig := &tls.Config{
+		GetCertificate:        cfg.GetCertificate,
+		VerifyPeerCertificate: verifyPeerCertificateIsProxy,
+		NextProtos:            []string{quicNextProto},
+		ClientAuth:            tls.RequireAndVerifyClientCert,
+		MinVersion:            tls.VersionTLS13,
+	}
+
+	getClientCAs := cfg.GetClientCAs
+	tlsConfig.GetConfigForClient = func(chi *tls.ClientHelloInfo) (*tls.Config, error) {
+		clientCAs, err := getClientCAs(chi)
+		if err != nil {
+			return nil, trace.Wrap(err)
+		}
+
+		utils.RefreshTLSConfigTickets(tlsConfig)
+		c := tlsConfig.Clone()
+		c.ClientCAs = clientCAs
+		return c, nil
+	}
+
+	quicConfig := &quic.Config{
+		MaxStreamReceiveWindow:     quicMaxReceiveWindow,
+		MaxConnectionReceiveWindow: quicMaxReceiveWindow,
+
+		MaxIncomingStreams:    quicMaxIncomingStreams,
+		MaxIncomingUniStreams: -1, // negative: the peer may not open unidirectional streams
+
+		MaxIdleTimeout:  quicMaxIdleTimeout,
+		KeepAlivePeriod: quicKeepAlivePeriod,
+
+		Allow0RTT: true, // handleStream guards early data with its timestamp and nonce checks
+	}
+
+	runCtx, runCancel := context.WithCancel(context.Background())
+	serveCtx, serveCancel := context.WithCancel(runCtx)
+
+	return &QUICServer{
+		log:           cfg.Log,
+		clusterDialer: cfg.ClusterDialer,
+		tlsConfig:     tlsConfig,
+		quicConfig:    quicConfig,
+
+		runCtx:      runCtx,
+		runCancel:   runCancel,
+		serveCtx:    serveCtx,
+		serveCancel: serveCancel,
+	}, nil
 }
 
 // Serve opens a listener and serves incoming connection. Returns after calling
 // Close or Shutdown.
-func (s *QUICServer) Serve(t *quic.Transport) error {
-	panic("QUIC proxy peering is not implemented")
+func (s *QUICServer) Serve(transport *quic.Transport) error {
+	s.mu.Lock()
+	if s.closed {
+		s.mu.Unlock()
+		return trace.Wrap(quic.ErrServerClosed)
+	}
+	s.wg.Add(1) // held while this listener is being served
+	defer s.wg.Done()
+	s.mu.Unlock()
+
+	lis, err := transport.ListenEarly(s.tlsConfig, s.quicConfig)
+	if err != nil {
+		return trace.Wrap(err)
+	}
+	defer lis.Close()
+	defer context.AfterFunc(s.serveCtx, func() { _ = lis.Close() })()
+
+	for {
+		// the listener will be closed when serveCtx is done, but Accept will
+		// return any queued connection before erroring out with a
+		// [quic.ErrServerClosed]
+		c, err := lis.Accept(context.Background())
+		if err != nil {
+			return trace.Wrap(err)
+		}
+
+		s.wg.Add(1)
+		go func() {
+			defer s.wg.Done()
+			s.handleConn(c)
+		}()
+	}
 }
 
+func (s *QUICServer) handleConn(conn quic.EarlyConnection) {
+	log := s.log.With(
+		"remote_addr", conn.RemoteAddr().String(),
+		"internal_id", uuid.NewString(),
+	)
+	state := conn.ConnectionState()
+	log.InfoContext(conn.Context(),
+		"handling new peer connection",
+		"gso", state.GSO,
+		"used_0rtt", state.Used0RTT,
+	)
+	defer func() {
+		log.DebugContext(conn.Context(),
+			"peer connection closed",
+			"error", context.Cause(conn.Context()),
+		)
+	}()
+
+	defer conn.CloseWithError(0, "")
+	defer context.AfterFunc(s.runCtx, func() { _ = conn.CloseWithError(0, "") })()
+
+	var wg sync.WaitGroup
+	defer wg.Wait() // wait for every stream handler before returning
+
+	for {
+		// TODO(espadolini): stop accepting new streams once s.serveCtx is
+		// canceled, once quic-go gains the ability to change the amount of
+		// available streams during a connection (so we can set it to 0)
+		st, err := conn.AcceptStream(context.Background())
+		if err != nil {
+			log.DebugContext(conn.Context(), "error accepting a stream", "error", err)
+			return
+		}
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			s.handleStream(st, conn, log)
+		}()
+	}
+}
+
+func (s *QUICServer) handleStream(stream quic.Stream, conn quic.EarlyConnection, log *slog.Logger) {
+	log = log.With("stream_id", stream.StreamID())
+	log.DebugContext(conn.Context(), "handling stream")
+	defer log.DebugContext(conn.Context(), "done handling stream")
+
+	defer stream.CancelRead(0)
+	defer stream.CancelWrite(0)
+
+	sendErr := func(toSend error) { // best effort: failures while sending the error are dropped
+		stream.CancelRead(0)
+		defer stream.CancelWrite(0)
+		errBuf, err := proto.Marshal(&quicpeeringv1a.DialResponse{
+			Status: status.Convert(trail.ToGRPC(toSend)).Proto(),
+		})
+		if err != nil {
+			return
+		}
+		if len(errBuf) > quicMaxMessageSize {
+			log.WarnContext(conn.Context(), "refusing to send oversized error message (this is a bug)")
+			return
+		}
+		stream.SetWriteDeadline(time.Now().Add(quicErrorResponseTimeout))
+		if _, err := stream.Write(binary.LittleEndian.AppendUint32(nil, uint32(len(errBuf)))); err != nil {
+			return
+		}
+		if _, err := stream.Write(errBuf); err != nil {
+			return
+		}
+		if err := stream.Close(); err != nil {
+			return
+		}
+	}
+
+	stream.SetReadDeadline(time.Now().Add(quicRequestTimeout))
+	var reqLen uint32
+	if err := binary.Read(stream, binary.LittleEndian, &reqLen); err != nil {
+		log.DebugContext(conn.Context(), "failed to read request size", "error", err)
+		return
+	}
+	if reqLen > quicMaxMessageSize { // same bound that the client enforces before sending
+		log.WarnContext(conn.Context(), "received oversized request", "request_len", reqLen)
+		return
+	}
+	reqBuf := make([]byte, reqLen)
+	if _, err := io.ReadFull(stream, reqBuf); err != nil {
+		log.DebugContext(conn.Context(), "failed to read request", "error", err)
+		return
+	}
+	stream.SetReadDeadline(time.Time{})
+
+	req := new(quicpeeringv1a.DialRequest)
+	if err := proto.Unmarshal(reqBuf, req); err != nil {
+		log.WarnContext(conn.Context(), "failed to unmarshal request", "error", err)
+		return
+	}
+
+	if requestTimestamp := req.GetTimestamp().AsTime(); time.Since(requestTimestamp).Abs() > quicTimestampGraceWindow {
+		log.WarnContext(conn.Context(),
+			"dial request has out of sync timestamp, 0-RTT performance will be impacted",
+			"request_timestamp", requestTimestamp,
+		)
+		select {
+		case <-conn.HandshakeComplete():
+		case <-conn.Context().Done():
+			// logging this at warn level because it should be very atypical to
+			// begin with, and it might be a symptom of a malicious actor
+			// interfering with the connection
+			log.WarnContext(conn.Context(),
+				"handshake failure or connection loss after receiving dial request with out of sync timestamp",
+				"error", context.Cause(conn.Context()),
+			)
+			return
+		}
+	}
+
+	// a replayed request is always wrong even after a full handshake, the
+	// replay might've happened before the legitimate request
+	if !s.replayStore.add(req.GetNonce(), time.Now()) {
+		log.ErrorContext(conn.Context(), "request is reusing a nonce, rejecting", "nonce", req.GetNonce())
+		sendErr(trace.BadParameter("reused or invalid nonce"))
+		return
+	}
+
+	_, clusterName, ok := strings.Cut(req.GetTargetHostId(), ".") // everything after the first "." is the cluster name
+	if !ok {
+		sendErr(trace.BadParameter("server_id %q is missing cluster information", req.GetTargetHostId()))
+		return
+	}
+
+	nodeConn, err := s.clusterDialer.Dial(clusterName, DialParams{
+		From: &utils.NetAddr{
+			Addr:        req.GetSource().GetAddr(),
+			AddrNetwork: req.GetSource().GetNetwork(),
+		},
+		To: &utils.NetAddr{
+			Addr:        req.GetDestination().GetAddr(),
+			AddrNetwork: req.GetDestination().GetNetwork(),
+		},
+		ServerID: req.GetTargetHostId(),
+		ConnType: types.TunnelType(req.GetConnectionType()),
+	})
+	if err != nil {
+		sendErr(err)
+		return
+	}
+	defer nodeConn.Close()
+
+	var eg errgroup.Group
+	eg.Go(func() error {
+		defer stream.Close()
+		if _, err := stream.Write([]byte(dialResponseOK)); err != nil {
+			return trace.Wrap(err)
+		}
+		_, err := io.Copy(stream, nodeConn)
+		return trace.Wrap(err)
+	})
+	eg.Go(func() error {
+		defer stream.CancelRead(0)
+
+		// wait for the handshake before forwarding application data from the
+		// client; the client shouldn't be sending application data as 0-RTT
+		// anyway, but just in case
+		select {
+		case <-conn.HandshakeComplete():
+		case <-conn.Context().Done():
+			return trace.Wrap(context.Cause(conn.Context()))
+		}
+
+		_, err := io.Copy(nodeConn, stream)
+		return trace.Wrap(err)
+	})
+	err = eg.Wait()
+	log.DebugContext(conn.Context(), "done forwarding data", "error", err)
+}
+
+// dialResponseOK is the length-prefixed encoding of a DialResponse message that
+// signifies a successful dial.
+const dialResponseOK = "\x00\x00\x00\x00"
+
 // Close stops listening for incoming connections and ungracefully terminates
 // all the existing ones.
 func (s *QUICServer) Close() error {
-	panic("QUIC proxy peering is not implemented")
+	s.runCancel() // cancel first, so that Shutdown below only waits for the teardown
+	s.Shutdown(context.Background())
+	return nil
 }
 
 // Shutdown stops listening for incoming connections and waits until the
 // existing ones are closed or until the context expires. If the context
 // expires, running connections are ungracefully terminated.
 func (s *QUICServer) Shutdown(ctx context.Context) error {
-	panic("QUIC proxy peering is not implemented")
+	s.mu.Lock()
+	s.closed = true
+	s.mu.Unlock()
+
+	defer s.runCancel()
+	defer context.AfterFunc(ctx, s.runCancel)()
+	s.serveCancel()
+	s.wg.Wait()
+	return nil
+}
+
+// replayStore will keep track of nonces for at least as much time as
+// [quicNoncePersistence]. Nonces are added to a "current" set until the oldest
+// item in it is older than the period, at which point the set is moved into a
+// "previous" slot. After the next "current" set ages out, the previous set is
+// cleared. This saves us from having to keep track of individual expiration
+// times.
+type replayStore struct {
+	mu sync.Mutex
+
+	currentTime time.Time
+	currentSet  map[uint64]struct{}
+	previousSet map[uint64]struct{}
+}
+
+func (r *replayStore) add(nonce uint64, now time.Time) (added bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if now.Sub(r.currentTime) > quicNoncePersistence {
+		r.currentTime = now
+		r.previousSet, r.currentSet = r.currentSet, r.previousSet
+		clear(r.currentSet) // no-op while the map is still nil
+	}
+	if _, ok := r.previousSet[nonce]; ok {
+		return false
+	}
+	if _, ok := r.currentSet[nonce]; ok {
+		return false
+	}
+	if r.currentSet == nil {
+		r.currentSet = make(map[uint64]struct{})
+	}
+	r.currentSet[nonce] = struct{}{}
+	return true
 }
diff --git a/lib/proxy/peer/quicserver_test.go b/lib/proxy/peer/quicserver_test.go
new file mode 100644
index 000000000000..98800c848057
--- /dev/null
+++ b/lib/proxy/peer/quicserver_test.go
@@ -0,0 +1,40 @@
+// Teleport
+// Copyright (C) 2024 Gravitational, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see http://www.gnu.org/licenses/.
+
+package peer
+
+import (
+	"encoding/binary"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+
+	quicpeeringv1a "github.com/gravitational/teleport/gen/proto/go/teleport/quicpeering/v1alpha"
+)
+
+func TestDialResponseOKEncoding(t *testing.T) {
+	resp := &quicpeeringv1a.DialResponse{}
+	require.NoError(t, status.FromProto(resp.GetStatus()).Err()) // a response without a status must read back as success
+
+	b, err := proto.Marshal(resp)
+	require.NoError(t, err)
+	require.Empty(t, b) // the empty message marshals to zero bytes
+
+	b = append(binary.LittleEndian.AppendUint32(nil, 0), b...) // prepend the little endian uint32 length prefix
+	require.Equal(t, dialResponseOK, string(b))
+}
diff --git a/lib/service/service.go b/lib/service/service.go
index 0870695cef8f..fc3c2b9459be 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -4317,8 +4317,11 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
 	var peerQUICTransport *quic.Transport
 	if !process.Config.Proxy.DisableReverseTunnel {
 		if listeners.proxyPeer != nil {
-			// TODO(espadolini): allow this when the implementation is merged
-			if false && os.Getenv("TELEPORT_UNSTABLE_QUIC_PROXY_PEERING") == "yes" {
+			// QUIC uses TLS 1.3 which is currently not allowed in FIPS builds.
+ // + // TODO(espadolini): allow QUIC in Go 1.24 (as TLS 1.3 should be + // allowed then) + if !modules.IsBoringBinary() && process.Config.Proxy.QUICProxyPeering { // the stateless reset key is important in case there's a crash // so peers can be told to close their side of the connections // instead of having to wait for a timeout; for this reason, we @@ -4749,7 +4752,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { peerQUICServer, err := peer.NewQUICServer(peer.QUICServerConfig{ Log: process.logger, ClusterDialer: clusterdial.NewClusterDialer(tsrv), - CipherSuites: cfg.CipherSuites, GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) { return conn.serverGetCertificate() }, @@ -4767,13 +4769,13 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { process.RegisterCriticalFunc("proxy.peer.quic", func() error { if _, err := process.WaitForEvent(process.ExitContext(), ProxyReverseTunnelReady); err != nil { - logger.DebugContext(process.ExitContext(), "Process exiting: failed to start QUIC peer proxy service waiting for reverse tunnel server.") + logger.DebugContext(process.ExitContext(), "process exiting: failed to start QUIC peer proxy service waiting for reverse tunnel server") return nil } - logger.InfoContext(process.ExitContext(), "Starting QUIC peer proxy service.", "local_addr", logutils.StringerAttr(peerQUICTransport.Conn.LocalAddr())) + logger.InfoContext(process.ExitContext(), "starting QUIC peer proxy service", "local_addr", logutils.StringerAttr(peerQUICTransport.Conn.LocalAddr())) err := peerQUICServer.Serve(peerQUICTransport) - if err != nil { + if err != nil && !errors.Is(err, quic.ErrServerClosed) { return trace.Wrap(err) } @@ -4793,8 +4795,8 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { logger.InfoContext(process.ExitContext(), "Enabling proxy group labels.", "group_id", cfg.Proxy.ProxyGroupID, "generation", cfg.Proxy.ProxyGroupGeneration) } if 
peerQUICTransport != nil { - staticLabels[types.ProxyPeerQUICLabel] = "x" - logger.InfoContext(process.ExitContext(), "Advertising proxy peering QUIC support.") + staticLabels[types.UnstableProxyPeerQUICLabel] = "yes" + logger.InfoContext(process.ExitContext(), "advertising proxy peering QUIC support") } sshProxy, err := regular.New( diff --git a/lib/service/servicecfg/proxy.go b/lib/service/servicecfg/proxy.go index c07ce5d47b0f..93ab0767c69b 100644 --- a/lib/service/servicecfg/proxy.go +++ b/lib/service/servicecfg/proxy.go @@ -158,6 +158,11 @@ type ProxyConfig struct { // proxy built-in version server to retrieve target versions. This is part // of the automatic upgrades. AutomaticUpgradesChannels automaticupgrades.Channels + + // QUICProxyPeering will make it so that proxy peering will support inbound + // QUIC connections and will use QUIC to connect to peer proxies that + // advertise support for it. + QUICProxyPeering bool } // WebPublicAddr returns the address for the web endpoint on this proxy that diff --git a/proto/teleport/quicpeering/v1alpha/dial.proto b/proto/teleport/quicpeering/v1alpha/dial.proto new file mode 100644 index 000000000000..470e11068bc0 --- /dev/null +++ b/proto/teleport/quicpeering/v1alpha/dial.proto @@ -0,0 +1,67 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+
+syntax = "proto3";
+
+package teleport.quicpeering.v1alpha;
+
+import "google/protobuf/timestamp.proto";
+import "google/rpc/status.proto";
+
+option go_package = "github.com/gravitational/teleport/gen/proto/go/teleport/quicpeering/v1alpha;quicpeeringv1alpha";
+
+// Sent from a proxy to a peer proxy in a fresh QUIC stream to dial a Teleport
+// resource through a QUIC proxy peering connection. The message is sent in
+// protobuf binary format, prefixed by its length encoded as a little endian
+// 32-bit unsigned integer.
+message DialRequest {
+  // The target agent for the connection attempt; should be "{host id}.{cluster name}".
+  string target_host_id = 1;
+  // The type of the connection as defined by api/types.TunnelType ("node",
+  // "app", "kube"...).
+  string connection_type = 2;
+
+  // The source of the connection, the network address of the user for whom the
+  // connection is being tunneled, as seen from the proxy sending the request.
+  Addr source = 3;
+  // The destination of the connection, used as a weak hint and as something to
+  // put in the "local address" of the connection object handled by the agent.
+  Addr destination = 4;
+
+  // The time of the client, must be provided and within 5 minutes of the local
+  // server time for 0-RTT requests.
+  google.protobuf.Timestamp timestamp = 5;
+  // A random id for each dial request, must be provided and unique among dial
+  // requests recently received by the server.
+  fixed64 nonce = 6;
+}
+
+// A stringy Go net.Addr. Can be converted to and from a lib/utils.NetAddr.
+message Addr {
+  // Likely always "tcp".
+  string network = 1;
+  // Depending on the network, likely "{host}:{port}".
+  string addr = 2;
+}
+
+// Sent from the server to the client as a response to a DialRequest. The
+// message is likewise sent in protobuf binary format, prefixed by its length
+// encoded as a little endian uint32.
+message DialResponse {
+  // The success or failure of the dial. If the dial is successful, the stream
+  // will continue with the data of the connection.
+  google.rpc.Status status = 1;
+}