Skip to content

Commit

Permalink
PTEUDO-1991 on restore failure, create a fresh schema
Browse files Browse the repository at this point in the history
    When a database is partially or fully restored, we fail to
    migrate to it. This is problematic as user can not easily
    recover from this situation.

    We will attempt to version the schema by migrating whatever
    is there and starting with a new schema. In situations where
    the issue is the pgdump itself, we will be creating multiple
    copies of an empty schema which is cheap. In more typical
    situations, we will move a partially or fully migrated schema
    to another location and attempt a restore with a fresh schema.

    - change restore to return sql errors rather than exit codes
    - sanitize dsn emitted by restore
  • Loading branch information
drewwells committed Nov 15, 2024
1 parent 600e772 commit 377979c
Show file tree
Hide file tree
Showing 20 changed files with 739 additions and 281 deletions.
5 changes: 4 additions & 1 deletion cmd/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ defaultMajorVersion: 15
passwordConfig:
passwordComplexity: enabled
minPasswordLength: 15
passwordRotationPeriod: 60
# 60s is a common error requeue in the codebase. Set this
# to something other than 60s to distinguish between
# success requeue and those error requeues.
passwordRotationPeriod: 65s

systemFunctions:
ib_realm: "ib_realm"
Expand Down
140 changes: 112 additions & 28 deletions internal/controller/databasecontroller_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ import (
"github.com/infobloxopen/db-controller/pkg/pgctl"
)

var _ = Describe("claim migrate", func() {
var _ = Describe("Migrate", func() {

// Define utility constants for object names and testing timeouts/durations and intervals.

var (
ctxLogger context.Context
cancel func()
success = ctrl.Result{Requeue: false, RequeueAfter: 60}
success = ctrl.Result{Requeue: false, RequeueAfter: 65 * time.Second}
)

BeforeEach(func() {
Expand Down Expand Up @@ -93,12 +93,11 @@ var _ = Describe("claim migrate", func() {
// Namespace: "default",
// }

claim := &persistancev1.DatabaseClaim{}

kctx := context.Background()

BeforeEach(func() {

claim := &persistancev1.DatabaseClaim{}
By("ensuring the resource does not exist")
Expect(k8sClient.Get(kctx, typeNamespacedName, claim)).To(HaveOccurred())

Expand All @@ -119,7 +118,7 @@ var _ = Describe("claim migrate", func() {
},
Spec: persistancev1.DatabaseClaimSpec{
Class: ptr.To(""),
DatabaseName: "postgres",
DatabaseName: "sample_app",
SecretName: claimSecretName,
EnableSuperUser: ptr.To(false),
EnableReplicationRole: ptr.To(false),
Expand All @@ -128,6 +127,7 @@ var _ = Describe("claim migrate", func() {
SourceDataFrom: &persistancev1.SourceDataFrom{
Type: "database",
Database: &persistancev1.Database{
DSN: testDSN,
SecretRef: &persistancev1.SecretRef{
Name: sourceSecretName,
Namespace: "default",
Expand Down Expand Up @@ -190,10 +190,12 @@ var _ = Describe("claim migrate", func() {

_, err := controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
var claim persistancev1.DatabaseClaim
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &claim)).NotTo(HaveOccurred())
Expect(claim.Status.Error).To(Equal(""))
By("Ensuring the active db connection info is set")
Eventually(func() *persistancev1.DatabaseClaimConnectionInfo {
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, claim)).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &claim)).NotTo(HaveOccurred())
return claim.Status.ActiveDB.ConnectionInfo
}).ShouldNot(BeNil())

Expand Down Expand Up @@ -223,16 +225,18 @@ var _ = Describe("claim migrate", func() {
Expect(dsn.Redacted()).To(Equal(redacted))
})

It("Migrate", func() {
It("Populate a new database", func() {

Expect(controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})).To(Equal(success))
var dbc persistancev1.DatabaseClaim
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(claim.Status.Error).To(Equal(""))
Expect(dbc.Status.Error).To(Equal(""))
By("Ensuring the active db connection info is set")
Expect(controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})).To(Equal(success))

Eventually(func() *persistancev1.DatabaseClaimConnectionInfo {
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
return claim.Status.ActiveDB.ConnectionInfo
return dbc.Status.ActiveDB.ConnectionInfo
}).ShouldNot(BeNil())

hostParams, err := hostparams.New(controllerReconciler.Config.Viper, &dbc)
Expand All @@ -242,7 +246,7 @@ var _ = Describe("claim migrate", func() {

By(fmt.Sprintf("Mocking a RDS pod to look like crossplane set it up: %s", fakeCPSecretName))
fakeCli, fakeDSN, fakeCancel := dockerdb.MockRDS(GinkgoT(), ctxLogger, k8sClient, fakeCPSecretName, "migrate", dbc.Spec.DatabaseName)
DeferCleanup(fakeCancel)
defer fakeCancel()

fakeCPSecret := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -269,7 +273,7 @@ var _ = Describe("claim migrate", func() {
By("Check source DSN looks roughly correct")
activeDB := dbc.Status.ActiveDB
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
compareDSN := strings.Replace(testDSN, "//postgres:postgres", fmt.Sprintf("//%s_b:", migratedowner), 1)
compareDSN := strings.Replace(testDSN, "//postgres:postgres", fmt.Sprintf("//%s_a:", migratedowner), 1)
Expect(activeDB.ConnectionInfo.Uri()).To(Equal(compareDSN))

By("Check target DSN looks roughly correct")
Expand All @@ -282,28 +286,18 @@ var _ = Describe("claim migrate", func() {
var tempCreds corev1.Secret
// temp-migrate-dbclaim-creds
Expect(k8sClient.Get(ctxLogger, types.NamespacedName{Name: "temp-" + claimSecretName, Namespace: "default"}, &tempCreds)).NotTo(HaveOccurred())
for k, v := range tempCreds.Data {
logger.Info("tempcreds", k, string(v))
}

By("CR reconciles but must be requeued to perform migration, reconcile manually for test")
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(res.Requeue).To(BeFalse())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
By("Requeue for as long as Controller requests it")
Eventually(func() reconcile.Result {
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
return res
}).Should(Equal(success))

By("Waiting to disable source, reconcile manually again")
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(res.RequeueAfter).To(Equal(time.Duration(60 * time.Second)))
By("Verify migration is complete on this reconcile")
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(res.Requeue).To(BeFalse())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
Expect(dbc.Status.MigrationState).To(Equal(pgctl.S_Completed.String()))

activeDB = dbc.Status.ActiveDB
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
Expect(activeDB.ConnectionInfo.Uri()).To(Equal(compareDSN))
Expand All @@ -316,5 +310,95 @@ var _ = Describe("claim migrate", func() {

})

It("Existing schema", func() {

Expect(controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})).To(Equal(success))
var dbc persistancev1.DatabaseClaim
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
By("Ensuring the active db connection info is set")
Eventually(func() *persistancev1.DatabaseClaimConnectionInfo {
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
return dbc.Status.ActiveDB.ConnectionInfo
}).ShouldNot(BeNil())

oldDB, err := url.Parse(testDSN)
Expect(err).NotTo(HaveOccurred())
aConn := dbc.Status.ActiveDB.ConnectionInfo
Expect(aConn.Port).To(Equal(oldDB.Port()))

logger.Info("what", "status", dbc.Status.ActiveDB.ConnectionInfo.Uri())

hostParams, err := hostparams.New(controllerReconciler.Config.Viper, &dbc)
Expect(err).ToNot(HaveOccurred())

fakeCPSecretName := fmt.Sprintf("%s-%s-%s", env, resourceName, hostParams.Hash())

By(fmt.Sprintf("Mocking a RDS pod to look like crossplane set it up: %s", fakeCPSecretName))
fakeCli, fakeDSN, fakeCancel := dockerdb.MockRDS(GinkgoT(), ctxLogger, k8sClient, fakeCPSecretName, "migrate", dbc.Spec.DatabaseName)
defer fakeCancel()

dockerdb.MustSQL(ctxLogger, fakeCli, "testdata/mock.sql")

fakeCPSecret := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: fakeCPSecretName,
Namespace: "default",
},
}
nname := types.NamespacedName{
Name: fakeCPSecretName,
Namespace: "default",
}
Eventually(k8sClient.Get(ctxLogger, nname, &fakeCPSecret)).Should(Succeed())
logger.Info("debugsecret", "rdssecret", fakeCPSecret)

By("Disabling UseExistingSource")
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
dbc.Spec.UseExistingSource = ptr.To(false)
Expect(k8sClient.Update(ctxLogger, &dbc)).NotTo(HaveOccurred())

res, err := controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(dbc.Status.Error).To(Equal(""))
Expect(res.Requeue).To(BeTrue())

By("Check DSNs looks roughly correct")
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
activeDB := dbc.Status.ActiveDB
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
compareDSN := strings.Replace(testDSN, "//postgres:postgres", fmt.Sprintf("//%s_a:", migratedowner), 1)
Expect(activeDB.ConnectionInfo.Uri()).To(Equal(compareDSN))

newDB := dbc.Status.NewDB
compareDSN = strings.Replace(fakeDSN, "//migrate:postgres", fmt.Sprintf("//%s_a:", migratedowner), 1)

Expect(newDB.ConnectionInfo).NotTo(BeNil())
Expect(newDB.ConnectionInfo.Uri()).To(Equal(compareDSN))

By("Ensuring migration is in progress")
Expect(dbc.Status.MigrationState).To(Equal(pgctl.S_MigrationInProgress.String()))
var tempCreds corev1.Secret
// temp-migrate-dbclaim-creds
Expect(k8sClient.Get(ctxLogger, types.NamespacedName{Name: "temp-" + claimSecretName, Namespace: "default"}, &tempCreds)).NotTo(HaveOccurred())

By("Requeue for as long as Controller requests it")
Eventually(func() reconcile.Result {

res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
return res
}).Should(Equal(success))

Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))

Expect(dbc.Status.MigrationState).To(Equal(pgctl.S_Completed.String()))
activeDB = dbc.Status.ActiveDB
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
Expect(activeDB.ConnectionInfo.Uri()).To(Equal(compareDSN))

})

})
})
13 changes: 4 additions & 9 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"database/sql"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -130,22 +131,16 @@ var _ = BeforeSuite(func() {
Expect(k8sClient).NotTo(BeNil())

now := time.Now()
testdb, testDSN, cleanupTestDB = dockerdb.Run(dockerdb.Config{
logger.Info("start postgres setup")
testdb, testDSN, cleanupTestDB = dockerdb.Run(logger, dockerdb.Config{
Database: "postgres",
Username: "postgres",
Password: "postgres",
DockerTag: "15",
})
logger.Info("postgres_setup_took", "duration", time.Since(now))

// Mock table for testing migrations
_, err = testdb.Exec(`CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)`)
Expect(err).NotTo(HaveOccurred())
dockerdb.MustSQL(context.TODO(), testdb, "testdata/mock.sql")

// Setup controller
By("setting up the database controller")
Expand Down
68 changes: 68 additions & 0 deletions internal/controller/testdata/mock.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- Create a table with a primary key and a sequence
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL
);

-- Create another table with a foreign key reference to users
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
user_id INT REFERENCES users(user_id) ON DELETE CASCADE,
order_total NUMERIC(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create a table with a check constraint
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price NUMERIC(10, 2) NOT NULL CHECK (price > 0)
);

-- Create a function to calculate the total price of orders for a user
CREATE OR REPLACE FUNCTION calculate_user_total(userId INT) RETURNS NUMERIC AS $$
DECLARE
total NUMERIC(10, 2);
BEGIN
SELECT COALESCE(SUM(order_total), 0) INTO total
FROM orders WHERE user_id = userId;
RETURN total;
END;
$$ LANGUAGE plpgsql;

-- Create an index on the orders table to optimize queries by created_at
CREATE INDEX idx_orders_created_at ON orders(created_at);

-- Create a materialized view that summarizes total orders per user
CREATE MATERIALIZED VIEW user_order_totals AS
SELECT u.user_id, u.username, COALESCE(SUM(o.order_total), 0) AS total_spent
FROM users u
LEFT JOIN orders o ON u.user_id = o.user_id
GROUP BY u.user_id, u.username;

-- Create a trigger to log inserts into the orders table
CREATE TABLE order_logs (
log_id SERIAL PRIMARY KEY,
order_id INT NOT NULL,
log_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE OR REPLACE FUNCTION log_order_insert() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO order_logs (order_id) VALUES (NEW.order_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER after_order_insert
AFTER INSERT ON orders
FOR EACH ROW EXECUTE FUNCTION log_order_insert();

-- Create a table with an exclusion constraint
CREATE TABLE event_schedule (
event_id SERIAL PRIMARY KEY,
event_name VARCHAR(100) NOT NULL,
event_time TSRANGE NOT NULL,
EXCLUDE USING gist (event_time WITH &&) -- Ensures no overlapping events
);
22 changes: 20 additions & 2 deletions internal/dockerdb/mockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package dockerdb
import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"net/url"
"strings"

Expand All @@ -11,12 +13,28 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// MustSQL will open a file and exec it on the database.
// It will panic on any errors
func MustSQL(ctx context.Context, db *sql.DB, fileName string, args ...any) {
bs, err := ioutil.ReadFile(fileName)
if err != nil {
panic(err)
}
_, err = db.ExecContext(ctx, string(bs), args...)
if err != nil {
panic(err)
}
}

func MockRDS(t GinkgoTInterface, ctx context.Context, cli client.Client, secretName, userName, databaseName string) (*sql.DB, string, func()) {
t.Helper()

dbCli, fakeDSN, clean := Run(Config{
logger := log.FromContext(ctx).WithName("mockrds")

dbCli, fakeDSN, clean := Run(logger, Config{
Database: databaseName,
Username: userName,
Password: "postgres",
Expand Down Expand Up @@ -49,7 +67,7 @@ func MockRDS(t GinkgoTInterface, ctx context.Context, cli client.Client, secretN

return dbCli, fakeDSN, func() {
if err := cli.Delete(ctx, secret); err != nil {
t.Logf("failed to delete secret: %v", err)
panic(fmt.Sprintf("failed to delete secret: %v", err))
}
clean()
}
Expand Down
Loading

0 comments on commit 377979c

Please sign in to comment.