Remove temp file usage for logs #607

Open · wants to merge 2 commits into base: main
Changes from 1 commit
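
At its core, the PR narrows state.NewLogFile to take only a file name and keeps log content in memory on each LogLine, instead of writing it to a temp file that must be cleaned up on every error path. A minimal sketch of the new shape, using simplified stand-in types (the real state package has more fields; everything here beyond LogLines, Content, and the one-argument NewLogFile is an assumption for illustration):

package main

import "fmt"

// Simplified stand-ins for the collector's state package types.
type LogLine struct {
	Content string
}

type LogFile struct {
	FileName string
	LogLines []LogLine
}

// NewLogFile now only needs a file name; no temp file is allocated.
func NewLogFile(fileName string) (LogFile, error) {
	return LogFile{FileName: fileName}, nil
}

func main() {
	// Before: logFile, err := state.NewLogFile(tmpFile, fileName), followed by
	// logFile.TmpFile.WriteString(logData) and Cleanup() on error paths.
	// After: content travels on the log lines themselves.
	logFile, _ := NewLogFile("postgresql.log")
	logFile.LogLines = append(logFile.LogLines, LogLine{Content: "LOG: checkpoint starting\n"})
	fmt.Println(logFile.FileName, len(logFile.LogLines))
}

This is also why the goto ErrorCleanup blocks in the diffs below collapse into plain returns: with no temp file allocated, there is nothing left to clean up when an error occurs.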
22 changes: 4 additions & 18 deletions input/postgres/log_pg_read_file.go
@@ -82,29 +82,22 @@ func LogPgReadFile(ctx context.Context, server *state.Server, globalCollectionOp
 	for _, fileName := range fileNames {
 		if err != nil {
 			err = fmt.Errorf("LogFileSql/Scan: %s", err)
-			goto ErrorCleanup
+			return server.LogPrevState, nil, nil, err
 		}
 		var logData string
 		var newOffset int64
 		prevOffset, _ := psl.ReadFileMarkers[fileName]
 		err = db.QueryRowContext(ctx, QueryMarkerSQL+logReadSql, fileName, prevOffset).Scan(&newOffset, &logData)
 		if err != nil {
 			err = fmt.Errorf("LogReadSql/QueryRow: %s", err)
-			goto ErrorCleanup
+			return server.LogPrevState, nil, nil, err
 		}

 		var logFile state.LogFile
-		logFile, err = state.NewLogFile(nil, fileName)
+		logFile, err = state.NewLogFile(fileName)
 		if err != nil {
 			err = fmt.Errorf("error initializing log file: %s", err)
-			goto ErrorCleanup
-		}
-
-		_, err := logFile.TmpFile.WriteString(logData)
-		if err != nil {
-			err = fmt.Errorf("Error writing to tempfile: %s", err)
-			logFile.Cleanup(logger)
-			goto ErrorCleanup
+			return server.LogPrevState, nil, nil, err
 		}

 		logReader := bufio.NewReader(strings.NewReader(logData))
@@ -119,11 +112,4 @@ func LogPgReadFile(ctx context.Context, server *state.Server, globalCollectionOp
 	psl.ReadFileMarkers = newMarkers

 	return psl, logFiles, samples, err
-
-ErrorCleanup:
-	for _, logFile := range logFiles {
-		logFile.Cleanup(logger)
-	}
-
-	return server.LogPrevState, nil, nil, err
 }
114 changes: 11 additions & 103 deletions input/system/rds/logs.go
@@ -5,9 +5,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"io"
-	"io/ioutil"
-	"os"
 	"strings"
 	"time"
@@ -60,58 +57,38 @@ func DownloadLogFiles(ctx context.Context, server *state.Server, logger *util.Lo
 	var newMarkers = make(map[string]string)

 	for _, rdsLogFile := range resp.DescribeDBLogFiles {
+		var content strings.Builder
 		var lastMarker *string
-		var bytesWritten int64

 		prevMarker, ok := psl.AwsMarkers[*rdsLogFile.LogFileName]
 		if ok {
 			lastMarker = &prevMarker
 		}

-		var tmpFile *os.File
-		tmpFile, err = ioutil.TempFile("", "")
-		if err != nil {
-			err = fmt.Errorf("Error allocating tempfile for logs: %s", err)
-			goto ErrorCleanup
-		}
-
 		for {
-			var newBytesWritten int
+			var fileContent *string
 			var newMarker *string
 			var additionalDataPending bool
-			newBytesWritten, newMarker, additionalDataPending, err = downloadRdsLogFilePortion(rdsSvc, tmpFile, logger, &identifier, rdsLogFile.LogFileName, lastMarker)
+			fileContent, newMarker, additionalDataPending, err = downloadRdsLogFilePortion(rdsSvc, logger, &identifier, rdsLogFile.LogFileName, lastMarker)
 			if err != nil {
-				util.CleanUpTmpFile(tmpFile, logger)
-				goto ErrorCleanup
+				return server.LogPrevState, nil, nil, err
 			}

-			bytesWritten += int64(newBytesWritten)
+			content.WriteString(*fileContent)
 			if newMarker != nil {
 				lastMarker = newMarker
 			}

 			if !additionalDataPending {
 				break
 			}
 		}

-		var buf []byte
-		buf, tmpFile, err = readLogFilePortion(tmpFile, bytesWritten, logger)
-		if err != nil {
-			util.CleanUpTmpFile(tmpFile, logger)
-			goto ErrorCleanup
-		}
-
-		fileContent := bufio.NewReader(strings.NewReader(string(buf)))
-		newLogLines, newSamples := logs.ParseAndAnalyzeBuffer(fileContent, linesNewerThan, server)
+		stream := bufio.NewReader(strings.NewReader(content.String()))
+		newLogLines, newSamples := logs.ParseAndAnalyzeBuffer(stream, linesNewerThan, server)

-		// Pass responsibility to LogFile for cleaning up the temp file
 		var logFile state.LogFile
-		logFile, err = state.NewLogFile(tmpFile, *rdsLogFile.LogFileName)
+		logFile, err = state.NewLogFile(*rdsLogFile.LogFileName)
 		if err != nil {
 			err = fmt.Errorf("error initializing log file: %s", err)
-			util.CleanUpTmpFile(tmpFile, logger)
-			goto ErrorCleanup
+			return server.LogPrevState, nil, nil, err
 		}
 		logFile.LogLines = append(logFile.LogLines, newLogLines...)
 		samples = append(samples, newSamples...)
@@ -125,13 +102,6 @@ func DownloadLogFiles(ctx context.Context, server *state.Server, logger *util.Lo
 	psl.AwsMarkers = newMarkers

 	return psl, logFiles, samples, err
-
-ErrorCleanup:
-	for _, logFile := range logFiles {
-		logFile.Cleanup(logger)
-	}
-
-	return server.LogPrevState, nil, nil, err
 }

 var DescribeDBClustersErrorCache *util.TTLMap = util.NewTTLMap(10 * 60)
@@ -164,7 +134,7 @@ func getAwsDbInstanceID(config config.ServerConfig, sess *session.Session) (stri
 	return *instance.DBInstanceIdentifier, nil
 }

-func downloadRdsLogFilePortion(rdsSvc *rds.RDS, tmpFile *os.File, logger *util.Logger, identifier *string, logFileName *string, lastMarker *string) (newBytesWritten int, newMarker *string, additionalDataPending bool, err error) {
+func downloadRdsLogFilePortion(rdsSvc *rds.RDS, logger *util.Logger, identifier *string, logFileName *string, lastMarker *string) (content *string, newMarker *string, additionalDataPending bool, err error) {
 	var resp *rds.DownloadDBLogFilePortionOutput
 	resp, err = rdsSvc.DownloadDBLogFilePortion(&rds.DownloadDBLogFilePortionInput{
 		DBInstanceIdentifier: identifier,
@@ -182,71 +152,9 @@ func downloadRdsLogFilePortion(rdsSvc *rds.RDS, tmpFile *os.File, logger *util.L
 		return
 	}

-	if len(*resp.LogFileData) > 0 {
-		newBytesWritten, err = tmpFile.WriteString(*resp.LogFileData)
-		if err != nil {
-			err = fmt.Errorf("Error writing to tempfile: %s", err)
-			return
-		}
-	}
-
+	content = resp.LogFileData
 	newMarker = resp.Marker
 	additionalDataPending = *resp.AdditionalDataPending

 	return
 }
-
-// Analyze and submit at most the trailing 10 megabytes of the retrieved RDS log file portions
-//
-// This avoids an OOM in two edge cases:
-// 1) When starting the collector, as we always load the last 10,000 lines (which may be very long)
-// 2) When extremely large values are output in a single log event (e.g. query parameters in a DETAIL line)
-//
-// We intentionally throw away data here (and warn the user about it), since the alternative
-// is often a collector crash (due to OOM), which would be less desirable.
-const maxLogParsingSize = 10 * 1024 * 1024
-
-func readLogFilePortion(tmpFile *os.File, bytesWritten int64, logger *util.Logger) ([]byte, *os.File, error) {
-	var err error
-	var readSize int64
-
-	exceededMaxParsingSize := bytesWritten > maxLogParsingSize
-	if exceededMaxParsingSize {
-		logger.PrintWarning("RDS log file portion exceeded more than 10 MB of data in 30 second interval, collecting most recent data only (skipping %d bytes)", bytesWritten-maxLogParsingSize)
-		readSize = maxLogParsingSize
-	} else {
-		readSize = bytesWritten
-	}
-
-	// Read the data into memory for analysis
-	_, err = tmpFile.Seek(bytesWritten-readSize, io.SeekStart)
-	if err != nil {
-		return nil, tmpFile, fmt.Errorf("Error seeking tempfile: %s", err)
-	}
-	buf := make([]byte, readSize)
-	_, err = io.ReadFull(tmpFile, buf)
-	if err != nil {
-		return nil, tmpFile, fmt.Errorf("Error reading %d bytes from tempfile: %s", len(buf), err)
-	}
-
-	// If necessary, recreate tempfile with just the data we're analyzing
-	// (this supports the later read of the temp file during the log upload)
-	if exceededMaxParsingSize {
-		truncatedTmpFile, err := ioutil.TempFile("", "")
-		if err != nil {
-			return nil, tmpFile, fmt.Errorf("Error allocating tempfile for logs: %s", err)
-		}
-
-		_, err = truncatedTmpFile.Write(buf)
-		if err != nil {
-			util.CleanUpTmpFile(truncatedTmpFile, logger)
-			return nil, tmpFile, fmt.Errorf("Error writing to tempfile: %s", err)
-		}
-
-		// We succeeded, so remove the previous file and use the new one going forward
-		util.CleanUpTmpFile(tmpFile, logger)
-		tmpFile = truncatedTmpFile
-	}
-
-	return buf, tmpFile, nil
-}

(seanlinsley marked a review conversation on this hunk as resolved.)
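
The deleted readLogFilePortion capped analysis at the trailing 10 MB by seeking within the temp file. The same trailing-window idea expressed over an in-memory buffer looks roughly like this; trailingPortion is a hypothetical helper for illustration, not code from this PR, and note that the new strings.Builder path in DownloadLogFiles applies no equivalent cap in the hunks shown here:

package main

import "fmt"

const maxLogParsingSize = 10 * 1024 * 1024 // same cap as the removed code

// trailingPortion keeps at most maxLogParsingSize bytes from the end of buf,
// mirroring the Seek(bytesWritten-readSize) logic of the removed function,
// and reports how many leading bytes were thrown away.
func trailingPortion(buf []byte) ([]byte, int) {
	if len(buf) <= maxLogParsingSize {
		return buf, 0
	}
	skipped := len(buf) - maxLogParsingSize
	return buf[skipped:], skipped
}

func main() {
	data := make([]byte, 11*1024*1024)
	kept, skipped := trailingPortion(data)
	fmt.Printf("kept %d bytes, skipped %d bytes\n", len(kept), skipped)
}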
5 changes: 0 additions & 5 deletions logs/analyze.go
@@ -2266,11 +2266,6 @@ func AnalyzeBackendLogLines(logLines []state.LogLine) (logLinesOut []state.LogLi
 		logLinesOut = append(logLinesOut, logLine)
 	}

-	// Remove log line content. Note that ReplaceSecrets adds it back after secrets have been removed.
-	for idx := range logLinesOut {
-		logLinesOut[idx].Content = ""
-	}
-
 	return
 }
6 changes: 6 additions & 0 deletions logs/analyze_test.go
@@ -4328,6 +4328,9 @@ var tests = []testpair{
 func TestAnalyzeLogLines(t *testing.T) {
 	for _, pair := range tests {
 		l, s := logs.AnalyzeLogLines(pair.logLinesIn)
+		for idx := range l {
+			l[idx].Content = ""
+		}

 		cfg := pretty.CompareConfig
 		cfg.SkipZeroFields = true

Review comment (member, PR author) on the added loop: Multiple tests now do this to avoid needing to copy-paste the log content into the expected output struct. I could do that if desired, it would just be tedious.
@@ -4387,6 +4390,9 @@ func TestAnalyzeLogLinesHeroku(t *testing.T) {
 	t.Setenv("PORT", "dummy")
 	for _, pair := range testsHeroku {
 		l, s := logs.AnalyzeLogLines(pair.logLinesIn)
+		for idx := range l {
+			l[idx].Content = ""
+		}

 		cfg := pretty.CompareConfig
 		cfg.SkipZeroFields = true
12 changes: 4 additions & 8 deletions logs/debug.go
@@ -12,7 +12,7 @@ import (
 	"github.com/pganalyze/collector/state"
 )

-func PrintDebugInfo(logFileContents string, logLines []state.LogLine, samples []state.PostgresQuerySample) {
+func PrintDebugInfo(logLines []state.LogLine, samples []state.PostgresQuerySample) {
 	fmt.Printf("log lines: %d, query samples: %d\n", len(logLines), len(samples))
 	groups := map[pganalyze_collector.LogLineInformation_LogClassification]int{}
 	unclassifiedLogLines := []state.LogLine{}
@@ -35,15 +35,13 @@ func PrintDebugInfo(logFileContents string, logLines []state.LogLine, samples []
 	if len(unclassifiedLogLines) > 0 {
 		fmt.Printf("\nUnclassified log lines:\n")
 		for _, logLine := range unclassifiedLogLines {
-			fmt.Printf("%s\n", logFileContents[logLine.ByteStart:logLine.ByteEnd])
-			fmt.Printf("  Level: %s\n", logLine.LogLevel)
-			fmt.Printf("  Content: %#v\n", logFileContents[logLine.ByteContentStart:logLine.ByteEnd])
+			fmt.Printf("  %s: %s", logLine.LogLevel, logLine.Content)
 			fmt.Printf("---\n")
 		}
 	}
 }

-func PrintDebugLogLines(logFileContents string, logLines []state.LogLine, classifications map[pganalyze_collector.LogLineInformation_LogClassification]bool) {
+func PrintDebugLogLines(logLines []state.LogLine, classifications map[pganalyze_collector.LogLineInformation_LogClassification]bool) {
 	fmt.Println("\nParsed log lines:")
 	linesById := make(map[uuid.UUID]*state.LogLine)
 	for _, logLine := range logLines {

Review comment (member, PR author) on the new output line: I haven't tried running the debug command yet. Someone might have a preference on the output format. One downside to this PR is we can no longer debug the entire log content including the unparsed log prefix.
@@ -63,15 +61,13 @@ func PrintDebugLogLines(logFileContents string, logLines []state.LogLine, classi
 		if err != nil {
 			panic(err)
 		}
-		fmt.Printf("%s\n", logFileContents[logLine.ByteStart:logLine.ByteEnd])
-		fmt.Printf("  Level: %s\n", logLine.LogLevel)
+		fmt.Printf("  %s: %s", logLine.LogLevel, logLine.Content)
 		if logLine.ParentUUID == uuid.Nil {
 			fmt.Printf("  Classification: %s (%d)\n", logLine.Classification, logLine.Classification)
 		}
 		if len(logLine.Details) > 0 {
 			fmt.Printf("  Details: %s\n", detailsStr)
 		}
-		fmt.Printf("  Content: %#v\n", logFileContents[logLine.ByteContentStart:logLine.ByteEnd])
 		fmt.Printf("---\n")
 	}
 }
4 changes: 2 additions & 2 deletions logs/replace.go
@@ -9,7 +9,7 @@ import (

 const replacement = "[redacted]"

-func ReplaceSecrets(input []byte, logLines []state.LogLine, filterLogSecret []state.LogSecretKind) {
+func ReplaceSecrets(logLines []state.LogLine, filterLogSecret []state.LogSecretKind) {
 	filterUnidentified := false
 	for _, k := range filterLogSecret {
 		if k == state.UnidentifiedLogSecret {
@@ -20,10 +20,10 @@
 		if filterUnidentified && logLines[idx].Classification == 0 {
 			logLines[idx].Content = replacement + "\n"
 		} else {
-			content := input[logLines[idx].ByteContentStart:logLines[idx].ByteEnd]
 			sort.Slice(logLine.SecretMarkers, func(i, j int) bool {
 				return logLine.SecretMarkers[i].ByteStart < logLine.SecretMarkers[j].ByteEnd
 			})
+			content := []byte(logLine.Content)
 			bytesChecked := 0
 			offset := 0
 			for _, m := range logLine.SecretMarkers {
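
ReplaceSecrets now works directly off logLine.Content instead of slicing into the original input buffer. A self-contained sketch of marker-based redaction, where SecretMarker's ByteStart/ByteEnd fields are taken from the sort.Slice call above; redact is a hypothetical helper, and sorting purely by ByteStart plus the skip-overlap rule are simplifications of whatever the real bytesChecked/offset loop does:

package main

import (
	"fmt"
	"sort"
)

const replacement = "[redacted]"

// SecretMarker mirrors the ByteStart/ByteEnd fields used in the diff;
// other fields of the real type are not shown here.
type SecretMarker struct {
	ByteStart, ByteEnd int
}

// redact replaces each marked byte range of content with the replacement
// string, processing markers left to right.
func redact(content string, markers []SecretMarker) string {
	sort.Slice(markers, func(i, j int) bool {
		return markers[i].ByteStart < markers[j].ByteStart
	})
	out := []byte{}
	last := 0
	for _, m := range markers {
		if m.ByteStart < last { // skip overlapping markers in this sketch
			continue
		}
		out = append(out, content[last:m.ByteStart]...)
		out = append(out, replacement...)
		last = m.ByteEnd
	}
	out = append(out, content[last:]...)
	return string(out)
}

func main() {
	content := "LOG: connection authorized: user=alice password=hunter2\n"
	markers := []SecretMarker{{ByteStart: 48, ByteEnd: 55}}
	fmt.Print(redact(content, markers)) // ...password=[redacted]
}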
2 changes: 1 addition & 1 deletion logs/replace_test.go
@@ -57,7 +57,7 @@ func TestReplaceSecrets(t *testing.T) {
 		server := state.MakeServer(config.ServerConfig{}, false)
 		server.LogParser = logs.NewLogParser(logs.LogPrefixAmazonRds, nil, false)
 		logLines, _ := logs.ParseAndAnalyzeBuffer(reader, time.Time{}, server)
-		logs.ReplaceSecrets([]byte(pair.input), logLines, state.ParseFilterLogSecret(pair.filterLogSecret))
+		logs.ReplaceSecrets(logLines, state.ParseFilterLogSecret(pair.filterLogSecret))

 		cfg := pretty.CompareConfig
 		cfg.SkipZeroFields = true
12 changes: 3 additions & 9 deletions logs/stream/stream.go
@@ -159,20 +159,14 @@ func isAdditionalLineLevel(str pganalyze_collector.LogLineInformation_LogLevel)
 	return false
 }

-// writeTmpLogFile - Setup temporary file that will be used for encryption
-func writeTmpLogFile(readyLogLines []state.LogLine, logger *util.Logger) (state.LogFile, error) {
-	logFile, err := state.NewLogFile(nil, "")
+func createLogFile(readyLogLines []state.LogLine, logger *util.Logger) (state.LogFile, error) {
+	logFile, err := state.NewLogFile("")
 	if err != nil {
 		return state.LogFile{}, fmt.Errorf("could not initialize log file: %s", err)
 	}

 	currentByteStart := int64(0)
 	for idx, logLine := range readyLogLines {
-		_, err = logFile.TmpFile.WriteString(logLine.Content)
-		if err != nil {
-			logFile.Cleanup(logger)
-			return logFile, err
-		}
 		logLine.ByteStart = currentByteStart
 		logLine.ByteContentStart = currentByteStart
 		logLine.ByteEnd = currentByteStart + int64(len(logLine.Content))
@@ -307,7 +301,7 @@ func AnalyzeStreamInGroups(logLines []state.LogLine, now time.Time, server *stat
 		}
 	}

-	logFile, err := writeTmpLogFile(analyzableLogLines, logger)
+	logFile, err := createLogFile(analyzableLogLines, logger)
 	if err != nil {
 		return state.TransientLogState{}, state.LogFile{}, logLines, err
 	}
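
createLogFile still assigns running byte offsets to each line even though nothing is written to disk anymore. A small worked example of that bookkeeping, using local stand-in types; the increment of currentByteStart is assumed from context, since the visible hunk cuts off right after the ByteEnd assignment:

package main

import "fmt"

type logLine struct {
	Content                              string
	ByteStart, ByteContentStart, ByteEnd int64
}

func main() {
	lines := []logLine{
		{Content: "LOG: duration: 12.3 ms\n"},
		{Content: "LOG: checkpoint complete\n"},
	}
	// Each line's ByteStart/ByteEnd describe its position in the virtual
	// concatenation of all line contents, as the temp file layout used to.
	currentByteStart := int64(0)
	for idx, l := range lines {
		lines[idx].ByteStart = currentByteStart
		lines[idx].ByteContentStart = currentByteStart
		lines[idx].ByteEnd = currentByteStart + int64(len(l.Content))
		currentByteStart += int64(len(l.Content))
	}
	for _, l := range lines {
		fmt.Println(l.ByteStart, l.ByteEnd) // prints 0 23, then 23 48
	}
}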
11 changes: 3 additions & 8 deletions logs/stream/stream_test.go
@@ -1,7 +1,6 @@
 package stream_test

 import (
-	"io/ioutil"
 	"log"
 	"os"
 	"sort"
@@ -364,16 +363,12 @@ func TestAnalyzeStreamInGroups(t *testing.T) {
 		server := state.MakeServer(config.ServerConfig{}, false)
 		TransientLogState, logFile, tooFreshLogLines, err := stream.AnalyzeStreamInGroups(pair.logLines, now, server, &util.Logger{Destination: log.New(os.Stderr, "", log.LstdFlags)})
 		logFileContent := ""
-		if logFile.TmpFile != nil {
-			dat, err := ioutil.ReadFile(logFile.TmpFile.Name())
-			if err != nil {
-				t.Errorf("Error reading temporary log file: %s", err)
-			}
-			logFileContent = string(dat)
+		for idx, logLine := range logFile.LogLines {
+			logFileContent += logLine.Content
+			logFile.LogLines[idx].Content = ""
 		}

 		TransientLogState.CollectedAt = time.Time{} // Avoid comparing against time.Now()
-		logFile.TmpFile = nil // Avoid comparing against tempfile
 		logFile.UUID = uuid.UUID{} // Avoid comparing against a generated UUID

 		cfg := pretty.CompareConfig