Skip to content

Commit

Permalink
chore(blooms): Improve how block directory is extracted (#12030)
Browse files Browse the repository at this point in the history
This PR contains two changes:

1) Extracting the block without an intermediate temp file.
   The commit is cherry-picked from #12021, but that PR got merged into the wrong branch 🙈 

2) Strip `.tar.gz` suffix from block file when extracting it into a directory.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 22, 2024
1 parent 1f56da2 commit 32a9a3f
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 95 deletions.
15 changes: 1 addition & 14 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
tc := tc
t.Run(name, func(t *testing.T) {
extractedBlockDirectory := t.TempDir()
blockFilePath, _, _, _ := createBlockArchive(t)
err := extractArchive(blockFilePath, extractedBlockDirectory)
require.NoError(t, err)
require.DirExists(t, extractedBlockDirectory)

blockDir := BlockDirectory{
Expand Down Expand Up @@ -61,20 +58,10 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
}

func Test_ClosableBlockQuerier(t *testing.T) {
blockFilePath, _, _, _ := createBlockArchive(t)
extractedBlockDirectory := t.TempDir()
err := extractArchive(blockFilePath, extractedBlockDirectory)
require.NoError(t, err)

blockDir := BlockDirectory{
Path: extractedBlockDirectory,
removeDirectoryTimeout: 100 * time.Millisecond,
refCount: atomic.NewInt32(0),
}
blockDir := NewBlockDirectory(BlockRef{}, t.TempDir(), log.NewNopLogger())

querier := blockDir.BlockQuerier()
require.Equal(t, int32(1), blockDir.refCount.Load())
require.NoError(t, querier.Close())
require.Equal(t, int32(0), blockDir.refCount.Load())

}
20 changes: 16 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"hash"
"io"
"strings"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
Expand All @@ -15,6 +16,7 @@ import (

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/util/encoding"
Expand Down Expand Up @@ -264,18 +266,28 @@ func (b *BloomClient) DeleteMetas(ctx context.Context, refs []MetaRef) error {
return err
}

// GetBlock downloads the blocks from objectStorage and returns the downloaded block
// GetBlock downloads the blocks from objectStorage and returns the directory
// in which the block data resides
func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) {
key := b.Block(ref).Addr()
readCloser, _, err := b.client.GetObject(ctx, key)

rc, _, err := b.client.GetObject(ctx, key)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to get block from storage: %w", err)
}
defer rc.Close()

path := b.fsResolver.Block(ref).LocalPath()
err = extractBlock(readCloser, path, b.logger)
// the block directory should not contain the .tar.gz extension
path = strings.TrimSuffix(path, ".tar.gz")
err = util.EnsureDirectory(path)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to create block directory: %w", err)
}

err = v1.UnTarGz(path, rc)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to extract block into directory : %w", err)
return BlockDirectory{}, fmt.Errorf("failed to extract block: %w", err)
}

return NewBlockDirectory(ref, path, b.logger), nil
Expand Down
50 changes: 0 additions & 50 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package bloomshipper

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
Expand All @@ -31,49 +27,3 @@ func CompressBloomBlock(ref BlockRef, archivePath, localDst string, logger log.L

return blockToUpload, nil
}

func writeDataToTempFile(workingDirectoryPath string, data io.ReadCloser) (string, error) {
defer data.Close()
archivePath := filepath.Join(workingDirectoryPath, uuid.New().String())

archiveFile, err := os.Create(archivePath)
if err != nil {
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
}
defer archiveFile.Close()
_, err = io.Copy(archiveFile, data)
if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err)
}
return archivePath, nil
}

func extractArchive(archivePath string, workingDirectoryPath string) error {
file, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("error opening archive file %s: %w", archivePath, err)
}
return v1.UnTarGz(workingDirectoryPath, file)
}

func extractBlock(data io.ReadCloser, blockDir string, logger log.Logger) error {
err := os.MkdirAll(blockDir, os.ModePerm)
if err != nil {
return fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(blockDir, data)
if err != nil {
return fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "error removing temp archive file", "err", err)
}
}()
err = extractArchive(archivePath, blockDir)
if err != nil {
return fmt.Errorf("error extracting archive: %w", err)
}
return nil
}
28 changes: 4 additions & 24 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,14 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func Test_blockDownloader_extractBlock(t *testing.T) {
blockFilePath, _, bloomFileContent, seriesFileContent := createBlockArchive(t)
blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700)
require.NoError(t, err)

workingDir := t.TempDir()

err = extractBlock(blockFile, workingDir, nil)
require.NoError(t, err)

require.FileExists(t, filepath.Join(workingDir, v1.BloomFileName))
require.FileExists(t, filepath.Join(workingDir, v1.SeriesFileName))

actualBloomFileContent, err := os.ReadFile(filepath.Join(workingDir, v1.BloomFileName))
require.NoError(t, err)
require.Equal(t, bloomFileContent, string(actualBloomFileContent))

actualSeriesFileContent, err := os.ReadFile(filepath.Join(workingDir, v1.SeriesFileName))
require.NoError(t, err)
require.Equal(t, seriesFileContent, string(actualSeriesFileContent))
}

func directoryDoesNotExist(path string) bool {
_, err := os.Lstat(path)
return err != nil
}

const testArchiveFileName = "test-block-archive"

func createBlockArchive(t *testing.T) (string, string, string, string) {
func createBlockArchive(t *testing.T) (string, io.Reader, string, string) {
dir := t.TempDir()
mockBlockDir := filepath.Join(dir, "mock-block-dir")
err := os.MkdirAll(mockBlockDir, 0777)
Expand All @@ -65,5 +43,7 @@ func createBlockArchive(t *testing.T) (string, string, string, string) {
err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir))
require.NoError(t, err)

return blockFilePath, mockBlockDir, bloomFileContent, seriesFileContent
_, _ = file.Seek(0, 0)

return blockFilePath, file, bloomFileContent, seriesFileContent
}
4 changes: 4 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"

"github.com/go-kit/log"
Expand Down Expand Up @@ -222,6 +223,9 @@ func (f *Fetcher) loadBlocksFromFS(_ context.Context, refs []BlockRef) ([]BlockD

for _, ref := range refs {
path := f.localFSResolver.Block(ref).LocalPath()
// the block directory does not contain the .tar.gz extension
// since it is stripped when the archive is extracted into a folder
path = strings.TrimSuffix(path, ".tar.gz")
if ok, clean := f.isBlockDir(path); ok {
blockDirs = append(blockDirs, NewBlockDirectory(ref, path, f.logger))
} else {
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -152,9 +153,9 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) {
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}},
}
dirs := []string{
resolver.Block(refs[0]).LocalPath(),
resolver.Block(refs[1]).LocalPath(),
resolver.Block(refs[2]).LocalPath(),
strings.TrimSuffix(resolver.Block(refs[0]).LocalPath(), ".tar.gz"),
strings.TrimSuffix(resolver.Block(refs[1]).LocalPath(), ".tar.gz"),
strings.TrimSuffix(resolver.Block(refs[2]).LocalPath(), ".tar.gz"),
}

createBlockDir(t, dirs[1])
Expand Down

0 comments on commit 32a9a3f

Please sign in to comment.