Skip to content

Commit

Permalink
Sync from server repo (2f4d104)
Browse files Browse the repository at this point in the history
  • Loading branch information
roypaulin committed Nov 9, 2023
1 parent 2545b61 commit 9c50b53
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 8 deletions.
2 changes: 2 additions & 0 deletions vclusterops/create_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type VCreateDatabaseOptions struct {
// part 5: other params
SkipStartupPolling *bool
ConfigDirectory *string
GenerateHTTPCerts *bool

// hidden options (which cache information only)
bootstrapHost []string
Expand Down Expand Up @@ -82,6 +83,7 @@ func (opt *VCreateDatabaseOptions) setDefaultValues() {
opt.ForceCleanupOnFailure = new(bool)
opt.ForceRemovalAtCreation = new(bool)
opt.SkipPackageInstall = new(bool)
opt.GenerateHTTPCerts = new(bool)

defaultTimeoutNodeStartupSeconds := util.DefaultTimeoutSeconds
opt.TimeoutNodeStartupSeconds = &defaultTimeoutNodeStartupSeconds
Expand Down
53 changes: 50 additions & 3 deletions vclusterops/http_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,44 @@ import (

type HTTPAdapter struct {
OpBase
host string
host string
respBodyHandler responseBodyHandler
}

func makeHTTPAdapter(log vlog.Printer) HTTPAdapter {
newHTTPAdapter := HTTPAdapter{}
newHTTPAdapter.name = "HTTPAdapter"
newHTTPAdapter.log = log.WithName(newHTTPAdapter.name)
newHTTPAdapter.respBodyHandler = &responseBodyReader{}
return newHTTPAdapter
}

// makeHTTPDownloadAdapter creates an HTTP adaptor which will
// download a response body to a file via streaming read and
// buffered write, rather than copying the body to memory.
func makeHTTPDownloadAdapter(log vlog.Printer,
destFilePath string) HTTPAdapter {
newHTTPAdapter := makeHTTPAdapter(log)
newHTTPAdapter.respBodyHandler = &responseBodyDownloader{
log,
destFilePath,
}
return newHTTPAdapter
}

type responseBodyHandler interface {
readResponseBody(resp *http.Response) (string, error)
}

// empty struct for default behavior of reading response body into memory
type responseBodyReader struct{}

// for downloading response body to file instead of reading into memory
type responseBodyDownloader struct {
log vlog.Printer
destFilePath string
}

const (
certPathBase = "/opt/vertica/config/https_certs"
nmaPort = 5554
Expand Down Expand Up @@ -128,7 +156,7 @@ func (adapter *HTTPAdapter) sendRequest(request *HostHTTPRequest, resultChannel
}

func (adapter *HTTPAdapter) generateResult(resp *http.Response) HostHTTPResult {
bodyString, err := adapter.readResponseBody(resp)
bodyString, err := adapter.respBodyHandler.readResponseBody(resp)
if err != nil {
return adapter.makeExceptionResult(err)
}
Expand All @@ -138,7 +166,7 @@ func (adapter *HTTPAdapter) generateResult(resp *http.Response) HostHTTPResult {
return adapter.makeFailResult(resp.Header, bodyString, resp.StatusCode)
}

func (adapter *HTTPAdapter) readResponseBody(resp *http.Response) (bodyString string, err error) {
func (reader *responseBodyReader) readResponseBody(resp *http.Response) (bodyString string, err error) {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
err = fmt.Errorf("fail to read the response body: %w", err)
Expand All @@ -149,6 +177,25 @@ func (adapter *HTTPAdapter) readResponseBody(resp *http.Response) (bodyString st
return bodyString, nil
}

func (downloader *responseBodyDownloader) readResponseBody(resp *http.Response) (bodyString string, err error) {
bytesWritten, err := downloader.downloadFile(resp)
if err != nil {
err = fmt.Errorf("fail to stream the response body to file %s: %w", downloader.destFilePath, err)
} else {
downloader.log.Info("File downloaded", "File", downloader.destFilePath, "Bytes", bytesWritten)
}
return "", err
}

func (downloader *responseBodyDownloader) downloadFile(resp *http.Response) (bytesWritten int64, err error) {
file, err := os.Create(downloader.destFilePath)
if err != nil {
return 0, err
}
defer file.Close()
return io.Copy(file, resp.Body)
}

// makeSuccessResult is a factory method for HostHTTPResult when a success
// response comes back from a REST endpoints.
func (adapter *HTTPAdapter) makeSuccessResult(content string, statusCode int) HostHTTPResult {
Expand Down
6 changes: 3 additions & 3 deletions vclusterops/http_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (m *MockReadCloser) Close() error {
}

func TestHandleSuccessResponseCodes(t *testing.T) {
adapter := HTTPAdapter{}
adapter := HTTPAdapter{respBodyHandler: &responseBodyReader{}}
mockBodyReader := MockReadCloser{
body: []byte("success!"),
}
Expand All @@ -144,7 +144,7 @@ func TestHandleSuccessResponseCodes(t *testing.T) {
}

func TestHandleRFC7807Response(t *testing.T) {
adapter := HTTPAdapter{}
adapter := HTTPAdapter{respBodyHandler: &responseBodyReader{}}
rfcErr := rfc7807.New(rfc7807.CommunalAccessError).
WithDetail("Cannot access communal storage")
b, err := json.Marshal(rfcErr)
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestHandleGenericErrorResponse(t *testing.T) {
Header: http.Header{},
Body: &mockBodyReader,
}
adapter := HTTPAdapter{}
adapter := HTTPAdapter{respBodyHandler: &responseBodyReader{}}
result := adapter.generateResult(mockResp)
assert.Equal(t, result.status, FAILURE)
assert.NotEqual(t, result.err, nil)
Expand Down
12 changes: 12 additions & 0 deletions vclusterops/http_request_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ func (dispatcher *HTTPRequestDispatcher) setup(hosts []string) {
}
}

// set up the pool connection for each host to download a file
func (dispatcher *HTTPRequestDispatcher) setupForDownload(hosts []string,
hostToFilePathsMap map[string]string) {
dispatcher.pool = getPoolInstance(dispatcher.log)

for _, host := range hosts {
adapter := makeHTTPDownloadAdapter(dispatcher.log, hostToFilePathsMap[host])
adapter.host = host
dispatcher.pool.connections[host] = &adapter
}
}

func (dispatcher *HTTPRequestDispatcher) sendRequest(clusterHTTPRequest *ClusterHTTPRequest) error {
dispatcher.log.Info("HTTP request dispatcher's sendRequest is called")
return dispatcher.pool.sendRequest(clusterHTTPRequest)
Expand Down
4 changes: 4 additions & 0 deletions vclusterops/nma_bootstrap_catalog_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type bootstrapCatalogRequestData struct {
NumShards int `json:"num_shards"`
CommunalStorageURL string `json:"communal_storage"`
SuperuserName string `json:"superuser_name"`
GenerateHTTPCerts bool `json:"generate_http_certs"`
SensitiveFields
}

Expand Down Expand Up @@ -108,6 +109,9 @@ func (op *NMABootstrapCatalogOp) setupRequestBody(vdb *VCoordinationDatabase, op
bootstrapData.SuperuserName = *options.UserName
bootstrapData.DBPassword = *options.Password

// Flag to generate certs and tls configuration
bootstrapData.GenerateHTTPCerts = *options.GenerateHTTPCerts

// Eon params
bootstrapData.NumShards = vdb.NumShards
bootstrapData.CommunalStorageURL = vdb.CommunalStorageLocation
Expand Down
145 changes: 145 additions & 0 deletions vclusterops/nma_get_scrutinize_tar_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
(c) Copyright [2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"errors"
"fmt"
"os"

"github.com/vertica/vcluster/vclusterops/util"
"github.com/vertica/vcluster/vclusterops/vlog"
)

type NMAGetScrutinizeTarOp struct {
OpBase
id string
hostNodeNameMap map[string]string // must correspond to host list exactly!
batch string
}

func makeNMAGetScrutinizeTarOp(log vlog.Printer,
id, batch string,
hosts []string,
hostNodeNameMap map[string]string) (NMAGetScrutinizeTarOp, error) {
op := NMAGetScrutinizeTarOp{}
op.name = "NMAGetScrutinizeTarOp"
op.log = log.WithName(op.name)
op.id = id
op.batch = batch
op.hostNodeNameMap = hostNodeNameMap
op.hosts = hosts

// the caller is responsible for making sure hosts and maps match up exactly
err := validateHostMaps(hosts, hostNodeNameMap)
if err != nil {
return op, err
}
err = op.createOutputDir()
return op, err
}

// createOutputDir creates a subdirectory {id} under /tmp/scrutinize/remote, which
// may also be created by this function. the "remote" subdirectory is created to
// separate local scrutinize data staged by the NMA (placed in /tmp/scrutinize/) from
// data gathered by vcluster from all reachable hosts.
func (op *NMAGetScrutinizeTarOp) createOutputDir() error {
const OwnerReadWriteExecute = 0700
outputDir := fmt.Sprintf("%s/%s/", scrutinizeRemoteOutputPath, op.id)
if err := os.MkdirAll(outputDir, OwnerReadWriteExecute); err != nil {
return err
}
stagingDirPathAccess := util.CanWriteAccessDir(outputDir)
if stagingDirPathAccess == util.FileNotExist {
return fmt.Errorf("opening scrutinize output directory failed: '%s'", outputDir)
}
if stagingDirPathAccess == util.NoWritePerm {
return fmt.Errorf("scrutinize output directory not writeable: '%s'", outputDir)
}
return nil
}

func (op *NMAGetScrutinizeTarOp) setupClusterHTTPRequest(hosts []string) error {
op.clusterHTTPRequest = ClusterHTTPRequest{}
op.clusterHTTPRequest.RequestCollection = make(map[string]HostHTTPRequest)
op.setVersionToSemVar()

for _, host := range hosts {
nodeName := op.hostNodeNameMap[host]

httpRequest := HostHTTPRequest{}
httpRequest.Method = GetMethod
httpRequest.buildNMAEndpoint(scrutinizeURLPrefix + op.id + "/" + nodeName + "/" + op.batch)
op.clusterHTTPRequest.RequestCollection[host] = httpRequest
}

return nil
}

func (op *NMAGetScrutinizeTarOp) prepare(execContext *OpEngineExecContext) error {
hostToFilePathsMap := map[string]string{}
for _, host := range op.hosts {
hostToFilePathsMap[host] = fmt.Sprintf("%s/%s/%s-%s.tgz",
scrutinizeRemoteOutputPath,
op.id,
op.hostNodeNameMap[host],
op.batch)
}
execContext.dispatcher.setupForDownload(op.hosts, hostToFilePathsMap)

return op.setupClusterHTTPRequest(op.hosts)
}

func (op *NMAGetScrutinizeTarOp) execute(execContext *OpEngineExecContext) error {
if err := op.runExecute(execContext); err != nil {
return err
}

return op.processResult(execContext)
}

func (op *NMAGetScrutinizeTarOp) finalize(_ *OpEngineExecContext) error {
return nil
}

func (op *NMAGetScrutinizeTarOp) processResult(_ *OpEngineExecContext) error {
var allErrs error

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if result.isPassing() {
op.log.Info("Retrieved tarball",
"Host", host,
"Node", op.hostNodeNameMap[host],
"Batch", op.batch)
} else {
op.log.Error(result.err, "Failed to retrieve tarball",
"Host", host,
"Node", op.hostNodeNameMap[host],
"Batch", op.batch)
if result.isInternalError() {
op.log.PrintWarning("Failed to tar batch %s on host %s. Skipping.", op.batch, host)
} else {
err := fmt.Errorf("failed to retrieve tarball batch %s on host %s, details %w",
op.batch, host, result.err)
allErrs = errors.Join(allErrs, err)
}
}
}

return allErrs
}
3 changes: 2 additions & 1 deletion vclusterops/nma_stage_vertica_logs_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ func makeNMAStageVerticaLogsOp(log vlog.Printer,
logSizeLimitBytes int64,
logAgeHours int) (NMAStageVerticaLogsOp, error) {
nmaStageVerticaLogsOp := NMAStageVerticaLogsOp{}
nmaStageVerticaLogsOp.name = "NMAStageVerticaLogsOp"
nmaStageVerticaLogsOp.log = log.WithName(nmaStageVerticaLogsOp.name)
nmaStageVerticaLogsOp.id = id
nmaStageVerticaLogsOp.hostNodeNameMap = hostNodeNameMap
nmaStageVerticaLogsOp.hostCatPathMap = hostCatPathMap
nmaStageVerticaLogsOp.batch = scrutinizeBatchNormal
nmaStageVerticaLogsOp.log = log
nmaStageVerticaLogsOp.hosts = hosts
nmaStageVerticaLogsOp.logSizeLimitBytes = logSizeLimitBytes
nmaStageVerticaLogsOp.logAgeHours = logAgeHours
Expand Down
15 changes: 14 additions & 1 deletion vclusterops/scrutinize.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const VScrutinizeTypeName = "scrutinize"
// top level handler for scrutinize operations
const scrutinizeURLPrefix = "scrutinize/"

// folders used by scrutinize
const scrutinizeOutputBasePath = "/tmp/scrutinize"
const scrutinizeRemoteOutputPath = scrutinizeOutputBasePath + "/remote"

// these could be replaced with options later
const scrutinizeLogAgeHours = 24 // copy archived logs produced in recent 24 hours
const scrutinizeLogLimitBytes = 10737418240 // 10GB in bytes
Expand Down Expand Up @@ -213,7 +217,7 @@ func (options *VScrutinizeOptions) getVDBForScrutinize(log vlog.Printer,
// - Stage vertica logs on all nodes
// - TODO Stage ErrorReport.txt on all nodes
// - TODO Stage DC tables on all nodes
// - TODO Tar and retrieve vertica logs from all nodes (batch normal)
// - Tar and retrieve vertica logs from all nodes (batch normal)
// - TODO Tar and retrieve error report from all nodes (batch context)
// - TODO Tar and retrieve DC tables from all nodes (batch context)
// - TODO (If applicable) Poll for system table staging completion on task node
Expand All @@ -226,6 +230,7 @@ func (vcc *VClusterCommands) produceScrutinizeInstructions(options *VScrutinizeO
return nil, fmt.Errorf("failed to process retrieved node info, details %w", err)
}

// stage Vertica logs
nmaStageVerticaLogsOp, err := makeNMAStageVerticaLogsOp(vcc.Log, options.ID, options.Hosts,
hostNodeNameMap, hostCatPathMap, scrutinizeLogLimitBytes, scrutinizeLogAgeHours)
if err != nil {
Expand All @@ -234,6 +239,14 @@ func (vcc *VClusterCommands) produceScrutinizeInstructions(options *VScrutinizeO
}
instructions = append(instructions, &nmaStageVerticaLogsOp)

// get 'normal' batch tarball (inc. Vertica logs)
getNormalTarballOp, err := makeNMAGetScrutinizeTarOp(vcc.Log, options.ID, scrutinizeBatchNormal,
options.Hosts, hostNodeNameMap)
if err != nil {
return nil, err
}
instructions = append(instructions, &getNormalTarballOp)

return instructions, nil
}

Expand Down

0 comments on commit 9c50b53

Please sign in to comment.