This repository has been archived by the owner on Jul 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathFaultTolerantHdfsReader_test.go
60 lines (54 loc) · 2.14 KB
/
FaultTolerantHdfsReader_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"errors"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"io"
"testing"
)
// TestSeekAndReadWithRetries exercises the retry logic of Read(): after a failed
// read on the underlying stream, the fault-tolerant reader is expected to close
// that stream, re-open the file through the accessor, seek to the position
// reached so far and repeat the read.
func TestSeekAndReadWithRetries(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	// Finish verifies that every expectation registered below was actually met;
	// without it a skipped Close()/OpenRead()/Seek() could go unnoticed.
	defer mockCtrl.Finish()

	hdfsReader := NewMockReadSeekCloser(mockCtrl)
	hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
	ftHdfsReader := NewFaultTolerantHdfsReader("/path/to/file", hdfsReader, hdfsAccessor, atMost2Attempts())

	// Performing a successful read of 60 bytes (of the requested 100) at offset 1000.
	hdfsReader.EXPECT().Seek(int64(1000)).Return(nil)
	err := ftHdfsReader.Seek(1000)
	assert.NoError(t, err)

	hdfsReader.EXPECT().Read(gomock.Any()).Return(60, nil)
	nr, err := ftHdfsReader.Read(make([]byte, 100))
	assert.NoError(t, err)
	assert.Equal(t, 60, nr)

	// Now the stream should be at position 1060 (1000 + 60).
	// Requesting one more read of 200 bytes, but this time it will fail.
	hdfsReader.EXPECT().Read(gomock.Any()).Return(0, errors.New("Injected failure"))
	// As a result, ftHdfsReader should close the stream...
	hdfsReader.EXPECT().Close().Return(nil)
	// ...and invoke OpenRead() to get a new reader.
	newHdfsReader := NewMockReadSeekCloser(mockCtrl)
	hdfsAccessor.EXPECT().OpenRead("/path/to/file").Return(newHdfsReader, nil)
	// It should seek to the correct position (1060) and repeat the read.
	newHdfsReader.EXPECT().Seek(int64(1060)).Return(nil)
	newHdfsReader.EXPECT().Read(gomock.Any()).Return(150, nil)

	nr, err = ftHdfsReader.Read(make([]byte, 200))
	assert.NoError(t, err)
	assert.Equal(t, 150, nr)
}
// TestNoRetryOnEOF checks that benign errors (e.g. io.EOF) are not retried: the
// single Read() on the underlying stream returns io.EOF, and that error and a
// zero byte count are expected to reach the caller unchanged.
func TestNoRetryOnEOF(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	// Finish verifies that the expected Read() was actually issued. No Close(),
	// OpenRead() or second Read() is registered, so a retry attempt would hit
	// an unexpected call on the mocks.
	defer mockCtrl.Finish()

	hdfsReader := NewMockReadSeekCloser(mockCtrl)
	hdfsAccessor := NewMockHdfsAccessor(mockCtrl)
	ftHdfsReader := NewFaultTolerantHdfsReader("/path/to/file", hdfsReader, hdfsAccessor, atMost2Attempts())

	hdfsReader.EXPECT().Read(gomock.Any()).Return(0, io.EOF)
	nr, err := ftHdfsReader.Read(make([]byte, 100))
	assert.Equal(t, io.EOF, err)
	assert.Equal(t, 0, nr)
}