Shreya/hb metrics test #7

Open · wants to merge 20 commits into shreya/hb-metrics
15 changes: 14 additions & 1 deletion cmd/cmd.go
@@ -27,6 +27,7 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
)

var (
@@ -36,6 +37,8 @@ var (
sessionFile = "session.json"
)

const defaultWritersLimit = 40

// CommandLine provides the core processing for HarbourBridge when run as a command-line tool.
// It performs the following steps:
// 1. Run schema conversion (if dataOnly is set to false)
@@ -83,6 +86,16 @@ func CommandLine(ctx context.Context, driver, targetDb, dbURI string, dataOnly,
return err
}
}

// Populating migration data in conv.
utils.PopulateMigrationData(conv, driver, targetDb)
if dataOnly {
conv.MigrationData.MigrationType = migration.MigrationData_DATA_ONLY.Enum()
} else {
conv.MigrationData.MigrationType = migration.MigrationData_SCHEMA_AND_DATA.Enum()
}
conversion.Report(driver, nil, ioHelper.BytesRead, "", conv, outputFilePrefix+reportFile, ioHelper.Out)

adminClient, err := utils.NewDatabaseAdminClient(ctx)
if err != nil {
return fmt.Errorf("can't create admin client: %w", utils.AnalyzeError(err, dbURI))
@@ -100,7 +113,7 @@

// We pass an empty string to the sqlConnectionStr parameter as this is the legacy codepath,
// which reads the environment variables and constructs the string later on.
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, dataOnly)
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, ioHelper, client, conv, dataOnly, defaultWritersLimit)
if err != nil {
return fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
}
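The .Enum() calls here follow the protoc-gen-go convention for proto2-style optional enum fields: the field stores a pointer, and the generated Enum() helper returns a pointer to a copy of the value. A minimal sketch of the selection logic, assuming only the generated migration package from this PR:

	func migrationType(dataOnly bool) *migration.MigrationData_MigrationType {
		if dataOnly {
			return migration.MigrationData_DATA_ONLY.Enum() // pointer to a copy of the enum value
		}
		return migration.MigrationData_SCHEMA_AND_DATA.Enum()
	}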
14 changes: 13 additions & 1 deletion cmd/data.go
@@ -13,6 +13,7 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
"github.com/google/subcommands"
)

@@ -25,6 +26,7 @@ type DataCmd struct {
skipForeignKeys bool
sessionJSON string
filePrefix string // TODO: move filePrefix to global flags
writeLimit int64
}

// Name returns the name of operation.
@@ -57,6 +59,7 @@ func (cmd *DataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.Int64Var(&cmd.writeLimit, "write-limit", defaultWritersLimit, "Write limit for writes to Spanner")
}

func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
@@ -135,6 +138,11 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
}
defer adminClient.Close()

// Populating migration data in conv.
utils.PopulateMigrationData(conv, sourceProfile.Driver, targetProfile.TargetDb)
conv.MigrationData.MigrationType = migration.MigrationData_DATA_ONLY.Enum()
conversion.Report(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix+reportFile, ioHelper.Out)

if !sourceProfile.UseTargetSchema() {
err = conversion.CreateOrUpdateDatabase(ctx, adminClient, dbURI, conv, ioHelper.Out)
if err != nil {
@@ -143,7 +151,8 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
}
}

bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, client, conv, true)
dataConversionStartTime := time.Now()
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, client, conv, true, cmd.writeLimit)
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return subcommands.ExitFailure
@@ -154,6 +163,9 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
return subcommands.ExitFailure
}
}
dataConversionEndTime := time.Now()
dataConversionDuration := dataConversionEndTime.Sub(dataConversionStartTime)
conv.DataConversionDuration = dataConversionDuration
banner := utils.GetBanner(now, dbURI)
conversion.Report(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)
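The new -write-limit flag caps writes to Spanner (per its help text) and defaults to defaultWritersLimit (40), matching the legacy CommandLine path. The duration bookkeeping added above can also be written with time.Since, which is shorthand for the same Sub call; a minimal sketch:

	start := time.Now()
	// ... conversion.DataConv runs here ...
	conv.DataConversionDuration = time.Since(start) // equivalent to time.Now().Sub(start)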
6 changes: 4 additions & 2 deletions cmd/schema.go
@@ -99,14 +99,16 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
cmd.filePrefix = dbName + "."
}

schemaConversionStartTime := time.Now()
var conv *internal.Conv
conv, err = conversion.SchemaConv(sourceProfile, targetProfile, &ioHelper)
if err != nil {
return subcommands.ExitFailure
}

now := time.Now()
conversion.WriteSchemaFile(conv, now, cmd.filePrefix+schemaFile, ioHelper.Out)
schemaConversionEndTime := time.Now()
conv.SchemaConversionDuration = schemaConversionEndTime.Sub(schemaConversionStartTime)
conversion.WriteSchemaFile(conv, schemaConversionEndTime, cmd.filePrefix+schemaFile, ioHelper.Out)
conversion.WriteSessionFile(conv, cmd.filePrefix+sessionFile, ioHelper.Out)
conversion.Report(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix+reportFile, ioHelper.Out)
// Cleanup hb tmp data directory.
23 changes: 17 additions & 6 deletions cmd/schema_and_data.go
@@ -13,6 +13,7 @@ import (
"github.com/cloudspannerecosystem/harbourbridge/conversion"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/profiles"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
"github.com/google/subcommands"
)

@@ -24,6 +25,7 @@ type SchemaAndDataCmd struct {
targetProfile string
skipForeignKeys bool
filePrefix string // TODO: move filePrefix to global flags
writeLimit int64
}

// Name returns the name of operation.
@@ -55,6 +57,7 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.BoolVar(&cmd.skipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.Int64Var(&cmd.writeLimit, "write-limit", defaultWritersLimit, "Write limit for writes to Spanner")
}

func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
@@ -91,11 +94,11 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
defer ioHelper.In.Close()
}

now := time.Now()
schemaConversionStartTime := time.Now()

// If filePrefix not explicitly set, use dbName as prefix.
if cmd.filePrefix == "" {
dbName, err := utils.GetDatabaseName(sourceProfile.Driver, now)
dbName, err := utils.GetDatabaseName(sourceProfile.Driver, schemaConversionStartTime)
if err != nil {
panic(fmt.Errorf("can't generate database name for prefix: %v", err))
}
@@ -108,11 +111,15 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
panic(err)
}

conversion.WriteSchemaFile(conv, now, cmd.filePrefix+schemaFile, ioHelper.Out)
// Populating migration data in conv.
utils.PopulateMigrationData(conv, sourceProfile.Driver, targetProfile.TargetDb)
conv.MigrationData.MigrationType = migration.MigrationData_SCHEMA_AND_DATA.Enum()

conversion.WriteSchemaFile(conv, schemaConversionStartTime, cmd.filePrefix+schemaFile, ioHelper.Out)
conversion.WriteSessionFile(conv, cmd.filePrefix+sessionFile, ioHelper.Out)
conversion.Report(sourceProfile.Driver, nil, ioHelper.BytesRead, "", conv, cmd.filePrefix+reportFile, ioHelper.Out)

project, instance, dbName, err := targetProfile.GetResourceIds(ctx, now, sourceProfile.Driver, ioHelper.Out)
project, instance, dbName, err := targetProfile.GetResourceIds(ctx, schemaConversionStartTime, sourceProfile.Driver, ioHelper.Out)
if err != nil {
return subcommands.ExitUsageError
}
@@ -141,7 +148,9 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
return subcommands.ExitFailure
}

bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, client, conv, true)
schemaConversionEndTime := time.Now()
conv.SchemaConversionDuration = schemaConversionEndTime.Sub(schemaConversionStartTime)
bw, err := conversion.DataConv(ctx, sourceProfile, targetProfile, &ioHelper, client, conv, true, cmd.writeLimit)
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return subcommands.ExitFailure
@@ -152,7 +161,9 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
return subcommands.ExitFailure
}
}
banner := utils.GetBanner(now, dbURI)
dataConversionEndTime := time.Now()
conv.DataConversionDuration = dataConversionEndTime.Sub(schemaConversionEndTime)
banner := utils.GetBanner(schemaConversionStartTime, dbURI)
conversion.Report(sourceProfile.Driver, bw.DroppedRowsByTable(), ioHelper.BytesRead, banner, conv, cmd.filePrefix+reportFile, ioHelper.Out)
conversion.WriteBadData(bw, conv, banner, cmd.filePrefix+badDataFile, ioHelper.Out)
// Cleanup hb tmp data directory.
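Note how the two duration fields partition a single timeline in this command: SchemaConversionDuration runs from the start of schema conversion through database creation, and DataConversionDuration covers everything after that point, including foreign-key creation. A compressed sketch of the scheme:

	schemaStart := time.Now()
	// ... SchemaConv, report files, database creation ...
	schemaEnd := time.Now()
	conv.SchemaConversionDuration = schemaEnd.Sub(schemaStart)
	// ... DataConv, foreign-key creation ...
	conv.DataConversionDuration = time.Now().Sub(schemaEnd) // measured from schemaEnd, not a fresh timer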
111 changes: 110 additions & 1 deletion common/utils/utils.go
@@ -5,11 +5,12 @@ package utils
import (
"bufio"
"context"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"math/rand"
"net/url"
"os"
"os/exec"
@@ -25,6 +26,7 @@ import (
"cloud.google.com/go/storage"
"github.com/cloudspannerecosystem/harbourbridge/common/constants"
"github.com/cloudspannerecosystem/harbourbridge/internal"
"github.com/cloudspannerecosystem/harbourbridge/proto/migration"
"github.com/cloudspannerecosystem/harbourbridge/sources/common"
"github.com/cloudspannerecosystem/harbourbridge/sources/spanner"
"golang.org/x/crypto/ssh/terminal"
@@ -492,3 +494,110 @@ func DialectToTarget(dialect string) string {
}
return constants.TargetSpanner
}

const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"

func randomString(n int) string {
var sb strings.Builder
k := len(alphabet)

for i := 0; i < n; i++ {
c := alphabet[rand.Intn(k)]
sb.WriteByte(c)
}

return sb.String()
}
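
// NOTE: math/rand's global source is deterministically seeded before Go 1.20,
// so unseeded runs repeat the same ID sequence. A sketch of a crypto/rand
// variant over the same alphabet (hypothetical helper, not part of this change;
// the slight modulo bias toward early alphabet characters is harmless for IDs):
//
//	func randomStringSecure(n int) (string, error) {
//		b := make([]byte, n)
//		if _, err := cryptorand.Read(b); err != nil { // import cryptorand "crypto/rand"
//			return "", err
//		}
//		for i := range b {
//			b[i] = alphabet[int(b[i])%len(alphabet)]
//		}
//		return string(b), nil
//	}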

// PopulateMigrationData populates migration data such as source schema details,
// request ID, target dialect, and connection mechanism in the conv object.
func PopulateMigrationData(conv *internal.Conv, driver, targetDb string) {

migrationRequestId := "HB" + randomString(14)

migrationData := migration.MigrationData{
MigrationRequestId: &migrationRequestId,
}
populateMigrationDataSourceDetails(driver, &migrationData)
populateMigrationDataSchemaPatterns(conv, &migrationData)

switch targetDb {
case constants.TargetSpanner:
migrationData.TargetDialect = migration.MigrationData_GOOGLE_STANDARD_SQL.Enum()
case constants.TargetExperimentalPostgres:
migrationData.TargetDialect = migration.MigrationData_POSTGRES.Enum()
}
conv.MigrationData = migrationData
}
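
// NOTE: a typical call site, assuming internal.MakeConv() as the Conv
// constructor used elsewhere in this repo:
//
//	conv := internal.MakeConv()
//	PopulateMigrationData(conv, constants.MYSQL, constants.TargetSpanner)
//	// conv.MigrationData now carries an "HB"-prefixed request ID,
//	// Source=MYSQL, SourceConnectionMechanism=DIRECT_CONNECTION and
//	// TargetDialect=GOOGLE_STANDARD_SQL.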

// populateMigrationDataSchemaPatterns populates schema patterns such as the number of tables,
// foreign keys, primary keys, indexes and interleaves, the max interleave depth, and whether
// the source schema is missing a primary key, in the migrationData object.
func populateMigrationDataSchemaPatterns(conv *internal.Conv, migrationData *migration.MigrationData) {

numTables := int32(len(conv.SrcSchema))
var numForeignKey, numIndexes, numPrimaryKey, numInterleaves, maxInterleaveDepth int32
missingPrimaryKey := false

for _, table := range conv.SrcSchema {
if len(table.ForeignKeys) != 0 {
numForeignKey++
}
if len(table.PrimaryKeys) != 0 {
numPrimaryKey++
} else {
missingPrimaryKey = true
}
numIndexes += int32(len(table.Indexes))
}

for _, table := range conv.SpSchema {
if table.Parent != "" {
numInterleaves++
depth := 1
parentTableName := table.Parent
for conv.SpSchema[parentTableName].Parent != "" {
depth++
parentTableName = conv.SpSchema[parentTableName].Parent
}
maxInterleaveDepth = int32(math.Max(float64(maxInterleaveDepth), float64(depth)))
}
}

migrationData.SchemaPatterns = &migration.MigrationData_SchemaPatterns{
NumTables: &numTables,
NumForeignKey: &numForeignKey,
NumInterleaves: &numInterleaves,
MissingPrimaryKey: &missingPrimaryKey,
MaxInterleaveDepth: &maxInterleaveDepth,
NumIndexes: &numIndexes,
}
}
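
// NOTE: the depth loop above counts parent links, so a three-level chain
// Singers <- Albums <- Songs yields maxInterleaveDepth = 2. A self-contained
// sketch of the same walk over a hypothetical parent map:
//
//	parents := map[string]string{"Singers": "", "Albums": "Singers", "Songs": "Albums"}
//	maxDepth := 0
//	for t := range parents {
//		if parents[t] == "" {
//			continue // top-level table, not interleaved
//		}
//		depth := 1
//		for p := parents[t]; parents[p] != ""; p = parents[p] {
//			depth++
//		}
//		if depth > maxDepth { // plain comparison avoids the float round-trip via math.Max
//			maxDepth = depth
//		}
//	}
//	// maxDepth == 2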

// populateMigrationDataSourceDetails populates the source database type and
// source connection mechanism in the migrationData object.
func populateMigrationDataSourceDetails(driver string, migrationData *migration.MigrationData) {
switch driver {
case constants.PGDUMP:
migrationData.SourceConnectionMechanism = migration.MigrationData_DB_DUMP.Enum()
migrationData.Source = migration.MigrationData_POSTGRESQL.Enum()
case constants.MYSQLDUMP:
migrationData.SourceConnectionMechanism = migration.MigrationData_DB_DUMP.Enum()
migrationData.Source = migration.MigrationData_MYSQL.Enum()
case constants.POSTGRES:
migrationData.SourceConnectionMechanism = migration.MigrationData_DIRECT_CONNECTION.Enum()
migrationData.Source = migration.MigrationData_POSTGRESQL.Enum()
case constants.MYSQL:
migrationData.SourceConnectionMechanism = migration.MigrationData_DIRECT_CONNECTION.Enum()
migrationData.Source = migration.MigrationData_MYSQL.Enum()
case constants.DYNAMODB:
migrationData.SourceConnectionMechanism = migration.MigrationData_DIRECT_CONNECTION.Enum()
migrationData.Source = migration.MigrationData_DYNAMODB.Enum()
case constants.ORACLE:
migrationData.SourceConnectionMechanism = migration.MigrationData_DIRECT_CONNECTION.Enum()
migrationData.Source = migration.MigrationData_ORACLE.Enum()
case constants.SQLSERVER:
migrationData.SourceConnectionMechanism = migration.MigrationData_DIRECT_CONNECTION.Enum()
migrationData.Source = migration.MigrationData_SQL_SERVER.Enum()
case constants.CSV:
migrationData.SourceConnectionMechanism = migration.MigrationData_FILE.Enum()
migrationData.Source = migration.MigrationData_CSV.Enum()
}
}