-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconnect.go
96 lines (82 loc) · 2.6 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package inngestgo
import (
"context"
"fmt"
"github.com/inngest/inngest/pkg/execution/state"
"github.com/inngest/inngest/pkg/publicerr"
"github.com/inngest/inngestgo/connect"
"github.com/inngest/inngestgo/internal/sdkrequest"
"net/url"
)
func (h *handler) Connect(ctx context.Context) error {
concurrency := h.HandlerOpts.GetWorkerConcurrency()
connectPlaceholder := url.URL{
Scheme: "ws",
Host: "connect",
}
fns, err := createFunctionConfigs(h.appName, h.funcs, connectPlaceholder, true)
if err != nil {
return fmt.Errorf("error creating function configs: %w", err)
}
signingKey := h.GetSigningKey()
if signingKey == "" {
return fmt.Errorf("signing key is required")
}
hashedKey, err := hashedSigningKey([]byte(signingKey))
if err != nil {
return fmt.Errorf("failed to hash signing key: %w", err)
}
var hashedFallbackKey []byte
{
if fallbackKey := h.GetSigningKeyFallback(); fallbackKey != "" {
hashedFallbackKey, err = hashedSigningKey([]byte(fallbackKey))
if err != nil {
return fmt.Errorf("failed to hash fallback signing key: %w", err)
}
}
}
return connect.Connect(ctx, connect.Opts{
AppName: h.appName,
Env: h.Env,
Functions: fns,
Capabilities: capabilities,
HashedSigningKey: hashedKey,
HashedSigningKeyFallback: hashedFallbackKey,
WorkerConcurrency: concurrency,
APIBaseUrl: h.GetAPIBaseURL(),
IsDev: h.isDev(),
DevServerUrl: DevServerURL(),
InstanceId: h.InstanceId,
BuildId: h.BuildId,
Platform: Ptr(platform()),
SDKVersion: SDKVersion,
SDKLanguage: SDKLanguage,
}, h, h.Logger)
}
func (h *handler) getServableFunctionBySlug(slug string) ServableFunction {
h.l.RLock()
var fn ServableFunction
for _, f := range h.funcs {
if f.Slug() == slug {
fn = f
break
}
}
h.l.RUnlock()
return fn
}
func (h *handler) InvokeFunction(ctx context.Context, slug string, stepId *string, request sdkrequest.Request) (any, []state.GeneratorOpcode, error) {
fn := h.getServableFunctionBySlug(slug)
if fn == nil {
// XXX: This is a 500 within the JS SDK. We should probably change
// the JS SDK's status code to 410. 404 indicates that the overall
// API for serving Inngest isn't found.
return nil, nil, publicerr.Error{
Message: fmt.Sprintf("function not found: %s", slug),
Status: 410,
}
}
// Invoke function, always complete regardless of
resp, ops, err := invoke(context.Background(), fn, &request, stepId)
return resp, ops, err
}