Skip to content

Commit af6a2df

Browse files
Miguel Varela Ramosdeliahu
Miguel Varela Ramos
authored andcommitted
Scale to zero (#2298)
(cherry picked from commit f5e44c6)
1 parent c4250ec commit af6a2df

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2978
-305
lines changed

build/images.sh

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ dev_images=(
2525
"async-gateway"
2626
"enqueuer"
2727
"dequeuer"
28+
"autoscaler"
29+
"activator"
2830
)
2931

3032
non_dev_images=(

cmd/activator/main.go

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"flag"
22+
"net/http"
23+
"os"
24+
"os/signal"
25+
"strconv"
26+
"time"
27+
28+
"github.com/cortexlabs/cortex/pkg/activator"
29+
"github.com/cortexlabs/cortex/pkg/autoscaler"
30+
"github.com/cortexlabs/cortex/pkg/lib/aws"
31+
"github.com/cortexlabs/cortex/pkg/lib/errors"
32+
"github.com/cortexlabs/cortex/pkg/lib/k8s"
33+
"github.com/cortexlabs/cortex/pkg/lib/logging"
34+
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
35+
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
36+
"github.com/cortexlabs/cortex/pkg/types/userconfig"
37+
"go.uber.org/zap"
38+
istioinformers "istio.io/client-go/pkg/informers/externalversions"
39+
kmeta "k8s.io/apimachinery/pkg/apis/meta/v1"
40+
"k8s.io/apimachinery/pkg/runtime"
41+
kinformers "k8s.io/client-go/informers"
42+
)
43+
44+
func main() {
45+
var (
46+
port int
47+
inCluster bool
48+
autoscalerURL string
49+
namespace string
50+
clusterConfigPath string
51+
)
52+
53+
flag.IntVar(&port, "port", 8000, "port where the activator server will be exposed")
54+
flag.BoolVar(&inCluster, "in-cluster", false, "use when autoscaler runs in-cluster")
55+
flag.StringVar(&autoscalerURL, "autoscaler-url", "", "the URL for the cortex autoscaler endpoint")
56+
flag.StringVar(&namespace, "namespace", os.Getenv("CORTEX_NAMESPACE"),
57+
"kubernetes namespace where the cortex APIs are deployed "+
58+
"(can be set through the CORTEX_NAMESPACE env variable)",
59+
)
60+
flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path")
61+
flag.Parse()
62+
63+
log := logging.GetLogger()
64+
defer func() {
65+
_ = log.Sync()
66+
}()
67+
68+
switch {
69+
case autoscalerURL == "":
70+
log.Fatal("--autoscaler-url is a required option")
71+
case namespace == "":
72+
log.Fatal("--namespace is a required option")
73+
case clusterConfigPath == "":
74+
log.Fatal("--cluster-config flag is required")
75+
}
76+
77+
clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath)
78+
if err != nil {
79+
exit(log, err)
80+
}
81+
82+
awsClient, err := aws.NewForRegion(clusterConfig.Region)
83+
if err != nil {
84+
exit(log, err)
85+
}
86+
87+
_, userID, err := awsClient.CheckCredentials()
88+
if err != nil {
89+
exit(log, err)
90+
}
91+
92+
err = telemetry.Init(telemetry.Config{
93+
Enabled: clusterConfig.Telemetry,
94+
UserID: userID,
95+
Properties: map[string]string{
96+
"kind": userconfig.RealtimeAPIKind.String(),
97+
"image_type": "activator",
98+
},
99+
Environment: "operator",
100+
LogErrors: true,
101+
BackoffMode: telemetry.BackoffDuplicateMessages,
102+
})
103+
if err != nil {
104+
log.Fatalw("failed to initialize telemetry", zap.Error(err))
105+
}
106+
defer telemetry.Close()
107+
108+
k8sClient, err := k8s.New(namespace, inCluster, nil, runtime.NewScheme())
109+
if err != nil {
110+
exit(log, err, "failed to initialize kubernetes client")
111+
}
112+
113+
istioClient := k8sClient.IstioClientSet()
114+
kubeClient := k8sClient.ClientSet()
115+
autoscalerClient := autoscaler.NewClient(autoscalerURL)
116+
117+
istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(
118+
istioClient, 10*time.Second, // TODO: check how much makes sense
119+
istioinformers.WithNamespace(namespace),
120+
istioinformers.WithTweakListOptions(informerFilter),
121+
)
122+
virtualServiceInformer := istioInformerFactory.Networking().V1beta1().VirtualServices().Informer()
123+
virtualServiceClient := istioClient.NetworkingV1beta1().VirtualServices(namespace)
124+
125+
kubeInformerFactory := kinformers.NewSharedInformerFactoryWithOptions(
126+
kubeClient, 2*time.Second, // TODO: check how much makes sense
127+
kinformers.WithNamespace(namespace),
128+
kinformers.WithTweakListOptions(informerFilter),
129+
)
130+
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments().Informer()
131+
132+
act := activator.New(virtualServiceClient, deploymentInformer, virtualServiceInformer, autoscalerClient, log)
133+
134+
handler := activator.NewHandler(act, log)
135+
server := &http.Server{
136+
Addr: ":" + strconv.Itoa(port),
137+
Handler: handler,
138+
}
139+
140+
stopCh := make(chan struct{})
141+
go virtualServiceInformer.Run(stopCh)
142+
go deploymentInformer.Run(stopCh)
143+
defer func() {
144+
stopCh <- struct{}{}
145+
}()
146+
147+
errCh := make(chan error)
148+
go func() {
149+
log.Infof("Starting activator server on %s", server.Addr)
150+
errCh <- server.ListenAndServe()
151+
}()
152+
153+
sigint := make(chan os.Signal, 1)
154+
signal.Notify(sigint, os.Interrupt)
155+
156+
select {
157+
case err = <-errCh:
158+
exit(log, err, "failed to start activator server")
159+
case <-sigint:
160+
// We received an interrupt signal, shut down.
161+
log.Info("Received TERM signal, handling a graceful shutdown...")
162+
log.Info("Shutting down server")
163+
if err = server.Shutdown(context.Background()); err != nil {
164+
// Error from closing listeners, or context timeout:
165+
log.Warnw("HTTP server Shutdown Error", zap.Error(err))
166+
}
167+
log.Info("Shutdown complete, exiting...")
168+
}
169+
}
170+
171+
func informerFilter(listOptions *kmeta.ListOptions) {
172+
listOptions.LabelSelector = kmeta.FormatLabelSelector(&kmeta.LabelSelector{
173+
MatchLabels: map[string]string{
174+
"apiKind": userconfig.RealtimeAPIKind.String(),
175+
},
176+
MatchExpressions: []kmeta.LabelSelectorRequirement{
177+
{
178+
Key: "apiName",
179+
Operator: kmeta.LabelSelectorOpExists,
180+
},
181+
},
182+
})
183+
}
184+
185+
func exit(log *zap.SugaredLogger, err error, wrapStrs ...string) {
186+
for _, str := range wrapStrs {
187+
err = errors.Wrap(err, str)
188+
}
189+
190+
if err != nil && !errors.IsNoTelemetry(err) {
191+
telemetry.Error(err)
192+
}
193+
194+
if err != nil && !errors.IsNoPrint(err) {
195+
log.Error(err)
196+
}
197+
198+
os.Exit(1)
199+
}

0 commit comments

Comments
 (0)