Skip to content

Commit

Permalink
support large zk code
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai committed Dec 18, 2024
1 parent c65b866 commit 28b1c19
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 81 deletions.
5 changes: 3 additions & 2 deletions project/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/iotexproject/w3bstream/util/filefetcher"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -35,8 +36,8 @@ func (m *Manager) Project(projectID *big.Int) (*Project, error) {
}

// project file hash mismatch, fetch new project file from uri
pm := &Meta{ProjectID: projectID, Uri: uri, Hash: hash}
data, err := pm.FetchProjectFile()
fetcher := &filefetcher.Filedescriptor{Uri: uri, Hash: hash}
data, err := fetcher.FetchFile()
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch project file, project_id %v", projectID)
}
Expand Down
56 changes: 0 additions & 56 deletions project/project.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
package project

import (
"bytes"
"crypto/sha256"
"encoding/json"
"io"
"math/big"
"net/http"
"net/url"
"strings"

"github.com/pkg/errors"

"github.com/iotexproject/w3bstream/util/ipfs"
)

var (
Expand All @@ -26,12 +17,6 @@ type Project struct {
Configs []*Config `json:"config"`
}

type Meta struct {
ProjectID *big.Int
Uri string
Hash [32]byte
}

type Attribute struct {
Paused bool
RequestedProverAmount uint64
Expand Down Expand Up @@ -97,44 +82,3 @@ func (c *Config) validate() error {
}
return nil
}

func (m *Meta) FetchProjectFile() ([]byte, error) {
u, err := url.Parse(m.Uri)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse project file uri %s", m.Uri)
}

var data []byte
switch u.Scheme {
case "http", "https":
resp, _err := http.Get(m.Uri)
if _err != nil {
return nil, errors.Wrapf(_err, "failed to fetch project file, uri %s", m.Uri)
}
defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)

case "ipfs":
// ipfs url: ipfs://${endpoint}/${cid}
sh := ipfs.NewIPFS(u.Host)
cid := strings.Split(strings.Trim(u.Path, "/"), "/")
data, err = sh.Cat(cid[0])

default:
return nil, errors.Errorf("invalid project file uri %s", m.Uri)
}

if err != nil {
return nil, errors.Wrapf(err, "failed to read project file, uri %s", m.Uri)
}

h := sha256.New()
if _, err := h.Write(data); err != nil {
return nil, errors.Wrap(err, "failed to generate project file hash")
}
if !bytes.Equal(h.Sum(nil), m.Hash[:]) {
return nil, errors.New("failed to validate project file hash")
}

return data, nil
}
21 changes: 11 additions & 10 deletions project/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/iotexproject/w3bstream/util/filefetcher"
"github.com/iotexproject/w3bstream/util/ipfs"
)

Expand All @@ -24,7 +25,7 @@ func TestProjectMeta_FetchProjectRawData_init(t *testing.T) {
t.Run("InvalidUri", func(t *testing.T) {
p = p.ApplyFuncReturn(url.Parse, nil, errors.New(t.Name()))

_, err := (&Meta{}).FetchProjectFile()
_, err := (&filefetcher.Filedescriptor{}).FetchFile()
r.ErrorContains(err, t.Name())
})
}
Expand All @@ -43,15 +44,15 @@ func TestProjectMeta_FetchProjectFile_http(t *testing.T) {
r.NoError(err)
hash := h.Sum(nil)

pm := &Meta{
pm := &filefetcher.Filedescriptor{
Uri: "https://test.com/project_config",
Hash: [32]byte(hash),
}

t.Run("FailedToGetHTTP", func(t *testing.T) {
p = p.ApplyFuncReturn(http.Get, nil, errors.New(t.Name()))

_, err := pm.FetchProjectFile()
_, err := pm.FetchFile()
r.ErrorContains(err, t.Name())
})
t.Run("FailedToIOReadAll", func(t *testing.T) {
Expand All @@ -60,19 +61,19 @@ func TestProjectMeta_FetchProjectFile_http(t *testing.T) {
}, nil)
p = p.ApplyFuncReturn(io.ReadAll, nil, errors.New(t.Name()))

_, err := pm.FetchProjectFile()
_, err := pm.FetchFile()
r.ErrorContains(err, t.Name())
})
t.Run("HashMismatch", func(t *testing.T) {
p = p.ApplyFuncReturn(io.ReadAll, jc, nil)

npm := *pm
npm.Hash = [32]byte{}
_, err := npm.FetchProjectFile()
_, err := npm.FetchFile()
r.ErrorContains(err, "failed to validate project file hash")
})
t.Run("Success", func(t *testing.T) {
_, err := pm.FetchProjectFile()
_, err := pm.FetchFile()
r.NoError(err)
})
}
Expand All @@ -82,13 +83,13 @@ func TestProjectMeta_FetchProjectFile_ipfs(t *testing.T) {
p := gomonkey.NewPatches()
defer p.Reset()

pm := &Meta{
pm := &filefetcher.Filedescriptor{
Uri: "ipfs://test.com/123",
}
t.Run("FailedToGetIPFS", func(t *testing.T) {
p = p.ApplyMethodReturn(&ipfs.IPFS{}, "Cat", nil, errors.New(t.Name()))

_, err := pm.FetchProjectFile()
_, err := pm.FetchFile()
r.ErrorContains(err, t.Name())
})
}
Expand All @@ -98,12 +99,12 @@ func TestProjectMeta_FetchProjectFile_default(t *testing.T) {
p := gomonkey.NewPatches()
defer p.Reset()

pm := &Meta{
pm := &filefetcher.Filedescriptor{
Uri: "test.com/123",
}

t.Run("FailedToGetIPFS", func(t *testing.T) {
_, err := pm.FetchProjectFile()
_, err := pm.FetchFile()
r.ErrorContains(err, "invalid project file uri")
})
}
Expand Down
4 changes: 2 additions & 2 deletions task/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/iotexproject/w3bstream/task"
)

type HandleTask func(task *task.Task, vmTypeID uint64, code string, expParam string) ([]byte, error)
type HandleTask func(task *task.Task, projectConfig *project.Config) ([]byte, error)
type Project func(projectID *big.Int) (*project.Project, error)
type RetrieveTask func(taskIDs []common.Hash) ([]*task.Task, error)

Expand Down Expand Up @@ -60,7 +60,7 @@ func (r *processor) process(taskID common.Hash) error {
}
slog.Info("process task", "project_id", t.ProjectID.String(), "task_id", t.ID, "vm_type", c.VMTypeID)
startTime := time.Now()
proof, err := r.handle(t, c.VMTypeID, c.Code, c.Metadata)
proof, err := r.handle(t, c)
if err != nil {
metrics.FailedTaskNumMtc.WithLabelValues(t.ProjectID.String()).Inc()
slog.Error("failed to handle task", "error", err)
Expand Down
58 changes: 58 additions & 0 deletions util/filefetcher/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package filefetcher

import (
"bytes"
"crypto/sha256"
"io"
"net/http"
"net/url"
"strings"

"github.com/iotexproject/w3bstream/util/ipfs"
"github.com/pkg/errors"
)

type Filedescriptor struct {
Uri string
Hash [32]byte
}

func (fd *Filedescriptor) FetchFile() ([]byte, error) {
u, err := url.Parse(fd.Uri)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse project file uri %s", fd.Uri)
}

var data []byte
switch u.Scheme {
case "http", "https":
resp, _err := http.Get(fd.Uri)
if _err != nil {
return nil, errors.Wrapf(_err, "failed to fetch project file, uri %s", fd.Uri)
}
defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)
case "ipfs":
// ipfs url: ipfs://${endpoint}/${cid}
sh := ipfs.NewIPFS(u.Host)
cid := strings.Split(strings.Trim(u.Path, "/"), "/")
data, err = sh.Cat(cid[0])
default:
return nil, errors.Errorf("invalid project file uri %s", fd.Uri)
}
if err != nil {
return nil, errors.Wrapf(err, "failed to read project file, uri %s", fd.Uri)
}

if !bytes.Equal(fd.Hash[:], make([]byte, 32)) {
h := sha256.New()
if _, err := h.Write(data); err != nil {
return nil, errors.Wrap(err, "failed to generate project file hash")
}
if !bytes.Equal(h.Sum(nil), fd.Hash[:]) {
return nil, errors.New("failed to validate project file hash")
}
}

return data, nil
}
33 changes: 22 additions & 11 deletions vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,36 @@ import (
"context"
"encoding/hex"
"log/slog"
"strings"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/iotexproject/w3bstream/project"
"github.com/iotexproject/w3bstream/task"
"github.com/iotexproject/w3bstream/util/filefetcher"
"github.com/iotexproject/w3bstream/vm/proto"
)

type Handler struct {
vmClients map[uint64]*grpc.ClientConn
}

func (r *Handler) Handle(task *task.Task, vmTypeID uint64, code string, expParam string) ([]byte, error) {
conn, ok := r.vmClients[vmTypeID]
if !ok {
return nil, errors.Errorf("unsupported vm type id %d", vmTypeID)
}
cli := proto.NewVMClient(conn)

bi, err := hex.DecodeString(code)
func (r *Handler) Handle(task *task.Task, projectConfig *project.Config) ([]byte, error) {
bi, err := decodeBinary(projectConfig.Code)
if err != nil {
return nil, errors.Wrap(err, "failed to decode code")
}
metadata, err := hex.DecodeString(expParam)
metadata, err := decodeBinary(projectConfig.Metadata)
if err != nil {
return nil, errors.Wrap(err, "failed to decode metadata")
}
conn, ok := r.vmClients[projectConfig.VMTypeID]
if !ok {
return nil, errors.Errorf("unsupported vm type id %d", projectConfig.VMTypeID)
}
cli := proto.NewVMClient(conn)
if _, err := cli.NewProject(context.Background(), &proto.NewProjectRequest{
ProjectID: task.ProjectID.String(),
ProjectVersion: task.ProjectVersion,
Expand All @@ -48,14 +50,23 @@ func (r *Handler) Handle(task *task.Task, vmTypeID uint64, code string, expParam
Payloads: [][]byte{task.Payload},
})
if err != nil {
slog.Error("failed to execute task", "project_id", task.ProjectID, "vm_type", vmTypeID,
"task_id", task.ID, "binary", code, "payloads", task.Payload, "err", err)
slog.Error("failed to execute task", "project_id", task.ProjectID, "vm_type", projectConfig.VMTypeID,
"task_id", task.ID, "binary", projectConfig.Code, "payloads", task.Payload, "err", err)
return nil, errors.Wrap(err, "failed to execute vm instance")
}

return resp.Result, nil
}

func decodeBinary(b string) ([]byte, error) {
if strings.Contains(b, "http") ||
strings.Contains(b, "ipfs") {
fd := filefetcher.Filedescriptor{Uri: b}
return fd.FetchFile()
}
return hex.DecodeString(b)
}

func NewHandler(vmEndpoints map[uint64]string) (*Handler, error) {
clients := map[uint64]*grpc.ClientConn{}
for t, e := range vmEndpoints {
Expand Down

0 comments on commit 28b1c19

Please sign in to comment.