Skip to content

Commit

Permalink
Initial journald log provider attempt
Browse files Browse the repository at this point in the history
**What**
- journald log provider using exec to journalctl
```
journalctl -t <namespace>:<name>  --output=json --since=<timestamp> <--follow> --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP
```
- This can be tested manually using `faas-cli logs` as normal, e.g.
  `faas-cli logs nodeinfo` should tail the last 5 mins of logs.
- Very basic tests ensuring that the `journalctl` comamand is correctly
  construction and that the json log entrys are parsed correctly.
- Add simple e2e test to grep the function logs

Signed-off-by: Lucas Roesler <[email protected]>
  • Loading branch information
LucasRoesler authored and alexellis committed Mar 7, 2020
1 parent 667d74a commit 22882e2
Show file tree
Hide file tree
Showing 14 changed files with 497 additions and 25 deletions.
8 changes: 5 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ test-e2e:
/usr/local/bin/faas-cli remove figlet
sleep 3
/usr/local/bin/faas-cli list
sleep 1
/usr/local/bin/faas-cli logs figlet | grep Forking
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ An active community of almost 3000 users awaits you on Slack. Over 250 of those
* `faas login`
* `faas up`
* `faas list`
* `faas describe`
* `faas describe`
* `faas deploy --update=true --replace=false`
* `faas invoke --async`
* `faas invoke`
Expand All @@ -130,12 +130,12 @@ An active community of almost 3000 users awaits you on Slack. Over 250 of those
* `faas version`
* `faas namespace`
* `faas secret`
* `faas logs`

Scale from and to zero is also supported. On a Dell XPS with a small, pre-pulled image unpausing an existing task took 0.19s and starting a task for a killed function took 0.39s. There may be further optimizations to be gained.

Other operations are pending development in the provider such as:

* `faas logs` - to stream logs on-demand for a known function, for the time being you can find logs via `journalctl -u faasd-provider`
* `faas auth` - supported for Basic Authentication, but OAuth2 & OIDC require a patch

## Todo
Expand Down
11 changes: 3 additions & 8 deletions cmd/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (

"github.com/containerd/containerd"
bootstrap "github.com/openfaas/faas-provider"
"github.com/openfaas/faas-provider/logs"
"github.com/openfaas/faas-provider/proxy"
"github.com/openfaas/faas-provider/types"
"github.com/openfaas/faasd/pkg/cninetwork"
faasdlogs "github.com/openfaas/faasd/pkg/logs"
"github.com/openfaas/faasd/pkg/provider/config"
"github.com/openfaas/faasd/pkg/provider/handlers"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -93,14 +95,7 @@ func makeProviderCmd() *cobra.Command {
InfoHandler: handlers.MakeInfoHandler(Version, GitCommit),
ListNamespaceHandler: listNamespaces(),
SecretHandler: handlers.MakeSecretHandler(client, userSecretPath),
LogHandler: func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}

w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte(`Logs are not implemented for faasd`))
},
LogHandler: logs.NewLogHandlerFunc(faasdlogs.New(), config.ReadTimeout),
}

log.Printf("Listening on TCP port: %d\n", *config.TCPPort)
Expand Down
6 changes: 6 additions & 0 deletions pkg/contants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package pkg

const (
// FunctionNamespace is the default containerd namespace functions are created
FunctionNamespace = "openfaas-fn"
)
185 changes: 185 additions & 0 deletions pkg/logs/requestor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package logs

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os/exec"
"strconv"
"strings"
"time"

"github.com/openfaas/faas-provider/logs"

faasd "github.com/openfaas/faasd/pkg"
)

type requester struct{}

// New returns a new journalctl log Requester
func New() logs.Requester {
return &requester{}
}

// Query submits a log request to the actual logging system.
func (r *requester) Query(ctx context.Context, req logs.Request) (<-chan logs.Message, error) {
_, err := exec.LookPath("journalctl")
if err != nil {
return nil, fmt.Errorf("can not find journalctl: %w", err)
}

cmd := buildCmd(ctx, req)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to create journalctl pipe: %w", err)
}

stderr, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("failed to create journalctl err pipe: %w", err)
}

err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("failed to create journalctl: %w", err)
}

// call start and get the stdout prior to streaming so that we can return a meaningful
// error for as long as possible. If the cmd starts correctly, we are highly likely to
// succeed anyway
msgs := make(chan logs.Message, 100)
go streamLogs(ctx, cmd, stdout, msgs)
go logErrOut(stderr)

return msgs, nil
}

// buildCmd reeturns the equivalent of
//
// journalctl -t <namespace>:<name> \
// --output=json \
// --since=<timestamp> \
// <--follow> \
// --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP
func buildCmd(ctx context.Context, req logs.Request) *exec.Cmd {
// // set the cursor position based on req, default to 5m
since := time.Now().Add(-5 * time.Minute)
if req.Since != nil && req.Since.Before(time.Now()) {
since = *req.Since
}

namespace := req.Namespace
if namespace == "" {
namespace = faasd.FunctionNamespace
}

// find the description of the fields here
// https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html
// the available fields can vary greatly, the selected fields were detemined by
// trial and error with journalctl in an ubuntu VM (via multipass)
args := []string{
"--utc",
"--no-pager",
"--output=json",
"--output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP",
"--identifier=" + namespace + ":" + req.Name,
fmt.Sprintf("--since=%s", since.UTC().Format("2006-01-02 15:04:05")),
}

if req.Follow {
args = append(args, "--follow")
}

if req.Tail > 0 {
args = append(args, fmt.Sprintf("--lines=%d", req.Tail))
}

return exec.CommandContext(ctx, "journalctl", args...)
}

// streamLogs copies log entries from the journalctl `cmd`/`out` to `msgs`
// the loop is based on the Decoder example in the docs
// https://golang.org/pkg/encoding/json/#Decoder.Decode
func streamLogs(ctx context.Context, cmd *exec.Cmd, out io.ReadCloser, msgs chan logs.Message) {
log.Println("starting journal stream using ", cmd.String())

// will ensure `out` is closed and all related resources cleaned up
go func() {
err := cmd.Wait()
log.Println("wait result", err)
}()

defer func() {
log.Println("closing journal stream")
close(msgs)
}()

dec := json.NewDecoder(out)
for dec.More() {
if ctx.Err() != nil {
log.Println("log stream context cancelled")
return
}

// the journalctl outputs all the values as a string, so a struct with json
// tags wont help much
entry := map[string]string{}
err := dec.Decode(&entry)
if err != nil {
log.Printf("error decoding journalctl output: %s", err)
return
}

msg, err := parseEntry(entry)
if err != nil {
log.Printf("error parsing journalctl output: %s", err)
return
}

msgs <- msg
}
}

// parseEntry reads the deserialized json from journalctl into a log.Message
//
// The following fields are parsed from the journal
// - MESSAGE
// - _PID
// - SYSLOG_IDENTIFIER
// - __REALTIME_TIMESTAMP
func parseEntry(entry map[string]string) (logs.Message, error) {
logMsg := logs.Message{
Text: entry["MESSAGE"],
Instance: entry["_PID"],
}

identifier := entry["SYSLOG_IDENTIFIER"]
parts := strings.Split(identifier, ":")
if len(parts) != 2 {
return logMsg, fmt.Errorf("invalid SYSLOG_IDENTIFIER")
}
logMsg.Namespace = parts[0]
logMsg.Name = parts[1]

ts, ok := entry["__REALTIME_TIMESTAMP"]
if !ok {
return logMsg, fmt.Errorf("missing required field __REALTIME_TIMESTAMP")
}

ms, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return logMsg, fmt.Errorf("invalid timestamp: %w", err)
}
logMsg.Timestamp = time.Unix(0, ms*1000).UTC()

return logMsg, nil
}

func logErrOut(out io.ReadCloser) {
defer log.Println("stderr closed")
defer out.Close()

io.Copy(log.Writer(), out)
}
73 changes: 73 additions & 0 deletions pkg/logs/requestor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package logs

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

"github.com/openfaas/faas-provider/logs"
)

func Test_parseEntry(t *testing.T) {
rawEntry := `{ "__CURSOR" : "s=71c4550142d14ace8e2959e3540cc15c;i=133c;b=44864010f0d94baba7b6bf8019f82a56;m=2945cd3;t=5a00d4eb59180;x=8ed47f7f9b3d798", "__REALTIME_TIMESTAMP" : "1583353899094400", "__MONOTONIC_TIMESTAMP" : "43277523", "_BOOT_ID" : "44864010f0d94baba7b6bf8019f82a56", "SYSLOG_IDENTIFIER" : "openfaas-fn:nodeinfo", "_PID" : "2254", "MESSAGE" : "2020/03/04 20:31:39 POST / - 200 OK - ContentLength: 83", "_SOURCE_REALTIME_TIMESTAMP" : "1583353899094372" }`
expectedEntry := logs.Message{
Name: "nodeinfo",
Namespace: "openfaas-fn",
Text: "2020/03/04 20:31:39 POST / - 200 OK - ContentLength: 83",
Timestamp: time.Unix(0, 1583353899094400*1000).UTC(),
}

value := map[string]string{}
json.Unmarshal([]byte(rawEntry), &value)

entry, err := parseEntry(value)
if err != nil {
t.Fatalf("unexpected error %s", err)
}

if entry.Name != expectedEntry.Name {
t.Fatalf("expected Name %s, got %s", expectedEntry.Name, entry.Name)
}

if entry.Namespace != expectedEntry.Namespace {
t.Fatalf("expected Namespace %s, got %s", expectedEntry.Namespace, entry.Namespace)
}

if entry.Timestamp != expectedEntry.Timestamp {
t.Fatalf("expected Timestamp %s, got %s", expectedEntry.Timestamp, entry.Timestamp)
}

if entry.Text != expectedEntry.Text {
t.Fatalf("expected Text %s, got %s", expectedEntry.Text, entry.Text)
}
}

func Test_buildCmd(t *testing.T) {
ctx := context.TODO()
now := time.Now()
req := logs.Request{
Name: "loggyfunc",
Namespace: "spacetwo",
Follow: true,
Since: &now,
Tail: 5,
}

expectedArgs := fmt.Sprintf(
"--utc --no-pager --output=json --output-fields=SYSLOG_IDENTIFIER,MESSAGE,_PID,_SOURCE_REALTIME_TIMESTAMP --identifier=spacetwo:loggyfunc --since=%s --follow --lines=5",
now.UTC().Format("2006-01-02 15:04:05"),
)

cmd := buildCmd(ctx, req).String()

if !strings.Contains(cmd, "journalctl") {
t.Fatalf("expected journalctl cmd, got cmd %s", cmd)
}

if !strings.HasSuffix(cmd, expectedArgs) {
t.Fatalf("expected arg %s,\ngot cmd %s", expectedArgs, cmd)
}
}
4 changes: 3 additions & 1 deletion pkg/provider/handlers/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/containerd/containerd/namespaces"
gocni "github.com/containerd/go-cni"
"github.com/openfaas/faas/gateway/requests"

faasd "github.com/openfaas/faasd/pkg"
cninetwork "github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
)
Expand Down Expand Up @@ -49,7 +51,7 @@ func MakeDeleteHandler(client *containerd.Client, cni gocni.CNI) func(w http.Res
return
}

ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)

// TODO: this needs to still happen if the task is paused
if function.replicas != 0 {
Expand Down
3 changes: 2 additions & 1 deletion pkg/provider/handlers/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/docker/distribution/reference"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/openfaas/faas-provider/types"
faasd "github.com/openfaas/faasd/pkg"
cninetwork "github.com/openfaas/faasd/pkg/cninetwork"
"github.com/openfaas/faasd/pkg/service"
"github.com/pkg/errors"
Expand Down Expand Up @@ -52,7 +53,7 @@ func MakeDeployHandler(client *containerd.Client, cni gocni.CNI, secretMountPath
}

name := req.Service
ctx := namespaces.WithNamespace(context.Background(), FunctionNamespace)
ctx := namespaces.WithNamespace(context.Background(), faasd.FunctionNamespace)

deployErr := deploy(ctx, req, client, cni, secretMountPath, alwaysPull)
if deployErr != nil {
Expand Down
Loading

0 comments on commit 22882e2

Please sign in to comment.