Skip to content

Commit

Permalink
cleanup: refactor and use bicopy everywhere
Browse files Browse the repository at this point in the history
There are 3 instances of this pattern and each does this slightly
differently. Clean up the implementation to return errors using
`errors.Join` (which wasn't available when the original was written) and
use it everywhere.

This doesn't change behavior because the error return is always just
logged (see the only called of `(*pseudoLoopbackForwarder).forward`.

Note that the removal of the special handling of `io.EOF` returned from
`io.Copy` doesn't change behavior because it can never happen per the
latter's documentation.

Signed-off-by: Tamir Duberstein <[email protected]>
  • Loading branch information
tamird committed Dec 3, 2024
1 parent 7cea6f8 commit 29d9e37
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 113 deletions.
79 changes: 13 additions & 66 deletions pkg/bicopy/bicopy.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,3 @@
// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go
/*
Copyright (C) NoRouter authors.
Copyright (C) libnetwork authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bicopy

import (
Expand All @@ -26,56 +7,22 @@ import (
"github.com/sirupsen/logrus"
)

// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104
// (originally from libnetwork, Apache License 2.0).
func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) {
type closeReader interface {
CloseRead() error
}
type closeWriter interface {
CloseWrite() error
}
var wg sync.WaitGroup
broker := func(to, from io.ReadWriter) {
if _, err := io.Copy(to, from); err != nil {
logrus.WithError(err).Debug("failed to call io.Copy")
}
if fromCR, ok := from.(closeReader); ok {
if err := fromCR.CloseRead(); err != nil {
logrus.WithError(err).Debug("failed to call CloseRead")
}
}
if toCW, ok := to.(closeWriter); ok {
if err := toCW.CloseWrite(); err != nil {
logrus.WithError(err).Debug("failed to call CloseWrite")
}
}
wg.Done()
func broker(dst, src io.ReadWriter, dstName, srcName string) {
if _, err := io.Copy(dst, src); err != nil {
logrus.WithError(err).Errorf("io.Copy(%s, %s)", dstName, srcName)
}
}

func Bicopy(left, right io.ReadWriter, leftName, rightName string) {
var wg sync.WaitGroup
wg.Add(2)
go broker(x, y)
go broker(y, x)
finish := make(chan struct{})
go func() {
wg.Wait()
close(finish)
defer wg.Done()
broker(left, right, leftName, rightName)
}()

select {
case <-quit:
case <-finish:
}
if xCloser, ok := x.(io.Closer); ok {
if err := xCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call xCloser.Close")
}
}
if yCloser, ok := y.(io.Closer); ok {
if err := yCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call yCloser.Close")
}
}
<-finish
// TODO: return copied bytes
go func() {
defer wg.Done()
broker(right, left, rightName, leftName)
}()
wg.Wait()
}
15 changes: 6 additions & 9 deletions pkg/hostagent/port_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,20 @@ func (plf *pseudoLoopbackForwarder) Serve() error {
ac.Close()
continue
}
go func(ac *net.TCPConn) {
if fErr := plf.forward(ac); fErr != nil {
logrus.Error(fErr)
}
}(ac)
go plf.forward(ac)
}
}

func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error {
func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) {
defer ac.Close()
unixConn, err := net.DialUnix("unix", nil, plf.unixAddr)
if err != nil {
return err
logrus.WithError(err).Errorf("pseudoloopback forwarder: failed to dial %q", plf.unixAddr)
return
}
defer unixConn.Close()
bicopy.Bicopy(ac, unixConn, nil)
return nil

bicopy.Bicopy(ac, unixConn, "tcp", "unix")
}

func (plf *pseudoLoopbackForwarder) Close() error {
Expand Down
21 changes: 2 additions & 19 deletions pkg/portfwd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"

"github.com/lima-vm/lima/pkg/bicopy"
"github.com/lima-vm/lima/pkg/guestagent/api"
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client"
"github.com/sirupsen/logrus"
Expand All @@ -24,27 +25,9 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
return
}

g, _ := errgroup.WithContext(ctx)

rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr}
g.Go(func() error {
_, err := io.Copy(rw, conn)
if errors.Is(err, io.EOF) {
return nil
}
return err
})
g.Go(func() error {
_, err := io.Copy(conn, rw)
if errors.Is(err, io.EOF) {
return nil
}
return err
})

if err := g.Wait(); err != nil {
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err)
}
bicopy.Bicopy(rw, conn, guestAddr, id)
}

func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) {
Expand Down
21 changes: 2 additions & 19 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"
"github.com/lima-vm/lima/pkg/bicopy"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -66,24 +66,7 @@ func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if _, err := io.Copy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
}
}()

go func() {
defer wg.Done()
if _, err := io.Copy(vzConn, qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
}
}()

wg.Wait()
bicopy.Bicopy(qemuConn, vzConn, "VMNET", "VZ")
}

// qemuPacketConn converts raw network packet to a QEMU supported network packet.
Expand Down

0 comments on commit 29d9e37

Please sign in to comment.