-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprocess_s3.go
142 lines (118 loc) · 3.3 KB
/
process_s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"fmt"
"io"
"os"
"os/exec"
"strings"
"github.com/schollz/progressbar/v3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/rs/zerolog/log"
)
type progressReader struct {
reader io.Reader
bar *progressbar.ProgressBar
}
func (pr *progressReader) Read(p []byte) (int, error) {
n, err := pr.reader.Read(p)
if err != nil {
return 0, err
}
err = pr.bar.Add(n)
if err != nil {
return 0, err
}
return n, nil
}
func restoreBackupFromS3() (string, error) {
log.Info().Msg("Downloading SQL file from S3...")
sess, err := session.NewSession(&aws.Config{
Region: aws.String("us-east-1"), // Update with your region
Credentials: credentials.NewStaticCredentials(awsAccessKey, awsSecretKey, ""),
})
if err != nil {
return "", fmt.Errorf("failed to create AWS session: %v", err)
}
s3Client := s3.New(sess)
// Check if we need to fetch the latest file key
if s3FileKey == "latest" {
latestFileKey, err := getLatestFileKey(s3Client)
if err != nil {
return "", fmt.Errorf("failed to get latest file key: %v", err)
}
s3FileKey = latestFileKey
}
tempFile, err := os.CreateTemp("", "backup-*.dump")
if err != nil {
return "", fmt.Errorf("failed to create temporary file: %v", err)
}
defer tempFile.Close()
resp, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s3BucketName),
Key: aws.String(s3FileKey),
})
if err != nil {
return "", fmt.Errorf("failed to download file from S3: %v", err)
}
defer resp.Body.Close()
// Create a progress bar
bar := progressbar.DefaultBytes(
*resp.ContentLength,
"Downloading",
)
// Create a progress reader
progressReader := &progressReader{
reader: resp.Body,
bar: bar,
}
// Copy the data from the progress reader to the temp file
_, err = io.Copy(tempFile, progressReader)
if err != nil {
return "", fmt.Errorf("failed to read from S3 response body: %v", err)
}
fileName := tempFile.Name()
_ = restoreBackupToDB(fileName)
return tempFile.Name(), nil
}
func getLatestFileKey(s3Client *s3.S3) (string, error) {
resp, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s3BucketName),
Key: aws.String("latest_backup.txt"),
})
if err != nil {
return "", fmt.Errorf("failed to get latest_backup.txt: %v", err)
}
defer resp.Body.Close()
content, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read latest_backup.txt content: %v", err)
}
latestFileKey := strings.TrimSpace(string(content))
if latestFileKey == "" {
return "", fmt.Errorf("latest_backup.txt is empty")
}
return latestFileKey, nil
}
func restoreBackupToDB(filePath string) error {
cmd := exec.Command(
"pg_restore",
"-h", dbPool.Config().ConnConfig.Host,
"-p", fmt.Sprintf("%d", dbPool.Config().ConnConfig.Port),
"-U", dbPool.Config().ConnConfig.User,
"-d", dbPool.Config().ConnConfig.Database,
"-j", fmt.Sprintf("%d", parallelJobs),
"-v", filePath,
)
cmd.Env = append(os.Environ(), "PGPASSWORD="+dbPool.Config().ConnConfig.Password)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
log.Err(err).Msg("Failed to run pg_restore command")
return err
}
log.Info().Msg("Restored DB from DUMP")
return nil
}