Implementing some more changes to the streamer
- Added a better way to manage the blocks (the chunkFiles map becomes an index-addressed chunkInfos slice)
- Increased the bufferSize used for decompression to 40 MB
- Fixed a documentation issue
cjlapao committed Jan 23, 2025
1 parent 713abe9 commit ba8edc8
Showing 9 changed files with 229 additions and 147 deletions.
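The first bullet refers to the change in src/catalog/providers/aws_s3_bucket/main.go below: the chunkFiles map is replaced by an index-addressed chunkInfos slice, so every chunk has a pre-allocated slot and the natural index order matches the order in which chunks must be streamed. A minimal, self-contained sketch of the idea (illustrative values, not the provider's code):

package main

import "fmt"

// chunkInfo mirrors the struct introduced in the diff below.
type chunkInfo struct {
	index     int
	filePath  string
	err       error
	completed bool
}

func main() {
	totalChunks := 4
	// One slot per chunk: no map lookups and no "found" checks.
	chunks := make([]chunkInfo, totalChunks)
	for i := range chunks {
		chunks[i].index = i
	}
	chunks[2].completed = true // e.g. a worker finished chunk 2 early
	for _, c := range chunks {
		fmt.Printf("chunk %d: completed=%v\n", c.index, c.completed)
	}
}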
5 changes: 5 additions & 0 deletions docs/docs/index.md
@@ -0,0 +1,5 @@
---
permalink: /docs
redirect_to: /docs/devops/
layout: default
---
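One note on this docs fix: redirect_to is not core Jekyll front matter, and is typically supplied by the jekyll-redirect-from plugin (an assumption here, since the site configuration is not part of this commit); with that plugin enabled, visiting /docs serves an immediate redirect to /docs/devops/.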
157 changes: 106 additions & 51 deletions src/catalog/providers/aws_s3_bucket/main.go
@@ -241,9 +241,8 @@ func (s *AwsS3BucketProvider) PullFile(ctx basecontext.ApiContext, path string,

func (s *AwsS3BucketProvider) PullFileAndDecompress(ctx basecontext.ApiContext, path, filename, destination string) error {
cfg := config.Get()
- ctx.LogInfof("Using Canary version og pullFileAndDecompress\n")
if cfg.IsCanaryEnabled() {
- ctx.LogInfof("Using Canary version og pullFileAndDecompress")
+ ctx.LogInfof("\rUsing Canary version of pullFileAndDecompress")
return s.pullFileAndDecompressUnstable(ctx, path, filename, destination)
}
return s.pullFileAndDecompressStable(ctx, path, filename, destination)
@@ -1251,24 +1250,31 @@ func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable2(ctx basecontext.Api

// We have a small shared state, protected by a mutex + condition variable
type chunkInfo struct {
- filePath string
- err error
+ index int
+ filePath string
+ err error
+ completed bool
}

type sharedState struct {
- chunkFiles map[int]chunkInfo // chunk index -> info
- onDisk int // how many chunk files are currently on disk
- nextToWrite int // next chunk index streamer must write to the pipe
- globalErr error // record a single global error
- errOnce sync.Once // ensure we set globalErr only once
+ chunkInfos []chunkInfo
+ onDisk int // how many chunk files are currently on disk
+ nextToWrite int // next chunk index streamer must write to the pipe
+ globalErr error // record a single global error
+ errOnce sync.Once // ensure we set globalErr only once
+ activeWorkers int
}
type chunkInfoProvider interface {
// for example, your s.downloadChunkFile(...) signature
}

func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable(ctx basecontext.ApiContext, path, filename, destination string) error {
ctx.LogInfof("Starting pullFileAndDecompressOrdered for %s", filename)
startTime := time.Now()
// firstChunkSize := int64(50 * 1024 * 1024)
chunkSize := int64(100 * 1024 * 1024)
workerCount := 6
- keepChunkOnDiskCount := 20
+ keepChunkOnDiskCount := 40
var totalDownloaded int64
ns := notifications.Get()
msgPrefix := fmt.Sprintf("Pulling %s", filename)
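A quick sizing implication of the constants above: at a chunkSize of 100 MB, raising keepChunkOnDiskCount from 20 to 40 doubles the worst-case temporary disk footprint from about 20 × 100 MB = 2 GB to 40 × 100 MB = 4 GB, buying deeper buffering ahead of the decompressor at the cost of scratch space.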
@@ -1307,11 +1313,17 @@ func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable(ctx basecontext.ApiC
group, groupCtx := errgroup.WithContext(ctxChunk)

st := &sharedState{
- chunkFiles: make(map[int]chunkInfo),
+ chunkInfos: make([]chunkInfo, totalChunks),
onDisk: 0,
nextToWrite: 0,
}

for i := 0; i < int(totalChunks); i++ {
st.chunkInfos[i] = chunkInfo{
index: i,
}
}

mu := sync.Mutex{}
cond := sync.NewCond(&mu)
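The mutex and condition variable above coordinate three roles: a manager that schedules downloads, workers that produce chunk files, and a streamer that consumes them in order. The throttling core is a bounded counter guarded by cond.Wait and cond.Broadcast. A self-contained sketch of that pattern, reduced to one producer and one consumer with illustrative constants (the real code additionally tracks per-chunk completion, worker counts, and errors):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const totalChunks, maxOnDisk = 8, 3

	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	onDisk := 0

	done := make(chan struct{})

	// Consumer (streamer): drains chunks, freeing a disk slot each time.
	go func() {
		defer close(done)
		for i := 0; i < totalChunks; i++ {
			mu.Lock()
			for onDisk == 0 {
				cond.Wait() // nothing buffered yet; wait for the producer
			}
			onDisk--
			mu.Unlock()
			cond.Broadcast() // a slot was freed; the producer may continue
			fmt.Println("streamed chunk", i)
		}
	}()

	// Producer (manager): never runs more than maxOnDisk chunks ahead.
	for i := 0; i < totalChunks; i++ {
		mu.Lock()
		for onDisk >= maxOnDisk {
			cond.Wait() // too many chunks on disk; wait for the streamer
		}
		onDisk++
		mu.Unlock()
		cond.Broadcast() // a chunk is now available to stream
	}
	<-done
}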

@@ -1323,23 +1335,57 @@ func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable(ctx basecontext.ApiC
})
}

// Clean up leftover chunks if something fails
cleanupChunks := func() {
- for _, ci := range st.chunkFiles {
- if ci.filePath != "" {
- _ = os.Remove(ci.filePath)
+ for i := range st.chunkInfos {
+ if st.chunkInfos[i].filePath != "" {
+ _ = os.Remove(st.chunkInfos[i].filePath)
}
}
}

// mu.Lock()
// st.onDisk++
// mu.Unlock()

// if e := s.downloadChunkFile(
// groupCtx,
// svc,
// ns,
// startTime,
// remoteFilePath,
// 0,
// int(totalChunks),
// totalSize,
// &totalDownloaded,
// firstChunkSize,
// msgPrefix,
// cid,
// st,
// &mu,
// cond,
// ); e != nil {
// setGlobalError(e)
// cleanupChunks()
// return e
// }

// // Mark chunk 0 as completed
// mu.Lock()
// // Suppose the local file path was set inside downloadChunkFile:
// // st.chunkInfos[0].filePath = "/some/tmp/path_for_chunk0"
// // st.chunkInfos[0].err = nil
// st.chunkInfos[0].completed = true
// mu.Unlock()
// cond.Broadcast() // notify the streamer chunk 0 is ready

// 5) Manager goroutine — schedules chunks for download (0..totalChunks-1)
group.Go(func() error {
defer ctx.LogInfof("Manager goroutine exited")

for idx := 0; idx < int(totalChunks); idx++ {
mu.Lock()
- // Wait if we already have 'workerCount' files on disk
- for st.onDisk >= keepChunkOnDiskCount && st.globalErr == nil {
+ // Wait while we have too many workers or too many chunks on disk
+ for (st.activeWorkers >= workerCount || st.onDisk >= keepChunkOnDiskCount) && st.globalErr == nil {
cond.Wait()
}
if st.globalErr != nil || groupCtx.Err() != nil {
@@ -1348,13 +1394,15 @@ func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable(ctx basecontext.ApiC
return st.globalErr
}

- // We have space to download chunk idx
- st.onDisk++ // We expect to produce one more chunk on disk
+ // We can start a new download worker now:
+ st.activeWorkers++
+ st.onDisk++
mu.Unlock()

// Start a goroutine to download chunk idx
go func(chunkIndex int) {
- if e := s.downloadChunkFile(groupCtx,
+ localErr := s.downloadChunkFile(
+ groupCtx,
svc,
ns,
startTime,
@@ -1368,8 +1416,21 @@ func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable(ctx basecontext.ApiC
cid,
st,
&mu,
- cond); e != nil {
- setGlobalError(e)
+ cond,
+ )
+ mu.Lock()
+ if localErr != nil {
+ st.chunkInfos[chunkIndex].err = localErr
+ } else {
+ // st.chunkInfos[chunkIndex].filePath was set inside downloadChunkFile
+ st.chunkInfos[chunkIndex].completed = true
+ }
+ st.activeWorkers--
+ mu.Unlock()
+ cond.Broadcast()
+
+ if localErr != nil {
+ setGlobalError(localErr)
}
}(idx)
}
@@ -1387,54 +1448,42 @@ func (s *AwsS3BucketProvider) pullFileAndDecompressUnstable(ctx basecontext.ApiC

for i := 0; i < int(totalChunks); i++ {
mu.Lock()
- for {
- // Wait until chunk i is in chunkFiles OR an error occurs
- if st.globalErr != nil {
- mu.Unlock()
- return st.globalErr
- }
- ci, found := st.chunkFiles[i]
- if found && ci.filePath != "" {
- // chunk i is ready
- break
- }
- cond.Wait() // wait for signal from a worker
- }
- ci := st.chunkFiles[i]
+ // Wait for chunk i to be completed or for an error
+ for !st.chunkInfos[i].completed && st.globalErr == nil {
+ cond.Wait()
+ }
+ ci := st.chunkInfos[i]
mu.Unlock()

// If the worker had an error for chunk i
if ci.err != nil {
setGlobalError(ci.err)
return ci.err
}

- // Open the chunk file to stream it
+ // At this point, chunk i is definitely downloaded
chunkFile, err := os.Open(ci.filePath)
if err != nil {
setGlobalError(err)
return fmt.Errorf("streamer failed opening chunk %d: %w", i, err)
}

- // Write this chunk data into the pipe => decompressor can start right away
+ // Copy chunk i to the pipe => piped into decompressor
_, copyErr := io.Copy(w, chunkFile)
chunkFile.Close()
if copyErr != nil {
setGlobalError(copyErr)
- return fmt.Errorf("streamer failed copying chunk %d to pipe: %w", i, copyErr)
+ return fmt.Errorf("streamer failed copying chunk %d: %w", i, copyErr)
}

- // Remove chunk file from disk
+ // Remove the chunk from disk
if rmErr := os.Remove(ci.filePath); rmErr != nil {
ctx.LogInfof("failed to remove chunk file %s: %v", ci.filePath, rmErr)
}

// Freed one slot on disk
mu.Lock()
- delete(st.chunkFiles, i)
+ st.chunkInfos[i].filePath = ""
st.onDisk--
- cond.Broadcast() // manager can schedule the next chunk
mu.Unlock()
+ cond.Broadcast()
}
return nil
})
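The streamer above is what lets decompression begin as soon as chunk 0 lands on disk: it copies chunk files into the pipe writer w strictly in index order while the decompressor reads from the other end. A compact illustration of that pipeline shape, using gzip purely as a stand-in (the archive format the provider actually streams is not visible in this hunk):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	// Pre-compress a payload so we have ordered "chunks" to stream.
	var src bytes.Buffer
	zw := gzip.NewWriter(&src)
	zw.Write([]byte("hello from an ordered chunk stream"))
	zw.Close()
	data := src.Bytes()

	r, w := io.Pipe()

	// Streamer side: write the chunks to the pipe strictly in order.
	go func() {
		half := len(data) / 2
		for _, chunk := range [][]byte{data[:half], data[half:]} {
			if _, err := w.Write(chunk); err != nil {
				w.CloseWithError(err)
				return
			}
		}
		w.Close() // signals EOF to the decompressor
	}()

	// Decompressor side: starts consuming before all chunks are written.
	zr, err := gzip.NewReader(r)
	if err != nil {
		fmt.Println("gzip:", err)
		return
	}
	defer zr.Close()
	out, _ := io.ReadAll(zr)
	fmt.Println(string(out))
}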
@@ -1501,7 +1550,8 @@ func (s *AwsS3BucketProvider) downloadChunkFile(
})
if err != nil {
mu.Lock()
- st.chunkFiles[chunkIndex] = chunkInfo{filePath: "", err: err}
+ st.chunkInfos[chunkIndex].filePath = ""
+ st.chunkInfos[chunkIndex].err = err
cond.Broadcast()
mu.Unlock()
return fmt.Errorf("worker failed chunk %d range=%s: %w", chunkIndex, rangeHeader, err)
@@ -1512,22 +1562,24 @@ func (s *AwsS3BucketProvider) downloadChunkFile(
tmpFile, err := os.CreateTemp("", fmt.Sprintf("chunk_%d_", chunkIndex))
if err != nil {
mu.Lock()
- st.chunkFiles[chunkIndex] = chunkInfo{filePath: "", err: err}
+ st.chunkInfos[chunkIndex].filePath = ""
+ st.chunkInfos[chunkIndex].err = err
cond.Broadcast()
mu.Unlock()
return fmt.Errorf("cannot create temp file for chunk %d: %w", chunkIndex, err)
}

// Copy from S3 to temp file
- buf := make([]byte, 2*1024*1024) // 2MB buffer
+ buf := make([]byte, 6*1024*1024) // 6MB buffer
var chunkDownloaded int64
for {
n, readErr := resp.Body.Read(buf)
if n > 0 {
if _, writeErr := tmpFile.Write(buf[:n]); writeErr != nil {
tmpFile.Close()
mu.Lock()
- st.chunkFiles[chunkIndex] = chunkInfo{filePath: "", err: writeErr}
+ st.chunkInfos[chunkIndex].filePath = ""
+ st.chunkInfos[chunkIndex].err = writeErr
cond.Broadcast()
mu.Unlock()
return fmt.Errorf("write error chunk %d: %w", chunkIndex, writeErr)
@@ -1553,7 +1605,8 @@ func (s *AwsS3BucketProvider) downloadChunkFile(
if readErr != nil {
tmpFile.Close()
mu.Lock()
- st.chunkFiles[chunkIndex] = chunkInfo{filePath: "", err: readErr}
+ st.chunkInfos[chunkIndex].filePath = ""
+ st.chunkInfos[chunkIndex].err = readErr
cond.Broadcast()
mu.Unlock()
return fmt.Errorf("read error chunk %d: %w", chunkIndex, readErr)
@@ -1565,7 +1618,8 @@ func (s *AwsS3BucketProvider) downloadChunkFile(
if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
tmpFile.Close()
mu.Lock()
- st.chunkFiles[chunkIndex] = chunkInfo{filePath: "", err: err}
+ st.chunkInfos[chunkIndex].filePath = ""
+ st.chunkInfos[chunkIndex].err = err
cond.Broadcast()
mu.Unlock()
return fmt.Errorf("failed to seek in chunk %d: %w", chunkIndex, err)
@@ -1576,7 +1630,8 @@ func (s *AwsS3BucketProvider) downloadChunkFile(

// Store the chunk file location in the shared state
mu.Lock()
- st.chunkFiles[chunkIndex] = chunkInfo{filePath: filePath, err: nil}
+ st.chunkInfos[chunkIndex].filePath = filePath
+ st.chunkInfos[chunkIndex].err = nil
cond.Broadcast() // let the streamer know chunk is ready
mu.Unlock()

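For reference, the shape of the per-chunk ranged download that downloadChunkFile implements: compute a byte range, issue a ranged GetObject, and spool the body to a temp file with a large copy buffer. The sketch below assumes aws-sdk-go-v2 (the SDK actually vendored by the provider is not visible in this diff), and downloadChunk plus its parameters are illustrative names:

// Package sketch illustrates the ranged-download shape; it is not the provider's code.
package sketch

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func downloadChunk(ctx context.Context, svc *s3.Client, bucket, key string, chunkIndex, chunkSize, totalSize int64) (string, error) {
	start := chunkIndex * chunkSize
	end := start + chunkSize - 1
	if end >= totalSize {
		end = totalSize - 1
	}
	rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)

	resp, err := svc.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Range:  aws.String(rangeHeader),
	})
	if err != nil {
		return "", fmt.Errorf("ranged GET %s failed: %w", rangeHeader, err)
	}
	defer resp.Body.Close()

	tmp, err := os.CreateTemp("", fmt.Sprintf("chunk_%d_", chunkIndex))
	if err != nil {
		return "", err
	}
	defer tmp.Close()

	// A large copy buffer (the commit uses 6 MB) reduces read/write round trips.
	buf := make([]byte, 6*1024*1024)
	if _, err := io.CopyBuffer(tmp, resp.Body, buf); err != nil {
		os.Remove(tmp.Name())
		return "", err
	}
	return tmp.Name(), nil
}

Ranged GETs are what make the worker pool safe: each chunk can be fetched, retried, and discarded independently, while the streamer alone enforces ordering.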