diff --git a/README.md b/README.md
index 6e579a5..adfbaad 100644
--- a/README.md
+++ b/README.md
@@ -1,25 +1,62 @@
-# Virtual SocketCAN Kubernetes device plugin
+# SocketCAN Kubernetes device plugin

 This plugins enables you to create virtual [SocketCAN](https://en.wikipedia.org/wiki/SocketCAN) interfaces inside your Kubernetes Pods. `vcan` allows processes inside the pod to communicate with each other using the full Linux SocketCAN API.

 ## Usage example

-Assuming you have a microk8s Kubernetes cluster with `kubectl` configured properly you can install the SocketCAN plugin:
+Assuming you have a [microk8s](https://microk8s.io) Kubernetes cluster you can install the SocketCAN plugin:

-    kubectl apply -f https://raw.githubusercontent.com/jpc/k8s-socketcan/main/k8s-socketcan-daemonset-microk8s.yaml
+    microk8s kubectl apply -f https://raw.githubusercontent.com/Collabora/k8s-socketcan/main/k8s-socketcan-daemonset.yaml

-There is also a YAML file for Azure AKS. Using it with other k8s providers may required an adjustment of the
-`run-containerd` hostPath volume to point to the containerd control socket.
+NOTE: Using it with other k8s providers should require only an adjustment to the `init` container script to add a new
+search path for the `containerd.sock` control socket and/or install the `vcan` kernel module.

-Next, you can create a simple Pod that has two vcan interfaces enabled:
+Next, you can create a simple Pod that has two `vcan` interfaces enabled:

-    kubectl apply -f https://raw.githubusercontent.com/jpc/k8s-socketcan/main/k8s-socketcan-client-example.yaml
+    microk8s kubectl apply -f https://raw.githubusercontent.com/Collabora/k8s-socketcan/main/k8s-socketcan-client-example.yaml

 Afterwards you can run these two commands in two separate terminals to verify it's working correctly:

-    kubectl exec -it k8s-socketcan-client-example -- candump vcan0
-    kubectl exec -it k8s-socketcan-client-example -- cansend vcan0 5A1#11.2233.44556677.88
+    microk8s kubectl exec -it k8s-socketcan-client-example -- candump vcan0
+    microk8s kubectl exec -it k8s-socketcan-client-example -- cansend vcan0 5A1#11.2233.44556677.88
+
+Adding SocketCAN support to an existing Pod is as easy as adding a resource limit in the container spec:
+
+```yaml
+resources:
+  limits:
+    k8s.collabora.com/vcan: 1
+```
+
+## Hardware CAN interfaces
+
+The SocketCAN device plugin also supports hardware CAN interfaces, which is useful if you want to use (for example)
+[K3s](https://k3s.io) to manage your embedded system software. It allows you to improve security by moving a SocketCAN network
+interface into a single container and fully isolating it from any other applications on the system. It's a perfect
+solution if you have a daemon that arbitrates all access to the CAN bus and you wish to containerize it.
+
+To move a hardware CAN interface into a Pod, you have to modify the DaemonSet to specify the names of the interfaces
+you wish to make available. The names should be passed as a space-separated list in the `SOCKETCAN_DEVICES` environment
+variable:
+
+```yaml
+containers:
+- name: k8s-socketcan
+  image: ghcr.io/collabora/k8s-socketcan:latest
+  env:
+  - name: SOCKETCAN_DEVICES
+    value: "can1 can2"
+```
+
+Afterwards, in the client container definition, instead of `k8s.collabora.com/vcan` you can specify the name of
+the interface you wish to use (adding the `socketcan-` prefix to make sure it's unambiguous):
+
+```yaml
+resources:
+  limits:
+    k8s.collabora.com/socketcan-can1: 1
+```

 ## Learn more
@@ -37,12 +74,18 @@ Other resources:

 ## Limitations

+The plugin requires kernel support for SocketCAN (compiled in or as a module) on the cluster Nodes.
+This is a package installation away on Ubuntu (so microk8s and Azure AKS work great) but unfortunately does not seem
+possible at all on Alpine (so for example Rancher Desktop <= v1.1.1 does not work).
+
 Currently each Pod get it's own isolated virtual SocketCAN network. There is no support for bridging
-this to other Pods on the same node or to other nodes in the cluster.
+this to other Pods on the same node or to other nodes in the cluster. [Adding local bridging would be possible with
+the `vxcan` functionality in the kernel and the `cangw` tool.](https://www.lagerdata.com/articles/forwarding-can-bus-traffic-to-a-docker-container-using-vxcan-on-raspberry-pi) Transparent bridging to other cluster nodes over
+the network should be possible manually with [cannelloni](https://github.com/mguentner/cannelloni). Pull requests to automate either of these
+are more than welcome.
-[Adding local bridging would be possible with the `vxcan` functionality in the kernel and the `cangw` tool.](https://www.lagerdata.com/articles/forwarding-can-bus-traffic-to-a-docker-container-using-vxcan-on-raspberry-pi)
-Transparent bridging to other cluster nodes over the network should be possible manually with [cannelloni](https://github.com/mguentner/cannelloni).
-Pull requests to integrate both of these automatically are more then welcome.
+Currently the plugin only works with clusters based on containerd, which includes most production clusters but
+not Docker Desktop (we recommend using [microk8s](https://microk8s.io) instead). Pull requests to support dockerd are of course welcome.

 ## Other solutions

@@ -50,7 +93,8 @@ This project was inspired by the [k8s-device-plugin-socketcan](https://github.co
 from scrach and has some significant improvements:

 - it has a valid Open Source license (MIT)
-- it supports `containerd` (which is used by default in most k8s clusters, like AKS, these days) instead of the Docker daemon
+- it supports `containerd` (which is used by default in most k8s clusters, like AKS, these days) instead of `dockerd`
 - it is capable of handling multiple Pods starting at the same time, which avoids head-of-the-line blocking issues when you have Pods that take a long time to start
+- it supports exclusive use of real CAN interfaces

-Both projects currently support only separate per-Pod SocketCAN networks.
+Neither project currently supports sharing a single SocketCAN interface among multiple Pods.
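The `vcan` interfaces provisioned above are ordinary SocketCAN devices, so applications inside the Pod can use the regular Linux socket API instead of the `can-utils` tools. The sketch below is illustrative only and is not part of this patch: it assumes a Pod that requested `k8s.collabora.com/vcan: 1` so that `vcan0` exists, uses the `golang.org/x/sys/unix` package, assumes a little-endian host for the CAN ID byte order, and sends the same frame as the `cansend` example in the README (`5A1#11.2233.44556677.88`).

```go
package main

import (
    "encoding/binary"
    "fmt"
    "net"

    "golang.org/x/sys/unix"
)

func main() {
    // Find the interface index of the vcan0 device injected by the plugin.
    iface, err := net.InterfaceByName("vcan0")
    if err != nil {
        panic(err)
    }

    // Open a raw CAN socket and bind it to vcan0.
    fd, err := unix.Socket(unix.AF_CAN, unix.SOCK_RAW, unix.CAN_RAW)
    if err != nil {
        panic(err)
    }
    defer unix.Close(fd)
    if err := unix.Bind(fd, &unix.SockaddrCAN{Ifindex: iface.Index}); err != nil {
        panic(err)
    }

    // Build a classic 16-byte can_frame: ID 0x5A1 and eight data bytes.
    frame := make([]byte, 16)
    binary.LittleEndian.PutUint32(frame[0:4], 0x5A1) // can_id (host byte order; little-endian assumed)
    frame[4] = 8                                     // data length code
    copy(frame[8:], []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})

    if _, err := unix.Write(fd, frame); err != nil {
        panic(err)
    }
    fmt.Println("frame sent on vcan0")
}
```

A `candump vcan0` running in the same Pod, as in the usage example, should print the frame.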
diff --git a/k8s-socketcan-client-example.yaml b/k8s-socketcan-client-example.yaml
index acba83b..64b4f53 100644
--- a/k8s-socketcan-client-example.yaml
+++ b/k8s-socketcan-client-example.yaml
@@ -13,7 +13,7 @@ spec:
     name: k8s-socketcan-client-example
     resources:
       limits:
-        k8s.collabora.com/socketcan: 2
+        k8s.collabora.com/vcan: 2
   tolerations:
   - key: "sku"
     operator: "Equal"
diff --git a/k8s-socketcan-daemonset.yaml b/k8s-socketcan-daemonset.yaml
index 1f145de..f3a5fbd 100644
--- a/k8s-socketcan-daemonset.yaml
+++ b/k8s-socketcan-daemonset.yaml
@@ -34,6 +34,9 @@ spec:
       containers:
       - name: k8s-socketcan
         image: ghcr.io/collabora/k8s-socketcan:latest
+        env:
+        - name: SOCKETCAN_DEVICES
+          value: ""
         securityContext:
           privileged: true
           capabilities:
diff --git a/socketcan.go b/socketcan.go
index 0e61ad8..630f0a1 100644
--- a/socketcan.go
+++ b/socketcan.go
@@ -4,6 +4,8 @@ import (
     "flag"
     "fmt"
     "net"
+    "os"
+    "strings"
     "time"

     "github.com/containerd/containerd"
@@ -22,22 +24,33 @@ const (
 )

 // Enumeration class
-type SocketCANLister struct{}
+type SocketCANLister struct {
+    real_devices []string
+}

 func (scl SocketCANLister) GetResourceNamespace() string {
     return resourceNamespace
 }

 func (scl SocketCANLister) Discover(pluginListCh chan dpm.PluginNameList) {
-    var plugins = dpm.PluginNameList{"socketcan"}
+    all_devices := append([]string{"vcan"}, scl.real_devices...)
+    var plugins = dpm.PluginNameList(all_devices)
     pluginListCh <- plugins
 }

-func (scl SocketCANLister) NewPlugin(socketcan string) dpm.PluginInterface {
-    glog.V(3).Infof("Creating device plugin %s", socketcan)
-    return &SocketCANDevicePlugin{
-        assignmentCh: make(chan *Assignment),
+func (scl SocketCANLister) NewPlugin(kind string) dpm.PluginInterface {
+    glog.V(3).Infof("Creating device plugin %s", kind)
+
+    if kind == "vcan" {
+        return &VCANDevicePlugin{
+            assignmentCh: make(chan *Assignment),
+        }
+    } else {
+        return &SocketCANDevicePlugin{
+            assignmentCh: make(chan *Assignment),
+            device_name: strings.TrimPrefix(kind, "socketcan-"),
+        }
     }
 }
@@ -46,7 +59,7 @@ const (
     fakeDevicePath = "/var/run/k8s-socketcan/fakedev"
 )

-type SocketCANDevicePlugin struct {
+type VCANDevicePlugin struct {
     assignmentCh chan *Assignment
     device_paths map[string]*Assignment
     client *containerd.Client
@@ -61,7 +74,7 @@ type Assignment struct {
 // PluginInterfaceStart is an optional interface that could be implemented by plugin.
 // If case Start is implemented, it will be executed by Manager after plugin instantiation and before its registartion to kubelet.
 // This method could be used to prepare resources before they are offered to Kubernetes.
-func (p *SocketCANDevicePlugin) Start() error {
+func (p *VCANDevicePlugin) Start() error {
     go p.interfaceCreator()
     return nil
 }
@@ -69,12 +82,12 @@ func (p *SocketCANDevicePlugin) Start() error {
 // ListAndWatch returns a stream of List of Devices
 // Whenever a Device state change or a Device disappears, ListAndWatch
 // returns the new list
-func (scdp *SocketCANDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
+func (scdp *VCANDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
     devices := make([]*pluginapi.Device, 100)

     for i := range devices {
         devices[i] = &pluginapi.Device{
-            ID: fmt.Sprintf("socketcan-%d", i),
+            ID: fmt.Sprintf("vcan-%d", i),
             Health: pluginapi.Healthy,
         }
     }
@@ -88,7 +101,7 @@ func (scdp *SocketCANDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.
 // Allocate is called during container creation so that the Device
 // Plugin can run device specific operations and instruct Kubelet
 // of the steps to make the Device available in the container
-func (scdp *SocketCANDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
+func (scdp *VCANDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
     var response pluginapi.AllocateResponse

     for _, req := range r.ContainerRequests {
@@ -118,14 +131,14 @@ func (scdp *SocketCANDevicePlugin) Allocate(ctx context.Context, r *pluginapi.Al
 // GetDevicePluginOptions returns options to be communicated with Device
 // Manager
-func (SocketCANDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
+func (VCANDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
     return nil, nil
 }

 // PreStartContainer is called, if indicated by Device Plugin during registeration phase,
 // before each container start. Device plugin can run device specific operations
 // such as reseting the device before making devices available to the container
-func (SocketCANDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
+func (VCANDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
     return nil, nil
 }

@@ -134,7 +147,7 @@ func (SocketCANDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreSt
 // GetPreferredAllocation returns a preferred set of devices to allocate
 // from a list of available ones. The resulting preferred allocation is not
 // guaranteed to be the allocation ultimately performed by the
 // devicemanager. It is only designed to help the devicemanager make a more
 // informed allocation decision when possible.
-func (SocketCANDevicePlugin) GetPreferredAllocation(ctx context.Context, in *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
+func (VCANDevicePlugin) GetPreferredAllocation(ctx context.Context, in *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
     return nil, nil
 }

@@ -142,7 +155,7 @@ func (SocketCANDevicePlugin) GetPreferredAllocation(ctx context.Context, in *plu
 // K8s only support setting up mounts and `/dev` devices, so we create a fake device node
 // and keep checking all containers to look for this sentinel device. After we find one, we
 // inject the network interface into it's namespace.
-func (p *SocketCANDevicePlugin) interfaceCreator() {
+func (p *VCANDevicePlugin) interfaceCreator() {
     client, err := containerd.New("/var/run/k8s-socketcan/containerd.sock")
     if err != nil {
         glog.V(3).Info("Failed to connect to containerd")
         panic(err)
     }
@@ -181,7 +194,7 @@ func (p *SocketCANDevicePlugin) interfaceCreator() {
 }

 // searches through all containers for matching fake devices and creates the network interfaces
-func (p *SocketCANDevicePlugin) tryAllocatingDevices() {
+func (p *VCANDevicePlugin) tryAllocatingDevices() {
     containers, err := p.client.Containers(p.ctx, "")
     if err != nil {
         glog.V(3).Infof("Failed to get container list: %v", err)
@@ -223,7 +236,7 @@ func (p *SocketCANDevicePlugin) tryAllocatingDevices() {
 }

 // creates the named vcan interface inside the pod namespace
-func (nbdp *SocketCANDevicePlugin) createSocketcanInPod(ifname string, containerPid int) error {
+func (nbdp *VCANDevicePlugin) createSocketcanInPod(ifname string, containerPid int) error {
     la := netlink.NewLinkAttrs()
     la.Name = ifname
     la.Flags = net.FlagUp
@@ -235,6 +248,187 @@ func (nbdp *SocketCANDevicePlugin) createSocketcanInPod(ifname string, container
     })
 }

+type SocketCANDevicePlugin struct {
+    assignmentCh chan *Assignment
+    device_name string
+    device_paths map[string]*Assignment
+    client *containerd.Client
+    ctx context.Context
+}
+
+// PluginInterfaceStart is an optional interface that could be implemented by plugin.
+// In case Start is implemented, it will be executed by Manager after plugin instantiation and before its registration to kubelet.
+// This method could be used to prepare resources before they are offered to Kubernetes.
+func (p *SocketCANDevicePlugin) Start() error {
+    go p.interfaceCreator()
+    return nil
+}
+
+// ListAndWatch returns a stream of List of Devices
+// Whenever a Device state change or a Device disappears, ListAndWatch
+// returns the new list
+func (scdp *SocketCANDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
+    devices := make([]*pluginapi.Device, 1)
+
+    for i := range devices {
+        devices[i] = &pluginapi.Device{
+            ID: scdp.device_name,
+            Health: pluginapi.Healthy,
+        }
+    }
+    s.Send(&pluginapi.ListAndWatchResponse{Devices: devices})
+
+    for {
+        time.Sleep(10 * time.Second)
+    }
+}
+
+// Allocate is called during container creation so that the Device
+// Plugin can run device specific operations and instruct Kubelet
+// of the steps to make the Device available in the container
+func (scdp *SocketCANDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
+    var response pluginapi.AllocateResponse
+
+    for _, req := range r.ContainerRequests {
+        var devices []*pluginapi.DeviceSpec
+        for _, devid := range req.DevicesIDs {
+            dev := new(pluginapi.DeviceSpec)
+            containerPath := fmt.Sprintf("/tmp/k8s-socketcan/socketcan-%s", devid)
+            dev.HostPath = fakeDevicePath
+            dev.ContainerPath = containerPath
+            dev.Permissions = "r"
+            devices = append(devices, dev)
+
+            scdp.assignmentCh <- &Assignment{
+                containerPath,
+                scdp.device_name,
+            }
+        }
+
+        response.ContainerResponses = append(response.ContainerResponses, &pluginapi.ContainerAllocateResponse{
+            Devices: devices,
+        })
+
+    }
+
+    return &response, nil
+}
+
+// GetDevicePluginOptions returns options to be communicated with Device
+// Manager
+func (SocketCANDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
+    return nil, nil
+}
+
+// PreStartContainer is called, if indicated by Device Plugin during registration phase,
+// before each container start. Device plugin can run device specific operations
+// such as resetting the device before making devices available to the container
+func (SocketCANDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
+    return nil, nil
+}
+
+// GetPreferredAllocation returns a preferred set of devices to allocate
+// from a list of available ones. The resulting preferred allocation is not
+// guaranteed to be the allocation ultimately performed by the
+// devicemanager. It is only designed to help the devicemanager make a more
+// informed allocation decision when possible.
+func (SocketCANDevicePlugin) GetPreferredAllocation(ctx context.Context, in *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
+    return nil, nil
+}
+
+// This is an internal method which injects the hardware SocketCAN network interfaces.
+// K8s only supports setting up mounts and `/dev` devices, so we create a fake device node
+// and keep checking all containers to look for this sentinel device. After we find one, we
+// inject the network interface into its namespace.
+func (p *SocketCANDevicePlugin) interfaceCreator() {
+    client, err := containerd.New("/var/run/k8s-socketcan/containerd.sock")
+    if err != nil {
+        glog.V(3).Info("Failed to connect to containerd")
+        panic(err)
+    }
+    p.client = client
+
+    context := context.Background()
+    p.ctx = namespaces.WithNamespace(context, "k8s.io")
+
+    // we'll keep a list of pending allocations and keep checking for new containers every
+    // containerWaitDelaySeconds
+    p.device_paths = make(map[string]*Assignment)
+
+    go func() {
+        var retry *time.Timer = time.NewTimer(0)
+        var waiting = false
+        <-retry.C
+        for {
+            select {
+            case alloc := <-p.assignmentCh:
+                glog.V(3).Infof("New allocation request: %v", alloc)
+                p.device_paths[alloc.ContainerPath] = alloc
+            case <-retry.C:
+                waiting = false
+                glog.V(3).Infof("Trying to allocate: %v", p.device_paths)
+                p.tryAllocatingDevices()
+            }
+
+            if !waiting && len(p.device_paths) > 0 {
+                retry = time.NewTimer(containerWaitDelaySeconds * time.Second)
+                waiting = true
+            }
+        }
+    }()
+}
+
+// searches through all containers for matching fake devices and moves the network interfaces
+func (p *SocketCANDevicePlugin) tryAllocatingDevices() {
+    containers, err := p.client.Containers(p.ctx, "")
+    if err != nil {
+        glog.V(3).Infof("Failed to get container list: %v", err)
+        return
+    }
+
+    for _, container := range containers {
+        spec, err := container.Spec(p.ctx)
+        if err != nil {
+            glog.V(3).Infof("Failed to fetch container spec: %v", err)
+            return
+        }
+        for _, device := range spec.Linux.Devices {
+            if assignment, ok := p.device_paths[device.Path]; ok {
+                // we found a container we are looking for
+                task, err := container.Task(p.ctx, nil)
+                if err != nil {
+                    glog.Warningf("Failed to get the task: %v", err)
+                    return
+                }
+
+                pids, err := task.Pids(p.ctx)
+                if err != nil {
+                    glog.Warningf("Failed to get task Pids: %v", err)
+                    return
+                }
+
+                err = p.moveSocketcanIntoPod(assignment.Name, int(pids[0].Pid))
+                if err != nil {
+                    glog.Warningf("Failed to move interface: %v: %v", assignment.Name, err)
+                    return
+                }
+
+                glog.V(3).Infof("Successfully moved the CAN interface: %v", assignment)
+                delete(p.device_paths, device.Path)
+            }
+        }
+    }
+}
+
+// moves the named CAN interface into the pod namespace
+func (nbdp *SocketCANDevicePlugin) moveSocketcanIntoPod(ifname string, containerPid int) error {
+    link, err := netlink.LinkByName(ifname)
+    if err != nil {
+        return err
+    }
+    return netlink.LinkSetNsPid(link, containerPid)
+}
+
 func main() {
     flag.Parse()

@@ -243,6 +437,13 @@ func main() {
     // See also: https://github.com/coredns/coredns/pull/1598
     flag.Set("logtostderr", "true")

-    manager := dpm.NewManager(SocketCANLister{})
+    hw_devices := []string{}
+
+    device_list := os.Getenv("SOCKETCAN_DEVICES")
+    if device_list != "" {
+        hw_devices = strings.Split(device_list, " ")
+    }
+
+    manager := dpm.NewManager(SocketCANLister{hw_devices})
     manager.Run()
 }
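One operational detail worth noting: `Allocate` only mounts the sentinel device under `/tmp/k8s-socketcan/`, and the actual interface is injected later by the `interfaceCreator` goroutine, which re-checks containerd on a `containerWaitDelaySeconds` timer while allocations are pending. A freshly started container may therefore run briefly before its CAN interface shows up. The helper below is a client-side sketch, not part of the plugin, and the names are illustrative; it shows one way an application could wait for the interface before opening a socket on it.

```go
package main

import (
    "fmt"
    "net"
    "time"
)

// waitForCAN polls until the named SocketCAN interface appears in the Pod's
// network namespace or the timeout expires.
func waitForCAN(name string, timeout time.Duration) (*net.Interface, error) {
    deadline := time.Now().Add(timeout)
    for {
        if iface, err := net.InterfaceByName(name); err == nil {
            return iface, nil
        }
        if time.Now().After(deadline) {
            return nil, fmt.Errorf("interface %s did not appear within %s", name, timeout)
        }
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    // "vcan0" for the virtual plugin, or e.g. "can1" when requesting k8s.collabora.com/socketcan-can1.
    iface, err := waitForCAN("vcan0", 30*time.Second)
    if err != nil {
        panic(err)
    }
    fmt.Printf("found %s (index %d), ready to open an AF_CAN socket\n", iface.Name, iface.Index)
}
```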