Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPX-67 - Refactor mProxy #68

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"log/slog"
"os"
"os/signal"
"runtime"
"syscall"
"time"

"github.com/absmach/mproxy"
"github.com/absmach/mproxy/examples/simple"
"github.com/absmach/mproxy/pkg/http"
"github.com/absmach/mproxy/pkg/mqtt"
"github.com/absmach/mproxy/pkg/mqtt/websocket"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mproxy/forwarder"
"github.com/absmach/mproxy/http"
"github.com/absmach/mproxy/mqtt"
"github.com/absmach/mproxy/mqtt/websocket"
"github.com/absmach/mproxy/streamer"
"github.com/caarlos0/env/v11"
"github.com/joho/godotenv"
"golang.org/x/sync/errgroup"
Expand All @@ -37,6 +40,18 @@ const (
)

func main() {
go func() {
for {
time.Sleep(time.Second * 3)
fmt.Println("RTN", runtime.NumGoroutine())
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Alloc = %v MiB", m.Alloc/1024/1024)
fmt.Printf("\tTotalAlloc = %v MiB", m.TotalAlloc/1024/1024)
fmt.Printf("\tSys = %v MiB", m.Sys/1024/1024)
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}
}()
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

Expand All @@ -47,7 +62,7 @@ func main() {

handler := simple.New(logger)

var interceptor session.Interceptor
var interceptor mproxy.Interceptor

// Loading .env file to environment
err := godotenv.Load()
Expand All @@ -62,9 +77,10 @@ func main() {
}

// mProxy server for MQTT without TLS
mqttProxy := mqtt.New(mqttConfig, handler, interceptor, logger)
mqttProxy := mqtt.New(handler, interceptor)

g.Go(func() error {
return mqttProxy.Listen(ctx)
return streamer.Listen(ctx, "MQTT", mqttConfig, mqttProxy, logger)
})

// mProxy server Configuration for MQTT with TLS
Expand All @@ -74,9 +90,9 @@ func main() {
}

// mProxy server for MQTT with TLS
mqttTLSProxy := mqtt.New(mqttTLSConfig, handler, interceptor, logger)
mqttTLSProxy := mqtt.New(handler, interceptor)
g.Go(func() error {
return mqttTLSProxy.Listen(ctx)
return streamer.Listen(ctx, "MQTT", mqttTLSConfig, mqttTLSProxy, logger)
})

// mProxy server Configuration for MQTT with mTLS
Expand All @@ -86,9 +102,9 @@ func main() {
}

// mProxy server for MQTT with mTLS
mqttMTlsProxy := mqtt.New(mqttMTLSConfig, handler, interceptor, logger)
mqttMTlsProxy := mqtt.New(handler, interceptor)
g.Go(func() error {
return mqttMTlsProxy.Listen(ctx)
return streamer.Listen(ctx, "MQTT", mqttMTLSConfig, mqttMTlsProxy, logger)
})

// mProxy server Configuration for MQTT over Websocket without TLS
Expand All @@ -98,9 +114,9 @@ func main() {
}

// mProxy server for MQTT over Websocket without TLS
wsProxy := websocket.New(wsConfig, handler, interceptor, logger)
wsProxy := websocket.New(wsConfig.Target, handler, interceptor, logger)
g.Go(func() error {
return wsProxy.Listen(ctx)
return forwarder.Listen(ctx, "WS", wsConfig, wsProxy, logger)
})

// mProxy server Configuration for MQTT over Websocket with TLS
Expand All @@ -110,9 +126,9 @@ func main() {
}

// mProxy server for MQTT over Websocket with TLS
wsTLSProxy := websocket.New(wsTLSConfig, handler, interceptor, logger)
wsTLSProxy := websocket.New(wsTLSConfig.Target, handler, interceptor, logger)
g.Go(func() error {
return wsTLSProxy.Listen(ctx)
return forwarder.Listen(ctx, "WS", wsTLSConfig, wsTLSProxy, logger)
})

// mProxy server Configuration for MQTT over Websocket with mTLS
Expand All @@ -122,9 +138,9 @@ func main() {
}

// mProxy server for MQTT over Websocket with mTLS
wsMTLSProxy := websocket.New(wsMTLSConfig, handler, interceptor, logger)
wsMTLSProxy := websocket.New(wsMTLSConfig.Target, handler, interceptor, logger)
g.Go(func() error {
return wsMTLSProxy.Listen(ctx)
return forwarder.Listen(ctx, "WS", wsMTLSConfig, wsMTLSProxy, logger)
})

// mProxy server Configuration for HTTP without TLS
Expand All @@ -139,7 +155,7 @@ func main() {
panic(err)
}
g.Go(func() error {
return httpProxy.Listen(ctx)
return forwarder.Listen(ctx, "HTTP", httpConfig, httpProxy, logger)
})

// mProxy server Configuration for HTTP with TLS
Expand All @@ -154,7 +170,7 @@ func main() {
panic(err)
}
g.Go(func() error {
return httpTLSProxy.Listen(ctx)
return forwarder.Listen(ctx, "HTTP", httpTLSConfig, httpTLSProxy, logger)
})

// mProxy server Configuration for HTTP with mTLS
Expand All @@ -169,7 +185,7 @@ func main() {
panic(err)
}
g.Go(func() error {
return httpMTLSProxy.Listen(ctx)
return forwarder.Listen(ctx, "HTTP", httpMTLSConfig, httpMTLSProxy, logger)
})

g.Go(func() error {
Expand Down
95 changes: 95 additions & 0 deletions coap/streamer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

Check failure on line 1 in coap/streamer.go

View workflow job for this annotation

GitHub Actions / Lint and Build

Missed header for check (goheader)

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"log"

piondtls "github.com/pion/dtls/v2"
coap "github.com/plgd-dev/go-coap/v3"

Check failure on line 11 in coap/streamer.go

View workflow job for this annotation

GitHub Actions / Lint and Build

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/mux"
"github.com/plgd-dev/go-coap/v3/options"

Check failure on line 16 in coap/streamer.go

View workflow job for this annotation

GitHub Actions / Lint and Build

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
dtlsServer "github.com/plgd-dev/go-coap/v3/dtls/server"
tcpServer "github.com/plgd-dev/go-coap/v3/tcp/server"
udpClient "github.com/plgd-dev/go-coap/v3/udp/client"
)

func handleA(w mux.ResponseWriter, r *mux.Message) {
log.Printf("got message in handleA: %+v from %v\n", r, w.Conn().RemoteAddr())
err := w.SetResponse(codes.GET, message.TextPlain, bytes.NewReader([]byte("A hello world")))
if err != nil {
log.Printf("cannot set response: %v", err)
}
}

func handleB(w mux.ResponseWriter, r *mux.Message) {
log.Printf("got message in handleB: %+v from %v\n", r, w.Conn().RemoteAddr())
customResp := w.Conn().AcquireMessage(r.Context())
defer w.Conn().ReleaseMessage(customResp)
customResp.SetCode(codes.Content)
customResp.SetToken(r.Token())
customResp.SetContentFormat(message.TextPlain)
customResp.SetBody(bytes.NewReader([]byte("B hello world")))
err := w.Conn().WriteMessage(customResp)
if err != nil {
log.Printf("cannot set response: %v", err)
}
}

func handleOnNewConn(cc *udpClient.Conn) {
dtlsConn, ok := cc.NetConn().(*piondtls.Conn)
if !ok {
log.Fatalf("invalid type %T", cc.NetConn())
}
clientId := dtlsConn.ConnectionState().IdentityHint
cc.SetContextValue("clientId", clientId)
cc.AddOnClose(func() {
clientId := dtlsConn.ConnectionState().IdentityHint
log.Printf("closed connection clientId: %s", clientId)
})
}

func main() {
m := mux.NewRouter()
m.Handle("/a", mux.HandlerFunc(handleA))

Check failure on line 59 in coap/streamer.go

View workflow job for this annotation

GitHub Actions / Lint and Build

Error return value of `m.Handle` is not checked (errcheck)
m.Handle("/b", mux.HandlerFunc(handleB))

Check failure on line 60 in coap/streamer.go

View workflow job for this annotation

GitHub Actions / Lint and Build

Error return value of `m.Handle` is not checked (errcheck)

tcpOpts := []tcpServer.Option{}
tcpOpts = append(tcpOpts,
options.WithMux(m),
options.WithContext(context.Background()))

dtlsOpts := []dtlsServer.Option{}
dtlsOpts = append(dtlsOpts,
options.WithMux(m),
options.WithContext(context.Background()),
options.WithOnNewConn(handleOnNewConn),
)

go func() {
// serve a tcp server on :5686
log.Fatal(coap.ListenAndServeWithOptions("tcp", ":5686", tcpOpts))
}()

go func() {
// serve a tls tcp server on :5687
log.Fatal(coap.ListenAndServeTCPTLSWithOptions("tcp", "5687", &tls.Config{}, tcpOpts...))
}()

go func() {
// serve a udp dtls server on :5688
log.Fatal(coap.ListenAndServeDTLSWithOptions("udp", ":5688", &piondtls.Config{
PSK: func(hint []byte) ([]byte, error) {
fmt.Printf("Client's hint: %s \n", hint)
return []byte{0xAB, 0xC1, 0x23}, nil
},
PSKIdentityHint: []byte("Pion DTLS Client"),
CipherSuites: []piondtls.CipherSuiteID{piondtls.TLS_PSK_WITH_AES_128_CCM_8},
}, dtlsOpts...))
}()
}
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package mproxy
import (
"crypto/tls"

mptls "github.com/absmach/mproxy/pkg/tls"
mptls "github.com/absmach/mproxy/tls"
"github.com/caarlos0/env/v11"
)

Expand Down
3 changes: 2 additions & 1 deletion examples/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"log/slog"

"github.com/absmach/mproxy"
"github.com/absmach/mproxy/pkg/session"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ func (h *Handler) Disconnect(ctx context.Context) error {
}

func (h *Handler) logAction(ctx context.Context, action string, topics *[]string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
s, ok := mproxy.FromContext(ctx)
args := []interface{}{
slog.Group("session", slog.String("id", s.ID), slog.String("username", s.Username)),
}
Expand Down
53 changes: 53 additions & 0 deletions forwarder/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

package forwarder

import (
"context"
"crypto/tls"
"fmt"
"log/slog"
"net"
"net/http"

"github.com/absmach/mproxy"
mptls "github.com/absmach/mproxy/tls"
"golang.org/x/sync/errgroup"
)

func Listen(ctx context.Context, name string, config mproxy.Config, passer mproxy.Forwarder, logger *slog.Logger) error {
l, err := net.Listen("tcp", config.Address)
if err != nil {
return err
}

if config.TLSConfig != nil {
l = tls.NewListener(l, config.TLSConfig)
}
status := mptls.SecurityStatus(config.TLSConfig)

logger.Info(fmt.Sprintf("%s Proxy server started at %s%s with %s", name, config.Address, config.PathPrefix, status))

var server http.Server
g, ctx := errgroup.WithContext(ctx)

mux := http.NewServeMux()
mux.HandleFunc(config.PathPrefix, passer.Forward)
server.Handler = mux

g.Go(func() error {
return server.Serve(l)
})

g.Go(func() error {
<-ctx.Done()
return server.Close()
})
if err := g.Wait(); err != nil {
logger.Info(fmt.Sprintf("%s Proxy server at %s%s with %s exiting with errors", name, config.Address, config.PathPrefix, status), slog.String("error", err.Error()))
} else {
logger.Info(fmt.Sprintf("%s Proxy server at %s%s with %s exiting...", name, config.Address, config.PathPrefix, status))
}
return nil
}
25 changes: 19 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,26 @@ go 1.21
toolchain go1.21.4

require (
github.com/caarlos0/env/v11 v11.0.0
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/caarlos0/env/v11 v11.2.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.1
github.com/gorilla/websocket v1.5.3
github.com/inetaf/tcpproxy v0.0.0-20240214030015-3ce58045626c
github.com/joho/godotenv v1.5.1
golang.org/x/crypto v0.22.0
golang.org/x/sync v0.7.0
github.com/pion/dtls/v2 v2.2.8-0.20240501061905-2c36d63320a0
github.com/plgd-dev/go-coap/v3 v3.3.4
golang.org/x/crypto v0.25.0
golang.org/x/sync v0.8.0
)

require golang.org/x/net v0.24.0 // indirect
require (
github.com/dsnet/golib/memfile v1.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport/v3 v3.0.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
)
Loading
Loading