Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add changes. #2

Open
wants to merge 3 commits into
base: hashjoin-lip
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,23 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
}

// Bloom-filter pre-filtering: only for inner and semi joins whose outer child
// is directly a table reader. The bit array is allocated here and shared by
// reference with the reader.
if e.joinType == plannercore.InnerJoin || e.joinType == plannercore.SemiJoin {
	if outerReader, ok := e.outerExec.(*TableReaderExecutor); ok {
		// One uint64 word per 50 estimated inner rows, with a floor of 10 words.
		bfSize := uint64(e.innerEstCount) / 50
		if bfSize < 10 {
			bfSize = 10
		}
		e.bloomFilter = make([]uint64, bfSize)
		outerReader.bloomFilter = e.bloomFilter

		// Record the outer-side join key column indexes. Range over the slice
		// that is actually indexed so the loop bound always matches the
		// length of joinKeyIdx.
		outerReader.joinKeyIdx = make([]int64, len(e.outerKeys))
		for i, outerKey := range e.outerKeys {
			outerReader.joinKeyIdx[i] = int64(outerKey.Index)
		}
	}
}

e.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
Expand Down
69 changes: 57 additions & 12 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"
"hash/fnv"
"sync"
"sync/atomic"

Expand All @@ -24,6 +25,8 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -67,6 +70,11 @@ type HashJoinExec struct {
joinChkResourceCh []chan *chunk.Chunk
joinResultCh chan *hashjoinWorkerResult

// bloomFilter will be built from inner table and sent to outer table
// data source to "pre-filter" data.
// bloomFilter is initialized in executor builder.
bloomFilter []uint64

memTracker *memory.Tracker // track memory usage.
prepared bool
isOuterJoin bool
Expand Down Expand Up @@ -128,7 +136,7 @@ func (e *HashJoinExec) Close() error {

// Open implements the Executor Open interface.
func (e *HashJoinExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
if err := e.innerExec.Open(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -166,18 +174,11 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
required := int(atomic.LoadInt64(&e.requiredRows))
outerResult.SetRequiredRows(required, e.maxChunkSize)
}
err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}
if !hasWaitedForInner {
if outerResult.NumRows() == 0 {
e.finished.Store(true)
return
}
// if outerResult.NumRows() == 0 {
// e.finished.Store(true)
// return
// }
jobFinished, innerErr := e.wait4Inner()
if innerErr != nil {
e.joinResultCh <- &hashjoinWorkerResult{
Expand All @@ -189,6 +190,13 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
hasWaitedForInner = true
}
err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}

if outerResult.NumRows() == 0 {
return
Expand Down Expand Up @@ -487,6 +495,12 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
e.innerFinished <- errors.Trace(err)
close(doneCh)
}

if err := e.outerExec.Open(ctx); err != nil {
e.innerFinished <- errors.Trace(err)
close(doneCh)
}

// Wait for fetchInnerRows to finish.
// 1. if buildHashTableForList fails
// 2. if outerResult.NumRows() == 0, fetchOuterChunks will not wait for inner.
Expand Down Expand Up @@ -520,6 +534,37 @@ func (e *HashJoinExec) buildHashTableForList(innerResultCh <-chan *chunk.Chunk)
if err != nil {
return err
}
if e.bloomFilter != nil {
err = e.putChunk2BloomFilter(chk, innerKeyColIdx, allTypes)
if err != nil {
return err
}
}
}
return nil
}

// putChunk2BloomFilter records the join key of every row in chk in
// e.bloomFilter.
//
// e.bloomFilter is used as a bit array in which each uint64 contributes 64
// bits. For every row, the columns listed in innerKeyColIdx are encoded with
// tablecodec.EncodeValue and written, in order, into one FNV-1a 64-bit hash;
// the hash modulo the total number of bits selects the single bit to set.
//
// A zero-length filter has no bits to set, so the chunk is skipped instead of
// panicking on a modulo by zero. The first encoding error is returned as is.
func (e *HashJoinExec) putChunk2BloomFilter(chk *chunk.Chunk, innerKeyColIdx []int, allTypes []*types.FieldType) error {
	const bitsPerWord = 64 // a uint64 has 64 bits
	totalBits := uint64(len(e.bloomFilter)) * bitsPerWord
	if totalBits == 0 {
		return nil
	}

	stmtCtx := e.ctx.GetSessionVars().StmtCtx
	// One hasher is reused for all rows and reset before each of them.
	hasher := fnv.New64a()
	for i, numRows := 0, chk.NumRows(); i < numRows; i++ {
		row := chk.GetRow(i)
		hasher.Reset()
		for _, colIdx := range innerKeyColIdx {
			datum := row.GetDatum(colIdx, allTypes[colIdx])
			encoded, err := tablecodec.EncodeValue(stmtCtx, nil, datum)
			if err != nil {
				return err
			}
			// hash.Hash documents that Write never returns an error.
			_, _ = hasher.Write(encoded)
		}
		bit := hasher.Sum64() % totalBits
		e.bloomFilter[bit/bitsPerWord] |= 1 << (bit % bitsPerWord)
	}
	return nil
}
Expand Down
15 changes: 15 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -72,6 +73,10 @@ type TableReaderExecutor struct {
memTracker *memory.Tracker
selectResultHook // for testing

// bloomFilter is built in hash join executor before `Open` is called here.
bloomFilter []uint64
joinKeyIdx []int64

keepOrder bool
desc bool
streaming bool
Expand Down Expand Up @@ -100,6 +105,15 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
return err
}
}

if e.bloomFilter != nil {
bfExec := &tipb.BloomFilter{
BitSet: e.bloomFilter,
ColIdx: e.joinKeyIdx,
}
e.dagPB.Executors = append(e.dagPB.Executors, &tipb.Executor{Tp: tipb.ExecType_TypeBloomFilter, BloomFilter: bfExec})
}

if e.runtimeStats != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
Expand Down Expand Up @@ -157,6 +171,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error
e.feedback.Invalidate()
return err
}
time.Sleep(time.Millisecond)
return nil
}

Expand Down
17 changes: 17 additions & 0 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/bloom"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
mockpkg "github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -157,6 +158,8 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
currExec, err = h.buildIndexScan(ctx, curr)
case tipb.ExecType_TypeSelection:
currExec, err = h.buildSelection(ctx, curr)
case tipb.ExecType_TypeBloomFilter:
currExec, err = h.buildBloomFilterExec(ctx, curr)
case tipb.ExecType_TypeAggregation:
currExec, err = h.buildHashAgg(ctx, curr)
case tipb.ExecType_TypeStreamAgg:
Expand Down Expand Up @@ -248,6 +251,20 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*
return e, nil
}

// buildBloomFilterExec creates a bloomFilterExec from the BloomFilter payload
// of the given tipb executor: the bit set becomes the bloom.Filter and the
// column indexes become the offsets of the key columns. It returns the error
// from bloom.NewFilterBySlice unchanged.
func (h *rpcHandler) buildBloomFilterExec(ctx *dagContext, executor *tipb.Executor) (*bloomFilterExec, error) {
	pbFilter := executor.BloomFilter
	colOffsets := pbFilter.GetColIdx()

	bf, err := bloom.NewFilterBySlice(pbFilter.GetBitSet())
	if err != nil {
		return nil, err
	}

	exec := &bloomFilterExec{
		bf:                bf,
		relatedColOffsets: colOffsets,
		execDetail:        &execDetail{},
	}
	return exec, nil
}

func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*selectionExec, error) {
var err error
var relatedColOffsets []int
Expand Down
89 changes: 89 additions & 0 deletions store/mockstore/mocktikv/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mocktikv
import (
"bytes"
"context"
"fmt"
"sort"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/bloom"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -39,6 +41,7 @@ var (
_ executor = &selectionExec{}
_ executor = &limitExec{}
_ executor = &topNExec{}
_ executor = &bloomFilterExec{}
)

type execDetail struct {
Expand Down Expand Up @@ -720,3 +723,89 @@ func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, p
}
return exprs, nil
}

// bloomFilterExec is an executor that pulls rows from src and returns only
// those whose key columns probe positive in bf.
type bloomFilterExec struct {
// bf is the bloom filter probed by Next.
bf *bloom.Filter
// relatedColOffsets are the positions, within a row, of the columns that
// are concatenated to form the probe key.
relatedColOffsets []int64
// src is the child executor that rows are read from.
src executor
// execDetail is updated on every call to Next.
execDetail *execDetail
// match counts the probed rows that hit the filter; total counts all probed
// rows (rows with a NULL key column are not probed).
// NOTE(review): float32 cannot represent every integer above 2^24, so these
// counters stop advancing on very large scans; integer counters look safer.
match float32
total float32
}

// ExecDetails returns the source executor's execution details (when a source
// is set) followed by this executor's own detail.
func (e *bloomFilterExec) ExecDetails() []*execDetail {
	if e.src == nil {
		return []*execDetail{e.execDetail}
	}
	return append(e.src.ExecDetails(), e.execDetail)
}

// SetSrcExec sets the child executor that Next reads rows from.
func (e *bloomFilterExec) SetSrcExec(exec executor) {
e.src = exec
}

// GetSrcExec returns the child executor, or nil if none has been set.
func (e *bloomFilterExec) GetSrcExec() executor {
return e.src
}

// ResetCounts forwards to the source executor. The match and total counters
// of this executor are not touched.
func (e *bloomFilterExec) ResetCounts() {
e.src.ResetCounts()
}

// Counts returns the counts reported by the source executor.
func (e *bloomFilterExec) Counts() []int64 {
return e.src.Counts()
}

// Cursor returns the cursor reported by the source executor.
func (e *bloomFilterExec) Cursor() ([]byte, bool) {
return e.src.Cursor()
}

// Next returns the next row from the source whose key columns probe positive
// in the bloom filter. Rows with a NULL in any key column are skipped without
// being probed. When the source is exhausted it prints the match statistics
// and returns (nil, nil). Errors from the source are returned traced.
func (e *bloomFilterExec) Next(ctx context.Context) (value [][]byte, err error) {
	start := time.Now()
	defer func() {
		// value is the named result, so this sees the row finally returned.
		e.execDetail.update(start, value)
	}()

	for {
		if value, err = e.src.Next(ctx); err != nil {
			return nil, errors.Trace(err)
		}
		if value == nil {
			// Source exhausted: report how selective the filter was.
			fmt.Println("\033[0;32mMatched:", int(e.match), " Total:", int(e.total), " Match Rate:", e.match/e.total, "\033[0m")
			return nil, nil
		}

		// A NULL key column means the row is dropped without probing.
		if checkIsNull(value, e.relatedColOffsets) {
			continue
		}

		probeKey := buildKey(value, e.relatedColOffsets)
		e.total++
		if !e.bf.Probe(probeKey) {
			continue
		}
		e.match++
		return value, nil
	}
}

// checkIsNull reports whether any of the columns selected by
// relatedColOffsets is NULL in value, i.e. whether the first byte of its
// encoded form is codec.NilFlag.
func checkIsNull(value [][]byte, relatedColOffsets []int64) bool {
	for i := 0; i < len(relatedColOffsets); i++ {
		col := value[relatedColOffsets[i]]
		if col[0] == codec.NilFlag {
			return true
		}
	}
	return false
}

// buildKey builds the key used to probe the bloom filter by concatenating, in
// the order given by relatedColOffsets, the encoded values of those columns.
//
// The result is a fresh slice that does not alias value. It is nil when the
// concatenation is empty (no offsets, or only empty columns).
func buildKey(value [][]byte, relatedColOffsets []int64) []byte {
	// Compute the final length first so the key is allocated exactly once
	// instead of growing on every append.
	size := 0
	for _, offset := range relatedColOffsets {
		size += len(value[offset])
	}
	if size == 0 {
		return nil
	}

	key := make([]byte, 0, size)
	for _, offset := range relatedColOffsets {
		key = append(key, value[offset]...)
	}
	return key
}
Loading