diff --git a/README.md b/README.md index 64d3b0d..5e01526 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,42 @@ # nats_to_syslog Subscribes to NATs message bus and forwards messages to remote Syslog server +Example for use getting VM heartbeat messages from a Bosh director: +```bash +./nats_to_syslog \ + -nats-uri "tls://nats:somePassword@somehost:4222" \ + -nats-subject "hm.agent.heartbeat.>" \ + -syslog-endpoint "somesyslogendpoint:5000" \ + -mutualTLSCert ./nats_certificate \ + -mutualTLSKey ./nats_key \ + -noNatsMessagesToDebug \ + -extraFields "cf_foundation:sandbox" \ + -debug false +``` + +# Arguments + +```bash + ./nats_to_syslog --help +Usage of ./nats_to_syslog: + -debug + debug logging true/false + -extraFields string + Extra fields to include in messages + -mutualTLSCert string + Path to cert for mutual TLS to NATS server + -mutualTLSKey string + Path to key for mutual TLS to NATS server + -nats-subject string + The NATS subject to subscribe to (default ">") + -nats-uri string + The NATS server URI (default "nats://localhost:4222") + -noNatsMessagesToDebug + Do not send NATS messages to debug (default true) + -syslog-endpoint string + The remote syslog server host:port (default "localhost:514") +``` + # Testing ## Dependencies @@ -14,7 +50,7 @@ godep go test # Build Instructions -- Ensure you have go 1.6.x installed +- Ensure you have go 1.8.x installed - To cross compile for linux on a mac: ``` diff --git a/main.go b/main.go index 0926f15..749fd54 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,18 @@ package main import ( + "crypto/tls" "encoding/json" "flag" - "github.com/nats-io/nats" - "github.com/pivotal-golang/lager" "log/syslog" "os" "os/signal" + "regexp" "strings" "syscall" + + "code.cloudfoundry.org/lager" + "github.com/nats-io/nats" ) type logEntry struct { @@ -28,32 +31,53 @@ func main() { trapSignals() - var natsUri = flag.String("nats-uri", "nats://localhost:4222", "The NATS server URI") + var natsURI = flag.String("nats-uri", "nats://localhost:4222", "The NATS server URI") var natsSubject = flag.String("nats-subject", ">", "The NATS subject to subscribe to") var syslogEndpoint = flag.String("syslog-endpoint", "localhost:514", "The remote syslog server host:port") var debug = flag.Bool("debug", false, "debug logging true/false") + var mutualTLSKey = flag.String("mutualTLSKey", "", "Path to key for mutual TLS to NATS server") + var mutualTLSCert = flag.String("mutualTLSCert", "", "Path to cert for mutual TLS to NATS server") + var extraFields = flag.String("extraFields", "", "Extra fields to include in messages") + + // It appears that when debug is set false, the nats messages still go to stdout + // Instead of further sleuthing, will do a workaround for now + var noNatsMessagesToDebug = flag.Bool("noNatsMessagesToDebug", true, "Do not send NATS messages to debug") + flag.Parse() + // Handle any extra fields provided + extraFieldCaptures := regexp.MustCompile("([^:,]+):([^:,]+)").FindAllStringSubmatch(*extraFields, -1) + + // Sanity check that mutual TLS key cert both set or not set + if (*mutualTLSKey == "" && *mutualTLSCert != "") || (*mutualTLSKey != "" && *mutualTLSCert == "") { + os.Stderr.WriteString("Usage error: mutualTLSKey and mutualTLSCert must be provided together\n") + os.Exit(1) + } + setupLogger(*debug) syslog := connectToSyslog(*syslogEndpoint) defer syslog.Close() - natsClient := connectToNATS(*natsUri) + natsClient := connectToNATS(*natsURI, *mutualTLSKey, *mutualTLSCert) defer natsClient.Close() go func() { for message := range buffer { - sendToSyslog(message, syslog) + sendToSyslog(message, syslog, *noNatsMessagesToDebug, extraFieldCaptures) } }() - natsClient.Subscribe(*natsSubject, func(message *nats.Msg) { + _, err := natsClient.Subscribe(*natsSubject, func(message *nats.Msg) { buffer <- message }) - logger.Info("subscribed-to-subject", lager.Data{"subject": *natsSubject}) + if err != nil { + logger.Error("subscribed-to-subject-failed", err, lager.Data{"subject": *natsSubject}) + } else { + logger.Info("subscribed-to-subject", lager.Data{"subject": *natsSubject}) - <-stop + <-stop + } logger.Info("bye.") } @@ -66,16 +90,38 @@ func handleError(err error, context string) { } } -func buildLogMessage(message *nats.Msg) string { +func buildLogMessage(message *nats.Msg, extraFieldCaptures [][]string) string { + + // Add any extra fields + var f interface{} + err := json.Unmarshal(message.Data, &f) + + if err != nil { + logger.Error("unmarshalling-message-data-failed", err, lager.Data{"data": string(message.Data)}) + return "" + } + + m := f.(map[string]interface{}) + + for _, match := range extraFieldCaptures { + m[match[1]] = match[2] + } + + messageDataModified, err := json.Marshal(m) + if err != nil { + logger.Error("unmarshalling-message-data-failed", err, lager.Data{"data": string(message.Data)}) + return "" + } + entry := logEntry{ - Data: string(message.Data), + Data: string(messageDataModified), Reply: message.Reply, Subject: message.Subject, } data, err := json.Marshal(entry) if err != nil { - logger.Error("unmarshalling-log-failed", err, lager.Data{"data": string(message.Data)}) + logger.Error("marshalling-log-failed", err, lager.Data{"data": string(message.Data)}) return "" } @@ -89,16 +135,39 @@ func connectToSyslog(endpoint string) *syslog.Writer { return syslog } -func connectToNATS(natsUri string) *nats.Conn { - natsClient, err := nats.Connect(natsUri) +func connectToNATS(natsURI string, mutualTLSKey string, mutualTLSCert string) *nats.Conn { + cert, err := tls.LoadX509KeyPair(mutualTLSCert, mutualTLSKey) + handleError(err, "LoadX509KeyPair") + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + + // Handle no mutual TLS key/cert please + // ToDo: handle no mutual TLS key/cert + natsClient, err := nats.Connect(natsURI, + nats.Secure(tlsConfig), + nats.DisconnectHandler(func(nc *nats.Conn) { + logger.Info("Got disconnected!", lager.Data{"message": nc.LastError()}) + stop <- true + }), + nats.ReconnectHandler(func(nc *nats.Conn) { + logger.Info("Got reconnected to %v!\n", lager.Data{"message": nc.ConnectedUrl()}) + }), + nats.ClosedHandler(func(nc *nats.Conn) { + logger.Info("Connection closed", lager.Data{"message": nc.LastError()}) + stop <- true + })) handleError(err, "connecting to nats") - logger.Info("connected-to-nats", lager.Data{"uri": natsUri}) + logger.Info("connected-to-nats", lager.Data{"uri": natsURI}) return natsClient } -func sendToSyslog(message *nats.Msg, syslog *syslog.Writer) { - logMessage := buildLogMessage(message) - logger.Debug("message-sent-to-syslog", lager.Data{"message": logMessage}) +func sendToSyslog(message *nats.Msg, syslog *syslog.Writer, noNatsMessagesToDebug bool, extraFieldCaptures [][]string) { + logMessage := buildLogMessage(message, extraFieldCaptures) + if noNatsMessagesToDebug { + logger.Debug("message-sent-to-syslog", lager.Data{"message": logMessage}) + } err := syslog.Info(logMessage) if err != nil { logger.Error("logging-to-syslog-failed", err)