Skip to content

Commit

Permalink
Merge branch 'main' of github.com:hyperledger/firefly-transaction-man…
Browse files Browse the repository at this point in the history
…ager into pass-in-action-occurred-time

Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Apr 26, 2024
2 parents cd9755d + af4a223 commit cc68a9c
Show file tree
Hide file tree
Showing 15 changed files with 225 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ coverage.html:
$(VGO) tool cover -html=coverage.txt
coverage: test coverage.html
lint:
$(VGO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.53.2
$(VGO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2
GOGC=20 $(LINT) run -v --timeout 5m

${MOCKERY}:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS transactions_status;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX transactions_status ON transactions(status);
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hyperledger/firefly-common v1.4.2
github.com/hyperledger/firefly-common v1.4.6
github.com/lib/pq v1.10.9
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE=
github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
github.com/hyperledger/firefly-common v1.4.6 h1:qqXoSaRml3WjUnWcWxrrXs5AIOWa+UcMXLCF8yEa4Pk=
github.com/hyperledger/firefly-common v1.4.6/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
7 changes: 5 additions & 2 deletions internal/persistence/postgres/eventstreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) {
var eventStreams []*apitypes.EventStream
for i := 0; i < 20; i++ {
es := &apitypes.EventStream{
ID: fftypes.NewUUID(),
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
ID: fftypes.NewUUID(),
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
BatchTimeout: ffDurationPtr(22222 * time.Second),
RetryTimeout: ffDurationPtr(33333 * time.Second),
BlockedRetryDelay: ffDurationPtr(44444 * time.Second),
}
err := p.WriteStream(ctx, es)
assert.NoError(t, err)
Expand Down
30 changes: 25 additions & 5 deletions internal/persistence/postgres/transaction_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -40,6 +40,7 @@ type transactionOperation struct {
sentConflict bool
done chan error

opID string
isShutdown bool
txInsert *apitypes.ManagedTX
noncePreAssigned bool
Expand Down Expand Up @@ -77,6 +78,7 @@ type transactionWriter struct {

type transactionWriterBatch struct {
id string
opened time.Time
ops []*transactionOperation
timeoutContext context.Context
timeoutCancel func()
Expand Down Expand Up @@ -122,6 +124,7 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config.

func newTransactionOperation(txID string) *transactionOperation {
return &transactionOperation{
opID: fftypes.ShortID(),
txID: txID,
done: make(chan error, 1), // 1 slot to ensure we don't block the writer
}
Expand All @@ -130,6 +133,7 @@ func newTransactionOperation(txID string) *transactionOperation {
func (op *transactionOperation) flush(ctx context.Context) error {
select {
case err := <-op.done:
log.L(ctx).Debugf("Flushed write operation %s (err=%v)", op.opID, err)
return err
case <-ctx.Done():
return i18n.NewError(ctx, i18n.MsgContextCanceled)
Expand Down Expand Up @@ -165,6 +169,7 @@ func (tw *transactionWriter) queue(ctx context.Context, op *transactionOperation
h := fnv.New32a() // simple non-cryptographic hash algo
_, _ = h.Write([]byte(hashKey))
routine := h.Sum32() % tw.workerCount
log.L(ctx).Debugf("Queuing write operation %s to worker tx_writer_%.4d", op.opID, routine)
select {
case tw.workQueues[routine] <- op: // it's queued
case <-ctx.Done(): // timeout of caller context
Expand All @@ -180,6 +185,7 @@ func (tw *transactionWriter) worker(i int) {
defer close(tw.workersDone[i])
workerID := fmt.Sprintf("tx_writer_%.4d", i)
ctx := log.WithLogField(tw.bgCtx, "job", workerID)
l := log.L(ctx)
var batch *transactionWriterBatch
batchCount := 0
workQueue := tw.workQueues[i]
Expand All @@ -202,24 +208,27 @@ func (tw *transactionWriter) worker(i int) {
}
if batch == nil {
batch = &transactionWriterBatch{
id: fmt.Sprintf("%.4d_%.9d", i, batchCount),
id: fmt.Sprintf("%.4d_%.9d", i, batchCount),
opened: time.Now(),
}
batch.timeoutContext, batch.timeoutCancel = context.WithTimeout(ctx, tw.batchTimeout)
batchCount++
}
batch.ops = append(batch.ops, op)
l.Debugf("Added write operation %s to batch %s (len=%d)", op.opID, batch.id, len(batch.ops))
case <-timeoutContext.Done():
timedOut = true
select {
case <-ctx.Done():
log.L(ctx).Debugf("Transaction writer ending")
l.Debugf("Transaction writer ending")
return
default:
}
}

if batch != nil && (timedOut || (len(batch.ops) >= tw.batchMaxSize)) {
batch.timeoutCancel()
l.Debugf("Running batch %s (len=%d,timeout=%t,age=%dms)", batch.id, len(batch.ops), timedOut, time.Since(batch.opened).Milliseconds())
tw.runBatch(ctx, batch)
batch = nil
}
Expand Down Expand Up @@ -383,6 +392,7 @@ func (tw *transactionWriter) preInsertIdempotencyCheck(ctx context.Context, b *t
txOp.sentConflict = true
txOp.done <- i18n.NewError(ctx, tmmsgs.MsgDuplicateID, txOp.txID)
} else {
log.L(ctx).Debugf("Adding TX %s from write operation %s to insert idx=%d", txOp.txID, txOp.opID, len(validInserts))
validInserts = append(validInserts, txOp.txInsert)
}
}
Expand Down Expand Up @@ -413,9 +423,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction
}
}
// Do all the transaction updates
mergedUpdates := make(map[string]*apitypes.TXUpdates)
for _, op := range b.txUpdates {
if err := tw.p.updateTransaction(ctx, op.txID, op.txUpdate); err != nil {
log.L(ctx).Errorf("Update transaction %s failed: %s", op.txID, err)
update, merge := mergedUpdates[op.txID]
if merge {
update.Merge(op.txUpdate)
} else {
mergedUpdates[op.txID] = op.txUpdate
}
log.L(ctx).Debugf("Updating transaction %s in write operation %s (merged=%t)", op.txID, op.opID, merge)
}
for txID, update := range mergedUpdates {
if err := tw.p.updateTransaction(ctx, txID, update); err != nil {
log.L(ctx).Errorf("Update transaction %s failed: %s", txID, err)
return err
}
}
Expand Down
73 changes: 73 additions & 0 deletions internal/persistence/postgres/transaction_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -275,6 +276,48 @@ func TestExecuteBatchOpsUpdateTXFail(t *testing.T) {
assert.NoError(t, mdb.ExpectationsWereMet())
}

func TestExecuteBatchOpsUpdateTXMerge(t *testing.T) {
logrus.SetLevel(logrus.TraceLevel)

ctx, p, mdb, done := newMockSQLPersistence(t)
defer done()

mdb.ExpectBegin()
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
mdb.ExpectCommit()

err := p.db.RunAsGroup(ctx, func(ctx context.Context) error {
return p.writer.executeBatchOps(ctx, &transactionWriterBatch{
txUpdates: []*transactionOperation{
{
txID: "11111",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusPending),
From: strPtr("0xaaaaa"),
},
},
{
txID: "22222",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusPending),
},
},
{
txID: "11111",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusSucceeded),
TransactionHash: strPtr("0xaabbcc"),
},
},
},
})
})
assert.NoError(t, err)

assert.NoError(t, mdb.ExpectationsWereMet())
}

func TestExecuteBatchOpsUpsertReceiptFail(t *testing.T) {
ctx, p, mdb, done := newMockSQLPersistence(t)
defer done()
Expand Down Expand Up @@ -455,3 +498,33 @@ func TestQueueClosedContext(t *testing.T) {
p.writer.queue(closedCtx, newTransactionOperation("tx1"))

}

func TestStopDoneWorker(t *testing.T) {
tw := &transactionWriter{
workersDone: []chan struct{}{
make(chan struct{}),
},
}
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
close(tw.workersDone[0])
tw.stop()
}

func TestStopDoneCtx(t *testing.T) {
tw := &transactionWriter{
workersDone: []chan struct{}{
make(chan struct{}, 1),
},
}
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
tw.cancelCtx()
go func() {
time.Sleep(10 * time.Millisecond)
tw.workersDone[0] <- struct{}{}
}()
tw.stop()
}

func ptrTo[T any](v T) *T {
return &v
}
47 changes: 46 additions & 1 deletion pkg/apitypes/managed_tx.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -208,6 +208,51 @@ type TXUpdates struct {
ErrorMessage *string `json:"errorMessage,omitempty"`
}

func (txu *TXUpdates) Merge(txu2 *TXUpdates) {
if txu2.Status != nil {
txu.Status = txu2.Status
}
if txu2.DeleteRequested != nil {
txu.DeleteRequested = txu2.DeleteRequested
}
if txu2.From != nil {
txu.From = txu2.From
}
if txu2.To != nil {
txu.To = txu2.To
}
if txu2.Nonce != nil {
txu.Nonce = txu2.Nonce
}
if txu2.Gas != nil {
txu.Gas = txu2.Gas
}
if txu2.Value != nil {
txu.Value = txu2.Value
}
if txu2.GasPrice != nil {
txu.GasPrice = txu2.GasPrice
}
if txu2.TransactionData != nil {
txu.TransactionData = txu2.TransactionData
}
if txu2.TransactionHash != nil {
txu.TransactionHash = txu2.TransactionHash
}
if txu2.PolicyInfo != nil {
txu.PolicyInfo = txu2.PolicyInfo
}
if txu2.FirstSubmit != nil {
txu.FirstSubmit = txu2.FirstSubmit
}
if txu2.LastSubmit != nil {
txu.LastSubmit = txu2.LastSubmit
}
if txu2.ErrorMessage != nil {
txu.ErrorMessage = txu2.ErrorMessage
}
}

// TXWithStatus is a convenience object that fetches all data about a transaction into one
// large JSON payload (with limits on certain parts, such as the history entries).
// Note that in LevelDB persistence this is the stored form of the single document object.
Expand Down
28 changes: 28 additions & 0 deletions pkg/apitypes/managed_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,31 @@ func TestReceiptRecord(t *testing.T) {
r.SetUpdated(t2)
assert.Equal(t, t2, r.Updated)
}

func TestTXUpdatesMerge(t *testing.T) {
txu := &TXUpdates{}
txu2 := &TXUpdates{
Status: ptrTo(TxStatusPending),
DeleteRequested: fftypes.Now(),
From: ptrTo("1111"),
To: ptrTo("2222"),
Nonce: fftypes.NewFFBigInt(3333),
Gas: fftypes.NewFFBigInt(4444),
Value: fftypes.NewFFBigInt(5555),
GasPrice: fftypes.JSONAnyPtr(`{"some": "stuff"}`),
TransactionData: ptrTo("xxxx"),
TransactionHash: ptrTo("yyyy"),
PolicyInfo: fftypes.JSONAnyPtr(`{"more": "stuff"}`),
FirstSubmit: fftypes.Now(),
LastSubmit: fftypes.Now(),
ErrorMessage: ptrTo("pop"),
}
txu.Merge(txu2)
assert.Equal(t, *txu2, *txu)
txu.Merge(&TXUpdates{})
assert.Equal(t, *txu2, *txu)
}

func ptrTo[T any](v T) *T {
return &v
}
3 changes: 2 additions & 1 deletion pkg/apitypes/query_request.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -24,6 +24,7 @@ import (
type QueryRequest struct {
Headers RequestHeaders `json:"headers"`
ffcapi.TransactionInput
BlockNumber *string `json:"blockNumber,omitempty"`
}

// QueryResponse is the response payload for a query
Expand Down
4 changes: 2 additions & 2 deletions pkg/ffcapi/method_call.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -28,7 +28,7 @@ import (
// detected by the back-end connector.
type QueryInvokeRequest struct {
TransactionInput
BlockNumber *fftypes.FFBigInt `json:"blockNumber,omitempty"`
BlockNumber *string `json:"blockNumber,omitempty"`
}

type QueryInvokeResponse struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/fftm/route__root_command.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -101,6 +101,7 @@ var postRootCommand = func(m *manager) *ffapi.Route {
}
res, _, err := m.connector.QueryInvoke(r.Req.Context(), &ffcapi.QueryInvokeRequest{
TransactionInput: tReq.TransactionInput,
BlockNumber: tReq.BlockNumber,
})
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit cc68a9c

Please sign in to comment.