Skip to content

Commit

Permalink
Add acceptance test for PubSub
Browse files Browse the repository at this point in the history
  • Loading branch information
FelisiaM committed Jun 3, 2024
1 parent 273f67b commit 2b14224
Show file tree
Hide file tree
Showing 11 changed files with 478 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ cloud-service-broker
*.brokerpak
.csb.db
integration-tests.test
acceptance-tests/acceptance-tests.test
44 changes: 44 additions & 0 deletions acceptance-tests/apps/pubsubapp/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module pubsubapp

go 1.22.3

require (
cloud.google.com/go/pubsub v1.38.0
github.com/cloudfoundry-community/go-cfenv v1.18.0
github.com/mitchellh/mapstructure v1.5.0
google.golang.org/api v0.182.0
)

require (
cloud.google.com/go v0.114.0 // indirect
cloud.google.com/go/auth v0.4.2 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
)
191 changes: 191 additions & 0 deletions acceptance-tests/apps/pubsubapp/go.sum

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions acceptance-tests/apps/pubsubapp/internal/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package app

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"google.golang.org/api/option"
"log"
"net/http"
"pubsubapp/internal/credentials"
)

func App(creds credentials.PubSubCredentials) http.HandlerFunc {
client, _ := pubsub.NewClient(context.Background(), creds.ProjectID, option.WithCredentialsJSON([]byte(creds.Credentials)))

return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodHead:
aliveness(w, r)
case http.MethodGet:
handleReceive(w, r, client, creds.SubscriptionName)
case http.MethodPut:
handlePublish(w, r, client, creds.TopicName)
default:
fail(w, http.StatusMethodNotAllowed, http.StatusText(http.StatusMethodNotAllowed))
}
}
}

func aliveness(w http.ResponseWriter, r *http.Request) {
log.Printf("Handled aliveness test.")
w.WriteHeader(http.StatusNoContent)
}

func fail(w http.ResponseWriter, code int, format string, a ...any) {
msg := fmt.Sprintf(format, a...)
log.Println(msg)
http.Error(w, msg, code)
}
45 changes: 45 additions & 0 deletions acceptance-tests/apps/pubsubapp/internal/app/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package app

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"io"
"net/http"
"sync"
)

func handlePublish(w http.ResponseWriter, r *http.Request, client *pubsub.Client, topicName string) {
ctx := context.Background()

data, err := io.ReadAll(r.Body)
if err != nil {
fail(w, http.StatusBadRequest, "could not read body: %q", err)
return
}
defer r.Body.Close()
body := string(data)

t := client.Topic(topicName)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte(body),
})

var finished sync.WaitGroup
finished.Add(1)

go func(res *pubsub.PublishResult) {
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
id, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Fprintf(w, "Failed to publish: %v", err)
return
}
fmt.Fprintf(w, "Published message msg ID: %v\n", id)
finished.Done()
}(result)

w.WriteHeader(http.StatusCreated)
}
33 changes: 33 additions & 0 deletions acceptance-tests/apps/pubsubapp/internal/app/receive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package app

import (
"cloud.google.com/go/pubsub"
"context"
"log"
"net/http"
"time"
)

func handleReceive(w http.ResponseWriter, r *http.Request, client *pubsub.Client, subscriptionName string) {
sub := client.Subscription(subscriptionName)

// Receive messages for 10 seconds, which simplifies testing.
// Comment this out in production, since `Receive` should
// be used as a long running operation.
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()

err := sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
w.Write(msg.Data)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "text/html")

msg.Ack()

log.Println("Receive done.")
})
if err != nil {
fail(w, http.StatusInternalServerError, "sub.Receive: %v", err)
return
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package credentials

import (
"fmt"
"github.com/cloudfoundry-community/go-cfenv"
"github.com/mitchellh/mapstructure"
)

type PubSubCredentials struct {
Credentials string `mapstructure:"credentials"`
TopicName string `mapstructure:"topic_name"`
SubscriptionName string `mapstructure:"subscription_name"`
ProjectID string `mapstructure:"ProjectId"`
}

func Read() (PubSubCredentials, error) {
app, err := cfenv.Current()
if err != nil {
return PubSubCredentials{}, fmt.Errorf("error reading app env: %w", err)
}
svs, err := app.Services.WithTag("pubsub")
if err != nil {
return PubSubCredentials{}, fmt.Errorf("error reading PubSub service details")
}

return readBinding(svs[0].Credentials)
}

func readBinding(creds any) (PubSubCredentials, error) {
var r PubSubCredentials
if err := mapstructure.Decode(creds, &r); err != nil {
return r, fmt.Errorf("failed to decode credentials: %w", err)
}

if r.Credentials == "" || r.TopicName == "" || r.ProjectID == "" {
return r, fmt.Errorf("parsed credentials are not valid")
}

return r, nil
}
32 changes: 32 additions & 0 deletions acceptance-tests/apps/pubsubapp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"fmt"
"log"
"net/http"
"os"
"pubsubapp/internal/app"
"pubsubapp/internal/credentials"
)

func main() {
log.Println("Starting.")

log.Println("Reading credentials.")
creds, err := credentials.Read()
if err != nil {
panic(err)
}

port := port()
log.Printf("Listening on port: %s", port)
http.HandleFunc("/", app.App(creds))
http.ListenAndServe(port, nil)
}

func port() string {
if port := os.Getenv("PORT"); port != "" {
return fmt.Sprintf(":%s", port)
}
return ":8080"
}
1 change: 1 addition & 0 deletions acceptance-tests/helpers/apps/testapps.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
StackdriverTrace AppCode = "stackdrivertraceapp"
JDBCTestApp AppCode = "jdbctestapp"
SpringStorageApp AppCode = "springstorageapp"
PubSubApp AppCode = "pubsubapp"
)

func (a AppCode) Dir() string {
Expand Down
45 changes: 45 additions & 0 deletions acceptance-tests/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package acceptance_test

import (
"csbbrokerpakgcp/acceptance-tests/helpers/apps"
"csbbrokerpakgcp/acceptance-tests/helpers/matchers"
"csbbrokerpakgcp/acceptance-tests/helpers/random"
"csbbrokerpakgcp/acceptance-tests/helpers/services"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("PubSub", Label("pubsub"), func() {
It("can be accessed by an app", func() {
By("creating a service instance")
serviceInstance := services.CreateInstance(
"csb-google-pubsub",
"default",
services.WithParameters(map[string]any{"subscription_name": random.Name()}))
defer serviceInstance.Delete()

By("pushing the unstarted app twice")
publisherApp := apps.Push(apps.WithApp(apps.PubSubApp))
subscriberApp := apps.Push(apps.WithApp(apps.PubSubApp))
defer apps.Delete(publisherApp, subscriberApp)

By("binding the apps to the storage service instance")
binding := serviceInstance.BindWithParams(publisherApp, `{"role":"pubsub.editor"}`)
serviceInstance.BindWithParams(subscriberApp, `{"role":"pubsub.editor"}`)

By("starting the apps")
apps.Start(publisherApp, subscriberApp)

By("checking that the app environment has a credhub reference for credentials")
Expect(binding.Credential()).To(matchers.HaveCredHubRef)

By("publishing a message with the publisher app")
messageData := random.Hexadecimal()
publisherApp.PUT(messageData, "")

By("retrieving a message with the subscriber app")
got := subscriberApp.GET("").String()
Expect(got).To(Equal(messageData))
})
})
7 changes: 7 additions & 0 deletions scripts/push-broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ if [[ -z "$GSB_SERVICE_CSB_GOOGLE_REDIS_PLANS" ]]; then
fi
echo " GSB_SERVICE_CSB_GOOGLE_REDIS_PLANS: $(echo "$GSB_SERVICE_CSB_GOOGLE_REDIS_PLANS" | jq @json)" >>$cfmf

if [[ -z "$GSB_SERVICE_CSB_GOOGLE_PUBSUB_PLANS" ]]; then
echo "Missing GSB_SERVICE_CSB_GOOGLE_PUBSUB_PLANS variable"
exit 1
fi
echo " GSB_SERVICE_CSB_GOOGLE_PUBSUB_PLANS: $(echo "$GSB_SERVICE_CSB_GOOGLE_PUBSUB_PLANS" | jq @json)" >>$cfmf


cf push --no-start -f "${cfmf}" --var app=${APP_NAME}

if [[ -z ${MSYQL_INSTANCE} ]]; then
Expand Down

0 comments on commit 2b14224

Please sign in to comment.