Skip to content

Commit

Permalink
Reconnect. Fixes #1
Browse files Browse the repository at this point in the history
  • Loading branch information
darkwrat committed Mar 5, 2019
1 parent bb47c53 commit d6ea2d0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 24 deletions.
49 changes: 34 additions & 15 deletions cmd/cctvd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"flag"
"fmt"
"google.golang.org/grpc"
"log"
"net"
"sync"
"time"

"github.com/darkwrat/cctvd/cctv"
"github.com/darkwrat/cctvd/dvr"
Expand Down Expand Up @@ -95,39 +97,56 @@ func (s *server) multicast(ch chan *dvr.Frame) {
}

var (
addr = flag.String("addr", "127.0.0.1:7620", "dvr host:port")
addr = flag.String("addr", "127.0.0.1:7620", "dvr host:port")
delay = flag.Duration("delay", 5*time.Second, "delay before relive after failure")
)

func live(opts dvr.ConnectOpts, ch chan *dvr.Frame) error {
c, err := dvr.Connect(opts)
if err != nil {
return fmt.Errorf("cannot connect to dvr: %s", err)
}
defer c.Close()

if err := c.Live(ch); err != nil {
return fmt.Errorf("cannot stream anymore: %s", err)
}

return nil
}

func main() {
flag.Parse()

lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
log.Fatalf("failed to listen: %s", err)
}

s := grpc.NewServer()
csrv := &server{m: make(map[uint8]cs)}
cctv.RegisterCCTVServer(s, csrv)

ch := make(chan *dvr.Frame, 100)
go csrv.multicast(ch)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("cannot serve grpc: %s", err)
}
}()

opts := dvr.ConnectOpts{
Addr: *addr,
User: "ADMIN",
Password: "0000",
}

c, err := dvr.Connect(opts)
if err != nil {
log.Fatal(err)
}
defer c.Close()

ch := make(chan *dvr.Frame, 100)
go csrv.multicast(ch)
go func() {
log.Fatal(c.Live(ch))
}()
for {
if err := live(opts, ch); err != nil {
log.Print(err)
}

if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
log.Printf("sleeping for %v seconds before retry", delay.Seconds())
time.Sleep(*delay)
}
}
24 changes: 15 additions & 9 deletions dvr/dvr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dvr
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
Expand All @@ -14,6 +15,10 @@ const (
packetHeaderMagic = uint16(0x2818)
)

var (
errMagicMismatch = errors.New("magic mismatch")
)

type command uint16

const (
Expand Down Expand Up @@ -42,12 +47,12 @@ type packetHeader struct {
func readPacketHeader(r io.Reader) (*packetHeader, error) {
buf := make([]byte, 6)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("could not read packet header: %s", err)
return nil, err
}

magic := binary.BigEndian.Uint16(buf[:2])
if magic != packetHeaderMagic {
return nil, fmt.Errorf("magic mismatch: `%d' != `%d'", magic, packetHeaderMagic)
return nil, errMagicMismatch
}

header := &packetHeader{
Expand All @@ -59,11 +64,8 @@ func readPacketHeader(r io.Reader) (*packetHeader, error) {
}

func readPacketBody(r io.Reader, body []byte) error {
if _, err := io.ReadFull(r, body); err != nil {
return fmt.Errorf("could not read packet body: %s", err)
}

return nil
_, err := io.ReadFull(r, body)
return err
}

type packet struct {
Expand Down Expand Up @@ -365,11 +367,15 @@ func (c *Client) Live(ch chan *Frame) error {

for {
pkt, err := readPacket(c.conn)
if err != nil {
log.Printf("could not read packet: %s", err)
if err == errMagicMismatch {
log.Printf("skipping packet: %s", err)
continue
}

if err != nil {
return fmt.Errorf("could not read packet: %s", err)
}

select {
case <-ping.C:
_ = writePacket(c.conn, commandSysAliveRes, nil)
Expand Down

0 comments on commit d6ea2d0

Please sign in to comment.