-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paths3.go
121 lines (108 loc) · 3.49 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package goucsdnt
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
smithyendpoints "github.com/aws/smithy-go/endpoints"
)
const UCSDNT_S3_ENDPOINT = "https://hermes.caida.org/"
const UCSDNT_S3_PCAPLIVE = "telescope-ucsdnt-pcap-live"
const UCSDNT_S3_FT = "telescope-ucsdnt-avro-flowtuple-v4-2024"
type UCSDNTBucket struct {
S3Client *s3.Client
Ctx context.Context
}
type staticResolver struct{}
func (*staticResolver) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (
smithyendpoints.Endpoint, error,
) {
// This value will be used as-is when making the request.
/* if len(*params.Endpoint) == 0 {
u, err := url.Parse(UCSDNT_S3_ENDPOINT)
if err != nil {
return smithyendpoints.Endpoint{}, err
}
return smithyendpoints.Endpoint{
URI: *u,
}, nil
}*/
// s3.Options.BaseEndpoint is accessible here:
//fmt.Printf("The endpoint provided in config is %s\n", *params.Endpoint)
//default
return s3.NewDefaultEndpointResolverV2().ResolveEndpoint(ctx, params)
}
func NewUCSDNTBucket(ctx context.Context) *UCSDNTBucket {
//read the key and secret from the environment
UCSD_NT_S3_ACCESS_KEY := os.Getenv("UCSD_NT_S3_ACCESS_KEY")
UCSD_NT_S3_SECRET_KEY := os.Getenv("UCSD_NT_S3_SECRET_KEY")
if UCSD_NT_S3_ACCESS_KEY == "" || UCSD_NT_S3_SECRET_KEY == "" {
log.Fatal("UCSD_NT_S3_ACCESS_KEY and UCSD_NT_S3_SECRET_KEY must be set in the environment.")
}
/*tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
hclient := &http.Client{Transport: tr}*/
client := s3.New(s3.Options{
BaseEndpoint: aws.String(UCSDNT_S3_ENDPOINT),
EndpointResolverV2: &staticResolver{},
Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(UCSD_NT_S3_ACCESS_KEY, UCSD_NT_S3_SECRET_KEY, "")),
Region: "us-east-1",
UsePathStyle: true,
//ClientLogMode: aws.LogRetries | aws.LogRequest | aws.LogResponse,
//HTTPClient: hclient,
})
return &UCSDNTBucket{
S3Client: client,
Ctx: ctx,
}
}
func (b *UCSDNTBucket) ListObjects() ([]string, error) {
var keys []string
paginator := s3.NewListObjectsV2Paginator(b.S3Client, &s3.ListObjectsV2Input{
Bucket: aws.String(UCSDNT_S3_PCAPLIVE),
//Bucket: aws.String(UCSDNT_S3_FT),
})
for paginator.HasMorePages() {
page, err := paginator.NextPage(b.Ctx)
if err != nil {
return nil, err
}
for _, obj := range page.Contents {
keys = append(keys, *obj.Key)
}
}
return keys, nil
}
func (b *UCSDNTBucket) GetObjectByDatetime(d time.Time) (string, io.Reader, error) {
objpath := d.Format("datasource=ucsd-nt/year=2006/month=01/day=02/hour=15/")
dhour := d.Truncate(1 * time.Hour)
pcapname := fmt.Sprintf("ucsd-nt.%d.pcap.gz", dhour.Unix())
return b.GetObjectByPath(filepath.Join(objpath, pcapname))
}
func (b *UCSDNTBucket) GetObjectByPath(f string) (string, io.Reader, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(UCSDNT_S3_PCAPLIVE),
Key: aws.String(f),
}
result, err := b.S3Client.GetObject(b.Ctx, input)
if err != nil {
var noKey *types.NoSuchKey
if errors.As(err, &noKey) {
log.Printf("Can't get object %s. No such key exists.\n", *input.Bucket)
err = noKey
} else {
log.Printf("Couldn't get object %v. Here's why: %v\n", *input.Bucket, err)
}
return "", nil, err
}
return filepath.Base(f), result.Body, nil
}