From 4fdc57b583ecdfc4355f011c89a06b050b095cd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien?= Date: Thu, 26 Sep 2024 14:19:24 +0200 Subject: [PATCH 1/2] gpu support in container --- master/taskMgr.go | 5 +++-- worker/dockerengine.go | 31 ++++++++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/master/taskMgr.go b/master/taskMgr.go index 5bf6d0ae..5b1a231f 100644 --- a/master/taskMgr.go +++ b/master/taskMgr.go @@ -99,7 +99,7 @@ func (gf *GroupInfo) GetHash() string { sortedpairs := make([]*KVPair, 0) for k, v := range *gf { - DEBUG.Println("group k: %s, v: %+v\r\n", k, v) + DEBUG.Printf("group k: %s, v: %+v\r\n", k, v) kvpair := KVPair{} kvpair.Key = k @@ -292,12 +292,13 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp for i := 0; i < len(task.Inputs); i++ { if task.Inputs[i].ID == entity.ID && !(task.Inputs[i].Location.IsEqual(&entity.Location)) { locationChanged = true - DEBUG.Println("[location changed] entity: ", entity.ID) + DEBUG.Println("[location changed] entity: ", entity.ID, " - task.Inputs[", i, "]: ", task.Inputs[i]) // update the input entities with the new location task.Inputs[i].Location = entity.Location break } } + DEBUG.Println("Done dealing with location change") // if the location is changed, calculate the new optimal worker assignment newOptimalWorkerID := task.WorkerID diff --git a/worker/dockerengine.go b/worker/dockerengine.go index 58486533..ca536d89 100644 --- a/worker/dockerengine.go +++ b/worker/dockerengine.go @@ -127,7 +127,7 @@ func (dockerengine *DockerEngine) findFreePortNumber() int { return l.Addr().(*net.TCPAddr).Port } -//functionCode string, taskID string, adminCfg []interface{}, servicePorts []string) +// functionCode string, taskID string, adminCfg []interface{}, servicePorts []string) func (dockerengine *DockerEngine) StartTask(task *ScheduledTaskInstance, brokerURL string) (string, string, error) { dockerImage := task.DockerImage INFO.Println("to execute Task [", task.ID, "] to perform Operation [", @@ -179,6 +179,8 @@ func (dockerengine *DockerEngine) StartTask(task *ScheduledTaskInstance, brokerU commands = append(commands, setOutputCmd) } + hostConfig := docker.HostConfig{} + // check if it is required to set up the portmapping for its endpoint services servicePorts := make([]string, 0) @@ -187,6 +189,31 @@ func (dockerengine *DockerEngine) StartTask(task *ScheduledTaskInstance, brokerU if parameter.Name == "service_port" { servicePorts = strings.Split(parameter.Value, ";") } + + var gpu_count = 0 + var err error + if parameter.Name == "gpus" { + DEBUG.Println("Enable NVIDIA gpus") + hostConfig.Runtime = "nvidia" + + if parameter.Value == "all" { + gpu_count = -1 + } else { + count, err = strconv.Atoi(parameter.Value) + DEBUG.Println("Error converting parameter to int: ", err) + } + + if count != 0 { + // Configure GPU resources + // DEBUG.Println("Request GPU devices") + hostConfig.DeviceRequests = append(hostConfig.DeviceRequests, docker.DeviceRequest{ + Driver: "nvidia", + Count: gpu_count, // Allocate all available GPUs + Capabilities: [][]string{{"compute", "utility"}}, + }) + } + // DEBUG.Printf("hostConfig: %v", hostConfig) + } } // prepare the configuration for a docker container, host mode for the container network @@ -199,8 +226,6 @@ func (dockerengine *DockerEngine) StartTask(task *ScheduledTaskInstance, brokerU config := docker.Config{Image: dockerImage, Env: evs} - hostConfig := docker.HostConfig{} - //if runtime.GOOS == "darwin" { already use the bridge model internalPort := docker.Port(freePort + "/tcp") portBindings := map[docker.Port][]docker.PortBinding{ From e5930fdb2c51d9536c7776528fdb819fc2de8c55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien?= Date: Thu, 17 Oct 2024 15:02:39 +0200 Subject: [PATCH 2/2] Revert debug messages --- master/taskMgr.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/master/taskMgr.go b/master/taskMgr.go index 54e301ab..e13672cc 100644 --- a/master/taskMgr.go +++ b/master/taskMgr.go @@ -304,13 +304,12 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp for i := 0; i < len(task.Inputs); i++ { if task.Inputs[i].ID == entity.ID && !(task.Inputs[i].Location.IsEqual(&entity.Location)) { locationChanged = true - DEBUG.Println("[location changed] entity: ", entity.ID, " - task.Inputs[", i, "]: ", task.Inputs[i]) + DEBUG.Println("[location changed] entity: ", entity.ID) // update the input entities with the new location task.Inputs[i].Location = entity.Location break } } - DEBUG.Println("Done dealing with location change") // if the location is changed, calculate the new optimal worker assignment newOptimalWorkerID := task.WorkerID