Skip to content

Commit

Permalink
go/*: various updates for protocol changes (derive-refactor)
Browse files Browse the repository at this point in the history
These are mostly self-obvious, mechanical, or otherwise uninteresting
changes that reflect fallout of other changes.
  • Loading branch information
jgraettinger committed Apr 5, 2023
1 parent 7357155 commit 77e787c
Show file tree
Hide file tree
Showing 44 changed files with 684 additions and 1,423 deletions.
66 changes: 2 additions & 64 deletions go/flow/builds.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"database/sql"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"runtime"
Expand All @@ -23,7 +20,7 @@ import (
type BuildService struct {
baseURL *url.URL // URL to which buildIDs are joined.
builds map[string]*sharedBuild // All active builds.
gsClient *storage.Client // Google storage client which is initalized on first use.
gsClient *storage.Client // Google storage client which is initialized on first use.
mu sync.Mutex
}

Expand All @@ -43,11 +40,6 @@ type sharedBuild struct {
dbTempfile *os.File
dbErr error
dbOnce sync.Once

tsWorker *JSWorker
tsClient *http.Client
tsErr error
tsOnce sync.Once
}

// NewBuildService returns a new *BuildService.
Expand Down Expand Up @@ -116,14 +108,6 @@ func (b *Build) Extract(fn func(*sql.DB) error) error {
return fn(b.db)
}

// TypeScriptLocalSocket returns the TypeScript Unix Domain Socket of this Catalog.
// If a TypeScript worker isn't running, one is started
// and will be stopped on a future call to Build.Close().
func (b *Build) TypeScriptClient() (*http.Client, error) {
b.tsOnce.Do(func() { _ = b.initTypeScript() })
return b.tsClient, b.tsErr
}

// Close the Build. If this is the last remaining reference,
// then all allocated resources are cleaned up.
func (b *Build) Close() error {
Expand Down Expand Up @@ -184,46 +168,6 @@ func (b *Build) dbInit() (err error) {
return nil
}

func (b *Build) initTypeScript() (err error) {
defer func() { b.tsErr = err }()

var npmPackage []byte
if err = b.Extract(func(d *sql.DB) error {
npmPackage, err = catalog.LoadNPMPackage(b.db)
return err
}); err != nil {
return fmt.Errorf("loading NPM package: %w", err)
}

tsWorker, err := NewJSWorker(npmPackage)
if err != nil {
return fmt.Errorf("starting worker: %w", err)
}
b.tsWorker = tsWorker

// HTTP/S client which dials the TypeScript server over the loopback
// for both cleartext and (fake) HTTPS connections.
// The latter is a requirement for HTTP/2 support over unix domain sockets.
// See also: https://www.mailgun.com/blog/http-2-cleartext-h2c-client-example-go/
b.tsClient = &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", tsWorker.socketPath)
},
DialTLSContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", tsWorker.socketPath)
},
// Compression isn't desired over a local UDS transport.
DisableCompression: true,
// MaxConnsPerHost is the maximum concurrency with which
// we'll drive the lambda server.
MaxConnsPerHost: 8,
},
}

return nil
}

func (b *sharedBuild) destroy() error {
if b.db == nil {
// Nothing to close.
Expand All @@ -239,12 +183,6 @@ func (b *sharedBuild) destroy() error {
return fmt.Errorf("removing DB tempfile: %w", err)
}

if b.tsWorker == nil {
// Nothing to stop.
} else if err := b.tsWorker.Stop(); err != nil {
return fmt.Errorf("stopping typescript worker: %w", err)
}

return nil
}

Expand Down Expand Up @@ -273,7 +211,7 @@ func fetchResource(svc *BuildService, resource *url.URL) (path string, tempfile
}
defer r.Close()

if tempfile, err = ioutil.TempFile("", "build"); err != nil {
if tempfile, err = os.CreateTemp("", "build"); err != nil {
return "", nil, err
}
if _, err = io.Copy(tempfile, r); err != nil {
Expand Down
16 changes: 6 additions & 10 deletions go/flow/builds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"net/url"
"path"
"runtime"
"testing"

Expand Down Expand Up @@ -54,18 +55,19 @@ func TestBuildReferenceCounting(t *testing.T) {
}

func TestBuildLazyInitAndReuse(t *testing.T) {
var dir = t.TempDir()
var args = bindings.BuildArgs{
Context: context.Background(),
FileRoot: "./testdata",
BuildAPI_Config: pf.BuildAPI_Config{
BuildId: "a-build-id",
Directory: t.TempDir(),
BuildDb: path.Join(dir, "a-build-id"),
Source: "file:///specs_test.flow.yaml",
SourceType: pf.ContentType_CATALOG,
}}
require.NoError(t, bindings.BuildCatalog(args))

var builds, err = NewBuildService("file://" + args.Directory + "/")
var builds, err = NewBuildService("file://" + dir + "/")
require.NoError(t, err)

// Open. Expect DB is not initialized until first use.
Expand All @@ -80,7 +82,7 @@ func TestBuildLazyInitAndReuse(t *testing.T) {
collection, err = catalog.LoadCollection(db, "example/collection")
return err
}))
require.Equal(t, "example/collection", collection.Collection.String())
require.Equal(t, "example/collection", collection.Name.String())

// Database was initialized.
var db1 = b1.db
Expand All @@ -95,12 +97,6 @@ func TestBuildLazyInitAndReuse(t *testing.T) {
return nil
}))

// Our fixture doesn't build a typescript package, so initialization
// fails with an error. Expect the error is shared.
_, err = b1.TypeScriptClient()
require.Error(t, err)
require.Equal(t, err, b2.tsErr)

// Close both builds, dropping the reference count to zero.
require.NoError(t, b1.Close())
require.NoError(t, b2.Close())
Expand All @@ -114,7 +110,7 @@ func TestBuildLazyInitAndReuse(t *testing.T) {
collection, err = catalog.LoadCollection(db, "example/collection")
return err
}))
require.Equal(t, "example/collection", collection.Collection.String())
require.Equal(t, "example/collection", collection.Name.String())
}

func TestInitOfMissingBuild(t *testing.T) {
Expand Down
23 changes: 12 additions & 11 deletions go/flow/converge.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/estuary/flow/go/labels"
pf "github.com/estuary/flow/go/protocols/flow"
"github.com/estuary/flow/go/protocols/ops"
"go.gazette.dev/core/broker/client"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/consumer"
Expand All @@ -21,7 +22,7 @@ func ListShardsRequest(task pf.Task) pc.ListRequest {
Selector: pb.LabelSelector{
Include: pb.MustLabelSet(
labels.TaskName, task.TaskName(),
labels.TaskType, taskType(task),
labels.TaskType, taskType(task).String(),
),
},
}
Expand All @@ -34,7 +35,7 @@ func ListRecoveryLogsRequest(task pf.Task) pb.ListRequest {
Include: pb.MustLabelSet(
glabels.ContentType, glabels.ContentType_RecoveryLog,
labels.TaskName, task.TaskName(),
labels.TaskType, taskType(task),
labels.TaskType, taskType(task).String(),
),
},
}
Expand All @@ -44,7 +45,7 @@ func ListRecoveryLogsRequest(task pf.Task) pb.ListRequest {
func ListPartitionsRequest(collection *pf.CollectionSpec) pb.ListRequest {
return pb.ListRequest{
Selector: pf.LabelSelector{
Include: pb.MustLabelSet(labels.Collection, collection.Collection.String()),
Include: pb.MustLabelSet(labels.Collection, collection.Name.String()),
},
}
}
Expand Down Expand Up @@ -356,14 +357,14 @@ func ActivationChanges(
for _, collection := range collections {
var resp, err = client.ListAllJournals(ctx, jc, ListPartitionsRequest(collection))
if err != nil {
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Collection, err)
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Name, err)
}

var desired = MapPartitionsToCurrentSplits(resp.Journals)
journals, err = CollectionChanges(collection, resp.Journals, desired, journals)

if err != nil {
return nil, nil, fmt.Errorf("processing collection %s: %w", collection.Collection, err)
return nil, nil, fmt.Errorf("processing collection %s: %w", collection.Name, err)
}
}

Expand Down Expand Up @@ -411,7 +412,7 @@ func DeletionChanges(
for _, collection := range collections {
var resp, err = client.ListAllJournals(ctx, jc, ListPartitionsRequest(collection))
if err != nil {
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Collection, err)
return nil, nil, fmt.Errorf("listing partitions of %s: %w", collection.Name, err)
}

for _, cur := range resp.Journals {
Expand Down Expand Up @@ -452,14 +453,14 @@ func DeletionChanges(
}

// taskType returns the label matching this Task.
func taskType(task pf.Task) string {
func taskType(task pf.Task) ops.TaskType {
switch task.(type) {
case *pf.CaptureSpec:
return labels.TaskTypeCapture
case *pf.DerivationSpec:
return labels.TaskTypeDerivation
return ops.TaskType_capture
case *pf.CollectionSpec:
return ops.TaskType_derivation
case *pf.MaterializationSpec:
return labels.TaskTypeMaterialization
return ops.TaskType_materialization
default:
panic(task)
}
Expand Down
27 changes: 14 additions & 13 deletions go/flow/converge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"path"
"testing"

"github.com/bradleyjkemp/cupaloy"
Expand All @@ -24,20 +25,20 @@ func TestConvergence(t *testing.T) {
FileRoot: "./testdata",
BuildAPI_Config: pf.BuildAPI_Config{
BuildId: "fixture",
Directory: t.TempDir(),
BuildDb: path.Join(t.TempDir(), "build.db"),
Source: "file:///specs_test.flow.yaml",
SourceType: pf.ContentType_CATALOG,
}}
require.NoError(t, bindings.BuildCatalog(args))

var collection *pf.CollectionSpec
var derivation *pf.DerivationSpec
var derivation *pf.CollectionSpec

require.NoError(t, catalog.Extract(args.OutputPath(), func(db *sql.DB) (err error) {
require.NoError(t, catalog.Extract(args.BuildDb, func(db *sql.DB) (err error) {
if collection, err = catalog.LoadCollection(db, "example/collection"); err != nil {
return err
}
derivation, err = catalog.LoadDerivation(db, "example/derivation")
derivation, err = catalog.LoadCollection(db, "example/derivation")
return err
}))

Expand Down Expand Up @@ -70,7 +71,7 @@ func TestConvergence(t *testing.T) {
)))
require.NoError(t, err)

shardSpec1, err := BuildShardSpec(derivation.ShardTemplate,
shardSpec1, err := BuildShardSpec(derivation.Derivation.ShardTemplate,
labels.EncodeRange(pf.RangeSpec{
KeyBegin: 0x10000000,
KeyEnd: 0x2fffffff,
Expand All @@ -79,9 +80,9 @@ func TestConvergence(t *testing.T) {
}, pf.LabelSet{}),
)
require.NoError(t, err)
logSpec1 := BuildRecoverySpec(derivation.RecoveryLogTemplate, shardSpec1)
logSpec1 := BuildRecoverySpec(derivation.Derivation.RecoveryLogTemplate, shardSpec1)

shardSpec2, err := BuildShardSpec(derivation.ShardTemplate,
shardSpec2, err := BuildShardSpec(derivation.Derivation.ShardTemplate,
labels.EncodeRange(pf.RangeSpec{
KeyBegin: 0x30000000,
KeyEnd: 0x3fffffff,
Expand All @@ -90,9 +91,9 @@ func TestConvergence(t *testing.T) {
}, pf.LabelSet{}),
)
require.NoError(t, err)
logSpec2 := BuildRecoverySpec(derivation.RecoveryLogTemplate, shardSpec2)
logSpec2 := BuildRecoverySpec(derivation.Derivation.RecoveryLogTemplate, shardSpec2)

shardSpec3, err := BuildShardSpec(derivation.ShardTemplate,
shardSpec3, err := BuildShardSpec(derivation.Derivation.ShardTemplate,
labels.EncodeRange(pf.RangeSpec{
KeyBegin: 0x30000000,
KeyEnd: 0x3fffffff,
Expand All @@ -101,7 +102,7 @@ func TestConvergence(t *testing.T) {
}, pf.LabelSet{}),
)
require.NoError(t, err)
logSpec3 := BuildRecoverySpec(derivation.RecoveryLogTemplate, shardSpec3)
logSpec3 := BuildRecoverySpec(derivation.Derivation.RecoveryLogTemplate, shardSpec3)

var allPartitions = []pb.ListResponse_Journal{
{Spec: *partitionSpec1, ModRevision: 11},
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestConvergence(t *testing.T) {
})

t.Run("shard-split-errors", func(t *testing.T) {
var shard, err = BuildShardSpec(derivation.ShardTemplate,
var shard, err = BuildShardSpec(derivation.Derivation.ShardTemplate,
labels.EncodeRange(pf.RangeSpec{
KeyEnd: 0x10000000,
RClockEnd: 0x10000000,
Expand Down Expand Up @@ -330,7 +331,7 @@ func TestConvergence(t *testing.T) {
var ctx = context.Background()
var jc = &mockJournals{
collections: map[string]*pb.ListResponse{
collection.Collection.String(): {Journals: allPartitions},
collection.Name.String(): {Journals: allPartitions},
},
logs: map[string]*pb.ListResponse{
derivation.TaskName(): {Journals: allLogs},
Expand Down Expand Up @@ -361,7 +362,7 @@ func TestConvergence(t *testing.T) {
var ctx = context.Background()
var jc = &mockJournals{
collections: map[string]*pb.ListResponse{
collection.Collection.String(): {Journals: allPartitions},
collection.Name.String(): {Journals: allPartitions},
},
logs: map[string]*pb.ListResponse{
derivation.TaskName(): {Journals: allLogs},
Expand Down
6 changes: 3 additions & 3 deletions go/flow/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewMapper(
func PartitionPointers(spec *pf.CollectionSpec) []string {
var ptrs = make([]string, len(spec.PartitionFields))
for i, field := range spec.PartitionFields {
ptrs[i] = pf.GetProjectionByField(field, spec.Projections).Ptr
ptrs[i] = spec.GetProjection(field).Ptr
}
return ptrs
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (m *Mapper) Map(mappable message.Mappable) (pb.Journal, string, error) {
"journal": applySpec.Name,
"readThrough": readThrough,
}).Info("created partition")
createdPartitionsCounters.WithLabelValues(msg.Spec.Collection.String()).Inc()
createdPartitionsCounters.WithLabelValues(msg.Spec.Name.String()).Inc()
}

m.journals.Mu.RLock()
Expand Down Expand Up @@ -274,7 +274,7 @@ func (m Mappable) SetUUID(uuid message.UUID) {
func NewAcknowledgementMessage(spec *pf.CollectionSpec) Mappable {
return Mappable{
Spec: spec,
Doc: append(json.RawMessage(nil), spec.AckJsonTemplate...),
Doc: append(json.RawMessage(nil), spec.AckTemplateJson...),
}
}

Expand Down
Loading

0 comments on commit 77e787c

Please sign in to comment.