diff --git a/pkg/device/sysfs_linux.go b/pkg/device/sysfs_linux.go index 1c3f90e1..a4a306af 100644 --- a/pkg/device/sysfs_linux.go +++ b/pkg/device/sysfs_linux.go @@ -20,8 +20,10 @@ package device import ( "bufio" + "errors" "fmt" "os" + "path" "strconv" "strings" ) @@ -106,3 +108,49 @@ func getHolders(name string) ([]string, error) { func getDMName(name string) (string, error) { return readFirstLine("/sys/class/block/" + name + "/dm/name") } + +// GetStat returns statistics for a given device name. +func GetStat(name string) (stats []uint64, err error) { + line, err := readFirstLine("/sys/class/block/" + name + "/stat") + if err != nil { + return nil, err + } + + for _, token := range strings.Split(line, " ") { + token = strings.TrimSpace(token) + ui64, err := strconv.ParseUint(token, 10, 64) + if err != nil { + return nil, err + } + stats = append(stats, ui64) + } + + return stats, nil +} + +// GetHardwareSectorSize returns hardware sector size of associated drive. +func GetHardwareSectorSize(name string) (uint64, error) { + if _, err := os.Lstat("/sys/block/" + name); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return 0, err + } + + partPath := "/sys/class/block/" + name + if _, err = os.Stat(partPath + "/partition"); err != nil { + return 0, err + } + + linkPath, err := os.Readlink(partPath) + if err != nil { + return 0, err + } + + name = path.Base(path.Dir(linkPath)) + } + + s, err := readFirstLine("/sys/block/" + name + "/queue/hw_sector_size") + if err != nil || s == "" { + return 0, err + } + return strconv.ParseUint(s, 10, 64) +} diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go index 2567eabd..d82a8a8d 100644 --- a/pkg/metrics/collector.go +++ b/pkg/metrics/collector.go @@ -18,17 +18,57 @@ package metrics import ( "context" + "fmt" directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/device" "github.com/minio/directpv/pkg/sys" "github.com/minio/directpv/pkg/types" + "github.com/minio/directpv/pkg/utils" "github.com/minio/directpv/pkg/xfs" "github.com/prometheus/client_golang/prometheus" "k8s.io/klog/v2" ) +type driveStats struct { + readSectorBytes float64 + readTicks float64 + writeSectorBytes float64 + writeTicks float64 + timeInQueue float64 +} + +func getDriveStats(driveName string) (*driveStats, error) { + stat, err := device.GetStat(driveName) + switch { + case err != nil: + return nil, err + case len(stat) == 0: + return nil, fmt.Errorf("unable to read stat from drive %v", driveName) + case len(stat) < 10: + return nil, fmt.Errorf("invalid stat format from drive %v", driveName) + } + + hardwareSectorSize, err := device.GetHardwareSectorSize(driveName) + switch { + case err != nil: + return nil, err + case hardwareSectorSize == 0: + hardwareSectorSize = 512 // Use default value + } + + // Refer https://www.kernel.org/doc/Documentation/block/stat.txt for meaning of each field. + return &driveStats{ + readSectorBytes: float64(stat[2] * hardwareSectorSize), + readTicks: float64(stat[3]), + writeSectorBytes: float64(stat[6] * hardwareSectorSize), + writeTicks: float64(stat[7]), + timeInQueue: float64(stat[9]), + }, nil +} + type metricsCollector struct { nodeID directpvtypes.NodeID desc *prometheus.Desc @@ -95,21 +135,138 @@ func (c *metricsCollector) publishVolumeStats(ctx context.Context, volume *types ) } +func (c *metricsCollector) publishDriveStats(drive *types.Drive, ch chan<- prometheus.Metric) { + deviceID, err := c.getDeviceByFSUUID(drive.Status.FSUUID) + if err != nil { + klog.ErrorS( + err, + "unable to find device by FSUUID; "+ + "either device is removed or run command "+ + "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+ + " on the host to reload", + "FSUUID", drive.Status.FSUUID) + client.Eventf( + drive, client.EventTypeWarning, client.EventReasonMetrics, + "unable to find device by FSUUID %v; "+ + "either device is removed or run command "+ + "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+ + " on the host to reload", drive.Status.FSUUID) + + return + } + deviceName := utils.TrimDevPrefix(deviceID) + + status := float64(1) // Online + driveStat, err := getDriveStats(deviceName) + if err != nil { + klog.ErrorS(err, "unable to read drive statistics") + status = float64(0) // Offline + } + + // Metrics + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_ready"), + "Drive Online/Offline Status", + []string{"drive"}, nil), + prometheus.GaugeValue, + status, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_total_bytes_read"), + "Total number of bytes read from the drive", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.readSectorBytes, drive.Name, + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_total_bytes_written"), + "Total number of bytes written to the drive", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.writeSectorBytes, drive.Name, + ) + + // Drive Read/Write Latency + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_read_latency_seconds"), + "Drive Read Latency", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.readTicks/1000, drive.Name, + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_write_latency_seconds"), + "Drive Write Latency", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.writeTicks/1000, drive.Name, + ) + + // Drive Read/Write Throughput + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_read_throughput_bytes_per_second"), + "Drive Read Throughput", + []string{"drive"}, nil), + prometheus.GaugeValue, + 1000*driveStat.readSectorBytes/driveStat.readTicks, drive.Name, + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_write_throughput_bytes_per_second"), + "Drive Write Throughput", + []string{"drive"}, nil), + prometheus.GaugeValue, + 1000*driveStat.writeSectorBytes/driveStat.writeTicks, drive.Name, + ) + + // Wait Time + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_wait_time_seconds"), + "Drive Wait Time", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.timeInQueue/1000, drive.Name, + ) +} + // Collect is called by Prometheus registry when collecting metrics. func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() - resultCh := client.NewVolumeLister(). + // Collecting volume statistics + volumeResultCh := client.NewVolumeLister(). NodeSelector([]directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(c.nodeID))}). List(ctx) - for result := range resultCh { + for result := range volumeResultCh { if result.Err != nil { - return + break } if result.Volume.Status.TargetPath != "" { c.publishVolumeStats(ctx, &result.Volume, ch) } } + + // Collecting drive statistics + driveResultCh := client.NewDriveLister(). + NodeSelector([]directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(c.nodeID))}). + List(ctx) + for result := range driveResultCh { + if result.Err != nil { + break + } + + c.publishDriveStats(&result.Drive, ch) + } } diff --git a/pkg/metrics/collector_test.go b/pkg/metrics/collector_test.go index b30a4295..a1345b27 100644 --- a/pkg/metrics/collector_test.go +++ b/pkg/metrics/collector_test.go @@ -40,8 +40,16 @@ const MiB = 1024 * 1024 type metricType string const ( - metricStatsBytesUsed metricType = consts.AppName + "_stats_bytes_used" - metricStatsBytesTotal metricType = consts.AppName + "_stats_bytes_total" + metricStatsBytesUsed metricType = consts.AppName + "_stats_bytes_used" + metricStatsBytesTotal metricType = consts.AppName + "_stats_bytes_total" + metricStatsDriveReady metricType = consts.AppName + "_stats_drive_ready" + metricStatsDriveBytesRead metricType = consts.AppName + "_stats_drive_total_bytes_read" + metricStatsDriveBytesWritten metricType = consts.AppName + "_stats_drive_total_bytes_written" + metricStatsDriveReadLatency metricType = consts.AppName + "_stats_drive_read_latency_seconds" + metricStatsDriveWriteLatency metricType = consts.AppName + "_stats_drive_write_latency_seconds" + metricStatsDriveReadThroughput metricType = consts.AppName + "_stats_drive_read_throughput_bytes_per_second" + metricStatsDriveWriteThroughput metricType = consts.AppName + "_stats_drive_write_throughput_bytes_per_second" + metricStatsDriveWaitTime metricType = consts.AppName + "_stats_drive_wait_time_seconds" ) var volumes []types.Volume @@ -167,3 +175,157 @@ func TestVolumeStatsEmitter(t *testing.T) { wg.Wait() } + +func TestDriveStatsEmitter(t *testing.T) { + // Create fake drives + testDrives := []types.Drive{ + *types.NewDrive( + "test-drive-1", + types.DriveStatus{}, + "test-node-1", + "loop1", + "Default", + ), + *types.NewDrive( + "test-drive-2", + types.DriveStatus{}, + "test-node-1", + "loop2", + "Default", + ), + *types.NewDrive( + "non-existent-drive", + types.DriveStatus{}, + "test-node-1", + "xyz", + "Default", + ), + } + testDrives[0].Status.FSUUID = "fsuuid1" + testDrives[0].Status.TotalCapacity = 100 * MiB + testDrives[1].Status.FSUUID = "fsuuid2" + testDrives[1].Status.TotalCapacity = 200 * MiB + + // Mock drive stats + mockDrives := map[string]*driveStats{ + "test-drive-1": { + readSectorBytes: 1000, + readTicks: 500, + writeSectorBytes: 2000, + writeTicks: 1000, + timeInQueue: 1500, + }, + "test-drive-2": { + readSectorBytes: 2000, + readTicks: 750, + writeSectorBytes: 3000, + writeTicks: 1500, + timeInQueue: 2000, + }, + "non-existent-drive": nil, + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + metricChan := make(chan prometheus.Metric, 100) // Buffered channel to prevent blocking + expectedMetrics := len(testDrives) * 8 // 8 metrics per drive + receivedMetrics := 0 + + // Mock publishDriveStats function + mockPublishDriveStats := func(drive *types.Drive, ch chan<- prometheus.Metric) { + stats, ok := mockDrives[drive.Name] + if !ok { + t.Errorf("No mock stats found for drive: %s. Available mocks: %v", drive.Name, mockDrives) + return + } + + status := float64(1) + if stats == nil { + status = float64(0) + } + + // Emit mock metrics + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveReady), "Drive ready status", []string{"drive"}, nil), + prometheus.GaugeValue, + status, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveBytesRead), "Bytes read from drive", []string{"drive"}, nil), + prometheus.GaugeValue, + stats.readSectorBytes, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveBytesWritten), "Bytes written to drive", []string{"drive"}, nil), + prometheus.GaugeValue, + stats.writeSectorBytes, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveReadLatency), "Drive read latency", []string{"drive"}, nil), + prometheus.GaugeValue, + stats.readTicks/1000, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveWriteLatency), "Drive write latency", []string{"drive"}, nil), + prometheus.GaugeValue, + stats.writeTicks/1000, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveReadThroughput), "Drive read throughput", []string{"drive"}, nil), + prometheus.GaugeValue, + 1000*stats.readSectorBytes/stats.readTicks, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveWriteThroughput), "Drive write throughput", []string{"drive"}, nil), + prometheus.GaugeValue, + 1000*stats.writeSectorBytes/stats.writeTicks, + drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(string(metricStatsDriveWaitTime), "Drive wait time", []string{"drive"}, nil), + prometheus.GaugeValue, + stats.timeInQueue/1000, + drive.Name, + ) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + t.Log("Context done, exiting metric processing goroutine") + return + case metric, ok := <-metricChan: + if !ok { + t.Log("Metric channel closed") + return + } + t.Logf("Received metric: %s", metric.Desc().String()) + receivedMetrics++ + if receivedMetrics == expectedMetrics { + t.Log("Received all expected metrics") + return + } + } + } + }() + + for _, drive := range testDrives { + t.Logf("Publishing metrics for drive: %s", drive.Name) + mockPublishDriveStats(&drive, metricChan) + } + + // Wait for all metrics to be processed + time.Sleep(100 * time.Millisecond) + + if receivedMetrics != expectedMetrics { + t.Errorf("Expected %d metrics, but received %d", expectedMetrics, receivedMetrics) + } + + close(metricChan) + wg.Wait() +}