Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add off-chain-data go client application #1269

Open
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

twoGiants
Copy link
Contributor

@twoGiants twoGiants commented Nov 11, 2024

Off Chain Data Go Application

This PR contains an implementation of the off-chain data store sample in go using the block event listening capability of the Fabric Gateway client API. The issue reference is here in the Fabric Gateway SDK repository.

It follows the same procedure and uses mostly the same naming as the off-chain data typescript sample here. There are some differences in the go package structure compared to the typescript modules and I added a couple of tests to the parser which helped me better understand the unpacking of the block data structure. The Go implementation is more similar to the Java version. More about that below in the Differences section.

@bestbeforetoday gave me this useful link where I found the iterator_test.go to be helpful and this link where the "Endorser Transaction" comments were helpful.

Summary

  • created project directory, app.go, getAllAssets.go, transact.go and listen.go files
  • reused and changed the logic for connect.go from the private data application
  • implemented the asset transfer basic contract in the contract package
  • implemented getAllAssets.go and transact.go
  • implemented the block parser and all the supporting data structures
  • implemented the block processor and all the supporting data structures
  • implemented listen.go
  • updated README.md with the command to run the go application
  • updated script which runs the application in the Github Actions workflow
  • manual tests

Differences

  • I didn't use any interfaces. I removed them after the suggestion of @bestbeforetoday.
  • I did not implement any methods which are not used anywhere, like toProto or getSignatureHeader.
  • The block parser with all it's underlying structs (which are mostly private) is in the parser package.
  • There are some simple tests for the cache utility and the block unwrapping.
  • The block processor with it's underlying struct transaction (which is private) is in the processor package.
  • The store is in the store package and because it resembles a bit a flat file db I called it flatFile.go
  • The utilities are in the utils package.
  • In all the additional packages I followed the encapsulation rule: if it's not referenced outside the package it's private.

Additional Information

I process the block events in a go routine and use context to stop the processing. Instead of panicking everywhere down the line where an error can occur I handle the errors by wrapping them, returning them, log them in the go routine, then return and shutdown gracefully. Panicking results in the context, gateway and checkpointer not getting closed.

Questions

  1. Here is a TODO proposing to simplify the processor.transaction data structure.
  2. And here is a TODO proposing to encapsulate the final unwrapping of the kvWrite in the parser instead of having it in transaction.writes.
  3. I assume I should squash everything to one commit? It might be a big commit message.

@twoGiants twoGiants force-pushed the off-chain-data-go branch 3 times, most recently from 45a2aac to 09402bc Compare November 15, 2024 18:02
@twoGiants twoGiants force-pushed the off-chain-data-go branch 3 times, most recently from f7ede20 to 4e58c31 Compare November 25, 2024 14:56
@twoGiants twoGiants force-pushed the off-chain-data-go branch 3 times, most recently from 307bb47 to 43f67fb Compare January 4, 2025 11:04
@twoGiants twoGiants force-pushed the off-chain-data-go branch 2 times, most recently from 433bd6a to 77e19a4 Compare January 7, 2025 10:09
Created project structure, fixed typos. Implemented connect.go and
getAllAssets.go. The latter uses an assetTransferBasic struct which
provides a simple API for basic asset operations like create, transfer,
etc.

Added transact.go with some util functions. Using google uuid package to
generate random UUIDs for the transactions.

Implemented pretty printing of JSON results.

Implemented app.go entry point with error handling. The existing
commands are getAllAssets, transact and listen. They can be called from
the command line via: "go run . <command> <command> ...". They will be
executed in order and if a command is not known an the application
panics and aborts before executing any of the commands.

Implementing listen.go. Added checkpointer, context setups, call to
BlockEvents and all the interfaces needed for parsing. Started
implementing the interfaces needed to represent a block bottom up in
structs. Finished NamespaceReadWriteSet, ReadWriteSet and
EndorserTransaction.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Signed-off-by: Stanislav Jakuschevskij <[email protected]>
For the GetCreator() method return the identity.Identity interface was
also implemented.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Created parser, contract and utils packages and extracted each piece of
functionality into its own files. Removed "Get" prefix from methods and
changed return values from interfaces to structs.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Removed Block and Transaction interfaces and unused statusCode function.
Using the struct instead of the interfaces now.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Added block processor struct and the process method.
Implemented getting valid transactions from the last processed index.
Added data structures needed for the store.

Decomposed the parser.Block.Transactions() method into readable chunks.

Added transaction processor struct and process method. Unwrapping
read write set data from the transaction, mapping to a new "write"
data structure and passing down to the store.

Store is an empty function and will be implemented next.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Persisting ledger writes to the file system into the store.log file in
the application-go directory. The write values are converted from bytes
to a string when the read write sets are unwrapped in the transaction
processor.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Added caching util function with tests and applied in:

- parser.Block.Transactions(),
- parser.Payload.ChannelHeader(),
- parser.Payload.SignatureHeader(),
- parser.NamespaceReadWriteSet.ReadWriteSet(),
- parser.EndorserTransaction.ReadWriteSets(),

methods, as it was in the typescript sample.

Corrected Println usage and added comments to util functions.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Created packages for the flat file store and the processor and moved
functions, variables and constants from listener.go to those packages.
Encapsulated everything not used outside the packages, introduced
model.go files which later might be extracted into a model package and
renamed parser/parsedBlock.go to parser/block.go.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Before when pressing 'ctrl+c' and stopping the go program non of the
deferred functions in listen.go were called. A standard procedure for
stopping goroutines with context was implemented which shuts down the
program gracefully. Logs were added to notify the user that the shutdown
was successful.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Every struct was put in its own file. Every method which is not used
outside the parser package was given package scope. All interfaces were
removed, they are implemented by the structs which are now used
everywhere needed as return values. There is no clear benefit of using
interfaces in this sample.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Before all transactions were processed and when the failure was
simulated a message was printed and all the transactions still
processed. Now the store returns an error when the failure is simulated
which the listener expects so that it can gracefully shutdown the system
and close the context. The context must be closed correctly or the
checkpointer won't save the last processed transactionId to the file
system.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
@twoGiants twoGiants force-pushed the off-chain-data-go branch 2 times, most recently from 9364108 to 0b8ba91 Compare January 8, 2025 15:32
Starting from the processor.Block.Process all methods now return errors
if something goes wrong with unpacking of the blocks and reading the
transactions. In each function where the error is being propagated back
to client it is wrapped in a message with the function name. This makes
it easier to track down the error and see the propagation chain. Finally
the error is logged to the terminal and the go routine shuts down
gracefully. The graceful shutdown executes all deferred functions which
close the context, the checkpointer and the gateway.

Before panics were used everywhere which was an issue because the
unpacking of the blocks happened in a go routine. When a panic happens
in a go routine only the deferred functions of the go routine are called
but not those of the client which lead to unexpected behavior.

The transact function is also executed in a go routine therefore the
same typo of error handling was implemented there.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Signed-off-by: Stanislav Jakuschevskij <[email protected]>
@twoGiants twoGiants changed the title 🚧 WIP: Add off-chain-data go client application 🚧 Add off-chain-data go client application Jan 9, 2025
@twoGiants twoGiants marked this pull request as ready for review January 9, 2025 10:32
@twoGiants twoGiants requested a review from a team as a code owner January 9, 2025 10:32
Copy link
Member

@bestbeforetoday bestbeforetoday left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First thing, it runs great for me so this looks nicely done. Thank you! Sorry for large number of comments. This is just polishing the material to make it a really clean sample.

I would be careful to try and keep all the code that listens for blocks, navigates to their transactions and passes the read/write sets to some kind of store close together. Perhaps all in listen.go. This is the key learning point of the sample so it needs to be easy to access and navigate. The block parser should be separate (as you've done) so it is a reusable package on its own. The store implementation can also be a little saperate to avoid cluttering the key listener code. I might put it in a different file within the same package though, just as private (lowercase) content.

The Application section of off_chain_data/README.md should be updated to include references to key parts of the Go implementation, similar to the existing TypeScript and Java entries.

To catch some issues, it might be worth running this command in the application-go directory:

go run honnef.co/go/tools/cmd/staticcheck@latest -f stylish -checks all ./...

To avoid specific checks (like having package-level godoc), you can exclude specific rules. For example:

go run honnef.co/go/tools/cmd/staticcheck@latest -f stylish -checks 'all,-ST1000' ./...

off_chain_data/application-go/app.go Outdated Show resolved Hide resolved
off_chain_data/application-go/processor/block.go Outdated Show resolved Hide resolved
off_chain_data/application-go/processor/block.go Outdated Show resolved Hide resolved
off_chain_data/application-go/contract/model.go Outdated Show resolved Hide resolved
off_chain_data/application-go/contract/model.go Outdated Show resolved Hide resolved
func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) {
result, err := atb.contract.Evaluate("GetAllAssets")
if err != nil {
return []byte{}, fmt.Errorf("in GetAllAssets: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://go.dev/wiki/CodeReviewComments#declaring-empty-slices

Suggested change
return []byte{}, fmt.Errorf("in GetAllAssets: %w", err)
return nil, fmt.Errorf("in GetAllAssets: %w", err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

off_chain_data/application-go/parser/block.go Outdated Show resolved Hide resolved
Comment on lines 65 to 91
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

for blockProto := range blocks {
select {
case <-ctx.Done():
return
default:
blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto),
checkpointer,
store.ApplyWritesToOffChainStore,
channelName,
)

if err := blockProcessor.Process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
}
}()

wg.Wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure of the reason for using a wait group for a single goroutine that you then wait on before continuing. Maybe this can just be inline code?

It should not be necessary to check the context while reading. If the context used for the BlockEvents call is closed, the returned channel should also get closed, which would mean the range loop will exit.

Suggested change
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for blockProto := range blocks {
select {
case <-ctx.Done():
return
default:
blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto),
checkpointer,
store.ApplyWritesToOffChainStore,
channelName,
)
if err := blockProcessor.Process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
}
}()
wg.Wait()
for blockProto := range blocks {
blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto),
checkpointer,
store.ApplyWritesToOffChainStore,
channelName,
)
if err := blockProcessor.Process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. It's not needed. There must've been a reason why I put it there while developing but I can't recall.

I removed it.

@@ -0,0 +1,20 @@
module offChainData
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By convention Go package names are all lowercase. See https://go.dev/blog/package-names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@twoGiants twoGiants force-pushed the off-chain-data-go branch 5 times, most recently from 648f6fa to d4a1546 Compare January 20, 2025 16:06
- update Application section in README
- remove param name in app.go
- add error checks in processor/block.go
- move vars from model to transact logic
- move newAsset to transact
- use ID for well-known initialisms
- move randomelement, randomnint and differentelement to transact
- remove AssertDefined
- blockTxIdsJoinedByComma: use standard library to join elements
- return nil, instead of []byte{}
- remove go routine in listen.go
- move cache to parser
- inline processor in listen.go
- move store to main package
- move util to main package
- fixed failing cache issue
- fixed staticcheck issues
- removed cache function, implemented caching in the structs and methods

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
Copy link
Member

@bestbeforetoday bestbeforetoday left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the excellent work. This is a great improvement on the last time I looked at the code. Sorry for the delay and having lots of comments... again. Most are very minor suggestions but a few I think are more important, like the error handling to ensure execution surfaces errors correctly to the automated build.

Let me know if any of the suggestions are not clear enough. I am happy to help with any tricky changes.

Once again, great work!

return nil
}

func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be parsing the returned []byte and returning []Asset. AssetTransferBasic is providing an abstraction between the business logic and the fabric-gateway API, so it receives parameters and returns results using types that make sense for the calling application.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like that?

func (atb *AssetTransferBasic) GetAllAssets() ([]Asset, error) {
	assetsRaw, err := atb.contract.Evaluate("GetAllAssets")
	if err != nil {
		return nil, err
	}

	if len(assetsRaw) == 0 {
		return []Asset{}, nil
	}

	var assets []Asset
	if err := json.Unmarshal(assetsRaw, &assets); err != nil {
		return nil, err
	}

	return assets, nil
}

Comment on lines +29 to +39
assets, err := smartContract.GetAllAssets()
if err != nil {
panic(err)
}

if len(assets) == 0 {
fmt.Println("no assets")
return
}

fmt.Println(formatJSON(assets))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

smartContract.GetAllAssets() should return a []Asset, which the remaining code here should just marshal as a JSON string and print. There shouldn't be any need to handle empty results differently as an empty JSON array can be printed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func (t *transactApp) run() {
var wg sync.WaitGroup

for i := 0; i < t.batchSize; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for i := 0; i < t.batchSize; i++ {
for i := range t.batchSize {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will be unused. The linter and compiler won't accept it.

Comment on lines +116 to +124
// Generate a random integer in the range 0 to max - 1.
func randomInt(max int) int {
result, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
if err != nil {
panic(err)
}

return int(result.Int64())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to replace this with rand.N() (or rand.IntN()) from the the math/rand/v2 standard library:

https://pkg.go.dev/math/rand/v2#N

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it works fine. Thanks!

Updated.


// Pick a random element from an array, excluding the current value.
func differentElement(values []string, currentValue string) string {
candidateValues := []string{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An unassigned (nil) slice variable is equivalent to an empty slice.

Suggested change
candidateValues := []string{}
var candidateValues []string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return result, expectedNamespace, expectedAsset
}

func protoMarshalOrPanic(v protoreflect.ProtoMessage) []byte {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func protoMarshalOrPanic(v protoreflect.ProtoMessage) []byte {
func protoMarshalOrPanic(v proto.Message) []byte {

Comment on lines +10 to +47
type Block struct {
block *common.Block
cachedTransactions []*Transaction
}

func ParseBlock(block *common.Block) *Block {
return &Block{block, nil}
}

func (b *Block) Number() uint64 {
return b.block.GetHeader().GetNumber()
}

func (b *Block) Transactions() ([]*Transaction, error) {
if b.cachedTransactions != nil {
return b.cachedTransactions, nil
}

funcName := "Transactions"
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

payloads, err := b.parse(commonPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

b.cachedTransactions = b.createTransactionsFrom(payloads)

return b.cachedTransactions, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a simpler way to do the lazy unmarshal and caching of results by using the sync package from the standard library. This might be worth trying:

Suggested change
type Block struct {
block *common.Block
cachedTransactions []*Transaction
}
func ParseBlock(block *common.Block) *Block {
return &Block{block, nil}
}
func (b *Block) Number() uint64 {
return b.block.GetHeader().GetNumber()
}
func (b *Block) Transactions() ([]*Transaction, error) {
if b.cachedTransactions != nil {
return b.cachedTransactions, nil
}
funcName := "Transactions"
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
payloads, err := b.parse(commonPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
b.cachedTransactions = b.createTransactionsFrom(payloads)
return b.cachedTransactions, nil
}
type Block struct {
block *common.Block
transactions func() ([]*Transaction, error)
}
func ParseBlock(block *common.Block) *Block {
result := &Block{
block: block,
}
result.transactions = sync.OnceValues(result.unmarshalTransactions)
return result
}
func (b *Block) Number() uint64 {
return b.block.GetHeader().GetNumber()
}
func (b *Block) Transactions() ([]*Transaction, error) {
return b.transactions()
}
func (b *Block) unmarshalTransactions() ([]*Transaction, error) {
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, err
}
commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, err
}
payloads, err := b.parsePayloads(commonPayloads)
if err != nil {
return nil, err
}
return b.createTransactionsFrom(payloads), nil
}

return b.cachedTransactions, nil
}

func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it's a method on Block, it seems like repetition to say "FromBlockData". Perhaps keep it simple?

Suggested change
func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) {
func (b *Block) unmarshalEnvelopes() ([]*common.Envelope, error) {

return result, nil
}

func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Payload, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the Block isn't used this doesn't need to be method on Block. Maybe it can just be called unmarshalPayloads too. If you prefer more explanation then unmarshalPayloadsFromEnvelopes. Go doesn't have function/method overloading based on parameter types. The name must be unique.

Comment on lines +82 to +86
payload := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransaction()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate that you're following the TypeScript implementation structure pretty closely here, but the error handling in Go makes this pretty ugly. Since we are always unmarshaling the payload's channel header anyway to check if it's an endorser transactions, how about the parsePayload does that work up-front? Then isEndorserTransaction can just return a bool and no error, which is much simpler.

Perhaps the TypeScript implementation should be doing this too, since the cache doesn't save any work, just adds complexity. That can be tackled another time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants