Skip to content

Commit

Permalink
add kinsumer autoscale module
Browse files Browse the repository at this point in the history
  • Loading branch information
erdac committed Jan 13, 2025
1 parent a6a2b1d commit 239e1b7
Show file tree
Hide file tree
Showing 10 changed files with 910 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pkg/application/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ func WithKernelExitHandler(handler kernelPkg.ExitHandler) Option {
}
}

func WithKinsumerAutoscaleModule(kinsumerInputName string) Option {
return func(app *App) {
app.addKernelOption(func(config cfg.GosoConf) kernelPkg.Option {
return kernelPkg.WithModuleMultiFactory(stream.KinsumerAutoscaleModuleFactory(kinsumerInputName))
})
}
}

func WithLoggerGroupTag(app *App) {
app.addLoggerOption(func(config cfg.GosoConf, logger log.GosoLogger) error {
if !config.IsSet("app_group") {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cloud/aws/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
type Client interface {
DescribeContainerInstances(ctx context.Context, params *ecs.DescribeContainerInstancesInput, optFns ...func(*ecs.Options)) (*ecs.DescribeContainerInstancesOutput, error)
DescribeTasks(ctx context.Context, params *ecs.DescribeTasksInput, optFns ...func(*ecs.Options)) (*ecs.DescribeTasksOutput, error)
DescribeServices(ctx context.Context, params *ecs.DescribeServicesInput, optFns ...func(*ecs.Options)) (*ecs.DescribeServicesOutput, error)
UpdateService(ctx context.Context, params *ecs.UpdateServiceInput, optFns ...func(*ecs.Options)) (*ecs.UpdateServiceOutput, error)
}

type ClientSettings struct {
Expand Down
148 changes: 148 additions & 0 deletions pkg/cloud/aws/ecs/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/cloud/aws/kinesis/record_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"strings"

"github.com/justtrackio/gosoline/pkg/funk"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
Expand All @@ -15,6 +13,7 @@ import (
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/clock"
"github.com/justtrackio/gosoline/pkg/exec"
"github.com/justtrackio/gosoline/pkg/funk"
"github.com/justtrackio/gosoline/pkg/log"
"github.com/justtrackio/gosoline/pkg/metric"
"github.com/justtrackio/gosoline/pkg/uuid"
Expand Down Expand Up @@ -178,6 +177,7 @@ func (w *recordWriter) putRecordsBatch(ctx context.Context, batch []*Record) err

if len(failedRecords) == 0 && attempt > 1 {
logger.Warn("PutRecords successful after %d attempts in %s", attempt, took)

break
}

Expand Down
Loading

0 comments on commit 239e1b7

Please sign in to comment.