From e87f926a400d776104b09b3790bc0e4d4c836b06 Mon Sep 17 00:00:00 2001
From: xu wu <wuxu1103@163.com>
Date: Sat, 1 May 2021 11:00:11 +0800
Subject: [PATCH] BlockCollector detects repeated block (#172)

---
 pkg/infra/benchmark_test.go           |  54 ++-----
 pkg/infra/bitmap/bitmap.go            |  59 +++++++
 pkg/infra/bitmap/bitmap_suite_test.go |  13 ++
 pkg/infra/bitmap/bitmap_test.go       |  91 +++++++++++
 pkg/infra/block_collector.go          |  90 +++++------
 pkg/infra/block_collector_test.go     | 217 ++++++--------------------
 pkg/infra/observer.go                 |  13 +-
 pkg/infra/observer_test.go            |   5 +-
 pkg/infra/process.go                  |   3 +-
 9 files changed, 278 insertions(+), 267 deletions(-)
 create mode 100644 pkg/infra/bitmap/bitmap.go
 create mode 100644 pkg/infra/bitmap/bitmap_suite_test.go
 create mode 100644 pkg/infra/bitmap/bitmap_test.go

diff --git a/pkg/infra/benchmark_test.go b/pkg/infra/benchmark_test.go
index 3d2b0252..ddbd82d9 100644
--- a/pkg/infra/benchmark_test.go
+++ b/pkg/infra/benchmark_test.go
@@ -57,62 +57,26 @@ func BenchmarkPeerEndorsement2(b *testing.B) { benchmarkNPeer(2, b) }
 func BenchmarkPeerEndorsement4(b *testing.B) { benchmarkNPeer(4, b) }
 func BenchmarkPeerEndorsement8(b *testing.B) { benchmarkNPeer(8, b) }
 
-func benchmarkSyncCollector(concurrency int, b *testing.B) {
-	instance, _ := NewBlockCollector(concurrency, concurrency)
-	processed := make(chan struct{}, b.N)
-	defer close(processed)
-	now := time.Now()
-	finishCh := make(chan struct{})
-	b.ReportAllocs()
-	b.ResetTimer()
-	for i := 0; i < concurrency; i++ {
-		go func() {
-			for j := 0; j < b.N; j++ {
-				ft := make([]*peer.FilteredTransaction, 1)
-				fb := &peer.FilteredBlock{
-					Number:               uint64(j),
-					FilteredTransactions: ft,
-				}
-				block := &peer.DeliverResponse_FilteredBlock{
-					FilteredBlock: fb,
-				}
-				if instance.Commit(block, finishCh, now) {
-					processed <- struct{}{}
-				}
-			}
-		}()
-	}
-	var n int
-	for n < b.N {
-		<-processed
-		n++
-	}
-	b.StopTimer()
-}
-
-func BenchmarkSyncCollector1(b *testing.B)  { benchmarkSyncCollector(1, b) }
-func BenchmarkSyncCollector2(b *testing.B)  { benchmarkSyncCollector(2, b) }
-func BenchmarkSyncCollector4(b *testing.B)  { benchmarkSyncCollector(4, b) }
-func BenchmarkSyncCollector8(b *testing.B)  { benchmarkSyncCollector(8, b) }
-func BenchmarkSyncCollector16(b *testing.B) { benchmarkSyncCollector(16, b) }
-
 func benchmarkAsyncCollector(concurrent int, b *testing.B) {
 	instance, _ := NewBlockCollector(concurrent, concurrent)
-	block := make(chan *peer.FilteredBlock, 100)
+	block := make(chan *AddressedBlock, 100)
 	done := make(chan struct{})
 	go instance.Start(context.Background(), block, done, b.N, time.Now(), false)
 
 	b.ReportAllocs()
 	b.ResetTimer()
 	for i := 0; i < concurrent; i++ {
-		go func() {
+		go func(idx int) {
 			for j := 0; j < b.N; j++ {
-				block <- &peer.FilteredBlock{
-					Number:               uint64(j),
-					FilteredTransactions: make([]*peer.FilteredTransaction, 1),
+				block <- &AddressedBlock{
+					FilteredBlock: &peer.FilteredBlock{
+						Number:               uint64(j),
+						FilteredTransactions: make([]*peer.FilteredTransaction, 1),
+					},
+					Address: idx,
 				}
 			}
-		}()
+		}(i)
 	}
 	<-done
 	b.StopTimer()
diff --git a/pkg/infra/bitmap/bitmap.go b/pkg/infra/bitmap/bitmap.go
new file mode 100644
index 00000000..973e1f4b
--- /dev/null
+++ b/pkg/infra/bitmap/bitmap.go
@@ -0,0 +1,59 @@
+package bitmap
+
+import "github.com/pkg/errors"
+
+type BitMap struct {
+	count      int // number of bits set
+	capability int // total number of bits
+	bits       []uint64
+}
+
+// Has determine whether the specified position is set
+func (b *BitMap) Has(num int) bool {
+	if num >= b.capability {
+		return false
+	}
+	c, bit := num/64, uint(num%64)
+	return (c < len(b.bits)) && (b.bits[c]&(1<<bit) != 0)
+}
+
+// Set set the specified position
+// If the position has been set or exceeds the maximum number of bits, set is a no-op.
+func (b *BitMap) Set(num int) {
+	if b.Has(num) {
+		return
+	}
+	if b.capability <= num {
+		return
+	}
+
+	c, bit := num/64, uint(num%64)
+	b.bits[c] |= 1 << bit
+	b.count++
+	return
+}
+
+func (b *BitMap) Count() int {
+	return b.count
+}
+
+func (b *BitMap) Cap() int {
+	return b.capability
+}
+
+func (b *BitMap) BitsLen() int {
+	return len(b.bits)
+}
+
+// NewBitsMap create a new BitsMap
+func NewBitMap(cap int) (BitMap, error) {
+	if cap < 1 {
+		return BitMap{}, errors.New("cap should not be less than 1")
+	}
+	bitsLen := cap / 64
+	if cap%64 > 0 {
+		bitsLen++
+	}
+
+	return BitMap{bits: make([]uint64, bitsLen), capability: cap}, nil
+}
diff --git a/pkg/infra/bitmap/bitmap_suite_test.go b/pkg/infra/bitmap/bitmap_suite_test.go
new file mode 100644
index 00000000..7098b1eb
--- /dev/null
+++ b/pkg/infra/bitmap/bitmap_suite_test.go
@@ -0,0 +1,13 @@
+package bitmap_test
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+func TestBitmap(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Bitmap Suite")
+}
diff --git a/pkg/infra/bitmap/bitmap_test.go b/pkg/infra/bitmap/bitmap_test.go
new file mode 100644
index 00000000..88fe6a60
--- /dev/null
+++ b/pkg/infra/bitmap/bitmap_test.go
@@ -0,0 +1,91 @@
+package bitmap_test
+
+import (
+	"tape/pkg/infra/bitmap"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("Bitmap", func() {
+
+	Context("New BitsMap", func() {
+		It("the environment is properly set", func() {
+			b, err := bitmap.NewBitMap(4)
+			Expect(err).To(BeNil())
+			Expect(b.Cap()).To(Equal(4))
+			Expect(b.Count()).To(Equal(0))
+			Expect(b.BitsLen()).To(Equal(1))
+
+			b, err = bitmap.NewBitMap(65)
+			Expect(err).To(BeNil())
+			Expect(b.Cap()).To(Equal(65))
+			Expect(b.Count()).To(Equal(0))
+			Expect(b.BitsLen()).To(Equal(2))
+		})
+
+		It("should error which cap is less than 1", func() {
+			_, err := bitmap.NewBitMap(0)
+			Expect(err).NotTo(BeNil())
+
+			_, err = bitmap.NewBitMap(-1)
+			Expect(err).NotTo(BeNil())
+		})
+	})
+
+	Context("Operate BitsMap", func() {
+		It("the len of bits is just one ", func() {
+			b, err := bitmap.NewBitMap(4)
+			Expect(err).To(BeNil())
+			b.Set(0)
+			Expect(b.Count()).To(Equal(1))
+			b.Set(2)
+			Expect(b.Count()).To(Equal(2))
+			ok := b.Has(0)
+			Expect(ok).To(BeTrue())
+			ok = b.Has(2)
+			Expect(ok).To(BeTrue())
+			ok = b.Has(1)
+			Expect(ok).To(BeFalse())
+			ok = b.Has(4)
+			Expect(ok).To(BeFalse())
+
+			b.Set(4)
+			Expect(b.Count()).To(Equal(2))
+			b.Set(2)
+			Expect(b.Count()).To(Equal(2))
+		})
+
+		It("the len of bits is more than one", func() {
+			b, err := bitmap.NewBitMap(80)
+			Expect(err).To(BeNil())
+			b.Set(0)
+			Expect(b.Count()).To(Equal(1))
+			b.Set(2)
+			Expect(b.Count()).To(Equal(2))
+			b.Set(70)
+			Expect(b.Count()).To(Equal(3))
+			b.Set(79)
+			Expect(b.Count()).To(Equal(4))
+			ok := b.Has(0)
+			Expect(ok).To(BeTrue())
+			ok = b.Has(2)
+			Expect(ok).To(BeTrue())
+			ok = b.Has(70)
+			Expect(ok).To(BeTrue())
+			ok = b.Has(79)
+			Expect(ok).To(BeTrue())
+			ok = b.Has(1)
+			Expect(ok).To(BeFalse())
+			ok = b.Has(3)
+			Expect(ok).To(BeFalse())
+			ok = b.Has(69)
+			Expect(ok).To(BeFalse())
+
+			b.Set(80)
+			Expect(b.Count()).To(Equal(4))
+			b.Set(2)
+			Expect(b.Count()).To(Equal(4))
+		})
+	})
+})
diff --git a/pkg/infra/block_collector.go b/pkg/infra/block_collector.go
index 8182324a..87a03d35 100644
--- a/pkg/infra/block_collector.go
+++ b/pkg/infra/block_collector.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"tape/pkg/infra/bitmap"
 	"time"
 
 	"github.com/hyperledger/fabric-protos-go/peer"
@@ -17,12 +18,21 @@ type BlockCollector struct {
 	sync.Mutex
 	thresholdP, totalP int
 	totalTx            int
-	registry           map[uint64]int
+	registry           map[uint64]*bitmap.BitMap
+}
+
+// AddressedBlock describe the source of block
+type AddressedBlock struct {
+	*peer.FilteredBlock
+	Address int // source peer's number
 }
 
 // NewBlockCollector creates a BlockCollector
 func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {
-	registry := make(map[uint64]int)
+	registry := make(map[uint64]*bitmap.BitMap)
+	if threshold <= 0 || total <= 0 {
+		return nil, errors.New("threshold and total must be greater than zero")
+	}
 	if threshold > total {
 		return nil, errors.Errorf("threshold [%d] must be less than or equal to total [%d]", threshold, total)
 	}
@@ -35,70 +45,60 @@ func NewBlockCollector(threshold int, total int) (*BlockCollector, error) {
 
 func (bc *BlockCollector) Start(
 	ctx context.Context,
-	blockCh <-chan *peer.FilteredBlock,
+	blockCh <-chan *AddressedBlock,
 	finishCh chan struct{},
 	totalTx int,
 	now time.Time,
 	printResult bool, // controls whether to print block commit message. Tests set this to false to avoid polluting stdout.
 ) {
-	// TODO block collector should be able to detect repeated block, and exclude it from total tx counting.
 	for {
 		select {
 		case block := <-blockCh:
-			cnt := bc.registry[block.Number] // cnt is default to 0 when key does not exist
-			cnt++
-
-			// newly committed block just hits threshold
-			if cnt == bc.thresholdP {
-				if printResult {
-					fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
-				}
-
-				bc.totalTx += len(block.FilteredTransactions)
-				if bc.totalTx >= totalTx {
-					close(finishCh)
-				}
-			}
-
-			if cnt == bc.totalP {
-				// committed on all peers, remove from registry
-				delete(bc.registry, block.Number)
-			} else {
-				// upsert back to registry
-				bc.registry[block.Number] = cnt
-			}
+			bc.commit(block, finishCh, totalTx, now, printResult)
 		case <-ctx.Done():
 			return
 		}
 	}
 }
 
-// Deprecated
-//
-// Commit commits a block to collector. It returns true iff the number of peers on which
-// this block has been committed has satisfied thresholdP.
-func (bc *BlockCollector) Commit(block *peer.DeliverResponse_FilteredBlock, finishCh chan struct{}, now time.Time) (committed bool) {
-	bc.Lock()
-	defer bc.Unlock()
+// TODO This function contains too many functions and needs further optimization
+// commit commits a block to collector.
+// If the number of peers on which this block has been committed has satisfied thresholdP,
+// adds the number to the totalTx.
+func (bc *BlockCollector) commit(block *AddressedBlock, finishCh chan struct{}, totalTx int, now time.Time, printResult bool) {
+	bitMap, ok := bc.registry[block.Number]
+	if !ok {
+		// The block with Number is received for the first time
+		b, err := bitmap.NewBitMap(bc.totalP)
+		if err != nil {
+			panic("Can not make new bitmap for BlockCollector" + err.Error())
+		}
+		bc.registry[block.Number] = &b
+		bitMap = &b
+	}
+	// When the block from Address has been received before, return directly.
+	if bitMap.Has(block.Address) {
+		return
+	}
 
-	cnt := bc.registry[block.FilteredBlock.Number] // cnt is default to 0 when key does not exist
-	cnt++
+	bitMap.Set(block.Address)
+	cnt := bitMap.Count()
 
 	// newly committed block just hits threshold
 	if cnt == bc.thresholdP {
-		committed = true
-		duration := time.Since(now)
-		bc.totalTx += len(block.FilteredBlock.FilteredTransactions)
-		fmt.Printf("tx: %d, duration: %+v, tps: %f\n", bc.totalTx, duration, float64(bc.totalTx)/duration.Seconds())
+		if printResult {
+			fmt.Printf("Time %8.2fs\tBlock %6d\tTx %6d\t \n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions))
+		}
+
+		bc.totalTx += len(block.FilteredTransactions)
+		if bc.totalTx >= totalTx {
+			close(finishCh)
+		}
 	}
 
+	// TODO issue176
 	if cnt == bc.totalP {
 		// committed on all peers, remove from registry
-		delete(bc.registry, block.FilteredBlock.Number)
-	} else {
-		// upsert back to registry
-		bc.registry[block.FilteredBlock.Number] = cnt
+		delete(bc.registry, block.Number)
 	}
-
-	return
 }
diff --git a/pkg/infra/block_collector_test.go b/pkg/infra/block_collector_test.go
index 0c99baaf..f23bd9f2 100644
--- a/pkg/infra/block_collector_test.go
+++ b/pkg/infra/block_collector_test.go
@@ -11,22 +11,24 @@ import (
 	. "github.com/onsi/gomega"
 )
 
-var _ = Describe("BlockCollector", func() {
+func newAddressedBlock(addr int, blockNum uint64) *infra.AddressedBlock {
+	return &infra.AddressedBlock{Address: addr, FilteredBlock: &peer.FilteredBlock{Number: blockNum, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}}
+}
 
-	now := time.Now()
+var _ = Describe("BlockCollector", func() {
 
 	Context("Async Commit", func() {
 		It("should work with threshold 1 and observer 1", func() {
 			instance, err := infra.NewBlockCollector(1, 1)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 2, time.Now(), false)
 
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 1)
 			Eventually(done).Should(BeClosed())
 		})
 
@@ -34,19 +36,19 @@ var _ = Describe("BlockCollector", func() {
 			instance, err := infra.NewBlockCollector(1, 2)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 2, time.Now(), false)
 
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(1, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 1)
 			Eventually(done).Should(BeClosed())
 
 			select {
-			case block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}:
+			case block <- newAddressedBlock(1, 1):
 			default:
 				Fail("Block collector should still be able to consume blocks")
 			}
@@ -56,26 +58,26 @@ var _ = Describe("BlockCollector", func() {
 			instance, err := infra.NewBlockCollector(4, 4)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 2, time.Now(), false)
 
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 1)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(1, 1)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(2, 1)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(3, 1)
 			Consistently(done).ShouldNot(BeClosed())
 
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(1, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(2, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(3, 0)
 			Eventually(done).Should(BeClosed())
 		})
 
@@ -83,13 +85,13 @@ var _ = Describe("BlockCollector", func() {
 			instance, err := infra.NewBlockCollector(2, 4)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 1, time.Now(), false)
 
-			block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(1, 0)
 			Eventually(done).Should(BeClosed())
 		})
 
@@ -97,16 +99,16 @@ var _ = Describe("BlockCollector", func() {
 			instance, err := infra.NewBlockCollector(1, 1)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 2, time.Now(), false)
 
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 0)
 			Consistently(done).ShouldNot(BeClosed())
-			block <- &peer.FilteredBlock{Number: 0, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 0)
 			Consistently(done).ShouldNot(BeClosed())
 
-			block <- &peer.FilteredBlock{Number: 1, FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+			block <- newAddressedBlock(0, 1)
 			Eventually(done).Should(BeClosed())
 		})
 
@@ -116,21 +118,31 @@ var _ = Describe("BlockCollector", func() {
 			Expect(instance).Should(BeNil())
 		})
 
+		It("should return err when threshold or total is zero", func() {
+			instance, err := infra.NewBlockCollector(0, 1)
+			Expect(err).Should(MatchError("threshold and total must be greater than zero"))
+			Expect(instance).Should(BeNil())
+
+			instance, err = infra.NewBlockCollector(1, 0)
+			Expect(err).Should(MatchError("threshold and total must be greater than zero"))
+			Expect(instance).Should(BeNil())
+		})
+
 		It("Should supports parallel committers", func() {
 			instance, err := infra.NewBlockCollector(100, 100)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 1, time.Now(), false)
 
 			var wg sync.WaitGroup
 			wg.Add(100)
 			for i := 0; i < 100; i++ {
-				go func() {
+				go func(idx int) {
 					defer wg.Done()
-					block <- &peer.FilteredBlock{FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
-				}()
+					block <- newAddressedBlock(idx, 0)
+				}(i)
 			}
 			wg.Wait()
 			Eventually(done).Should(BeClosed())
@@ -140,18 +152,16 @@ var _ = Describe("BlockCollector", func() {
 			instance, err := infra.NewBlockCollector(3, 5)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 			go instance.Start(context.Background(), block, done, 10, time.Now(), false)
 
 			for i := 0; i < 3; i++ {
-				go func() {
+				go func(idx int) {
 					for j := 0; j < 10; j++ {
-						block <- &peer.FilteredBlock{
-							Number:               uint64(j),
-							FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+						block <- newAddressedBlock(idx, uint64(j))
 					}
-				}()
+				}(i)
 			}
 			Eventually(done).Should(BeClosed())
 		})
@@ -160,145 +170,18 @@ var _ = Describe("BlockCollector", func() {
 			instance, err := infra.NewBlockCollector(5, 5)
 			Expect(err).NotTo(HaveOccurred())
 
-			block := make(chan *peer.FilteredBlock)
+			block := make(chan *infra.AddressedBlock)
 			done := make(chan struct{})
 
 			go instance.Start(context.Background(), block, done, 10, time.Now(), false)
 			for i := 0; i < 5; i++ {
-				go func() {
+				go func(idx int) {
 					for j := 0; j < 10; j++ {
-						block <- &peer.FilteredBlock{
-							Number:               uint64(j),
-							FilteredTransactions: make([]*peer.FilteredTransaction, 1)}
+						block <- newAddressedBlock(idx, uint64(j))
 					}
-				}()
+				}(i)
 			}
 			Eventually(done).Should(BeClosed())
 		})
 	})
-
-	Context("Sync Commit", func() {
-		It("should work with threshold 1 and observer 1", func() {
-			finishCh := make(chan struct{})
-			instance, err := infra.NewBlockCollector(1, 1)
-			Expect(err).NotTo(HaveOccurred())
-			ft := make([]*peer.FilteredTransaction, 1)
-			fb := &peer.FilteredBlock{
-				Number:               uint64(1),
-				FilteredTransactions: ft,
-			}
-			block := &peer.DeliverResponse_FilteredBlock{
-				FilteredBlock: fb,
-			}
-			Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
-		})
-
-		It("should work with threshold 1 and observer 2", func() {
-			finishCh := make(chan struct{})
-			instance, err := infra.NewBlockCollector(1, 2)
-			Expect(err).NotTo(HaveOccurred())
-			ft := make([]*peer.FilteredTransaction, 1)
-			fb := &peer.FilteredBlock{
-				Number:               uint64(1),
-				FilteredTransactions: ft,
-			}
-			block := &peer.DeliverResponse_FilteredBlock{
-				FilteredBlock: fb,
-			}
-			Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-		})
-
-		It("should work with threshold 4 and observer 4", func() {
-			finishCh := make(chan struct{})
-			instance, err := infra.NewBlockCollector(4, 4)
-			Expect(err).NotTo(HaveOccurred())
-			ft := make([]*peer.FilteredTransaction, 1)
-			fb := &peer.FilteredBlock{
-				Number:               uint64(1),
-				FilteredTransactions: ft,
-			}
-			block := &peer.DeliverResponse_FilteredBlock{
-				FilteredBlock: fb,
-			}
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-			Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
-		})
-
-		It("should work with threshold 2 and observer 4", func() {
-			finishCh := make(chan struct{})
-			instance, err := infra.NewBlockCollector(2, 4)
-			Expect(err).NotTo(HaveOccurred())
-			ft := make([]*peer.FilteredTransaction, 1)
-			fb := &peer.FilteredBlock{
-				Number:               uint64(1),
-				FilteredTransactions: ft,
-			}
-			block := &peer.DeliverResponse_FilteredBlock{
-				FilteredBlock: fb,
-			}
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-			Expect(instance.Commit(block, finishCh, now)).To(BeTrue())
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-			Expect(instance.Commit(block, finishCh, now)).To(BeFalse())
-		})
-
-		It("should return err when threshold is greater than total", func() {
-			instance, err := infra.NewBlockCollector(2, 1)
-			Expect(err).Should(MatchError("threshold [2] must be less than or equal to total [1]"))
-			Expect(instance).Should(BeNil())
-		})
-
-		It("Should work with threshold 3 and observer 5 in parallel", func() {
-			instance, _ := infra.NewBlockCollector(3, 5)
-			finishCh := make(chan struct{})
-			var wg sync.WaitGroup
-			wg.Add(3)
-			for i := 0; i < 3; i++ {
-				go func() {
-					defer wg.Done()
-					ft := make([]*peer.FilteredTransaction, 1)
-					fb := &peer.FilteredBlock{
-						Number:               uint64(1),
-						FilteredTransactions: ft,
-					}
-					block := &peer.DeliverResponse_FilteredBlock{
-						FilteredBlock: fb,
-					}
-					if instance.Commit(block, finishCh, now) {
-						close(finishCh)
-					}
-				}()
-			}
-			wg.Wait()
-			Eventually(finishCh).Should(BeClosed())
-		})
-
-		It("Should work with threshold 5 and observer 5 in parallel", func() {
-			instance, _ := infra.NewBlockCollector(5, 5)
-			finishCh := make(chan struct{})
-			var wg sync.WaitGroup
-			wg.Add(5)
-			for i := 0; i < 5; i++ {
-				go func() {
-					defer wg.Done()
-					ft := make([]*peer.FilteredTransaction, 1)
-					fb := &peer.FilteredBlock{
-						Number:               uint64(1),
-						FilteredTransactions: ft,
-					}
-					block := &peer.DeliverResponse_FilteredBlock{
-						FilteredBlock: fb,
-					}
-					if instance.Commit(block, finishCh, now) {
-						close(finishCh)
-					}
-				}()
-			}
-			wg.Wait()
-			Eventually(finishCh).Should(BeClosed())
-		})
-	})
 })
diff --git a/pkg/infra/observer.go b/pkg/infra/observer.go
index 5e682c73..9ae8cfc8 100644
--- a/pkg/infra/observer.go
+++ b/pkg/infra/observer.go
@@ -14,6 +14,7 @@ type Observers struct {
 }
 
 type Observer struct {
+	index   int
 	Address string
 	d       peer.Deliver_DeliverFilteredClient
 	logger  *log.Logger
@@ -21,17 +22,18 @@ type Observer struct {
 
 func CreateObservers(ctx context.Context, channel string, nodes []Node, crypto *Crypto, logger *log.Logger) (*Observers, error) {
 	var workers []*Observer
-	for _, node := range nodes {
+	for i, node := range nodes {
 		worker, err := CreateObserver(ctx, channel, node, crypto, logger)
 		if err != nil {
 			return nil, err
 		}
+		worker.index = i
 		workers = append(workers, worker)
 	}
 	return &Observers{workers: workers}, nil
 }
 
-func (o *Observers) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock, now time.Time) {
+func (o *Observers) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) {
 	for i := 0; i < len(o.workers); i++ {
 		go o.workers[i].Start(errorCh, blockCh, now)
 	}
@@ -60,8 +62,9 @@ func CreateObserver(ctx context.Context, channel string, node Node, crypto *Cryp
 	return &Observer{Address: node.Addr, d: deliverer, logger: logger}, nil
 }
 
-func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock, now time.Time) {
-	o.logger.Debugf("start observer for orderer %s", o.Address)
+func (o *Observer) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) {
+	o.logger.Debugf("start observer for peer %s", o.Address)
+
 	for {
 		r, err := o.d.Recv()
 		if err != nil {
@@ -76,6 +79,6 @@ func (o *Observer) Start(errorCh chan error, blockCh chan<- *peer.FilteredBlock,
 		fb := r.Type.(*peer.DeliverResponse_FilteredBlock)
 		o.logger.Debugf("receivedTime %8.2fs\tBlock %6d\tTx %6d\t Address %s\n", time.Since(now).Seconds(), fb.FilteredBlock.Number, len(fb.FilteredBlock.FilteredTransactions), o.Address)
 
-		blockCh <- fb.FilteredBlock
+		blockCh <- &AddressedBlock{fb.FilteredBlock, o.index}
 	}
 }
diff --git a/pkg/infra/observer_test.go b/pkg/infra/observer_test.go
index 6a41c522..185eb412 100644
--- a/pkg/infra/observer_test.go
+++ b/pkg/infra/observer_test.go
@@ -9,7 +9,6 @@ import (
 	"tape/pkg/infra"
 	"time"
 
-	"github.com/hyperledger/fabric-protos-go/peer"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	log "github.com/sirupsen/logrus"
@@ -81,7 +80,7 @@ var _ = Describe("Observer", func() {
 		start := time.Now()
 		blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers))
 		Expect(err).NotTo(HaveOccurred())
-		blockCh := make(chan *peer.FilteredBlock)
+		blockCh := make(chan *infra.AddressedBlock)
 		go blockCollector.Start(ctx, blockCh, finishCh, mock.MockTxSize, time.Now(), true)
 		go observers.Start(errorCh, blockCh, start)
 		go func() {
@@ -141,7 +140,7 @@ var _ = Describe("Observer", func() {
 		start := time.Now()
 		blockCollector, err := infra.NewBlockCollector(config.CommitThreshold, len(config.Committers))
 		Expect(err).NotTo(HaveOccurred())
-		blockCh := make(chan *peer.FilteredBlock)
+		blockCh := make(chan *infra.AddressedBlock)
 		go blockCollector.Start(ctx, blockCh, finishCh, mock.MockTxSize, time.Now(), true)
 		go observers.Start(errorCh, blockCh, start)
 		for i := 0; i < TotalPeers; i++ {
diff --git a/pkg/infra/process.go b/pkg/infra/process.go
index 3d06e933..bc9dbe80 100644
--- a/pkg/infra/process.go
+++ b/pkg/infra/process.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/hyperledger/fabric-protos-go/peer"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 )
@@ -23,7 +22,7 @@ func Process(configPath string, num int, burst int, rate float64, logger *log.Lo
 	signed := make([]chan *Elements, len(config.Endorsers))
 	processed := make(chan *Elements, burst)
 	envs := make(chan *Elements, burst)
-	blockCh := make(chan *peer.FilteredBlock)
+	blockCh := make(chan *AddressedBlock)
 	finishCh := make(chan struct{})
 	errorCh := make(chan error, burst)
 	assembler := &Assembler{Signer: crypto}