Skip to content

Commit

Permalink
Merge pull request #130 from insolar/PENV-418-same-records-test
Browse files Browse the repository at this point in the history
PENV-418: same records test
  • Loading branch information
megge-dream authored Jul 29, 2020
2 parents 4ec6239 + b3c4c7a commit 49b4968
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 11 deletions.
16 changes: 9 additions & 7 deletions etl/extractor/platform_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"sync/atomic"
"time"

"github.com/insolar/block-explorer/etl/interfaces"
"github.com/insolar/block-explorer/etl/types"
"github.com/insolar/block-explorer/instrumentation/belogger"
"github.com/insolar/insolar/insolar"
"github.com/insolar/insolar/insolar/pulse"
"github.com/insolar/insolar/ledger/heavy/exporter"

"github.com/insolar/block-explorer/etl/interfaces"
"github.com/insolar/block-explorer/etl/types"
"github.com/insolar/block-explorer/instrumentation/belogger"
)

type PlatformExtractor struct {
Expand Down Expand Up @@ -100,12 +101,13 @@ func closeStream(ctx context.Context, stream exporter.RecordExporter_ExportClien
func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int64) {
pu := &exporter.FullPulse{PulseNumber: insolar.PulseNumber(from)}
var err error
log := belogger.FromContext(ctx).WithField("pulse_number", pu.PulseNumber)

log.Debug("retrievePulses(): Start")
log := belogger.FromContext(ctx)

halfPulse := time.Duration(e.continuousPulseRetrievingHalfPulseSeconds) * time.Second
for {
log = log.WithField("pulse_number", pu.PulseNumber)
log.Debug("retrievePulses(): Start")

select {
case <-ctx.Done(): // we need context with cancel
log.Debug("retrievePulses(): terminating")
Expand All @@ -126,7 +128,7 @@ func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int6
time.Sleep(halfPulse)
continue
}
log.Errorf("retrievePulses(): before=%d err=%s", before, err)
log.Errorf("retrievePulses(): before=%d err=%s", before.PulseNumber, err)
time.Sleep(time.Second)
continue
}
Expand Down
23 changes: 23 additions & 0 deletions etl/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ func TestTransform_sortRecords_ErrorNoPrevRecord(t *testing.T) {
require.Nil(t, result)
}

func TestTransform_sortRecords_ErrorSameRecord(t *testing.T) {
firstObj := gen.Reference().Bytes()
record1 := testutils.CreateRecordCanonical()
record1.ObjectReference = firstObj
record2 := testutils.CreateRecordCanonical()
record2.ObjectReference = firstObj
record3 := testutils.CreateRecordCanonical()
record3.ObjectReference = firstObj
record4 := testutils.CreateRecordCanonical()
record4.ObjectReference = firstObj

// make lifeline: 1 <- 2 <- 3 <- 4
record4.PrevRecordReference = record3.Ref
record3.PrevRecordReference = record2.Ref
record2.PrevRecordReference = record1.Ref

// provide record1 and record3 two times
result, err := sortRecords([]types.Record{record1, record2, record3, record1, record3})
require.Error(t, err)
require.Contains(t, err.Error(), "Number of records before sorting (5) changes after (3)")
require.Nil(t, result)
}

func TestTransform_transferToCanonicalRecord_SkipUnsortedRecord(t *testing.T) {
unsupportedRecord := &exporter.Record{
Record: ins_record.Material{
Expand Down
116 changes: 112 additions & 4 deletions test/integration/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ import (
"io"
"testing"

"github.com/insolar/insolar/insolar"
"github.com/insolar/insolar/ledger/heavy/exporter"
"github.com/insolar/insolar/pulse"
"github.com/stretchr/testify/require"

"github.com/insolar/block-explorer/etl/models"
"github.com/insolar/block-explorer/etl/transformer"
"github.com/insolar/block-explorer/etl/types"
"github.com/insolar/block-explorer/instrumentation/converter"
"github.com/insolar/block-explorer/test/heavymock"
"github.com/insolar/block-explorer/testutils"
"github.com/insolar/block-explorer/testutils/clients"
"github.com/insolar/insolar/insolar"
"github.com/insolar/insolar/ledger/heavy/exporter"
"github.com/insolar/insolar/pulse"
"github.com/stretchr/testify/require"
)

func TestIntegrationWithDb_GetRecords(t *testing.T) {
Expand Down Expand Up @@ -104,6 +105,113 @@ func TestIntegrationWithDb_GetRecords(t *testing.T) {
}
}

func TestIntegrationWithDb_GetRecords_ErrorSameRecords(t *testing.T) {
t.Log("C5498 Process same records; duplicated records not saved in database")
ts := NewBlockExplorerTestSetup(t)
defer ts.Stop(t)
records := make([]*exporter.Record, 0)

pulsesNumber := 2
recordsInPulse := 5
recordsWithDifferencePulses := testutils.GenerateRecordsWithDifferencePulses(pulsesNumber, recordsInPulse, int64(pulse.MinTimePulse))
stream, err := ts.ConMngr.ImporterClient.Import(context.Background())
require.NoError(t, err)

ts.BE.PulseClient.SetNextFinalizedPulseFunc(ts.ConMngr.Importer)

for i := 0; i < pulsesNumber*recordsInPulse; i++ {
record, _ := recordsWithDifferencePulses()
records = append(records, record)
if err := stream.Send(record); err != nil {
if err == io.EOF {
break
}
t.Fatal("Error sending to stream", err)
}
// send record second times for first pulse
if i < recordsInPulse {
if err := stream.Send(record); err != nil {
if err == io.EOF {
break
}
t.Fatal("Error sending to stream", err)
}
}
}
reply, err := stream.CloseAndRecv()
require.NoError(t, err)
require.True(t, reply.Ok)
require.Len(t, records, pulsesNumber*recordsInPulse)

jetDrops := make([]types.PlatformJetDrops, 0)
var notSavedRefs [][]byte
var jetsInPulse []exporter.JetDropContinue
var recs []*exporter.Record
for i, r := range records {
p, err := clients.GetFullPulse(uint32(r.Record.ID.Pulse()), nil)
require.NoError(t, err)
if p.PulseNumber == pulse.MinTimePulse {
notSavedRefs = append(notSavedRefs, r.Record.ID.Bytes())
continue
}
recs = append(recs, r)
jetsInPulse = append(jetsInPulse, exporter.JetDropContinue{JetID: r.Record.JetID, Hash: testutils.GenerateRandBytes()})
if i%recordsInPulse == (recordsInPulse - 1) {
jetDrop := types.PlatformJetDrops{Pulse: p,
Records: recs}
jetDrops = append(jetDrops, jetDrop)
recs = []*exporter.Record{}
}
}

require.Len(t, jetDrops, pulsesNumber-1)

for _, jetDrop := range jetDrops {
jetDrop.Pulse.Jets = jetsInPulse
}

ts.StartBE(t)
defer ts.StopBE(t)

refs := make([]types.Reference, 0)
ctx := context.Background()
for _, jd := range jetDrops {
transform, err := transformer.Transform(ctx, &jd)
if err != nil {
t.Logf("error transforming record: %v", err)
return
}
for _, tr := range transform {
records := tr.MainSection.Records
require.NotEmpty(t, records)
for _, r := range records {
ref := r.Ref
require.NotEmpty(t, ref)
refs = append(refs, ref)
}
}
}
require.Len(t, refs, recordsInPulse)

// last record with the biggest pulse number won't be processed, so we do not expect this record in DB
expRecordsCount := recordsInPulse * (pulsesNumber - 1)
ts.WaitRecordsCount(t, expRecordsCount, 6000)

for _, ref := range refs[:expRecordsCount] {
modelRef := models.ReferenceFromTypes(ref)
record, err := ts.BE.Storage().GetRecord(modelRef)
require.NoError(t, err, "Error executing GetRecord from db")
require.NotEmpty(t, record, "Record is empty")
require.Equal(t, modelRef, record.Reference, "Reference not equal")
}
for _, ref := range notSavedRefs {
modelRef := models.ReferenceFromTypes(ref)
_, err := ts.BE.Storage().GetRecord(modelRef)
require.Error(t, err, "Record must be not saved")
require.Equal(t, err.Error(), "record not found", "Wrong error message")
}
}

func TestIntegrationWithDb_GetJetDrops(t *testing.T) {
t.Log("C4992 Process records and get saved jetDrops by pulse number from database")

Expand Down

0 comments on commit 49b4968

Please sign in to comment.