Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft PR : plain text support for external spanner hosts #24

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ listeners:
databaseId: YOUR_SPANNER_DATABASE
# [Optional] - Global else default to TableConfigurations
configTableName: TableConfigurations
# [Optional] endpoint configuration for spanner
endpoint: YOUR_ENDPOINT

# Spanner Connection Pool Settings
Session:
Expand Down
71 changes: 66 additions & 5 deletions schema_converter/cql_to_spanner_schema_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"github.com/cloudspannerecosystem/cassandra-to-spanner-proxy/third_party/datastax/parser"
"github.com/cloudspannerecosystem/cassandra-to-spanner-proxy/translator"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type ColumnMetadata struct {
Expand Down Expand Up @@ -136,11 +140,14 @@ func main() {
projectID := flag.String("project", "", "The project ID")
instanceID := flag.String("instance", "", "The Spanner instance ID")
databaseID := flag.String("database", "", "The Spanner database ID")
endpoint := flag.String("endpoint", "", "The Spanner External Host")
cqlFile := flag.String("cql", "", "Path to the CQL file")
keyspaceFlatter := flag.Bool("keyspaceFlatter", false, "Whether to enable keyspace flattening (default: false)")
tableName := flag.String("table", "TableConfigurations", "The name of the table (default: TableConfigurations)")
enableUsingTimestamp := flag.Bool("enableUsingTimestamp", false, "Whether to enable using timestamp (default: false)")
enableUsingTTL := flag.Bool("enableUsingTTL", false, "Whether to enable TTL (default: false)")
usePlainText := flag.Bool("usePlainText", false, "Whether to use plain text to establish connection")
ca_certificate_file := flag.String("ca_certificate_file", "", "The CA certificate file to use for TLS")
flag.Parse()

// Check if all required flags are provided
Expand Down Expand Up @@ -169,9 +176,11 @@ func main() {
os.Exit(1)
}

// Ensure that GCP credentials are set
if err := checkGCPCredentials(); err != nil {
log.Fatalf("Error: %v", err)
// Ensure that GCP credentials are set except for spanner external host connections
if *endpoint == "" {
if err := checkGCPCredentials(); err != nil {
log.Fatalf("Error: %v", err)
}
}

ctx := context.Background()
Expand All @@ -180,7 +189,35 @@ func main() {
db := fmt.Sprintf("projects/%s/instances/%s/databases/%s", *projectID, *instanceID, *databaseID)

// Create a Spanner Database Admin client
adminClient, err := database.NewDatabaseAdminClient(ctx)
var adminClient *database.DatabaseAdminClient
var err error

if *endpoint == "" {
adminClient, err = database.NewDatabaseAdminClient(ctx)
} else {
if *usePlainText {
adminClient, err = database.NewDatabaseAdminClient(
ctx,
option.WithEndpoint(*endpoint),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
} else {
if *ca_certificate_file == "" {
log.Fatalf("ca_certificate_file required for TLS connection on spanner endpoint: %v", err)
}
creds, credsErr := credentials.NewClientTLSFromFile(*ca_certificate_file, "")
if credsErr != nil {
log.Fatalf("Error in provided ca_client_certificate : %v", err)
}
adminClient, err = database.NewDatabaseAdminClient(
ctx,
option.WithEndpoint(*endpoint),
option.WithGRPCDialOption(grpc.WithTransportCredentials(creds)),
)
}

}
if err != nil {
log.Fatalf("Failed to create admin client: %v", err)
}
Expand Down Expand Up @@ -215,7 +252,31 @@ func main() {
}

// Create a Spanner client to interact with the database
spannerClient, err := spanner.NewClient(ctx, db)
var spannerClient *spanner.Client
if *endpoint == "" {
spannerClient, err = spanner.NewClient(ctx, db)
} else {
if *usePlainText {
spannerClient, err = spanner.NewClient(ctx, db,
option.WithEndpoint("localhost:15000"),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
} else {
if *ca_certificate_file == "" {
log.Fatalf("ca_certificate_file required for TLS connection on spanner endpoint: %v", err)
}
creds, credsErr := credentials.NewClientTLSFromFile(*ca_certificate_file, "")
if credsErr != nil {
log.Fatalf("Error in provided ca_client_certificate : %v", err)
}
spannerClient, err = spanner.NewClient(ctx, db,
option.WithEndpoint(*endpoint),
option.WithGRPCDialOption(grpc.WithTransportCredentials(creds)),
)
}

}
if err != nil {
log.Fatalf("Failed to create spanner client: %v", err)
}
Expand Down
42 changes: 38 additions & 4 deletions third_party/datastax/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"net"
"net/url"
"os"
"reflect"
"sync"
Expand All @@ -49,6 +50,7 @@ import (
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand Down Expand Up @@ -151,6 +153,7 @@ type SpannerConfig struct {
NumOfChannels int
InstanceName string
GCPProjectID string
Endpoint string
MaxSessions uint64
MinSessions uint64
MaxCommitDelay uint64
Expand Down Expand Up @@ -1968,10 +1971,41 @@ var NewSpannerClient = func(ctx context.Context, config Config, ot *otelgo.OpenT
}

database := fmt.Sprintf(SpannerConnectionString, config.SpannerConfig.GCPProjectID, config.SpannerConfig.InstanceName, config.SpannerConfig.DatabaseName)
client, err := spanner.NewClientWithConfig(ctx, database,
cfg,
option.WithGRPCConnectionPool(config.SpannerConfig.NumOfChannels),
option.WithGRPCDialOption(pool))

var client *spanner.Client
var err error

endpoint := config.SpannerConfig.Endpoint

if endpoint == "YOUR_ENDPOINT" || endpoint == "" {
client, err = spanner.NewClientWithConfig(ctx, database,
cfg,
option.WithGRPCConnectionPool(config.SpannerConfig.NumOfChannels),
option.WithGRPCDialOption(pool))
} else {
parsedURL, err := url.Parse(endpoint)
if err != nil {
config.Logger.Error("Failed parsing spanner:endpoint" + err.Error())
return nil
}
if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
client, err = spanner.NewClientWithConfig(ctx, database,
cfg,
option.WithGRPCConnectionPool(config.SpannerConfig.NumOfChannels),
option.WithGRPCDialOption(pool),
option.WithEndpoint(config.SpannerConfig.Endpoint),
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
} else {
client, err = spanner.NewClientWithConfig(ctx, database,
cfg,
option.WithGRPCConnectionPool(config.SpannerConfig.NumOfChannels),
option.WithGRPCDialOption(pool),
option.WithEndpoint(config.SpannerConfig.Endpoint),
)
}
}
// Create the Spanner client
if err != nil {
config.Logger.Error("Failed to create client" + err.Error())
Expand Down
2 changes: 2 additions & 0 deletions third_party/datastax/proxy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Spanner struct {
InstanceID string `yaml:"instanceId"`
DatabaseID string `yaml:"databaseId"`
ConfigTableName string `yaml:"configTableName"`
Endpoint string `yaml:"endpoint"`
Session Session `yaml:"Session"`
Operation Operation `yaml:"Operation"`
}
Expand Down Expand Up @@ -302,6 +303,7 @@ func Run(ctx context.Context, args []string) int {
ConfigTableName: listener.Spanner.ConfigTableName,
InstanceName: listener.Spanner.InstanceID,
GCPProjectID: listener.Spanner.ProjectID,
Endpoint: listener.Spanner.Endpoint,
DatabaseName: listener.Spanner.DatabaseID,
MaxSessions: uint64(listener.Spanner.Session.Max),
MinSessions: uint64(listener.Spanner.Session.Min),
Expand Down