Skip to content

Commit

Permalink
merge develop/v0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Dec 23, 2024
2 parents c8c4dc1 + f117a25 commit bfe3cf9
Show file tree
Hide file tree
Showing 65 changed files with 3,098 additions and 1,877 deletions.
918 changes: 455 additions & 463 deletions backend/gen/go/protos/mgmt/v1alpha1/connection_data.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion backend/internal/cmds/mgmt/serve/connect/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/nucleuscloud/neosync/internal/ee/license"
presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio"
neomigrate "github.com/nucleuscloud/neosync/internal/migrate"
neosynctypes "github.com/nucleuscloud/neosync/internal/neosync-types"
neosyncotel "github.com/nucleuscloud/neosync/internal/otel"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -553,7 +554,7 @@ func serve(ctx context.Context) error {
if err != nil {
return err
}
if runLogConfig != nil && !cascadelicense.IsValid() {
if runLogConfig != nil && runLogConfig.IsEnabled && !cascadelicense.IsValid() {
return errors.New("run logs are enabled but no license is present")
}

Expand Down Expand Up @@ -634,6 +635,7 @@ func serve(ctx context.Context) error {
),
)

neosynctyperegistry := neosynctypes.NewTypeRegistry(slogger)
gcpmanager := neosync_gcp.NewManager()
connectionDataService := v1alpha1_connectiondataservice.New(
&v1alpha1_connectiondataservice.Config{},
Expand All @@ -646,6 +648,7 @@ func serve(ctx context.Context) error {
mongoconnector,
sqlmanager,
gcpmanager,
neosynctyperegistry,
)
api.Handle(
mgmtv1alpha1connect.NewConnectionDataServiceHandler(
Expand Down
26 changes: 26 additions & 0 deletions backend/pkg/integration-test/integration-test-util.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,32 @@ func CreateS3Connection(
return resp.Msg.GetConnection()
}

// CreateDynamoDBConnection registers a DynamoDB connection with the connection
// service for the given account and returns the created connection record.
// The provided endpoint and (optional) AWS credentials are embedded in the
// connection config. Any RPC error fails the test immediately via
// RequireNoErrResp.
func CreateDynamoDBConnection(
	ctx context.Context,
	t *testing.T,
	connclient mgmtv1alpha1connect.ConnectionServiceClient,
	accountId, name, endpoint string,
	credentials *mgmtv1alpha1.AwsS3Credentials,
) *mgmtv1alpha1.Connection {
	// Build the DynamoDB-specific connection config up front so the request
	// construction below stays readable.
	dynamoCfg := &mgmtv1alpha1.ConnectionConfig{
		Config: &mgmtv1alpha1.ConnectionConfig_DynamodbConfig{
			DynamodbConfig: &mgmtv1alpha1.DynamoDBConnectionConfig{
				Credentials: credentials,
				Endpoint:    &endpoint,
			},
		},
	}
	req := connect.NewRequest(&mgmtv1alpha1.CreateConnectionRequest{
		AccountId:        accountId,
		Name:             name,
		ConnectionConfig: dynamoCfg,
	})
	resp, err := connclient.CreateConnection(ctx, req)
	RequireNoErrResp(t, resp, err)
	return resp.Msg.GetConnection()
}

func SetUser(ctx context.Context, t *testing.T, client mgmtv1alpha1connect.UserAccountServiceClient) string {
resp, err := client.SetUser(ctx, connect.NewRequest(&mgmtv1alpha1.SetUserRequest{}))
RequireNoErrResp(t, resp, err)
Expand Down
5 changes: 3 additions & 2 deletions backend/protos/mgmt/v1alpha1/connection_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ message GetConnectionDataStreamRequest {

// Each stream response is a single row in the requested schema and table
message GetConnectionDataStreamResponse {
  // Field 1 is reserved so old clients that still expect the removed
  // map<string, bytes> row field fail loudly instead of mis-decoding.
  reserved 1; // Was: map<string, bytes> row = 1;
  // A single row, serialized as bytes. The payload is an encoded
  // map of column name to column value (map[string]any on the Go side);
  // clients must decode it with the matching decoder for this service.
  bytes row_bytes = 2;
}

message PostgresSchemaConfig {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package v1alpha1_connectiondataservice

import (
"bytes"
"compress/gzip"
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -30,12 +32,20 @@ import (
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
neosync_dynamodb "github.com/nucleuscloud/neosync/internal/dynamodb"
neosyncgob "github.com/nucleuscloud/neosync/internal/gob"
myutil "github.com/nucleuscloud/neosync/internal/mysql"
pgutil "github.com/nucleuscloud/neosync/internal/postgres"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder"

"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/structpb"
)

func init() {
neosyncgob.RegisterGobTypes()
}

type DatabaseSchema struct {
TableSchema string `db:"table_schema,omitempty"`
TableName string `db:"table_name,omitempty"`
Expand All @@ -61,6 +71,9 @@ func (ds *DateScanner) Scan(input any) error {
}
}

// GetConnectionDataStream streams data from a connection source (e.g. MySQL, Postgres, S3, etc)
// The data is first converted from its native format into Go types, then encoded using gob encoding
// before being streamed back to the client.
func (s *Service) GetConnectionDataStream(
ctx context.Context,
req *connect.Request[mgmtv1alpha1.GetConnectionDataStreamRequest],
Expand All @@ -85,7 +98,7 @@ func (s *Service) GetConnectionDataStream(
return err
}

conn, err := s.sqlConnector.NewDbFromConnectionConfig(connection.ConnectionConfig, logger, sqlconnect.WithConnectionTimeout(connectionTimeout), sqlconnect.WithMysqlParseTimeDisabled())
conn, err := s.sqlConnector.NewDbFromConnectionConfig(connection.ConnectionConfig, logger, sqlconnect.WithConnectionTimeout(connectionTimeout))
if err != nil {
return err
}
Expand All @@ -97,7 +110,7 @@ func (s *Service) GetConnectionDataStream(

table := sqlmanager_shared.BuildTable(req.Msg.Schema, req.Msg.Table)
// used to get column names
query, err := querybuilder.BuildSelectLimitQuery("mysql", table, 1)
query, err := querybuilder.BuildSelectLimitQuery("mysql", table, 0)
if err != nil {
return err
}
Expand All @@ -121,21 +134,16 @@ func (s *Service) GetConnectionDataStream(
}

for rows.Next() {
values := make([][]byte, len(columnNames))
valuesWrapped := make([]any, 0, len(columnNames))
for i := range values {
valuesWrapped = append(valuesWrapped, &values[i])
}
if err := rows.Scan(valuesWrapped...); err != nil {
return err
r, err := myutil.MysqlSqlRowToMap(rows)
if err != nil {
return fmt.Errorf("unable to convert mysql row to map: %w", err)
}
row := map[string][]byte{}
for i, v := range values {
col := columnNames[i]
row[col] = v
var rowbytes bytes.Buffer
enc := gob.NewEncoder(&rowbytes)
if err := enc.Encode(r); err != nil {
return fmt.Errorf("unable to encode mysql row: %w", err)
}

if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{Row: row}); err != nil {
if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{RowBytes: rowbytes.Bytes()}); err != nil {
return err
}
}
Expand All @@ -158,7 +166,7 @@ func (s *Service) GetConnectionDataStream(

table := sqlmanager_shared.BuildTable(req.Msg.Schema, req.Msg.Table)
// used to get column names
query, err := querybuilder.BuildSelectLimitQuery(sqlmanager_shared.GoquPostgresDriver, table, 1)
query, err := querybuilder.BuildSelectLimitQuery(sqlmanager_shared.GoquPostgresDriver, table, 0)
if err != nil {
return err
}
Expand All @@ -183,23 +191,17 @@ func (s *Service) GetConnectionDataStream(
}
defer rows.Close()

// TODO: re-verify this postgres streaming path after the gob-encoding migration; behavior may have changed
for rows.Next() {
values := make([][]byte, len(columnNames))
valuesWrapped := make([]any, 0, len(columnNames))
for i := range values {
valuesWrapped = append(valuesWrapped, &values[i])
}
if err := rows.Scan(valuesWrapped...); err != nil {
return err
r, err := pgutil.SqlRowToPgTypesMap(rows)
if err != nil {
return fmt.Errorf("unable to convert postgres row to map: %w", err)
}
row := map[string][]byte{}
for i, v := range values {
col := columnNames[i]
row[col] = v
var rowbytes bytes.Buffer
enc := gob.NewEncoder(&rowbytes)
if err := enc.Encode(r); err != nil {
return fmt.Errorf("unable to encode postgres row using gob: %w", err)
}

if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{Row: row}); err != nil {
if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{RowBytes: rowbytes.Bytes()}); err != nil {
return err
}
}
Expand Down Expand Up @@ -282,40 +284,35 @@ func (s *Service) GetConnectionDataStream(

decoder := json.NewDecoder(gzr)
for {
var data map[string]any

var rowData map[string]any
// Decode the next JSON object
err = decoder.Decode(&data)
err = decoder.Decode(&rowData)
if err != nil && err == io.EOF {
break // End of file, stop the loop
} else if err != nil {
result.Body.Close()
gzr.Close()
return err
}
rowMap := make(map[string][]byte)
for key, value := range data {
var byteValue []byte
switch v := value.(type) {
case string:
// try converting string directly to []byte
// prevents quoted strings
byteValue = []byte(v)
default:
// if not a string use JSON encoding
byteValue, err = json.Marshal(v)
if err != nil {
result.Body.Close()
gzr.Close()
return err
}
if string(byteValue) == "null" {
byteValue = nil
}

for k, v := range rowData {
newVal, err := s.neosynctyperegistry.Unmarshal(v)
if err != nil {
return fmt.Errorf("unable to unmarshal row value using neosync type registry: %w", err)
}
rowMap[key] = byteValue
rowData[k] = newVal
}

// Encode the row data using gob
var rowbytes bytes.Buffer
enc := gob.NewEncoder(&rowbytes)
if err := enc.Encode(rowData); err != nil {
result.Body.Close()
gzr.Close()
return fmt.Errorf("unable to encode S3 row data using gob: %w", err)
}
if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{Row: rowMap}); err != nil {

if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{RowBytes: rowbytes.Bytes()}); err != nil {
result.Body.Close()
gzr.Close()
return err
Expand Down Expand Up @@ -357,7 +354,12 @@ func (s *Service) GetConnectionDataStream(
}

onRecord := func(record map[string][]byte) error {
return stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{Row: record})
var rowbytes bytes.Buffer
enc := gob.NewEncoder(&rowbytes)
if err := enc.Encode(record); err != nil {
return fmt.Errorf("unable to encode gcp record using gob: %w", err)
}
return stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{RowBytes: rowbytes.Bytes()})
}
tablePath := neosync_gcp.GetWorkflowActivityDataPrefix(jobRunId, sqlmanager_shared.BuildTable(req.Msg.Schema, req.Msg.Table), gcpConfig.PathPrefix)
err = gcpclient.GetRecordStreamFromPrefix(ctx, gcpConfig.GetBucket(), tablePath, onRecord)
Expand All @@ -378,14 +380,17 @@ func (s *Service) GetConnectionDataStream(
}

for _, item := range output.Items {
row := make(map[string][]byte)

itemBits, err := neosync_dynamodb.ConvertMapToJSONBytes(item)
itemBits, err := neosync_dynamodb.ConvertDynamoItemToGoMap(item)
if err != nil {
return err
}
row["item"] = itemBits
if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{Row: row}); err != nil {

var itemBytes bytes.Buffer
enc := gob.NewEncoder(&itemBytes)
if err := enc.Encode(itemBits); err != nil {
return fmt.Errorf("unable to encode dynamodb item using gob: %w", err)
}
if err := stream.Send(&mgmtv1alpha1.GetConnectionDataStreamResponse{RowBytes: itemBytes.Bytes()}); err != nil {
return fmt.Errorf("failed to send stream response: %w", err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
awsmanager "github.com/nucleuscloud/neosync/internal/aws"
neosynctypes "github.com/nucleuscloud/neosync/internal/neosync-types"
"github.com/nucleuscloud/neosync/internal/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -499,6 +501,7 @@ func createServiceMock(t *testing.T) *serviceMocks {
mockMongoConnector,
mockSqlManager,
mockGcpManager,
neosynctypes.NewTypeRegistry(testutil.GetTestLogger(t)),
)

return &serviceMocks{
Expand Down
25 changes: 15 additions & 10 deletions backend/services/mgmt/v1alpha1/connection-data-service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nucleuscloud/neosync/backend/pkg/sqlconnect"
sql_manager "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
awsmanager "github.com/nucleuscloud/neosync/internal/aws"
neosynctypes "github.com/nucleuscloud/neosync/internal/neosync-types"
)

type Service struct {
Expand All @@ -25,6 +26,8 @@ type Service struct {

mongoconnector mongoconnect.Interface
gcpmanager neosync_gcp.ManagerInterface

neosynctyperegistry neosynctypes.NeosyncTypeRegistry
}

type Config struct {
Expand All @@ -43,17 +46,19 @@ func New(
mongoconnector mongoconnect.Interface,
sqlmanager sql_manager.SqlManagerClient,
gcpmanager neosync_gcp.ManagerInterface,
neosynctyperegistry neosynctypes.NeosyncTypeRegistry,
) *Service {
return &Service{
cfg: cfg,
connectionService: connectionService,
jobService: jobService,
awsManager: awsManager,
sqlConnector: sqlConnector,
pgquerier: pgquerier,
mysqlquerier: mysqlquerier,
sqlmanager: sqlmanager,
mongoconnector: mongoconnector,
gcpmanager: gcpmanager,
cfg: cfg,
connectionService: connectionService,
jobService: jobService,
awsManager: awsManager,
sqlConnector: sqlConnector,
pgquerier: pgquerier,
mysqlquerier: mysqlquerier,
sqlmanager: sqlmanager,
mongoconnector: mongoconnector,
gcpmanager: gcpmanager,
neosynctyperegistry: neosynctyperegistry,
}
}
4 changes: 2 additions & 2 deletions charts/neosync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ A Helm chart for Neosync that contains the api, app, and worker
| api.updateStrategy | string | `nil` | The strategy to use when rolling out new replicas |
| api.volumeMounts | list | `[]` | Volumes that will be mounted to the deployment |
| api.volumes | list | `[]` | Volumes that will be attached to the deployment |
| app.analytics.enabled | bool | `true` | Enables analytics such as Posthog/Koala (if keys have been provided for them) |
| app.analytics.enabled | bool | `true` | Enables analytics such as Posthog/Unify (if keys have been provided for them) |
| app.auth.audience | string | `nil` | The audience that should be present in the JWT token |
| app.auth.clientId | string | `nil` | The client id that will be used by the app to retrieve user tokens |
| app.auth.clientSecret | string | `nil` | The client secret that will be used by the app |
Expand All @@ -137,7 +137,6 @@ A Helm chart for Neosync that contains the api, app, and worker
| app.ingress.enabled | bool | `false` | Enable this if using K8s ingress to expose the backend to the internet |
| app.istio.enabled | bool | `false` | Whether or not to apply the default istio annotations/labels to the deployment |
| app.jobHooks.enabled | bool | `false` | Enables Job Hooks on the frontend. Note: This will only work if it has also been enabled via the backend with a valid license |
| app.koala.key | string | `nil` | Koala Key |
| app.nameOverride | string | `nil` | Override the name specified on the Chart, which defaults to .Chart.Name |
| app.neosyncApi.url | string | `"http://neosync-api"` | The URL to the Neosync API instance |
| app.neosyncCloud.enabled | bool | `false` | Whether or not this is NeosyncCloud |
Expand All @@ -163,6 +162,7 @@ A Helm chart for Neosync that contains the api, app, and worker
| app.sidecarContainers | list | `[]` | Provide sidecars that will be appended directly to the deployment next to the user-container |
| app.terminationGracePeriodSeconds | string | `nil` | The amount of time in seconds to wait for the pod to shut down when a termination event has occurred. |
| app.tolerations | list | `[]` | Any tolerations that should be applied to the deployment |
| app.unify.key | string | `nil` | Unify Key |
| app.updateStrategy | string | `nil` | The strategy to use when rolling out new replicas |
| worker.autoscaling.enabled | bool | `false` | Whether or not to install the HPA autoscaler |
| worker.autoscaling.maxReplicas | int | `4` | The maximum number of replicas to scale to |
Expand Down
Loading

0 comments on commit bfe3cf9

Please sign in to comment.