From 3ca34d6406cf1134419829070f5f55bd6a79ee8d Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Fri, 12 Apr 2024 10:41:26 +0300 Subject: [PATCH 01/10] feat: Add coap proxy Signed-off-by: 1998-felix --- .env | 12 +++ README.md | 2 + cmd/main.go | 28 +++++ config.go | 6 ++ go.sum | 5 + pkg/coap/coap.go | 273 +++++++++++++++++++++++++++++++++++++++++++++++ pkg/tls/tls.go | 52 +++++++++ 7 files changed, 378 insertions(+) create mode 100644 pkg/coap/coap.go diff --git a/.env b/.env index 56bb008..8861b2a 100644 --- a/.env +++ b/.env @@ -55,3 +55,15 @@ MPROXY_HTTP_WITH_MTLS_SERVER_CA_FILE=ssl/certs/ca.crt MPROXY_HTTP_WITH_MTLS_CLIENT_CA_FILE=ssl/certs/ca.crt MPROXY_HTTP_WITH_MTLS_CERT_VERIFICATION_METHODS=ocsp MPROXY_HTTP_WITH_MTLS_OCSP_RESPONDER_URL=http://localhost:8080/ocsp + +MPROXY_COAP_WITHOUT_DTLS_ADDRESS=:5682 +MPROXY_COAP_WITHOUT_DTLS_TARGET=localhost:5683 + +MPROXY_COAP_WITH_DTLS_ADDRESS=:5684 +MPROXY_COAP_WITH_DTLS_TARGET=localhost:5683 +MPROXY_COAP_WITH_DTLS_CERT_FILE=ssl/certs/server.crt +MPROXY_COAP_WITH_DTLS_KEY_FILE=ssl/certs/server.key +MPROXY_COAP_WITH_DTLS_SERVER_CA_FILE=ssl/certs/ca.crt +MPROXY_COAP_WITH_DTLS_CLIENT_CA_FILE=ssl/certs/ca.crt + + diff --git a/README.md b/README.md index de78185..6bcd8bd 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,8 @@ mProxy is used to proxy requests to a backend server. For the example setup, we - mProxy server for `HTTP protocol without TLS` on port `8086` with prefix path `/messages` - mProxy server for `HTTP protocol with TLS` on port `8087` with prefix path `/messages` - mProxy server for `HTTP protocol with mTLS` on port `8088` with prefix path `/messages` + - mProxy server for `COAP protocol without DTLS` on port `5682` + - mProxy server for `COAP protocol with DTLS` on port `5684` ### Example testing of mProxy diff --git a/cmd/main.go b/cmd/main.go index 31c83b2..ace2243 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "github.com/absmach/mproxy" "github.com/absmach/mproxy/examples/simple" + "github.com/absmach/mproxy/pkg/coap" "github.com/absmach/mproxy/pkg/http" "github.com/absmach/mproxy/pkg/mqtt" "github.com/absmach/mproxy/pkg/mqtt/websocket" @@ -34,6 +35,9 @@ const ( httpWithoutTLS = "MPROXY_HTTP_WITHOUT_TLS_" httpWithTLS = "MPROXY_HTTP_WITH_TLS_" httpWithmTLS = "MPROXY_HTTP_WITH_MTLS_" + + coapWithoutDTLS = "MPROXY_COAP_WITHOUT_DTLS_" + coapWithDTLS = "MPROXY_COAP_WITH_DTLS_" ) func main() { @@ -172,6 +176,30 @@ func main() { return httpMTLSProxy.Listen(ctx) }) + // mProxy server Configuration for CoAP without DTLS + coapConfig, err := mproxy.NewConfig(env.Options{Prefix: coapWithoutDTLS}) + if err != nil { + panic(err) + } + + // mProxy server for CoAP without DTLS + coapProxy := coap.New(coapConfig, handler, logger) + g.Go(func() error { + return coapProxy.Listen(ctx) + }) + + // mProxy server Configuration for CoAP with DTLS + coapDTLSConfig, err := mproxy.NewConfig(env.Options{Prefix: coapWithDTLS}) + if err != nil { + panic(err) + } + + // mProxy server for CoAP with DTLS + coapDTLSProxy := coap.New(coapDTLSConfig, handler, logger) + g.Go(func() error { + return coapDTLSProxy.ListenDTLS(ctx) + }) + g.Go(func() error { return StopSignalHandler(ctx, cancel, logger) }) diff --git a/config.go b/config.go index a0fda0f..1c923a2 100644 --- a/config.go +++ b/config.go @@ -15,6 +15,7 @@ type Config struct { PathPrefix string `env:"PATH_PREFIX" envDefault:"/"` Target string `env:"TARGET" envDefault:""` TLSConfig *tls.Config + DTLSConfig *dtls.Config } func NewConfig(opts env.Options) (Config, error) { @@ -32,5 +33,10 @@ func NewConfig(opts env.Options) (Config, error) { if err != nil { return Config{}, err } + + c.DTLSConfig, err = mptls.LoadDTLS(&cfg) + if err != nil { + return Config{}, err + } return c, nil } diff --git a/go.sum b/go.sum index aa55fb1..05385b0 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,11 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go new file mode 100644 index 0000000..0eb27c3 --- /dev/null +++ b/pkg/coap/coap.go @@ -0,0 +1,273 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package coap + +import ( + "bytes" + "context" + "fmt" + "log/slog" + + "github.com/absmach/mproxy" + "github.com/absmach/mproxy/pkg/session" + "github.com/plgd-dev/go-coap/v3/dtls" + "github.com/plgd-dev/go-coap/v3/message" + "github.com/plgd-dev/go-coap/v3/message/codes" + "github.com/plgd-dev/go-coap/v3/message/pool" + "github.com/plgd-dev/go-coap/v3/mux" + "github.com/plgd-dev/go-coap/v3/net" + "github.com/plgd-dev/go-coap/v3/options" + "github.com/plgd-dev/go-coap/v3/udp" +) + +type Proxy struct { + config mproxy.Config + event session.Handler + logger *slog.Logger +} + +func New(config mproxy.Config, handler session.Handler, logger *slog.Logger) *Proxy { + return &Proxy{ + config: config, + event: handler, + logger: logger, + } +} + +func sendErrorMessage(cc mux.Conn, token []byte, err error, code codes.Code) error { + m := cc.AcquireMessage(cc.Context()) + defer cc.ReleaseMessage(m) + m.SetCode(code) + m.SetBody(bytes.NewReader(([]byte)(err.Error()))) + m.SetToken(token) + m.SetContentFormat(message.TextPlain) + return cc.WriteMessage(m) +} + +func (p *Proxy) postUpstream(cc mux.Conn, req *mux.Message, token []byte) error { + format, err := req.ContentFormat() + if err != nil { + return err + } + path, err := req.Options().Path() + if err != nil { + return err + } + + targetConn, err := udp.Dial(p.config.Target) + if err != nil { + return err + } + defer targetConn.Close() + pm, err := targetConn.Post(cc.Context(), path, format, req.Body(), req.Options()...) + if err != nil { + return err + } + pm.SetToken(token) + return cc.WriteMessage(pm) +} + +func (p *Proxy) getUpstream(cc mux.Conn, req *mux.Message, token []byte) error { + path, err := req.Options().Path() + if err != nil { + return err + } + + targetConn, err := udp.Dial(p.config.Target) + if err != nil { + return err + } + defer targetConn.Close() + pm, err := targetConn.Get(cc.Context(), path, req.Options()...) + if err != nil { + return err + } + pm.SetToken(token) + return cc.WriteMessage(pm) +} + +func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message.Option, token []byte, path string) { + targetConn, err := udp.Dial(p.config.Target) + if err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error("cannot send error response: %v", err) + } + } + defer targetConn.Close() + doneObserving := make(chan struct{}) + + obs, err := targetConn.Observe(ctx, path, func(req *pool.Message) { + req.SetToken(token) + if err := cc.WriteMessage(req); err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error(err.Error()) + } + p.logger.Error(err.Error()) + } + if req.Code() == codes.NotFound { + close(doneObserving) + } + }, opts...) + if err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error("cannot send error response: %v", err) + } + } + + select { + case <-doneObserving: + if err := obs.Cancel(ctx); err != nil { + p.logger.Error("failed to cancel observation: %v", err) + } + case <-ctx.Done(): + return + } +} + +func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { + tok, err := r.Options().GetBytes(message.URIQuery) + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + ctx := session.NewContext(r.Context(), &session.Session{Password: tok}) + if err := p.event.AuthConnect(ctx); err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + path, err := r.Options().Path() + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadOption); err != nil { + p.logger.Error(err.Error()) + } + return + } + switch r.Code() { + case codes.GET: + obs, err := r.Options().Observe() + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + } + p.handleGet(ctx, path, w.Conn(), r.Token(), obs, r) + + case codes.POST: + body, err := r.ReadBody() + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + return + } + p.handlePost(ctx, w.Conn(), body, r.Token(), path, r) + } +} + +func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token []byte, obs uint32, r *mux.Message) { + if err := p.event.AuthSubscribe(ctx, &[]string{path}); err != nil { + if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + if err := p.event.Subscribe(ctx, &[]string{path}); err != nil { + if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + switch { + // obs == 0, start observe + case obs == 0: + go p.observeUpstream(ctx, con, r.Options(), token, path) + + default: + if err := p.getUpstream(con, r, token); err != nil { + p.logger.Error(fmt.Sprintf("error performing get: %v\n", err)) + if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { + p.logger.Error(err.Error()) + } + return + } + } +} + +func (p *Proxy) handlePost(ctx context.Context, con mux.Conn, body, token []byte, path string, r *mux.Message) { + if err := p.event.AuthPublish(ctx, &path, &body); err != nil { + if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + if err := p.event.Publish(ctx, &path, &body); err != nil { + if err := sendErrorMessage(con, token, err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + return + } + if err := p.postUpstream(con, r, token); err != nil { + p.logger.Debug(fmt.Sprintf("error performing post: %v\n", err)) + if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { + p.logger.Error(err.Error()) + } + return + } +} + +func (p *Proxy) Listen(ctx context.Context) error { + l, err := net.NewListenUDP("udp", p.config.Address) + if err != nil { + return err + } + defer l.Close() + + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) + s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() + + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err + } + return nil +} + +func (p *Proxy) ListenDTLS(ctx context.Context) error { + l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) + if err != nil { + return err + } + defer l.Close() + + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s with DTLS", p.config.Address)) + s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() + + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err + } + return nil +} diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index be66aad..fc5cd47 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -9,6 +9,8 @@ import ( "errors" "net" "os" + + "github.com/pion/dtls/v2" ) var ( @@ -69,6 +71,56 @@ func Load(c *Config) (*tls.Config, error) { return tlsConfig, nil } +// Load returns a DTLS configuration that can be used in DTLS servers. +func LoadDTLS(c *Config) (*dtls.Config, error) { + if c.CertFile == "" || c.KeyFile == "" { + return nil, nil + } + + dtlsConfig := &dtls.Config{} + + certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) + if err != nil { + return nil, errors.Join(errLoadCerts, err) + } + dtlsConfig = &dtls.Config{ + Certificates: []tls.Certificate{certificate}, + } + + // Loading Server CA file + rootCA, err := loadCertFile(c.ServerCAFile) + if err != nil { + return nil, errors.Join(errLoadServerCA, err) + } + if len(rootCA) > 0 { + if dtlsConfig.RootCAs == nil { + dtlsConfig.RootCAs = x509.NewCertPool() + } + if !dtlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { + return nil, errAppendCA + } + } + + // Loading Client CA File + clientCA, err := loadCertFile(c.ClientCAFile) + if err != nil { + return nil, errors.Join(errLoadClientCA, err) + } + if len(clientCA) > 0 { + if dtlsConfig.ClientCAs == nil { + dtlsConfig.ClientCAs = x509.NewCertPool() + } + if !dtlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { + return nil, errAppendCA + } + dtlsConfig.ClientAuth = dtls.RequireAndVerifyClientCert + if c.Validator != nil { + dtlsConfig.VerifyPeerCertificate = c.Validator + } + } + return dtlsConfig, nil +} + // ClientCert returns client certificate. func ClientCert(conn net.Conn) (x509.Certificate, error) { switch connVal := conn.(type) { From 7f20209ee2213aca7f85a0bf6519f48e89dc6e06 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Mon, 22 Apr 2024 08:58:35 +0300 Subject: [PATCH 02/10] refactor: Refactor tls.go for load tls and dtls Signed-off-by: 1998-felix --- cmd/main.go | 16 +++--- config.go | 2 +- pkg/coap/coap.go | 25 +++++---- pkg/mqtt/mqtt.go | 2 +- pkg/tls/tls.go | 140 +++++++++++++++++++++++------------------------ 5 files changed, 91 insertions(+), 94 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ace2243..0ddf894 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -66,7 +66,7 @@ func main() { } // mProxy server for MQTT without TLS - mqttProxy := mqtt.New(mqttConfig, handler, interceptor, logger) + mqttProxy := mqtt.NewProxy(mqttConfig, handler, interceptor, logger) g.Go(func() error { return mqttProxy.Listen(ctx) }) @@ -78,7 +78,7 @@ func main() { } // mProxy server for MQTT with TLS - mqttTLSProxy := mqtt.New(mqttTLSConfig, handler, interceptor, logger) + mqttTLSProxy := mqtt.NewProxy(mqttTLSConfig, handler, interceptor, logger) g.Go(func() error { return mqttTLSProxy.Listen(ctx) }) @@ -90,7 +90,7 @@ func main() { } // mProxy server for MQTT with mTLS - mqttMTlsProxy := mqtt.New(mqttMTLSConfig, handler, interceptor, logger) + mqttMTlsProxy := mqtt.NewProxy(mqttMTLSConfig, handler, interceptor, logger) g.Go(func() error { return mqttMTlsProxy.Listen(ctx) }) @@ -102,7 +102,7 @@ func main() { } // mProxy server for MQTT over Websocket without TLS - wsProxy := websocket.New(wsConfig, handler, interceptor, logger) + wsProxy := websocket.NewProxy(wsConfig, handler, interceptor, logger) g.Go(func() error { return wsProxy.Listen(ctx) }) @@ -114,7 +114,7 @@ func main() { } // mProxy server for MQTT over Websocket with TLS - wsTLSProxy := websocket.New(wsTLSConfig, handler, interceptor, logger) + wsTLSProxy := websocket.NewProxy(wsTLSConfig, handler, interceptor, logger) g.Go(func() error { return wsTLSProxy.Listen(ctx) }) @@ -126,7 +126,7 @@ func main() { } // mProxy server for MQTT over Websocket with mTLS - wsMTLSProxy := websocket.New(wsMTLSConfig, handler, interceptor, logger) + wsMTLSProxy := websocket.NewProxy(wsMTLSConfig, handler, interceptor, logger) g.Go(func() error { return wsMTLSProxy.Listen(ctx) }) @@ -183,7 +183,7 @@ func main() { } // mProxy server for CoAP without DTLS - coapProxy := coap.New(coapConfig, handler, logger) + coapProxy := coap.NewProxy(coapConfig, handler, logger) g.Go(func() error { return coapProxy.Listen(ctx) }) @@ -195,7 +195,7 @@ func main() { } // mProxy server for CoAP with DTLS - coapDTLSProxy := coap.New(coapDTLSConfig, handler, logger) + coapDTLSProxy := coap.NewProxy(coapDTLSConfig, handler, logger) g.Go(func() error { return coapDTLSProxy.ListenDTLS(ctx) }) diff --git a/config.go b/config.go index 1c923a2..7c810ac 100644 --- a/config.go +++ b/config.go @@ -29,7 +29,7 @@ func NewConfig(opts env.Options) (Config, error) { return Config{}, err } - c.TLSConfig, err = mptls.Load(&cfg) + c.TLSConfig, err = mptls.LoadTLS(&cfg) if err != nil { return Config{}, err } diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index 0eb27c3..c61cfc5 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -22,16 +22,16 @@ import ( ) type Proxy struct { - config mproxy.Config - event session.Handler - logger *slog.Logger + config mproxy.Config + session session.Handler + logger *slog.Logger } -func New(config mproxy.Config, handler session.Handler, logger *slog.Logger) *Proxy { +func NewProxy(config mproxy.Config, handler session.Handler, logger *slog.Logger) *Proxy { return &Proxy{ - config: config, - event: handler, - logger: logger, + config: config, + session: handler, + logger: logger, } } @@ -134,7 +134,7 @@ func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { return } ctx := session.NewContext(r.Context(), &session.Session{Password: tok}) - if err := p.event.AuthConnect(ctx); err != nil { + if err := p.session.AuthConnect(ctx); err != nil { if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { p.logger.Error(err.Error()) } @@ -154,6 +154,7 @@ func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { p.logger.Error(err.Error()) } + return } p.handleGet(ctx, path, w.Conn(), r.Token(), obs, r) @@ -170,13 +171,13 @@ func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { } func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token []byte, obs uint32, r *mux.Message) { - if err := p.event.AuthSubscribe(ctx, &[]string{path}); err != nil { + if err := p.session.AuthSubscribe(ctx, &[]string{path}); err != nil { if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { p.logger.Error(err.Error()) } return } - if err := p.event.Subscribe(ctx, &[]string{path}); err != nil { + if err := p.session.Subscribe(ctx, &[]string{path}); err != nil { if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { p.logger.Error(err.Error()) } @@ -199,13 +200,13 @@ func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token } func (p *Proxy) handlePost(ctx context.Context, con mux.Conn, body, token []byte, path string, r *mux.Message) { - if err := p.event.AuthPublish(ctx, &path, &body); err != nil { + if err := p.session.AuthPublish(ctx, &path, &body); err != nil { if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { p.logger.Error(err.Error()) } return } - if err := p.event.Publish(ctx, &path, &body); err != nil { + if err := p.session.Publish(ctx, &path, &body); err != nil { if err := sendErrorMessage(con, token, err, codes.BadRequest); err != nil { p.logger.Error(err.Error()) } diff --git a/pkg/mqtt/mqtt.go b/pkg/mqtt/mqtt.go index 1ad2c34..f247ef6 100644 --- a/pkg/mqtt/mqtt.go +++ b/pkg/mqtt/mqtt.go @@ -27,7 +27,7 @@ type Proxy struct { } // New returns a new MQTT Proxy instance. -func New(config mproxy.Config, handler session.Handler, interceptor session.Interceptor, logger *slog.Logger) *Proxy { +func NewProxy(config mproxy.Config, handler session.Handler, interceptor session.Interceptor, logger *slog.Logger) *Proxy { return &Proxy{ config: config, handler: handler, diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index fc5cd47..e92a326 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -14,109 +14,77 @@ import ( ) var ( - errTLSdetails = errors.New("failed to get TLS details of connection") - errLoadCerts = errors.New("failed to load certificates") - errLoadServerCA = errors.New("failed to load Server CA") - errLoadClientCA = errors.New("failed to load Client CA") - errAppendCA = errors.New("failed to append root ca tls.Config") + errTLSdetails = errors.New("failed to get TLS details of connection") + errLoadCerts = errors.New("failed to load certificates") + errLoadCA = errors.New("failed to load CA file") + errAppendClientCA = errors.New("failed to append client root ca tls.Config") + errAppendServerCA = errors.New("failed to append server root ca tls.Config") ) -// Load return a TLS configuration that can be used in TLS servers. -func Load(c *Config) (*tls.Config, error) { - if c.CertFile == "" || c.KeyFile == "" { +// LoadTLS returns a TLS configuration that can be used in TLS servers. +func LoadTLS(c *Config) (*tls.Config, error) { + certificate, err := loadCertificates(c.CertFile, c.KeyFile) + if err != nil { + return nil, err + } + if certificate == nil { return nil, nil } - tlsConfig := &tls.Config{} - - certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) - if err != nil { - return nil, errors.Join(errLoadCerts, err) - } - tlsConfig = &tls.Config{ - Certificates: []tls.Certificate{certificate}, + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{*certificate}, } // Loading Server CA file - rootCA, err := loadCertFile(c.ServerCAFile) - if err != nil { - return nil, errors.Join(errLoadServerCA, err) - } - if len(rootCA) > 0 { - if tlsConfig.RootCAs == nil { - tlsConfig.RootCAs = x509.NewCertPool() - } - if !tlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { - return nil, errAppendCA - } + if _, err = appendCAs(&tlsConfig.RootCAs, c.ServerCAFile); err != nil { + return nil, errors.Join(errAppendServerCA, err) } // Loading Client CA File - clientCA, err := loadCertFile(c.ClientCAFile) + appended, err := appendCAs(&tlsConfig.ClientCAs, c.ClientCAFile) if err != nil { - return nil, errors.Join(errLoadClientCA, err) + return nil, errors.Join(errAppendClientCA, err) } - if len(clientCA) > 0 { - if tlsConfig.ClientCAs == nil { - tlsConfig.ClientCAs = x509.NewCertPool() - } - if !tlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { - return nil, errAppendCA - } + + if appended { tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - if c.Validator != nil { - tlsConfig.VerifyPeerCertificate = c.Validator - } + } + if c.Validator != nil { + tlsConfig.VerifyPeerCertificate = c.Validator } return tlsConfig, nil } -// Load returns a DTLS configuration that can be used in DTLS servers. +// LoadDTLS returns a DTLS configuration that can be used in DTLS servers. func LoadDTLS(c *Config) (*dtls.Config, error) { - if c.CertFile == "" || c.KeyFile == "" { + certificate, err := loadCertificates(c.CertFile, c.KeyFile) + if err != nil { + return nil, err + } + if certificate == nil { return nil, nil } - dtlsConfig := &dtls.Config{} - - certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) - if err != nil { - return nil, errors.Join(errLoadCerts, err) - } - dtlsConfig = &dtls.Config{ - Certificates: []tls.Certificate{certificate}, + dtlsConfig := &dtls.Config{ + Certificates: []tls.Certificate{*certificate}, } // Loading Server CA file - rootCA, err := loadCertFile(c.ServerCAFile) - if err != nil { - return nil, errors.Join(errLoadServerCA, err) - } - if len(rootCA) > 0 { - if dtlsConfig.RootCAs == nil { - dtlsConfig.RootCAs = x509.NewCertPool() - } - if !dtlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { - return nil, errAppendCA - } + if _, err = appendCAs(&dtlsConfig.RootCAs, c.ServerCAFile); err != nil { + return nil, errors.Join(errAppendServerCA, err) } // Loading Client CA File - clientCA, err := loadCertFile(c.ClientCAFile) + appended, err := appendCAs(&dtlsConfig.ClientCAs, c.ClientCAFile) if err != nil { - return nil, errors.Join(errLoadClientCA, err) + return nil, errors.Join(errAppendClientCA, err) } - if len(clientCA) > 0 { - if dtlsConfig.ClientCAs == nil { - dtlsConfig.ClientCAs = x509.NewCertPool() - } - if !dtlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { - return nil, errAppendCA - } + + if appended { dtlsConfig.ClientAuth = dtls.RequireAndVerifyClientCert - if c.Validator != nil { - dtlsConfig.VerifyPeerCertificate = c.Validator - } + } + if c.Validator != nil { + dtlsConfig.VerifyPeerCertificate = c.Validator } return dtlsConfig, nil } @@ -164,3 +132,31 @@ func loadCertFile(certFile string) ([]byte, error) { } return []byte{}, nil } + +func loadCertificates(certFile, keyFile string) (*tls.Certificate, error) { + if certFile == "" || keyFile == "" { + return nil, nil + } + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, errors.Join(errLoadCerts, err) + } + return &cert, err +} + +func appendCAs(certPool **x509.CertPool, caFile string) (bool, error) { + ca, err := loadCertFile(caFile) + if err != nil { + return false, errors.Join(errLoadCA, err) + } + if len(ca) > 0 { + if *certPool == nil { + *certPool = x509.NewCertPool() + } + if !(*certPool).AppendCertsFromPEM(ca) { + return false, errors.New("failed to append CA certificates") + } + return true, nil + } + return false, nil +} From c8011e6837eaf22d7e3c4e1e23bb42f6614213a6 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Thu, 25 Apr 2024 17:51:50 +0300 Subject: [PATCH 03/10] refactor: refactor loadtls and loaddtls functions Signed-off-by: 1998-felix --- pkg/tls/tls.go | 134 +++++++++++++++++++++++++------------------------ 1 file changed, 69 insertions(+), 65 deletions(-) diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index e92a326..71f1052 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -14,77 +14,109 @@ import ( ) var ( - errTLSdetails = errors.New("failed to get TLS details of connection") - errLoadCerts = errors.New("failed to load certificates") - errLoadCA = errors.New("failed to load CA file") - errAppendClientCA = errors.New("failed to append client root ca tls.Config") - errAppendServerCA = errors.New("failed to append server root ca tls.Config") + errTLSdetails = errors.New("failed to get TLS details of connection") + errLoadCerts = errors.New("failed to load certificates") + errLoadServerCA = errors.New("failed to load Server CA") + errLoadClientCA = errors.New("failed to load Client CA") + errAppendCA = errors.New("failed to append root ca tls.Config") ) // LoadTLS returns a TLS configuration that can be used in TLS servers. func LoadTLS(c *Config) (*tls.Config, error) { - certificate, err := loadCertificates(c.CertFile, c.KeyFile) - if err != nil { - return nil, err - } - if certificate == nil { + if c.CertFile == "" || c.KeyFile == "" { return nil, nil } - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{*certificate}, + tlsConfig := &tls.Config{} + + certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) + if err != nil { + return nil, errors.Join(errLoadCerts, err) + } + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{certificate}, } // Loading Server CA file - if _, err = appendCAs(&tlsConfig.RootCAs, c.ServerCAFile); err != nil { - return nil, errors.Join(errAppendServerCA, err) + rootCA, err := loadCertFile(c.ServerCAFile) + if err != nil { + return nil, errors.Join(errLoadServerCA, err) + } + if len(rootCA) > 0 { + if tlsConfig.RootCAs == nil { + tlsConfig.RootCAs = x509.NewCertPool() + } + if !tlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { + return nil, errAppendCA + } } // Loading Client CA File - appended, err := appendCAs(&tlsConfig.ClientCAs, c.ClientCAFile) + clientCA, err := loadCertFile(c.ClientCAFile) if err != nil { - return nil, errors.Join(errAppendClientCA, err) + return nil, errors.Join(errLoadClientCA, err) } - - if appended { + if len(clientCA) > 0 { + if tlsConfig.ClientCAs == nil { + tlsConfig.ClientCAs = x509.NewCertPool() + } + if !tlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { + return nil, errAppendCA + } tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - } - if c.Validator != nil { - tlsConfig.VerifyPeerCertificate = c.Validator + if c.Validator != nil { + tlsConfig.VerifyPeerCertificate = c.Validator + } } return tlsConfig, nil } // LoadDTLS returns a DTLS configuration that can be used in DTLS servers. func LoadDTLS(c *Config) (*dtls.Config, error) { - certificate, err := loadCertificates(c.CertFile, c.KeyFile) - if err != nil { - return nil, err - } - if certificate == nil { + if c.CertFile == "" || c.KeyFile == "" { return nil, nil } - dtlsConfig := &dtls.Config{ - Certificates: []tls.Certificate{*certificate}, + dtlsConfig := &dtls.Config{} + + certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) + if err != nil { + return nil, errors.Join(errLoadCerts, err) + } + dtlsConfig = &dtls.Config{ + Certificates: []tls.Certificate{certificate}, } // Loading Server CA file - if _, err = appendCAs(&dtlsConfig.RootCAs, c.ServerCAFile); err != nil { - return nil, errors.Join(errAppendServerCA, err) + rootCA, err := loadCertFile(c.ServerCAFile) + if err != nil { + return nil, errors.Join(errLoadServerCA, err) + } + if len(rootCA) > 0 { + if dtlsConfig.RootCAs == nil { + dtlsConfig.RootCAs = x509.NewCertPool() + } + if !dtlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { + return nil, errAppendCA + } } // Loading Client CA File - appended, err := appendCAs(&dtlsConfig.ClientCAs, c.ClientCAFile) + clientCA, err := loadCertFile(c.ClientCAFile) if err != nil { - return nil, errors.Join(errAppendClientCA, err) + return nil, errors.Join(errLoadClientCA, err) } - - if appended { + if len(clientCA) > 0 { + if dtlsConfig.ClientCAs == nil { + dtlsConfig.ClientCAs = x509.NewCertPool() + } + if !dtlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { + return nil, errAppendCA + } dtlsConfig.ClientAuth = dtls.RequireAndVerifyClientCert - } - if c.Validator != nil { - dtlsConfig.VerifyPeerCertificate = c.Validator + if c.Validator != nil { + dtlsConfig.VerifyPeerCertificate = c.Validator + } } return dtlsConfig, nil } @@ -132,31 +164,3 @@ func loadCertFile(certFile string) ([]byte, error) { } return []byte{}, nil } - -func loadCertificates(certFile, keyFile string) (*tls.Certificate, error) { - if certFile == "" || keyFile == "" { - return nil, nil - } - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, errors.Join(errLoadCerts, err) - } - return &cert, err -} - -func appendCAs(certPool **x509.CertPool, caFile string) (bool, error) { - ca, err := loadCertFile(caFile) - if err != nil { - return false, errors.Join(errLoadCA, err) - } - if len(ca) > 0 { - if *certPool == nil { - *certPool = x509.NewCertPool() - } - if !(*certPool).AppendCertsFromPEM(ca) { - return false, errors.New("failed to append CA certificates") - } - return true, nil - } - return false, nil -} From 79bc3c52c04f6ed964225025d1e56c1fd93d2e80 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Mon, 29 Apr 2024 05:47:39 +0300 Subject: [PATCH 04/10] refactor: refactor load tls and dtls using generics Signed-off-by: 1998-felix --- config.go | 6 +-- pkg/tls/tls.go | 134 ++++++++++++++++++++++--------------------------- 2 files changed, 61 insertions(+), 79 deletions(-) diff --git a/config.go b/config.go index 7c810ac..e0c0110 100644 --- a/config.go +++ b/config.go @@ -28,13 +28,11 @@ func NewConfig(opts env.Options) (Config, error) { if err != nil { return Config{}, err } - - c.TLSConfig, err = mptls.LoadTLS(&cfg) + c.TLSConfig, err = mptls.LoadSecConfig(&cfg, &tls.Config{}) if err != nil { return Config{}, err } - - c.DTLSConfig, err = mptls.LoadDTLS(&cfg) + c.DTLSConfig, err = mptls.LoadSecConfig(&cfg, &dtls.Config{}) if err != nil { return Config{}, err } diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index 71f1052..1826227 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -14,111 +14,95 @@ import ( ) var ( - errTLSdetails = errors.New("failed to get TLS details of connection") - errLoadCerts = errors.New("failed to load certificates") - errLoadServerCA = errors.New("failed to load Server CA") - errLoadClientCA = errors.New("failed to load Client CA") - errAppendCA = errors.New("failed to append root ca tls.Config") + errTLSdetails = errors.New("failed to get TLS details of connection") + errLoadCerts = errors.New("failed to load certificates") + errLoadServerCA = errors.New("failed to load Server CA") + errLoadClientCA = errors.New("failed to load Client CA") + errAppendCA = errors.New("failed to append root ca tls.Config") + errUnsupportedSec = errors.New("unsupported security configuration") ) -// LoadTLS returns a TLS configuration that can be used in TLS servers. -func LoadTLS(c *Config) (*tls.Config, error) { +type SecConfig interface { + *tls.Config | *dtls.Config +} + +// LoadSecConfig returns a TLS or DTLS configuration that can be used for TLS or DTLS servers. +func LoadSecConfig[sc SecConfig](c *Config, s sc) (sc, error) { if c.CertFile == "" || c.KeyFile == "" { return nil, nil } - tlsConfig := &tls.Config{} - certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) if err != nil { return nil, errors.Join(errLoadCerts, err) } - tlsConfig = &tls.Config{ - Certificates: []tls.Certificate{certificate}, - } // Loading Server CA file rootCA, err := loadCertFile(c.ServerCAFile) if err != nil { return nil, errors.Join(errLoadServerCA, err) } - if len(rootCA) > 0 { - if tlsConfig.RootCAs == nil { - tlsConfig.RootCAs = x509.NewCertPool() - } - if !tlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { - return nil, errAppendCA - } - } // Loading Client CA File clientCA, err := loadCertFile(c.ClientCAFile) if err != nil { return nil, errors.Join(errLoadClientCA, err) } - if len(clientCA) > 0 { - if tlsConfig.ClientCAs == nil { - tlsConfig.ClientCAs = x509.NewCertPool() - } - if !tlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { - return nil, errAppendCA - } - tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - if c.Validator != nil { - tlsConfig.VerifyPeerCertificate = c.Validator - } - } - return tlsConfig, nil -} -// LoadDTLS returns a DTLS configuration that can be used in DTLS servers. -func LoadDTLS(c *Config) (*dtls.Config, error) { - if c.CertFile == "" || c.KeyFile == "" { - return nil, nil - } - - dtlsConfig := &dtls.Config{} - - certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) - if err != nil { - return nil, errors.Join(errLoadCerts, err) - } - dtlsConfig = &dtls.Config{ - Certificates: []tls.Certificate{certificate}, - } + switch config := any(s).(type) { + case *tls.Config: + config.Certificates = []tls.Certificate{certificate} - // Loading Server CA file - rootCA, err := loadCertFile(c.ServerCAFile) - if err != nil { - return nil, errors.Join(errLoadServerCA, err) - } - if len(rootCA) > 0 { - if dtlsConfig.RootCAs == nil { - dtlsConfig.RootCAs = x509.NewCertPool() + if len(rootCA) > 0 { + if config.RootCAs == nil { + config.RootCAs = x509.NewCertPool() + } + if !config.RootCAs.AppendCertsFromPEM(rootCA) { + return nil, errAppendCA + } } - if !dtlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { - return nil, errAppendCA - } - } - // Loading Client CA File - clientCA, err := loadCertFile(c.ClientCAFile) - if err != nil { - return nil, errors.Join(errLoadClientCA, err) - } - if len(clientCA) > 0 { - if dtlsConfig.ClientCAs == nil { - dtlsConfig.ClientCAs = x509.NewCertPool() + if len(clientCA) > 0 { + if config.ClientCAs == nil { + config.ClientCAs = x509.NewCertPool() + } + if !config.ClientCAs.AppendCertsFromPEM(clientCA) { + return nil, errAppendCA + } + config.ClientAuth = tls.RequireAndVerifyClientCert + if c.Validator != nil { + config.VerifyPeerCertificate = c.Validator + } } - if !dtlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { - return nil, errAppendCA + return s, nil + case *dtls.Config: + config.Certificates = []tls.Certificate{certificate} + + if len(rootCA) > 0 { + if config.RootCAs == nil { + config.RootCAs = x509.NewCertPool() + } + if !config.RootCAs.AppendCertsFromPEM(rootCA) { + return nil, errAppendCA + } } - dtlsConfig.ClientAuth = dtls.RequireAndVerifyClientCert - if c.Validator != nil { - dtlsConfig.VerifyPeerCertificate = c.Validator + + if len(clientCA) > 0 { + if config.ClientCAs == nil { + config.ClientCAs = x509.NewCertPool() + } + if !config.ClientCAs.AppendCertsFromPEM(clientCA) { + return nil, errAppendCA + } + config.ClientAuth = dtls.RequireAndVerifyClientCert + if c.Validator != nil { + config.VerifyPeerCertificate = c.Validator + } } + return s, nil + default: + return nil, errUnsupportedSec } - return dtlsConfig, nil } // ClientCert returns client certificate. From 247100b960b3e70fcc4e52c6076db34aa8702408 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Tue, 30 Apr 2024 15:57:49 +0300 Subject: [PATCH 05/10] refactor: rebase Signed-off-by: 1998-felix --- config.go | 1 + go.mod | 14 +++++++++++- go.sum | 38 +++++++++++++++++++++++++++++++++ pkg/mqtt/websocket/websocket.go | 4 ++-- 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/config.go b/config.go index e0c0110..fa1cb6c 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,7 @@ import ( mptls "github.com/absmach/mproxy/pkg/tls" "github.com/caarlos0/env/v11" + "github.com/pion/dtls/v2" ) type Config struct { diff --git a/go.mod b/go.mod index a6b3b3e..b1ffb23 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,20 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 github.com/joho/godotenv v1.5.1 + github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8 + github.com/plgd-dev/go-coap/v3 v3.3.3 golang.org/x/crypto v0.22.0 golang.org/x/sync v0.7.0 ) -require golang.org/x/net v0.24.0 // indirect +require ( + github.com/dsnet/golib/memfile v1.0.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/transport/v3 v3.0.1 // indirect + go.uber.org/atomic v1.11.0 // indirect + golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.19.0 // indirect +) diff --git a/go.sum b/go.sum index 05385b0..9630ae3 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,10 @@ github.com/caarlos0/env/v11 v11.0.0 h1:ZIlkOjuL3xoZS0kmUJlF74j2Qj8GMOq3CDLX/Viak8Q= github.com/caarlos0/env/v11 v11.0.0/go.mod h1:2RC3HQu8BQqtEK3V4iHPxj0jOdWdbPpWJ6pOueeU1xM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dsnet/golib/memfile v1.0.0 h1:J9pUspY2bDCbF9o+YGwcf3uG6MdyITfh/Fk3/CaEiFs= +github.com/dsnet/golib/memfile v1.0.0/go.mod h1:tXGNW9q3RwvWt1VV2qrRKlSSz0npnh12yftCSCy2T64= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -19,3 +24,36 @@ golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/mqtt/websocket/websocket.go b/pkg/mqtt/websocket/websocket.go index 14150f2..037d216 100644 --- a/pkg/mqtt/websocket/websocket.go +++ b/pkg/mqtt/websocket/websocket.go @@ -28,8 +28,8 @@ type Proxy struct { logger *slog.Logger } -// New - creates new WS proxy. -func New(config mproxy.Config, handler session.Handler, interceptor session.Interceptor, logger *slog.Logger) *Proxy { +// NewProxy - creates new WS proxy. +func NewProxy(config mproxy.Config, handler session.Handler, interceptor session.Interceptor, logger *slog.Logger) *Proxy { return &Proxy{ config: config, handler: handler, From 46c672cf19a5910a40a114576a7b442f4d02dffe Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Thu, 2 May 2024 12:55:19 +0300 Subject: [PATCH 06/10] refactor: merge listen and listendtls Signed-off-by: 1998-felix --- cmd/main.go | 2 +- config.go | 4 +-- pkg/coap/coap.go | 88 +++++++++++++++++++++++++----------------------- pkg/tls/tls.go | 10 +++--- 4 files changed, 54 insertions(+), 50 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 0ddf894..d37505f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -197,7 +197,7 @@ func main() { // mProxy server for CoAP with DTLS coapDTLSProxy := coap.NewProxy(coapDTLSConfig, handler, logger) g.Go(func() error { - return coapDTLSProxy.ListenDTLS(ctx) + return coapDTLSProxy.Listen(ctx) }) g.Go(func() error { diff --git a/config.go b/config.go index fa1cb6c..232869d 100644 --- a/config.go +++ b/config.go @@ -29,11 +29,11 @@ func NewConfig(opts env.Options) (Config, error) { if err != nil { return Config{}, err } - c.TLSConfig, err = mptls.LoadSecConfig(&cfg, &tls.Config{}) + c.TLSConfig, err = mptls.LoadTLSConfig(&cfg, &tls.Config{}) if err != nil { return Config{}, err } - c.DTLSConfig, err = mptls.LoadSecConfig(&cfg, &dtls.Config{}) + c.DTLSConfig, err = mptls.LoadTLSConfig(&cfg, &dtls.Config{}) if err != nil { return Config{}, err } diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index c61cfc5..88c7159 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -6,6 +6,7 @@ package coap import ( "bytes" "context" + "errors" "fmt" "log/slog" @@ -222,53 +223,56 @@ func (p *Proxy) handlePost(ctx context.Context, con mux.Conn, body, token []byte } func (p *Proxy) Listen(ctx context.Context) error { - l, err := net.NewListenUDP("udp", p.config.Address) - if err != nil { - return err - } - defer l.Close() - - p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) - s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + switch { + case p.config.DTLSConfig == nil: + l, err := net.NewListenUDP("udp", p.config.Address) + if err != nil { + return err + } + defer l.Close() - errCh := make(chan error) - go func() { - errCh <- s.Serve(l) - }() + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) + s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) - select { - case <-ctx.Done(): - p.logger.Info(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting ...", p.config.Address)) - l.Close() - case err := <-errCh: - p.logger.Error(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) - return err - } - return nil -} + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() -func (p *Proxy) ListenDTLS(ctx context.Context) error { - l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) - if err != nil { - return err - } - defer l.Close() + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err + } + return nil + case p.config.DTLSConfig != nil: + l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) + if err != nil { + return err + } + defer l.Close() - p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s with DTLS", p.config.Address)) - s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s with DTLS", p.config.Address)) + s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) - errCh := make(chan error) - go func() { - errCh <- s.Serve(l) - }() + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() - select { - case <-ctx.Done(): - p.logger.Info(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting ...", p.config.Address)) - l.Close() - case err := <-errCh: - p.logger.Error(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) - return err + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err + } + return nil + default: + return errors.New("unsupported CoAP configuration") } - return nil } diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index 1826227..b930ccd 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -19,15 +19,15 @@ var ( errLoadServerCA = errors.New("failed to load Server CA") errLoadClientCA = errors.New("failed to load Client CA") errAppendCA = errors.New("failed to append root ca tls.Config") - errUnsupportedSec = errors.New("unsupported security configuration") + errUnsupportedTLS = errors.New("unsupported tls configuration") ) -type SecConfig interface { +type TLSConfig interface { *tls.Config | *dtls.Config } -// LoadSecConfig returns a TLS or DTLS configuration that can be used for TLS or DTLS servers. -func LoadSecConfig[sc SecConfig](c *Config, s sc) (sc, error) { +// LoadTLSConfig returns a TLS or DTLS configuration that can be used for TLS or DTLS servers. +func LoadTLSConfig[sc TLSConfig](c *Config, s sc) (sc, error) { if c.CertFile == "" || c.KeyFile == "" { return nil, nil } @@ -101,7 +101,7 @@ func LoadSecConfig[sc SecConfig](c *Config, s sc) (sc, error) { } return s, nil default: - return nil, errUnsupportedSec + return nil, errUnsupportedTLS } } From 78a0d6f5d0016ad3c0225dc2db7ac8f3ada56e42 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Thu, 9 May 2024 19:13:38 +0300 Subject: [PATCH 07/10] feat: add examples for coap client, update readme Signed-off-by: 1998-felix --- README.md | 27 +++++++++++++++++++++++++- examples/client/coap/with_dtls.sh | 25 ++++++++++++++++++++++++ examples/client/coap/without_dtls.sh | 14 ++++++++++++++ pkg/coap/coap.go | 29 ++++++++++++++++++++-------- 4 files changed, 86 insertions(+), 9 deletions(-) create mode 100755 examples/client/coap/with_dtls.sh create mode 100755 examples/client/coap/without_dtls.sh diff --git a/README.md b/README.md index 6bcd8bd..858fcbb 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,7 @@ mProxy is used to proxy requests to a backend server. For the example setup, we - mProxy server for `HTTP protocol without TLS` on port `8086` with prefix path `/messages` - mProxy server for `HTTP protocol with TLS` on port `8087` with prefix path `/messages` - mProxy server for `HTTP protocol with mTLS` on port `8088` with prefix path `/messages` - - mProxy server for `COAP protocol without DTLS` on port `5682` + - mProxy server for `COAP protocol without DTLS` on port `5682` - mProxy server for `COAP protocol with DTLS` on port `5684` ### Example testing of mProxy @@ -193,6 +193,23 @@ Bash scripts available in `examples/client/http` directory help to test the mPro examples/client/http/with_mtls.sh ``` +### Test mProxy server for CoAP protocols + +Bash scripts available in `example/client/coap` directory help to test the mProxy servers running for CoAP protocols. +This scripts can be used after changing the `MPROXY_COAP_WITHOUT_DTLS_TARGET` and `MPROXY_COAP_WITH_DTLS_TARGET` to a public coap server such as `coap://coap.me:5683` + +- Script to test mProxy server running at 5682 for CoAP without DTLS + + ```bash + examples/client/coap/without_dtls.sh + ``` + +- Script to test mProxy server running at 5684 for CoAP with DTLS + + ```bash + examples/client/coap/without_dtls.sh + ``` + ## Configuration The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. @@ -248,6 +265,14 @@ The service is configured using the environment variables presented in the follo | MPROXY_HTTP_WITH_MTLS_CLIENT_CA_FILE | HTTP with mTLS client CA file path | ssl/certs/ca.crt | | MPROXY_HTTP_WITH_MTLS_CERT_VERIFICATION_METHODS | HTTP with mTLS certificate verification methods, if no value or unset then mProxy server will not do client validation | ocsp | | MPROXY_HTTP_WITH_MTLS_OCSP_RESPONDER_URL | HTTP with mTLS OCSP responder URL, it is used if OCSP responder URL is not available in client certificate AIA | | +| MPROXY_COAP_WITHOUT_DTLS_ADDRESS | CoAP without DTLS inbound (IN) connection listening address | localhost:5682 | +| MPROXY_COAP_WITH_DTLS_TARGET | CoAP without DTLS outbound (OUT) connection | localhost:5683 | +| MPROXY_COAP_WITH_DTLS_ADDRESS | CoAP with DTLS inbound (IN) connection listening address | localhost:5684 | +| MPROXY_COAP_WITH_DTLS_TARGET | CoAP with DTLS outbound (OUT) connection | localhost:5683 | +| MPROXY_COAP_WITH_DTLS_CERT_FILE | CoAP with DTLS certificate file | ssl/certs/server.crt | +| MPROXY_COAP_WITH_DTLS_KEY_FILE | CoAP with DTLS key file | ssl/certs/server.key | +| MPROXY_COAP_WITH_DTLS_SERVER_CA_FILE | CoAP with DTLS server CA file | ssl/certs/ca.crt | +| MPROXY_COAP_WITH_DTLS_CLIENT_CA_FILE | CoAP with DTLS client CA file | ssl/certs/ca.crt | ## mProxy Configuration Environment Variables diff --git a/examples/client/coap/with_dtls.sh b/examples/client/coap/with_dtls.sh new file mode 100755 index 0000000..3e6cbee --- /dev/null +++ b/examples/client/coap/with_dtls.sh @@ -0,0 +1,25 @@ +#!/bin/bash +protocol=coaps +host=localhost +port=5684 +path="test" +content=0x32 +message="{\"message\": \"Hello mProxy\"}" +auth="TOKEN" +cafile=ssl/certs/ca.crt +certfile=ssl/certs/client.crt +keyfile=ssl/certs/client.key + +echo "Posting message to ${protocol}://${host}:${port}/${path} with dtls ..." +coap-client -m post coap://${host}:${port}/${path} -e "${message}" -O 12,${content} -O 15,auth=${auth} \ + -c $certfile -k $keyfile -C $cafile + +echo "Getting message from ${protocol}://${host}:${port}/${path} with dtls ..." +coap-client -m get coap://${host}:${port}/${path} -O 6,0x00 -O 15,auth=${auth} -c $certfile -k $keyfile -C $cafile + +echo "Posting message to ${protocol}://${host}:${port}/${path} with dtls and invalid client certificate..." +coap-client -m post coap://${host}:${port}/${path} -e "${message}" -O 12,${content} -O 15,auth=${auth} \ + -c ssl/certs/client_unknown.crt -k ssl/certs/client_unknown.key -C $cafile + +echo "Getting message from ${protocol}://${host}:${port}/${path} with dtls and invalid client certificate..." +coap-client -m get coap://${host}:${port}/${path} -O 6,0x00 -O 15,auth=${auth} -c ssl/certs/client_unknown.crt -k ssl/certs/client_unknown.key -C $cafile diff --git a/examples/client/coap/without_dtls.sh b/examples/client/coap/without_dtls.sh new file mode 100755 index 0000000..add609a --- /dev/null +++ b/examples/client/coap/without_dtls.sh @@ -0,0 +1,14 @@ +#!/bin/bash +protocol=coap +host=localhost +port=5682 +path="test" +content=0x32 +message="{\"message\": \"Hello mProxy\"}" +auth="TOKEN" + +echo "Posting message to ${protocol}://${host}:${port}/${path} without tls ..." +coap-client -m post coap://${host}:${port}/${path} -e "${message}" -O 12,${content} -O 15,auth=${auth} + +echo "Getting message from ${protocol}://${host}:${port}/${path} without tls ..." +coap-client -m get coap://${host}:${port}/${path} -O 6,0x00 -O 15,auth=${auth} - diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index 88c7159..7964900 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -22,6 +22,11 @@ import ( "github.com/plgd-dev/go-coap/v3/udp" ) +var ( + errUnsupportedConfig = errors.New("unsupported CoAP configuration") + errUnsupportedMethod = errors.New("unsupported CoAP method") +) + type Proxy struct { config mproxy.Config session session.Handler @@ -47,15 +52,19 @@ func sendErrorMessage(cc mux.Conn, token []byte, err error, code codes.Code) err } func (p *Proxy) postUpstream(cc mux.Conn, req *mux.Message, token []byte) error { - format, err := req.ContentFormat() - if err != nil { - return err - } path, err := req.Options().Path() if err != nil { return err } + format := message.TextPlain + if req.HasOption(message.ContentFormat) { + format, err = req.ContentFormat() + if err != nil { + return err + } + } + targetConn, err := udp.Dial(p.config.Target) if err != nil { return err @@ -92,7 +101,7 @@ func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message targetConn, err := udp.Dial(p.config.Target) if err != nil { if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { - p.logger.Error("cannot send error response: %v", err) + p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) } } defer targetConn.Close() @@ -112,14 +121,14 @@ func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message }, opts...) if err != nil { if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { - p.logger.Error("cannot send error response: %v", err) + p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) } } select { case <-doneObserving: if err := obs.Cancel(ctx); err != nil { - p.logger.Error("failed to cancel observation: %v", err) + p.logger.Error(fmt.Sprintf("failed to cancel observation:%v", err)) } case <-ctx.Done(): return @@ -168,6 +177,10 @@ func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { return } p.handlePost(ctx, w.Conn(), body, r.Token(), path, r) + default: + if err := sendErrorMessage(w.Conn(), r.Token(), errUnsupportedMethod, codes.MethodNotAllowed); err != nil { + p.logger.Error(err.Error()) + } } } @@ -273,6 +286,6 @@ func (p *Proxy) Listen(ctx context.Context) error { } return nil default: - return errors.New("unsupported CoAP configuration") + return errUnsupportedConfig } } From 6a9c7e3cc4cb4fb7c4209945f245e705e836c37e Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Thu, 23 May 2024 17:49:22 +0300 Subject: [PATCH 08/10] feat: add server for coap example Signed-off-by: 1998-felix --- README.md | 7 ++- examples/client/coap/without_dtls.sh | 10 +++- examples/server/coap/main.go | 37 ++++++++++++ pkg/coap/coap.go | 86 +++++++++++++--------------- 4 files changed, 90 insertions(+), 50 deletions(-) create mode 100644 examples/server/coap/main.go diff --git a/README.md b/README.md index 858fcbb..abd6037 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ LB tasks can be offloaded to a standard ingress proxy - for example, NginX. - Golang - Mosquitto MQTT Server - Mosquitto Publisher and Subscriber Client +- coap-client or Magistrala coap-cli ### Example Setup of mProxy @@ -195,8 +196,8 @@ Bash scripts available in `examples/client/http` directory help to test the mPro ### Test mProxy server for CoAP protocols -Bash scripts available in `example/client/coap` directory help to test the mProxy servers running for CoAP protocols. -This scripts can be used after changing the `MPROXY_COAP_WITHOUT_DTLS_TARGET` and `MPROXY_COAP_WITH_DTLS_TARGET` to a public coap server such as `coap://coap.me:5683` +Bash scripts available in `example/client/coap` directory help to test the mProxy servers running for CoAP protocols. You will require to have either the [coap-client](https://libcoap.net/doc/reference/4.3.1/man_coap-client.html) or the [Magistrala coap-cli](https://github.com/absmach/coap-cli). +The script can be used alongside the simple go-coap server provided at `example/server/coap`. - Script to test mProxy server running at 5682 for CoAP without DTLS @@ -207,7 +208,7 @@ This scripts can be used after changing the `MPROXY_COAP_WITHOUT_DTLS_TARGET` an - Script to test mProxy server running at 5684 for CoAP with DTLS ```bash - examples/client/coap/without_dtls.sh + examples/client/coap/with_dtls.sh ``` ## Configuration diff --git a/examples/client/coap/without_dtls.sh b/examples/client/coap/without_dtls.sh index add609a..a2a477f 100755 --- a/examples/client/coap/without_dtls.sh +++ b/examples/client/coap/without_dtls.sh @@ -7,8 +7,16 @@ content=0x32 message="{\"message\": \"Hello mProxy\"}" auth="TOKEN" +#Examples using lib-coap coap-client echo "Posting message to ${protocol}://${host}:${port}/${path} without tls ..." coap-client -m post coap://${host}:${port}/${path} -e "${message}" -O 12,${content} -O 15,auth=${auth} echo "Getting message from ${protocol}://${host}:${port}/${path} without tls ..." -coap-client -m get coap://${host}:${port}/${path} -O 6,0x00 -O 15,auth=${auth} - +coap-client -m get coap://${host}:${port}/${path} -O 6,0x00 -O 15,auth=${auth} + +#Examples using Magisrala coap-cli +echo "Posting message to ${protocol}://${host}:${port}/${path} without tls ..." +coap-cli post ${host}:${port}/${path} -d "${message}" -O 12,${content} -O 15,auth=${auth} + +echo "Getting message from ${protocol}://${host}:${port}/${path} without tls ..." +coap-cli get ${host}:${port}/${path} -O 6,0x00 -O 15,auth=${auth} diff --git a/examples/server/coap/main.go b/examples/server/coap/main.go new file mode 100644 index 0000000..5a95d10 --- /dev/null +++ b/examples/server/coap/main.go @@ -0,0 +1,37 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "fmt" + "log" + "strings" + + coap "github.com/plgd-dev/go-coap/v3" + "github.com/plgd-dev/go-coap/v3/message" + "github.com/plgd-dev/go-coap/v3/message/codes" + "github.com/plgd-dev/go-coap/v3/mux" +) + +const defaultPort = "5683" + +func handleRequest(w mux.ResponseWriter, r *mux.Message) { + resp := w.Conn().AcquireMessage(r.Context()) + defer w.Conn().ReleaseMessage(resp) + resp.SetCode(codes.Content) + resp.SetToken(r.Token()) + resp.SetContentFormat(message.TextPlain) + resp.SetBody(strings.NewReader(fmt.Sprintf("%v OK", r.Code()))) + err := w.Conn().WriteMessage(resp) + if err != nil { + log.Printf("Cannot send response: %v", err) + } +} + +func main() { + r := mux.NewRouter() + r.DefaultHandle(mux.HandlerFunc(handleRequest)) + log.Println("starting coap server, listening on port " + defaultPort) + log.Fatal(coap.ListenAndServe("udp", ":"+defaultPort, r)) +} diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index 7964900..ba778ef 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -22,10 +22,7 @@ import ( "github.com/plgd-dev/go-coap/v3/udp" ) -var ( - errUnsupportedConfig = errors.New("unsupported CoAP configuration") - errUnsupportedMethod = errors.New("unsupported CoAP method") -) +var errUnsupportedMethod = errors.New("unsupported CoAP method") type Proxy struct { config mproxy.Config @@ -52,6 +49,12 @@ func sendErrorMessage(cc mux.Conn, token []byte, err error, code codes.Code) err } func (p *Proxy) postUpstream(cc mux.Conn, req *mux.Message, token []byte) error { + outbound, err := udp.Dial(p.config.Target) + if err != nil { + return err + } + defer outbound.Close() + path, err := req.Options().Path() if err != nil { return err @@ -65,12 +68,7 @@ func (p *Proxy) postUpstream(cc mux.Conn, req *mux.Message, token []byte) error } } - targetConn, err := udp.Dial(p.config.Target) - if err != nil { - return err - } - defer targetConn.Close() - pm, err := targetConn.Post(cc.Context(), path, format, req.Body(), req.Options()...) + pm, err := outbound.Post(cc.Context(), path, format, req.Body(), req.Options()...) if err != nil { return err } @@ -84,12 +82,12 @@ func (p *Proxy) getUpstream(cc mux.Conn, req *mux.Message, token []byte) error { return err } - targetConn, err := udp.Dial(p.config.Target) + outbound, err := udp.Dial(p.config.Target) if err != nil { return err } - defer targetConn.Close() - pm, err := targetConn.Get(cc.Context(), path, req.Options()...) + defer outbound.Close() + pm, err := outbound.Get(cc.Context(), path, req.Options()...) if err != nil { return err } @@ -98,16 +96,16 @@ func (p *Proxy) getUpstream(cc mux.Conn, req *mux.Message, token []byte) error { } func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message.Option, token []byte, path string) { - targetConn, err := udp.Dial(p.config.Target) + outbound, err := udp.Dial(p.config.Target) if err != nil { if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) } } - defer targetConn.Close() + defer outbound.Close() doneObserving := make(chan struct{}) - obs, err := targetConn.Observe(ctx, path, func(req *pool.Message) { + obs, err := outbound.Observe(ctx, path, func(req *pool.Message) { req.SetToken(token) if err := cc.WriteMessage(req); err != nil { if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { @@ -236,16 +234,15 @@ func (p *Proxy) handlePost(ctx context.Context, con mux.Conn, body, token []byte } func (p *Proxy) Listen(ctx context.Context) error { - switch { - case p.config.DTLSConfig == nil: - l, err := net.NewListenUDP("udp", p.config.Address) + if p.config.DTLSConfig != nil { + l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) if err != nil { return err } defer l.Close() - p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) - s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + p.logger.Info(fmt.Sprintf("CoAP proxy server started on port %s with DTLS", p.config.Address)) + s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) errCh := make(chan error) go func() { @@ -254,38 +251,35 @@ func (p *Proxy) Listen(ctx context.Context) error { select { case <-ctx.Done(): - p.logger.Info(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting ...", p.config.Address)) + p.logger.Info(fmt.Sprintf("CoAP proxy server on port %s with DTLS exiting ...", p.config.Address)) l.Close() case err := <-errCh: - p.logger.Error(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) + p.logger.Error(fmt.Sprintf("CoAP proxy server on port %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) return err } return nil - case p.config.DTLSConfig != nil: - l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) - if err != nil { - return err - } - defer l.Close() + } + l, err := net.NewListenUDP("udp", p.config.Address) + if err != nil { + return err + } + defer l.Close() - p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s with DTLS", p.config.Address)) - s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) + s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) - errCh := make(chan error) - go func() { - errCh <- s.Serve(l) - }() + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() - select { - case <-ctx.Done(): - p.logger.Info(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting ...", p.config.Address)) - l.Close() - case err := <-errCh: - p.logger.Error(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) - return err - } - return nil - default: - return errUnsupportedConfig + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server on port %s without DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server on port %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err } + return nil } From 7f0cbb28925bf6ad6005b981c44e778c604b7625 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Wed, 29 May 2024 10:50:34 +0300 Subject: [PATCH 09/10] feat: add cancel observation Signed-off-by: 1998-felix --- pkg/coap/coap.go | 120 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 103 insertions(+), 17 deletions(-) diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index ba778ef..3569d5d 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -13,6 +13,7 @@ import ( "github.com/absmach/mproxy" "github.com/absmach/mproxy/pkg/session" "github.com/plgd-dev/go-coap/v3/dtls" + dtlsServer "github.com/plgd-dev/go-coap/v3/dtls/server" "github.com/plgd-dev/go-coap/v3/message" "github.com/plgd-dev/go-coap/v3/message/codes" "github.com/plgd-dev/go-coap/v3/message/pool" @@ -20,8 +21,11 @@ import ( "github.com/plgd-dev/go-coap/v3/net" "github.com/plgd-dev/go-coap/v3/options" "github.com/plgd-dev/go-coap/v3/udp" + udpServer "github.com/plgd-dev/go-coap/v3/udp/server" ) +const startObserve uint32 = 0 + var errUnsupportedMethod = errors.New("unsupported CoAP method") type Proxy struct { @@ -30,6 +34,30 @@ type Proxy struct { logger *slog.Logger } +type udpNilMonitor struct{} + +func (u *udpNilMonitor) UDPServerApply(cfg *udpServer.Config) { + cfg.CreateInactivityMonitor = nil +} + +func NewUDPNilMonitor() udpServer.Option { + return &udpNilMonitor{} +} + +var _ udpServer.Option = (*udpNilMonitor)(nil) + +type dtlsNilMonitor struct{} + +func (d *dtlsNilMonitor) DTLSServerApply(cfg *dtlsServer.Config) { + cfg.CreateInactivityMonitor = nil +} + +func NewDTLSNilMonitor() dtlsServer.Option { + return &dtlsNilMonitor{} +} + +var _ udpServer.Option = (*udpNilMonitor)(nil) + func NewProxy(config mproxy.Config, handler session.Handler, logger *slog.Logger) *Proxy { return &Proxy{ config: config, @@ -105,7 +133,21 @@ func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message defer outbound.Close() doneObserving := make(chan struct{}) - obs, err := outbound.Observe(ctx, path, func(req *pool.Message) { + pm := outbound.AcquireMessage(outbound.Context()) + defer outbound.ReleaseMessage(pm) + pm.SetToken(token) + pm.SetCode(codes.GET) + for _, opt := range opts { + pm.SetOptionBytes(opt.ID, opt.Value) + } + if err := pm.SetPath(path); err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadOption); err != nil { + p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) + } + return + } + + obs, err := outbound.DoObserve(pm, func(req *pool.Message) { req.SetToken(token) if err := cc.WriteMessage(req); err != nil { if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { @@ -116,7 +158,7 @@ func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message if req.Code() == codes.NotFound { close(doneObserving) } - }, opts...) + }) if err != nil { if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) @@ -133,6 +175,35 @@ func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message } } +func (p *Proxy) CancelObservation(cc mux.Conn, opts []message.Option, token []byte, path string) error { + outbound, err := udp.Dial(p.config.Target) + if err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) + } + } + defer outbound.Close() + + pm := outbound.AcquireMessage(outbound.Context()) + defer outbound.ReleaseMessage(pm) + pm.SetToken(token) + pm.SetCode(codes.GET) + for _, opt := range opts { + pm.SetOptionBytes(opt.ID, opt.Value) + } + if err := pm.SetPath(path); err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadOption); err != nil { + p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) + } + return err + } + if err := outbound.WriteMessage(pm); err != nil { + return err + } + pm.SetCode(codes.Content) + return cc.WriteMessage(pm) +} + func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { tok, err := r.Options().GetBytes(message.URIQuery) if err != nil { @@ -157,14 +228,7 @@ func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { } switch r.Code() { case codes.GET: - obs, err := r.Options().Observe() - if err != nil { - if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { - p.logger.Error(err.Error()) - } - return - } - p.handleGet(ctx, path, w.Conn(), r.Token(), obs, r) + p.handleGet(ctx, path, w.Conn(), r.Token(), r) case codes.POST: body, err := r.ReadBody() @@ -182,7 +246,7 @@ func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { } } -func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token []byte, obs uint32, r *mux.Message) { +func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token []byte, r *mux.Message) { if err := p.session.AuthSubscribe(ctx, &[]string{path}); err != nil { if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { p.logger.Error(err.Error()) @@ -196,10 +260,26 @@ func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token return } switch { - // obs == 0, start observe - case obs == 0: - go p.observeUpstream(ctx, con, r.Options(), token, path) - + case r.HasOption(message.Observe): + obs, err := r.Options().Observe() + if err != nil { + if err := sendErrorMessage(con, r.Token(), err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + return + } + switch obs { + case startObserve: + go p.observeUpstream(ctx, con, r.Options(), token, path) + default: + if err := p.CancelObservation(con, r.Options(), token, path); err != nil { + p.logger.Error(fmt.Sprintf("error performing cancel observation: %v\n", err)) + if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { + p.logger.Error(err.Error()) + } + return + } + } default: if err := p.getUpstream(con, r, token); err != nil { p.logger.Error(fmt.Sprintf("error performing get: %v\n", err)) @@ -242,7 +322,10 @@ func (p *Proxy) Listen(ctx context.Context) error { defer l.Close() p.logger.Info(fmt.Sprintf("CoAP proxy server started on port %s with DTLS", p.config.Address)) - s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + var dialOpts []dtlsServer.Option + dialOpts = append(dialOpts, options.WithMux(mux.HandlerFunc(p.handler)), NewDTLSNilMonitor()) + + s := dtls.NewServer(dialOpts...) errCh := make(chan error) go func() { @@ -266,7 +349,10 @@ func (p *Proxy) Listen(ctx context.Context) error { defer l.Close() p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) - s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + var dialOpts []udpServer.Option + dialOpts = append(dialOpts, options.WithMux(mux.HandlerFunc(p.handler)), NewUDPNilMonitor()) + + s := udp.NewServer(dialOpts...) errCh := make(chan error) go func() { From 0f62fe0b5959211dc369240482e9c6b4b64db0ce Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Wed, 24 Jul 2024 09:21:17 +0300 Subject: [PATCH 10/10] feat: add udp and dtls proxy Signed-off-by: 1998-felix --- go.mod | 1 + go.sum | 44 ++++- pkg/coap/coap.go | 465 +++++++++++++++++++---------------------------- pkg/tls/tls.go | 39 ++-- 4 files changed, 257 insertions(+), 292 deletions(-) diff --git a/go.mod b/go.mod index b1ffb23..4048cff 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ toolchain go1.21.4 require ( github.com/caarlos0/env/v11 v11.0.0 + github.com/dustin/go-coap v0.0.0-20190908170653-752e0f79981e github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 diff --git a/go.sum b/go.sum index 9630ae3..a36f6a8 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dsnet/golib/memfile v1.0.0 h1:J9pUspY2bDCbF9o+YGwcf3uG6MdyITfh/Fk3/CaEiFs= github.com/dsnet/golib/memfile v1.0.0/go.mod h1:tXGNW9q3RwvWt1VV2qrRKlSSz0npnh12yftCSCy2T64= +github.com/dustin/go-coap v0.0.0-20190908170653-752e0f79981e h1:oppjHFVTardH+VyOD32F9uBtgT5Wd/qVqEGcwj389Lc= +github.com/dustin/go-coap v0.0.0-20190908170653-752e0f79981e/go.mod h1:as2rZ2aojRzZF8bGx1bPAn1yi9ICG6LwkiPOj6PBtjc= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -18,10 +20,48 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8 h1:r7K+oQUYubeA0am08kTAvd2wT2D8PZggs/CpMGp0nkM= +github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8/go.mod h1:/gft3czh67pwl4nM1BBUvF7eTy72uGkObJXOYfxRDbA= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/plgd-dev/go-coap/v3 v3.3.3 h1:Cbw5TUFRygqz6UXjrRZvfP5RpxWIX8UzaodAjnmf1ko= +github.com/plgd-dev/go-coap/v3 v3.3.3/go.mod h1:Z2Cucu5EelDWdk684WbL7S5mV9/ZA7ejixcpYaB7gSg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go index 3569d5d..fefbeec 100644 --- a/pkg/coap/coap.go +++ b/pkg/coap/coap.go @@ -4,29 +4,35 @@ package coap import ( - "bytes" "context" - "errors" "fmt" "log/slog" + "net" + "sync" + "time" "github.com/absmach/mproxy" "github.com/absmach/mproxy/pkg/session" - "github.com/plgd-dev/go-coap/v3/dtls" - dtlsServer "github.com/plgd-dev/go-coap/v3/dtls/server" - "github.com/plgd-dev/go-coap/v3/message" - "github.com/plgd-dev/go-coap/v3/message/codes" - "github.com/plgd-dev/go-coap/v3/message/pool" - "github.com/plgd-dev/go-coap/v3/mux" - "github.com/plgd-dev/go-coap/v3/net" - "github.com/plgd-dev/go-coap/v3/options" - "github.com/plgd-dev/go-coap/v3/udp" - udpServer "github.com/plgd-dev/go-coap/v3/udp/server" + mptls "github.com/absmach/mproxy/pkg/tls" + gocoap "github.com/dustin/go-coap" + "github.com/pion/dtls/v2" + "golang.org/x/sync/errgroup" ) -const startObserve uint32 = 0 +const ( + bufferSize uint64 = 1280 + startObserve uint32 = 0 +) -var errUnsupportedMethod = errors.New("unsupported CoAP method") +var ( + ConnMap = make(map[string]*Conn) + mutex sync.Mutex +) + +type Conn struct { + clientAddr *net.UDPAddr + serverConn *net.UDPConn +} type Proxy struct { config mproxy.Config @@ -34,30 +40,6 @@ type Proxy struct { logger *slog.Logger } -type udpNilMonitor struct{} - -func (u *udpNilMonitor) UDPServerApply(cfg *udpServer.Config) { - cfg.CreateInactivityMonitor = nil -} - -func NewUDPNilMonitor() udpServer.Option { - return &udpNilMonitor{} -} - -var _ udpServer.Option = (*udpNilMonitor)(nil) - -type dtlsNilMonitor struct{} - -func (d *dtlsNilMonitor) DTLSServerApply(cfg *dtlsServer.Config) { - cfg.CreateInactivityMonitor = nil -} - -func NewDTLSNilMonitor() dtlsServer.Option { - return &dtlsNilMonitor{} -} - -var _ udpServer.Option = (*udpNilMonitor)(nil) - func NewProxy(config mproxy.Config, handler session.Handler, logger *slog.Logger) *Proxy { return &Proxy{ config: config, @@ -66,306 +48,241 @@ func NewProxy(config mproxy.Config, handler session.Handler, logger *slog.Logger } } -func sendErrorMessage(cc mux.Conn, token []byte, err error, code codes.Code) error { - m := cc.AcquireMessage(cc.Context()) - defer cc.ReleaseMessage(m) - m.SetCode(code) - m.SetBody(bytes.NewReader(([]byte)(err.Error()))) - m.SetToken(token) - m.SetContentFormat(message.TextPlain) - return cc.WriteMessage(m) +func (p *Proxy) proxyUDP(ctx context.Context, l *net.UDPConn) { + buffer := make([]byte, bufferSize) + for { + select { + case <-ctx.Done(): + return + default: + n, clientAddr, err := l.ReadFromUDP(buffer) + if err != nil { + return + } + mutex.Lock() + conn, ok := ConnMap[clientAddr.String()] + if !ok { + conn, err = p.newConn(clientAddr) + if err != nil { + p.logger.Error("Failed to create new connection", slog.Any("error", err)) + mutex.Unlock() + return + } + ConnMap[clientAddr.String()] = conn + go p.downUDP(l, conn) + } + mutex.Unlock() + p.upUDP(conn, buffer[:n]) + } + } } -func (p *Proxy) postUpstream(cc mux.Conn, req *mux.Message, token []byte) error { - outbound, err := udp.Dial(p.config.Target) +func (p *Proxy) Listen(ctx context.Context) error { + addr, err := net.ResolveUDPAddr("udp", p.config.Address) if err != nil { + p.logger.Error("Failed to resolve UDP address", slog.Any("error", err)) return err } - defer outbound.Close() + g, ctx := errgroup.WithContext(ctx) + switch { + case p.config.DTLSConfig != nil: + l, err := dtls.Listen("udp", addr, p.config.DTLSConfig) + if err != nil { + return err + } + defer l.Close() - path, err := req.Options().Path() - if err != nil { - return err - } + g.Go(func() error { + p.proxyDTLS(ctx, l) + return nil + }) + + g.Go(func() error { + <-ctx.Done() + return l.Close() + }) - format := message.TextPlain - if req.HasOption(message.ContentFormat) { - format, err = req.ContentFormat() + default: + l, err := net.ListenUDP("udp", addr) if err != nil { return err } - } + defer l.Close() - pm, err := outbound.Post(cc.Context(), path, format, req.Body(), req.Options()...) - if err != nil { - return err + g.Go(func() error { + p.proxyUDP(ctx, l) + return nil + }) + + g.Go(func() error { + <-ctx.Done() + return l.Close() + }) } - pm.SetToken(token) - return cc.WriteMessage(pm) -} -func (p *Proxy) getUpstream(cc mux.Conn, req *mux.Message, token []byte) error { - path, err := req.Options().Path() - if err != nil { - return err + status := mptls.SecurityStatus(p.config.DTLSConfig) + p.logger.Info(fmt.Sprintf("COAP proxy server started at %s with %s", p.config.Address, status)) + + if err := g.Wait(); err != nil { + p.logger.Info(fmt.Sprintf("COAP proxy server at %s exiting with errors", p.config.Address), slog.String("error", err.Error())) + } else { + p.logger.Info(fmt.Sprintf("COAP proxy server at %s exiting...", p.config.Address)) } + return nil +} - outbound, err := udp.Dial(p.config.Target) +func (p *Proxy) newConn(clientAddr *net.UDPAddr) (*Conn, error) { + conn := new(Conn) + conn.clientAddr = clientAddr + addr, err := net.ResolveUDPAddr("udp", p.config.Target) if err != nil { - return err + return nil, err } - defer outbound.Close() - pm, err := outbound.Get(cc.Context(), path, req.Options()...) + t, err := net.DialUDP("udp", nil, addr) if err != nil { - return err + return nil, err } - pm.SetToken(token) - return cc.WriteMessage(pm) + conn.serverConn = t + return conn, nil } -func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message.Option, token []byte, path string) { - outbound, err := udp.Dial(p.config.Target) +func (p *Proxy) upUDP(conn *Conn, buffer []byte) { + p.handleCoAPMessage(buffer) + _, err := conn.serverConn.Write(buffer) if err != nil { - if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { - p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) - } - } - defer outbound.Close() - doneObserving := make(chan struct{}) - - pm := outbound.AcquireMessage(outbound.Context()) - defer outbound.ReleaseMessage(pm) - pm.SetToken(token) - pm.SetCode(codes.GET) - for _, opt := range opts { - pm.SetOptionBytes(opt.ID, opt.Value) - } - if err := pm.SetPath(path); err != nil { - if err := sendErrorMessage(cc, token, err, codes.BadOption); err != nil { - p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) - } return } +} - obs, err := outbound.DoObserve(pm, func(req *pool.Message) { - req.SetToken(token) - if err := cc.WriteMessage(req); err != nil { - if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { - p.logger.Error(err.Error()) - } - p.logger.Error(err.Error()) +func (p *Proxy) downUDP(l *net.UDPConn, conn *Conn) { + buffer := make([]byte, bufferSize) + for { + err := conn.serverConn.SetReadDeadline(time.Now().Add(10 * time.Second)) + if err != nil { + return } - if req.Code() == codes.NotFound { - close(doneObserving) + n, err := conn.serverConn.Read(buffer) + if err != nil { + p.close(conn) + return } - }) - if err != nil { - if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { - p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) + _, err = l.WriteToUDP(buffer[:n], conn.clientAddr) + if err != nil { + return } } +} - select { - case <-doneObserving: - if err := obs.Cancel(ctx); err != nil { - p.logger.Error(fmt.Sprintf("failed to cancel observation:%v", err)) - } - case <-ctx.Done(): - return - } +func (p *Proxy) close(conn *Conn) { + mutex.Lock() + defer mutex.Unlock() + delete(ConnMap, conn.clientAddr.String()) + conn.serverConn.Close() } -func (p *Proxy) CancelObservation(cc mux.Conn, opts []message.Option, token []byte, path string) error { - outbound, err := udp.Dial(p.config.Target) - if err != nil { - if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { - p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) - } - } - defer outbound.Close() - - pm := outbound.AcquireMessage(outbound.Context()) - defer outbound.ReleaseMessage(pm) - pm.SetToken(token) - pm.SetCode(codes.GET) - for _, opt := range opts { - pm.SetOptionBytes(opt.ID, opt.Value) - } - if err := pm.SetPath(path); err != nil { - if err := sendErrorMessage(cc, token, err, codes.BadOption); err != nil { - p.logger.Error(fmt.Sprintf("cannot send error response: %v", err)) +func (p *Proxy) proxyDTLS(ctx context.Context, l net.Listener) { + for { + select { + case <-ctx.Done(): + return + default: + conn, err := l.Accept() + if err != nil { + p.logger.Warn("Accept error " + err.Error()) + continue + } + p.logger.Info("Accepted new client") + go p.handleDTLS(conn) } - return err } - if err := outbound.WriteMessage(pm); err != nil { - return err - } - pm.SetCode(codes.Content) - return cc.WriteMessage(pm) } -func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { - tok, err := r.Options().GetBytes(message.URIQuery) +func (p *Proxy) handleDTLS(inbound net.Conn) { + outboundAddr, err := net.ResolveUDPAddr("udp", p.config.Address) if err != nil { - if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { - p.logger.Error(err.Error()) - } return } - ctx := session.NewContext(r.Context(), &session.Session{Password: tok}) - if err := p.session.AuthConnect(ctx); err != nil { - if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { - p.logger.Error(err.Error()) - } - return - } - path, err := r.Options().Path() + + outbound, err := net.DialUDP("udp", nil, outboundAddr) if err != nil { - if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadOption); err != nil { - p.logger.Error(err.Error()) - } + p.logger.Error("Cannot connect to remote broker " + p.config.Address + " due to: " + err.Error()) return } - switch r.Code() { - case codes.GET: - p.handleGet(ctx, path, w.Conn(), r.Token(), r) - case codes.POST: - body, err := r.ReadBody() + go p.dtlsUp(outbound, inbound) + go p.dtlsDown(inbound, outbound) +} + +func (p *Proxy) dtlsUp(outbound *net.UDPConn, inbound net.Conn) { + buffer := make([]byte, bufferSize) + for { + n, err := inbound.Read(buffer) if err != nil { - if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { - p.logger.Error(err.Error()) - } return } - p.handlePost(ctx, w.Conn(), body, r.Token(), path, r) - default: - if err := sendErrorMessage(w.Conn(), r.Token(), errUnsupportedMethod, codes.MethodNotAllowed); err != nil { - p.logger.Error(err.Error()) + p.handleCoAPMessage(buffer[:n]) + + _, err = outbound.Write(buffer[:n]) + if err != nil { + slog.Error("Failed to write to server", slog.Any("err", err)) } } } -func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token []byte, r *mux.Message) { - if err := p.session.AuthSubscribe(ctx, &[]string{path}); err != nil { - if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { - p.logger.Error(err.Error()) - } - return - } - if err := p.session.Subscribe(ctx, &[]string{path}); err != nil { - if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { - p.logger.Error(err.Error()) - } - return - } - switch { - case r.HasOption(message.Observe): - obs, err := r.Options().Observe() +func (p *Proxy) dtlsDown(inbound net.Conn, outbound *net.UDPConn) { + buffer := make([]byte, bufferSize) + for { + err := outbound.SetReadDeadline(time.Now().Add(1 * time.Minute)) if err != nil { - if err := sendErrorMessage(con, r.Token(), err, codes.BadRequest); err != nil { - p.logger.Error(err.Error()) - } return } - switch obs { - case startObserve: - go p.observeUpstream(ctx, con, r.Options(), token, path) - default: - if err := p.CancelObservation(con, r.Options(), token, path); err != nil { - p.logger.Error(fmt.Sprintf("error performing cancel observation: %v\n", err)) - if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { - p.logger.Error(err.Error()) - } - return - } + n, err := outbound.Read(buffer) + defer outbound.Close() + if err != nil { + return } - default: - if err := p.getUpstream(con, r, token); err != nil { - p.logger.Error(fmt.Sprintf("error performing get: %v\n", err)) - if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { - p.logger.Error(err.Error()) - } + + _, err = inbound.Write(buffer[:n]) + defer inbound.Close() + if err != nil { return } } } -func (p *Proxy) handlePost(ctx context.Context, con mux.Conn, body, token []byte, path string, r *mux.Message) { - if err := p.session.AuthPublish(ctx, &path, &body); err != nil { - if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { - p.logger.Error(err.Error()) - } +func (p *Proxy) handleCoAPMessage(buffer []byte) { + msg, err := gocoap.ParseMessage(buffer) + if err != nil { + p.logger.Error("Failed to parse message", slog.Any("error", err)) return } - if err := p.session.Publish(ctx, &path, &body); err != nil { - if err := sendErrorMessage(con, token, err, codes.BadRequest); err != nil { - p.logger.Error(err.Error()) + + token := msg.Token + path := msg.Path() + ctx := session.NewContext(context.Background(), &session.Session{Password: token}) + + switch msg.Code { + case gocoap.POST: + if err := p.session.AuthConnect(ctx); err != nil { + return } - return - } - if err := p.postUpstream(con, r, token); err != nil { - p.logger.Debug(fmt.Sprintf("error performing post: %v\n", err)) - if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { - p.logger.Error(err.Error()) + if err := p.session.AuthPublish(ctx, &path[0], &msg.Payload); err != nil { + return } - return - } -} - -func (p *Proxy) Listen(ctx context.Context) error { - if p.config.DTLSConfig != nil { - l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) - if err != nil { - return err + if err := p.session.Publish(ctx, &path[0], &msg.Payload); err != nil { + return } - defer l.Close() - - p.logger.Info(fmt.Sprintf("CoAP proxy server started on port %s with DTLS", p.config.Address)) - var dialOpts []dtlsServer.Option - dialOpts = append(dialOpts, options.WithMux(mux.HandlerFunc(p.handler)), NewDTLSNilMonitor()) - - s := dtls.NewServer(dialOpts...) - - errCh := make(chan error) - go func() { - errCh <- s.Serve(l) - }() - - select { - case <-ctx.Done(): - p.logger.Info(fmt.Sprintf("CoAP proxy server on port %s with DTLS exiting ...", p.config.Address)) - l.Close() - case err := <-errCh: - p.logger.Error(fmt.Sprintf("CoAP proxy server on port %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) - return err + case gocoap.GET: + if err := p.session.AuthConnect(ctx); err != nil { + return + } + if msg.Option(gocoap.Observe) == startObserve { + if err := p.session.AuthSubscribe(ctx, &path); err != nil { + return + } + if err := p.session.Subscribe(ctx, &path); err != nil { + return + } } - return nil - } - l, err := net.NewListenUDP("udp", p.config.Address) - if err != nil { - return err - } - defer l.Close() - - p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) - var dialOpts []udpServer.Option - dialOpts = append(dialOpts, options.WithMux(mux.HandlerFunc(p.handler)), NewUDPNilMonitor()) - - s := udp.NewServer(dialOpts...) - - errCh := make(chan error) - go func() { - errCh <- s.Serve(l) - }() - - select { - case <-ctx.Done(): - p.logger.Info(fmt.Sprintf("CoAP proxy server on port %s without DTLS exiting ...", p.config.Address)) - l.Close() - case err := <-errCh: - p.logger.Error(fmt.Sprintf("CoAP proxy server on port %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) - return err } - return nil } diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index b930ccd..ff7da54 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -105,6 +105,29 @@ func LoadTLSConfig[sc TLSConfig](c *Config, s sc) (sc, error) { } } +// SecurityStatus returns log message from TLS config. +func SecurityStatus[sc TLSConfig](s sc) string { + if s == nil { + return "no TLS" + } + switch c := any(s).(type) { + case *tls.Config: + ret := "TLS" + // It is possible to establish TLS with client certificates only. + if c.Certificates == nil || len(c.Certificates) == 0 { + ret = "no server certificates" + } + if c.ClientCAs != nil { + ret += " and " + c.ClientAuth.String() + } + return ret + case *dtls.Config: + return "DTLS" + default: + return "no TLS" + } +} + // ClientCert returns client certificate. func ClientCert(conn net.Conn) (x509.Certificate, error) { switch connVal := conn.(type) { @@ -126,22 +149,6 @@ func ClientCert(conn net.Conn) (x509.Certificate, error) { } } -// SecurityStatus returns log message from TLS config. -func SecurityStatus(c *tls.Config) string { - if c == nil { - return "no TLS" - } - ret := "TLS" - // It is possible to establish TLS with client certificates only. - if c.Certificates == nil || len(c.Certificates) == 0 { - ret = "no server certificates" - } - if c.ClientCAs != nil { - ret += " and " + c.ClientAuth.String() - } - return ret -} - func loadCertFile(certFile string) ([]byte, error) { if certFile != "" { return os.ReadFile(certFile)