Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track malleated tx events #607

Merged
merged 7 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func (mem *CListMempool) Update(
mem.removeTx(tx, e.(*clist.CElement), false)
// see if the transaction is a child transaction of a some parent
// transaction that exists in the mempool
} else if parentHash, _, isChild := types.DecodeChildTx(tx); isChild {
} else if parentHash, _, isChild := types.UnwrapChildTx(tx); isChild {
var parentKey [TxKeySize]byte
copy(parentKey[:], parentHash)
mem.RemoveTxByKey(parentKey, false)
Expand Down
10 changes: 9 additions & 1 deletion types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,19 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

var txHash []byte
if parentHash, childTx, isChildTx := UnwrapChildTx(data.Tx); isChildTx {
txHash = parentHash
data.Tx = childTx
} else {
txHash = Tx(data.Tx).Hash()
}

events := b.validateAndStringifyEvents(data.Result.Events, b.Logger.With("tx", data.Tx))

// add predefined compositeKeys
events[EventTypeKey] = append(events[EventTypeKey], EventTx)
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", Tx(data.Tx).Hash()))
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", txHash))
events[TxHeightKey] = append(events[TxHeightKey], fmt.Sprintf("%d", data.Height))

return b.pubsub.PublishWithEvents(ctx, data, events)
Expand Down
53 changes: 53 additions & 0 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,59 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
}

func TestEventBusPublishEventMalleatedTx(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})

tx := Tx("foo")
childTx := Tx("foo-malleated")
wrappedChildTx, err := WrapChildTx(tx.Hash(), childTx)
require.NoError(t, err)

result := abci.ResponseDeliverTx{
Data: []byte("bar"),
Events: []abci.Event{
{Type: "testType", Attributes: []abci.EventAttribute{{Key: []byte("baz"), Value: []byte("1")}}},
},
}

// PublishEventTx adds 3 composite keys, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND testType.baz=1", tx.Hash())
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)

done := make(chan struct{})
go func() {
msg := <-txsSub.Out()
edt := msg.Data().(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.EqualValues(t, childTx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}()

err = eventBus.PublishEventTx(EventDataTx{abci.TxResult{
Height: 1,
Index: 0,
Tx: wrappedChildTx,
Result: result,
}})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a transaction after 1 sec.")
}
}

func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
Expand Down
17 changes: 9 additions & 8 deletions types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,16 @@ func ComputeProtoSizeForTxs(txs []Tx) int64 {
return int64(pdData.Size())
}

// DecodeChildTx attempts to unmarshal the provided transaction into a child
// UnwrapChildTx attempts to unmarshal the provided transaction into a child
// transaction wrapper, if this an be done, then it returns true. A child
// transaction is a normal transaction that has been derived from a different
// parent transaction. The returned hash is that of the parent transaction,
// which allows us to remove the parent transaction from the mempool. NOTE:
// protobuf sometimes does not throw an error if the transaction passed is not
// a tmproto.ChildTx, since the schema for PayForMessage is kept in the app, we
// cannot perform further checks without creating an import cycle.
func DecodeChildTx(tx Tx) (hash []byte, unwrapped Tx, has bool) {
// transaction is a normal transaction that has been derived (malleated) from a
// different parent transaction. The returned hash is that of the parent
// transaction, which allows us to remove the parent transaction from the
// mempool. NOTE: protobuf sometimes does not throw an error if the transaction
// passed is not a tmproto.ChildTx, since the schema for PayForMessage is kept
// in the app, we cannot perform further checks without creating an import
// cycle.
func UnwrapChildTx(tx Tx) (parentHash []byte, unwrapped Tx, isChild bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe orthogonal to this PR, but semantically should this not return an error instead of a Boolean?

// attempt to unmarshal into a a child transaction
var childTx tmproto.ChildTx
err := proto.Unmarshal(tx, &childTx)
Expand Down
8 changes: 4 additions & 4 deletions types/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ func assertBadProof(t *testing.T, root []byte, bad []byte, good TxProof) {
}
}

func TestDecodeChildTx(t *testing.T) {
func TestUnwrapChildTx(t *testing.T) {
// perform a simple test for being unable to decode a non
// child transaction
tx := Tx{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
_, _, ok := DecodeChildTx(tx)
_, _, ok := UnwrapChildTx(tx)
require.False(t, ok)

// create a proto message that used to be decoded when it shouldn't have
Expand All @@ -181,15 +181,15 @@ func TestDecodeChildTx(t *testing.T) {

// due to protobuf not actually requiring type compatibility
// we need to make sure that there is some check
_, _, ok = DecodeChildTx(rawBlock)
_, _, ok = UnwrapChildTx(rawBlock)
require.False(t, ok)

pHash := sha256.Sum256(rawBlock)
childTx, err := WrapChildTx(pHash[:], rawBlock)
require.NoError(t, err)

// finally, ensure that the unwrapped bytes are identical to the input
unwrappedHash, unwrapped, ok := DecodeChildTx(childTx)
unwrappedHash, unwrapped, ok := UnwrapChildTx(childTx)
require.True(t, ok)
assert.Equal(t, 32, len(unwrappedHash))
require.Equal(t, rawBlock, []byte(unwrapped))
Expand Down