Skip to content

Commit

Permalink
test(sftp): add WIP test for delete-on-finish bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ooesili committed Nov 26, 2024
1 parent 07081b8 commit 83668cd
Showing 1 changed file with 83 additions and 28 deletions.
111 changes: 83 additions & 28 deletions internal/impl/sftp/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package sftp
import (
"io/fs"
"os"
"strings"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service/integration"

// Bring in memory cache.
Expand All @@ -39,34 +41,7 @@ func TestIntegrationSFTP(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "atmoz/sftp",
Tag: "alpine",
Cmd: []string{
// https://github.com/atmoz/sftp/issues/401
"/bin/sh", "-c", "ulimit -n 65535 && exec /entrypoint " + sftpUsername + ":" + sftpPassword + ":1001:100:upload",
},
})
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)

creds := credentials{
Username: sftpUsername,
Password: sftpPassword,
}

require.NoError(t, pool.Retry(func() error {
_, err = creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp"))
return err
}))
resource := setupDockerPool(t)

t.Run("sftp", func(t *testing.T) {
template := `
Expand Down Expand Up @@ -129,6 +104,86 @@ cache_resources:
})
}

func TestIntegrationSFTPDeleteOnFinish(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

resource := setupDockerPool(t)

config := `
output:
drop: {}
input:
sftp:
address: localhost:$PORT
paths:
- /upload/*.txt
credentials:
username: foo
password: pass
delete_on_finish: true
watcher:
enabled: true
minimum_age: 100ms
poll_interval: 100ms
cache: files_memory
cache_resources:
- label: files_memory
memory:
default_ttl: 900s
`
config = strings.NewReplacer(
"PORT", resource.GetPort("22/tcp"),
).Replace(config)

env := service.NewEnvironment()
parsedConfig, err := sftpInputSpec().ParseYAML(config, env)
require.NoError(t, err)

reader, err := newSFTPReaderFromParsed(parsedConfig, service.MockResources())
require.NoError(t, err)

// TODO: what do I do here to drive the input
// reader.Connect
_ = reader
}

func setupDockerPool(t *testing.T) *dockertest.Resource {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "atmoz/sftp",
Tag: "alpine",
Cmd: []string{
// https://github.com/atmoz/sftp/issues/401
"/bin/sh", "-c", "ulimit -n 65535 && exec /entrypoint " + sftpUsername + ":" + sftpPassword + ":1001:100:upload",
},
})
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)

creds := credentials{
Username: sftpUsername,
Password: sftpPassword,
}

// wait for server to be ready to accept connections
require.NoError(t, pool.Retry(func() error {
_, err = creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp"))
return err
}))

return resource
}

type osPT struct{}

func (o *osPT) Open(name string) (fs.File, error) {
Expand Down

0 comments on commit 83668cd

Please sign in to comment.