Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Implement context-aware APIs for better concurrency and robustness #70

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goar

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -372,38 +373,38 @@ func (c *Client) GetTransactionAnchor() (anchor string, err error) {
return
}

func (c *Client) SubmitTransaction(tx *types.Transaction) (status string, code int, err error) {
func (c *Client) SubmitTransaction(ctx context.Context, tx *types.Transaction) (status string, code int, err error) {
by, err := json.Marshal(tx)
if err != nil {
return
}

body, statusCode, err := c.httpPost("tx", by)
body, statusCode, err := c.httpPost(ctx, "tx", by)
status = string(body)
code = statusCode
return
}

func (c *Client) SubmitChunks(gc *types.GetChunk) (status string, code int, err error) {
func (c *Client) SubmitChunks(ctx context.Context, gc *types.GetChunk) (status string, code int, err error) {
byteGc, err := gc.Marshal()
if err != nil {
return
}

var body []byte
body, code, err = c.httpPost("chunk", byteGc)
body, code, err = c.httpPost(ctx, "chunk", byteGc)
status = string(body)
return
}

// Arql is Deprecated, recommended to use GraphQL
func (c *Client) Arql(arql string) (ids []string, err error) {
body, _, err := c.httpPost("arql", []byte(arql))
func (c *Client) Arql(ctx context.Context, arql string) (ids []string, err error) {
body, _, err := c.httpPost(ctx, "arql", []byte(arql))
err = json.Unmarshal(body, &ids)
return
}

func (c *Client) GraphQL(query string) ([]byte, error) {
func (c *Client) GraphQL(ctx context.Context, query string) ([]byte, error) {
// generate query
graQuery := struct {
Query string `json:"query"`
Expand All @@ -414,7 +415,7 @@ func (c *Client) GraphQL(query string) ([]byte, error) {
}

// query from http client
data, statusCode, err := c.httpPost("graphql", byQuery)
data, statusCode, err := c.httpPost(ctx, "graphql", byQuery)
if statusCode == 429 {
return nil, ErrRequestLimit
}
Expand Down Expand Up @@ -529,15 +530,20 @@ func (c *Client) httpGet(_path string) (body []byte, statusCode int, err error)
return
}

func (c *Client) httpPost(_path string, payload []byte) (body []byte, statusCode int, err error) {
func (c *Client) httpPost(ctx context.Context, _path string, payload []byte) (body []byte, statusCode int, err error) {
u, err := url.Parse(c.url)
if err != nil {
return
}

u.Path = path.Join(u.Path, _path)

resp, err := c.client.Post(u.String(), "application/json", bytes.NewReader(payload))
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(payload))
if err != nil {
return
}

resp, err := c.client.Do(req)
if err != nil {
return
}
Expand Down Expand Up @@ -1029,7 +1035,7 @@ func (c *Client) DataSyncRecord(endOffset string, intervalsNum int) ([]string, e
return result, nil
}

func (c *Client) SubmitToWarp(tx *types.Transaction) ([]byte, error) {
func (c *Client) SubmitToWarp(ctx context.Context, tx *types.Transaction) ([]byte, error) {
by, err := json.Marshal(tx)
if err != nil {
return nil, err
Expand All @@ -1040,7 +1046,7 @@ func (c *Client) SubmitToWarp(tx *types.Transaction) ([]byte, error) {
}

u.Path = path.Join(u.Path, "/gateway/sequencer/register")
req, err := http.NewRequest("POST", u.String(), bytes.NewReader(by))
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(by))
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions client_broadcast.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package goar

import (
"context"
"errors"
"fmt"

"github.com/everFinance/goar/types"
)

func (c *Client) BroadcastData(txId string, data []byte, numOfNodes int64, peers ...string) error {
func (c *Client) BroadcastData(ctx context.Context, txId string, data []byte, numOfNodes int64, peers ...string) error {
var err error
if len(peers) == 0 {
peers, err = c.GetPeers()
Expand All @@ -24,7 +26,7 @@ func (c *Client) BroadcastData(txId string, data []byte, numOfNodes int64, peers
continue
}

if err = uploader.Once(); err != nil {
if err = uploader.Once(ctx); err != nil {
continue
}

Expand Down
11 changes: 7 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package goar

import (
"github.com/everFinance/goar/types"
"github.com/everFinance/goar/utils"
"github.com/stretchr/testify/assert"
"context"
"os"
"strconv"
"testing"

"github.com/everFinance/goar/types"
"github.com/everFinance/goar/utils"
"github.com/stretchr/testify/assert"
)

// func TestGetTransactionByID(t *testing.T) {
Expand Down Expand Up @@ -219,12 +221,13 @@ func Test_GetTxDataFromPeers(t *testing.T) {
}

func TestClient_BroadcastData(t *testing.T) {
ctx := context.Background()
cli := NewClient("https://arweave.net")
txId := "J5FY1Ovd6JJ49WFHfCf-1wDM1TbaPSdKnGIB_8ePErE"
data, err := cli.GetTransactionData(txId, "json")
assert.NoError(t, err)

err = cli.BroadcastData(txId, data, 20)
err = cli.BroadcastData(ctx, txId, data, 20)
assert.NoError(t, err)
}

Expand Down
7 changes: 5 additions & 2 deletions example/api_example_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package example

import (
"context"
"testing"

"github.com/everFinance/goar/types"
"github.com/everFinance/goar/utils"
"testing"

"github.com/everFinance/goar"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -63,6 +65,7 @@ func Test_Arq1(t *testing.T) {
}

func Test_Arq(t *testing.T) {
ctx := context.Background()
arqStr := `{
"op": "and",
"expr1": {
Expand All @@ -79,7 +82,7 @@ func Test_Arq(t *testing.T) {
// create client
arNode := "https://arweave.net"
c := goar.NewClient(arNode)
ids, err := c.Arql(arqStr)
ids, err := c.Arql(ctx, arqStr)
t.Log(len(ids))
assert.NoError(t, err)
sstr := make([]string, 0)
Expand Down
14 changes: 9 additions & 5 deletions example/chunks_tx_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package example

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
Expand Down Expand Up @@ -71,6 +72,7 @@ func assemblyDataTx(bigData []byte, wallet *goar.Wallet, tags []types.Tag) (*typ

// test upload post big size data by chunks
func Test_PostBigDataByChunks(t *testing.T) {
ctx := context.Background()
filePath := "./testFile/2mbFile.pdf"
bigData, err := os.ReadFile(filePath)
assert.NoError(t, err)
Expand All @@ -83,11 +85,12 @@ func Test_PostBigDataByChunks(t *testing.T) {
// uploader Transaction
uploader, err := goar.CreateUploader(wallet.Client, tx, nil)
assert.NoError(t, err)
assert.NoError(t, uploader.Once())
assert.NoError(t, uploader.Once(ctx))
}

// test retry upload(断点重传) post big size data by tx id
func Test_RetryUploadDataByTxId(t *testing.T) {
ctx := context.Background()
filePath := "./testFile/3mPhoto.jpg"
bigData, err := os.ReadFile(filePath)
assert.NoError(t, err)
Expand All @@ -100,7 +103,7 @@ func Test_RetryUploadDataByTxId(t *testing.T) {

// 1. post this tx without data
tx.Data = ""
body, status, err := wallet.Client.SubmitTransaction(tx)
body, status, err := wallet.Client.SubmitTransaction(ctx, tx)
assert.NoError(t, err)
t.Logf("post tx without data; body: %s, status: %d", string(body), status)

Expand All @@ -124,11 +127,12 @@ func Test_RetryUploadDataByTxId(t *testing.T) {
// get uploader by txId and post big data by chunks
uploader, err := goar.CreateUploader(wallet.Client, tx.ID, bigData)
assert.NoError(t, err)
assert.NoError(t, uploader.Once())
assert.NoError(t, uploader.Once(ctx))
}

// test continue upload(断点续传) big size data by last time uploader
func Test_ContinueUploadDataByLastUploader(t *testing.T) {
ctx := context.Background()
filePath := "./testFile/1.8mPhoto.jpg"
bigData, err := os.ReadFile(filePath)
assert.NoError(t, err)
Expand All @@ -143,7 +147,7 @@ func Test_ContinueUploadDataByLastUploader(t *testing.T) {
assert.NoError(t, err)
// only upload 2 chunks to ar chain
for !uploader.IsComplete() && uploader.ChunkIndex <= 2 {
err := uploader.UploadChunk()
err := uploader.UploadChunk(ctx)
assert.NoError(t, err)
}

Expand All @@ -165,7 +169,7 @@ func Test_ContinueUploadDataByLastUploader(t *testing.T) {
// new uploader object by last time uploader
newUploader, err := goar.CreateUploader(wallet.Client, lastUploader.FormatSerializedUploader(), bigData)
assert.NoError(t, err)
assert.NoError(t, newUploader.Once())
assert.NoError(t, newUploader.Once(ctx))

// end remove jsonUploaderFile.json file
_ = os.Remove("./jsonUploaderFile.json")
Expand Down
7 changes: 5 additions & 2 deletions example/local_data_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package example

import (
"context"
"os"
"testing"

Expand All @@ -10,6 +11,7 @@ import (
)

func Test_SendData(t *testing.T) {
ctx := context.Background()
arNode := "https://arweave.net"
w, err := goar.NewWalletFromPath("./wallet/account1.json", arNode) // your wallet private key
assert.NoError(t, err)
Expand All @@ -22,7 +24,7 @@ func Test_SendData(t *testing.T) {
{Name: "xxxx", Value: "sssss"},
{Name: "yyyyyy", Value: "kkkkkk"},
}
tx, err := w.SendDataSpeedUp(data, tags, 10)
tx, err := w.SendDataSpeedUp(ctx, data, tags, 10)
assert.NoError(t, err)
t.Logf("tx hash: %s", tx.ID)
}
Expand Down Expand Up @@ -61,6 +63,7 @@ func TestConcurrentDownloadStream(t *testing.T) {
}

func TestSendDataStream(t *testing.T) {
ctx := context.Background()
arNode := "https://arweave.net"
w, err := goar.NewWalletFromPath("./testKey.json", arNode) // your wallet private key
assert.NoError(t, err)
Expand All @@ -73,7 +76,7 @@ func TestSendDataStream(t *testing.T) {
{Name: "Content-Type", Value: "img/jpeg"},
{Name: "test", Value: "kevin-test"},
}
tx, err := w.SendDataStreamSpeedUp(data, tags, 10)
tx, err := w.SendDataStreamSpeedUp(ctx, data, tags, 10)
assert.NoError(t, err)
t.Log(tx.ID)
// test arId: k5IgHLTag_3bB6Sp5tTUhrFrPPvU5MjevV468dfxNKk
Expand Down
16 changes: 8 additions & 8 deletions types/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ const (

// Errors from /chunk we should never try and continue on.
var FATAL_CHUNK_UPLOAD_ERRORS = map[string]struct{}{
"{\"error\":\"disk_full\"}": struct{}{},
"{\"error\":\"invalid_json\"}": struct{}{},
"{\"error\":\"chunk_too_big\"}": struct{}{},
"{\"error\":\"data_path_too_big\"}": struct{}{},
"{\"error\":\"offset_too_big\"}": struct{}{},
"{\"error\":\"data_size_too_big\"}": struct{}{},
"{\"error\":\"chunk_proof_ratio_not_attractive\"}": struct{}{},
"{\"error\":\"invalid_proof\"}": struct{}{},
"{\"error\":\"disk_full\"}": {},
"{\"error\":\"invalid_json\"}": {},
"{\"error\":\"chunk_too_big\"}": {},
"{\"error\":\"data_path_too_big\"}": {},
"{\"error\":\"offset_too_big\"}": {},
"{\"error\":\"data_size_too_big\"}": {},
"{\"error\":\"chunk_proof_ratio_not_attractive\"}": {},
"{\"error\":\"invalid_proof\"}": {},
}

// about bundle
Expand Down
2 changes: 1 addition & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ type BundlrResp struct {
N string `json:"n"`
Public string `json:"public"`
Block int64 `json:"block"`
ValidatorSignatures []string `json:"validatorSignatures"`
ValidatorSignatures []string `json:"validatorSignatures"`
}
Loading