Skip to content

Commit

Permalink
Merge pull request #9 from aliyun/add-features
Browse files Browse the repository at this point in the history
add some features
  • Loading branch information
fanzhe328 authored May 9, 2022
2 parents 1dba556 + 1f99859 commit e22e382
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 62 deletions.
4 changes: 2 additions & 2 deletions fc/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func RegisterInitializerFunction(handler interface{}) {

// RegisterPreFreezeFunction ...
func RegisterPreFreezeFunction(handler interface{}) {
registerLifeCycleHandler(handler, initializerFunction)
registerLifeCycleHandler(handler, preFreezeFunction)
}

// RegisterPreStopFunction ...
func RegisterPreStopFunction(handler interface{}) {
registerLifeCycleHandler(handler, initializerFunction)
registerLifeCycleHandler(handler, preStopFunction)
}

// StartWithContext is the same as Start except sets the base context for the function.
Expand Down
41 changes: 23 additions & 18 deletions fc/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ const (
// function type, do not modify!!
handleFunction functionType = 0
initializerFunction functionType = 1
preFreezeFunction functionType = 2
preStopFunction functionType = 3
preStopFunction functionType = 2
preFreezeFunction functionType = 3

// base function type
eventFunction functionType = 101
Expand Down Expand Up @@ -55,7 +55,7 @@ func NewFunction(handler interface{}, funcType functionType) *Function {
return f
}

// RegistryLifeCycleHandler
// RegistryLifeCycleHandler ...
func (fn *Function) RegistryLifeCycleHandler(lifeCycleHandlers []handlerWrapper) {
for _, item := range lifeCycleHandlers {
switch item.funcType {
Expand Down Expand Up @@ -96,12 +96,12 @@ func (fn *Function) Invoke(req *messages.InvokeRequest, response *messages.Invok
lc := &req.Context
lc.RequestID = req.RequestId
invokeContext = fccontext.NewContext(invokeContext, lc)

fn.printStartLog(invokeFuncType, req.RequestId)
if invokeFuncType == initializerFunction {
if fn.initializeHandler == nil {
fn.initializeHandler = errorLifeCycleHandler(errors.New("no initializer handler registered"))
}
fn.printStartLog(invokeFuncType, req.RequestId)

fn.initializeHandler.Invoke(invokeContext)
return nil
}
Expand All @@ -119,10 +119,10 @@ func (fn *Function) Invoke(req *messages.InvokeRequest, response *messages.Invok
fn.preStopHandler = errorLifeCycleHandler(errors.New("no prestop handler registered"))
}
fn.preStopHandler.Invoke(invokeContext)
response.Payload = []byte{}
return nil
}

fn.printStartLog(invokeFuncType, req.RequestId)
if fn.funcType == eventFunction {
return fn.invokeEventFunc(invokeContext, req.Payload, response)
}
Expand All @@ -148,12 +148,12 @@ func (fn *Function) invokeHttpFunc(invokeContext context.Context, httpParams *st
response.Error = fcErrorResponse(err)
return nil
}
respPayload, err := resp.Payload()
response.Payload = resp.Body()
response.HttpParam, err = resp.HttpParam()
if err != nil {
response.Error = fcErrorResponse(err)
return nil
}
response.Payload = respPayload
return nil
}

Expand Down Expand Up @@ -199,27 +199,32 @@ func (fn *Function) printPanicLog(requestId, errorMessage string) {
}

func (fn *Function) printEndLog(funcType functionType, requestId string, isHandled bool) {
if !enableInvokePanicLog {
return
}
suffix := ""
if !isHandled {
suffix = ", Error: Unhandled function error"
}
if funcType == initializerFunction {

switch funcType {
case initializerFunction:
fmt.Printf("FC Initialize End RequestId: %s%s\n", requestId, suffix)
} else {
case preStopFunction:
fmt.Printf("FC PreStop End RequestId: %s%s\n", requestId, suffix)
case preFreezeFunction:
fmt.Printf("FC PreFreeze End RequestId: %s%s\n", requestId, suffix)
default:
fmt.Printf("FC Invoke End RequestId: %s%s\n", requestId, suffix)
}
}

func (fn *Function) printStartLog(funcType functionType, requestId string) {
if !enableInvokePanicLog {
return
}
if funcType == initializerFunction {
switch funcType {
case initializerFunction:
fmt.Printf("FC Initialize Start RequestId: %s\n", requestId)
} else {
case preStopFunction:
fmt.Printf("FC PreStop Start RequestId: %s\n", requestId)
case preFreezeFunction:
fmt.Printf("FC PreFreeze Start RequestId: %s\n", requestId)
default:
fmt.Printf("FC Invoke Start RequestId: %s\n", requestId)
}
}
15 changes: 15 additions & 0 deletions fc/http_handler_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (r *fcResponse) responseParams() (string, error) {
return encoded, nil
}

// Deprecated: Use Body and HttpParam instead
func (r *fcResponse) Payload() ([]byte, error) {
respHeaders := map[string]string{}
for key, values := range r.header {
Expand All @@ -89,3 +90,17 @@ func (r *fcResponse) Payload() ([]byte, error) {
}
return json.Marshal(resp)
}

func (r *fcResponse) Body() []byte {
body := r.body.Bytes()
r.body.Reset()
return body
}

func (r *fcResponse) HttpParam() (string, error) {
encodedHttpParams, err := r.responseParams()
if err != nil {
return "", err
}
return encodedHttpParams, nil
}
75 changes: 38 additions & 37 deletions fc/invoke_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,32 @@ func startRuntimeAPILoop(ctx context.Context, api string, baseHandler handlerWra
function := NewFunction(baseHandler.handler, baseHandler.funcType).withContext(ctx)
function.RegistryLifeCycleHandler(lifeCycleHandlers)
for {
invoke, err := client.next()
req, err := client.next()
if err != nil {
logPrintf("failed to get invoke request due to %v", err)
continue
}
err = handleInvoke(invoke, function)
if err != nil {
logPrintf("failed to invoke function due to %v", err)
}
go func(req *invoke, f *Function) {
err = handleInvoke(req, function)
if err != nil {
logPrintf("failed to invoke function due to %v", err)
}
}(req, function)
}
}

// handleInvoke returns an error if the function panics, or some other non-recoverable error occurred
func handleInvoke(invoke *invoke, function *Function) error {
functionRequest, err := convertInvokeRequest(invoke)
func handleInvoke(invokeInstance *invoke, function *Function) error {
functionRequest, err := convertInvokeRequest(invokeInstance)
if err != nil {
return fmt.Errorf("unexpected error occurred when parsing the invoke: %v", err)
}

functionResponse := &messages.InvokeResponse{}
ivkErr := function.Invoke(functionRequest, functionResponse, convertInvokeFunctionType(invoke))
ivkErr := function.Invoke(functionRequest, functionResponse, convertInvokeFunctionType(invokeInstance))
if functionResponse.Error != nil {
payload := safeMarshal(functionResponse.Error)
if err := invoke.failure(payload, contentTypeJSON); err != nil {
if err := invokeInstance.failure(payload, contentTypeJSON); err != nil {
return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err)
}
if functionResponse.Error.ShouldExit {
Expand All @@ -72,15 +74,14 @@ func handleInvoke(invoke *invoke, function *Function) error {
return ivkErr
}

if err := invoke.success(functionResponse.Payload, contentTypeJSON); err != nil {
if err := invokeInstance.success(functionResponse.Payload, contentTypeJSON, functionResponse.HttpParam); err != nil {
return fmt.Errorf("unexpected error occurred when sending the function functionResponse to the API: %v", err)
}

return nil
}

func convertInvokeFunctionType(invoke *invoke) functionType {
funcType, err := strconv.ParseInt(invoke.headers.Get(headerFunctionType), 10, 64)
func convertInvokeFunctionType(invokeInstance *invoke) functionType {
funcType, err := strconv.ParseInt(invokeInstance.headers.Get(headerFunctionType), 10, 64)
if err != nil {
return handleFunction
}
Expand All @@ -98,29 +99,29 @@ func convertInvokeFunctionType(invoke *invoke) functionType {
}

// convertInvokeRequest converts an invoke from the Runtime API, and unpacks it to be compatible with the shape of a `lambda.Function` InvokeRequest.
func convertInvokeRequest(invoke *invoke) (*messages.InvokeRequest, error) {
deadlineEpochMS, err := strconv.ParseInt(invoke.headers.Get(headerDeadlineMS), 10, 64)
func convertInvokeRequest(invokeInstance *invoke) (*messages.InvokeRequest, error) {
deadlineEpochMS, err := strconv.ParseInt(invokeInstance.headers.Get(headerDeadlineMS), 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header: %s", headerDeadlineMS)
}
deadlineS := deadlineEpochMS / msPerS
deadlineNS := (deadlineEpochMS % msPerS) * nsPerMS

functionTimeoutSec, err := strconv.Atoi(invoke.headers.Get(headerFunctionTimeout))
functionTimeoutSec, err := strconv.Atoi(invokeInstance.headers.Get(headerFunctionTimeout))
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header: %s", headerFunctionTimeout)
}

retryCount := 0
if retryCountStr := invoke.headers.Get(headerRetryCount); retryCountStr != "" {
if retryCountStr := invokeInstance.headers.Get(headerRetryCount); retryCountStr != "" {
retryCount, err = strconv.Atoi(retryCountStr)
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header: %s", headerFunctionTimeout)
}
}

spanBaggages := make(map[string]string)
if base64SpanBaggages := invoke.headers.Get(headerOpenTracingSpanBaggages); base64SpanBaggages != "" {
if base64SpanBaggages := invokeInstance.headers.Get(headerOpenTracingSpanBaggages); base64SpanBaggages != "" {
spanBaggagesByte, err := base64.StdEncoding.DecodeString(base64SpanBaggages)
if err != nil {
return nil, fmt.Errorf("failed to parse contents of header %s: %s", headerOpenTracingSpanContext, base64SpanBaggages)
Expand All @@ -131,44 +132,44 @@ func convertInvokeRequest(invoke *invoke) (*messages.InvokeRequest, error) {
}

res := &messages.InvokeRequest{
RequestId: invoke.id,
RequestId: invokeInstance.id,
Deadline: messages.InvokeRequest_Timestamp{
Seconds: deadlineS,
Nanos: deadlineNS,
},
Payload: invoke.payload,
Payload: invokeInstance.payload,
Context: fccontext.FcContext{
RequestID: invoke.id,
RequestID: invokeInstance.id,
Credentials: fccontext.Credentials{
AccessKeyId: invoke.headers.Get(headerAccessKeyId),
AccessKeySecret: invoke.headers.Get(headerAccessKeySecret),
SecurityToken: invoke.headers.Get(headerSecurityToken),
AccessKeyId: invokeInstance.headers.Get(headerAccessKeyId),
AccessKeySecret: invokeInstance.headers.Get(headerAccessKeySecret),
SecurityToken: invokeInstance.headers.Get(headerSecurityToken),
},
Function: fccontext.Function{
Name: invoke.headers.Get(headerFunctionName),
Handler: invoke.headers.Get(headerFunctionHandler),
Memory: invoke.headers.Get(headerFunctionMemory),
Name: invokeInstance.headers.Get(headerFunctionName),
Handler: invokeInstance.headers.Get(headerFunctionHandler),
Memory: invokeInstance.headers.Get(headerFunctionMemory),
Timeout: functionTimeoutSec,
},
Service: fccontext.Service{
Name: invoke.headers.Get(headerServiceName),
LogProject: invoke.headers.Get(headerServiceLogproject),
LogStore: invoke.headers.Get(headerServiceLogstore),
Qualifier: invoke.headers.Get(headerQualifier),
VersionId: invoke.headers.Get(headerVersionId),
Name: invokeInstance.headers.Get(headerServiceName),
LogProject: invokeInstance.headers.Get(headerServiceLogproject),
LogStore: invokeInstance.headers.Get(headerServiceLogstore),
Qualifier: invokeInstance.headers.Get(headerQualifier),
VersionId: invokeInstance.headers.Get(headerVersionId),
},
Tracing: fccontext.Tracing{
OpenTracingSpanContext: invoke.headers.Get(headerOpenTracingSpanContext),
OpenTracingSpanContext: invokeInstance.headers.Get(headerOpenTracingSpanContext),
OpenTracingSpanBaggages: spanBaggages,
JaegerEndpoint: invoke.headers.Get(headerJaegerEndpoint),
JaegerEndpoint: invokeInstance.headers.Get(headerJaegerEndpoint),
},
Region: invoke.headers.Get(headerRegion),
AccountId: invoke.headers.Get(headerAccountId),
Region: invokeInstance.headers.Get(headerRegion),
AccountId: invokeInstance.headers.Get(headerAccountId),
RetryCount: retryCount,
},
}

if httpParams := invoke.headers.Get(headerHttpParams); httpParams != "" {
if httpParams := invokeInstance.headers.Get(headerHttpParams); httpParams != "" {
res.HttpParams = &httpParams
}

Expand Down
3 changes: 3 additions & 0 deletions fc/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type InvokeRequest struct {
type InvokeResponse struct {
Payload []byte
Error *InvokeResponse_Error

// HttpHandler parameter
HttpParam string
}

type InvokeResponse_Error struct {
Expand Down
13 changes: 8 additions & 5 deletions fc/runtime_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"log"
"net/http"
"runtime"
"strings"
)

const (
Expand Down Expand Up @@ -81,9 +82,9 @@ type invokeTypeInfo struct {
// success sends the response payload for an in-progress invocation.
// Notes:
// * An invoke is not complete until next() is called again!
func (i *invoke) success(payload []byte, contentType string) error {
func (i *invoke) success(payload []byte, contentType string, httpParams string) error {
url := i.client.baseURL + i.id + "/response"
return i.client.post(url, payload, contentType)
return i.client.post(url, payload, contentType, httpParams)
}

// failure sends the payload to the Runtime API. This marks the function's invoke as a failure.
Expand All @@ -93,7 +94,7 @@ func (i *invoke) success(payload []byte, contentType string) error {
// If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure()
func (i *invoke) failure(payload []byte, contentType string) error {
url := i.client.baseURL + i.id + "/error"
return i.client.post(url, payload, contentType)
return i.client.post(url, payload, contentType, "")
}

// next connects to the Runtime API and waits for a new invoke Request to be available.
Expand Down Expand Up @@ -134,14 +135,16 @@ func (c *runtimeAPIClient) next() (*invoke, error) {
}, nil
}

func (c *runtimeAPIClient) post(url string, payload []byte, contentType string) error {
func (c *runtimeAPIClient) post(url string, payload []byte, contentType, httpParams string) error {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(payload))
if err != nil {
return fmt.Errorf("failed to construct POST request to %s: %v", url, err)
}
req.Header.Set("User-Agent", c.userAgent)
req.Header.Set("Content-Type", contentType)

if strings.TrimSpace(httpParams) != "" {
req.Header.Set(headerHttpParams, httpParams)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to POST to %s: %v", url, err)
Expand Down
7 changes: 7 additions & 0 deletions fccontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type FcContext struct {
AccountId string
RetryCount int
Tracing Tracing

logger *FcLogger
}

func (f *FcContext) GetLogger() *FcLogger {
return f.logger
}

// An unexported type to be used as the key for types in this package.
Expand All @@ -71,5 +77,6 @@ func NewContext(parent context.Context, lc *FcContext) context.Context {
// FromContext returns the LambdaContext value stored in ctx, if any.
func FromContext(ctx context.Context) (*FcContext, bool) {
lc, ok := ctx.Value(contextKey).(*FcContext)
lc.logger = NewFcLogger(lc.RequestID)
return lc, ok
}
Loading

0 comments on commit e22e382

Please sign in to comment.