Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote ai worker #1

Open
wants to merge 39 commits into
base: ai-video
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
703455f
[net,server]: RemoteAIWorker gRPC
kyriediculous Jun 12, 2024
f1a2f9e
[cmd,core]: initial RemoteAIWorkerManager setup
kyriediculous Jun 12, 2024
bb660b8
[core,server]: add ai results route and registerAiworker route for or…
kyriediculous Jun 12, 2024
fdbe545
cmd: start transcoder with full capabilities (ai + transcoder)
kyriediculous Jun 12, 2024
419ba92
[core,server]: Orchestrator.ServeRemoteAIWorker
kyriediculous Jun 12, 2024
b7fefc8
[core,net]: RemoteAIWorkerManager task management
kyriediculous Jun 12, 2024
c5b720b
[core,server]: hander remote AI worker job results on orchestrator
kyriediculous Jun 12, 2024
da2ad1b
[core,net]: example remote AI request handling
kyriediculous Jun 12, 2024
658859c
server: add aiResults route to server
kyriediculous Jun 12, 2024
eed2f25
[server,core]: fix RegisterAIWorker interface implementaiton
kyriediculous Jun 13, 2024
303c219
temporary models config in remote AI manager mode
kyriediculous Jun 21, 2024
eb38e0b
[core]: fix nil map error for task channels on remotee transcoder man…
kyriediculous Jun 21, 2024
6553ceb
cmd: set transcoding capabilties in remote AI mode without needing tr…
kyriediculous Jun 21, 2024
9700e69
wip: log received AI job on transcoder
kyriediculous Jun 27, 2024
e172b58
wip: handle ai job on transcoder
kyriediculous Jun 27, 2024
dce0900
wip: temp disable ai worker
kyriediculous Jun 27, 2024
5338590
wip: log ai result on orchestrator
kyriediculous Jun 27, 2024
e9817d2
fix: post ai results to correct route
kyriediculous Jun 27, 2024
7edce9e
server: image-to-image remote worker
reubenr0d Jun 28, 2024
e65535c
server: handle remote upscale job
reubenr0d Jul 1, 2024
4b18bb7
server: handlers for image-to-image + upscale
reubenr0d Jul 1, 2024
4ecbed7
server: advertise ai capabilities to remote O
reubenr0d Jul 2, 2024
44bccbc
remove double start
kyriediculous Jul 2, 2024
f5b434b
fix: uncomment running actual ai worker jobs
kyriediculous Jul 2, 2024
e27592f
fix: remove mock ai responses
kyriediculous Jul 2, 2024
2c0dae9
wip: rotate remote ai workers by model
reubenr0d Jul 5, 2024
bdcf406
ai: map remote workers by pipeline
reubenr0d Jul 6, 2024
dcdaea6
core,server: improve remote AI worker code, clean up
kyriediculous Jul 24, 2024
705ebec
fix: remote transcoder selection bug fixes
kyriediculous Jul 25, 2024
f000acc
fix: improve CheckAICapacity to have no breaking changes
kyriediculous Jul 26, 2024
f1d0229
fix: nil pointer error on startup
kyriediculous Jul 26, 2024
4ea28d8
fix: improve ot_rpc remote ai worker error handling
kyriediculous Jul 27, 2024
6a24dd4
fix: propagate remote AI result error properly
kyriediculous Jul 27, 2024
f4abad8
fix: aiErr unmarshal to json
kyriediculous Jul 27, 2024
be093d7
feat: rotate workers until timeout, success, or all fail
kyriediculous Jul 27, 2024
59f4693
feat: add image to video for remote worker
kyriediculous Jul 27, 2024
f84e135
chore: go mod tidy
kyriediculous Jul 27, 2024
6523765
feat: add audio-to-text
kyriediculous Jul 27, 2024
4258462
fix nil pointer on error from RemoteAIResult
kyriediculous Jul 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 96 additions & 2 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if len(aiCaps) > 0 {
capability := aiCaps[len(aiCaps)-1]
price := n.GetBasePriceForCap("default", capability, config.ModelID)
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %d per %d unit", config.Pipeline, capability, config.ModelID, price.Num(), price.Denom())
if price != nil {
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %d per %d unit", config.Pipeline, capability, config.ModelID, price.Num(), price.Denom())
}
}
}
} else {
Expand All @@ -650,6 +652,92 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}()
}

if !*cfg.AIWorker && *cfg.AIModels != "" {
configs, err := core.ParseAIModelConfigs(*cfg.AIModels)
if err != nil {
glog.Errorf("Error parsing -aiModels: %v", err)
return
}

for _, config := range configs {
modelConstraint := &core.ModelConstraint{Warm: config.Warm}

// If the config contains a URL we call Warm() anyway because AIWorker will just register
// the endpoint for an external container
if config.Warm || config.URL != "" {
endpoint := worker.RunnerEndpoint{URL: config.URL, Token: config.Token}
if err := n.AIWorker.Warm(ctx, config.Pipeline, config.ModelID, endpoint, config.OptimizationFlags); err != nil {
glog.Errorf("Error AI worker warming %v container: %v", config.Pipeline, err)
return
}
}

// Show warning if people set OptimizationFlags but not Warm.
if len(config.OptimizationFlags) > 0 && !config.Warm {
glog.Warningf("Model %v has 'optimization_flags' set without 'warm'. Optimization flags are currently only used for warm containers.", config.ModelID)
}

switch config.Pipeline {
case "text-to-image":
_, ok := constraints[core.Capability_TextToImage]
if !ok {
aiCaps = append(aiCaps, core.Capability_TextToImage)
constraints[core.Capability_TextToImage] = &core.Constraints{
Models: make(map[string]*core.ModelConstraint),
}
}

constraints[core.Capability_TextToImage].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_TextToImage, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
case "image-to-image":
_, ok := constraints[core.Capability_ImageToImage]
if !ok {
aiCaps = append(aiCaps, core.Capability_ImageToImage)
constraints[core.Capability_ImageToImage] = &core.Constraints{
Models: make(map[string]*core.ModelConstraint),
}
}

constraints[core.Capability_ImageToImage].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_ImageToImage, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
case "image-to-video":
_, ok := constraints[core.Capability_ImageToVideo]
if !ok {
aiCaps = append(aiCaps, core.Capability_ImageToVideo)
constraints[core.Capability_ImageToVideo] = &core.Constraints{
Models: make(map[string]*core.ModelConstraint),
}
}

constraints[core.Capability_ImageToVideo].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_ImageToVideo, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
case "upscale":
_, ok := constraints[core.Capability_Upscale]
if !ok {
aiCaps = append(aiCaps, core.Capability_Upscale)
constraints[core.Capability_Upscale] = &core.Constraints{
Models: make(map[string]*core.ModelConstraint),
}
}

constraints[core.Capability_Upscale].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_Upscale, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
}

if len(aiCaps) > 0 {
capability := aiCaps[len(aiCaps)-1]
price := n.GetBasePriceForCap("default", capability, config.ModelID)
if price != nil {
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %d per %d unit", config.Pipeline, capability, config.ModelID, price.Num(), price.Denom())
}
}
}
}

if *cfg.Redeemer {
n.NodeType = core.RedeemerNode
} else if *cfg.Orchestrator {
Expand All @@ -658,6 +746,12 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
n.TranscoderManager = core.NewRemoteTranscoderManager()
n.Transcoder = n.TranscoderManager
}
if !*cfg.AIWorker {
n.AIManager = core.NewRemoteAIWorkerManager()
n.AIWorker = n.AIManager
// set transcoder capabilties since we don't use transcoder flag here
transcoderCaps = append(core.DefaultCapabilities(), core.OptionalCapabilities()...)
}
} else if *cfg.Transcoder {
n.NodeType = core.TranscoderNode
} else if *cfg.Broadcaster {
Expand Down Expand Up @@ -1385,7 +1479,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Missing -orchAddr")
}

go server.RunTranscoder(n, orchURLs[0].Host, core.MaxSessions, transcoderCaps)
go server.RunTranscoder(n, orchURLs[0].Host, core.MaxSessions, n.Capabilities)
}

switch n.NodeType {
Expand Down
Loading