Skip to content

Commit

Permalink
feat: polish code
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Jul 13, 2024
1 parent bd7b3ff commit d7b4c4c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 47 deletions.
2 changes: 1 addition & 1 deletion benchmark/anet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func handleConnection(_ context.Context, connection anet.Connection) error {
reader, writer := connection.Reader(), connection.Writer()

for {
data, err := reader.ReadUtil(1);
data, err := reader.ReadUtil('\n');
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package anet
type Reader interface {
Seek(n int) ([]byte, error)
SeekAck(n int) error
SeekAll() ([]byte, error)
ReadAll() ([]byte, error)
ReadUtil(n int) ([]byte, error)
ReadUtil(delim byte) ([]byte, error)
ReadBytes(n int) ([]byte, error)
ReadString(n int) (string, error)
Len() int
Expand Down
71 changes: 41 additions & 30 deletions buffer_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,39 @@ var _ Writer = &bytesBuffer{}
var _ ReadWriter = &bytesBuffer{}

func (b *bytesBuffer) Seek(n int) ([]byte, error) {
if b.end - b.start < n {
if b.Len() < n {
return nil, errors.New("not enough data in buffer")
}
data := b.buffer[b.start:b.start + n]
return data, nil
}

func (b *bytesBuffer) SeekAck(n int) error {
if b.end - b.start < n {
if b.Len() < n {
return errors.New("not enough data in buffer")
}
b.start += n
return nil
}

func (b *bytesBuffer) SeekAll() ([]byte, error) {
data := b.buffer[b.start:b.end]
return data, nil
}

func (b *bytesBuffer) ReadAll() ([]byte, error) {
data := b.buffer[b.start:b.end]
b.start = b.end
b.start = 0
b.end = 0
return data, nil
}

func (b *bytesBuffer) ReadUtil(n int) ([]byte, error) {
func (b *bytesBuffer) ReadUtil(delim byte) ([]byte, error) {
panic("unreachable code")
}

func (b *bytesBuffer) ReadBytes(n int) ([]byte, error) {
if b.end - b.start < n {
if b.Len() < n {
return nil, errors.New("not enough data in buffer")
}
data := b.buffer[b.start:b.start + n]
Expand All @@ -60,7 +66,7 @@ func (b *bytesBuffer) ReadBytes(n int) ([]byte, error) {
}

func (b *bytesBuffer) ReadString(n int) (string, error) {
if b.end - b.start < n {
if b.Len() < n {
return "", errors.New("not enough data in buffer")
}
data := b.buffer[b.start:b.start + n]
Expand All @@ -72,51 +78,56 @@ func (b *bytesBuffer) Len() int {
return b.end - b.start
}

func (b *bytesBuffer) Book(n int) []byte {
if b.remain() < n {
b.increase(n)
}
return b.buffer[b.end:b.end + n]
}

func (b *bytesBuffer) BookAck(n int) error {
if b.remain() < n {
return errors.New("not enough space in buffer")
}
b.end += n
return nil
}

func (b *bytesBuffer) WriteBytes(data []byte, n int) error {
if b.cap - b.end < n {
b.increase()
if b.remain() < n {
b.increase(n)
}
copy(b.buffer[b.end:b.end + n], data)
b.end += n
return nil
}

func (b *bytesBuffer) WriteString(data string, n int) error {
if b.cap - b.end < n {
b.increase()
if b.remain() < n {
b.increase(n)
}
copy(b.buffer[b.end:b.end + n], []byte(data))
b.end += n
return nil
}

func (b *bytesBuffer) Flush() error {
b.start = 0
b.end = 0
return nil
panic("unreachable code")
}

func (b *bytesBuffer) Book(n int) []byte {
if b.cap - b.end < n {
b.increase()
func (b *bytesBuffer) increase(n int) {
if b.cap < n {
b.cap = n * 2
} else {
b.cap = b.cap * 2
}
return b.buffer[b.end:b.end + n]
}

func (b *bytesBuffer) BookAck(n int) error {
if b.cap - b.end < n {
return errors.New("not enough space in buffer")
}
b.end += n
return nil
}

// FIXME: We should guarantee increase size > required
func (b *bytesBuffer) increase() {
b.cap *= 2
newBuffer := make([]byte, b.end, b.cap)
copy(newBuffer, b.buffer[b.start:b.end])
b.buffer = newBuffer
b.end -= b.start
b.start = 0
}

func (b *bytesBuffer) remain() int {
return b.cap - b.end
}
51 changes: 36 additions & 15 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,16 @@ func (c *connection) SeekAck(n int) error {
return c.inputBuffer.SeekAck(n)
}

func (c *connection) SeekAll() ([]byte, error) {
return c.inputBuffer.SeekAll()
}

func (c *connection) ReadAll() ([]byte, error) {
return c.inputBuffer.ReadAll()
}

// FIXME: We should let ReadUtil wait for delim, not a specific number of bytes.
func (c *connection) ReadUtil(n int) ([]byte, error) {
if c.inputBuffer.Len() >= n {
return c.inputBuffer.ReadBytes(n)
}
var err error
if c.readTimeout != 0 {
err = c.waitReadWithTimeout(n, c.readTimeout)
} else {
err = c.waitRead(n)
}
if err != nil {
return nil, err
}
return c.inputBuffer.ReadAll()
func (c *connection) ReadUtil(delim byte) ([]byte, error) {
return c.waitReadUntil(delim)
}

func (c *connection) ReadBytes(n int) ([]byte, error) {
Expand Down Expand Up @@ -207,6 +198,36 @@ func (c *connection) waitRead(n int) error {
return nil
}

func (c *connection) waitReadUntil(delim byte) ([]byte, error) {
for {
if c.inputBuffer.Len() > 0 {
data, err := c.inputBuffer.SeekAll()
if err != nil {
return nil, err
}
index := -1
for i, b := range data {
if b == delim {
index = i
break
}
}
if index != -1 {
c.inputBuffer.SeekAck(index + 1)
return data[:index + 1], nil
}
}
c.submitRead()
err := <-c.readTrigger
if err != nil {
return nil, err
}
if c.inputBuffer.Len() == 0 {
return nil, errors.New("EOF")
}
}
}

func (c *connection) waitFlush() error {
if c.outputBuffer.Len() == 0 {
return nil
Expand Down

0 comments on commit d7b4c4c

Please sign in to comment.