Skip to content

Commit

Permalink
feat(server): Add delete and recreate buttons
Browse files Browse the repository at this point in the history
  • Loading branch information
pando85 committed Jan 23, 2024
1 parent 6a9aeb5 commit fd9a8b6
Show file tree
Hide file tree
Showing 14 changed files with 721 additions and 278 deletions.
5 changes: 2 additions & 3 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ const (
CanceledNotificationStatus NotificationStatus = "canceled"
FailedNotificationStatus NotificationStatus = "failed"

CancelJob JobAction = "cancel"
EncodeJobType JobType = "encode"
PGSToSrtJobType JobType = "pgstosrt"
EncodeJobType JobType = "encode"
PGSToSrtJobType JobType = "pgstosrt"
)

type Identity interface {
Expand Down
20 changes: 20 additions & 0 deletions server/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Repository interface {
PingServerUpdate(ctx context.Context, name string, ip string, queueName string) error
GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEvent, error)
GetJob(ctx context.Context, uuid string) (*model.Video, error)
DeleteJob(ctx context.Context, uuid string) error
GetJobs(ctx context.Context, page int, pageSize int) (*[]model.Video, error)
GetJobByPath(ctx context.Context, path string) (*model.Video, error)
AddNewTaskEvent(ctx context.Context, event *model.TaskEvent) error
Expand Down Expand Up @@ -134,6 +135,7 @@ func (S *SQLRepository) prepareDatabase(ctx context.Context) (returnError error)
if err != nil {
return err
}
log.Debug("prepare database")
_, err = con.ExecContext(ctx, databaseScript)
return err
})
Expand Down Expand Up @@ -182,6 +184,15 @@ func (S *SQLRepository) GetJob(ctx context.Context, uuid string) (video *model.V
return video, err
}

func (S *SQLRepository) DeleteJob(ctx context.Context, uuid string) error {
db, err := S.getConnection(ctx)
if err != nil {
return err
}
err = S.deleteJob(db, uuid)
return err
}

func (S *SQLRepository) GetJobs(ctx context.Context, page int, pageSize int) (videos *[]model.Video, returnError error) {
db, err := S.getConnection(ctx)
if err != nil {
Expand Down Expand Up @@ -235,6 +246,15 @@ func (S *SQLRepository) getJob(ctx context.Context, tx Transaction, uuid string)
return &video, nil
}

func (S *SQLRepository) deleteJob(tx Transaction, uuid string) error {
sqlResult, err := tx.Exec("DELETE FROM videos WHERE id=$1", uuid)
log.Debugf("query result: +%v", sqlResult)
if err != nil {
return err
}
return nil
}

func (S *SQLRepository) getJobs(ctx context.Context, tx Transaction, page int, pageSize int) (*[]model.Video, error) {
offset := (page - 1) * pageSize
query := fmt.Sprintf("SELECT id FROM videos LIMIT %d OFFSET %d", pageSize, offset)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
DELETE FROM video_events
WHERE video_id NOT IN (SELECT id FROM videos);

ALTER TABLE video_events
DROP CONSTRAINT video_events_video_id_fkey;

ALTER TABLE video_events
ADD CONSTRAINT fk_video_events_videos
FOREIGN KEY (video_id) REFERENCES videos(id) ON DELETE CASCADE;

-- Update video_status table
ALTER TABLE video_status
ADD CONSTRAINT fk_video_status_videos
FOREIGN KEY (video_id) REFERENCES videos(id) ON DELETE CASCADE;
188 changes: 105 additions & 83 deletions server/repository/resources/database.sql
Original file line number Diff line number Diff line change
@@ -1,99 +1,121 @@
CREATE TABLE IF NOT EXISTS videos
(
id varchar(255) primary key,
source_path text not null,
destination_path text not null
-- Define videos table
CREATE TABLE IF NOT EXISTS videos (
id varchar(255) PRIMARY KEY,
source_path text NOT NULL,
destination_path text NOT NULL
);

CREATE TABLE IF NOT EXISTS video_events(
video_id varchar(255) not null,
video_event_id int not null,
worker_name varchar(255) not null,
event_time timestamp not null,
event_type varchar(50) not null,
notification_type varchar(50) not null,
status varchar(20) not null,
-- Define video_events table
CREATE TABLE IF NOT EXISTS video_events (
video_id varchar(255) NOT NULL,
video_event_id int NOT NULL,
worker_name varchar(255) NOT NULL,
event_time timestamp NOT NULL,
event_type varchar(50) NOT NULL,
notification_type varchar(50) NOT NULL,
status varchar(20) NOT NULL,
message text,
primary key (video_id,video_event_id),
foreign KEY (video_id) REFERENCES videos(id)
PRIMARY KEY (video_id, video_event_id),
FOREIGN KEY (video_id) REFERENCES videos(id) ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS workers
(
name varchar(100) primary key not null,
ip varchar(100) not null,
queue_name varchar(255) not null,
last_seen timestamp not null
-- Define workers table
CREATE TABLE IF NOT EXISTS workers (
name varchar(100) PRIMARY KEY NOT NULL,
ip varchar(100) NOT NULL,
queue_name varchar(255) NOT NULL,
last_seen timestamp NOT NULL
);

-- Define video_status table
CREATE TABLE IF NOT EXISTS video_status (
video_id varchar(255) not null,
video_event_id integer not null,
video_path text not null,
worker_name varchar(255) not null,
event_time timestamp not null,
event_type varchar(50) not null,
notification_type varchar(50) not null,
status varchar(20) not null,
message text,
constraint video_status_pkey
primary key (video_id)
video_id varchar(255) NOT NULL,
video_event_id integer NOT NULL,
video_path text NOT NULL,
worker_name varchar(255) NOT NULL,
event_time timestamp NOT NULL,
event_type varchar(50) NOT NULL,
notification_type varchar(50) NOT NULL,
status varchar(20) NOT NULL,
message text,
CONSTRAINT video_status_pkey PRIMARY KEY (video_id),
FOREIGN KEY (video_id) REFERENCES videos(id) ON DELETE CASCADE
);

--Function to insert update on video_status
create or replace function fn_video_status_update(p_video_id varchar, p_video_event_id integer,
p_worker_name varchar, p_event_time timestamp, p_event_type varchar, p_notification_type varchar, p_status varchar, p_message text) returns void
security definer
language plpgsql as $$
declare
-- Function to insert or update video_status
CREATE OR REPLACE FUNCTION fn_video_status_update(
p_video_id varchar,
p_video_event_id integer,
p_worker_name varchar,
p_event_time timestamp,
p_event_type varchar,
p_notification_type varchar,
p_status varchar,
p_message text
) RETURNS VOID SECURITY DEFINER LANGUAGE plpgsql AS $$
DECLARE
p_video_path varchar;
begin
select v.source_path into p_video_path from videos v where v.id=p_video_id;
insert into video_status(video_id, video_event_id, video_path,worker_name, event_time, event_type, notification_type, status, message)
values (p_video_id, p_video_event_id,p_video_path, p_worker_name, p_event_time, p_event_type, p_notification_type,
p_status, p_message)
on conflict on constraint video_status_pkey
do update set video_event_id=p_video_event_id, video_path=p_video_path,worker_name=p_worker_name,
event_time=p_event_time, event_type=p_event_type,
notification_type=p_notification_type, status=p_status, message=p_message;
end;
$$;
BEGIN
SELECT v.source_path INTO p_video_path
FROM videos v
WHERE v.id = p_video_id;

--trigger function for video_status_update
create or replace function fn_trigger_video_status_update() returns trigger
security definer
language plpgsql
as $$
begin
perform fn_video_status_update(new.video_id, new.video_event_id,
new.worker_name,new.event_time,new.event_type,new.notification_type,
new.status,new.message);
return new;
end;
INSERT INTO video_status (
video_id,
video_event_id,
video_path,
worker_name,
event_time,
event_type,
notification_type,
status,
message
)
VALUES (
p_video_id,
p_video_event_id,
p_video_path,
p_worker_name,
p_event_time,
p_event_type,
p_notification_type,
p_status,
p_message
)
ON CONFLICT ON CONSTRAINT video_status_pkey DO UPDATE SET
video_event_id = p_video_event_id,
video_path = p_video_path,
worker_name = p_worker_name,
event_time = p_event_time,
event_type = p_event_type,
notification_type = p_notification_type,
status = p_status,
message = p_message;
END;
$$;
--trigger video_events
drop trigger if exists event_insert_video_status_update on video_events;
create trigger event_insert_video_status_update after insert on video_events
for each row
execute procedure fn_trigger_video_status_update();

-- Trigger function for video_status_update
CREATE OR REPLACE FUNCTION fn_trigger_video_status_update() RETURNS TRIGGER SECURITY DEFINER LANGUAGE plpgsql AS $$
BEGIN
PERFORM fn_video_status_update(
NEW.video_id,
NEW.video_event_id,
NEW.worker_name,
NEW.event_time,
NEW.event_type,
NEW.notification_type,
NEW.status,
NEW.message
);
RETURN NEW;
END;
$$;

--To Reload Everything!!
--do language plpgsql $$
-- declare
-- e record;
-- i integer:=1;
-- begin
-- for e in (select * from video_events order by event_time asc) loop
-- perform fn_video_status_update(e.video_id, e.video_event_id,
-- e.worker_name,e.event_time,e.event_type,e.notification_type,
-- e.status,e.message);
-- i:=i+1;
-- IF MOD(i, 200) = 0 THEN
-- COMMIT;
-- END IF;
-- end loop;
-- end;
--$$;

-- Drop existing trigger if it exists
DROP TRIGGER IF EXISTS event_insert_video_status_update ON video_events;

-- Create trigger for video_events
CREATE TRIGGER event_insert_video_status_update
AFTER INSERT ON video_events
FOR EACH ROW
EXECUTE PROCEDURE fn_trigger_video_status_update();
44 changes: 9 additions & 35 deletions server/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type Scheduler interface {
Run(wg *sync.WaitGroup, ctx context.Context)
ScheduleJobRequests(ctx context.Context, jobRequest *model.JobRequest) (*ScheduleJobRequestResult, error)
GetJob(ctx context.Context, uuid string) (videos *model.Video, err error)
DeleteJob(ctx context.Context, uuid string) error
GetJobs(ctx context.Context, page int, pageSize int) (*[]model.Video, error)
GetUploadJobWriter(ctx context.Context, uuid string) (*UploadJobStream, error)
GetDownloadJobWriter(ctx context.Context, uuid string) (*DownloadJobStream, error)
GetChecksum(ctx context.Context, uuid string) (string, error)
CancelJob(ctx context.Context, uuid string) error
}

type SchedulerConfig struct {
Expand Down Expand Up @@ -273,9 +273,9 @@ func (R *RuntimeScheduler) scheduleJobRequest(ctx context.Context, jobRequest *m
}
}

downloadURL, _ := url.Parse(fmt.Sprintf("%s/api/v1/download/%s", R.config.Domain.String(), video.Id.String()))
uploadURL, _ := url.Parse(fmt.Sprintf("%s/api/v1/upload/%s", R.config.Domain.String(), video.Id.String()))
checksumURL, _ := url.Parse(fmt.Sprintf("%s/api/v1/checksum/%s", R.config.Domain.String(), video.Id.String()))
downloadURL, _ := url.Parse(fmt.Sprintf("%s/api/v1/job/%s/download", R.config.Domain.String(), video.Id.String()))
uploadURL, _ := url.Parse(fmt.Sprintf("%s/api/v1/job/%s/upload", R.config.Domain.String(), video.Id.String()))
checksumURL, _ := url.Parse(fmt.Sprintf("%s/api/v1/job/%s/checksum", R.config.Domain.String(), video.Id.String()))
task := &model.TaskEncode{
Id: video.Id,
DownloadURL: downloadURL.String(),
Expand Down Expand Up @@ -330,42 +330,16 @@ func (R *RuntimeScheduler) ScheduleJobRequests(ctx context.Context, jobRequest *
return result, returnError
}

func (R *RuntimeScheduler) GetJob(ctx context.Context, uuid string) (videos *model.Video, err error) {
func (R *RuntimeScheduler) GetJob(ctx context.Context, uuid string) (*model.Video, error) {
return R.repo.GetJob(ctx, uuid)
}

func (R *RuntimeScheduler) GetJobs(ctx context.Context, page int, pageSize int) (videos *[]model.Video, err error) {
return R.repo.GetJobs(ctx, page, pageSize)
func (R *RuntimeScheduler) DeleteJob(ctx context.Context, uuid string) error {
return R.repo.DeleteJob(ctx, uuid)
}

func (R *RuntimeScheduler) CancelJob(ctx context.Context, uuid string) error {
video, err := R.repo.GetJob(ctx, uuid)
if err != nil {
if errors.Is(err, repository.ElementNotFound) {
return ErrorJobNotFound
}
return err
}
lastEvent := video.Events.GetLatestPerNotificationType(model.JobNotification)
status := lastEvent.Status
if status == model.StartedNotificationStatus {
jobAction := &model.JobEvent{
Id: video.Id,
Action: model.CancelJob,
}

worker, err := R.repo.GetWorker(ctx, lastEvent.WorkerName)
if err != nil {
if errors.Is(err, repository.ElementNotFound) {
return ErrorJobNotFound
}
return err
}
R.queue.PublishJobEvent(jobAction, worker.QueueName)
} else {
return fmt.Errorf("%w: job in status %s", ErrorInvalidStatus, status)
}
return nil
func (R *RuntimeScheduler) GetJobs(ctx context.Context, page int, pageSize int) (*[]model.Video, error) {
return R.repo.GetJobs(ctx, page, pageSize)
}

func (R *RuntimeScheduler) isValidStremeableJob(ctx context.Context, uuid string) (*model.Video, error) {
Expand Down
18 changes: 9 additions & 9 deletions server/web/ui/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const App: React.FC = () => {
};

const Jobs: React.FC = () => (
<div className="contentContainer">
<div className="content-container">
{showJobTable && <JobTable token={token} setShowJobTable={setShowJobTable} />}
</div>
);
Expand All @@ -54,26 +54,26 @@ const App: React.FC = () => {
>
<Theme />
<Router>
<div className="tableContainer">
<div className="page">
<Navigation/>
{!showJobTable && (
<div className="centeredContainer">
<div className="centered-container">
<div className="auth-form">
<form className="tokenInput" onSubmit={handleTokenSubmit}>
<form className="token-input" onSubmit={handleTokenSubmit}>
<div className="field">
<label className="is-label">Token</label>
<div className="passwordInputContainer">
<div className="password-input-container">
<input
className="passwordInput"
className="password-input"
type={showToken ? 'text' : 'password'}
value={token}
onChange={handleTokenInput}
/>
<div className="passwordInputSuffix">
<div className="password-input-suffix">
{showToken ? (
<VisibilityOff className="eyeIcon" onClick={handleToggleShowToken} />
<VisibilityOff className="eye-icon" onClick={handleToggleShowToken} />
) : (
<Visibility className="eyeIcon" onClick={handleToggleShowToken} />
<Visibility className="eye-icon" onClick={handleToggleShowToken} />
)}
</div>
</div>
Expand Down
Loading

0 comments on commit fd9a8b6

Please sign in to comment.