Skip to content

Commit

Permalink
Merge pull request #63 from dirname/main
Browse files Browse the repository at this point in the history
feat: add aliyun oss store
  • Loading branch information
zyjblockchain authored Jan 29, 2023
2 parents 8e0e8f1 + ae6c6e5 commit 75d0163
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 4 deletions.
14 changes: 11 additions & 3 deletions arseeding.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,26 @@ func New(
boltDirPath, mySqlDsn string, sqliteDir string, useSqlite bool,
arWalletKeyPath string, arNode, payUrl string, noFee bool, enableManifest bool,
useS3 bool, s3AccKey, s3SecretKey, s3BucketPrefix, s3Region, s3Endpoint string,
use4EVER bool, port string, customTags []types.Tag,
use4EVER bool, useAliyun bool, aliyunEndpoint, aliyunAccKey, aliyunSecretKey, aliyunPrefix string,
port string, customTags []types.Tag,
) *Arseeding {
var err error
KVDb := &Store{}
if useS3 {

switch {
case useS3 && useAliyun:
panic("can not use both s3 and aliyun")
case useS3:
if use4EVER {
s3Endpoint = rawdb.ForeverLandEndpoint // inject 4everland endpoint
}
KVDb, err = NewS3Store(s3AccKey, s3SecretKey, s3Region, s3BucketPrefix, s3Endpoint)
} else {
case useAliyun:
KVDb, err = NewAliyunStore(aliyunEndpoint, aliyunAccKey, aliyunSecretKey, aliyunPrefix)
default:
KVDb, err = NewBoltStore(boltDirPath)
}

if err != nil {
panic(err)
}
Expand Down
9 changes: 8 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func main() {
&cli.StringFlag{Name: "s3_endpoint", Value: "", Usage: "s3 bucket endpoint", EnvVars: []string{"S3_ENDPOINT"}},
&cli.BoolFlag{Name: "use_4ever", Value: false, Usage: "run with 4everland s3 service", EnvVars: []string{"USE_4EVER"}},

&cli.BoolFlag{Name: "use_aliyun", Value: false, Usage: "run with aliyun oss store", EnvVars: []string{"USE_ALIYUN"}},
&cli.StringFlag{Name: "aliyun_endpoint", Value: "oss-cn-shanghai.aliyuncs.com", Usage: "aliyun oss endpoint", EnvVars: []string{"ALIYUN_ENDPOINT"}},
&cli.StringFlag{Name: "aliyun_acc_key", Value: "your oss access key", Usage: "aliyun oss access key", EnvVars: []string{"ALIYUN_ACC_KEY"}},
&cli.StringFlag{Name: "aliyun_secret_key", Value: "your oss secret key", Usage: "aliyun oss secret key", EnvVars: []string{"ALIYUN_SECRET_KEY"}},
&cli.StringFlag{Name: "aliyun_prefix", Value: "arseed", Usage: "aliyun oss bucket name prefix", EnvVars: []string{"ALIYUN_PREFIX"}},

&cli.StringFlag{Name: "port", Value: ":8080", EnvVars: []string{"PORT"}},
&cli.StringFlag{Name: "tags", Value: `{"Community":"PermaDAO","Website":"permadao.com"}`, EnvVars: []string{"TAGS"}},
},
Expand Down Expand Up @@ -70,7 +76,8 @@ func run(c *cli.Context) error {
c.String("db_dir"), c.String("mysql"), c.String("sqlite_dir"), c.Bool("use_sqlite"),
c.String("key_path"), c.String("ar_node"), c.String("pay"), c.Bool("no_fee"), c.Bool("manifest"),
c.Bool("use_s3"), c.String("s3_acc_key"), c.String("s3_secret_key"), c.String("s3_prefix"), c.String("s3_region"), c.String("s3_endpoint"),
c.Bool("use_4ever"), c.String("port"), customTags)
c.Bool("use_4ever"), c.Bool("use_aliyun"), c.String("aliyun_endpoint"), c.String("aliyun_acc_key"), c.String("aliyun_secret_key"),
c.String("aliyun_prefix"), c.String("port"), customTags)
s.Run(c.String("port"), c.Int("bundle_interval"))

common.NewMetricServer()
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/everFinance/arseeding
go 1.17

require (
github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible
github.com/aws/aws-sdk-go v1.27.0
github.com/everFinance/everpay-go v0.0.2
github.com/everFinance/goar v1.4.7
Expand Down Expand Up @@ -75,6 +76,7 @@ require (
golang.org/x/net v0.2.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
173 changes: 173 additions & 0 deletions rawdb/aliyun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package rawdb

import (
"bytes"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/everFinance/arseeding/schema"
"io"
)

// refer https://help.aliyun.com/document_detail/32157.html?spm=a2c4g.11186623.0.0.1a4b32bcxaC4kR
const (
ossErrorNoSuchKey = "NoSuchKey"
)

type AliyunDB struct {
bucketPrefix string
client *oss.Client
}

func NewAliyunDB(endpoint, accKey, accessKeySecret, bktPrefix string) (*AliyunDB, error) {
client, err := oss.New(endpoint, accKey, accessKeySecret)
if err != nil {
return nil, err
}

err = createAliyunBucket(client, bktPrefix)
if err != nil {
return nil, err
}

log.Info("run with aliyun oss success")

return &AliyunDB{
bucketPrefix: bktPrefix,
client: client,
}, nil
}

func (a *AliyunDB) Put(bucket, key string, value []byte) (err error) {
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
if err != nil {
return err
}

return bkt.PutObject(key, bytes.NewReader(value))
}

func (a *AliyunDB) Get(bucket, key string) (data []byte, err error) {
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
if err != nil {
return
}

body, err := bkt.GetObject(key)
if err != nil {
// handleOSSErr make file non-existent errors converted to schema.ErrNotFound
return nil, handleOSSErr(err)
}

defer func(body io.ReadCloser) {
_ = body.Close()
}(body)

data, err = io.ReadAll(body)
return
}

func (a *AliyunDB) GetAllKey(bucket string) (keys []string, err error) {
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
if err != nil {
return
}

keys = make([]string, 0)

startAfter := ""
continueToken := ""
var lsRes oss.ListObjectsResultV2

for {
lsRes, err = bkt.ListObjectsV2(oss.StartAfter(startAfter), oss.ContinuationToken(continueToken))
if err != nil {
break
}
for _, object := range lsRes.Objects {
keys = append(keys, object.Key)
}
if lsRes.IsTruncated {
startAfter = lsRes.StartAfter
continueToken = lsRes.NextContinuationToken
} else {
break
}
}

if len(keys) == 0 {
err = schema.ErrNotExist
}

return
}

func (a *AliyunDB) Delete(bucket, key string) (err error) {
bkt, err := a.client.Bucket(getS3Bucket(a.bucketPrefix, bucket))
if err != nil {
return
}

return bkt.DeleteObject(key)
}

func (a *AliyunDB) Close() (err error) {
return
}

func createAliyunBucket(svc *oss.Client, prefix string) error {
bucketNames := []string{
schema.ChunkBucket,
schema.TxDataEndOffSetBucket,
schema.TxMetaBucket,
schema.ConstantsBucket,
schema.TaskIdPendingPoolBucket,
schema.TaskBucket,
schema.BundleItemBinary,
schema.BundleItemMeta,
schema.BundleWaitParseArIdBucket,
schema.BundleArIdToItemIdsBucket,
}

ownBuckets, err := getBucketWithPrefix(svc, prefix)
if err != nil {
return err
}

for _, bucketName := range bucketNames {
s3Bkt := getS3Bucket(prefix, bucketName) // s3 bucket name only accept lower case
if !ownBuckets[s3Bkt] {
err := svc.CreateBucket(s3Bkt)
if err != nil {
return err
}
}
}
return nil
}

func getBucketWithPrefix(svc *oss.Client, prefix string) (map[string]bool, error) {
res := make(map[string]bool)

lsRes, err := svc.ListBuckets(oss.Prefix(prefix))
if err != nil {
return nil, err
}

for _, bucket := range lsRes.Buckets {
res[bucket.Name] = true
}

return res, nil
}

func handleOSSErr(ossErr error) (err error) {
switch ossErr.(type) {
case oss.ServiceError:
if ossErr.(oss.ServiceError).Code == ossErrorNoSuchKey {
err = schema.ErrNotExist
}
default:
err = ossErr
}

return
}
10 changes: 10 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ func NewBoltStore(boltDirPath string) (*Store, error) {
return &Store{KVDb: Db}, nil
}

func NewAliyunStore(endpoint, accKey, secretKey, bucketPrefix string) (*Store, error) {
Db, err := rawdb.NewAliyunDB(endpoint, accKey, secretKey, bucketPrefix)
if err != nil {
return nil, err
}
return &Store{
KVDb: Db,
}, nil
}

func (s *Store) Close() error {
return s.KVDb.Close()
}
Expand Down

0 comments on commit 75d0163

Please sign in to comment.