diff --git a/api/action.go b/api/action.go index 406e496b4..25c5859fd 100644 --- a/api/action.go +++ b/api/action.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" "net/http" + "strconv" ) func configActionRouter(r *gin.Engine, wrapper dao.DaoWrapper) { @@ -41,7 +42,16 @@ func (a actionRoutes) getFailedActions(c *gin.Context) { func (a actionRoutes) getActionById(c *gin.Context) { ctx := context.Background() - model, err := a.dao.GetActionByID(ctx, c.Param("id")) + id, err := strconv.Atoi(c.Param("id")) + if err != nil { + _ = c.Error(tools.RequestError{ + Status: http.StatusBadRequest, + CustomMessage: "Invalid action id", + Err: err, + }) + return + } + model, err := a.dao.GetActionByID(ctx, uint(id)) if err != nil { _ = c.Error(tools.RequestError{ Status: http.StatusNotFound, diff --git a/api/runner_grpc.go b/api/runner_grpc.go index 97b3c21cc..3c29a94e2 100644 --- a/api/runner_grpc.go +++ b/api/runner_grpc.go @@ -9,7 +9,6 @@ import ( "github.com/TUM-Dev/gocast/model" "github.com/TUM-Dev/gocast/tools" "github.com/getsentry/sentry-go" - log "github.com/sirupsen/logrus" "github.com/tum-dev/gocast/runner/protobuf" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -20,6 +19,7 @@ import ( "net" "net/http" "regexp" + "strconv" "strings" "time" ) @@ -53,32 +53,26 @@ func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.Regist } func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) { - runner := model.Runner{ - Hostname: request.Hostname, - Port: int(request.Port), - } - - r, err := g.RunnerDao.Get(ctx, runner.Hostname) + r, err := g.RunnerDao.Get(ctx, request.Hostname) if err != nil { - log.WithError(err).Error("Failed to get runner") + logger.Error("Failed to get runner", "err", err) return &protobuf.HeartbeatResponse{Ok: false}, err } - newStats := model.Runner{ - Hostname: request.Hostname, - Port: int(request.Port), - LastSeen: time.Now(), - Status: "Alive", - Workload: uint(request.Workload), - CPU: request.CPU, - Memory: request.Memory, - Disk: request.Disk, - Uptime: request.Uptime, - Version: request.Version, - Actions: request.CurrentAction, - } - ctx = context.WithValue(ctx, "newStats", newStats) - log.Info("Updating runner stats ", "runner", r) + newStat := make(map[string]interface{}) + newStat["LastSeen"] = time.Now() + newStat["Status"] = "Alive" + newStat["Workload"] = uint(request.Workload) + newStat["CPU"] = request.CPU + newStat["Memory"] = request.Memory + newStat["Disk"] = request.Disk + newStat["Uptime"] = request.Uptime + newStat["Version"] = request.Version + newStat["Actions"] = request.CurrentAction + + logger.Info("the actions of this runner", "runner", r, "actions", request.CurrentAction) + ctx = context.WithValue(ctx, "newStats", newStat) + logger.Info("Updating runner stats ", "runner", r) p, err := r.UpdateStats(dao.DB, ctx) return &protobuf.HeartbeatResponse{Ok: p}, err } @@ -243,7 +237,7 @@ func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protob return nil, err } if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) { - log.WithFields(log.Fields{"streamId": stream.ID}).Warn("Stream rejected, time out of bounds") + logger.Warn("Stream rejected, time out of bounds", "streamID", stream.ID) return nil, errors.New("stream rejected") } ingestServer, err := g.IngestServerDao.GetBestIngestServer() @@ -328,7 +322,7 @@ func (g GrpcRunnerServer) NotifyStreamStarted(ctx context.Context, request *prot logger.Error("Can't set StreamLiveNowTimestamp", "err", err) } - hlsUrl := fmt.Sprintf("%v:%v/%v", tools.Cfg.Edge.Domain, tools.Cfg.Edge.Port, request.HLSUrl) + hlsUrl := fmt.Sprintf("%v/%v", tools.Cfg.Edge.Domain, request.HLSUrl) time.Sleep(time.Second * 5) if !isHLSUrlOk(hlsUrl) { @@ -458,7 +452,7 @@ func NotifyForStreams(dao dao.DaoWrapper) func() { values["source"] = lectureHallForStream.PresIP err = CreateJob(dao, ctx, values) //presentation if err != nil { - log.Error("Can't create job", err) + logger.Error("Can't create job", err) } break case 2: //camera @@ -508,9 +502,10 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { logger.Error("Can't get running actions", err) } for _, action := range activeAction { - if action.End.Before(time.Now().Add(5 * time.Minute)) { + if action.End.Before(time.Now().Add(-5 * time.Minute)) { action.SetToIgnored() - log.Info("Action ignored, check for progress manually", "action", action.ID) + err = dao.ActionDao.UpdateAction(ctx, &action) + logger.Info("Action ignored, check for progress manually", "action", action.ID) continue } runner, err := action.GetCurrentRunner() @@ -523,7 +518,8 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { } continue } - if !runner.IsAlive() && !action.IsCompleted() { + hasAction := strings.Contains(runner.Actions, strconv.Itoa(int(action.ID))) + if !runner.IsAlive() && !action.IsCompleted() && hasAction { action.SetToFailed() err = dao.ActionDao.UpdateAction(ctx, &action) if err != nil { @@ -539,13 +535,10 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { } for _, failedAction := range failedActions { failedAction.SetToRunning() - err = AssignRunnerAction(dao, &failedAction) - err = dao.ActionDao.UpdateAction(ctx, &failedAction) - if err != nil { - return - } + err := AssignRunnerAction(dao, &failedAction) if err != nil { logger.Error("Can't assign runner to action", err) + return } } @@ -556,6 +549,9 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { return } for _, job := range jobs { + if job.Actions[0].Status != 3 { + continue + } action, err := job.GetNextAction() if err != nil { logger.Error("Can't get next action", err) @@ -565,12 +561,13 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { logger.Error("Can't update job", err) continue } + action.SetToRunning() err = AssignRunnerAction(dao, action) if err != nil { logger.Error("Can't assign runner to action", err) continue } - action.SetToRunning() + err = dao.ActionDao.UpdateAction(ctx, action) if err != nil { return @@ -591,15 +588,18 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error { return err } runner, err := getRunnerWithLeastWorkloadForJob(runners, action.Type) - action.AssignRunner(runner) ctx := context.Background() - + err = dao.AssignRunner(ctx, action, &runner) if err != nil { - logger.Error("Can't unmarshal json", err) + logger.Error("Can't assign action", err) return err } values := map[string]interface{}{} err = json.Unmarshal([]byte(action.Values), &values) + if err != nil { + logger.Error("Can't unmarshal json", err) + return err + } for key, value := range values { //logger.Info("values", "value", value) ctx = context.WithValue(ctx, key, value) @@ -615,7 +615,12 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error { //TranscodingRequest(ctx, dao, runner) break } - action.SetToRunning() + logger.Info("runner counts", "count", len(action.AllRunners)) + err = dao.ActionDao.UpdateAction(ctx, action) + if err != nil { + logger.Error("Can't update action", err) + return err + } return nil } @@ -636,16 +641,28 @@ func CreateJob(dao dao.DaoWrapper, ctx context.Context, values map[string]interf Status: 3, Type: "stream", Values: string(value), - }, model.Action{ + End: values["end"].(time.Time), + }) + job.Actions = append(job.Actions, actions...) + break + case "transcode": + actions = append(actions, model.Action{ Status: 3, Type: "transcode", Values: string(value), - }, model.Action{ + End: values["end"].(time.Time), + }) + job.Actions = append(job.Actions, actions...) + break + case "upload": + actions = append(actions, model.Action{ Status: 3, Type: "upload", Values: string(value), + End: values["end"].(time.Time), }) job.Actions = append(job.Actions, actions...) + break } err = dao.CreateJob(ctx, job) if err != nil { @@ -664,7 +681,7 @@ func (g GrpcRunnerServer) mustEmbedUnimplementedFromRunnerServer() { func StartGrpcRunnerServer() { lis, err := net.Listen("tcp", ":50056") if err != nil { - log.WithError(err).Error("Failed to init grpc server") + logger.Error("Failed to init grpc server", "err", err) return } grpcServer := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ @@ -678,7 +695,7 @@ func StartGrpcRunnerServer() { reflection.Register(grpcServer) go func() { if err = grpcServer.Serve(lis); err != nil { - log.WithError(err).Errorf("Can't serve grpc") + logger.Error("Can't serve grpc", "err", err) } }() } diff --git a/config.yaml b/config.yaml index c48f9fea4..87a9edbec 100644 --- a/config.yaml +++ b/config.yaml @@ -31,8 +31,7 @@ db: password: example user: root edge: - domain: http://localhost - port: 8089 + domain: http://localhost:8089 ingestbase: rtmp://ingest.tum.live/ jwtkey: # This is an example key, delete and restart to generate a proper one | diff --git a/dao/Action.go b/dao/action.go similarity index 69% rename from dao/Action.go rename to dao/action.go index 912921890..06caa11ea 100644 --- a/dao/Action.go +++ b/dao/action.go @@ -10,15 +10,16 @@ import ( type ActionDao interface { CreateAction(ctx context.Context, action *model.Action) error - CompleteAction(ctx context.Context, actionID string) error - GetActionByID(ctx context.Context, actionID string) (model.Action, error) - GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error) + CompleteAction(ctx context.Context, actionID uint) error + GetActionByID(ctx context.Context, actionID uint) (model.Action, error) + GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error) GetAwaitingActions(ctx context.Context) ([]model.Action, error) GetRunningActions(ctx context.Context) ([]model.Action, error) GetAll(ctx context.Context) ([]model.Action, error) GetAllFailedActions(ctx context.Context) ([]model.Action, error) UpdateAction(ctx context.Context, action *model.Action) error - GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error) + AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error + GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error) } type actionDao struct { @@ -33,17 +34,17 @@ func (d actionDao) CreateAction(ctx context.Context, action *model.Action) error return d.db.WithContext(ctx).Create(&action).Error } -func (d actionDao) CompleteAction(ctx context.Context, actionID string) error { +func (d actionDao) CompleteAction(ctx context.Context, actionID uint) error { return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", actionID).Update("status", "completed").Error } -func (d actionDao) GetActionByID(ctx context.Context, actionID string) (model.Action, error) { +func (d actionDao) GetActionByID(ctx context.Context, actionID uint) (model.Action, error) { var action model.Action err := d.db.WithContext(ctx).First(&action, "id = ?", actionID).Error return action, err } -func (d actionDao) GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error) { +func (d actionDao) GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error) { var actions []model.Action err := d.db.WithContext(ctx).Find(&actions, "job_id = ?", jobID).Error return actions, err @@ -57,7 +58,7 @@ func (d actionDao) GetAwaitingActions(ctx context.Context) ([]model.Action, erro func (d actionDao) GetRunningActions(ctx context.Context) ([]model.Action, error) { var actions []model.Action - err := d.db.WithContext(ctx).Find(&actions, "status = ?", 1).Error + err := d.db.WithContext(ctx).Preload("AllRunners").Find(&actions, "status = ?", 1).Error return actions, err } @@ -74,10 +75,17 @@ func (d actionDao) GetAllFailedActions(ctx context.Context) ([]model.Action, err } func (d actionDao) UpdateAction(ctx context.Context, action *model.Action) error { - return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error + err := d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error + act, _ := d.GetActionByID(ctx, action.ID) + logger.Info("updated action", "action", len(act.AllRunners)) + return err } -func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error) { +func (d actionDao) AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error { + return d.db.WithContext(ctx).Model(&action).Association("AllRunners").Append(runner) +} + +func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error) { var actions []model.Action err := d.db.WithContext(ctx).Joins("AllRunners").Where("id = ?", runnerID).Find(&actions).Error return actions, err diff --git a/dao/Jobs.go b/dao/jobs.go similarity index 100% rename from dao/Jobs.go rename to dao/jobs.go diff --git a/go.work.sum b/go.work.sum index 800998eef..d467ace35 100644 --- a/go.work.sum +++ b/go.work.sum @@ -684,6 +684,7 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= @@ -692,6 +693,7 @@ golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -751,7 +753,6 @@ google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQf google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/model/action.go b/model/action.go index 9d065af0b..39a3302ca 100644 --- a/model/action.go +++ b/model/action.go @@ -64,16 +64,13 @@ func (a *Action) IsCompleted() bool { } func (a *Action) GetCurrentRunner() (*Runner, error) { + logger.Info("runner count", "info", a) if len(a.AllRunners) == 0 { return nil, errors.New("no runner assigned") } return &a.AllRunners[len(a.AllRunners)-1], nil } -func (a *Action) AssignRunner(runner Runner) { - a.AllRunners = append(a.AllRunners, runner) -} - func (a *Action) GetValues() string { return a.Values } diff --git a/model/runner.go b/model/runner.go index ab592e7db..8e79feda4 100644 --- a/model/runner.go +++ b/model/runner.go @@ -38,17 +38,17 @@ func (r *Runner) BeforeCreate(tx *gorm.DB) (err error) { // UpdateStats SendHeartbeat updates the last seen time of the runner and gives runner stats func (r *Runner) UpdateStats(tx *gorm.DB, ctx context.Context) (bool, error) { - newStats := ctx.Value("newStats").(Runner) + newStats := ctx.Value("newStats").(map[string]interface{}) + logger.Info("updating stats", "newStats", newStats) err := tx.WithContext(ctx).Model(&r).Updates(newStats).Error if err != nil { return false, err } - return true, nil } func (r *Runner) IsAlive() bool { - r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -2)) + r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -1)) if r.Alive { r.Status = "Alive" } else { diff --git a/runner/actions/stream.go b/runner/actions/stream.go index 7a5c1938a..3af65b337 100644 --- a/runner/actions/stream.go +++ b/runner/actions/stream.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/tum-dev/gocast/runner/protobuf" + "io" "log/slog" "os" "os/exec" @@ -77,19 +78,19 @@ func (a *ActionProvider) StreamAction() *Action { } } - src := "" + opt := "" if strings.HasPrefix(source, "rtsp") { - src += "-rtsp_transport tcp" + opt += "-rtsp_transport tcp" } else if strings.HasPrefix(source, "rtmp") { - src += "-rw_timeout 5000000" // timeout selfstream s after 5 seconds of no data + opt += "-rw_timeout 5000000" // timeout selfstream s after 5 seconds of no data } else { - src += "-re" // read input at native framerate, e.g. when streaming a file in realtime + opt += "-re" // read input at native framerate, e.g. when streaming a file in realtime } log.Info("streaming", "source", source, "end", time.Until(end).Seconds()) //changing the end variable from a date to a duration and adding the duration to the current time - cmd := fmt.Sprintf(a.Cmd.Stream, src, time.Until(end).Seconds(), source, filename, filepath.Join(a.GetLiveDir(courseID, streamID, version), end.Format("15-04-05")), livePlaylist) + cmd := fmt.Sprintf(a.Cmd.Stream, opt, time.Until(end).Seconds(), source, filepath.Join(a.GetLiveDir(courseID, streamID, version), end.Format("15-04-05")), livePlaylist) c := exec.CommandContext(ctx, "ffmpeg", strings.Split(cmd, " ")...) c.Stderr = os.Stderr @@ -124,6 +125,33 @@ func (a *ActionProvider) StreamAction() *Action { time.Sleep(5 * time.Second) // little backoff to prevent dossing source continue } + //move the files into the ceph storage + file, err := os.Open(filename) + if err != nil { + log.Warn("streamAction: failed to open file, move file to dest manually", "err", err) + return ctx, err + } + defer file.Close() + + copy, err := os.Create(filepath.Join(a.GetMassDir(courseID, streamID, version), end.Format("15-04-05")+".ts")) + if err != nil { + log.Warn("streamAction: failed to create file, move file to dest manually", "err", err) + return ctx, err + } + defer copy.Close() + + _, err = io.Copy(copy, file) + if err != nil { + log.Warn("streamAction: failed to copy file, move file to dest manually", "err", err) + return ctx, err + } + err = os.Remove(filename) + if err != nil { + log.Warn("streamAction: failed to remove file, move file to dest manually", "err", err) + return ctx, err + } + //end file moving + log.Info("stream finished. now sending notification") resp = a.Server.NotifyStreamEnded(ctx, &protobuf.StreamEnded{ RunnerID: hostname, diff --git a/runner/actions/upload.go b/runner/actions/upload.go index 569e67e84..721f46a7b 100644 --- a/runner/actions/upload.go +++ b/runner/actions/upload.go @@ -2,8 +2,8 @@ package actions import ( "context" - "fmt" "github.com/TUM-Dev/gocast/worker/cfg" + "github.com/tum-dev/gocast/runner/protobuf" "io" "log/slog" "mime/multipart" @@ -18,66 +18,36 @@ func (a *ActionProvider) UploadAction() *Action { Type: UploadAction, ActionFn: func(ctx context.Context, log *slog.Logger) (context.Context, error) { - streamID, ok := ctx.Value("stream").(uint64) - if !ok { - return ctx, fmt.Errorf("%w: context doesn't contain stream", ErrRequiredContextValNotFound) - } - courseID, ok := ctx.Value("course").(uint64) - if !ok { - return ctx, fmt.Errorf("%w: context doesn't contain courseID", ErrRequiredContextValNotFound) - } - version, ok := ctx.Value("version").(string) - if !ok { - return ctx, fmt.Errorf("%w: context doesn't contain version", ErrRequiredContextValNotFound) - } + //course := ctx.Value("course").(uint32) + stream := ctx.Value("stream").(uint32) + url := ctx.Value("url").(string) - URLstring := ctx.Value("URL").(string) - - fileName := fmt.Sprintf("%s/%s/%s/%s.mp4", a.MassDir, courseID, streamID, version) + file := ctx.Value("uploadFile").(string) + //this is the part that from worker/upload.go client := &http.Client{ - // 5 minutes timeout, some large files can take a while. - Timeout: time.Minute * 5, + Timeout: time.Minute * 15, } - r, w := io.Pipe() writer := multipart.NewWriter(w) - //the same function as in the worker but without function calling - //so analyzing it and changing it later won't give much to look through - go func() { - defer func(w *io.PipeWriter) { - err := w.Close() - if err != nil { - - } - }(w) - defer func(writer *multipart.Writer) { - err := writer.Close() - if err != nil { - - } - }(writer) - formFileWriter, err := writer.CreateFormFile("filename", fileName) + defer w.Close() + defer writer.Close() + formFileWriter, err := writer.CreateFormFile("filename", file) if err != nil { - log.Error("Cannot create form file: ", err) + log.Error("cannot create form file: ", err) return } - FileReader, err := os.Open(fileName) + fileReader, err := os.Open(file) if err != nil { - log.Error("Cannot create form file: ", err) + log.Error("cannot create form file: ", err) return } - defer func(FileReader *os.File) { - err := FileReader.Close() - if err != nil { - - } - }(FileReader) - _, err = io.Copy(formFileWriter, FileReader) + defer fileReader.Close() + _, err = io.Copy(formFileWriter, fileReader) if err != nil { - log.Error("Cannot create form file: ", err) + log.Error("cannot create form file: ", err) return } @@ -91,39 +61,37 @@ func (a *ActionProvider) UploadAction() *Action { } for name, value := range fields { - formFileWriter, err := writer.CreateFormField(name) - if err != nil { - log.Error("Cannot create form field: ", err) - return - } - _, err = io.Copy(formFileWriter, strings.NewReader(value)) + formFieldWriter, err := writer.CreateFormField(name) if err != nil { log.Error("Cannot create form field: ", err) return } + _, err = io.Copy(formFieldWriter, strings.NewReader(value)) if err != nil { log.Error("Cannot create form field: ", err) return } } }() - rsp, err := client.Post(URLstring, writer.FormDataContentType(), r) + rsp, err := client.Post(cfg.UploadUrl, writer.FormDataContentType(), r) if err == nil && rsp.StatusCode != http.StatusOK { log.Error("Request failed with response code: ", rsp.StatusCode) } if err == nil && rsp != nil { all, err := io.ReadAll(rsp.Body) if err == nil { - log.Debug(string(all), "fileUploaded", fileName) + log.Info("File got uploaded", "Uploaded file", file) + log.Debug("file", all) } } - if err != nil { - log.Error("Failed to post video to TUMLive", "error", err) - return ctx, err - } - log.Info("Successfully posted video to TUMLive", "stream", fileName) - - return ctx, err + a.Server.NotifyVoDUploadFinished(ctx, &protobuf.VoDUploadFinished{ + HLSUrl: url, + StreamID: stream, + RunnerID: "", + SourceType: "", + ThumbnailUrl: "", + }) + return ctx, nil }, } } diff --git a/runner/cmd.yaml b/runner/cmd.yaml index 5d2aa34e7..6e1d5d301 100644 --- a/runner/cmd.yaml +++ b/runner/cmd.yaml @@ -1,4 +1,6 @@ -stream: '-y -hide_banner -loglevel quiet -nostats %v -t %.0f -i %v -c:v copy -c:a copy -f mpegts %v -c:v libx264 -preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -c:a aac -ar 44100 -b:a 128k -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %v/%%05d.ts %v' +stream: '-y -hide_banner -loglevel quiet -nostats %s -t %.0f -i %s -c:v copy -c:a copy -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %s/%%05d.ts %s' + +selfstream: '-preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -ar 44100 -b:a 128k' SeparateAudioFast: "-i %v -vn -c:a copy %v" SeparateAudio: "-i %v -vn %v" diff --git a/runner/config/cmd.go b/runner/config/cmd.go index bd69cb954..9999d9a8e 100644 --- a/runner/config/cmd.go +++ b/runner/config/cmd.go @@ -10,7 +10,7 @@ import ( type CmdList struct { //this is for adding extra parameters - Stream string `Default:"-y -hide_banner -nostats %x -t &.0f -i %s -c:v copy -c:a copy -f mpegts %x -c:v libx264 -preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -c:a aac -ar 44100 -b:a 128k -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %x %x"` + Stream string `Default:"-y -hide_banner -loglevel quiet -nostats %v -t %.0f -i %v -c:v copy -c:a copy -f mpegts %v -c:v libx264 -preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -c:a aac -ar 44100 -b:a 128k -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %v/%%05d.ts %v"` SeparateAudioFast string `Default:"-i %v -vn -c:a copy %v"` SeparateAudio string `Default:"-i %v -vn %v"` AudioNormalize1 string `Default:"-i %v -nostats -y -af loudnorm=I=-23:TP=-2:LRA=7:print_format=json -f null -"` diff --git a/runner/handlers.go b/runner/handlers.go index f4329751f..803a156bd 100644 --- a/runner/handlers.go +++ b/runner/handlers.go @@ -24,7 +24,6 @@ func contextFromTranscodingReq(req *protobuf.TranscodingRequest, ctx context.Con } func (r *Runner) RequestStream(ctx context.Context, req *protobuf.StreamRequest) (*protobuf.StreamResponse, error) { - r.ReadDiagnostics(5) ctx = context.Background() ctx = contextFromStreamReq(req, ctx) ctx = context.WithValue(ctx, "URL", "") diff --git a/runner/ServerHandler.go b/runner/serverhandler.go similarity index 98% rename from runner/ServerHandler.go rename to runner/serverhandler.go index 2055ca5e5..f9bf05e3c 100644 --- a/runner/ServerHandler.go +++ b/runner/serverhandler.go @@ -39,7 +39,7 @@ func (r *Runner) RegisterWithGocast(retries int) { go func() { for { r.ReadDiagnostics(5) - time.Sleep(time.Minute) + time.Sleep(time.Second * 30) } }() } @@ -78,10 +78,12 @@ func (r *Runner) ReadDiagnostics(retries int) { return } actions := "" - for id, _ := range r.activeActions { + for id := range r.activeActions { actions += fmt.Sprintln(id + ",") } + r.log.Info("current actions of runner", "actions", actions) + _, err = con.Heartbeat(context.Background(), &protobuf.HeartbeatRequest{ Hostname: r.cfg.Hostname, Port: int32(r.cfg.Port), diff --git a/web/ts/runner.ts b/web/ts/runner.ts index 3b90dbf99..2a0b953ae 100644 --- a/web/ts/runner.ts +++ b/web/ts/runner.ts @@ -30,32 +30,30 @@ export function getFailedAction(): void { ); }); }); - r + r; } export function listActions(actions: string): void { window.dispatchEvent(new CustomEvent("load-actions")); actions.split(",\n").forEach((id) => { - if (id === "") { - return; - } - fetch("/api/Actions/" + id).then((res) => { - res.text().then((text) => { - window.dispatchEvent( - new CustomEvent("ActionListing", { - detail: { - actions: JSON.parse(text), - }, - }), - ); - }); - }); + if (id === "") { + return; } - ); - var actionwindow = document.getElementById("actionList"); + fetch("/api/Actions/" + id).then((res) => { + res.text().then((text) => { + window.dispatchEvent( + new CustomEvent("ActionListing", { + detail: { + actions: JSON.parse(text), + }, + }), + ); + }); + }); + }); + const actionwindow = document.getElementById("actionList"); actionwindow.classList.toggle("show"); console.log(actionwindow.style.getPropertyValue("visibility")); //actionwindow.classList.add - } diff --git a/worker/cfg/cfg.go b/worker/cfg/cfg.go index 858639d02..ad54a45fe 100644 --- a/worker/cfg/cfg.go +++ b/worker/cfg/cfg.go @@ -19,6 +19,7 @@ var ( LrzSubDir string MainBase string LrzUploadUrl string + UploadUrl string VodURLTemplate string LogDir string Hostname string @@ -45,6 +46,7 @@ func SetConfig() { LrzPhone = os.Getenv("LrzPhone") LrzSubDir = os.Getenv("LrzSubDir") LrzUploadUrl = os.Getenv("LrzUploadUrl") + UploadUrl = os.Getenv("UploadUrl") MainBase = os.Getenv("MainBase") // eg. live.mm.rbg.tum.de VodURLTemplate = os.Getenv("VodURLTemplate") // eg. https://stream.lrz.de/vod/_definst_/mp4:tum/RBG/%s.mp4/playlist.m3u8