Skip to content

Commit

Permalink
Code refactoring for api/streams
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Jun 7, 2024
1 parent bf303ed commit aca0781
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 99 deletions.
105 changes: 105 additions & 0 deletions internal/streams/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package streams

import (
"net/http"

"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/probe"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)

func apiStreams(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
src := query.Get("src")

// without source - return all streams list
if src == "" && r.Method != "POST" {
api.ResponseJSON(w, streams)
return
}

// Not sure about all this API. Should be rewrited...
switch r.Method {
case "GET":
stream := Get(src)
if stream == nil {
http.Error(w, "", http.StatusNotFound)
return
}

cons := probe.NewProbe(query)
if len(cons.Medias) != 0 {
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

api.ResponsePrettyJSON(w, stream)

stream.RemoveConsumer(cons)
} else {
api.ResponsePrettyJSON(w, streams[src])
}

case "PUT":
name := query.Get("name")
if name == "" {
name = src
}

if New(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
return
}

if err := app.PatchConfig(name, src, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}

case "PATCH":
name := query.Get("name")
if name == "" {
http.Error(w, "", http.StatusBadRequest)
return
}

// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
if Patch(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
}

case "POST":
// with dst - redirect source to dst
if dst := query.Get("dst"); dst != "" {
if stream := Get(dst); stream != nil {
if err := Validate(src); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Play(src); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
api.ResponseJSON(w, stream)
}
} else if stream = Get(src); stream != nil {
if err := Validate(dst); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Publish(dst); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "", http.StatusNotFound)
}
} else {
http.Error(w, "", http.StatusBadRequest)
}

case "DELETE":
delete(streams, src)

if err := app.PatchConfig(src, nil, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
}
100 changes: 1 addition & 99 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package streams

import (
"errors"
"net/http"
"net/url"
"regexp"
"sync"
"time"

"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/probe"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)

Expand All @@ -29,7 +26,7 @@ func Init() {
streams[name] = NewStream(item)
}

api.HandleFunc("api/streams", streamsHandler)
api.HandleFunc("api/streams", apiStreams)

if cfg.Publish == nil {
return
Expand Down Expand Up @@ -145,101 +142,6 @@ func Delete(id string) {
delete(streams, id)
}

func streamsHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
src := query.Get("src")

// without source - return all streams list
if src == "" && r.Method != "POST" {
api.ResponseJSON(w, streams)
return
}

// Not sure about all this API. Should be rewrited...
switch r.Method {
case "GET":
stream := Get(src)
if stream == nil {
http.Error(w, "", http.StatusNotFound)
return
}

cons := probe.NewProbe(query)
if len(cons.Medias) != 0 {
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

api.ResponsePrettyJSON(w, stream)

stream.RemoveConsumer(cons)
} else {
api.ResponsePrettyJSON(w, streams[src])
}

case "PUT":
name := query.Get("name")
if name == "" {
name = src
}

if New(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
return
}

if err := app.PatchConfig(name, src, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}

case "PATCH":
name := query.Get("name")
if name == "" {
http.Error(w, "", http.StatusBadRequest)
return
}

// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
if Patch(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
}

case "POST":
// with dst - redirect source to dst
if dst := query.Get("dst"); dst != "" {
if stream := Get(dst); stream != nil {
if err := Validate(src); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Play(src); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
api.ResponseJSON(w, stream)
}
} else if stream = Get(src); stream != nil {
if err := Validate(dst); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Publish(dst); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "", http.StatusNotFound)
}
} else {
http.Error(w, "", http.StatusBadRequest)
}

case "DELETE":
delete(streams, src)

if err := app.PatchConfig(src, nil, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
}

var log zerolog.Logger
var streams = map[string]*Stream{}
var streamsMu sync.Mutex

0 comments on commit aca0781

Please sign in to comment.