Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track malleated tx events #607

Merged
merged 7 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# The version of the generation template (required).
# The only currently-valid value is v1beta1.
version: v1beta1

# The plugins to run.
plugins:
# The name of the plugin.
- name: gogofaster
# The directory where the generated proto output will be written.
# The directory is relative to where the generation tool was run.
out: proto
# Set options to assign import paths to the well-known types
# and to enable service generation.
opt: Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/golang/protobuf/ptypes/duration,plugins=grpc,paths=source_relative
4 changes: 2 additions & 2 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,9 @@ func (mem *CListMempool) Update(
// https://github.com/tendermint/tendermint/issues/3322.
if e, ok := mem.txsMap.Load(TxKey(tx)); ok {
mem.removeTx(tx, e.(*clist.CElement), false)
// see if the transaction is a child transaction of a some parent
// see if the transaction is a malleated transaction of a some parent
// transaction that exists in the mempool
} else if parentHash, _, isChild := types.DecodeChildTx(tx); isChild {
} else if parentHash, _, isMalleated := types.UnwrapMalleatedTx(tx); isMalleated {
var parentKey [TxKeySize]byte
copy(parentKey[:], parentHash)
mem.RemoveTxByKey(parentKey, false)
Expand Down
18 changes: 9 additions & 9 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,22 @@ func TestMempoolUpdate(t *testing.T) {
require.NoError(t, err)
}

// 4. Removes a parent transaction after receiving a child transaction in the update
// 4. Removes a original transaction after receiving a malleated transaction in the update
{
mempool.Flush()
parentTx := []byte{1, 2, 3, 4}
childTx := []byte{1, 2}
parentHash := sha256.Sum256(parentTx)
originalTx := []byte{1, 2, 3, 4}
malleated := []byte{1, 2}
originalHash := sha256.Sum256(originalTx)

// create the wrapped child transaction
wTx, err := types.WrapChildTx(parentHash[:], childTx)
// create the wrapped malleated transaction
wTx, err := types.WrapMalleatedTx(originalHash[:], malleated)
require.NoError(t, err)

// add the parent transaction to the mempool
err = mempool.CheckTx(parentTx, nil, TxInfo{})
// add the original transaction to the mempool
err = mempool.CheckTx(originalTx, nil, TxInfo{})
require.NoError(t, err)

// remove the parent from the mempool using the wrapped child tx
// remove the original from the mempool using the wrapped malleated tx
err = mempool.Update(1, []types.Tx{wTx}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.NoError(t, err)

Expand Down
311 changes: 156 additions & 155 deletions proto/tendermint/types/types.pb.go

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions proto/tendermint/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,10 @@ message TxProof {
tendermint.crypto.Proof proof = 3;
}

// ChildTx wraps a transaction that was derived from a parent transaction. This
// allows for removal of the parent transaction from the mempool.
message ChildTx {
bytes parent_tx_hash = 1;
// MalleatedTx wraps a transaction that was derived from a different original
// transaction. This allows for tendermint to track malleated and original
// transactions
message MalleatedTx {
bytes original_tx_hash = 1;
bytes tx = 2;
}
10 changes: 9 additions & 1 deletion types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,19 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

var txHash []byte
if originalHash, malleated, ismalleated := UnwrapMalleatedTx(data.Tx); ismalleated {
txHash = originalHash
data.Tx = malleated
} else {
txHash = Tx(data.Tx).Hash()
}

events := b.validateAndStringifyEvents(data.Result.Events, b.Logger.With("tx", data.Tx))

// add predefined compositeKeys
events[EventTypeKey] = append(events[EventTypeKey], EventTx)
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", Tx(data.Tx).Hash()))
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", txHash))
events[TxHeightKey] = append(events[TxHeightKey], fmt.Sprintf("%d", data.Height))

return b.pubsub.PublishWithEvents(ctx, data, events)
Expand Down
78 changes: 62 additions & 16 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,59 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
}

func TestEventBusPublishEventMalleatedTx(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})

tx := Tx("foo")
malleatedTx := Tx("foo-malleated")
wrappedMalleatedTx, err := WrapMalleatedTx(tx.Hash(), malleatedTx)
require.NoError(t, err)

result := abci.ResponseDeliverTx{
Data: []byte("bar"),
Events: []abci.Event{
{Type: "testType", Attributes: []abci.EventAttribute{{Key: []byte("baz"), Value: []byte("1")}}},
},
}

// PublishEventTx adds 3 composite keys, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND testType.baz=1", tx.Hash())
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)

done := make(chan struct{})
go func() {
msg := <-txsSub.Out()
edt := msg.Data().(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.EqualValues(t, malleatedTx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}()

err = eventBus.PublishEventTx(EventDataTx{abci.TxResult{
Height: 1,
Index: 0,
Tx: wrappedMalleatedTx,
Result: result,
}})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a transaction after 1 sec.")
}
}

func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
Expand Down Expand Up @@ -93,28 +146,21 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)

done := make(chan struct{})
go func() {
msg := <-blocksSub.Out()
edt := msg.Data().(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}()

err = eventBus.PublishEventNewBlock(EventDataNewBlock{
Block: block,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block after 1 sec.")
}
done := make(chan struct{})
// go func() {
msg := <-blocksSub.Out()
edt := msg.Data().(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
assert.NoError(t, err)
}

func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
Expand Down
41 changes: 21 additions & 20 deletions types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,38 +161,39 @@ func ComputeProtoSizeForTxs(txs []Tx) int64 {
return int64(pdData.Size())
}

// DecodeChildTx attempts to unmarshal the provided transaction into a child
// transaction wrapper, if this an be done, then it returns true. A child
// transaction is a normal transaction that has been derived from a different
// parent transaction. The returned hash is that of the parent transaction,
// which allows us to remove the parent transaction from the mempool. NOTE:
// protobuf sometimes does not throw an error if the transaction passed is not
// a tmproto.ChildTx, since the schema for PayForMessage is kept in the app, we
// cannot perform further checks without creating an import cycle.
func DecodeChildTx(tx Tx) (hash []byte, unwrapped Tx, has bool) {
// attempt to unmarshal into a a child transaction
var childTx tmproto.ChildTx
err := proto.Unmarshal(tx, &childTx)
// UnwrapMalleatedTx attempts to unmarshal the provided transaction into a malleated
// transaction wrapper, if this an be done, then it returns true. A malleated
// transaction is a normal transaction that has been derived (malleated) from a
// different original transaction. The returned hash is that of the original
// transaction, which allows us to remove the original transaction from the
// mempool. NOTE: protobuf sometimes does not throw an error if the transaction
// passed is not a tmproto.MalleatedTx, since the schema for PayForMessage is kept
// in the app, we cannot perform further checks without creating an import
// cycle.
func UnwrapMalleatedTx(tx Tx) (originalHash []byte, unwrapped Tx, isMalleated bool) {
// attempt to unmarshal into a a malleated transaction
var malleatedTx tmproto.MalleatedTx
err := proto.Unmarshal(tx, &malleatedTx)
if err != nil {
return nil, nil, false
}
// this check will fail to catch unwanted types should those unmarshalled
// types happen to have a hash sized slice of bytes in the same field number
// as ParentTxHash. TODO(evan): either fix this, or better yet use a different
// as originalTxHash. TODO(evan): either fix this, or better yet use a different
// mechanism
if len(childTx.ParentTxHash) != tmhash.Size {
if len(malleatedTx.OriginalTxHash) != tmhash.Size {
return nil, nil, false
}
return childTx.ParentTxHash, childTx.Tx, true
return malleatedTx.OriginalTxHash, malleatedTx.Tx, true
}

// WrapChildTx creates a wrapped Tx that includes the parent transaction's hash
// WrapMalleatedTx creates a wrapped Tx that includes the original transaction's hash
// so that it can be easily removed from the mempool. note: must be unwrapped to
// be a viable sdk.Tx
func WrapChildTx(parentHash []byte, child Tx) (Tx, error) {
wTx := tmproto.ChildTx{
ParentTxHash: parentHash,
Tx: child,
func WrapMalleatedTx(originalHash []byte, malleated Tx) (Tx, error) {
wTx := tmproto.MalleatedTx{
OriginalTxHash: originalHash,
Tx: malleated,
}
return proto.Marshal(&wTx)
}
12 changes: 6 additions & 6 deletions types/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ func assertBadProof(t *testing.T, root []byte, bad []byte, good TxProof) {
}
}

func TestDecodeChildTx(t *testing.T) {
func TestUnwrapMalleatedTx(t *testing.T) {
// perform a simple test for being unable to decode a non
// child transaction
// malleated transaction
tx := Tx{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
_, _, ok := DecodeChildTx(tx)
_, _, ok := UnwrapMalleatedTx(tx)
require.False(t, ok)

// create a proto message that used to be decoded when it shouldn't have
Expand All @@ -181,15 +181,15 @@ func TestDecodeChildTx(t *testing.T) {

// due to protobuf not actually requiring type compatibility
// we need to make sure that there is some check
_, _, ok = DecodeChildTx(rawBlock)
_, _, ok = UnwrapMalleatedTx(rawBlock)
require.False(t, ok)

pHash := sha256.Sum256(rawBlock)
childTx, err := WrapChildTx(pHash[:], rawBlock)
MalleatedTx, err := WrapMalleatedTx(pHash[:], rawBlock)
require.NoError(t, err)

// finally, ensure that the unwrapped bytes are identical to the input
unwrappedHash, unwrapped, ok := DecodeChildTx(childTx)
unwrappedHash, unwrapped, ok := UnwrapMalleatedTx(MalleatedTx)
require.True(t, ok)
assert.Equal(t, 32, len(unwrappedHash))
require.Equal(t, rawBlock, []byte(unwrapped))
Expand Down