Skip to content

Commit

Permalink
fix: resolve e2e test ci
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Jul 11, 2024
1 parent 3bc4ab2 commit 050760e
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 51 deletions.
3 changes: 1 addition & 2 deletions benchmark/anet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"

"github.com/zjregee/anet"
"github.com/zjregee/anet/benchmark/utils"
)

func runServer(port string, stopChan chan interface{}) {
Expand Down Expand Up @@ -86,7 +85,7 @@ func main() {
}

for k := 0; k < n; k++ {
message := utils.GetRandomString(messageLength - 1) + "\n"
message := anet.GetRandomString(messageLength - 1) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("failed to send message: %v\n", err)
Expand Down
4 changes: 2 additions & 2 deletions benchmark/net/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"flag"
"bufio"

"github.com/zjregee/anet/benchmark/utils"
"github.com/zjregee/anet"
)

func runServer(port string, stopChan chan interface{}) {
Expand Down Expand Up @@ -90,7 +90,7 @@ func main() {
}

for k := 0; k < n; k++ {
message := utils.GetRandomString(messageLength - 1) + "\n"
message := anet.GetRandomString(messageLength - 1) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("failed to send message: %v\n", err)
Expand Down
4 changes: 2 additions & 2 deletions benchmark/netpoll/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"bufio"
"context"

"github.com/zjregee/anet"
"github.com/cloudwego/netpoll"
"github.com/zjregee/anet/benchmark/utils"
)

func runServer(port string, stopChan chan interface{}) {
Expand Down Expand Up @@ -89,7 +89,7 @@ func main() {
}

for k := 0; k < n; k++ {
message := utils.GetRandomString(messageLength - 1) + "\n"
message := anet.GetRandomString(messageLength - 1) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("failed to send message: %v\n", err)
Expand Down
4 changes: 2 additions & 2 deletions benchmark/uring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"flag"
"bufio"

"github.com/zjregee/anet/benchmark/utils"
"github.com/zjregee/anet"
)

func runServer(port string, stopChan chan interface{}) {
Expand Down Expand Up @@ -74,7 +74,7 @@ func main() {
}

for k := 0; k < n; k++ {
message := utils.GetRandomString(messageLength - 1) + "\n"
message := anet.GetRandomString(messageLength - 1) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
fmt.Printf("failed to send message: %v\n", err)
Expand Down
39 changes: 20 additions & 19 deletions e2e_test/echo_server/echo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,42 @@ import (
"context"

"github.com/zjregee/anet"
"github.com/sirupsen/logrus"
)

func runServer(port string, stopChan chan interface{}, logger *logrus.Logger) {
anet.SetLogger(logger)

func runServer(port string, stopChan chan interface{}) {
listener, err := anet.CreateListener("tcp", port)
if err != nil {
panic("shouldn't failed here")
}

eventLoop, err := anet.NewEventLoop(handleConnection)
if err != nil {
panic("shouldn't failed here")
}
eventLoop.ServeNonBlocking(listener)
go eventLoop.Serve(listener)

go func() {
<- stopChan
eventLoop.Shutdown()
eventLoop.Shutdown(context.Background())
listener.Close()
}()
}

func handleConnection(ctx context.Context, connection anet.Connection) error {
func handleConnection(_ context.Context, connection anet.Connection) error {
reader, writer := connection.Reader(), connection.Writer()
data, err := reader.ReadAll();
if err != nil {
return err
}
err = writer.WriteBytes(data, len(data));
if err != nil {
return err
}
err = writer.Flush()
if err != nil {
return err

for {
data, err := reader.ReadUtil(1);
if err != nil {
return err
}
err = writer.WriteBytes(data, len(data));
if err != nil {
return err
}
err = writer.Flush()
if err != nil {
return err
}
}
return nil
}
98 changes: 75 additions & 23 deletions e2e_test/echo_server/echo_server_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,24 @@
package echoserver

import (
"os"
"net"
"time"
"bufio"
"net"
"sync"
"errors"
"testing"
"math/rand"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/zjregee/anet"
)

const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

func randomString(length int) string {
seed := rand.New(rand.NewSource(time.Now().UnixNano()))
b := make([]byte, length)
for i := range b {
b[i] = charset[seed.Intn(len(charset))]
}
return string(b)
}

func TestEchoServerSerial(t *testing.T) {
port := ":8000"
logger := logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.WarnLevel)
stopchan := make(chan interface{})
runServer(port, stopchan, logger)
runServer(port, stopchan)
defer close(stopchan)

m := 1
n := 10
m := 1000
n := 100
messageLength := 48

for i := 0; i < m ; i++ {
Expand All @@ -43,7 +28,7 @@ func TestEchoServerSerial(t *testing.T) {
}

for j := 0; j < n; j++ {
message := randomString(messageLength) + "\n"
message := anet.GetRandomString(messageLength - 1) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
t.Fatalf("failed to send message: %v", err)
Expand All @@ -60,3 +45,70 @@ func TestEchoServerSerial(t *testing.T) {
conn.Close()
}
}

func TestEchoServerConcurrent(t *testing.T) {
port := ":8000"
stopchan := make(chan interface{})
runServer(port, stopchan)
defer close(stopchan)

c := 12
m := 1000
n := 100
messageLength := 48

errChan := make(chan error)

var wg sync.WaitGroup
for i := 0; i < c; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < m; j++ {
conn, err := net.Dial("tcp", port)
if err != nil {
select {
case errChan <- errors.New("failed to connect to server"):
default:
}
return
}

for k := 0; k < n; k++ {
message := anet.GetRandomString(messageLength - 1) + "\n"
_, err = conn.Write([]byte(message))
if err != nil {
select {
case errChan <- errors.New("failed to send message"):
default:
}
return
}

response, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
select {
case errChan <- errors.New("failed to read response"):
default:
}
return
}

require.Equal(t, message, response)
}

conn.Close()
}
}(i)
}

go func() {
wg.Wait()
close(errChan)
}()

err := <- errChan
if err != nil {
t.Fatalf("error occured")
}
}
2 changes: 1 addition & 1 deletion benchmark/utils/utils.go → utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package anet

import (
"time"
Expand Down

0 comments on commit 050760e

Please sign in to comment.