Skip to content

Commit

Permalink
add check to ensure formats are equivalent
Browse files Browse the repository at this point in the history
  • Loading branch information
tedim52 committed Oct 18, 2024
1 parent e724d90 commit 1d6ff1e
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
// https://vector.dev/docs/reference/configuration/template-syntax/
baseLogsFilepath = "\"" + logsStorageDirpath + "%%Y/%%V/%%u/%%H/"

uuidLogsFilepath = baseLogsFilepath + "{{ enclave_uuid }}/{{ service_uuid }}.json\""
VectorLogsFilepathFormat = baseLogsFilepath + "{{ enclave_uuid }}/{{ service_uuid }}.json\""

sourceConfigFileTemplateName = "srcVectorConfigFileTemplate"
sinkConfigFileTemplateName = "sinkVectorConfigFileTemplate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func newDefaultVectorConfig(listeningPortNumber uint16) *VectorConfig {
Id: "uuid_" + fileSinkIdSuffix,
Type: fileTypeId,
Inputs: []string{fluentBitSourceId},
Filepath: uuidLogsFilepath,
Filepath: VectorLogsFilepathFormat,
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,27 +194,32 @@ func TestSundayIsConvertedFromStrftimeToGolangTime(t *testing.T) {
require.Equal(t, expectedFilepath, actualFilePath)
}

//func TestGetLogFilePathsWithHourlyRetentionReturnsCorrectPathsIfHoursMissingInBetween(t *testing.T) {
// filesystem := volume_filesystem.NewMockedVolumeFilesystem()
//
// currentTime := logs_clock.NewMockLogsClockPerHour(defaultYear, defaultWeek, defaultDay, 1)
// fileLayout := NewPerWeekFileLayout(currentTime)
//
// // ../week/enclave uuid/service uuid.json
// week52filepath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(defaultYear, 0, 0).Now(), testEnclaveUuid, testUserService1Uuid)
// week1filepath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(defaultYear, 1, 0).Now(), testEnclaveUuid, testUserService1Uuid)
// week3filepath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(defaultYear, 3, 0).Now(), testEnclaveUuid, testUserService1Uuid)
//
// _, _ = filesystem.Create(week52filepath)
// _, _ = filesystem.Create(week1filepath)
// _, _ = filesystem.Create(week3filepath)
// retentionPeriod := retentionPeriodInWeeksForTesting * oneWeekDuration
// logFilePaths, err := fileLayout.GetLogFilePaths(filesystem, retentionPeriod, -1, testEnclaveUuid, testUserService1Uuid)
//
// require.NoError(t, err)
// require.Len(t, logFilePaths, 1)
// require.Equal(t, week3filepath, logFilePaths[0]) // should only return week 3 because week 2 is missing
//}
func TestGetLogFilePathsWithHourlyRetentionReturnsCorrectPathsIfHoursMissingInBetween(t *testing.T) {
filesystem := volume_filesystem.NewMockedVolumeFilesystem()

currentTime := logs_clock.NewMockLogsClockPerHour(2024, 1, 1, 2)
fileLayout := NewPerHourFileLayout(currentTime, volume_consts.LogsStorageDirpath)

hourZeroFp := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerHour(2023, 52, 0, 21).Now(), testEnclaveUuid, testUserService1Uuid)
hourOneFp := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerHour(2023, 52, 0, 22).Now(), testEnclaveUuid, testUserService1Uuid)
hourTwoFp := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerHour(2023, 52, 0, 23).Now(), testEnclaveUuid, testUserService1Uuid)
hourThreeFp := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerHour(2023, 1, 1, 3).Now(), testEnclaveUuid, testUserService1Uuid)
hourFiveFp := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerHour(2024, 1, 1, 2).Now(), testEnclaveUuid, testUserService1Uuid)

createFilepaths(t, filesystem, []string{
hourZeroFp,
hourOneFp,
hourTwoFp,
hourThreeFp,
hourFiveFp,
})

retentionPeriod := 6 * time.Hour // this would return all filepaths, but hour three is missing
logFilePaths, err := fileLayout.GetLogFilePaths(filesystem, retentionPeriod, -1, testEnclaveUuid, testUserService1Uuid)
require.NoError(t, err)
require.Len(t, logFilePaths, 1)
require.Equal(t, hourFiveFp, logFilePaths[0]) // should only return hour 5 3 because hour 4 is missing
}

func createFilepaths(t *testing.T, filesystem volume_filesystem.VolumeFilesystem, filepaths []string) {
for _, path := range filepaths {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ func (manager *LogFileManager) RemoveEnclaveLogs(enclaveUuid string) error {
return nil
}

func (manager *LogFileManager) GetLogFileLayoutFormat() string {
return manager.fileLayout.GetLogFileLayoutFormat()
}

func (manager *LogFileManager) getEnclaveAndServiceInfo(ctx context.Context) (map[enclave.EnclaveUUID][]*service.ServiceRegistration, error) {
enclaveToServicesMap := map[enclave.EnclaveUUID][]*service.ServiceRegistration{}

Expand Down
14 changes: 11 additions & 3 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package main
import (
"context"
"fmt"
vector_consts "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/logs_aggregator_functions/implementations/vector"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_consts"
"io/fs"
Expand Down Expand Up @@ -181,7 +182,11 @@ func runMain() error {
if err != nil {
return stacktrace.Propagate(err, "An error occurred parsing a duration from provided log retention period string: %v", serverArgs.LogRetentionPeriod)
}
logsDatabaseClient := getLogsDatabaseClient(serverArgs.KurtosisBackendType, kurtosisBackend, logRetentionPeriodDuration)
logsDatabaseClient, err := getLogsDatabaseClient(serverArgs.KurtosisBackendType, kurtosisBackend, logRetentionPeriodDuration)
if err != nil {
// already wrapped
return err
}
logsDatabaseClient.StartLogFileManagement(ctx)

enclaveManager, err := getEnclaveManager(
Expand Down Expand Up @@ -406,7 +411,7 @@ func getKurtosisBackend(ctx context.Context, kurtosisBackendType args.KurtosisBa
}

// if cluster is docker, return logs client for centralized logging, otherwise use logs db of kurtosis backend which uses k8s logs under the hood
func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosisBackend backend_interface.KurtosisBackend, logRetentionPeriod time.Duration) centralized_logs.LogsDatabaseClient {
func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosisBackend backend_interface.KurtosisBackend, logRetentionPeriod time.Duration) (centralized_logs.LogsDatabaseClient, error) {
var logsDatabaseClient centralized_logs.LogsDatabaseClient
switch kurtosisBackendType {
case args.KurtosisBackendType_Docker:
Expand All @@ -420,12 +425,15 @@ func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosi
osFs := volume_filesystem.NewOsVolumeFilesystem()
perHourFileLayout := file_layout.NewPerHourFileLayout(realTime, volume_consts.LogsStorageDirpath)
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, perHourFileLayout, realTime, logRetentionPeriod, volume_consts.LogsStorageDirpath)
if logFileManager.GetLogFileLayoutFormat() != vector_consts.VectorLogsFilepathFormat {
return nil, stacktrace.NewError("Log file format for this logs database client does not much format output by Vector logs aggregator. This is a Kurtosis bug.")
}
streamLogsStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(realTime, logRetentionPeriod, perHourFileLayout)
logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, logFileManager, streamLogsStrategy)
case args.KurtosisBackendType_Kubernetes:
logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend)
}
return logsDatabaseClient
return logsDatabaseClient, nil
}

func formatFilenameFunctionForLogs(filename string, functionName string) string {
Expand Down

0 comments on commit 1d6ff1e

Please sign in to comment.