Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support large zk code #785

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions project/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/iotexproject/w3bstream/util/filefetcher"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -35,8 +36,8 @@ func (m *Manager) Project(projectID *big.Int) (*Project, error) {
}

// project file hash mismatch, fetch new project file from uri
pm := &Meta{ProjectID: projectID, Uri: uri, Hash: hash}
data, err := pm.FetchProjectFile()
fetcher := &filefetcher.Filedescriptor{Uri: uri, Hash: hash}
data, err := fetcher.FetchFile()
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch project file, project_id %v", projectID)
}
Expand Down
56 changes: 0 additions & 56 deletions project/project.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
package project

import (
"bytes"
"crypto/sha256"
"encoding/json"
"io"
"math/big"
"net/http"
"net/url"
"strings"

"github.com/pkg/errors"

"github.com/iotexproject/w3bstream/util/ipfs"
)

var (
Expand All @@ -26,12 +17,6 @@ type Project struct {
Configs []*Config `json:"config"`
}

type Meta struct {
ProjectID *big.Int
Uri string
Hash [32]byte
}

type Attribute struct {
Paused bool
RequestedProverAmount uint64
Expand Down Expand Up @@ -97,44 +82,3 @@ func (c *Config) validate() error {
}
return nil
}

func (m *Meta) FetchProjectFile() ([]byte, error) {
u, err := url.Parse(m.Uri)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse project file uri %s", m.Uri)
}

var data []byte
switch u.Scheme {
case "http", "https":
resp, _err := http.Get(m.Uri)
if _err != nil {
return nil, errors.Wrapf(_err, "failed to fetch project file, uri %s", m.Uri)
}
defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)

case "ipfs":
// ipfs url: ipfs://${endpoint}/${cid}
sh := ipfs.NewIPFS(u.Host)
cid := strings.Split(strings.Trim(u.Path, "/"), "/")
data, err = sh.Cat(cid[0])

default:
return nil, errors.Errorf("invalid project file uri %s", m.Uri)
}

if err != nil {
return nil, errors.Wrapf(err, "failed to read project file, uri %s", m.Uri)
}

h := sha256.New()
if _, err := h.Write(data); err != nil {
return nil, errors.Wrap(err, "failed to generate project file hash")
}
if !bytes.Equal(h.Sum(nil), m.Hash[:]) {
return nil, errors.New("failed to validate project file hash")
}

return data, nil
}
25 changes: 13 additions & 12 deletions project/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/iotexproject/w3bstream/util/filefetcher"
"github.com/iotexproject/w3bstream/util/ipfs"
)

Expand All @@ -24,7 +25,7 @@ func TestProjectMeta_FetchProjectRawData_init(t *testing.T) {
t.Run("InvalidUri", func(t *testing.T) {
p = p.ApplyFuncReturn(url.Parse, nil, errors.New(t.Name()))

_, err := (&Meta{}).FetchProjectFile()
_, err := (&filefetcher.Filedescriptor{}).FetchFile()
r.ErrorContains(err, t.Name())
})
}
Expand All @@ -43,15 +44,15 @@ func TestProjectMeta_FetchProjectFile_http(t *testing.T) {
r.NoError(err)
hash := h.Sum(nil)

pm := &Meta{
fd := &filefetcher.Filedescriptor{
Uri: "https://test.com/project_config",
Hash: [32]byte(hash),
}

t.Run("FailedToGetHTTP", func(t *testing.T) {
p = p.ApplyFuncReturn(http.Get, nil, errors.New(t.Name()))

_, err := pm.FetchProjectFile()
_, err := fd.FetchFile()
r.ErrorContains(err, t.Name())
})
t.Run("FailedToIOReadAll", func(t *testing.T) {
Expand All @@ -60,19 +61,19 @@ func TestProjectMeta_FetchProjectFile_http(t *testing.T) {
}, nil)
p = p.ApplyFuncReturn(io.ReadAll, nil, errors.New(t.Name()))

_, err := pm.FetchProjectFile()
_, err := fd.FetchFile()
r.ErrorContains(err, t.Name())
})
t.Run("HashMismatch", func(t *testing.T) {
p = p.ApplyFuncReturn(io.ReadAll, jc, nil)

npm := *pm
npm.Hash = [32]byte{}
_, err := npm.FetchProjectFile()
nfd := *fd
nfd.Hash = [32]byte{}
_, err := nfd.FetchFile()
r.ErrorContains(err, "failed to validate project file hash")
})
t.Run("Success", func(t *testing.T) {
_, err := pm.FetchProjectFile()
_, err := fd.FetchFile()
r.NoError(err)
})
}
Expand All @@ -82,13 +83,13 @@ func TestProjectMeta_FetchProjectFile_ipfs(t *testing.T) {
p := gomonkey.NewPatches()
defer p.Reset()

pm := &Meta{
fd := &filefetcher.Filedescriptor{
Uri: "ipfs://test.com/123",
}
t.Run("FailedToGetIPFS", func(t *testing.T) {
p = p.ApplyMethodReturn(&ipfs.IPFS{}, "Cat", nil, errors.New(t.Name()))

_, err := pm.FetchProjectFile()
_, err := fd.FetchFile()
r.ErrorContains(err, t.Name())
})
}
Expand All @@ -98,12 +99,12 @@ func TestProjectMeta_FetchProjectFile_default(t *testing.T) {
p := gomonkey.NewPatches()
defer p.Reset()

pm := &Meta{
fd := &filefetcher.Filedescriptor{
Uri: "test.com/123",
}

t.Run("FailedToGetIPFS", func(t *testing.T) {
_, err := pm.FetchProjectFile()
_, err := fd.FetchFile()
r.ErrorContains(err, "invalid project file uri")
})
}
Expand Down
4 changes: 2 additions & 2 deletions task/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/iotexproject/w3bstream/task"
)

type HandleTask func(task *task.Task, vmTypeID uint64, code string, expParam string) ([]byte, error)
type HandleTask func(task *task.Task, projectConfig *project.Config) ([]byte, error)
type Project func(projectID *big.Int) (*project.Project, error)
type RetrieveTask func(taskIDs []common.Hash) ([]*task.Task, error)

Expand Down Expand Up @@ -60,7 +60,7 @@ func (r *processor) process(taskID common.Hash) error {
}
slog.Info("process task", "project_id", t.ProjectID.String(), "task_id", t.ID, "vm_type", c.VMTypeID)
startTime := time.Now()
proof, err := r.handle(t, c.VMTypeID, c.Code, c.Metadata)
proof, err := r.handle(t, c)
if err != nil {
metrics.FailedTaskNumMtc.WithLabelValues(t.ProjectID.String()).Inc()
slog.Error("failed to handle task", "error", err)
Expand Down
58 changes: 58 additions & 0 deletions util/filefetcher/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package filefetcher

import (
"bytes"
"crypto/sha256"
"io"
"net/http"
"net/url"
"strings"

"github.com/iotexproject/w3bstream/util/ipfs"
"github.com/pkg/errors"
)

type Filedescriptor struct {
Uri string
Hash [32]byte
}

func (fd *Filedescriptor) FetchFile(skipHash ...bool) ([]byte, error) {
u, err := url.Parse(fd.Uri)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse project file uri %s", fd.Uri)
}

var data []byte
switch u.Scheme {
case "http", "https":
resp, _err := http.Get(fd.Uri)
if _err != nil {
return nil, errors.Wrapf(_err, "failed to fetch project file, uri %s", fd.Uri)
}
defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)
case "ipfs":
// ipfs url: ipfs://${endpoint}/${cid}
sh := ipfs.NewIPFS(u.Host)
cid := strings.Split(strings.Trim(u.Path, "/"), "/")
data, err = sh.Cat(cid[0])
default:
return nil, errors.Errorf("invalid project file uri %s", fd.Uri)
}
if err != nil {
return nil, errors.Wrapf(err, "failed to read project file, uri %s", fd.Uri)
}

if len(skipHash) == 0 || !skipHash[0] {
h := sha256.New()
if _, err := h.Write(data); err != nil {
return nil, errors.Wrap(err, "failed to generate project file hash")
}
if !bytes.Equal(h.Sum(nil), fd.Hash[:]) {
return nil, errors.New("failed to validate project file hash")
}
}

return data, nil
}
34 changes: 23 additions & 11 deletions vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,36 @@ import (
"context"
"encoding/hex"
"log/slog"
"strings"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/iotexproject/w3bstream/project"
"github.com/iotexproject/w3bstream/task"
"github.com/iotexproject/w3bstream/util/filefetcher"
"github.com/iotexproject/w3bstream/vm/proto"
)

type Handler struct {
vmClients map[uint64]*grpc.ClientConn
}

func (r *Handler) Handle(task *task.Task, vmTypeID uint64, code string, expParam string) ([]byte, error) {
conn, ok := r.vmClients[vmTypeID]
if !ok {
return nil, errors.Errorf("unsupported vm type id %d", vmTypeID)
}
cli := proto.NewVMClient(conn)

bi, err := hex.DecodeString(code)
func (r *Handler) Handle(task *task.Task, projectConfig *project.Config) ([]byte, error) {
bi, err := decodeBinary(projectConfig.Code)
if err != nil {
return nil, errors.Wrap(err, "failed to decode code")
}
metadata, err := hex.DecodeString(expParam)
metadata, err := decodeBinary(projectConfig.Metadata)
if err != nil {
return nil, errors.Wrap(err, "failed to decode metadata")
}
conn, ok := r.vmClients[projectConfig.VMTypeID]
if !ok {
return nil, errors.Errorf("unsupported vm type id %d", projectConfig.VMTypeID)
}
cli := proto.NewVMClient(conn)
if _, err := cli.NewProject(context.Background(), &proto.NewProjectRequest{
ProjectID: task.ProjectID.String(),
ProjectVersion: task.ProjectVersion,
Expand All @@ -48,14 +50,24 @@ func (r *Handler) Handle(task *task.Task, vmTypeID uint64, code string, expParam
Payloads: [][]byte{task.Payload},
})
if err != nil {
slog.Error("failed to execute task", "project_id", task.ProjectID, "vm_type", vmTypeID,
"task_id", task.ID, "binary", code, "payloads", task.Payload, "err", err)
slog.Error("failed to execute task", "project_id", task.ProjectID, "vm_type", projectConfig.VMTypeID,
"task_id", task.ID, "binary", projectConfig.Code, "payloads", task.Payload, "err", err)
return nil, errors.Wrap(err, "failed to execute vm instance")
}

return resp.Result, nil
}

func decodeBinary(b string) ([]byte, error) {
if strings.Contains(b, "http") ||
strings.Contains(b, "ipfs") {
fd := filefetcher.Filedescriptor{Uri: b}
skipHash := true
return fd.FetchFile(skipHash)
}
return hex.DecodeString(b)
}

func NewHandler(vmEndpoints map[uint64]string) (*Handler, error) {
clients := map[uint64]*grpc.ClientConn{}
for t, e := range vmEndpoints {
Expand Down
Loading