Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b465181

Browse files
committedFeb 22, 2025··
combined caveat and namespaces methods
Signed-off-by: Kartikay <[email protected]>
1 parent bdc6eeb commit b465181

File tree

1 file changed

+75
-120
lines changed

1 file changed

+75
-120
lines changed
 

‎internal/datastore/mysql/watch.go

+75-120
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,9 @@ func (mds *Datastore) loadChanges(
177177
return nil, 0, err
178178
}
179179

180-
// Load namespace changes for the revision range.
181180
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
182-
if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
183-
return nil, 0, err
184-
}
185-
}
186-
187-
// Load caveat changes for the revision range.
188-
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
189-
if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
181+
// Load namespace and caveat changes for the revision range
182+
if err := mds.loadSchemaChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
190183
return nil, 0, err
191184
}
192185
}
@@ -287,127 +280,89 @@ func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision
287280
return
288281
}
289282

290-
func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
291-
sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{
292-
sq.And{
293-
sq.Gt{colCreatedTxn: afterRevision},
294-
sq.LtOrEq{colCreatedTxn: newRevision},
295-
},
296-
sq.And{
297-
sq.Gt{colDeletedTxn: afterRevision},
298-
sq.LtOrEq{colDeletedTxn: newRevision},
299-
},
300-
}).ToSql()
301-
if err != nil {
302-
return fmt.Errorf("unable to prepare changes SQL: %w", err)
303-
}
304-
305-
rows, err := mds.db.QueryContext(ctx, sql, args...)
306-
if err != nil {
307-
if errors.Is(err, context.Canceled) {
308-
err = datastore.NewWatchCanceledErr()
283+
func (mds *Datastore) loadSchemaChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) error {
284+
for _, schemaType := range []struct {
285+
query sq.SelectBuilder
286+
objectType string
287+
}{
288+
{mds.QueryChangedNamespacesQuery, "namespace"},
289+
{mds.QueryChangedCaveatsQuery, "caveat"},
290+
} {
291+
sql, args, err := schemaType.query.Where(sq.Or{
292+
sq.And{
293+
sq.Gt{colCreatedTxn: afterRevision},
294+
sq.LtOrEq{colCreatedTxn: newRevision},
295+
},
296+
sq.And{
297+
sq.Gt{colDeletedTxn: afterRevision},
298+
sq.LtOrEq{colDeletedTxn: newRevision},
299+
},
300+
}).ToSql()
301+
if err != nil {
302+
return fmt.Errorf("unable to prepare %s changes SQL: %w", schemaType.objectType, err)
309303
}
310-
return
311-
}
312-
defer common.LogOnError(ctx, rows.Close)
313304

314-
for rows.Next() {
315-
var createdTxn uint64
316-
var deletedTxn uint64
317-
var config []byte
318-
319-
err = rows.Scan(
320-
&config,
321-
&createdTxn,
322-
&deletedTxn,
323-
)
305+
rows, err := mds.db.QueryContext(ctx, sql, args...)
324306
if err != nil {
325-
return
326-
}
327-
loaded := &core.NamespaceDefinition{}
328-
if err := loaded.UnmarshalVT(config); err != nil {
329-
return fmt.Errorf("unable to parse changed namespace: %w", err)
307+
if errors.Is(err, context.Canceled) {
308+
err = datastore.NewWatchCanceledErr()
309+
}
310+
return err
330311
}
331312

332-
if createdTxn > afterRevision && createdTxn <= newRevision {
333-
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
334-
return
313+
for rows.Next() {
314+
var (
315+
name string
316+
config []byte
317+
createdTxn uint64
318+
deletedTxn uint64
319+
)
320+
var loaded datastore.SchemaDefinition
321+
322+
switch schemaType.objectType {
323+
case "namespace":
324+
err = rows.Scan(&config, &createdTxn, &deletedTxn)
325+
if err != nil {
326+
return fmt.Errorf("unable to parse changed namespace: %w", err)
327+
}
328+
def := &core.NamespaceDefinition{}
329+
if err := def.UnmarshalVT(config); err != nil {
330+
return fmt.Errorf("unable to parse changed namespace: %w", err)
331+
}
332+
loaded = def
333+
case "caveat":
334+
err = rows.Scan(&name, &config, &createdTxn, &deletedTxn)
335+
if err != nil {
336+
return fmt.Errorf("unable to parse changed caveat: %w", err)
337+
}
338+
def := &core.CaveatDefinition{}
339+
if err := def.UnmarshalVT(config); err != nil {
340+
return fmt.Errorf(errUnableToReadConfig, err)
341+
}
342+
loaded = def
335343
}
336-
}
337-
if deletedTxn > afterRevision && deletedTxn <= newRevision {
338-
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
339-
return
344+
if createdTxn > afterRevision && createdTxn <= newRevision {
345+
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
346+
return err
347+
}
340348
}
341-
}
342-
}
343-
344-
if err = rows.Err(); err != nil {
345-
return fmt.Errorf("unable to load changes: %w", err)
346-
}
347-
348-
return
349-
}
350-
351-
func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
352-
sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{
353-
sq.And{
354-
sq.Gt{colCreatedTxn: afterRevision},
355-
sq.LtOrEq{colCreatedTxn: newRevision},
356-
},
357-
sq.And{
358-
sq.Gt{colDeletedTxn: afterRevision},
359-
sq.LtOrEq{colDeletedTxn: newRevision},
360-
},
361-
}).ToSql()
362-
if err != nil {
363-
return fmt.Errorf("unable to prepare changes SQL: %w", err)
364-
}
365349

366-
rows, err := mds.db.QueryContext(ctx, sql, args...)
367-
if err != nil {
368-
if errors.Is(err, context.Canceled) {
369-
err = datastore.NewWatchCanceledErr()
370-
}
371-
return
372-
}
373-
374-
defer common.LogOnError(ctx, rows.Close)
375-
376-
for rows.Next() {
377-
var createdTxn uint64
378-
var deletedTxn uint64
379-
var config []byte
380-
var name string
381-
382-
err = rows.Scan(
383-
&name,
384-
&config,
385-
&createdTxn,
386-
&deletedTxn,
387-
)
388-
if err != nil {
389-
return fmt.Errorf("unable to parse changed caveat: %w", err)
390-
}
391-
loaded := &core.CaveatDefinition{}
392-
if err := loaded.UnmarshalVT(config); err != nil {
393-
return fmt.Errorf(errUnableToReadConfig, err)
394-
}
395-
396-
if createdTxn > afterRevision && createdTxn <= newRevision {
397-
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
398-
return
399-
}
400-
}
401-
if deletedTxn > afterRevision && deletedTxn <= newRevision {
402-
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
403-
return
350+
if deletedTxn > afterRevision && deletedTxn <= newRevision {
351+
if schemaType.objectType == "namespace" {
352+
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.GetName()); err != nil {
353+
return err
354+
}
355+
} else if schemaType.objectType == "caveat" {
356+
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.GetName()); err != nil {
357+
return err
358+
}
359+
}
404360
}
405361
}
406-
}
407362

408-
if err = rows.Err(); err != nil {
409-
return fmt.Errorf("unable to load changes: %w", err)
363+
if err = rows.Err(); err != nil {
364+
return fmt.Errorf("unable to load %s changes: %w", schemaType.objectType, err)
365+
}
410366
}
411-
412-
return
367+
return nil
413368
}

0 commit comments

Comments
 (0)
Please sign in to comment.