Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Dataset events #572

Closed
wants to merge 9 commits into from
25 changes: 25 additions & 0 deletions dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact"

"github.com/flyteorg/flyteadmin/pkg/common"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -138,10 +140,33 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create a signed url. Error: %v", err)
}

// The artifact returned here has no relevant entry in the admin database, so call the artifact service synchronously
// to persist the information.
artifactCreate := &artifact.CreateArtifactRequest{
ArtifactKey: &core.ArtifactKey{
Project: req.Project,
Domain: req.Domain,
},
Spec: req.GetArtifactSpec(),
}
fmt.Printf("Will call artifact service with request: %v\n", artifactCreate)
// artifact := artifact_service.CreateArtifact

return &service.CreateUploadLocationResponse{
SignedUrl: resp.URL.String(),
NativeUrl: storagePath.String(),
ExpiresAt: timestamppb.New(time.Now().Add(req.ExpiresIn.AsDuration())),
// replace with created artifact
Artifact: &artifact.Artifact{
ArtifactId: &core.ArtifactID{
ArtifactKey: &core.ArtifactKey{
Project: req.Project,
Domain: req.Domain,
},
},
Uri: "flyte://random/returned/url",
Spec: req.GetArtifactSpec(),
},
}, nil
}

Expand Down
9 changes: 9 additions & 0 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ flyteadmin:
- "metadata"
- "admin"
useOffloadedWorkflowClosure: false
artifacts:
host: localhost
port: 50051
insecure: true
database:
postgres:
port: 30001
Expand Down Expand Up @@ -112,6 +116,11 @@ externalEvents:
eventsPublisher:
topicName: "bar"
eventTypes: all
cloudEvents:
enable: true
type: redis
redis:
addr: "localhost:6379"
Logger:
show-source: true
level: 5
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
github.com/redis/go-redis/v9 v9.0.5
github.com/robfig/cron/v3 v3.0.0
github.com/sendgrid/sendgrid-go v3.10.0+incompatible
github.com/spf13/cobra v1.4.0
Expand Down Expand Up @@ -90,6 +91,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0 // indirect
github.com/dgraph-io/ristretto v0.0.3 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand Down Expand Up @@ -210,6 +212,8 @@ require (

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a

replace github.com/flyteorg/flyteidl => ../flyteidl

// Retracted versions
// This was published in error when attempting to create 1.5.1 Flyte release.
retract v1.1.94
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ github.com/bmatcuk/doublestar/v2 v2.0.3/go.mod h1:QMmcs3H2AUQICWhfzLXz+IYln8lRQm
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 h1:rRISKWyXfVxvoa702s91Zl5oREZTrR3yv+tXrrX7G/g=
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
Expand Down Expand Up @@ -247,6 +249,8 @@ github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70d
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
Expand Down Expand Up @@ -293,8 +297,6 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0=
github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=
Expand Down Expand Up @@ -1279,6 +1281,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rhnvrm/simples3 v0.5.0/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down
38 changes: 38 additions & 0 deletions pkg/artifacts/artifact_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package artifacts

import (
"context"
"crypto/tls"
"fmt"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flytestdlib/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
if opts == nil {
// Initialize opts list to the potential number of options we will add. Initialization optimizes memory
// allocation.
opts = make([]grpc.DialOption, 0, 5)
}

if cfg.Insecure {
opts = append(opts, grpc.WithInsecure())
} else {
tlsConfig := &tls.Config{} //nolint
creds := credentials.NewTLS(tlsConfig)
opts = append(opts, grpc.WithTransportCredentials(creds))
}

return grpc.Dial(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), opts...)
}

func InitializeArtifactClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) artifact.ArtifactRegistryClient {
conn, err := NewArtifactConnection(ctx, cfg, opts...)
if err != nil {
logger.Panicf(ctx, "failed to initialize Artifact connection. Err: %s", err.Error())
panic(err)
}
return artifact.NewArtifactRegistryClient(conn)
}
7 changes: 7 additions & 0 deletions pkg/artifacts/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package artifacts

type Config struct {
Host string `json:"host"`
Port int `json:"port"`
Insecure bool `json:"insecure"`
}
62 changes: 46 additions & 16 deletions pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"time"

dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces"
repositoryInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flytestdlib/storage"

"github.com/NYTimes/gizmo/pubsub"
gizmoAWS "github.com/NYTimes/gizmo/pubsub/aws"
gizmoGCP "github.com/NYTimes/gizmo/pubsub/gcp"
Expand All @@ -13,25 +17,28 @@ import (
"github.com/flyteorg/flyteadmin/pkg/async"
cloudEventImplementations "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/implementations"
"github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces"
redisPublisher "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/redis"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyteadmin/pkg/common"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
)

func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.CloudEventsConfig, scope promutils.Scope) interfaces.Publisher {
if !config.Enable {
func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Repository, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, cloudEventsConfig runtimeInterfaces.CloudEventsConfig, remoteDataConfig runtimeInterfaces.RemoteDataConfig, scope promutils.Scope) interfaces.Publisher {
if !cloudEventsConfig.Enable {
return implementations.NewNoopPublish()
}
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
switch config.Type {
reconnectAttempts := cloudEventsConfig.ReconnectAttempts
reconnectDelay := time.Duration(cloudEventsConfig.ReconnectDelaySeconds) * time.Second

var sender interfaces.Sender
switch cloudEventsConfig.Type {
case common.AWS:
snsConfig := gizmoAWS.SNSConfig{
Topic: config.EventsPublisherConfig.TopicName,
Topic: cloudEventsConfig.EventsPublisherConfig.TopicName,
}
snsConfig.Region = config.AWSConfig.Region
snsConfig.Region = cloudEventsConfig.AWSConfig.Region

var publisher pubsub.Publisher
var err error
Expand All @@ -44,12 +51,13 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud
if err != nil {
panic(err)
}
return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes)
sender = &cloudEventImplementations.PubSubSender{Pub: publisher}

case common.GCP:
pubsubConfig := gizmoGCP.Config{
Topic: config.EventsPublisherConfig.TopicName,
Topic: cloudEventsConfig.EventsPublisherConfig.TopicName,
}
pubsubConfig.ProjectID = config.GCPConfig.ProjectID
pubsubConfig.ProjectID = cloudEventsConfig.GCPConfig.ProjectID
var publisher pubsub.MultiPublisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
Expand All @@ -60,30 +68,52 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud
if err != nil {
panic(err)
}
return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes)
sender = &cloudEventImplementations.PubSubSender{Pub: publisher}

case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(config.KafkaConfig.Version)
saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
sender, err := kafka_sarama.NewSender(config.KafkaConfig.Brokers, saramaConfig, config.EventsPublisherConfig.TopicName)
kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
}
client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
client, err := cloudevents.NewClient(kafkaSender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
logger.Fatalf(ctx, "failed to create kafka client, %v", err)
panic(err)
}
return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.KafkaSender{Client: client}, scope, config.EventsPublisherConfig.EventTypes)
sender = &cloudEventImplementations.KafkaSender{Client: client}

case common.Redis:
var publisher pubsub.Publisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = redisPublisher.NewPublisher(cloudEventsConfig.RedisConfig)
return err
})
logger.Infof(ctx, "Using Redis cloud events publisher [%v]", publisher)

// Persistent errors should hard fail
if err != nil {
panic(err)
}
sender = &cloudEventImplementations.PubSubSender{Pub: publisher}

case common.Local:
fallthrough
default:
logger.Infof(ctx,
"Using default noop cloud events publisher implementation for config type [%s]", config.Type)
"Using default noop cloud events publisher implementation for config type [%s]", cloudEventsConfig.Type)
return implementations.NewNoopPublish()
}

if !cloudEventsConfig.TransformToCloudEvents {
return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)
}
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig)
}
Loading
Loading