Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Subset stage 5: Virtual foreign keys #185

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ backward-compatible with existing PostgreSQL utilities.

# Features

* **Database subset** — consistently dumps only the necessary data based on the subset condition, reducing the size
of the dump and speeding up the restoration process.
* **Deterministic transformers** — a deterministic approach to data transformation based on hash
functions. This ensures that the same input data will always produce the same output data. Almost every transformer
supports either the `random` or the `hash` engine, making it universal for any use case.
Expand Down
318 changes: 310 additions & 8 deletions docs/database_subset.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ obfuscation process remains fresh, predictable, and transparent.

## Key features

* **Database subset** — consistently dumps only the necessary data based on the subset condition, reducing the size
of the dump and speeding up the restoration process.
* **Deterministic transformers** — a deterministic approach to data transformation based on hash
functions. This ensures that the same input data will always produce the same output data. Almost every transformer
supports either the `random` or the `hash` engine, making it universal for any use case.
Expand Down
6 changes: 4 additions & 2 deletions internal/db/postgres/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ func (d *Dump) startMainTx(ctx context.Context, conn *pgx.Conn) (pgx.Tx, error)
}

func (d *Dump) buildContextAndValidate(ctx context.Context, tx pgx.Tx) (err error) {
d.context, err = runtimeContext.NewRuntimeContext(ctx, tx, d.config.Dump.Transformation, d.registry,
d.pgDumpOptions, d.version)
d.context, err = runtimeContext.NewRuntimeContext(
ctx, tx, d.config.Dump.Transformation, d.registry, d.pgDumpOptions,
d.config.Dump.VirtualReferences, d.version,
)
if err != nil {
return fmt.Errorf("unable to build runtime context: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/db/postgres/cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ func (v *Validate) Run(ctx context.Context) (int, error) {
}
v.config.Dump.Transformation = tablesToValidate

v.context, err = runtimeContext.NewRuntimeContext(ctx, tx, v.config.Dump.Transformation, v.registry,
v.pgDumpOptions, v.version)
v.context, err = runtimeContext.NewRuntimeContext(
ctx, tx, v.config.Dump.Transformation, v.registry,
v.pgDumpOptions, v.config.Dump.VirtualReferences, v.version,
)
if err != nil {
return nonZeroExitCode, fmt.Errorf("unable to build runtime context: %w", err)
}
Expand Down
29 changes: 25 additions & 4 deletions internal/db/postgres/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog/log"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/db/postgres/pgdump"
Expand Down Expand Up @@ -61,8 +62,9 @@ type RuntimeContext struct {
//
// warnings are fatal procedure must be terminated immediately due to lack of objects required on the next step
func NewRuntimeContext(
ctx context.Context, tx pgx.Tx, cfg []*domains.Table, r *transformersUtils.TransformerRegistry, opt *pgdump.Options,
version int,
ctx context.Context, tx pgx.Tx, cfg []*domains.Table,
r *transformersUtils.TransformerRegistry, opt *pgdump.Options,
vr []*domains.VirtualReference, version int,
) (*RuntimeContext, error) {
var salt []byte
saltHex := os.Getenv("GREENMASK_GLOBAL_SALT")
Expand Down Expand Up @@ -98,8 +100,14 @@ func NewRuntimeContext(
if err != nil {
return nil, fmt.Errorf("cannot get database schema: %w", err)
}
vrWarns := validateVirtualReferences(vr, tablesEntries)
warnings = append(warnings, vrWarns...)
if len(vrWarns) > 0 {
// if validation produced any warnings, the virtual references must not be used in the graph build
vr = nil
}

graph, err := subset.NewGraph(ctx, tx, tablesEntries)
graph, err := subset.NewGraph(ctx, tx, tablesEntries, vr)
if err != nil {
return nil, fmt.Errorf("error creating graph: %w", err)
}
Expand All @@ -109,7 +117,7 @@ func NewRuntimeContext(
if err = subset.SetSubsetQueries(graph); err != nil {
return nil, fmt.Errorf("cannot set subset queries: %w", err)
}

debugQueries(tablesEntries)
} else {
// if there are no subset tables, we can sort them by size and transformation costs
// TODO: Implement tables ordering for subsetted tables as well
Expand Down Expand Up @@ -182,3 +190,16 @@ func hasSubset(tables []*entries.Table) bool {
return len(table.SubsetConds) > 0
})
}

// debugQueries walks the given tables and, for every table that has a
// non-empty Query, emits a debug log entry carrying the table's schema and
// name, followed by the query text itself printed through the global logger.
// Tables without a query are skipped.
func debugQueries(tables []*entries.Table) {
	for _, table := range tables {
		if query := table.Query; query != "" {
			// Header entry identifying which table the following query belongs to.
			log.Debug().
				Str("Schema", table.Schema).
				Str("Table", table.Name).
				Msg("Debug query")
			log.Logger.Println(query)
		}
	}
}
180 changes: 180 additions & 0 deletions internal/db/postgres/context/virtual_references.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package context

import (
"slices"

"github.com/greenmaskio/greenmask/internal/db/postgres/entries"
"github.com/greenmaskio/greenmask/internal/domains"
"github.com/greenmaskio/greenmask/pkg/toolkit"
)

// getReferencedKeys collects, in declaration order, one key per column of the
// reference r: the column name when it is set, otherwise the expression when
// that is set. Columns that have neither are skipped, so the result may be
// shorter than r.Columns and is nil when nothing was collected.
func getReferencedKeys(r *domains.Reference) (res []string) {
	for _, col := range r.Columns {
		switch {
		case col.Name != "":
			res = append(res, col.Name)
		case col.Expression != "":
			res = append(res, col.Expression)
		}
	}
	return res
}

// validateVirtualReferences validates every virtual reference in vrs against
// the known tables and returns all produced warnings concatenated in input
// order. The position of each virtual reference in vrs is forwarded to
// validateVirtualReference so warnings can be traced back to their definition.
// The result is nil when no warnings were produced.
func validateVirtualReferences(vrs []*domains.VirtualReference, tables []*entries.Table) (res toolkit.ValidationWarnings) {
	for i := range vrs {
		warns := validateVirtualReference(i, vrs[i], tables)
		res = append(res, warns...)
	}
	return res
}

// validateVirtualReference validates a single virtual reference definition
// against the list of known tables and returns the warnings it produced.
//
// tableIdx is the position of vr in the configured list; it is attached to the
// warnings as "TableIdx" metadata. All warnings are created with
// ErrorValidationSeverity.
//
// Checks performed, in order:
//   - vr.Schema, vr.Name and vr.References must be non-empty;
//   - the table identified by (vr.Schema, vr.Name) must be present in tables;
//     if it is not, validation stops here because the remaining checks need it;
//   - for every reference, the table identified by the reference's
//     (Schema, Name) must be present in tables, and the reference must pass
//     validateReference.
func validateVirtualReference(tableIdx int, vr *domains.VirtualReference, tables []*entries.Table) (res toolkit.ValidationWarnings) {
	if vr.Schema == "" {
		w := toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("schema is required").
			AddMeta("TableIdx", tableIdx)
		res = append(res, w)
	}
	if vr.Name == "" {
		w := toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("table name is required").
			AddMeta("TableIdx", tableIdx)
		res = append(res, w)
	}
	if len(vr.References) == 0 {
		w := toolkit.NewValidationWarning().
			SetMsg("virtual reference error: references are required: received empty").
			SetSeverity(toolkit.ErrorValidationSeverity).
			AddMeta("TableIdx", tableIdx).
			AddMeta("TableName", vr.Name).
			// Report the schema under "TableSchema" (previously the table name
			// was written here by mistake).
			AddMeta("TableSchema", vr.Schema)
		res = append(res, w)
	}

	// Locate the table that owns the virtual reference (the referencing side).
	referencedTableIdx := slices.IndexFunc(tables, func(t *entries.Table) bool {
		return t.Name == vr.Name && t.Schema == vr.Schema
	})

	if referencedTableIdx == -1 {
		w := toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("virtual reference error: table not found").
			AddMeta("TableIdx", tableIdx).
			AddMeta("TableName", vr.Name).
			AddMeta("TableSchema", vr.Schema)
		res = append(res, w)
		// Column checks below need the owning table, so stop here.
		return res
	}

	fkT := tables[referencedTableIdx]

	for idx, v := range vr.References {
		// Locate the table the reference points to (the primary-key side).
		primaryKeyTableIdx := slices.IndexFunc(tables, func(t *entries.Table) bool {
			return t.Name == v.Name && t.Schema == v.Schema
		})
		if primaryKeyTableIdx == -1 {
			w := toolkit.NewValidationWarning().
				SetSeverity(toolkit.ErrorValidationSeverity).
				SetMsg("virtual reference error: table not found").
				AddMeta("ReferenceIdx", idx).
				AddMeta("ReferenceName", v.Name).
				AddMeta("ReferenceSchema", v.Schema)
			// Record the warning before skipping this reference; otherwise a
			// reference to an unknown table would pass validation silently.
			res = append(res, w)
			continue
		}
		pkT := tables[primaryKeyTableIdx]

		// Enrich the nested warnings with the identity of the owning table.
		for _, w := range validateReference(idx, v, fkT, pkT) {
			w.AddMeta("TableIdx", tableIdx).
				SetSeverity(toolkit.ErrorValidationSeverity).
				AddMeta("TableName", vr.Name).
				AddMeta("TableSchema", vr.Schema)
			res = append(res, w)
		}
	}
	return res
}

// validateReference validates one reference v of a virtual reference.
//
// vrIdx is the position of the reference inside its virtual reference and is
// attached to the warnings as "ReferenceIdx" metadata. fkT is the table whose
// columns the reference's named columns are checked against; pkT is the table
// whose primary key the reference must match in column count.
//
// It reports an error-severity warning when the schema or the table name is
// empty, when no columns are given, when the number of usable columns (see
// getReferencedKeys) differs from the number of primary key columns of pkT,
// and for every problem validateColumn finds in an individual column.
func validateReference(vrIdx int, v *domains.Reference, fkT, pkT *entries.Table) (res toolkit.ValidationWarnings) {
	if v.Schema == "" {
		res = append(res, toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("virtual reference error: schema is required").
			AddMeta("ReferenceIdx", vrIdx))
	}

	if v.Name == "" {
		res = append(res, toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("virtual reference error: table name is required").
			AddMeta("ReferenceIdx", vrIdx))
	}

	if len(v.Columns) == 0 {
		res = append(res, toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("columns are required: received empty").
			AddMeta("ReferenceIdx", vrIdx).
			AddMeta("ReferenceName", v.Name).
			AddMeta("ReferenceSchema", v.Schema))
	}

	// The reference must supply exactly one key per primary key column.
	keys := getReferencedKeys(v)
	if len(keys) != len(pkT.PrimaryKey) {
		res = append(res, toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("virtual reference error: number of columns in reference does not match primary key").
			AddMeta("ReferenceIdx", vrIdx).
			AddMeta("ReferencedTableColumns", keys).
			AddMeta("PrimaryTableColumns", pkT.PrimaryKey).
			AddMeta("ReferenceName", v.Name).
			AddMeta("ReferenceSchema", v.Schema))
	}

	// Validate each column and tag its warnings with the reference identity.
	for colIdx, col := range v.Columns {
		for _, cw := range validateColumn(colIdx, col, fkT) {
			cw.AddMeta("ReferenceIdx", vrIdx).
				SetSeverity(toolkit.ErrorValidationSeverity).
				AddMeta("ReferenceName", v.Name).
				AddMeta("ReferenceSchema", v.Schema)
			res = append(res, cw)
		}
	}

	return res
}

// validateColumn validates a single referenced column c.
//
// colIdx is the position of the column inside its reference and is attached
// to the warnings as "ColumnIdx" metadata. fkT is the table in which a named
// column is looked up.
//
// A column must define exactly one of Name or Expression. When Name is set it
// must also match the name of one of fkT's columns. Every violation yields an
// error-severity warning; a column with both fields set and an unknown name
// yields two warnings.
func validateColumn(colIdx int, c *domains.ReferencedColumn, fkT *entries.Table) (res toolkit.ValidationWarnings) {
	hasName := c.Name != ""
	hasExpression := c.Expression != ""

	// The first two checks are mutually exclusive: neither set vs. both set.
	switch {
	case !hasName && !hasExpression:
		missing := toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("virtual reference error: name or expression is required").
			AddMeta("ColumnIdx", colIdx)
		res = append(res, missing)
	case hasName && hasExpression:
		conflict := toolkit.NewValidationWarning().
			SetSeverity(toolkit.ErrorValidationSeverity).
			SetMsg("virtual reference error: name and expression are mutually exclusive").
			AddMeta("ColumnIdx", colIdx)
		res = append(res, conflict)
	}

	// Only named columns can be looked up in the table definition.
	if !hasName {
		return res
	}
	matchesName := func(column *toolkit.Column) bool {
		return column.Name == c.Name
	}
	if slices.ContainsFunc(fkT.Columns, matchesName) {
		return res
	}

	notFound := toolkit.NewValidationWarning().
		SetSeverity(toolkit.ErrorValidationSeverity).
		SetMsg("virtual reference error: column not found").
		AddMeta("ColumnIdx", colIdx).
		AddMeta("ColumnName", c.Name)
	return append(res, notFound)
}
6 changes: 0 additions & 6 deletions internal/db/postgres/subset/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Component struct {
// Cycles
cycles [][]*Edge
cyclesIdents map[string]struct{}
keys []string
// groupedCycles - cycles grouped by the vertexes
groupedCycles map[string][]int
// groupedCyclesGraph - contains the mapping of the vertexes in the component to the edges in the original graph
Expand All @@ -36,11 +35,6 @@ func NewComponent(id int, componentGraph map[int][]*Edge, tables map[int]*entrie
cyclesIdents: make(map[string]struct{}),
}
c.findCycles()
if c.hasCycle() {
c.keys = c.getComponentKeys()
} else {
c.keys = c.getOneTable().PrimaryKey
}
c.groupCycles()
c.buildCyclesGraph()

Expand Down
2 changes: 1 addition & 1 deletion internal/db/postgres/subset/component_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ type ComponentLink struct {
component *Component
}

func NewComponentLink(idx int, c *Component, keys, overriddenKeys []string) *ComponentLink {
func NewComponentLink(idx int, c *Component) *ComponentLink {
return &ComponentLink{
idx: idx,
component: c,
Expand Down
Loading