Skip to content

Commit

Permalink
feat: add kinesis stream support (#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taliesin Millhouse authored Jul 21, 2022
1 parent 84e51f2 commit 3044692
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 15 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The currently supported functionality includes:
- Inspecting and deleting all GuardDuty Detectors in an AWS Account
- Inspecting and deleting all Macie member accounts in an AWS account - as long as those accounts were created by Invitation - and not via AWS Organizations
- Inspecting and deleting all SageMaker Notebook Instances in an AWS account
- Inspecting and deleting all Kinesis Streams in an AWS account

### BEWARE!

Expand Down Expand Up @@ -358,6 +359,9 @@ The following resources support the Config file:
Notes:
* no configuration options for KMS customer keys, since keys are created with auto-generated identifier

- Kinesis Streams
- Resource type: `kinesis-stream`
- Config key: `KinesisStream`

#### Example

Expand Down Expand Up @@ -466,6 +470,7 @@ To find out what options are supported in the config file today, consult this
| eip | none | ✅ | none | none |
| ec2 | none | ✅ | none | none |
| eks | none | ✅ | none | none |
| kinesis-stream | none | ✅ | none | none |
| acmpca | none | none | none | none |
| iam role | none | none | none | none |
| sagemaker-notebook-instances| none| ✅ | none | none |
Expand Down
15 changes: 15 additions & 0 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,20 @@ func GetAllResources(targetRegions []string, excludeAfter time.Time, resourceTyp
}
// End SageMaker Notebook Instances

// Kinesis Streams
kinesisStreams := KinesisStreams{}
if IsNukeable(kinesisStreams.ResourceName(), resourceTypes) {
streams, err := getAllKinesisStreams(cloudNukeSession, configObj)
if err != nil {
return nil, errors.WithStackTrace(err)
}
if len(streams) > 0 {
kinesisStreams.Names = awsgo.StringValueSlice(streams)
resourcesInRegion.Resources = append(resourcesInRegion.Resources, kinesisStreams)
}
}
// End Kinesis Streams

if len(resourcesInRegion.Resources) > 0 {
account.Resources[region] = resourcesInRegion
}
Expand Down Expand Up @@ -861,6 +875,7 @@ func ListResourceTypes() []string {
GuardDuty{}.ResourceName(),
MacieMember{}.ResourceName(),
SageMakerNotebookInstances{}.ResourceName(),
KinesisStreams{}.ResourceName(),
}
sort.Strings(resourceTypes)
return resourceTypes
Expand Down
122 changes: 122 additions & 0 deletions aws/kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package aws

import (
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/go-commons/errors"
"github.com/hashicorp/go-multierror"
)

// getAllKinesisStreams lists the Kinesis Streams visible to the given session and returns the names of those
// that pass the include/exclude name rules in configObj.KinesisStream.
//
// All result pages are walked. On a listing failure the error is returned with a stack trace attached and the
// returned slice is nil.
func getAllKinesisStreams(session *session.Session, configObj config.Config) ([]*string, error) {
	client := kinesis.New(session)
	matched := []*string{}

	// collectPage keeps the names on a single page that pass the config filter and tells the paginator to keep
	// going until the last page has been processed.
	collectPage := func(output *kinesis.ListStreamsOutput, isLast bool) bool {
		for idx := range output.StreamNames {
			candidate := output.StreamNames[idx]
			if !shouldIncludeKinesisStream(candidate, configObj) {
				continue
			}
			matched = append(matched, candidate)
		}
		return !isLast
	}

	if err := client.ListStreamsPages(&kinesis.ListStreamsInput{}, collectPage); err != nil {
		return nil, errors.WithStackTrace(err)
	}
	return matched, nil
}

// shouldIncludeKinesisStream reports whether the named stream is selected by the KinesisStream include/exclude
// name rules in configObj. A nil name is never included.
func shouldIncludeKinesisStream(streamName *string, configObj config.Config) bool {
	rules := configObj.KinesisStream
	return streamName != nil && config.ShouldInclude(
		aws.StringValue(streamName),
		rules.IncludeRule.NamesRegExp,
		rules.ExcludeRule.NamesRegExp,
	)
}

// nukeAllKinesisStreams deletes the Kinesis Streams named in identifiers, issuing one DeleteStream call per
// stream concurrently.
//
// It returns nil when identifiers is empty, TooManyStreamsErr when more than 100 identifiers are passed, and
// otherwise a stack-traced multierror aggregating every delete that failed (nil if all succeeded).
func nukeAllKinesisStreams(session *session.Session, identifiers []*string) error {
	region := aws.StringValue(session.Config.Region)

	// Nothing to do: return early so we don't also log a misleading "Deleting ..." message below.
	if len(identifiers) == 0 {
		logging.Logger.Infof("No Kinesis Streams to nuke in region: %s", region)
		return nil
	}

	// NOTE: we don't need to do pagination here, because the pagination is handled by the caller to this function,
	// based on KinesisStream.MaxBatchSize, however we add a guard here to warn users when the batching fails and
	// has a chance of throttling AWS. Since we concurrently make one call for each identifier, we pick 100 for the
	// limit here because many APIs in AWS have a limit of 100 requests per second.
	if len(identifiers) > 100 {
		logging.Logger.Errorf("Nuking too many Kinesis Streams at once (100): halting to avoid hitting AWS API rate limiting")
		return TooManyStreamsErr{}
	}

	svc := kinesis.New(session)

	// There is no bulk delete Kinesis Stream API, so we delete the batch of Kinesis Streams concurrently
	// using go routines. Each goroutine reports its result on its own channel, buffered with capacity 1 so the
	// send never blocks even though we only read after wg.Wait().
	logging.Logger.Infof("Deleting Kinesis Streams in region: %s", region)
	wg := new(sync.WaitGroup)
	wg.Add(len(identifiers))
	errChans := make([]chan error, len(identifiers))
	for i, streamName := range identifiers {
		errChans[i] = make(chan error, 1)
		go deleteKinesisStreamAsync(wg, errChans[i], svc, streamName, region)
	}
	wg.Wait()

	// Collect all the errors from the async delete calls into a single error struct.
	// NOTE: We ignore OperationAbortedException which is thrown when there is an eventual consistency issue, where
	// cloud-nuke picks up a Stream that is already requested to be deleted. Every other error is reported,
	// including errors that are not an awserr.Error.
	var allErrs *multierror.Error
	for _, errChan := range errChans {
		err := <-errChan
		if err == nil {
			continue
		}
		if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "OperationAbortedException" {
			continue
		}
		allErrs = multierror.Append(allErrs, err)
	}
	if finalErr := allErrs.ErrorOrNil(); finalErr != nil {
		return errors.WithStackTrace(finalErr)
	}
	return nil
}

// deleteKinesisStreamAsync issues a single DeleteStream call for streamName, sends the resulting error (nil on
// success) on errChan, logs the outcome, and marks wg as done when it returns. It is written to be launched as
// a goroutine, one per stream.
func deleteKinesisStreamAsync(
	wg *sync.WaitGroup,
	errChan chan error,
	svc *kinesis.Kinesis,
	streamName *string,
	region string,
) {
	defer wg.Done()

	_, err := svc.DeleteStream(&kinesis.DeleteStreamInput{StreamName: streamName})
	// Publish the result before logging, matching the order the caller has always observed.
	errChan <- err

	name := aws.StringValue(streamName)
	if err != nil {
		logging.Logger.Errorf("[Failed] Error deleting Kinesis Stream %s in %s: %s", name, region, err)
		return
	}
	logging.Logger.Infof("[OK] Kinesis Stream %s delete in %s", name, region)
}

// Custom errors

// TooManyStreamsErr is returned when a single nuke call is asked to delete more Kinesis Streams than the
// per-call safety limit allows.
type TooManyStreamsErr struct{}

// Error implements the error interface.
func (TooManyStreamsErr) Error() string {
	const message = "Too many Streams requested at once."
	return message
}
126 changes: 126 additions & 0 deletions aws/kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package aws

import (
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestListKinesisStreams creates a stream in a random region and checks that getAllKinesisStreams, called with
// an empty config, reports it.
func TestListKinesisStreams(t *testing.T) {
	t.Parallel()

	region, err := getRandomRegion()
	require.NoError(t, err)

	awsSession, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	require.NoError(t, err)
	client := kinesis.New(awsSession)

	created := createKinesisStream(t, client)
	// The stream is not nuked in this test, so the cleanup delete is expected to succeed.
	defer deleteKinesisStream(t, client, created, true)

	listed, err := getAllKinesisStreams(awsSession, config.Config{})
	require.NoError(t, err)

	listedNames := aws.StringValueSlice(listed)
	assert.Contains(t, listedNames, aws.StringValue(created))
}

// TestNukeKinesisStreamOne creates a single stream, nukes it via nukeAllKinesisStreams, and asserts that the
// stream is gone or being deleted.
func TestNukeKinesisStreamOne(t *testing.T) {
	t.Parallel()

	region, err := getRandomRegion()
	require.NoError(t, err)

	session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	require.NoError(t, err)
	svc := kinesis.New(session)

	// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
	// checkErr must therefore be false: after a successful nuke the stream is already deleted (or deleting), and
	// asserting on the cleanup call would fail an otherwise passing test.
	sName := createKinesisStream(t, svc)
	defer deleteKinesisStream(t, svc, sName, false)
	identifiers := []*string{sName}

	require.NoError(
		t,
		nukeAllKinesisStreams(session, identifiers),
	)

	assertKinesisStreamsDeleted(t, svc, identifiers)
}

// TestNukeKinesisStreamMoreThanOne creates three streams, nukes them in a single nukeAllKinesisStreams call, and
// asserts that each one is gone or being deleted.
func TestNukeKinesisStreamMoreThanOne(t *testing.T) {
	t.Parallel()

	region, err := getRandomRegion()
	require.NoError(t, err)

	session, err := session.NewSession(&aws.Config{Region: aws.String(region)})
	require.NoError(t, err)
	svc := kinesis.New(session)

	sNames := []*string{}
	for i := 0; i < 3; i++ {
		// We ignore errors in the delete call here, because it is intended to be a stop gap in case there is a bug in nuke.
		// checkErr must therefore be false: after a successful nuke the stream is already deleted (or deleting).
		// The defer intentionally runs at the end of the test, not at the end of each iteration.
		sName := createKinesisStream(t, svc)
		defer deleteKinesisStream(t, svc, sName, false)
		sNames = append(sNames, sName)
	}

	require.NoError(
		t,
		nukeAllKinesisStreams(session, sNames),
	)

	assertKinesisStreamsDeleted(t, svc, sNames)
}

// createKinesisStream creates a single-shard Kinesis Stream with a unique "cloud-nuke-test-" prefixed name,
// fails the test if the create call errors, and returns a pointer to the stream's name.
func createKinesisStream(t *testing.T, svc *kinesis.Kinesis) *string {
	streamName := fmt.Sprintf("cloud-nuke-test-%s", strings.ToLower(util.UniqueID()))

	input := &kinesis.CreateStreamInput{
		StreamName: aws.String(streamName),
		ShardCount: aws.Int64(1),
	}
	_, err := svc.CreateStream(input)
	require.NoError(t, err)

	// Give AWS a fixed grace period to account for eventual consistency before the stream is used.
	time.Sleep(15 * time.Second)
	return &streamName
}

// deleteKinesisStream issues a DeleteStream call for the named stream. When checkErr is true the test fails if
// the call returns an error; when false the error is discarded (best-effort cleanup).
func deleteKinesisStream(t *testing.T, svc *kinesis.Kinesis, name *string, checkErr bool) {
	input := &kinesis.DeleteStreamInput{StreamName: name}
	_, err := svc.DeleteStream(input)
	if !checkErr {
		return
	}
	require.NoError(t, err)
}

// assertKinesisStreamsDeleted fails the test unless every stream in identifiers is either gone (DescribeStream
// returns ResourceNotFoundException) or reported with status DELETING.
func assertKinesisStreamsDeleted(t *testing.T, svc *kinesis.Kinesis, identifiers []*string) {
	for _, name := range identifiers {
		stream, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
			StreamName: name,
		})

		if err != nil {
			// The only acceptable error is "not found", which means the deletion has completed. Any other
			// error - including one that is not an awserr.Error - tells us nothing about the stream's state
			// and must fail the test instead of being silently accepted.
			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ResourceNotFoundException" {
				continue
			}
			t.Fatalf("Stream %s is not deleted: %s", aws.StringValue(name), err)
		}

		// The stream still exists, so it must at least be on its way out. Guard the nested pointers so an
		// unexpected response shape produces an assertion failure rather than a nil-pointer panic.
		require.NotNil(t, stream.StreamDescription)
		require.Equal(t, "DELETING", aws.StringValue(stream.StreamDescription.StreamStatus))
	}
}
38 changes: 38 additions & 0 deletions aws/kinesis_stream_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/gruntwork-io/go-commons/errors"
)

// KinesisStreams - the collection of Kinesis Streams targeted for nuking, tracked by stream name.
type KinesisStreams struct {
	Names []string
}

// ResourceName - The simple name of the AWS resource
func (k KinesisStreams) ResourceName() string {
	const resourceName = "kinesis-stream"
	return resourceName
}

// ResourceIdentifiers - The names of the Kinesis Streams
func (k KinesisStreams) ResourceIdentifiers() []string {
	names := k.Names
	return names
}

// MaxBatchSize - The maximum number of Kinesis Streams to hand to a single Nuke call
func (k KinesisStreams) MaxBatchSize() int {
	// Tentative batch size to ensure AWS doesn't throttle. Note that Kinesis Streams does not support bulk delete, so
	// we will be deleting this many in parallel using go routines. We pick 35 here, which is half of what the AWS web
	// console will do. We pick a conservative number here to avoid hitting AWS API rate limits.
	const batchSize = 35
	return batchSize
}

// Nuke - deletes the Kinesis Streams with the given names by delegating to nukeAllKinesisStreams; any failure
// is returned with a stack trace attached.
func (k KinesisStreams) Nuke(session *session.Session, identifiers []string) error {
	streamNames := aws.StringSlice(identifiers)

	err := nukeAllKinesisStreams(session, streamNames)
	if err == nil {
		return nil
	}
	return errors.WithStackTrace(err)
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
KMSCustomerKeys ResourceType `yaml:"KMSCustomerKeys"`
EKSCluster ResourceType `yaml:"EKSCluster"`
SageMakerNotebook ResourceType `yaml:"SageMakerNotebook"`
KinesisStream ResourceType `yaml:"KinesisStream"`
}

type ResourceType struct {
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func emptyConfig() *Config {
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
ResourceType{FilterRule{}, FilterRule{}},
}
}

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ require (
github.com/hashicorp/go-multierror v1.1.0
github.com/pquerna/otp v1.3.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/testify v1.7.1
github.com/urfave/cli v1.22.4
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 3044692

Please sign in to comment.