Skip to content

Commit

Permalink
Add support to ETCD revision format changes. See design doc https://g… (
Browse files Browse the repository at this point in the history
  • Loading branch information
Sindica authored May 22, 2020
1 parent 82b1405 commit 1031ecd
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 3 deletions.
14 changes: 12 additions & 2 deletions staging/src/k8s.io/apimachinery/pkg/util/diff/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,23 @@ load(

go_test(
name = "go_default_test",
srcs = ["diff_test.go"],
srcs = [
"compare_test.go",
"diff_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

go_library(
name = "go_default_library",
srcs = ["diff.go"],
srcs = [
"compare.go",
"diff.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/diff",
importpath = "k8s.io/apimachinery/pkg/util/diff",
deps = [
Expand Down
109 changes: 109 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/diff/compare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2020 Authors of Arktos.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package diff

import "time"

var allowedNTPDiffInMilliSecond = uint64(5000)

// For backwards compatible - can still use ETCD revision # starting from 1 and increases by 1 as backend storage
var v2MinRevision = getRevisionNumber(time.Date(2020, 5, 10, 0, 0, 0, 0, time.UTC).UnixNano(), 0, 1)

// RevisionIsNewer is used in event comparison to check whether revision 1 is newer than
// revision 2 and should be sent/accepted/processed
func RevisionIsNewer(revision1, revision2 uint64) bool {
if revision1 < v2MinRevision || revision2 < v2MinRevision {
return revision1 > revision2
}

result, isSameCluster := revisionCompare(revision1, revision2)
if isSameCluster {
return result > 0
}
// if from different cluster, allow sent result == 0
return result >= 0
}

// revisionCompare compares two revision #s.
// Return -1 if revision 1 is less than revision 2
// Return 0 if revision 1 is equal to revision 2
// Return 1 if revision 1 is larger than revision 2
// Note: the definition of less than, equal to, and larger than is defined as follows:
// If they are from same cluster id, their value will be directly compared
// If they are not from same cluster id, we use allowedNTPDiffInSecond to check whether
// the two revisions happened roughly in the same time
func revisionCompare(revision1, revision2 uint64) (result int, isSameCluster bool) {
clusterId1 := extractClusterId(revision1)
clusterId2 := extractClusterId(revision2)
isSameCluster = clusterId1 == clusterId2
if isSameCluster {
if revision1 < revision2 {
result = -1
} else if revision1 == revision2 {
result = 0
} else {
result = 1
}
} else {
ms1 := extractMilliSecond(revision1)
ms2 := extractMilliSecond(revision2)
if ms1 == ms2 {
result = 0
} else {
var absDiff uint64

if ms1 < ms2 {
result = -1
absDiff = ms2 - ms1
} else { // ms1 > ms2
result = 1
absDiff = ms1 - ms2
}

// add NTP variation tolerance
if absDiff < allowedNTPDiffInMilliSecond {
result = 0
}
}
}

return
}

// Based on design, int64 (ETCD revision type) has bits
// 0-12 as event number
// 13-18 as cluster id
// TODO: Need to test whether it is the same bit number when etcd revision # is used in API server
// as we are using uint64 here. Not sure how conversion happened
func extractClusterId(rev uint64) uint64 {
return (rev >> 13) % 64
}

func extractMilliSecond(rev uint64) uint64 {
return rev >> 19
}

func getRevisionNumber(testTimeInNano int64, clusterId int, seq int) (rev uint64) {
// got time stamp to milliseconds
revision := testTimeInNano / 1000000
// bit 13-18 is clusterId
revision = revision<<6 + int64(clusterId)
// bit 0-12 is sequence id
revision = revision<<13 + int64(seq)

return uint64(revision)
}
191 changes: 191 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/diff/compare_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
Copyright 2020 Authors of Arktos.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package diff

import (
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/rand"
"math"
"testing"
"time"
)

func TestExtractClusterId(t *testing.T) {
for i := 0; i < 64; i++ {
rev := getRevisionNumber(time.Now().UnixNano(), i, 1)
extractedClusterId := extractClusterId(rev)
assert.Equal(t, uint64(i), extractedClusterId, "Expecting cluster id %v but got %v", i, extractedClusterId)
}
}

func TestExtractedMillisecond(t *testing.T) {
for i := 0; i < 1000; i++ {
testTimeInMS := time.Now().Unix()*int64(1000) + int64(i)
testTimeInNano := testTimeInMS * 1000000
rev := getRevisionNumber(testTimeInNano, 1, 1)
extractedMS := extractMilliSecond(rev)
assert.Equal(t, uint64(testTimeInMS), extractedMS, "Expecting time %v but got %v", testTimeInMS, extractedMS)
}
}

func TestRevisionCompare(t *testing.T) {
originalAllowedNTPDiff := allowedNTPDiffInMilliSecond
testRevisionCompare(t, 0)
testRevisionCompare(t, 500)
testRevisionCompare(t, 1000)
testRevisionCompare(t, 10000)
allowedNTPDiffInMilliSecond = originalAllowedNTPDiff
}

func testRevisionCompare(t *testing.T, timeDiffInMS int64) {
allowedNTPDiffInMilliSecond = uint64(timeDiffInMS)

for i := 0; i < 64; i++ {
seq1 := 10
testTimeInNano := time.Now().UnixNano()

for j := 0; j < 64; j++ {
rev1 := getRevisionNumber(testTimeInNano, i, seq1)

// seq 1 < seq 2
seq2 := seq1 + 1
testTimeInNano2 := testTimeInNano + (timeDiffInMS+1)*1000000
rev2 := getRevisionNumber(testTimeInNano2, j, seq2)
result, isSameCluster := revisionCompare(rev1, rev2)
assertRevCompare(t, -1, result, i, j, rev1, rev2, testTimeInNano, testTimeInNano2)
logCluster(t, i, j, rev1, rev2, isSameCluster)

// seq 1 == seq 2
seq2 = seq1
if allowedNTPDiffInMilliSecond > 0 && i != j {
testTimeInNano2 = testTimeInNano + (timeDiffInMS-1)*1000000
} else {
testTimeInNano2 = testTimeInNano
}
rev2 = getRevisionNumber(testTimeInNano2, j, seq2)
result, isSameCluster = revisionCompare(rev1, rev2)
assertRevCompare(t, 0, result, i, j, rev1, rev2, testTimeInNano, testTimeInNano2)
logCluster(t, i, j, rev1, rev2, isSameCluster)

// seq 1 > seq 2
seq1 = seq2 + 1
rev2 = getRevisionNumber(testTimeInNano, j, seq2)
testTimeInNano1 := testTimeInNano + (timeDiffInMS+2)*1000000
rev1 = getRevisionNumber(testTimeInNano1, i, seq1)
result, isSameCluster = revisionCompare(rev1, rev2)
assertRevCompare(t, 1, result, i, j, rev1, rev2, testTimeInNano1, testTimeInNano)
logCluster(t, i, j, rev1, rev2, isSameCluster)
}
}
}

func assertRevCompare(t *testing.T, expectedResult, result interface{}, i, j int, rev1, rev2 uint64, testTime1, testTime2 int64) {
assert.Equal(t, expectedResult, result,
"Expecting result %v but got %v. i=%d, j=%d, rev1=%v, rev2=%v, allowedNTPDiffInMilliSecond=%v",
expectedResult, result, i, j, rev1, rev2, allowedNTPDiffInMilliSecond)
if expectedResult != result {
ms1 := extractMilliSecond(rev1)
ms2 := extractMilliSecond(rev2)
t.Logf("Expected result %v but got %v, i=%d, j=%d, ms1=%v, ms2=%v, nano1=%v, nano2=%v, msDiff=%v, nanoDiff=%v, rev1=%v, rev2=%v, allowedNTPDiffInMilliSecond=%v",
expectedResult, result, i, j, ms1, ms2, testTime1, testTime2, int64(ms1)-int64(ms2), testTime1-testTime2, rev1, rev2, allowedNTPDiffInMilliSecond)
}
}

func logCluster(t *testing.T, i, j int, rev1, rev2 uint64, isSameCluster bool) {
assert.Equal(t, i == j, isSameCluster,
"Expecting same cluster evaluation %v but got %v. i=%d, j=%d, rev1=%v, rev2=%v, allowedNTPDiffInMilliSecond=%v",
i == j, isSameCluster, i, j, rev1, rev2, allowedNTPDiffInMilliSecond)
if i != j && isSameCluster {
clusterId1 := extractClusterId(rev1)
clusterId2 := extractClusterId(rev2)
t.Logf("Expected same cluster %v evaluation but got %v. i=%d, j=%d, cluster1=%v, cluster2=%v, rev1 %v, rev2 %v, allowedNTPDiffInMilliSecond %v",
i == j, isSameCluster, i, j, clusterId1, clusterId2, rev1, rev2, allowedNTPDiffInMilliSecond)
}
}

func TestRevisionIsNewerBackwardCompatible(t *testing.T) {
rev1 := v2MinRevision - 1
rev2 := uint64(rand.Int63nRange(0, int64(rev1)))
result := RevisionIsNewer(rev1, rev2)
assert.True(t, result, "Expecting revision %v is newer than %v but not true", rev1, rev2)

result = RevisionIsNewer(rev2, rev1)
assert.False(t, result, "Expecting revision %v is not newer than %v but not true", rev2, rev1)

result = RevisionIsNewer(rev1, rev1)
assert.False(t, result, "Expecting revision %v is not newer than %v but not true", rev1, rev1)

rev3 := uint64(rand.Int63nRange(int64(rev1+1), math.MaxInt64))
result = RevisionIsNewer(rev3, rev1)
assert.True(t, result, "Expecting revision %v is newer than %v but not true", rev3, rev1)

result = RevisionIsNewer(rev1, rev3)
assert.False(t, result, "Expecting revision %v is not newer than %v but not true", rev1, rev3)
}

// test revision based on timestamp + clusterId + sequence #
func TestRevisionIsNewerWithTimeStamp(t *testing.T) {
originalAllowedNTPDiff := allowedNTPDiffInMilliSecond
testRevisionIsNewer(t, 0)
testRevisionIsNewer(t, 500)
testRevisionIsNewer(t, 1000)
testRevisionIsNewer(t, 10000)
allowedNTPDiffInMilliSecond = originalAllowedNTPDiff
}

func testRevisionIsNewer(t *testing.T, timeDiffInMS int64) {
allowedNTPDiffInMilliSecond = uint64(timeDiffInMS)

for i := 0; i < 64; i++ {
seq1 := 10
testTimeInNano := time.Now().UnixNano()

for j := 0; j < 64; j++ {
rev1 := getRevisionNumber(testTimeInNano, i, seq1)

// seq 1 < seq 2
seq2 := seq1 + 1
testTimeInNano2 := testTimeInNano + (timeDiffInMS+1)*1000000
rev2 := getRevisionNumber(testTimeInNano2, j, seq2)
result := RevisionIsNewer(rev2, rev1)
assertRevCompare(t, true, result, i, j, rev1, rev2, testTimeInNano, testTimeInNano2)

// seq 1 == seq 2
seq2 = seq1
if allowedNTPDiffInMilliSecond > 0 && i != j {
testTimeInNano2 = testTimeInNano + (timeDiffInMS-1)*1000000
} else {
testTimeInNano2 = testTimeInNano
}
rev2 = getRevisionNumber(testTimeInNano2, j, seq2)
result = RevisionIsNewer(rev2, rev1)
if i != j {
assertRevCompare(t, true, result, i, j, rev1, rev2, testTimeInNano, testTimeInNano2)
} else {
assertRevCompare(t, testTimeInNano2 > testTimeInNano, result, i, j, rev1, rev2, testTimeInNano, testTimeInNano2)
}

// seq 1 > seq 2
seq1 = seq2 + 1
rev2 = getRevisionNumber(testTimeInNano, j, seq2)
testTimeInNano1 := testTimeInNano + (timeDiffInMS+2)*1000000
rev1 = getRevisionNumber(testTimeInNano1, i, seq1)
result = RevisionIsNewer(rev2, rev1)
assertRevCompare(t, false, result, i, j, rev1, rev2, testTimeInNano1, testTimeInNano)
}
}
}
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
Expand Down
7 changes: 6 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cacher
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/diff"
"net/http"
"reflect"
"sync"
Expand Down Expand Up @@ -410,6 +411,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre

// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) watch.AggregatedWatchInterface {
klog.V(6).Infof("Cacher Watch key %s, resource version %s, selection predicate [%+v]", key, resourceVersion, pred)
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
aggWc := watch.NewAggregatedWatcher()
if err != nil {
Expand Down Expand Up @@ -473,6 +475,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}

klog.V(6).Infof("Cacher Watch key %s, initEvents [%+v], watchRV %v", key, initEvents, watchRV)
func() {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -1240,8 +1243,10 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
return
}
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
if diff.RevisionIsNewer(event.ResourceVersion, resourceVersion) {
c.sendWatchCacheEvent(event)
klog.V(6).Infof("Sent event resourceVersion %v, event resource version %v, event type %v, object [%+v]",
resourceVersion, event.ResourceVersion, event.Type, event.Object)
}
case <-ctx.Done():
return
Expand Down

0 comments on commit 1031ecd

Please sign in to comment.