Skip to content

Commit

Permalink
Merge pull request #6 from flycash/master
Browse files Browse the repository at this point in the history
the implementation based on local files
  • Loading branch information
askuy authored Jan 25, 2024
2 parents 7402f33 + 6abafb4 commit 3696513
Show file tree
Hide file tree
Showing 6 changed files with 433 additions and 100 deletions.
190 changes: 100 additions & 90 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,114 +46,124 @@ type Client interface {

func newStorage(name string, cfg *BucketConfig, logger *elog.Component) (Client, error) {
storageType := strings.ToLower(cfg.StorageType)
switch storageType {
case StorageTypeOSS:
return newOSS(name, cfg, logger)
case StorageTypeS3:
return newS3(name, cfg, logger)
case StorageTypeFile:
return NewLocalFile(cfg.Endpoint)
default:
return nil, fmt.Errorf("unknown StorageType:\"%s\", only supports oss,s3", cfg.StorageType)
}
}

if storageType == StorageTypeOSS {
var opts = []oss.ClientOption{oss.HTTPClient(newHttpClient(name, cfg, logger))}
if cfg.Debug {
opts = append(opts, oss.SetLogLevel(oss.Debug))
func newS3(name string, cfg *BucketConfig, logger *elog.Component) (Client, error) {
var config *aws.Config

// use minio
if cfg.S3ForcePathStyle {
config = &aws.Config{
Region: aws.String(cfg.Region),
DisableSSL: aws.Bool(!cfg.SSL),
Credentials: credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.AccessKeySecret, ""),
Endpoint: aws.String(cfg.Endpoint),
S3ForcePathStyle: aws.Bool(true),
}
client, err := oss.New(cfg.Endpoint, cfg.AccessKeyID, cfg.AccessKeySecret, opts...)
if err != nil {
return nil, err
} else {
config = &aws.Config{
Region: aws.String(cfg.Region),
DisableSSL: aws.Bool(!cfg.SSL),
Credentials: credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.AccessKeySecret, ""),
}
if cfg.Endpoint != "" {
config.Endpoint = aws.String(cfg.Endpoint)
}
}
if cfg.Debug {
config.LogLevel = aws.LogLevel(aws.LogDebugWithHTTPBody | aws.LogDebugWithSigning)
slog.Default().Enabled(context.Background(), slog.LevelDebug)
}

var ossClient *OSS
if cfg.Shards != nil && len(cfg.Shards) > 0 {
buckets := make(map[string]*oss.Bucket)
for _, v := range cfg.Shards {
bucket, err := client.Bucket(cfg.Bucket + "-" + v)
if err != nil {
return nil, err
}
for i := 0; i < len(v); i++ {
buckets[strings.ToLower(v[i:i+1])] = bucket
}
}

ossClient = &OSS{
Shards: buckets,
}
} else {
bucket, err := client.Bucket(cfg.Bucket)
if err != nil {
return nil, err
}
config.HTTPClient = newHttpClient(name, cfg, logger)
service := s3.New(session.Must(session.NewSession(config)))

ossClient = &OSS{
Bucket: bucket,
var s3Client *S3
if cfg.Shards != nil && len(cfg.Shards) > 0 {
buckets := make(map[string]string)
for _, v := range cfg.Shards {
for i := 0; i < len(v); i++ {
buckets[strings.ToLower(v[i:i+1])] = cfg.Bucket + "-" + v
}
}
ossClient.cfg = cfg
if cfg.EnableCompressor {
// 目前仅支持 gzip
if comp, ok := compressors[cfg.CompressType]; ok {
ossClient.compressor = comp
} else {
logger.Warn("unknown type", elog.String("name", cfg.CompressType))
}
s3Client = &S3{
ShardsBucket: buckets,
client: service,
}
return ossClient, nil
} else if storageType == StorageTypeS3 {
var config *aws.Config

// use minio
if cfg.S3ForcePathStyle {
config = &aws.Config{
Region: aws.String(cfg.Region),
DisableSSL: aws.Bool(!cfg.SSL),
Credentials: credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.AccessKeySecret, ""),
Endpoint: aws.String(cfg.Endpoint),
S3ForcePathStyle: aws.Bool(true),
}
} else {
config = &aws.Config{
Region: aws.String(cfg.Region),
DisableSSL: aws.Bool(!cfg.SSL),
Credentials: credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.AccessKeySecret, ""),
}
if cfg.Endpoint != "" {
config.Endpoint = aws.String(cfg.Endpoint)
}
} else {
s3Client = &S3{
BucketName: cfg.Bucket,
client: service,
}
if cfg.Debug {
config.LogLevel = aws.LogLevel(aws.LogDebugWithHTTPBody | aws.LogDebugWithSigning)
slog.Default().Enabled(context.Background(), slog.LevelDebug)
}
s3Client.cfg = cfg
if cfg.EnableCompressor {
// 目前仅支持 gzip
if comp, ok := compressors[cfg.CompressType]; ok {
s3Client.compressor = comp
} else {
logger.Warn("unknown type", elog.String("name", cfg.CompressType))
}
}
return s3Client, nil
}

config.HTTPClient = newHttpClient(name, cfg, logger)
service := s3.New(session.Must(session.NewSession(config)))
func newOSS(name string, cfg *BucketConfig, logger *elog.Component) (Client, error) {
var opts = []oss.ClientOption{oss.HTTPClient(newHttpClient(name, cfg, logger))}
if cfg.Debug {
opts = append(opts, oss.SetLogLevel(oss.Debug))
}
client, err := oss.New(cfg.Endpoint, cfg.AccessKeyID, cfg.AccessKeySecret, opts...)
if err != nil {
return nil, err
}

var s3Client *S3
if cfg.Shards != nil && len(cfg.Shards) > 0 {
buckets := make(map[string]string)
for _, v := range cfg.Shards {
for i := 0; i < len(v); i++ {
buckets[strings.ToLower(v[i:i+1])] = cfg.Bucket + "-" + v
}
var ossClient *OSS
if cfg.Shards != nil && len(cfg.Shards) > 0 {
buckets := make(map[string]*oss.Bucket)
for _, v := range cfg.Shards {
bucket, err := client.Bucket(cfg.Bucket + "-" + v)
if err != nil {
return nil, err
}
s3Client = &S3{
ShardsBucket: buckets,
client: service,
}
} else {
s3Client = &S3{
BucketName: cfg.Bucket,
client: service,
for i := 0; i < len(v); i++ {
buckets[strings.ToLower(v[i:i+1])] = bucket
}
}
s3Client.cfg = cfg
if cfg.EnableCompressor {
// 目前仅支持 gzip
if comp, ok := compressors[cfg.CompressType]; ok {
s3Client.compressor = comp
} else {
logger.Warn("unknown type", elog.String("name", cfg.CompressType))
}

ossClient = &OSS{
Shards: buckets,
}
return s3Client, nil
} else {
return nil, fmt.Errorf("unknown StorageType:\"%s\", only supports oss,s3", cfg.StorageType)
bucket, err := client.Bucket(cfg.Bucket)
if err != nil {
return nil, err
}

ossClient = &OSS{
Bucket: bucket,
}
}
ossClient.cfg = cfg
if cfg.EnableCompressor {
// 目前仅支持 gzip
if comp, ok := compressors[cfg.CompressType]; ok {
ossClient.compressor = comp
} else {
logger.Warn("unknown type", elog.String("name", cfg.CompressType))
}
}
return ossClient, nil
}

func newHttpClient(name string, cfg *BucketConfig, logger *elog.Component) *http.Client {
Expand Down
5 changes: 3 additions & 2 deletions constants.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package eos

const (
StorageTypeOSS = "oss"
StorageTypeS3 = "s3"
StorageTypeOSS = "oss"
StorageTypeS3 = "s3"
StorageTypeFile = "file"

MetaCompressor = "compressor"
)
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0
go.opentelemetry.io/otel v1.18.0
go.opentelemetry.io/otel/trace v1.18.0
go.uber.org/multierr v1.6.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -29,17 +30,16 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/spf13/cast v1.4.1 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.2.1 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -195,8 +196,9 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
Expand All @@ -221,8 +223,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
Expand Down
Loading

0 comments on commit 3696513

Please sign in to comment.