Skip to content

Commit

Permalink
Fix #18 invocations failing without namespace
Browse files Browse the repository at this point in the history
When no namespace was set, invocations were failing due to the
way strings were being built.

Tested e2e with faasd and nodeinfo.

Also fixes: #19

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Jul 14, 2021
1 parent 63646e5 commit f21d5e7
Show file tree
Hide file tree
Showing 9 changed files with 455 additions and 97 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
build:
strategy:
matrix:
go-version: [1.13.x]
go-version: [1.16.x]
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
publish:
strategy:
matrix:
go-version: [1.13.x]
go-version: [1.16.x]
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/openfaas/cron-connector

go 1.15
go 1.16

require (
github.com/openfaas/connector-sdk v0.0.0-20201220114541-89f0ffcc5448
github.com/openfaas/faas-cli v0.0.0-20210208174727-eecaee04d592
github.com/openfaas/faas-provider v0.16.2
github.com/openfaas/connector-sdk v0.6.1
github.com/openfaas/faas-cli v0.0.0-20210617112918-72816d486cf7
github.com/openfaas/faas-provider v0.18.5
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.0
github.com/robfig/cron/v3 v3.0.1
)
403 changes: 371 additions & 32 deletions go.sum

Large diffs are not rendered by default.

56 changes: 34 additions & 22 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) OpenFaaS Author(s) 2020. All rights reserved.
// Copyright (c) OpenFaaS Author(s) 2021. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package main
Expand Down Expand Up @@ -29,7 +29,11 @@ func main() {
log.Printf("Gateway URL: %s", config.GatewayURL)
log.Printf("Async Invocation: %v", config.AsyncFunctionInvocation)

invoker := types.NewInvoker(gatewayRoute(config), types.MakeClient(config.UpstreamTimeout), config.PrintResponse)
invoker := types.NewInvoker(gatewayRoute(config),
types.MakeClient(config.UpstreamTimeout),
config.ContentType,
config.PrintResponse)

cronScheduler := cfunction.NewScheduler()
topic := "cron-function"
interval := time.Second * 10
Expand All @@ -53,29 +57,35 @@ func getControllerConfig() (*types.ControllerConfig, error) {
gURL, ok := os.LookupEnv("gateway_url")

if !ok {
return nil, fmt.Errorf("Gateway URL not set")
return nil, fmt.Errorf("gateway_url environment variable not set")
}

asynchronousInvocation := false
if val, exists := os.LookupEnv("asynchronous_invocation"); exists {
asynchronousInvocation = (val == "1" || val == "true")
}

contentType := "text/plain"
if v, exists := os.LookupEnv("content_type"); exists && len(v) > 0 {
contentType = v
}

return &types.ControllerConfig{
RebuildInterval: time.Millisecond * 1000,
GatewayURL: gURL,
PrintResponse: true,
AsyncFunctionInvocation: asynchronousInvocation,
ContentType: contentType,
}, nil
}

//BasicAuth basic authentication for the the gateway
// BasicAuth basic authentication for the the gateway
type BasicAuth struct {
Username string
Password string
}

//Set set Authorization header on request
// Set set Authorization header on request
func (auth *BasicAuth) Set(req *http.Request) error {
req.SetBasicAuth(auth.Username, auth.Password)
return nil
Expand Down Expand Up @@ -103,42 +113,44 @@ func startFunctionProbe(interval time.Duration, topic string, c *types.Controlle

namespaces, err := sdkClient.ListNamespaces(ctx)
if err != nil {
return fmt.Errorf("Couldn't fetch Namespaces due to: %s", err)
return fmt.Errorf("can't list namespaces: %w", err)
}

for _, namespace := range namespaces {
functions, err := sdkClient.ListFunctions(ctx, namespace)
if err != nil {
return fmt.Errorf("Couldn't fetch Functions due to: %s", err)
return fmt.Errorf("can't list functions: %w", err)
}

newCronFunctions := RequestsToCronFunctions(functions, namespace, topic)
addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, runningFuncs, namespace)
newCronFunctions := requestsToCronFunctions(functions, namespace, topic)
addFuncs, deleteFuncs := getNewAndDeleteFuncs(newCronFunctions, runningFuncs, namespace)

for _, function := range deleteFuncs {
log.Printf("Unregistered [%s]", function.Function.String())

cronScheduler.Remove(function)
log.Print("deleted function ", function.Function.Name, " in ", function.Function.Namespace)
}

newScheduledFuncs := make(cfunction.ScheduledFunctions, 0)

for _, function := range addFuncs {
f, err := cronScheduler.AddCronFunction(function, invoker)
if err != nil {
log.Fatal("could not add function ", function.Name, " in ", function.Namespace)
return fmt.Errorf("can't add function: %s, %w", function.String(), err)
}

newScheduledFuncs = append(newScheduledFuncs, f)
log.Print("added function ", function.Name, " in ", function.Namespace)
log.Printf("Registered: %s [%s]", function.String(), function.Schedule)
}

runningFuncs = UpdateScheduledFunctions(runningFuncs, newScheduledFuncs, deleteFuncs)
runningFuncs = updateScheduledFunctions(runningFuncs, newScheduledFuncs, deleteFuncs)
}
}
}

// RequestsToCronFunctions converts an array of types.FunctionStatus object to CronFunction, ignoring those that cannot be converted
func RequestsToCronFunctions(functions []ptypes.FunctionStatus, namespace string, topic string) cfunction.CronFunctions {
// requestsToCronFunctions converts an array of types.FunctionStatus object
// to CronFunction, ignoring those that cannot be converted
func requestsToCronFunctions(functions []ptypes.FunctionStatus, namespace string, topic string) cfunction.CronFunctions {
newCronFuncs := make(cfunction.CronFunctions, 0)
for _, function := range functions {
cF, err := cfunction.ToCronFunction(function, namespace, topic)
Expand All @@ -150,8 +162,9 @@ func RequestsToCronFunctions(functions []ptypes.FunctionStatus, namespace string
return newCronFuncs
}

// GetNewAndDeleteFuncs takes new functions and running cron functions and returns functions that need to be added and that need to be deleted
func GetNewAndDeleteFuncs(newFuncs cfunction.CronFunctions, oldFuncs cfunction.ScheduledFunctions, namespace string) (cfunction.CronFunctions, cfunction.ScheduledFunctions) {
// getNewAndDeleteFuncs takes new functions and running cron functions and returns
// functions that need to be added and that need to be deleted
func getNewAndDeleteFuncs(newFuncs cfunction.CronFunctions, oldFuncs cfunction.ScheduledFunctions, namespace string) (cfunction.CronFunctions, cfunction.ScheduledFunctions) {
addFuncs := make(cfunction.CronFunctions, 0)
deleteFuncs := make(cfunction.ScheduledFunctions, 0)

Expand All @@ -170,8 +183,9 @@ func GetNewAndDeleteFuncs(newFuncs cfunction.CronFunctions, oldFuncs cfunction.S
return addFuncs, deleteFuncs
}

// UpdateScheduledFunctions updates the scheduled function with added functions and removes deleted functions
func UpdateScheduledFunctions(running, added, deleted cfunction.ScheduledFunctions) cfunction.ScheduledFunctions {
// updateScheduledFunctions updates the scheduled function with
// added functions and removes deleted functions
func updateScheduledFunctions(running, added, deleted cfunction.ScheduledFunctions) cfunction.ScheduledFunctions {
updatedSchedule := make(cfunction.ScheduledFunctions, 0)

for _, function := range running {
Expand All @@ -180,9 +194,7 @@ func UpdateScheduledFunctions(running, added, deleted cfunction.ScheduledFunctio
}
}

for _, function := range added {
updatedSchedule = append(updatedSchedule, function)
}
updatedSchedule = append(updatedSchedule, added...)

return updatedSchedule
}
8 changes: 4 additions & 4 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) OpenFaaS Author(s) 2020. All rights reserved.
// Copyright (c) OpenFaaS Author(s) 2021. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package main

Expand All @@ -10,7 +10,7 @@ import (
ptypes "github.com/openfaas/faas-provider/types"
)

func TestGetNewAndDeleteFuncs(t *testing.T) {
func TestgetNewAndDeleteFuncs(t *testing.T) {
newCronFunctions := make(cfunction.CronFunctions, 3)
defaultReq := ptypes.FunctionStatus{}
newCronFunctions[0] = cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_unchanged", Namespace: "openfaas-fn", Schedule: "* * * * *"}
Expand All @@ -22,7 +22,7 @@ func TestGetNewAndDeleteFuncs(t *testing.T) {
oldFuncs[1] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_delete", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}
oldFuncs[2] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}

addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, oldFuncs, "openfaas-fn")
addFuncs, deleteFuncs := getNewAndDeleteFuncs(newCronFunctions, oldFuncs, "openfaas-fn")
if !deleteFuncs.Contains(&oldFuncs[1].Function) {
t.Error("function was not deleted")
}
Expand Down Expand Up @@ -52,7 +52,7 @@ func TestNamespaceFuncs(t *testing.T) {
oldFuncs[1] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_delete", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}
oldFuncs[2] = cfunction.ScheduledFunction{Function: cfunction.CronFunction{FuncData: defaultReq, Name: "test_function_to_update", Namespace: "openfaas-fn", Schedule: "* * * * *"}, ID: 0}

addFuncs, deleteFuncs := GetNewAndDeleteFuncs(newCronFunctions, oldFuncs, "openfaas-fn")
addFuncs, deleteFuncs := getNewAndDeleteFuncs(newCronFunctions, oldFuncs, "openfaas-fn")
if !deleteFuncs.Contains(&oldFuncs[1].Function) {
t.Error("function was not deleted")
}
Expand Down
59 changes: 33 additions & 26 deletions types/cron_function.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright (c) OpenFaaS Author(s) 2020. All rights reserved.
// Copyright (c) OpenFaaS Author(s) 2021. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package types

import (
"bytes"
"fmt"
"io/ioutil"
"log"
Expand All @@ -23,24 +22,32 @@ type CronFunction struct {
Schedule string
}

func (c *CronFunction) String() string {
if len(c.Namespace) > 0 {
return fmt.Sprintf("%s.%s", c.Name, c.Namespace)
}

return c.Name
}

// CronFunctions a list of CronFunction
type CronFunctions []CronFunction

// Contains returns true if the provided CronFunction object is in list
func (c *CronFunctions) Contains(cF *CronFunction) bool {

func (c *CronFunctions) Contains(cf *CronFunction) bool {
for _, f := range *c {

if f.Name == cF.Name && f.Namespace == cF.Namespace && f.Schedule == cF.Schedule {
if f.Name == cf.Name &&
f.Namespace == cf.Namespace &&
f.Schedule == cf.Schedule {
return true
}

}

return false
}

// ToCronFunction converts a ptypes.FunctionStatus object to the CronFunction and returns error if it is not possible
// ToCronFunction converts a ptypes.FunctionStatus object to the CronFunction
// and returns error if it is not possible
func ToCronFunction(f ptypes.FunctionStatus, namespace string, topic string) (CronFunction, error) {
if f.Annotations == nil {
return CronFunction{}, errors.New(fmt.Sprint(f.Name, " has no annotations."))
Expand All @@ -66,34 +73,39 @@ func ToCronFunction(f ptypes.FunctionStatus, namespace string, topic string) (Cr

// InvokeFunction Invokes the cron function
func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {
gwURL := fmt.Sprintf("%s/%s.%s", i.GatewayURL, c.Name, c.Namespace)
reader := bytes.NewReader(make([]byte, 0))
httpReq, _ := http.NewRequest(http.MethodPost, gwURL, reader)

if httpReq.Body != nil {
defer httpReq.Body.Close()
gwURL := fmt.Sprintf("%s/%s", i.GatewayURL, c.String())
log.Printf("HTTP POST: %s", gwURL)

req, err := http.NewRequest(http.MethodPost, gwURL, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create http request")
}

if req.Body != nil {
defer req.Body.Close()
}

var body *[]byte
res, doErr := i.Client.Do(httpReq)
res, err := i.Client.Do(req)

if doErr != nil {
if err != nil {
i.Responses <- types.InvokerResponse{
Error: errors.Wrap(doErr, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
Error: errors.Wrap(err, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
}
return nil, doErr
return nil, err
}

if res.Body != nil {
defer res.Body.Close()
bytesOut, readErr := ioutil.ReadAll(res.Body)
bytesOut, err := ioutil.ReadAll(res.Body)

if readErr != nil {
if err != nil {
log.Printf("Error reading body")
i.Responses <- types.InvokerResponse{
Error: errors.Wrap(readErr, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
Error: errors.Wrap(err, fmt.Sprint("unable to invoke ", c.Name, " in ", c.Namespace)),
}
return nil, doErr
return nil, err
}

body = &bytesOut
Expand All @@ -109,8 +121,3 @@ func (c CronFunction) InvokeFunction(i *types.Invoker) (*[]byte, error) {

return body, nil
}

// CronFunctionInterface defines an interface to work with CronFunction during testing
type CronFunctionInterface interface {
InvokeFunction(i *types.Invoker) (*[]byte, error)
}
10 changes: 5 additions & 5 deletions types/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) OpenFaaS Author(s) 2020. All rights reserved.
// Copyright (c) OpenFaaS Author(s) 2021. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package types
Expand Down Expand Up @@ -38,7 +38,7 @@ type ScheduledFunctions []ScheduledFunction
// AddCronFunction adds a function to cron
func (s *Scheduler) AddCronFunction(c CronFunction, invoker *types.Invoker) (ScheduledFunction, error) {
eID, err := s.main.AddFunc(c.Schedule, func() {
log.Printf("Executed function: %s (ns=%s)", c.Name, c.Namespace)
log.Printf("Executing function: %s", c.String())
c.InvokeFunction(invoker)
})
return ScheduledFunction{c, EntryID(eID)}, err
Expand Down Expand Up @@ -70,11 +70,11 @@ func CheckSchedule(schedule string) bool {
// Contains returns true if the ScheduledFunctions array contains the CronFunction
func (functions *ScheduledFunctions) Contains(cronFunc *CronFunction) bool {
for _, f := range *functions {

if f.Function.Name == cronFunc.Name && f.Function.Namespace == cronFunc.Namespace && f.Function.Schedule == cronFunc.Schedule {
if f.Function.Name == cronFunc.Name &&
f.Function.Namespace == cronFunc.Namespace &&
f.Function.Schedule == cronFunc.Schedule {
return true
}

}

return false
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 OpenFaaS Author(s)
// Copyright 2021 OpenFaaS Author(s)
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package version
Expand Down

0 comments on commit f21d5e7

Please sign in to comment.