From ad62fd92aeb7886ed06c2f7f71e9869a2d9db916 Mon Sep 17 00:00:00 2001 From: 1998-felix Date: Fri, 12 Apr 2024 10:41:26 +0300 Subject: [PATCH] feat: Add coap proxy Signed-off-by: 1998-felix --- .env | 12 +++ cmd/main.go | 28 +++++ config.go | 7 ++ go.mod | 17 ++- go.sum | 77 +++++++++++++ pkg/coap/coap.go | 273 +++++++++++++++++++++++++++++++++++++++++++++++ pkg/tls/tls.go | 52 +++++++++ 7 files changed, 465 insertions(+), 1 deletion(-) create mode 100644 pkg/coap/coap.go diff --git a/.env b/.env index 6995156..9468dcb 100644 --- a/.env +++ b/.env @@ -55,3 +55,15 @@ MPROXY_HTTP_WITH_MTLS_SERVER_CA_FILE=ssl/certs/ca.crt MPROXY_HTTP_WITH_MTLS_CLIENT_CA_FILE=ssl/certs/ca.crt MPROXY_HTTP_WITH_MTLS_CERT_VERIFICATION_METHODS=ocsp MPROXY_HTTP_WITH_MTLS_OCSP_RESPONDER_URL=http://localhost:8080/ocsp + +MPROXY_COAP_WITHOUT_DTLS_ADDRESS=:5682 +MPROXY_COAP_WITHOUT_DTLS_TARGET=localhost:5683 + +MPROXY_COAP_WITH_DTLS_ADDRESS=:5684 +MPROXY_COAP_WITH_DTLS_TARGET=localhost:5683 +MPROXY_COAP_WITH_DTLS_CERT_FILE=ssl/certs/server.crt +MPROXY_COAP_WITH_DTLS_KEY_FILE=ssl/certs/server.key +MPROXY_COAP_WITH_DTLS_SERVER_CA_FILE=ssl/certs/ca.crt +MPROXY_COAP_WITH_DTLS_CLIENT_CA_FILE=ssl/certs/ca.crt + + diff --git a/cmd/main.go b/cmd/main.go index f66176a..b34f444 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "github.com/absmach/mproxy" "github.com/absmach/mproxy/examples/simple" + "github.com/absmach/mproxy/pkg/coap" "github.com/absmach/mproxy/pkg/http" "github.com/absmach/mproxy/pkg/mqtt" "github.com/absmach/mproxy/pkg/mqtt/websocket" @@ -34,6 +35,9 @@ const ( httpWithoutTLS = "MPROXY_HTTP_WITHOUT_TLS_" httpWithTLS = "MPROXY_HTTP_WITH_TLS_" httpWithmTLS = "MPROXY_HTTP_WITH_MTLS_" + + coapWithoutDTLS = "MPROXY_COAP_WITHOUT_DTLS_" + coapWithDTLS = "MPROXY_COAP_WITH_DTLS_" ) func main() { @@ -172,6 +176,30 @@ func main() { return httpMTLSProxy.Listen(ctx) }) + // mProxy server Configuration for CoAP without DTLS + coapConfig, err := mproxy.NewConfig(env.Options{Prefix: coapWithoutDTLS}) + if err != nil { + panic(err) + } + + // mProxy server for CoAP without DTLS + coapProxy := coap.New(coapConfig, handler, logger) + g.Go(func() error { + return coapProxy.Listen(ctx) + }) + + // mProxy server Configuration for CoAP with DTLS + coapDTLSConfig, err := mproxy.NewConfig(env.Options{Prefix: coapWithDTLS}) + if err != nil { + panic(err) + } + + // mProxy server for CoAP with DTLS + coapDTLSProxy := coap.New(coapDTLSConfig, handler, logger) + g.Go(func() error { + return coapDTLSProxy.ListenDTLS(ctx) + }) + g.Go(func() error { return StopSignalHandler(ctx, cancel, logger) }) diff --git a/config.go b/config.go index 6f5c01b..2d2f3df 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,7 @@ import ( mptls "github.com/absmach/mproxy/pkg/tls" "github.com/caarlos0/env/v10" + "github.com/pion/dtls/v2" ) type Config struct { @@ -15,6 +16,7 @@ type Config struct { PrefixPath string `env:"PREFIX_PATH" envDefault:""` Target string `env:"TARGET" envDefault:""` TLSConfig *tls.Config + DTLSConfig *dtls.Config } func NewConfig(opts env.Options) (Config, error) { @@ -32,5 +34,10 @@ func NewConfig(opts env.Options) (Config, error) { if err != nil { return Config{}, err } + + c.DTLSConfig, err = mptls.LoadDTLS(&cfg) + if err != nil { + return Config{}, err + } return c, nil } diff --git a/go.mod b/go.mod index 6fd9a90..1f95db5 100644 --- a/go.mod +++ b/go.mod @@ -14,4 +14,19 @@ require ( golang.org/x/sync v0.6.0 ) -require golang.org/x/net v0.21.0 // indirect +require ( + github.com/dsnet/golib/memfile v1.0.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/transport/v3 v3.0.1 // indirect + go.uber.org/atomic v1.11.0 // indirect + golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect + golang.org/x/sys v0.17.0 // indirect +) + +require ( + github.com/plgd-dev/go-coap/v3 v3.3.3 + golang.org/x/net v0.21.0 // indirect +) diff --git a/go.sum b/go.sum index ec95eb4..38d213c 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,93 @@ github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA= github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dsnet/golib/memfile v1.0.0 h1:J9pUspY2bDCbF9o+YGwcf3uG6MdyITfh/Fk3/CaEiFs= +github.com/dsnet/golib/memfile v1.0.0/go.mod h1:tXGNW9q3RwvWt1VV2qrRKlSSz0npnh12yftCSCy2T64= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8 h1:r7K+oQUYubeA0am08kTAvd2wT2D8PZggs/CpMGp0nkM= +github.com/pion/dtls/v2 v2.2.8-0.20240201071732-2597464081c8/go.mod h1:/gft3czh67pwl4nM1BBUvF7eTy72uGkObJXOYfxRDbA= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/plgd-dev/go-coap/v3 v3.3.3 h1:Cbw5TUFRygqz6UXjrRZvfP5RpxWIX8UzaodAjnmf1ko= +github.com/plgd-dev/go-coap/v3 v3.3.3/go.mod h1:Z2Cucu5EelDWdk684WbL7S5mV9/ZA7ejixcpYaB7gSg= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/coap/coap.go b/pkg/coap/coap.go new file mode 100644 index 0000000..034357a --- /dev/null +++ b/pkg/coap/coap.go @@ -0,0 +1,273 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package coap + +import ( + "bytes" + "context" + "fmt" + "log/slog" + + "github.com/absmach/mproxy" + "github.com/absmach/mproxy/pkg/session" + "github.com/plgd-dev/go-coap/v3/dtls" + "github.com/plgd-dev/go-coap/v3/message" + "github.com/plgd-dev/go-coap/v3/message/codes" + "github.com/plgd-dev/go-coap/v3/message/pool" + "github.com/plgd-dev/go-coap/v3/mux" + "github.com/plgd-dev/go-coap/v3/net" + "github.com/plgd-dev/go-coap/v3/options" + "github.com/plgd-dev/go-coap/v3/udp" +) + +type Proxy struct { + config mproxy.Config + event session.Handler + logger *slog.Logger +} + +func New(config mproxy.Config, handler session.Handler, logger *slog.Logger) *Proxy { + return &Proxy{ + config: config, + event: handler, + logger: logger, + } +} + +func sendErrorMessage(cc mux.Conn, token []byte, err error, code codes.Code) error { + m := cc.AcquireMessage(cc.Context()) + defer cc.ReleaseMessage(m) + m.SetCode(code) + m.SetBody(bytes.NewReader(([]byte)(err.Error()))) + m.SetToken(token) + m.SetContentFormat(message.TextPlain) + return cc.WriteMessage(m) +} + +func (p *Proxy) postUpstream(cc mux.Conn, req *mux.Message, token []byte) error { + format, err := req.ContentFormat() + if err != nil { + return err + } + path, err := req.Options().Path() + if err != nil { + return err + } + + targetConn, err := udp.Dial(p.config.Target) + if err != nil { + return err + } + defer targetConn.Close() + pm, err := targetConn.Post(cc.Context(), path, format, req.Body(), req.Options()...) + if err != nil { + return err + } + pm.SetToken(token) + return cc.WriteMessage(pm) +} + +func (p *Proxy) getUpstream(cc mux.Conn, req *mux.Message, token []byte) error { + path, err := req.Options().Path() + if err != nil { + return err + } + + targetConn, err := udp.Dial(p.config.Target) + if err != nil { + return err + } + defer targetConn.Close() + pm, err := targetConn.Get(cc.Context(), path, req.Options()...) + if err != nil { + return err + } + pm.SetToken(token) + return cc.WriteMessage(pm) +} + +func (p *Proxy) observeUpstream(ctx context.Context, cc mux.Conn, opts []message.Option, token []byte, path string) { + targetConn, err := udp.Dial(p.config.Target) + if err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error("cannot send error response: %v", err) + } + } + defer targetConn.Close() + doneObserving := make(chan struct{}) + + obs, err := targetConn.Observe(context.Background(), path, func(req *pool.Message) { + req.SetToken(token) + if err := cc.WriteMessage(req); err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error("cannot send error response: %v", err) + } + p.logger.Error("cannot send response: %v", err) + } + if req.Code() == codes.NotFound { + close(doneObserving) + } + }, opts...) + if err != nil { + if err := sendErrorMessage(cc, token, err, codes.BadGateway); err != nil { + p.logger.Error("cannot send error response: %v", err) + } + } + + select { + case <-doneObserving: + if err := obs.Cancel(ctx); err != nil { + p.logger.Error("failed to cancel observation: %v", err) + } + case <-ctx.Done(): + return + } +} + +func (p *Proxy) handler(w mux.ResponseWriter, r *mux.Message) { + tok, err := r.Options().GetBytes(message.URIQuery) + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + ctx := session.NewContext(r.Context(), &session.Session{Password: tok}) + if err := p.event.AuthConnect(ctx); err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + path, err := r.Options().Path() + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadOption); err != nil { + p.logger.Error(err.Error()) + } + return + } + switch r.Code() { + case codes.GET: + obs, err := r.Options().Observe() + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + } + p.handleGet(ctx, path, w.Conn(), r.Token(), obs, r) + + case codes.POST: + body, err := r.ReadBody() + if err != nil { + if err := sendErrorMessage(w.Conn(), r.Token(), err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + return + } + p.handlePost(ctx, w.Conn(), body, r.Token(), path, r) + } +} + +func (p *Proxy) handleGet(ctx context.Context, path string, con mux.Conn, token []byte, obs uint32, r *mux.Message) { + if err := p.event.AuthSubscribe(ctx, &[]string{path}); err != nil { + if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + if err := p.event.Subscribe(ctx, &[]string{path}); err != nil { + if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + switch { + // obs == 0, start observe + case obs == 0: + go p.observeUpstream(ctx, con, r.Options(), token, path) + + default: + if err := p.getUpstream(con, r, token); err != nil { + p.logger.Debug(fmt.Sprintf("error performing get: %v\n", err)) + if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { + p.logger.Error(err.Error()) + } + return + } + } +} + +func (p *Proxy) handlePost(ctx context.Context, con mux.Conn, body, token []byte, path string, r *mux.Message) { + if err := p.event.AuthPublish(ctx, &path, &body); err != nil { + if err := sendErrorMessage(con, token, err, codes.Unauthorized); err != nil { + p.logger.Error(err.Error()) + } + return + } + if err := p.event.Publish(ctx, &path, &body); err != nil { + if err := sendErrorMessage(con, token, err, codes.BadRequest); err != nil { + p.logger.Error(err.Error()) + } + return + } + if err := p.postUpstream(con, r, token); err != nil { + p.logger.Debug(fmt.Sprintf("error performing post: %v\n", err)) + if err := sendErrorMessage(con, token, err, codes.BadGateway); err != nil { + p.logger.Error(err.Error()) + } + return + } +} + +func (p *Proxy) Listen(ctx context.Context) error { + l, err := net.NewListenUDP("udp", p.config.Address) + if err != nil { + return err + } + defer l.Close() + + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s without DTLS", p.config.Address)) + s := udp.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() + + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server at %s without DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err + } + return nil +} + +func (p *Proxy) ListenDTLS(ctx context.Context) error { + l, err := net.NewDTLSListener("udp", p.config.Address, p.config.DTLSConfig) + if err != nil { + return err + } + defer l.Close() + + p.logger.Info(fmt.Sprintf("CoAP proxy server started at %s with DTLS", p.config.Address)) + s := dtls.NewServer(options.WithMux(mux.HandlerFunc(p.handler))) + + errCh := make(chan error) + go func() { + errCh <- s.Serve(l) + }() + + select { + case <-ctx.Done(): + p.logger.Info(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting ...", p.config.Address)) + l.Close() + case err := <-errCh: + p.logger.Error(fmt.Sprintf("CoAP proxy server at %s with DTLS exiting with errors: %s", p.config.Address, err.Error())) + return err + } + return nil +} diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go index be66aad..fc5cd47 100644 --- a/pkg/tls/tls.go +++ b/pkg/tls/tls.go @@ -9,6 +9,8 @@ import ( "errors" "net" "os" + + "github.com/pion/dtls/v2" ) var ( @@ -69,6 +71,56 @@ func Load(c *Config) (*tls.Config, error) { return tlsConfig, nil } +// Load returns a DTLS configuration that can be used in DTLS servers. +func LoadDTLS(c *Config) (*dtls.Config, error) { + if c.CertFile == "" || c.KeyFile == "" { + return nil, nil + } + + dtlsConfig := &dtls.Config{} + + certificate, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) + if err != nil { + return nil, errors.Join(errLoadCerts, err) + } + dtlsConfig = &dtls.Config{ + Certificates: []tls.Certificate{certificate}, + } + + // Loading Server CA file + rootCA, err := loadCertFile(c.ServerCAFile) + if err != nil { + return nil, errors.Join(errLoadServerCA, err) + } + if len(rootCA) > 0 { + if dtlsConfig.RootCAs == nil { + dtlsConfig.RootCAs = x509.NewCertPool() + } + if !dtlsConfig.RootCAs.AppendCertsFromPEM(rootCA) { + return nil, errAppendCA + } + } + + // Loading Client CA File + clientCA, err := loadCertFile(c.ClientCAFile) + if err != nil { + return nil, errors.Join(errLoadClientCA, err) + } + if len(clientCA) > 0 { + if dtlsConfig.ClientCAs == nil { + dtlsConfig.ClientCAs = x509.NewCertPool() + } + if !dtlsConfig.ClientCAs.AppendCertsFromPEM(clientCA) { + return nil, errAppendCA + } + dtlsConfig.ClientAuth = dtls.RequireAndVerifyClientCert + if c.Validator != nil { + dtlsConfig.VerifyPeerCertificate = c.Validator + } + } + return dtlsConfig, nil +} + // ClientCert returns client certificate. func ClientCert(conn net.Conn) (x509.Certificate, error) { switch connVal := conn.(type) {