unstable test for improve speed in stream #281

Merged: 1 commit, Jan 21, 2025
2 changes: 1 addition & 1 deletion .github/workflow_scripts/announce_discord.sh
100644 → 100755
@@ -78,6 +78,6 @@ curl -H "Content-Type: application/json" \
if [ $? -eq 0 ]; then
echo "Successfully posted changelog to Discord"
else
echo "Failed to post changelog to Discord"
echo "Failed to post changelog to Discord webhook"
exit 1
fi
264 changes: 263 additions & 1 deletion src/catalog/providers/aws_s3_bucket/main.go
@@ -241,6 +241,268 @@ func (s *AwsS3BucketProvider) PullFileAndDecompress(ctx basecontext.ApiContext,
ctx.LogInfof("Pulling file %s", filename)
startTime := time.Now()
remoteFilePath := strings.TrimPrefix(filepath.Join(path, filename), "/")

// Notification bits
ns := notifications.Get()
cid := helpers.GenerateId()
msgPrefix := fmt.Sprintf("Pulling %s", filename)

session, err := s.createNewSession()
if err != nil {
return err
}
svc := s3.New(session)

// Determine total size
headResp, err := svc.HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String(s.Bucket.Name),
Key: aws.String(remoteFilePath),
})
if err != nil {
return fmt.Errorf("failed HeadObject: %w", err)
}
totalSize := *headResp.ContentLength

const chunkSize int64 = 500 * 1024 * 1024 // 500 MiB per chunk
var offset int64 = 0
index := 0

// Identify each chunk’s start and end
var chunks []struct {
index int
start int64
end int64
}
for offset < totalSize {
end := offset + chunkSize - 1
if end >= totalSize {
end = totalSize - 1
}
chunks = append(chunks, struct {
index int
start int64
end int64
}{
index: index,
start: offset,
end: end,
})
offset = end + 1
index++
}

// Prepare to stream data to the decompressor
r, w := io.Pipe()

// For concurrency
const maxWorkers = 4
chunkTasks := make(chan struct {
index int
start int64
end int64
})
chunkResults := make(chan struct {
index int
tmpFilePath string
})

// Track progress with an atomic
var totalDownloaded int64

// Build an errgroup and context
ctxBck := context.Background()
ctxChunk, cancel := context.WithTimeout(ctxBck, 5*time.Hour)
group, groupCtx := errgroup.WithContext(ctxChunk)
defer cancel()

// Worker goroutines to fetch chunks concurrently
for wkr := 0; wkr < maxWorkers; wkr++ {
group.Go(func() error {
buf := make([]byte, 2*1024*1024) // 2 MiB buffer
for task := range chunkTasks {
// Honour cancellations
select {
case <-groupCtx.Done():
return groupCtx.Err()
default:
}

rangeHeader := fmt.Sprintf("bytes=%d-%d", task.start, task.end)
getObjResp, err := svc.GetObjectWithContext(groupCtx, &s3.GetObjectInput{
Bucket: aws.String(s.Bucket.Name),
Key: aws.String(remoteFilePath),
Range: aws.String(rangeHeader),
})
if err != nil {
return fmt.Errorf("failed to fetch S3 range %s: %w", rangeHeader, err)
}

// Create a temporary file for this chunk
tmpFile, err := os.CreateTemp("", "s3_chunk_")
if err != nil {
getObjResp.Body.Close()
return fmt.Errorf("failed to create temp file: %w", err)
}

// Copy chunk data
var chunkDownloaded int64
for {
n, readErr := getObjResp.Body.Read(buf)
if n > 0 {
if _, writeErr := tmpFile.Write(buf[:n]); writeErr != nil {
tmpFile.Close()
os.Remove(tmpFile.Name())
getObjResp.Body.Close()
return fmt.Errorf("failed writing chunk to temp file: %w", writeErr)
}
atomic.AddInt64(&totalDownloaded, int64(n))
chunkDownloaded += int64(n)

// Progress update
if ns != nil && totalSize > 0 {
percent := int(float64(atomic.LoadInt64(&totalDownloaded)) / float64(totalSize) * 100)
msg := notifications.NewProgressNotificationMessage(cid, msgPrefix, percent).
SetCurrentSize(atomic.LoadInt64(&totalDownloaded)).
SetTotalSize(totalSize)
ns.Notify(msg)
}
}
if readErr != nil {
getObjResp.Body.Close()
if readErr != io.EOF {
tmpFile.Close()
os.Remove(tmpFile.Name())
return fmt.Errorf("failed reading from S3 chunk: %w", readErr)
}
// EOF => done with this chunk
break
}
}

if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
tmpFile.Close()
os.Remove(tmpFile.Name())
return fmt.Errorf("failed to seek in temp file: %w", err)
}
getObjResp.Body.Close()
tmpName := tmpFile.Name()
tmpFile.Close()

// Send result
chunkResults <- struct {
index int
tmpFilePath string
}{
index: task.index,
tmpFilePath: tmpName,
}
}
return nil
})
}

// Feeder goroutine: queue up all chunks to be fetched
group.Go(func() error {
defer close(chunkTasks)
for _, c := range chunks {
select {
case <-groupCtx.Done():
return groupCtx.Err()
case chunkTasks <- c:
}
}
return nil
})

// Streamer goroutine: receive chunk results in *any* order, but write them
// to the pipe in ascending order as soon as the next chunk is available.
group.Go(func() error {
defer w.Close()

results := make([]*struct {
index int
tmpFilePath string
}, len(chunks))

nextToWrite := 0
receivedCount := 0
totalChunks := len(chunks)

outer:
for {
// If we've written all chunks, we're finished
if nextToWrite >= totalChunks {
break outer
}

select {
case <-groupCtx.Done():
return groupCtx.Err()

case res, ok := <-chunkResults:
if !ok {
// If channel closed unexpectedly and we still have chunks to write, it’s an error
if nextToWrite < totalChunks {
return fmt.Errorf("chunkResults channel closed too soon")
}
break outer
}
// Store the result
results[res.index] = &res
receivedCount++

// Now see if we can write any newly-available chunks in sequence
for nextToWrite < totalChunks && results[nextToWrite] != nil {
rres := results[nextToWrite]

// Stream this chunk to the decompressor pipe
f, err := os.Open(rres.tmpFilePath)
if err != nil {
return fmt.Errorf("failed to open temp file: %w", err)
}
if _, copyErr := io.Copy(w, f); copyErr != nil {
f.Close()
os.Remove(rres.tmpFilePath)
return fmt.Errorf("failed to copy chunk to pipe: %w", copyErr)
}
f.Close()
os.Remove(rres.tmpFilePath)

nextToWrite++
}
}
}
return nil
})

// Decompressor goroutine: decompress from the pipe as soon as data becomes available
group.Go(func() error {
if err := compressor.DecompressFromReader(ctx, r, destination); err != nil {
// If decompression fails, close the pipe
_ = w.CloseWithError(err)
return fmt.Errorf("decompression failed: %w", err)
}
return nil
})

// Wait for everything to complete
if err := group.Wait(); err != nil {
return err
}

// Final notification (guarded, since ns may be nil as checked above)
if ns != nil {
ns.NotifyProgress(cid, msgPrefix, 100)
ns.NotifyInfo(fmt.Sprintf("Finished pulling and decompressing file %s, took %v",
filename, time.Since(startTime)))
}

return nil
}

func (s *AwsS3BucketProvider) PullFileAndDecompress1(ctx basecontext.ApiContext, path, filename, destination string) error {
ctx.LogInfof("Pulling file %s", filename)
startTime := time.Now()
remoteFilePath := strings.TrimPrefix(filepath.Join(path, filename), "/")
ns := notifications.Get()

session, err := s.createNewSession()
@@ -279,7 +541,7 @@ func (s *AwsS3BucketProvider) PullFileAndDecompress(ctx basecontext.ApiContext,
group.Go(func() error {
defer close(chunkFilesChan) // Signal no more chunks

-buf := make([]byte, 2*1024*1024) // 2MB buffer for reading from S3
+buf := make([]byte, 50*1024*1024) // 50MB buffer for reading from S3

for start < totalSize {
// Honor cancellations
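Outside the diff itself, the core idea of the new pull path is: chunks finish downloading in arbitrary order, get buffered by index, and are written to an io.Pipe strictly in sequence so the decompressor sees one contiguous stream. Below is a minimal, self-contained sketch of that ordering pattern only; fetchChunk and the chunk payloads are hypothetical stand-ins for the ranged S3 GETs, not the provider code.

package main

import (
	"fmt"
	"io"
	"sync"
)

// fetchChunk is a placeholder for a ranged GET; it just fabricates data.
func fetchChunk(i int) []byte {
	return []byte(fmt.Sprintf("chunk-%d|", i))
}

func main() {
	const totalChunks = 5
	type result struct {
		index int
		data  []byte
	}

	r, w := io.Pipe()
	results := make(chan result)

	// Workers: fetch chunks concurrently; completion order is arbitrary.
	var wg sync.WaitGroup
	for i := 0; i < totalChunks; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results <- result{index: i, data: fetchChunk(i)}
		}(i)
	}
	go func() { wg.Wait(); close(results) }()

	// Streamer: buffer out-of-order results, write to the pipe in order.
	go func() {
		defer w.Close()
		pending := make([][]byte, totalChunks)
		next := 0
		for res := range results {
			pending[res.index] = res.data
			for next < totalChunks && pending[next] != nil {
				if _, err := w.Write(pending[next]); err != nil {
					return
				}
				pending[next] = nil
				next++
			}
		}
	}()

	// Consumer: sees the chunks in ascending order, like the decompressor does.
	out, _ := io.ReadAll(r)
	fmt.Println(string(out)) // chunk-0|chunk-1|chunk-2|chunk-3|chunk-4|
}

The provider code layers errgroup cancellation, temp files, and progress notifications on top of this same skeleton.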