diff --git a/Makefile b/Makefile index 4d33299c..d30017b8 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ coverage.html: $(VGO) tool cover -html=coverage.txt coverage: test coverage.html lint: - $(VGO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.53.2 + $(VGO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 GOGC=20 $(LINT) run -v --timeout 5m ${MOCKERY}: diff --git a/db/migrations/postgres/000009_add_transactions_status_index.down.sql b/db/migrations/postgres/000009_add_transactions_status_index.down.sql new file mode 100644 index 00000000..9651d311 --- /dev/null +++ b/db/migrations/postgres/000009_add_transactions_status_index.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS transactions_status; \ No newline at end of file diff --git a/db/migrations/postgres/000009_add_transactions_status_index.up.sql b/db/migrations/postgres/000009_add_transactions_status_index.up.sql new file mode 100644 index 00000000..4c57b567 --- /dev/null +++ b/db/migrations/postgres/000009_add_transactions_status_index.up.sql @@ -0,0 +1 @@ +CREATE INDEX transactions_status ON transactions(status); \ No newline at end of file diff --git a/go.mod b/go.mod index 776e43e6..cfb6f7e8 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/hyperledger/firefly-common v1.4.2 + github.com/hyperledger/firefly-common v1.4.6 github.com/lib/pq v1.10.9 github.com/oklog/ulid/v2 v2.1.0 github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index bf26b38a..73da180a 100644 --- a/go.sum +++ b/go.sum @@ -90,8 +90,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= -github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE= -github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE= +github.com/hyperledger/firefly-common v1.4.6 h1:qqXoSaRml3WjUnWcWxrrXs5AIOWa+UcMXLCF8yEa4Pk= +github.com/hyperledger/firefly-common v1.4.6/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= diff --git a/internal/persistence/postgres/eventstreams_test.go b/internal/persistence/postgres/eventstreams_test.go index a95b19a8..c5ac39aa 100644 --- a/internal/persistence/postgres/eventstreams_test.go +++ b/internal/persistence/postgres/eventstreams_test.go @@ -108,8 +108,11 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) { var eventStreams []*apitypes.EventStream for i := 0; i < 20; i++ { es := &apitypes.EventStream{ - ID: fftypes.NewUUID(), - Name: strPtr(fmt.Sprintf("es_%.3d", i)), + ID: fftypes.NewUUID(), + Name: strPtr(fmt.Sprintf("es_%.3d", i)), + BatchTimeout: ffDurationPtr(22222 * time.Second), + RetryTimeout: ffDurationPtr(33333 * time.Second), + BlockedRetryDelay: ffDurationPtr(44444 * time.Second), } err := p.WriteStream(ctx, es) assert.NoError(t, err) diff --git a/internal/persistence/postgres/transaction_writer.go b/internal/persistence/postgres/transaction_writer.go index 27f7d95a..0099671d 100644 --- a/internal/persistence/postgres/transaction_writer.go +++ b/internal/persistence/postgres/transaction_writer.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -40,6 +40,7 @@ type transactionOperation struct { sentConflict bool done chan error + opID string isShutdown bool txInsert *apitypes.ManagedTX noncePreAssigned bool @@ -77,6 +78,7 @@ type transactionWriter struct { type transactionWriterBatch struct { id string + opened time.Time ops []*transactionOperation timeoutContext context.Context timeoutCancel func() @@ -122,6 +124,7 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config. func newTransactionOperation(txID string) *transactionOperation { return &transactionOperation{ + opID: fftypes.ShortID(), txID: txID, done: make(chan error, 1), // 1 slot to ensure we don't block the writer } @@ -130,6 +133,7 @@ func newTransactionOperation(txID string) *transactionOperation { func (op *transactionOperation) flush(ctx context.Context) error { select { case err := <-op.done: + log.L(ctx).Debugf("Flushed write operation %s (err=%v)", op.opID, err) return err case <-ctx.Done(): return i18n.NewError(ctx, i18n.MsgContextCanceled) @@ -165,6 +169,7 @@ func (tw *transactionWriter) queue(ctx context.Context, op *transactionOperation h := fnv.New32a() // simple non-cryptographic hash algo _, _ = h.Write([]byte(hashKey)) routine := h.Sum32() % tw.workerCount + log.L(ctx).Debugf("Queuing write operation %s to worker tx_writer_%.4d", op.opID, routine) select { case tw.workQueues[routine] <- op: // it's queued case <-ctx.Done(): // timeout of caller context @@ -180,6 +185,7 @@ func (tw *transactionWriter) worker(i int) { defer close(tw.workersDone[i]) workerID := fmt.Sprintf("tx_writer_%.4d", i) ctx := log.WithLogField(tw.bgCtx, "job", workerID) + l := log.L(ctx) var batch *transactionWriterBatch batchCount := 0 workQueue := tw.workQueues[i] @@ -202,17 +208,19 @@ func (tw *transactionWriter) worker(i int) { } if batch == nil { batch = &transactionWriterBatch{ - id: fmt.Sprintf("%.4d_%.9d", i, batchCount), + id: fmt.Sprintf("%.4d_%.9d", i, batchCount), + opened: time.Now(), } batch.timeoutContext, batch.timeoutCancel = context.WithTimeout(ctx, tw.batchTimeout) batchCount++ } batch.ops = append(batch.ops, op) + l.Debugf("Added write operation %s to batch %s (len=%d)", op.opID, batch.id, len(batch.ops)) case <-timeoutContext.Done(): timedOut = true select { case <-ctx.Done(): - log.L(ctx).Debugf("Transaction writer ending") + l.Debugf("Transaction writer ending") return default: } @@ -220,6 +228,7 @@ func (tw *transactionWriter) worker(i int) { if batch != nil && (timedOut || (len(batch.ops) >= tw.batchMaxSize)) { batch.timeoutCancel() + l.Debugf("Running batch %s (len=%d,timeout=%t,age=%dms)", batch.id, len(batch.ops), timedOut, time.Since(batch.opened).Milliseconds()) tw.runBatch(ctx, batch) batch = nil } @@ -383,6 +392,7 @@ func (tw *transactionWriter) preInsertIdempotencyCheck(ctx context.Context, b *t txOp.sentConflict = true txOp.done <- i18n.NewError(ctx, tmmsgs.MsgDuplicateID, txOp.txID) } else { + log.L(ctx).Debugf("Adding TX %s from write operation %s to insert idx=%d", txOp.txID, txOp.opID, len(validInserts)) validInserts = append(validInserts, txOp.txInsert) } } @@ -413,9 +423,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction } } // Do all the transaction updates + mergedUpdates := make(map[string]*apitypes.TXUpdates) for _, op := range b.txUpdates { - if err := tw.p.updateTransaction(ctx, op.txID, op.txUpdate); err != nil { - log.L(ctx).Errorf("Update transaction %s failed: %s", op.txID, err) + update, merge := mergedUpdates[op.txID] + if merge { + update.Merge(op.txUpdate) + } else { + mergedUpdates[op.txID] = op.txUpdate + } + log.L(ctx).Debugf("Updating transaction %s in write operation %s (merged=%t)", op.txID, op.opID, merge) + } + for txID, update := range mergedUpdates { + if err := tw.p.updateTransaction(ctx, txID, update); err != nil { + log.L(ctx).Errorf("Update transaction %s failed: %s", txID, err) return err } } diff --git a/internal/persistence/postgres/transaction_writer_test.go b/internal/persistence/postgres/transaction_writer_test.go index 1e42cc1b..61856bef 100644 --- a/internal/persistence/postgres/transaction_writer_test.go +++ b/internal/persistence/postgres/transaction_writer_test.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) @@ -275,6 +276,48 @@ func TestExecuteBatchOpsUpdateTXFail(t *testing.T) { assert.NoError(t, mdb.ExpectationsWereMet()) } +func TestExecuteBatchOpsUpdateTXMerge(t *testing.T) { + logrus.SetLevel(logrus.TraceLevel) + + ctx, p, mdb, done := newMockSQLPersistence(t) + defer done() + + mdb.ExpectBegin() + mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1)) + mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1)) + mdb.ExpectCommit() + + err := p.db.RunAsGroup(ctx, func(ctx context.Context) error { + return p.writer.executeBatchOps(ctx, &transactionWriterBatch{ + txUpdates: []*transactionOperation{ + { + txID: "11111", + txUpdate: &apitypes.TXUpdates{ + Status: ptrTo(apitypes.TxStatusPending), + From: strPtr("0xaaaaa"), + }, + }, + { + txID: "22222", + txUpdate: &apitypes.TXUpdates{ + Status: ptrTo(apitypes.TxStatusPending), + }, + }, + { + txID: "11111", + txUpdate: &apitypes.TXUpdates{ + Status: ptrTo(apitypes.TxStatusSucceeded), + TransactionHash: strPtr("0xaabbcc"), + }, + }, + }, + }) + }) + assert.NoError(t, err) + + assert.NoError(t, mdb.ExpectationsWereMet()) +} + func TestExecuteBatchOpsUpsertReceiptFail(t *testing.T) { ctx, p, mdb, done := newMockSQLPersistence(t) defer done() @@ -455,3 +498,33 @@ func TestQueueClosedContext(t *testing.T) { p.writer.queue(closedCtx, newTransactionOperation("tx1")) } + +func TestStopDoneWorker(t *testing.T) { + tw := &transactionWriter{ + workersDone: []chan struct{}{ + make(chan struct{}), + }, + } + tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background()) + close(tw.workersDone[0]) + tw.stop() +} + +func TestStopDoneCtx(t *testing.T) { + tw := &transactionWriter{ + workersDone: []chan struct{}{ + make(chan struct{}, 1), + }, + } + tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background()) + tw.cancelCtx() + go func() { + time.Sleep(10 * time.Millisecond) + tw.workersDone[0] <- struct{}{} + }() + tw.stop() +} + +func ptrTo[T any](v T) *T { + return &v +} diff --git a/pkg/apitypes/managed_tx.go b/pkg/apitypes/managed_tx.go index cdc13080..f4c7a40b 100644 --- a/pkg/apitypes/managed_tx.go +++ b/pkg/apitypes/managed_tx.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -208,6 +208,51 @@ type TXUpdates struct { ErrorMessage *string `json:"errorMessage,omitempty"` } +func (txu *TXUpdates) Merge(txu2 *TXUpdates) { + if txu2.Status != nil { + txu.Status = txu2.Status + } + if txu2.DeleteRequested != nil { + txu.DeleteRequested = txu2.DeleteRequested + } + if txu2.From != nil { + txu.From = txu2.From + } + if txu2.To != nil { + txu.To = txu2.To + } + if txu2.Nonce != nil { + txu.Nonce = txu2.Nonce + } + if txu2.Gas != nil { + txu.Gas = txu2.Gas + } + if txu2.Value != nil { + txu.Value = txu2.Value + } + if txu2.GasPrice != nil { + txu.GasPrice = txu2.GasPrice + } + if txu2.TransactionData != nil { + txu.TransactionData = txu2.TransactionData + } + if txu2.TransactionHash != nil { + txu.TransactionHash = txu2.TransactionHash + } + if txu2.PolicyInfo != nil { + txu.PolicyInfo = txu2.PolicyInfo + } + if txu2.FirstSubmit != nil { + txu.FirstSubmit = txu2.FirstSubmit + } + if txu2.LastSubmit != nil { + txu.LastSubmit = txu2.LastSubmit + } + if txu2.ErrorMessage != nil { + txu.ErrorMessage = txu2.ErrorMessage + } +} + // TXWithStatus is a convenience object that fetches all data about a transaction into one // large JSON payload (with limits on certain parts, such as the history entries). // Note that in LevelDB persistence this is the stored form of the single document object. diff --git a/pkg/apitypes/managed_tx_test.go b/pkg/apitypes/managed_tx_test.go index 06d63933..6eb84f29 100644 --- a/pkg/apitypes/managed_tx_test.go +++ b/pkg/apitypes/managed_tx_test.go @@ -85,3 +85,31 @@ func TestReceiptRecord(t *testing.T) { r.SetUpdated(t2) assert.Equal(t, t2, r.Updated) } + +func TestTXUpdatesMerge(t *testing.T) { + txu := &TXUpdates{} + txu2 := &TXUpdates{ + Status: ptrTo(TxStatusPending), + DeleteRequested: fftypes.Now(), + From: ptrTo("1111"), + To: ptrTo("2222"), + Nonce: fftypes.NewFFBigInt(3333), + Gas: fftypes.NewFFBigInt(4444), + Value: fftypes.NewFFBigInt(5555), + GasPrice: fftypes.JSONAnyPtr(`{"some": "stuff"}`), + TransactionData: ptrTo("xxxx"), + TransactionHash: ptrTo("yyyy"), + PolicyInfo: fftypes.JSONAnyPtr(`{"more": "stuff"}`), + FirstSubmit: fftypes.Now(), + LastSubmit: fftypes.Now(), + ErrorMessage: ptrTo("pop"), + } + txu.Merge(txu2) + assert.Equal(t, *txu2, *txu) + txu.Merge(&TXUpdates{}) + assert.Equal(t, *txu2, *txu) +} + +func ptrTo[T any](v T) *T { + return &v +} diff --git a/pkg/apitypes/query_request.go b/pkg/apitypes/query_request.go index 50668df8..79f7a9b3 100644 --- a/pkg/apitypes/query_request.go +++ b/pkg/apitypes/query_request.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,6 +24,7 @@ import ( type QueryRequest struct { Headers RequestHeaders `json:"headers"` ffcapi.TransactionInput + BlockNumber *string `json:"blockNumber,omitempty"` } // QueryResponse is the response payload for a query diff --git a/pkg/ffcapi/method_call.go b/pkg/ffcapi/method_call.go index 613496f3..9c61d488 100644 --- a/pkg/ffcapi/method_call.go +++ b/pkg/ffcapi/method_call.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -28,7 +28,7 @@ import ( // detected by the back-end connector. type QueryInvokeRequest struct { TransactionInput - BlockNumber *fftypes.FFBigInt `json:"blockNumber,omitempty"` + BlockNumber *string `json:"blockNumber,omitempty"` } type QueryInvokeResponse struct { diff --git a/pkg/fftm/route__root_command.go b/pkg/fftm/route__root_command.go index cb8407aa..81af0eec 100644 --- a/pkg/fftm/route__root_command.go +++ b/pkg/fftm/route__root_command.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -101,6 +101,7 @@ var postRootCommand = func(m *manager) *ffapi.Route { } res, _, err := m.connector.QueryInvoke(r.Req.Context(), &ffcapi.QueryInvokeRequest{ TransactionInput: tReq.TransactionInput, + BlockNumber: tReq.BlockNumber, }) if err != nil { return nil, err diff --git a/pkg/txhandler/simple/policyloop.go b/pkg/txhandler/simple/policyloop.go index 46e533c5..d0c692c2 100644 --- a/pkg/txhandler/simple/policyloop.go +++ b/pkg/txhandler/simple/policyloop.go @@ -103,6 +103,8 @@ func (sth *simpleTransactionHandler) markInflightUpdate() { } func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool { + sth.inflightRWMux.Lock() + defer sth.inflightRWMux.Unlock() oldInflight := sth.inflight sth.inflight = make([]*pendingState, 0, len(oldInflight)) @@ -118,6 +120,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool // If we are not at maximum, then query if there are more candidates now spaces := sth.maxInFlight - len(sth.inflight) + log.L(sth.ctx).Tracef("Number of spaces left '%v'", spaces) if spaces > 0 { var after string if len(sth.inflight) > 0 { @@ -152,7 +155,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool } newLen := len(sth.inflight) if newLen > 0 { - log.L(ctx).Debugf("Inflight set updated len=%d head-seq=%s tail-seq=%s old-tail=%s", len(sth.inflight), sth.inflight[0].mtx.SequenceID, sth.inflight[newLen-1].mtx.SequenceID, after) + log.L(ctx).Debugf("Inflight set updated with %d additional transactions, length is now %d head-id:%s head-seq=%s tail-id:%s tail-seq=%s old-tail=%s", len(additional), len(sth.inflight), sth.inflight[0].mtx.ID, sth.inflight[0].mtx.SequenceID, sth.inflight[newLen-1].mtx.ID, sth.inflight[newLen-1].mtx.SequenceID, after) } } sth.setTransactionInflightQueueMetrics(ctx) @@ -161,6 +164,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool } func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflightStale bool) { + log.L(ctx).Tracef("policyLoopCycle triggered inflightStatle=%v", inflightStale) // Process any synchronous commands first - these might not be in our inflight set sth.processPolicyAPIRequests(ctx) @@ -170,9 +174,12 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig return } } - // Go through executing the policy engine against them + sth.inflightRWMux.RLock() + defer sth.inflightRWMux.RUnlock() + // Go through executing the policy engine against them for _, pending := range sth.inflight { + log.L(ctx).Tracef("Executing policy against tx-id=%v", pending.mtx.ID) err := sth.execPolicy(ctx, pending, nil) if err != nil { log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err) @@ -204,13 +211,17 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex for _, request := range requests { var pending *pendingState + + sth.inflightRWMux.RLock() // If this transaction is in-flight, we use that record for _, inflight := range sth.inflight { - if inflight.mtx.ID == request.txID { + if inflight != nil && inflight.mtx != nil && inflight.mtx.ID == request.txID { pending = inflight break } } + sth.inflightRWMux.RUnlock() + // If this transaction is in-flight, we use that record if pending == nil { mtx, err := sth.getTransactionByID(ctx, request.txID) if err != nil { @@ -246,7 +257,9 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (ctx *RunContext, err error) { // Take a snapshot of the pending state under the lock - sth.mux.Lock() + pending.mux.Lock() + defer pending.mux.Unlock() + mtx := pending.mtx ctx = &RunContext{ Context: baseCtx, @@ -267,7 +280,6 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context ctx.UpdateType = Update // might change to delete later ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested } - sth.mux.Unlock() // Process any state updates that were queued to us from notifications from the confirmation manager if receiptNotify != nil { @@ -279,11 +291,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context sth.incTransactionOperationCounter(ctx, pending.mtx.Namespace(ctx), "received_receipt") // Clear the notification (as long as no other came through) - sth.mux.Lock() if pending.receiptNotify == receiptNotify { pending.receiptNotify = nil } - sth.mux.Unlock() } if confirmNotify != nil && ctx.Confirmations != nil { @@ -297,11 +307,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context } // Clear the notification (as long as no other came through) - sth.mux.Lock() if pending.confirmNotify == confirmNotify { pending.confirmNotify = nil } - sth.mux.Unlock() } return ctx, nil @@ -318,6 +326,7 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending completed := false switch { case ctx.Confirmed && ctx.SyncAction != ActionDelete: + log.L(sth.ctx).Tracef("Transaction '%s' confirmed", ctx.TX.ID) completed = true ctx.UpdateType = Update if ctx.Receipt != nil && ctx.Receipt.Success { @@ -481,44 +490,50 @@ func (sth *simpleTransactionHandler) policyEngineAPIRequest(ctx context.Context, func (sth *simpleTransactionHandler) HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) { // Will be picked up on the next policy loop cycle + sth.inflightRWMux.RLock() var pending *pendingState for _, p := range sth.inflight { - if p.mtx.ID == txID { + if p != nil && p.mtx != nil && p.mtx.ID == txID { pending = p break } } + sth.inflightRWMux.RUnlock() if pending == nil { err = i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID) return } - sth.mux.Lock() + pending.mux.Lock() pending.confirmed = notification.Confirmed pending.confirmNotify = fftypes.Now() pending.confirmations = notification + pending.mux.Unlock() log.L(ctx).Infof("Received %d confirmations (resync=%t)", len(notification.Confirmations), notification.NewFork) - sth.mux.Unlock() sth.markInflightUpdate() return } func (sth *simpleTransactionHandler) HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) { + log.L(ctx).Tracef("Handle transaction receipt received %s", txID) + sth.inflightRWMux.RLock() var pending *pendingState for _, p := range sth.inflight { - if p.mtx.ID == txID { + if p != nil && p.mtx != nil && p.mtx.ID == txID { pending = p break } } + sth.inflightRWMux.RUnlock() if pending == nil { err = i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID) return } + pending.mux.Lock() // Will be picked up on the next policy loop cycle - guaranteed to occur before Confirmed - sth.mux.Lock() pending.receiptNotify = fftypes.Now() pending.receipt = receipt - sth.mux.Unlock() + pending.mux.Unlock() + // Will be picked up on the next policy loop cycle - guaranteed to occur before Confirmed sth.markInflightUpdate() return } diff --git a/pkg/txhandler/simple/simple_transaction_handler.go b/pkg/txhandler/simple/simple_transaction_handler.go index 520ba9bb..5fbb1417 100644 --- a/pkg/txhandler/simple/simple_transaction_handler.go +++ b/pkg/txhandler/simple/simple_transaction_handler.go @@ -176,7 +176,8 @@ type simpleTransactionHandler struct { policyLoopDone chan struct{} inflightStale chan bool inflightUpdate chan bool - mux sync.Mutex + mux sync.RWMutex + inflightRWMux sync.RWMutex inflight []*pendingState policyEngineAPIRequests []*policyEngineAPIRequest maxInFlight int @@ -195,6 +196,9 @@ type pendingState struct { confirmNotify *fftypes.FFTime remove bool subStatus apitypes.TxSubStatus + // This mutex only works in a slice when the slice contains a pointer to this struct + // appends to a slice copy memory but when storing pointers it does not + mux sync.Mutex } type simplePolicyInfo struct { @@ -344,8 +348,8 @@ func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID s } func (sth *simpleTransactionHandler) submitTX(ctx *RunContext) (reason ffcapi.ErrorReason, err error) { - mtx := ctx.TX + mtx.GasPrice, err = sth.getGasPrice(ctx, sth.toolkit.Connector) if err != nil { ctx.AddSubStatusAction(apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`), fftypes.Now())