Skip to content

Commit

Permalink
Disable alert handlers with a flag (#2591)
Browse files Browse the repository at this point in the history
This creates a flag that allows you to disable alert handlers. This is useful for security reasons such as disabling exec on shared machines.
  • Loading branch information
docmerlin authored Jul 9, 2021
1 parent 22bbcc7 commit ebcb9f1
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 23 deletions.
28 changes: 19 additions & 9 deletions cmd/kapacitord/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"

"github.com/BurntSushi/toml"
"github.com/influxdata/flux/fluxinit"
Expand Down Expand Up @@ -103,7 +104,6 @@ func (cmd *Command) Run(args ...string) error {
if options.LogLevel != "" {
config.Logging.Level = options.LogLevel
}

// Initialize Logging Services
cmd.diagService = diagnostic.NewService(config.Logging, cmd.Stdout, cmd.Stderr)
if err := cmd.diagService.Open(); err != nil {
Expand All @@ -127,8 +127,13 @@ func (cmd *Command) Run(args ...string) error {
fluxinit.FluxInit()

// Create server from config and start it.
disabledAlertHandlers := map[string]struct{}{}
for _, x := range strings.Split(options.DisabledAlertHandlers, ",") {
disabledAlertHandlers[strings.TrimSpace(x)] = struct{}{}
}

buildInfo := server.BuildInfo{Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch, Platform: cmd.Platform}
s, err := server.New(config, buildInfo, cmd.diagService)
s, err := server.New(config, buildInfo, cmd.diagService, disabledAlertHandlers)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
Expand Down Expand Up @@ -192,6 +197,7 @@ func (cmd *Command) ParseFlags(args ...string) (Options, error) {
fs.StringVar(&options.MemProfile, "memprofile", "", "")
fs.StringVar(&options.LogFile, "log-file", "", "")
fs.StringVar(&options.LogLevel, "log-level", "", "")
fs.StringVar(&options.DisabledAlertHandlers, "disable-handlers", "", "")
fs.Usage = func() { fmt.Fprintln(cmd.Stderr, usage) }
if err := fs.Parse(args); err != nil {
return Options{}, err
Expand Down Expand Up @@ -258,15 +264,19 @@ run starts the Kapacitor server.
-log-level <level>
Sets the log level. One of debug,info,error.
-disable-handlers <comma-separated list of alert-handlers>
Disables certain alert handlers. This is useful for security, reasons. For example: disabling exec on a shared system.
`

// Options represents the command line options that can be parsed.
type Options struct {
ConfigPath string
PIDFile string
Hostname string
CPUProfile string
MemProfile string
LogFile string
LogLevel string
ConfigPath string
PIDFile string
Hostname string
CPUProfile string
MemProfile string
LogFile string
LogLevel string
DisabledAlertHandlers string
}
2 changes: 1 addition & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13842,7 +13842,7 @@ func createTaskMaster(name string) (*kapacitor.TaskMaster, error) {
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler(), nil)
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
if err := as.Open(); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type Server struct {

FluxTaskService taskmodel.TaskService

// DisabledHandlers are the disabled alert handlers.
DisabledHandlers map[string]struct{}

LoadService *load.Service
SideloadService *sideload.Service
AuthService auth.Interface
Expand Down Expand Up @@ -167,7 +170,7 @@ type Server struct {
}

// New returns a new instance of Server built from a config.
func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Server, error) {
func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service, disabledAlertHandlers map[string]struct{}) (*Server, error) {
err := c.Validate()
if err != nil {
return nil, fmt.Errorf("invalid configuration: %s. To generate a valid configuration file run `kapacitord config > kapacitor.generated.conf`.", err)
Expand All @@ -181,8 +184,12 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv
tlsConfig = new(tls.Config)
}
d := diagService.NewServerHandler()
if disabledAlertHandlers == nil {
disabledAlertHandlers = map[string]struct{}{}
}
s := &Server{
config: c,
DisabledHandlers: disabledAlertHandlers,
tlsConfig: tlsConfig,
BuildInfo: buildInfo,
dataDir: c.DataDir,
Expand Down Expand Up @@ -397,7 +404,7 @@ func (s *Server) appendConfigOverrideService() {

func (s *Server) initAlertService() {
d := s.DiagService.NewAlertServiceHandler()
srv := alert.NewService(d)
srv := alert.NewService(d, s.DisabledHandlers)

srv.Commander = s.Commander
srv.HTTPDService = s.HTTPDService
Expand Down
17 changes: 13 additions & 4 deletions server/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Server struct {
}

// NewServer returns a new instance of Server.
func NewServer(c *server.Config) *Server {
func NewServer(c *server.Config, disabled map[string]struct{}) *Server {
configureLogging()
buildInfo := server.BuildInfo{
Version: "testServer",
Expand All @@ -44,7 +44,7 @@ func NewServer(c *server.Config) *Server {
c.HTTP.LogEnabled = testing.Verbose()
ds := diagnostic.NewService(diagnostic.NewConfig(), ioutil.Discard, ioutil.Discard)
ds.Open()
srv, err := server.New(c, buildInfo, ds)
srv, err := server.New(c, buildInfo, ds, disabled)
if err != nil {
panic(err)
}
Expand All @@ -62,7 +62,7 @@ func (s *Server) Stop() {
}

func (s *Server) Start() {
srv, err := server.New(s.Config, s.buildInfo, s.ds)
srv, err := server.New(s.Config, s.buildInfo, s.ds, s.Server.DisabledHandlers)
if err != nil {
panic(err.Error())
}
Expand Down Expand Up @@ -97,7 +97,16 @@ func OpenLoadServer() (*Server, *server.Config, *client.Client) {

// OpenServer opens a test server.
func OpenServer(c *server.Config) *Server {
s := NewServer(c)
s := NewServer(c, nil)
if err := s.Open(); err != nil {
panic(err.Error())
}
return s
}

// OpenServer opens a test server.
func OpenServerWithDisabledHandlers(c *server.Config, disabledAlertHandlers map[string]struct{}) *Server {
s := NewServer(c, disabledAlertHandlers)
if err := s.Open(); err != nil {
panic(err.Error())
}
Expand Down
121 changes: 117 additions & 4 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"time"

"github.com/davecgh/go-spew/spew"
"github.com/dgrijalva/jwt-go"
jwt "github.com/dgrijalva/jwt-go"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux/fluxinit"
iclient "github.com/influxdata/influxdb/client/v2"
Expand Down Expand Up @@ -5534,7 +5534,7 @@ func TestServer_UDFStreamAgents(t *testing.T) {
}

func testStreamAgent(t *testing.T, c *server.Config) {
s := NewServer(c)
s := NewServer(c, nil)
err := s.Open()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -5740,7 +5740,7 @@ func TestServer_UDFStreamAgentsSocket(t *testing.T) {
}

func testStreamAgentSocket(t *testing.T, c *server.Config) {
s := NewServer(c)
s := NewServer(c, nil)
err := s.Open()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -5989,7 +5989,7 @@ func testBatchAgent(t *testing.T, c *server.Config) {
})
c.InfluxDB[0].URLs = []string{db.URL()}
c.InfluxDB[0].Enabled = true
s := NewServer(c)
s := NewServer(c, nil)
err := s.Open()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -10084,6 +10084,119 @@ func TestServer_AlertHandlers_CRUD(t *testing.T) {
}
}

func TestServer_AlertHandlers_disable(t *testing.T) {

testCases := []struct {
handler client.TopicHandler
setup func(*server.Config, *client.TopicHandler) (context.Context, error)
result func(context.Context) error
disable map[string]struct{}
}{{
disable: map[string]struct{}{"alerta": struct{}{}},
handler: client.TopicHandler{
Kind: "alerta",
Options: map[string]interface{}{
"token": "testtoken1234567",
"token-prefix": "Bearer",
"origin": "kapacitor",
"group": "test",
"environment": "env",
"timeout": time.Duration(24 * time.Hour),
},
},
setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) {
ts := alertatest.NewServer()
ctxt := context.WithValue(context.Background(), "server", ts)

c.Alerta.Enabled = true
c.Alerta.URL = ts.URL
return ctxt, nil
},
result: func(ctxt context.Context) error {
ts := ctxt.Value("server").(*alertatest.Server)
ts.Close()
got := ts.Requests()
exp := []alertatest.Request(nil)
if !reflect.DeepEqual(exp, got) {
return fmt.Errorf("unexpected alerta request:\nexp\n%+v\ngot\n%+v\n", exp, got)
}
return nil
},
}}

for i, tc := range testCases {
t.Run(fmt.Sprintf("%s-%d", tc.handler.Kind, i), func(t *testing.T) {
kind := tc.handler.Kind

// Create default config
c := NewConfig()
var ctxt context.Context
if tc.setup != nil {
var err error
ctxt, err = tc.setup(c, &tc.handler)
if err != nil {
t.Fatal(err)
}
}
s := OpenServerWithDisabledHandlers(c, tc.disable)
cli := Client(s)
closed := false
defer func() {
if !closed {
s.Close()
}
}()
ctxt = context.WithValue(ctxt, "kapacitorURL", s.URL())

if _, err := cli.CreateTopicHandler(cli.TopicHandlersLink("test"), client.TopicHandlerOptions{
ID: "testAlertHandlers",
Kind: tc.handler.Kind,
Options: tc.handler.Options,
}); err != nil {
t.Fatalf("%s: %v", kind, err)
}

tick := `
stream
|from()
.measurement('alert')
|alert()
.topic('test')
.id('id')
.message('message')
.details('details')
.crit(lambda: TRUE)
`

if _, err := cli.CreateTask(client.CreateTaskOptions{
ID: "testAlertHandlers",
Type: client.StreamTask,
DBRPs: []client.DBRP{{
Database: "mydb",
RetentionPolicy: "myrp",
}},
TICKscript: tick,
Status: client.Enabled,
}); err != nil {
t.Fatalf("%s: %v", kind, err)
}

point := "alert value=1 0000000000"
v := url.Values{}
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", point, v)

// Close the entire server to ensure all data is processed
s.Close()
closed = true

if err := tc.result(ctxt); err != nil {
t.Errorf("%s: %v", kind, err)
}
})
}
}

func TestServer_AlertHandlers(t *testing.T) {

resultJSON := `{"series":[{"name":"alert","columns":["time","value"],"values":[["1970-01-01T00:00:00Z",1]]}]}`
Expand Down
15 changes: 12 additions & 3 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type Diagnostic interface {
}

type Service struct {
mu sync.RWMutex

mu sync.RWMutex
disabled map[string]struct{}
specsDAO HandlerSpecDAO
topicsDAO TopicStateDAO
PersistTopics bool
Expand Down Expand Up @@ -154,8 +154,9 @@ type Service struct {
}
}

func NewService(d Diagnostic) *Service {
func NewService(d Diagnostic, disabled map[string]struct{}) *Service {
s := &Service{
disabled: disabled,
handlers: make(map[string]map[string]handler),
closedTopics: make(map[string]bool),
topics: alert.NewTopics(),
Expand Down Expand Up @@ -767,7 +768,15 @@ func decodeStringToTextUnmarshaler(f, t reflect.Type, data interface{}) (interfa
return data, nil
}

var ErrHandlerDIsabled = errors.New("handler disabled")

func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) {

if _, ok := s.disabled[spec.Kind]; ok {
s.diag.Error(fmt.Sprintf("handler '%s' is disabled", spec.Kind), ErrHandlerDIsabled)
return handler{}, nil
}

var h alert.Handler
var err error
ctx := []keyvalue.T{
Expand Down

0 comments on commit ebcb9f1

Please sign in to comment.