diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index de497f25..816a196b 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -61,7 +61,7 @@ type SSServer struct { stopConfig func() error lnManager service.ListenerManager natTimeout time.Duration - m *outlineMetrics + m *outlineMetricsCollector replayCache service.ReplayCache } @@ -243,7 +243,7 @@ func (s *SSServer) Stop() error { } // RunSSServer starts a shadowsocks server running, and returns the server or an error. -func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) { +func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetricsCollector, replayHistory int) (*SSServer, error) { server := &SSServer{ lnManager: service.NewListenerManager(), natTimeout: natTimeout, @@ -348,9 +348,11 @@ func main() { } defer ip2info.Close() - m := newPrometheusOutlineMetrics(ip2info, prometheus.DefaultRegisterer) - m.SetBuildInfo(version) - _, err = RunSSServer(flags.ConfigFile, flags.natTimeout, m, flags.replayHistory) + metrics := newPrometheusOutlineMetrics(ip2info) + metrics.SetBuildInfo(version) + r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer) + r.MustRegister(metrics) + _, err = RunSSServer(flags.ConfigFile, flags.natTimeout, metrics, flags.replayHistory) if err != nil { slog.Error("Server failed to start. Aborting.", "err", err) } diff --git a/cmd/outline-ss-server/metrics.go b/cmd/outline-ss-server/metrics.go index dfdeb1f2..740bd489 100644 --- a/cmd/outline-ss-server/metrics.go +++ b/cmd/outline-ss-server/metrics.go @@ -28,35 +28,140 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const namespace = "shadowsocks" - // `now` is stubbable for testing. var now = time.Now -type outlineMetrics struct { - ipinfo.IPInfoMap - *tunnelTimeCollector +type tcpCollector struct { + // NOTE: New metrics need to be added to `newTCPCollector()`, `Describe()` and + // `Collect()`. + probes *prometheus.HistogramVec + openConnections *prometheus.CounterVec + closedConnections *prometheus.CounterVec + connectionDurationMs *prometheus.HistogramVec +} - buildInfo *prometheus.GaugeVec - accessKeys prometheus.Gauge - ports prometheus.Gauge - dataBytes *prometheus.CounterVec - dataBytesPerLocation *prometheus.CounterVec - timeToCipherMs *prometheus.HistogramVec - // TODO: Add time to first byte. +var _ prometheus.Collector = (*tcpCollector)(nil) - tcpProbes *prometheus.HistogramVec - tcpOpenConnections *prometheus.CounterVec - tcpClosedConnections *prometheus.CounterVec - tcpConnectionDurationMs *prometheus.HistogramVec +func newTCPCollector() *tcpCollector { + namespace := "tcp" + return &tcpCollector{ + probes: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "probes", + Buckets: []float64{0, 49, 50, 51, 73, 91}, + Help: "Histogram of number of bytes from client to proxy, for detecting possible probes", + }, []string{"port", "status", "error"}), + openConnections: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "connections_opened", + Help: "Count of open TCP connections", + }, []string{"location", "asn"}), + closedConnections: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "connections_closed", + Help: "Count of closed TCP connections", + }, []string{"location", "asn", "status", "access_key"}), + connectionDurationMs: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "connection_duration_ms", + Help: "TCP connection duration distributions.", + Buckets: []float64{ + 100, + float64(time.Second.Milliseconds()), + float64(time.Minute.Milliseconds()), + float64(time.Hour.Milliseconds()), + float64(24 * time.Hour.Milliseconds()), // Day + float64(7 * 24 * time.Hour.Milliseconds()), // Week + }, + }, []string{"status"}), + } +} + +func (c *tcpCollector) Describe(ch chan<- *prometheus.Desc) { + c.probes.Describe(ch) + c.openConnections.Describe(ch) + c.closedConnections.Describe(ch) + c.connectionDurationMs.Describe(ch) +} - udpPacketsFromClientPerLocation *prometheus.CounterVec - udpAddedNatEntries prometheus.Counter - udpRemovedNatEntries prometheus.Counter +func (c *tcpCollector) Collect(ch chan<- prometheus.Metric) { + c.probes.Collect(ch) + c.openConnections.Collect(ch) + c.closedConnections.Collect(ch) + c.connectionDurationMs.Collect(ch) } -var _ service.TCPMetrics = (*outlineMetrics)(nil) -var _ service.UDPMetrics = (*outlineMetrics)(nil) +func (c *tcpCollector) openConnection(clientInfo ipinfo.IPInfo) { + c.openConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Inc() +} + +func (c *tcpCollector) closeConnection(clientInfo ipinfo.IPInfo, status, accessKey string, duration time.Duration) { + c.closedConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN), status, accessKey).Inc() + c.connectionDurationMs.WithLabelValues(status).Observe(duration.Seconds() * 1000) +} + +func (c *tcpCollector) addProbe(listenerId, status, drainResult string, clientProxyBytes int64) { + c.probes.WithLabelValues(listenerId, status, drainResult).Observe(float64(clientProxyBytes)) +} + +type udpCollector struct { + // NOTE: New metrics need to be added to `newUDPCollector()`, `Describe()` + // and `Collect()`. + packetsFromClientPerLocation *prometheus.CounterVec + addedNatEntries prometheus.Counter + removedNatEntries prometheus.Counter +} + +var _ prometheus.Collector = (*udpCollector)(nil) + +func newUDPCollector() *udpCollector { + namespace := "udp" + return &udpCollector{ + packetsFromClientPerLocation: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "packets_from_client_per_location", + Help: "Packets received from the client, per location and status", + }, []string{"location", "asn", "status"}), + addedNatEntries: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "nat_entries_added", + Help: "Entries added to the UDP NAT table", + }), + removedNatEntries: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "nat_entries_removed", + Help: "Entries removed from the UDP NAT table", + }), + } +} + +func (c *udpCollector) Describe(ch chan<- *prometheus.Desc) { + c.packetsFromClientPerLocation.Describe(ch) + c.addedNatEntries.Describe(ch) + c.removedNatEntries.Describe(ch) +} + +func (c *udpCollector) Collect(ch chan<- prometheus.Metric) { + c.packetsFromClientPerLocation.Collect(ch) + c.addedNatEntries.Collect(ch) + c.removedNatEntries.Collect(ch) +} + +func (c *udpCollector) addPacketFromClient(clientInfo ipinfo.IPInfo, status string) { + c.packetsFromClientPerLocation.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN), status).Inc() +} + +func (c *udpCollector) addNatEntry() { + c.addedNatEntries.Inc() +} + +func (c *udpCollector) removeNatEntry() { + c.removedNatEntries.Inc() +} // Converts a [net.Addr] to an [IPKey]. func toIPKey(addr net.Addr, accessKey string) (*IPKey, error) { @@ -90,10 +195,33 @@ type tunnelTimeCollector struct { mu sync.Mutex // Protects the activeClients map. activeClients map[IPKey]*activeClient + // NOTE: New metrics need to be added to `newTunnelTimeCollector()`, + // `Describe()` and `Collect()`. tunnelTimePerKey *prometheus.CounterVec tunnelTimePerLocation *prometheus.CounterVec } +var _ prometheus.Collector = (*tunnelTimeCollector)(nil) + +func newTunnelTimeCollector(ip2info ipinfo.IPInfoMap) *tunnelTimeCollector { + namespace := "tunnel_time" + return &tunnelTimeCollector{ + ip2info: ip2info, + activeClients: make(map[IPKey]*activeClient), + + tunnelTimePerKey: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "seconds", + Help: "Tunnel time, per access key.", + }, []string{"access_key"}), + tunnelTimePerLocation: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Name: "seconds_per_location", + Help: "Tunnel time, per location.", + }, []string{"location", "asn"}), + } +} + func (c *tunnelTimeCollector) Describe(ch chan<- *prometheus.Desc) { c.tunnelTimePerKey.Describe(ch) c.tunnelTimePerLocation.Describe(ch) @@ -113,7 +241,7 @@ func (c *tunnelTimeCollector) Collect(ch chan<- prometheus.Metric) { // Calculates and reports the tunnel time for a given active client. func (c *tunnelTimeCollector) reportTunnelTime(ipKey IPKey, client *activeClient, tNow time.Time) { tunnelTime := tNow.Sub(client.startTime) - slog.Debug("Reporting tunnel time.", "key", ipKey.accessKey, "duration", tunnelTime) + slog.LogAttrs(nil, slog.LevelDebug, "Reporting tunnel time.", slog.String("key", ipKey.accessKey), slog.Duration("duration", tunnelTime)) c.tunnelTimePerKey.WithLabelValues(ipKey.accessKey).Add(tunnelTime.Seconds()) c.tunnelTimePerLocation.WithLabelValues(client.info.CountryCode.String(), asnLabel(client.info.ASN)).Add(tunnelTime.Seconds()) // Reset the start time now that the tunnel time has been reported. @@ -149,143 +277,114 @@ func (c *tunnelTimeCollector) stopConnection(ipKey IPKey) { } } -func newTunnelTimeCollector(ip2info ipinfo.IPInfoMap, registerer prometheus.Registerer) *tunnelTimeCollector { - return &tunnelTimeCollector{ - ip2info: ip2info, - activeClients: make(map[IPKey]*activeClient), +type outlineMetricsCollector struct { + ipinfo.IPInfoMap - tunnelTimePerKey: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "tunnel_time_seconds", - Help: "Tunnel time, per access key.", - }, []string{"access_key"}), - tunnelTimePerLocation: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "tunnel_time_seconds_per_location", - Help: "Tunnel time, per location.", - }, []string{"location", "asn"}), - } + tcpCollector *tcpCollector + udpCollector *udpCollector + tunnelTimeCollector *tunnelTimeCollector + + // NOTE: New metrics need to be added to `newPrometheusOutlineMetrics()` and + // `collectors()`. + buildInfo *prometheus.GaugeVec + accessKeys prometheus.Gauge + ports prometheus.Gauge + dataBytes *prometheus.CounterVec + dataBytesPerLocation *prometheus.CounterVec + timeToCipherMs *prometheus.HistogramVec + // TODO: Add time to first byte. } -// newPrometheusOutlineMetrics constructs a metrics object that uses -// `ip2info` to convert IP addresses to countries, and reports all -// metrics to Prometheus via `registerer`. `ip2info` may be nil, but -// `registerer` must not be. -func newPrometheusOutlineMetrics(ip2info ipinfo.IPInfoMap, registerer prometheus.Registerer) *outlineMetrics { - m := &outlineMetrics{ +var _ prometheus.Collector = (*outlineMetricsCollector)(nil) +var _ service.TCPMetrics = (*outlineMetricsCollector)(nil) +var _ service.UDPMetrics = (*outlineMetricsCollector)(nil) + +// newPrometheusOutlineMetrics constructs a Prometheus metrics collector that uses +// `ip2info` to convert IP addresses to countries. `ip2info` may be nil. +func newPrometheusOutlineMetrics(ip2info ipinfo.IPInfoMap) *outlineMetricsCollector { + tcpCollector := newTCPCollector() + udpCollector := newUDPCollector() + tunnelTimeCollector := newTunnelTimeCollector(ip2info) + + return &outlineMetricsCollector{ IPInfoMap: ip2info, + + tcpCollector: tcpCollector, + udpCollector: udpCollector, + tunnelTimeCollector: tunnelTimeCollector, + buildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "build_info", - Help: "Information on the outline-ss-server build", + Name: "build_info", + Help: "Information on the outline-ss-server build", }, []string{"version"}), accessKeys: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "keys", - Help: "Count of access keys", + Name: "keys", + Help: "Count of access keys", }), ports: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "ports", - Help: "Count of open Shadowsocks ports", + Name: "ports", + Help: "Count of open Shadowsocks ports", }), - tcpProbes: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "tcp_probes", - Buckets: []float64{0, 49, 50, 51, 73, 91}, - Help: "Histogram of number of bytes from client to proxy, for detecting possible probes", - }, []string{"port", "status", "error"}), - tcpOpenConnections: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "tcp", - Name: "connections_opened", - Help: "Count of open TCP connections", - }, []string{"location", "asn"}), - tcpClosedConnections: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "tcp", - Name: "connections_closed", - Help: "Count of closed TCP connections", - }, []string{"location", "asn", "status", "access_key"}), - tcpConnectionDurationMs: prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: "tcp", - Name: "connection_duration_ms", - Help: "TCP connection duration distributions.", - Buckets: []float64{ - 100, - float64(time.Second.Milliseconds()), - float64(time.Minute.Milliseconds()), - float64(time.Hour.Milliseconds()), - float64(24 * time.Hour.Milliseconds()), // Day - float64(7 * 24 * time.Hour.Milliseconds()), // Week - }, - }, []string{"status"}), dataBytes: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "data_bytes", - Help: "Bytes transferred by the proxy, per access key", + Name: "data_bytes", + Help: "Bytes transferred by the proxy, per access key", }, []string{"dir", "proto", "access_key"}), dataBytesPerLocation: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Name: "data_bytes_per_location", - Help: "Bytes transferred by the proxy, per location", + Name: "data_bytes_per_location", + Help: "Bytes transferred by the proxy, per location", }, []string{"dir", "proto", "location", "asn"}), timeToCipherMs: prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Namespace: namespace, - Name: "time_to_cipher_ms", - Help: "Time needed to find the cipher", - Buckets: []float64{0.1, 1, 10, 100, 1000}, + Name: "time_to_cipher_ms", + Help: "Time needed to find the cipher", + Buckets: []float64{0.1, 1, 10, 100, 1000}, }, []string{"proto", "found_key"}), - udpPacketsFromClientPerLocation: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "udp", - Name: "packets_from_client_per_location", - Help: "Packets received from the client, per location and status", - }, []string{"location", "asn", "status"}), - udpAddedNatEntries: prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "udp", - Name: "nat_entries_added", - Help: "Entries added to the UDP NAT table", - }), - udpRemovedNatEntries: prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "udp", - Name: "nat_entries_removed", - Help: "Entries removed from the UDP NAT table", - }), } - m.tunnelTimeCollector = newTunnelTimeCollector(ip2info, registerer) +} + +func (m *outlineMetricsCollector) collectors() []prometheus.Collector { + return []prometheus.Collector{ + m.tcpCollector, + m.udpCollector, + m.tunnelTimeCollector, + + m.buildInfo, + m.accessKeys, + m.ports, + m.dataBytes, + m.dataBytesPerLocation, + m.timeToCipherMs, + } +} - // TODO: Is it possible to pass where to register the collectors? - registerer.MustRegister(m.buildInfo, m.accessKeys, m.ports, m.tcpProbes, m.tcpOpenConnections, m.tcpClosedConnections, m.tcpConnectionDurationMs, - m.dataBytes, m.dataBytesPerLocation, m.timeToCipherMs, m.udpPacketsFromClientPerLocation, m.udpAddedNatEntries, m.udpRemovedNatEntries, - m.tunnelTimeCollector) - return m +func (m *outlineMetricsCollector) Describe(ch chan<- *prometheus.Desc) { + for _, collector := range m.collectors() { + collector.Describe(ch) + } +} + +func (m *outlineMetricsCollector) Collect(ch chan<- prometheus.Metric) { + for _, collector := range m.collectors() { + collector.Collect(ch) + } } -func (m *outlineMetrics) SetBuildInfo(version string) { +func (m *outlineMetricsCollector) SetBuildInfo(version string) { m.buildInfo.WithLabelValues(version).Set(1) } -func (m *outlineMetrics) SetNumAccessKeys(numKeys int, ports int) { +func (m *outlineMetricsCollector) SetNumAccessKeys(numKeys int, ports int) { m.accessKeys.Set(float64(numKeys)) m.ports.Set(float64(ports)) } -func (m *outlineMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) { - m.tcpOpenConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Inc() +func (m *outlineMetricsCollector) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) { + m.tcpCollector.openConnection(clientInfo) } -func (m *outlineMetrics) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) { +func (m *outlineMetricsCollector) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) { ipKey, err := toIPKey(clientAddr, accessKey) if err == nil { m.tunnelTimeCollector.startConnection(*ipKey) @@ -306,9 +405,8 @@ func asnLabel(asn int) string { return fmt.Sprint(asn) } -func (m *outlineMetrics) AddClosedTCPConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey, status string, data metrics.ProxyMetrics, duration time.Duration) { - m.tcpClosedConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN), status, accessKey).Inc() - m.tcpConnectionDurationMs.WithLabelValues(status).Observe(duration.Seconds() * 1000) +func (m *outlineMetricsCollector) AddClosedTCPConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey, status string, data metrics.ProxyMetrics, duration time.Duration) { + m.tcpCollector.closeConnection(clientInfo, status, accessKey, duration) addIfNonZero(data.ClientProxy, m.dataBytes, "c>p", "tcp", accessKey) addIfNonZero(data.ClientProxy, m.dataBytesPerLocation, "c>p", "tcp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) addIfNonZero(data.ProxyTarget, m.dataBytes, "p>t", "tcp", accessKey) @@ -324,23 +422,23 @@ func (m *outlineMetrics) AddClosedTCPConnection(clientInfo ipinfo.IPInfo, client } } -func (m *outlineMetrics) AddUDPPacketFromClient(clientInfo ipinfo.IPInfo, accessKey, status string, clientProxyBytes, proxyTargetBytes int) { - m.udpPacketsFromClientPerLocation.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN), status).Inc() +func (m *outlineMetricsCollector) AddUDPPacketFromClient(clientInfo ipinfo.IPInfo, accessKey, status string, clientProxyBytes, proxyTargetBytes int) { + m.udpCollector.addPacketFromClient(clientInfo, status) addIfNonZero(int64(clientProxyBytes), m.dataBytes, "c>p", "udp", accessKey) addIfNonZero(int64(clientProxyBytes), m.dataBytesPerLocation, "c>p", "udp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) addIfNonZero(int64(proxyTargetBytes), m.dataBytes, "p>t", "udp", accessKey) addIfNonZero(int64(proxyTargetBytes), m.dataBytesPerLocation, "p>t", "udp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)) } -func (m *outlineMetrics) AddUDPPacketFromTarget(clientInfo ipinfo.IPInfo, accessKey, status string, targetProxyBytes, proxyClientBytes int) { +func (m *outlineMetricsCollector) AddUDPPacketFromTarget(clientInfo ipinfo.IPInfo, accessKey, status string, targetProxyBytes, proxyClientBytes int) { addIfNonZero(int64(targetProxyBytes), m.dataBytes, "p