Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AG-22 - Add Agent Tests #58

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func main() {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
}
defer svc.Close()

svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down Expand Up @@ -402,7 +403,7 @@ func StopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger lo
shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("Failed to shutdown %s server: %v", svcName, err)
return fmt.Errorf("failed to shutdown %s server: %v", svcName, err)
}
return fmt.Errorf("%s service shutdown by signal: %s", svcName, sig)
case <-ctx.Done():
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/api/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func TestPublish(t *testing.T) {
{"publish data", data, http.StatusOK},
{"publish data with invalid data", "}", http.StatusInternalServerError},
}
t.Cleanup(func() {
assert.Nil(t, svc.Close())
})

for _, tc := range cases {
req := testRequest{
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,16 @@ func (lm loggingMiddleware) Terminal(uuid, cmdStr string) (err error) {

return lm.svc.Terminal(uuid, cmdStr)
}

func (lm loggingMiddleware) Close() (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method close took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Close()
}
13 changes: 11 additions & 2 deletions pkg/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,18 @@ func (ms *metricsMiddleware) Publish(topic, payload string) error {

func (ms *metricsMiddleware) Terminal(topic, payload string) error {
defer func(begin time.Time) {
ms.counter.With("method", "publish").Add(1)
ms.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
ms.counter.With("method", "terminal").Add(1)
ms.latency.With("method", "terminal").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Terminal(topic, payload)
}

func (ms *metricsMiddleware) Close() error {
defer func(begin time.Time) {
ms.counter.With("method", "close").Add(1)
ms.latency.With("method", "close").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Close()
}
18 changes: 12 additions & 6 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ package agent
import (
"crypto/tls"
"encoding/json"
"fmt"
"os"
"time"

"github.com/mainflux/mainflux/pkg/errors"
"github.com/pelletier/go-toml"
)

var (
ErrWritingToml = errors.New("error writing to toml file")
errReadingFile = errors.New("error reading config file")
errUnmarshalToml = errors.New("error unmarshaling toml")
errMarshalToml = errors.New("error marshaling toml")
)

type ServerConfig struct {
Port string `toml:"port" json:"port"`
BrokerURL string `toml:"broker_url" json:"broker_url"`
Expand Down Expand Up @@ -86,24 +92,24 @@ func NewConfig(sc ServerConfig, cc ChanConfig, ec EdgexConfig, lc LogConfig, mc
func SaveConfig(c Config) error {
b, err := toml.Marshal(c)
if err != nil {
return errors.New(fmt.Sprintf("Error reading config file: %s", err))
return errors.Wrap(errMarshalToml, err)
}
if err := os.WriteFile(c.File, b, 0644); err != nil {
return errors.New(fmt.Sprintf("Error writing toml: %s", err))
return errors.Wrap(ErrWritingToml, err)
}
return nil
}

// Read - retrieve config from a file.
// ReadConfig - retrieve config from a file.
func ReadConfig(file string) (Config, error) {
data, err := os.ReadFile(file)
c := Config{}
if err != nil {
return c, errors.New(fmt.Sprintf("Error reading config file: %s", err))
return Config{}, errors.Wrap(errReadingFile, err)
}

if err := toml.Unmarshal(data, &c); err != nil {
return Config{}, errors.New(fmt.Sprintf("Error unmarshaling toml: %s", err))
return Config{}, errors.Wrap(errUnmarshalToml, err)
}
return c, nil
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/agent/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package agent

import (
"fmt"
"os"
"strings"
"testing"

"github.com/mainflux/mainflux/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestReadConfig(t *testing.T) {
// Create a temporary config file for testing.
tempFile, err := os.CreateTemp("", "config.toml")
if err != nil {
t.Fatalf("Failed to create temporary file: %v", err)
}
defer os.Remove(tempFile.Name())
tempFile2, err := os.CreateTemp("", "invalid.toml")
if err != nil {
t.Fatalf("Failed to create temporary file: %v", err)
}
defer os.Remove(tempFile2.Name())

sampleConfig := `
File = "config.toml"

[channels]
control = ""
data = ""

[edgex]
url = "http://localhost:48090/api/v1/"

[heartbeat]
interval = "10s"

[log]
level = "info"

[mqtt]
ca_cert = ""
ca_path = "ca.crt"
cert_path = "thing.cert"
client_cert = ""
client_key = ""
mtls = false
password = ""
priv_key_path = "thing.key"
qos = 0
retain = false
skip_tls_ver = true
url = "localhost:1883"
username = ""

[server]
nats_url = "nats://127.0.0.1:4222"
port = "9999"

[terminal]
session_timeout = "1m0s"
`

if _, writeErr := tempFile.WriteString(sampleConfig); writeErr != nil {
t.Fatalf("Failed to write to temporary file: %v", writeErr)
}
tempFile.Close()

if _, writeErr := tempFile2.WriteString(strings.ReplaceAll(sampleConfig, "[", "")); writeErr != nil {
t.Fatalf("Failed to write to temporary file: %v", writeErr)
}
tempFile2.Close()

tests := []struct {
name string
fileName string
expectedErr error
}{
{
name: "failed to read file",
fileName: "invalidFile.toml",
expectedErr: errReadingFile,
},
{
name: "invalid toml",
fileName: tempFile2.Name(),
expectedErr: errUnmarshalToml,
},
{
name: "successful read",
fileName: tempFile.Name(),
expectedErr: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := ReadConfig(test.fileName)
assert.True(t, errors.Contains(err, test.expectedErr), fmt.Sprintf("expected %v got %v", test.expectedErr, err))
})
}
}
26 changes: 19 additions & 7 deletions pkg/agent/heartbeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"sync"
"time"
)
Expand Down Expand Up @@ -33,11 +34,12 @@ type Info struct {
type Heartbeat interface {
Update()
Info() Info
Close()
}

// interval - duration of interval
// if service doesnt send heartbeat during interval it is marked offline.
func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
func NewHeartbeat(ctx context.Context, name, svcType string, interval time.Duration) Heartbeat {
ticker := time.NewTicker(interval)
s := svc{
info: Info{
Expand All @@ -49,22 +51,25 @@ func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
ticker: ticker,
interval: interval,
}
s.listen()
go s.listen(ctx)
return &s
}

func (s *svc) listen() {
go func() {
for range s.ticker.C {
func (s *svc) listen(ctx context.Context) {
for {
select {
case <-s.ticker.C:
// TODO - we can disable ticker when the status gets OFFLINE
// and on the next heartbeat enable it again.
s.mu.Lock()
if time.Now().After(s.info.LastSeen.Add(s.interval)) {
s.info.Status = offline
}
s.mu.Unlock()
case <-ctx.Done():
return
}
}()
}
}

func (s *svc) Update() {
Expand All @@ -75,5 +80,12 @@ func (s *svc) Update() {
}

func (s *svc) Info() Info {
return s.info
s.mu.Lock()
defer s.mu.Unlock()
info := s.info
return info
}

func (s *svc) Close() {
s.ticker.Stop()
}
72 changes: 72 additions & 0 deletions pkg/agent/heartbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package agent

import (
"context"
"testing"
"time"
)

const (
name = "TestService"
serviceType = "TestType"
interval = 2 * time.Second
)

func TestNewHeartbeat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeat := NewHeartbeat(ctx, name, serviceType, interval)

// Check initial status and info
info := heartbeat.Info()
if info.Name != name {
t.Errorf("Expected name to be %s, but got %s", name, info.Name)
}
if info.Type != serviceType {
t.Errorf("Expected type to be %s, but got %s", serviceType, info.Type)
}
if info.Status != online {
t.Errorf("Expected initial status to be %s, but got %s", online, info.Status)
}
t.Cleanup(func() {
cancel()
heartbeat.Close()
})
}

func TestHeartbeat_Update(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeat := NewHeartbeat(ctx, name, serviceType, interval)

// Sleep for more than the interval to simulate an update
time.Sleep(3 * time.Second)

heartbeat.Update()

// Check if the status has been updated to online
info := heartbeat.Info()
if info.Status != online {
t.Errorf("Expected status to be %s, but got %s", online, info.Status)
}
t.Cleanup(func() {
cancel()
heartbeat.Close()
})
}

func TestHeartbeat_StatusOffline(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
heartbeat := NewHeartbeat(ctx, name, serviceType, interval)

// Sleep for more than two intervals to simulate offline status
time.Sleep(5 * time.Second)

// Check if the status has been updated to offline
info := heartbeat.Info()
if info.Status != offline {
t.Errorf("Expected status to be %s, but got %s", offline, info.Status)
}
t.Cleanup(func() {
cancel()
heartbeat.Close()
})
}
Loading