Skip to content

Commit

Permalink
Merge pull request #2180 from josephschorr/crdb-ds-coverage
Browse files Browse the repository at this point in the history
Additional datastore tests
  • Loading branch information
josephschorr authored Dec 30, 2024
2 parents 5fa9528 + cbdcdd7 commit 50e2445
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 1 deletion.
118 changes: 118 additions & 0 deletions internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/test"
"github.com/authzed/spicedb/pkg/genutil/mapz"
"github.com/authzed/spicedb/pkg/migrate"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
Expand Down Expand Up @@ -74,6 +75,29 @@ func TestCRDBDatastoreWithoutIntegrity(t *testing.T) {

return ds, nil
}), false)

t.Run("TestWatchStreaming", createDatastoreTest(
b,
StreamingWatchTest,
RevisionQuantization(0),
GCWindow(veryLargeGCWindow),
))
}

type datastoreTestFunc func(t *testing.T, ds datastore.Datastore)

func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewCRDBDatastore(ctx, uri, options...)
require.NoError(t, err)
return ds
})
defer ds.Close()

tf(t, ds)
}
}

func TestCRDBDatastoreWithFollowerReads(t *testing.T) {
Expand Down Expand Up @@ -593,3 +617,97 @@ func RelationshipIntegrityWatchTest(t *testing.T, tester test.DatastoreTester) {
require.Fail("Timed out")
}
}

func StreamingWatchTest(t *testing.T, rawDS datastore.Datastore) {
require := require.New(t)

ds, rev := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, `
caveat somecaveat(somecondition int) {
somecondition == 42
}
caveat somecaveat2(somecondition int) {
somecondition == 42
}
definition user {}
definition user2 {}
definition resource {
relation viewer: user
}
definition resource2 {
relation viewer: user2
}
`, []tuple.Relationship{
tuple.MustParse("resource:foo#viewer@user:tom"),
tuple.MustParse("resource:foo#viewer@user:fred"),
}, require)
ctx := context.Background()

// Touch and delete some relationships, add a namespace and caveat and delete a namespace and caveat.
_, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
err := rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{
tuple.Touch(tuple.MustParse("resource:foo#viewer@user:tom")),
tuple.Delete(tuple.MustParse("resource:foo#viewer@user:fred")),
})
require.NoError(err)

err = rwt.DeleteNamespaces(ctx, "resource2")
require.NoError(err)

err = rwt.DeleteCaveats(ctx, []string{"somecaveat2"})
require.NoError(err)

err = rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{
Name: "somenewnamespace",
})
require.NoError(err)

err = rwt.WriteCaveats(ctx, []*core.CaveatDefinition{{
Name: "somenewcaveat",
}})
require.NoError(err)

return nil
})
require.NoError(err)

// Ensure the watch API returns the integrity information.
opts := datastore.WatchOptions{
Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
WatchBufferLength: 128,
WatchBufferWriteTimeout: 1 * time.Minute,
EmissionStrategy: datastore.EmitImmediatelyStrategy,
}

expectedChanges := mapz.NewSet[string]()
expectedChanges.Add("DELETE(resource:foo#viewer@user:fred)\n")
expectedChanges.Add("DeletedCaveat: somecaveat2\n")
expectedChanges.Add("DeletedNamespace: resource2\n")
expectedChanges.Add("Definition: *corev1.NamespaceDefinition:somenewnamespace\n")
expectedChanges.Add("Definition: *corev1.CaveatDefinition:somenewcaveat\n")

changes, errchan := ds.Watch(ctx, rev, opts)
for {
select {
case change, ok := <-changes:
if !ok {
require.Fail("Timed out waiting for WatchDisconnectedError")
}

debugString := change.DebugString()
require.True(expectedChanges.Has(debugString), "unexpected change: %s", debugString)
expectedChanges.Delete(change.DebugString())
if expectedChanges.IsEmpty() {
return
}
case err := <-errchan:
require.Failf("Failed waiting for changes with error", "error: %v", err)
case <-time.NewTimer(10 * time.Second).C:
require.Fail("Timed out")
}
}
}
File renamed without changes.
2 changes: 1 addition & 1 deletion internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri
}

if foundRaw == nil {
return fmt.Errorf("unable to find namespace to delete")
return fmt.Errorf("namespace not found")
}

if err := tx.Delete(tableNamespace, foundRaw); err != nil {
Expand Down
17 changes: 17 additions & 0 deletions internal/datastore/spanner/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/genutil/mapz"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/tuple"
Expand Down Expand Up @@ -372,7 +373,23 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ...
}

func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error {
namespaces, err := rwt.LookupNamespacesWithNames(ctx, nsNames)
if err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

if len(namespaces) != len(nsNames) {
expectedNamespaceNames := mapz.NewSet[string](nsNames...)
for _, ns := range namespaces {
expectedNamespaceNames.Delete(ns.Definition.Name)
}

return fmt.Errorf(errUnableToDeleteConfig, fmt.Errorf("namespaces not found: %v", expectedNamespaceNames.AsSlice()))
}

for _, nsName := range nsNames {
// Ensure the namespace exists.

relFilter := &v1.RelationshipFilter{ResourceType: nsName}
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
Expand Down
26 changes: 26 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ type RevisionChanges struct {
Metadata *structpb.Struct
}

func (rc *RevisionChanges) DebugString() string {
if rc.IsCheckpoint {
return "[checkpoint]"
}

debugString := ""

for _, relChange := range rc.RelationshipChanges {
debugString += relChange.DebugString() + "\n"
}

for _, def := range rc.ChangedDefinitions {
debugString += fmt.Sprintf("Definition: %T:%s\n", def, def.GetName())
}

for _, ns := range rc.DeletedNamespaces {
debugString += fmt.Sprintf("DeletedNamespace: %s\n", ns)
}

for _, caveat := range rc.DeletedCaveats {
debugString += fmt.Sprintf("DeletedCaveat: %s\n", caveat)
}

return debugString
}

func (rc *RevisionChanges) MarshalZerologObject(e *zerolog.Event) {
e.Str("revision", rc.Revision.String())
e.Bool("is-checkpoint", rc.IsCheckpoint)
Expand Down
1 change: 1 addition & 0 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
t.Run("TestNamespaceMultiDelete", runner(tester, NamespaceMultiDeleteTest))
t.Run("TestEmptyNamespaceDelete", runner(tester, EmptyNamespaceDeleteTest))
t.Run("TestStableNamespaceReadWrite", runner(tester, StableNamespaceReadWriteTest))
t.Run("TestNamespaceDeleteInvalidNamespace", runner(tester, NamespaceDeleteInvalidNamespaceTest))

t.Run("TestSimple", runner(tester, SimpleTest))
t.Run("TestObjectIDs", runner(tester, ObjectIDsTest))
Expand Down
41 changes: 41 additions & 0 deletions pkg/datastore/test/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,47 @@ func EmptyNamespaceDeleteTest(t *testing.T, tester DatastoreTester) {
require.True(errors.As(err, &datastore.NamespaceNotFoundError{}))
}

// NamespaceDeleteInvalidNamespaceTest tests deleting an invalid namespace in the datastore.
func NamespaceDeleteInvalidNamespaceTest(t *testing.T, tester DatastoreTester) {
require := require.New(t)

schemaString := `definition user {}
definition document {
relation viewer: user
}`

// Compile namespace to write to the datastore.
compiled, err := compiler.Compile(compiler.InputSchema{
Source: input.Source("schema"),
SchemaString: schemaString,
}, compiler.AllowUnprefixedObjectType())
require.NoError(err)
require.Equal(2, len(compiled.OrderedDefinitions))

// Write the namespace definition to the datastore.
ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)

ctx := context.Background()
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
err := rwt.WriteCaveats(ctx, compiled.CaveatDefinitions)
if err != nil {
return err
}

return rwt.WriteNamespaces(ctx, compiled.ObjectDefinitions...)
})
require.NoError(err)

// Attempt to delete the invalid namespace.
_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
return rwt.DeleteNamespaces(ctx, "invalid")
})
require.Error(err)
require.ErrorContains(err, "not found")
}

// StableNamespaceReadWriteTest tests writing a namespace to the datastore and reading it back,
// ensuring that it does not change in any way and that the deserialized data matches that stored.
func StableNamespaceReadWriteTest(t *testing.T, tester DatastoreTester) {
Expand Down

0 comments on commit 50e2445

Please sign in to comment.