TEMP: stub out ipfs service
IPFS support uses unsupported libraries that fail to build when switching to Go 1.21.
This stubs them out of das, enabling system_tests to compile.
tsahee committed Jan 19, 2024
1 parent dd31351 commit dcddcba
Showing 3 changed files with 562 additions and 536 deletions.
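The fix pattern is minimal: every type and method signature in the service is kept so that dependent packages (including system_tests) still compile, while each body that touched the incompatible IPFS libraries is commented out and replaced with an error return. A self-contained sketch of the pattern, using names from the diff below (the config struct is trimmed here; the real fields are in the file):

```go
package das

import (
	"context"
	"errors"
)

// Trimmed for the sketch; the actual config fields appear in the diff below.
type IpfsStorageServiceConfig struct{}

type IpfsStorageService struct {
	config IpfsStorageServiceConfig
}

// The signature is unchanged and only the body is replaced, so callers keep
// compiling under Go 1.21 while every call now fails fast instead of linking
// against the incompatible IPFS libraries.
func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error {
	return errors.New("ipfshelper should be updated for go 1.21 support")
}
```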
das/ipfs_storage_service.go: 259 changes (128 additions, 131 deletions)
@@ -11,21 +11,13 @@ import (
"bytes"
"context"
"errors"
"io"
"math/rand"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ipfs/go-cid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
"github.com/multiformats/go-multihash"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/cmd/ipfshelper"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
flag "github.com/spf13/pflag"
)

@@ -63,27 +55,28 @@ func IpfsStorageServiceConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type IpfsStorageService struct {
config IpfsStorageServiceConfig
ipfsHelper *ipfshelper.IpfsHelper
ipfsApi coreiface.CoreAPI
config IpfsStorageServiceConfig
// ipfsHelper *ipfshelper.IpfsHelper
// ipfsApi coreiface.CoreAPI
}

func NewIpfsStorageService(ctx context.Context, config IpfsStorageServiceConfig) (*IpfsStorageService, error) {
ipfsHelper, err := ipfshelper.CreateIpfsHelper(ctx, config.RepoDir, false, config.Peers, config.Profiles)
if err != nil {
return nil, err
}
addrs, err := ipfsHelper.GetPeerHostAddresses()
if err != nil {
return nil, err
}
log.Info("IPFS node started up", "hostAddresses", addrs)

return &IpfsStorageService{
config: config,
ipfsHelper: ipfsHelper,
ipfsApi: ipfsHelper.GetAPI(),
}, nil
return nil, errors.New("ipfs needs to be updated for go 1.21")
// ipfsHelper, err := ipfshelper.CreateIpfsHelper(ctx, config.RepoDir, false, config.Peers, config.Profiles)
// if err != nil {
// return nil, err
// }
// addrs, err := ipfsHelper.GetPeerHostAddresses()
// if err != nil {
// return nil, err
// }
// log.Info("IPFS node started up", "hostAddresses", addrs)

// return &IpfsStorageService{
// config: config,
// ipfsHelper: ipfsHelper,
// ipfsApi: ipfsHelper.GetAPI(),
// }, nil
}
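The constructor consumed config values (RepoDir, Peers, Profiles) that IpfsStorageServiceConfigAddOptions, whose body is collapsed above, registers as CLI flags. A hypothetical sketch of what such pflag registration typically looks like; the flag names, defaults, and help strings are assumptions, only the field names come from this diff:

```go
package das

import (
	"time"

	flag "github.com/spf13/pflag"
)

// Hypothetical sketch: the actual names and defaults live in the collapsed
// section above. Only the config fields (RepoDir, Peers, Profiles,
// ReadTimeout, PinAfterGet, PinPercentage) are attested elsewhere in the diff.
func ipfsConfigAddOptionsSketch(prefix string, f *flag.FlagSet) {
	f.String(prefix+".repo-dir", "", "directory for the local IPFS repo")
	f.StringSlice(prefix+".peers", nil, "initial IPFS peers to connect to")
	f.StringSlice(prefix+".profiles", nil, "IPFS configuration profiles to apply")
	f.Duration(prefix+".read-timeout", time.Minute, "timeout per IPFS block read")
	f.Bool(prefix+".pin-after-get", true, "pin retrieved batch data in the local repo")
	f.Float64(prefix+".pin-percentage", 100.0, "percentage of retrieved batches to pin when pin-after-get is enabled")
}
```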

func hashToCid(hash common.Hash) (cid.Cid, error) {
@@ -103,69 +96,70 @@ func hashToCid(hash common.Hash) (cid.Cid, error) {
// GetByHash retrieves and reconstructs one batch's data, using IPFS to retrieve the preimages
// for each chunk of data and the dastree nodes.
func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
log.Trace("das.IpfsStorageService.GetByHash", "hash", pretty.PrettyHash(hash))

doPin := false // If true, pin every block related to this batch
if s.config.PinAfterGet {
if s.config.PinPercentage == 100.0 {
doPin = true
} else if (rand.Float64() * 100.0) <= s.config.PinPercentage {
doPin = true
}

}

oracle := func(h common.Hash) ([]byte, error) {
thisCid, err := hashToCid(h)
if err != nil {
return nil, err
}

ipfsPath := path.IpfsPath(thisCid)
log.Trace("Retrieving IPFS path", "path", ipfsPath.String())

parentCtx := ctx
if doPin {
// If we want to pin this batch, then detach from the parent context so
// we are not canceled before s.config.ReadTimeout.
parentCtx = context.Background()
}

timeoutCtx, cancel := context.WithTimeout(parentCtx, s.config.ReadTimeout)
defer cancel()
rdr, err := s.ipfsApi.Block().Get(timeoutCtx, ipfsPath)
if err != nil {
if timeoutCtx.Err() != nil {
return nil, ErrNotFound
}
return nil, err
}

data, err := io.ReadAll(rdr)
if err != nil {
return nil, err
}

if doPin {
go func() {
pinCtx, pinCancel := context.WithTimeout(context.Background(), s.config.ReadTimeout)
defer pinCancel()
err := s.ipfsApi.Pin().Add(pinCtx, ipfsPath)
// Recursive pinning not needed, each dastree preimage fits in a single
// IPFS block.
if err != nil {
// Pinning is best-effort.
log.Warn("Failed to pin in IPFS", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String())
} else {
log.Trace("Pin in IPFS successful", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String())
}
}()
}

return data, nil
}

return dastree.Content(hash, oracle)
return nil, errors.New("ipfshelper should be updated for go 1.21 support")
// log.Trace("das.IpfsStorageService.GetByHash", "hash", pretty.PrettyHash(hash))

// doPin := false // If true, pin every block related to this batch
// if s.config.PinAfterGet {
// if s.config.PinPercentage == 100.0 {
// doPin = true
// } else if (rand.Float64() * 100.0) <= s.config.PinPercentage {
// doPin = true
// }

// }

// oracle := func(h common.Hash) ([]byte, error) {
// thisCid, err := hashToCid(h)
// if err != nil {
// return nil, err
// }

// ipfsPath := path.IpfsPath(thisCid)
// log.Trace("Retrieving IPFS path", "path", ipfsPath.String())

// parentCtx := ctx
// if doPin {
// // If we want to pin this batch, then detach from the parent context so
// // we are not canceled before s.config.ReadTimeout.
// parentCtx = context.Background()
// }

// timeoutCtx, cancel := context.WithTimeout(parentCtx, s.config.ReadTimeout)
// defer cancel()
// rdr, err := s.ipfsApi.Block().Get(timeoutCtx, ipfsPath)
// if err != nil {
// if timeoutCtx.Err() != nil {
// return nil, ErrNotFound
// }
// return nil, err
// }

// data, err := io.ReadAll(rdr)
// if err != nil {
// return nil, err
// }

// if doPin {
// go func() {
// pinCtx, pinCancel := context.WithTimeout(context.Background(), s.config.ReadTimeout)
// defer pinCancel()
// err := s.ipfsApi.Pin().Add(pinCtx, ipfsPath)
// // Recursive pinning not needed, each dastree preimage fits in a single
// // IPFS block.
// if err != nil {
// // Pinning is best-effort.
// log.Warn("Failed to pin in IPFS", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String())
// } else {
// log.Trace("Pin in IPFS successful", "hash", pretty.PrettyHash(hash), "path", ipfsPath.String())
// }
// }()
// }

// return data, nil
// }

// return dastree.Content(hash, oracle)
}
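The oracle in the commented-out body maps each dastree hash to a CID via hashToCid, whose body is unchanged and collapsed in this view. Given that Put stores blocks with the raw codec and a keccak-256 multihash, the conversion plausibly looks like the following sketch; this is an assumption, not the file's verbatim body:

```go
package das

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ipfs/go-cid"
	"github.com/multiformats/go-multihash"
)

// Sketch: wrap the keccak-256 digest as a multihash, then build a CIDv1
// with the raw codec, mirroring how Put stores blocks so that a lookup by
// preimage hash resolves to the same CID.
func hashToCidSketch(hash common.Hash) (cid.Cid, error) {
	encoded, err := multihash.Encode(hash[:], multihash.KECCAK_256)
	if err != nil {
		return cid.Cid{}, err
	}
	_, mh, err := multihash.MHFromBytes(encoded)
	if err != nil {
		return cid.Cid{}, err
	}
	return cid.NewCidV1(cid.Raw, mh), nil
}
```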

// Put stores all the preimages required to reconstruct the dastree for single batch,
@@ -176,47 +170,49 @@ func (s *IpfsStorageService) GetByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
// IPFS default block size is 256KB and dastree max block size is 64KB so each dastree
// node and data chunk easily fits within an IPFS block.
func (s *IpfsStorageService) Put(ctx context.Context, data []byte, timeout uint64) error {
logPut("das.IpfsStorageService.Put", data, timeout, s)

var chunks [][]byte

record := func(_ common.Hash, value []byte) {
chunks = append(chunks, value)
}

_ = dastree.RecordHash(record, data)

numChunks := len(chunks)
resultChan := make(chan error, numChunks)
for _, chunk := range chunks {
_chunk := chunk
go func() {
blockStat, err := s.ipfsApi.Block().Put(
ctx,
bytes.NewReader(_chunk),
options.Block.CidCodec("raw"), // Store the data in raw form since the hash in the CID must be the hash
// of the preimage for our lookup scheme to work.
options.Block.Hash(multihash.KECCAK_256, -1), // Use keccak256 to calculate the hash to put in the block's
// CID, since it is the same algo used by dastree.
options.Block.Pin(true)) // Keep the data in the local IPFS repo, don't GC it.
if err == nil {
log.Trace("Wrote IPFS path", "path", blockStat.Path().String())
}
resultChan <- err
}()
}

successfullyWrittenChunks := 0
for err := range resultChan {
if err != nil {
return err
}
successfullyWrittenChunks++
if successfullyWrittenChunks == numChunks {
return nil
}
}
panic("unreachable")
return errors.New("ipfshelper should be updated for go 1.21 support")

// logPut("das.IpfsStorageService.Put", data, timeout, s)

// var chunks [][]byte

// record := func(_ common.Hash, value []byte) {
// chunks = append(chunks, value)
// }

// _ = dastree.RecordHash(record, data)

// numChunks := len(chunks)
// resultChan := make(chan error, numChunks)
// for _, chunk := range chunks {
// _chunk := chunk
// go func() {
// blockStat, err := s.ipfsApi.Block().Put(
// ctx,
// bytes.NewReader(_chunk),
// options.Block.CidCodec("raw"), // Store the data in raw form since the hash in the CID must be the hash
// // of the preimage for our lookup scheme to work.
// options.Block.Hash(multihash.KECCAK_256, -1), // Use keccak256 to calculate the hash to put in the block's
// // CID, since it is the same algo used by dastree.
// options.Block.Pin(true)) // Keep the data in the local IPFS repo, don't GC it.
// if err == nil {
// log.Trace("Wrote IPFS path", "path", blockStat.Path().String())
// }
// resultChan <- err
// }()
// }

// successfullyWrittenChunks := 0
// for err := range resultChan {
// if err != nil {
// return err
// }
// successfullyWrittenChunks++
// if successfullyWrittenChunks == numChunks {
// return nil
// }
// }
// panic("unreachable")
}
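The commented-out Put fans each chunk write out to its own goroutine, copies the loop variable (`_chunk := chunk`, required before Go 1.22's loop-variable semantics change), and collects results over a buffered channel, returning on the first error. Assuming golang.org/x/sync/errgroup were acceptable as a dependency, the same fan-out/first-error pattern can be written more compactly; `putBlock` below stands in for the commented-out `s.ipfsApi.Block().Put` call and is hypothetical:

```go
package das

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Equivalent fan-out with errgroup: Wait returns the first non-nil error,
// and the derived context is canceled so remaining writes can bail early.
func putChunks(ctx context.Context, chunks [][]byte, putBlock func(context.Context, []byte) error) error {
	g, gctx := errgroup.WithContext(ctx)
	for _, chunk := range chunks {
		chunk := chunk // loop-variable copy, same role as _chunk in the original
		g.Go(func() error {
			return putBlock(gctx, chunk)
		})
	}
	return g.Wait()
}
```

One behavioral difference worth noting: the original returns on the first error without waiting for the remaining goroutines, while g.Wait() blocks until they all finish.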

func (s *IpfsStorageService) ExpirationPolicy(ctx context.Context) (arbstate.ExpirationPolicy, error) {
@@ -228,7 +224,8 @@ func (s *IpfsStorageService) Sync(ctx context.Context) error {
}

func (s *IpfsStorageService) Close(ctx context.Context) error {
return s.ipfsHelper.Close()
return errors.New("ipfshelper should be updated for go 1.21 support")
// return s.ipfsHelper.Close()
}

func (s *IpfsStorageService) String() string {