From ae6c6e5ba3950e7d7eac34c237819dc493522790 Mon Sep 17 00:00:00 2001 From: Mikey Date: Sat, 28 Jan 2023 20:37:53 +0800 Subject: [PATCH] feat: add aliyun oss store Co-Authored-By: ljarvan <74408712+ljarvan@users.noreply.github.com> --- arseeding.go | 14 +++- cmd/main.go | 9 ++- go.mod | 2 + rawdb/aliyun.go | 173 ++++++++++++++++++++++++++++++++++++++++++++++++ store.go | 10 +++ 5 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 rawdb/aliyun.go diff --git a/arseeding.go b/arseeding.go index 0ad6a8e..99de87c 100644 --- a/arseeding.go +++ b/arseeding.go @@ -48,18 +48,26 @@ func New( boltDirPath, mySqlDsn string, sqliteDir string, useSqlite bool, arWalletKeyPath string, arNode, payUrl string, noFee bool, enableManifest bool, useS3 bool, s3AccKey, s3SecretKey, s3BucketPrefix, s3Region, s3Endpoint string, - use4EVER bool, port string, customTags []types.Tag, + use4EVER bool, useAliyun bool, aliyunEndpoint, aliyunAccKey, aliyunSecretKey, aliyunPrefix string, + port string, customTags []types.Tag, ) *Arseeding { var err error KVDb := &Store{} - if useS3 { + + switch { + case useS3 && useAliyun: + panic("can not use both s3 and aliyun") + case useS3: if use4EVER { s3Endpoint = rawdb.ForeverLandEndpoint // inject 4everland endpoint } KVDb, err = NewS3Store(s3AccKey, s3SecretKey, s3Region, s3BucketPrefix, s3Endpoint) - } else { + case useAliyun: + KVDb, err = NewAliyunStore(aliyunEndpoint, aliyunAccKey, aliyunSecretKey, aliyunPrefix) + default: KVDb, err = NewBoltStore(boltDirPath) } + if err != nil { panic(err) } diff --git a/cmd/main.go b/cmd/main.go index 7148416..921c9ea 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -37,6 +37,12 @@ func main() { &cli.StringFlag{Name: "s3_endpoint", Value: "", Usage: "s3 bucket endpoint", EnvVars: []string{"S3_ENDPOINT"}}, &cli.BoolFlag{Name: "use_4ever", Value: false, Usage: "run with 4everland s3 service", EnvVars: []string{"USE_4EVER"}}, + &cli.BoolFlag{Name: "use_aliyun", Value: false, Usage: "run with aliyun oss store", EnvVars: []string{"USE_ALIYUN"}}, + &cli.StringFlag{Name: "aliyun_endpoint", Value: "oss-cn-shanghai.aliyuncs.com", Usage: "aliyun oss endpoint", EnvVars: []string{"ALIYUN_ENDPOINT"}}, + &cli.StringFlag{Name: "aliyun_acc_key", Value: "your oss access key", Usage: "aliyun oss access key", EnvVars: []string{"ALIYUN_ACC_KEY"}}, + &cli.StringFlag{Name: "aliyun_secret_key", Value: "your oss secret key", Usage: "aliyun oss secret key", EnvVars: []string{"ALIYUN_SECRET_KEY"}}, + &cli.StringFlag{Name: "aliyun_prefix", Value: "arseed", Usage: "aliyun oss bucket name prefix", EnvVars: []string{"ALIYUN_PREFIX"}}, + &cli.StringFlag{Name: "port", Value: ":8080", EnvVars: []string{"PORT"}}, &cli.StringFlag{Name: "tags", Value: `{"Community":"PermaDAO","Website":"permadao.com"}`, EnvVars: []string{"TAGS"}}, }, @@ -70,7 +76,8 @@ func run(c *cli.Context) error { c.String("db_dir"), c.String("mysql"), c.String("sqlite_dir"), c.Bool("use_sqlite"), c.String("key_path"), c.String("ar_node"), c.String("pay"), c.Bool("no_fee"), c.Bool("manifest"), c.Bool("use_s3"), c.String("s3_acc_key"), c.String("s3_secret_key"), c.String("s3_prefix"), c.String("s3_region"), c.String("s3_endpoint"), - c.Bool("use_4ever"), c.String("port"), customTags) + c.Bool("use_4ever"), c.Bool("use_aliyun"), c.String("aliyun_endpoint"), c.String("aliyun_acc_key"), c.String("aliyun_secret_key"), + c.String("aliyun_prefix"), c.String("port"), customTags) s.Run(c.String("port"), c.Int("bundle_interval")) common.NewMetricServer() diff --git a/go.mod b/go.mod index 06f657f..284fc1c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/everFinance/arseeding go 1.17 require ( + github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible github.com/aws/aws-sdk-go v1.27.0 github.com/everFinance/everpay-go v0.0.2 github.com/everFinance/goar v1.4.7 @@ -75,6 +76,7 @@ require ( golang.org/x/net v0.2.0 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.2.0 // indirect + golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect google.golang.org/protobuf v1.26.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/rawdb/aliyun.go b/rawdb/aliyun.go new file mode 100644 index 0000000..ba5d23a --- /dev/null +++ b/rawdb/aliyun.go @@ -0,0 +1,173 @@ +package rawdb + +import ( + "bytes" + "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/everFinance/arseeding/schema" + "io" +) + +// refer https://help.aliyun.com/document_detail/32157.html?spm=a2c4g.11186623.0.0.1a4b32bcxaC4kR +const ( + ossErrorNoSuchKey = "NoSuchKey" +) + +type AliyunDB struct { + bucketPrefix string + client *oss.Client +} + +func NewAliyunDB(endpoint, accKey, accessKeySecret, bktPrefix string) (*AliyunDB, error) { + client, err := oss.New(endpoint, accKey, accessKeySecret) + if err != nil { + return nil, err + } + + err = createAliyunBucket(client, bktPrefix) + if err != nil { + return nil, err + } + + log.Info("run with aliyun oss success") + + return &AliyunDB{ + bucketPrefix: bktPrefix, + client: client, + }, nil +} + +func (a *AliyunDB) Put(bucket, key string, value []byte) (err error) { + bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket)) + if err != nil { + return err + } + + return bkt.PutObject(key, bytes.NewReader(value)) +} + +func (a *AliyunDB) Get(bucket, key string) (data []byte, err error) { + bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket)) + if err != nil { + return + } + + body, err := bkt.GetObject(key) + if err != nil { + // handleOSSErr make file non-existent errors converted to schema.ErrNotFound + return nil, handleOSSErr(err) + } + + defer func(body io.ReadCloser) { + _ = body.Close() + }(body) + + data, err = io.ReadAll(body) + return +} + +func (a *AliyunDB) GetAllKey(bucket string) (keys []string, err error) { + bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket)) + if err != nil { + return + } + + keys = make([]string, 0) + + startAfter := "" + continueToken := "" + var lsRes oss.ListObjectsResultV2 + + for { + lsRes, err = bkt.ListObjectsV2(oss.StartAfter(startAfter), oss.ContinuationToken(continueToken)) + if err != nil { + break + } + for _, object := range lsRes.Objects { + keys = append(keys, object.Key) + } + if lsRes.IsTruncated { + startAfter = lsRes.StartAfter + continueToken = lsRes.NextContinuationToken + } else { + break + } + } + + if len(keys) == 0 { + err = schema.ErrNotExist + } + + return +} + +func (a *AliyunDB) Delete(bucket, key string) (err error) { + bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket)) + if err != nil { + return + } + + return bkt.DeleteObject(key) +} + +func (a *AliyunDB) Close() (err error) { + return +} + +func createAliyunBucket(svc *oss.Client, prefix string) error { + bucketNames := []string{ + schema.ChunkBucket, + schema.TxDataEndOffSetBucket, + schema.TxMetaBucket, + schema.ConstantsBucket, + schema.TaskIdPendingPoolBucket, + schema.TaskBucket, + schema.BundleItemBinary, + schema.BundleItemMeta, + schema.BundleWaitParseArIdBucket, + schema.BundleArIdToItemIdsBucket, + } + + ownBuckets, err := getBucketWithPrefix(svc, prefix) + if err != nil { + return err + } + + for _, bucketName := range bucketNames { + s3Bkt := getS3Bucket(prefix, bucketName) // s3 bucket name only accept lower case + if !ownBuckets[s3Bkt] { + err := svc.CreateBucket(s3Bkt) + if err != nil { + return err + } + } + } + return nil +} + +func getBucketWithPrefix(svc *oss.Client, prefix string) (map[string]bool, error) { + res := make(map[string]bool) + + lsRes, err := svc.ListBuckets(oss.Prefix(prefix)) + if err != nil { + return nil, err + } + + for _, bucket := range lsRes.Buckets { + res[bucket.Name] = true + } + + return res, nil +} + +func handleOSSErr(ossErr error) (err error) { + switch ossErr.(type) { + case oss.ServiceError: + if ossErr.(oss.ServiceError).Code == ossErrorNoSuchKey { + err = schema.ErrNotExist + } + default: + err = ossErr + } + + return +} diff --git a/store.go b/store.go index fa5e081..5258b42 100644 --- a/store.go +++ b/store.go @@ -33,6 +33,16 @@ func NewBoltStore(boltDirPath string) (*Store, error) { return &Store{KVDb: Db}, nil } +func NewAliyunStore(endpoint, accKey, secretKey, bucketPrefix string) (*Store, error) { + Db, err := rawdb.NewAliyunDB(endpoint, accKey, secretKey, bucketPrefix) + if err != nil { + return nil, err + } + return &Store{ + KVDb: Db, + }, nil +} + func (s *Store) Close() error { return s.KVDb.Close() }