Skip to content

Commit

Permalink
unstable test to improve speed in stream
Browse files Browse the repository at this point in the history
  • Loading branch information
cjlapao committed Jan 21, 2025
1 parent 67172cf commit 0faf99a
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflow_scripts/announce_discord.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ curl -H "Content-Type: application/json" \
if [ $? -eq 0 ]; then
echo "Successfully posted changelog to Discord"
else
echo "Failed to post changelog to Discord"
echo "Failed to post changelog to Discord webhook"
exit 1
fi
264 changes: 263 additions & 1 deletion src/catalog/providers/aws_s3_bucket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,268 @@ func (s *AwsS3BucketProvider) PullFileAndDecompress(ctx basecontext.ApiContext,
ctx.LogInfof("Pulling file %s", filename)
startTime := time.Now()
remoteFilePath := strings.TrimPrefix(filepath.Join(path, filename), "/")

// Notification bits
ns := notifications.Get()
cid := helpers.GenerateId()
msgPrefix := fmt.Sprintf("Pulling %s", filename)

session, err := s.createNewSession()
if err != nil {
return err
}
svc := s3.New(session)

// Determine total size
headResp, err := svc.HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String(s.Bucket.Name),
Key: aws.String(remoteFilePath),
})
if err != nil {
return fmt.Errorf("failed HeadObject: %w", err)
}
totalSize := *headResp.ContentLength

const chunkSize int64 = 500 * 1024 * 1024 // e.g. 500 MiB
var offset int64 = 0
index := 0

// Identify each chunk’s start and end
var chunks []struct {
index int
start int64
end int64
}
for offset < totalSize {
end := offset + chunkSize - 1
if end >= totalSize {
end = totalSize - 1
}
chunks = append(chunks, struct {
index int
start int64
end int64
}{
index: index,
start: offset,
end: end,
})
offset = end + 1
index++
}

// Prepare to stream data to the decompressor
r, w := io.Pipe()

// For concurrency
const maxWorkers = 4
chunkTasks := make(chan struct {
index int
start int64
end int64
})
chunkResults := make(chan struct {
index int
tmpFilePath string
})

// Track progress with an atomic
var totalDownloaded int64

// Build an errgroup and context
ctxBck := context.Background()
ctxChunk, cancel := context.WithTimeout(ctxBck, 5*time.Hour)
group, groupCtx := errgroup.WithContext(ctxChunk)
defer cancel()

// Worker goroutines to fetch chunks concurrently
for wkr := 0; wkr < maxWorkers; wkr++ {
group.Go(func() error {
buf := make([]byte, 2*1024*1024) // 2 MiB buffer
for task := range chunkTasks {
// Honour cancellations
select {
case <-groupCtx.Done():
return groupCtx.Err()
default:
}

rangeHeader := fmt.Sprintf("bytes=%d-%d", task.start, task.end)
getObjResp, err := svc.GetObjectWithContext(groupCtx, &s3.GetObjectInput{
Bucket: aws.String(s.Bucket.Name),
Key: aws.String(remoteFilePath),
Range: aws.String(rangeHeader),
})
if err != nil {
return fmt.Errorf("failed to fetch S3 range %s: %w", rangeHeader, err)
}

// Create a temporary file for this chunk
tmpFile, err := os.CreateTemp("", "s3_chunk_")
if err != nil {
getObjResp.Body.Close()
return fmt.Errorf("failed to create temp file: %w", err)
}

// Copy chunk data
var chunkDownloaded int64
for {
n, readErr := getObjResp.Body.Read(buf)
if n > 0 {
if _, writeErr := tmpFile.Write(buf[:n]); writeErr != nil {
tmpFile.Close()
os.Remove(tmpFile.Name())
getObjResp.Body.Close()
return fmt.Errorf("failed writing chunk to temp file: %w", writeErr)
}
atomic.AddInt64(&totalDownloaded, int64(n))
chunkDownloaded += int64(n)

// Progress update
if ns != nil && totalSize > 0 {
percent := int(float64(atomic.LoadInt64(&totalDownloaded)) / float64(totalSize) * 100)
msg := notifications.NewProgressNotificationMessage(cid, msgPrefix, percent).
SetCurrentSize(atomic.LoadInt64(&totalDownloaded)).
SetTotalSize(totalSize)
ns.Notify(msg)
}
}
if readErr != nil {
getObjResp.Body.Close()
if readErr != io.EOF {
tmpFile.Close()
os.Remove(tmpFile.Name())
return fmt.Errorf("failed reading from S3 chunk: %w", readErr)
}
// EOF => done with this chunk
break
}
}

if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
tmpFile.Close()
os.Remove(tmpFile.Name())
return fmt.Errorf("failed to seek in temp file: %w", err)
}
getObjResp.Body.Close()
tmpName := tmpFile.Name()
tmpFile.Close()

// Send result
chunkResults <- struct {
index int
tmpFilePath string
}{
index: task.index,
tmpFilePath: tmpName,
}
}
return nil
})
}

// Feeder goroutine: queue up all chunks to be fetched
group.Go(func() error {
defer close(chunkTasks)
for _, c := range chunks {
select {
case <-groupCtx.Done():
return groupCtx.Err()
case chunkTasks <- c:
}
}
return nil
})

// Streamer goroutine: receive chunk results in *any* order, but write them
// to the pipe in ascending order as soon as the next chunk is available.
group.Go(func() error {
defer w.Close()

results := make([]*struct {
index int
tmpFilePath string
}, len(chunks))

nextToWrite := 0
receivedCount := 0
totalChunks := len(chunks)

outer:
for {
// If we've written all chunks, we're finished
if nextToWrite >= totalChunks {
break outer
}

select {
case <-groupCtx.Done():
return groupCtx.Err()

case res, ok := <-chunkResults:
if !ok {
// If channel closed unexpectedly and we still have chunks to write, it’s an error
if nextToWrite < totalChunks {
return fmt.Errorf("chunkResults channel closed too soon")
}
break outer
}
// Store the result
results[res.index] = &res
receivedCount++

// Now see if we can write any newly-available chunks in sequence
for nextToWrite < totalChunks && results[nextToWrite] != nil {
rres := results[nextToWrite]

// Stream this chunk to the decompressor pipe
f, err := os.Open(rres.tmpFilePath)
if err != nil {
return fmt.Errorf("failed to open temp file: %w", err)
}
if _, copyErr := io.Copy(w, f); copyErr != nil {
f.Close()
os.Remove(rres.tmpFilePath)
return fmt.Errorf("failed to copy chunk to pipe: %w", copyErr)
}
f.Close()
os.Remove(rres.tmpFilePath)

nextToWrite++
}
}
}
return nil
})

// Decompressor goroutine: decompress from the pipe as soon as data becomes available
group.Go(func() error {
if err := compressor.DecompressFromReader(ctx, r, destination); err != nil {
// If decompression fails, close the pipe
_ = w.CloseWithError(err)
return fmt.Errorf("decompression failed: %w", err)
}
return nil
})

// Wait for everything to complete
if err := group.Wait(); err != nil {
return err
}

// Final notification
finalMsg := fmt.Sprintf("Pulling %s", filename)
ns.NotifyProgress(cid, finalMsg, 100)
ns.NotifyInfo(fmt.Sprintf("Finished pulling and decompressing file %s, took %v",
filename, time.Since(startTime)))

return nil
}

func (s *AwsS3BucketProvider) PullFileAndDecompress1(ctx basecontext.ApiContext, path, filename, destination string) error {
ctx.LogInfof("Pulling file %s", filename)
startTime := time.Now()
remoteFilePath := strings.TrimPrefix(filepath.Join(path, filename), "/")
ns := notifications.Get()

session, err := s.createNewSession()
Expand Down Expand Up @@ -279,7 +541,7 @@ func (s *AwsS3BucketProvider) PullFileAndDecompress(ctx basecontext.ApiContext,
group.Go(func() error {
defer close(chunkFilesChan) // Signal no more chunks

buf := make([]byte, 2*1024*1024) // 2MB buffer for reading from S3
buf := make([]byte, 50*1024*1024) // 50 MiB buffer for reading from S3

for start < totalSize {
// Honor cancellations
Expand Down

0 comments on commit 0faf99a

Please sign in to comment.