Skip to content

Commit

Permalink
Implement mtls for config plane communications
Browse files Browse the repository at this point in the history
  • Loading branch information
kchiranjewee63 committed Jul 9, 2024
1 parent 7ab3b3c commit 2eed546
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 71 deletions.
1 change: 1 addition & 0 deletions build/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker build -t tconfigd:latest -f ../service/Dockerfile ../service/
5 changes: 3 additions & 2 deletions installation/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
enableTratInterception: "true" # Enable or disable incoming requests interception for TraT verification
agentApiPort: "9040" # Port number for the tratteria agent API
agentHttpsApiPort: "9040" # Port number for the tratteria agent HTTPS APIs
agentHttpApiPort: "9030" # Port number for the tratteria agent HTTP APIs
agentInterceptorPort: "9050" # Port number for the tratteria agent incoming requests interceptor
spiffeEndpointSocket: "unix:///run/spire/sockets/agent.sock" # Don't change this if you are using tconfigd SPIRE installation
tconfigdSpiffeId: "spiffe://tratteria.io/tconfigd" # Don't change this if you are using tconfigd SPIRE installation
tratteriaSpiffeId: "spiffe://<trust-domain>/tratteria" # Replace "<trust-domain>" with your trust domain
2 changes: 1 addition & 1 deletion installation/resources/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spec:
args: ["/etc/tconfigd/config/config.yaml"]
imagePullPolicy: Never
ports:
- containerPort: 9060
- containerPort: 8443
protocol: TCP
- containerPort: 443
protocol: TCP
Expand Down
6 changes: 3 additions & 3 deletions installation/resources/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ metadata:
spec:
type: ClusterIP
ports:
- name: rules
port: 9060
targetPort: 9060
- name: api
port: 8443
targetPort: 8443
protocol: TCP
- name: webhook
port: 443
Expand Down
54 changes: 43 additions & 11 deletions service/api/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package handler
import (
"encoding/json"
"net/http"
"strings"

"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/tratteria/tconfigd/api/pkg/service"

"go.uber.org/zap"
Expand All @@ -22,34 +24,48 @@ func NewHandlers(service *service.Service, logger *zap.Logger) *Handlers {
}

type registrationRequest struct {
IpAddress string `json:"ipAddress"`
Port int `json:"port"`
ServiceName string `json:"serviceName"`
NameSpace string `json:"namespace"`
IpAddress string `json:"ipAddress"`
Port int `json:"port"`
NameSpace string `json:"namespace"`
}

type heartBeatRequest struct {
IpAddress string `json:"ipAddress"`
Port int `json:"port"`
ServiceName string `json:"serviceName"`
NameSpace string `json:"namespace"`
RulesVersionID string `json:"rulesVersionId"`
}

func (h *Handlers) RegistrationHandler(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
http.Error(w, "No client certificate provided", http.StatusUnauthorized)

return
}

spiffeID, err := spiffeid.FromURI(r.TLS.PeerCertificates[0].URIs[0])
if err != nil {
h.Logger.Error("Failed to parse SPIFFE ID", zap.Error(err))
http.Error(w, "Invalid SPIFFE ID", http.StatusBadRequest)

return
}

serviceName := strings.TrimPrefix(spiffeID.Path(), "/")

var registrationRequest registrationRequest

err := json.NewDecoder(r.Body).Decode(&registrationRequest)
err = json.NewDecoder(r.Body).Decode(&registrationRequest)
if err != nil {
h.Logger.Error("Invalid registration request.", zap.Error(err))
http.Error(w, "Invalid request", http.StatusBadRequest)

return
}

h.Logger.Info("Received a registration request.", zap.String("service", registrationRequest.ServiceName))
h.Logger.Info("Received a registration request.", zap.String("service", serviceName))

registrationResponse := h.Service.RegisterAgent(registrationRequest.IpAddress, registrationRequest.Port, registrationRequest.ServiceName, registrationRequest.NameSpace)
registrationResponse := h.Service.RegisterAgent(registrationRequest.IpAddress, registrationRequest.Port, serviceName, registrationRequest.NameSpace)

// TODO: return rules belonging to this service

Expand All @@ -65,19 +81,35 @@ func (h *Handlers) RegistrationHandler(w http.ResponseWriter, r *http.Request) {
}

func (h *Handlers) HeartBeatHandler(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
http.Error(w, "No client certificate provided", http.StatusUnauthorized)

return
}

spiffeID, err := spiffeid.FromURI(r.TLS.PeerCertificates[0].URIs[0])
if err != nil {
h.Logger.Error("Failed to parse SPIFFE ID", zap.Error(err))
http.Error(w, "Invalid SPIFFE ID", http.StatusBadRequest)

return
}

serviceName := strings.TrimPrefix(spiffeID.Path(), "/")

var heartBeatRequest heartBeatRequest

err := json.NewDecoder(r.Body).Decode(&heartBeatRequest)
err = json.NewDecoder(r.Body).Decode(&heartBeatRequest)
if err != nil {
h.Logger.Error("Invalid heartbeat request.", zap.Error(err))
http.Error(w, "Invalid request", http.StatusBadRequest)

return
}

h.Logger.Info("Received a heartbeat.", zap.String("service", heartBeatRequest.ServiceName))
h.Logger.Info("Received a heartbeat request.", zap.String("service", serviceName))

h.Service.RegisterHeartBeat(heartBeatRequest.IpAddress, heartBeatRequest.Port, heartBeatRequest.ServiceName, heartBeatRequest.NameSpace)
h.Service.RegisterHeartBeat(heartBeatRequest.IpAddress, heartBeatRequest.Port, serviceName, heartBeatRequest.NameSpace)

// TODO: if an agent is heartbeating with an old rule version id, notify it to fetch the latest rules

Expand Down
47 changes: 41 additions & 6 deletions service/api/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,34 @@ package service
import (
"context"
"fmt"
"io"
"net/http"

"github.com/lestrrat-go/jwx/jwk"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"github.com/spiffe/go-spiffe/v2/workloadapi"
"github.com/tratteria/tconfigd/common"
"github.com/tratteria/tconfigd/dataplaneregistry"
"go.uber.org/zap"
)

const (
TRATTERIA_JWKS_ENDPOINT = "/.well-known/jwks.json"
TRATTERIA_JWKS_ENDPOINT = ".well-known/jwks.json"
)

type Service struct {
dataPlaneRegistryManager dataplaneregistry.Manager
httpClient *http.Client
x509Source *workloadapi.X509Source
tratteriaSpiffeId spiffeid.ID
logger *zap.Logger
}

func NewService(dataPlaneRegistryManager dataplaneregistry.Manager, httpClient *http.Client, logger *zap.Logger) *Service {
func NewService(dataPlaneRegistryManager dataplaneregistry.Manager, x509Source *workloadapi.X509Source, tratteriaSpiffeId spiffeid.ID, logger *zap.Logger) *Service {
return &Service{
dataPlaneRegistryManager: dataPlaneRegistryManager,
httpClient: httpClient,
x509Source: x509Source,
tratteriaSpiffeId: tratteriaSpiffeId,
logger: logger,
}
}
Expand Down Expand Up @@ -57,14 +63,43 @@ func (s *Service) CollectJwks(ctx context.Context, namespace string) (jwk.Set, e

allKeys := jwk.NewSet()

tlsConfig := tlsconfig.MTLSClientConfig(s.x509Source, s.x509Source, tlsconfig.AuthorizeID(s.tratteriaSpiffeId))

client := http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}

for _, instance := range tratteriaInstances {
url := fmt.Sprintf("http://%s:%d/%s", instance.IpAddress, instance.Port, TRATTERIA_JWKS_ENDPOINT)
url := fmt.Sprintf("https://%s/%s", instance.IpAddress, TRATTERIA_JWKS_ENDPOINT)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("error creating request for URL %s: %w", url, err)
}

set, err := jwk.Fetch(ctx, url)
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch JWKS from URL %s: %w", url, err)
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("received non-ok status code %d from URL %s", resp.StatusCode, url)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body from URL %s: %w", url, err)
}

set, err := jwk.Parse(body)
if err != nil {
return nil, fmt.Errorf("failed to parse JWKS from URL %s: %w", url, err)
}

for iter := set.Iterate(ctx); iter.Next(ctx); {
pair := iter.Pair()
if key, ok := pair.Value.(jwk.Key); ok {
Expand Down
19 changes: 14 additions & 5 deletions service/api/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,45 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"github.com/spiffe/go-spiffe/v2/workloadapi"
"go.uber.org/zap"

"github.com/tratteria/tconfigd/api/handler"
"github.com/tratteria/tconfigd/api/pkg/service"
"github.com/tratteria/tconfigd/dataplaneregistry"
)

const API_PORT = 8443

type API struct {
DataPlaneRegistryManager dataplaneregistry.Manager
HttpClient *http.Client
X509Source *workloadapi.X509Source
TratteriaSpiffeId spiffeid.ID
Logger *zap.Logger
}

func (api *API) Run() error {
service := service.NewService(api.DataPlaneRegistryManager, api.HttpClient, api.Logger)
service := service.NewService(api.DataPlaneRegistryManager, api.X509Source, api.TratteriaSpiffeId, api.Logger)
handler := handler.NewHandlers(service, api.Logger)
router := mux.NewRouter()

initializeRulesRoutes(router, handler)

serverTLSConfig := tlsconfig.MTLSServerConfig(api.X509Source, api.X509Source, tlsconfig.AuthorizeAny())

srv := &http.Server{
Handler: router,
Addr: "0.0.0.0:9060",
Addr: fmt.Sprintf("0.0.0.0:%d", API_PORT),
TLSConfig: serverTLSConfig,
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
}

api.Logger.Info("Starting api server on port 9060.")
api.Logger.Info("Starting api server...", zap.Int("port", API_PORT))

if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := srv.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
api.Logger.Error("Failed to start the api server", zap.Error(err))

return fmt.Errorf("failed to start the api server :%w", err)
Expand Down
46 changes: 32 additions & 14 deletions service/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/workloadapi"
"github.com/tratteria/tconfigd/api"
"github.com/tratteria/tconfigd/spiffe"
"github.com/tratteria/tconfigd/config"
"github.com/tratteria/tconfigd/configdispatcher"
"github.com/tratteria/tconfigd/dataplaneregistry"
Expand All @@ -18,6 +21,8 @@ import (
"go.uber.org/zap"
)

const X509_SOURCE_TIMEOUT = 15 * time.Second

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -41,21 +46,34 @@ func main() {

configPath := os.Args[1]

appConfig, err := config.GetAppConfig(configPath)
config, err := config.GetConfig(configPath)
if err != nil {
logger.Fatal("Error reading configuration.", zap.Error(err))
}

httpClient := &http.Client{}
x509SrcCtx, cancel := context.WithTimeout(context.Background(), X509_SOURCE_TIMEOUT)
defer cancel()

x509Source, err := workloadapi.NewX509Source(x509SrcCtx, workloadapi.WithClientOptions(workloadapi.WithAddr(config.SpiffeEndpointSocket)))
if err != nil {
logger.Fatal("Failed to create X.509 source", zap.Error(err))
}

defer x509Source.Close()

tconfigdSpiffeId, err := spiffe.FetchSpiffeIdFromX509(x509Source)
if err != nil {
logger.Fatal("Error getting tconfigd spiffe id.", zap.Error(err))
}

agentsManager := dataplaneregistry.NewRegistry()
configdispatcher := configdispatcher.NewConfigDispatcher(agentsManager, httpClient)
configdispatcher := configdispatcher.NewConfigDispatcher(agentsManager, x509Source)

go func() {
logger.Info("Starting API server...")

apiServer := &api.API{
DataPlaneRegistryManager: agentsManager,
HttpClient: httpClient,
X509Source: x509Source,
TratteriaSpiffeId: spiffeid.ID(config.TratteriaSpiffeId),
Logger: logger,
}

Expand All @@ -65,13 +83,13 @@ func main() {
}()

go func() {
logger.Info("Starting Webhook server...")

webhook := &webhook.Webhook{
EnableTratInterception: bool(appConfig.EnableTratInterception),
AgentApiPort: int(appConfig.AgentApiPort),
AgentInterceptorPort: int(appConfig.AgentInterceptorPort),
SpiffeEndpointSocket: appConfig.SpiffeEndpointSocket,
EnableTratInterception: bool(config.EnableTratInterception),
AgentHttpsApiPort: int(config.AgentHttpsApiPort),
AgentHttpApiPort: int(config.AgentHttpApiPort),
AgentInterceptorPort: int(config.AgentInterceptorPort),
SpiffeEndpointSocket: config.SpiffeEndpointSocket,
TconfigdSpiffeId: tconfigdSpiffeId,
Logger: logger,
}

Expand All @@ -94,7 +112,7 @@ func main() {

<-ctx.Done()

logger.Info("Shutting down servers and controllers...")
logger.Info("Shutting down tconfigd...")
}

func setupSignalHandler(cancel context.CancelFunc) {
Expand Down
2 changes: 1 addition & 1 deletion service/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package common

const (
DATA_PLANE_HEARTBEAT_INTERVAL_MINUTES = 5
TRATTERIA_SERVICE_NAME = "TRATTERIA"
TRATTERIA_SERVICE_NAME = "tratteria"
)
Loading

0 comments on commit 2eed546

Please sign in to comment.