Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit f9d678f

Browse files
committed
Optimize allocations on emission of single point
Fixes #60
1 parent ab0b8f1 commit f9d678f

File tree

7 files changed

+105
-17
lines changed

7 files changed

+105
-17
lines changed

Diff for: src/InfluxDB.Collector/Configuration/AggregateEmitter.cs

+16-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace InfluxDB.Collector.Configuration
66
{
7-
class AggregateEmitter : IPointEmitter
7+
class AggregateEmitter : IPointEmitter, ISinglePointEmitter
88
{
99
readonly List<IPointEmitter> _emitters;
1010

@@ -19,5 +19,20 @@ public void Emit(PointData[] points)
1919
foreach (var emitter in _emitters)
2020
emitter.Emit(points);
2121
}
22+
23+
public void Emit(PointData point)
24+
{
25+
foreach (var emitter in _emitters)
26+
{
27+
if (emitter is ISinglePointEmitter singlePointEmitter)
28+
{
29+
singlePointEmitter.Emit(point);
30+
}
31+
else
32+
{
33+
emitter.Emit(new[] { point });
34+
}
35+
}
36+
}
2237
}
2338
}

Diff for: src/InfluxDB.Collector/MetricsCollector.cs

+16-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace InfluxDB.Collector
77
{
8-
public abstract class MetricsCollector : IPointEmitter, IDisposable
8+
public abstract class MetricsCollector : IPointEmitter, ISinglePointEmitter, IDisposable
99
{
1010
readonly Util.ITimestampSource _timestampSource = new Util.PseudoHighResTimestampSource();
1111

@@ -34,14 +34,16 @@ public void Dispose()
3434
Dispose(true);
3535
}
3636

37-
protected virtual void Dispose(bool disposing) { }
37+
protected virtual void Dispose(bool disposing)
38+
{
39+
}
3840

3941
public void Write(string measurement, IReadOnlyDictionary<string, object> fields, IReadOnlyDictionary<string, string> tags = null, DateTime? timestamp = null)
4042
{
4143
try
4244
{
4345
var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow());
44-
Emit(new[] { point });
46+
Emit(point);
4547
}
4648
catch (Exception ex)
4749
{
@@ -54,6 +56,16 @@ void IPointEmitter.Emit(PointData[] points)
5456
Emit(points);
5557
}
5658

59+
void ISinglePointEmitter.Emit(PointData point)
60+
{
61+
Emit(point);
62+
}
63+
5764
protected abstract void Emit(PointData[] points);
65+
66+
protected virtual void Emit(PointData point)
67+
{
68+
Emit(new[] { point });
69+
}
5870
}
59-
}
71+
}

Diff for: src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs

+30-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace InfluxDB.Collector.Pipeline.Common
88
{
9-
internal abstract class IntervalEmitterBase : IPointEmitter, IDisposable
9+
internal abstract class IntervalEmitterBase : IPointEmitter, ISinglePointEmitter, IDisposable
1010
{
1111
readonly object _queueLock = new object();
1212
Queue<PointData> _queue = new Queue<PointData>();
@@ -77,22 +77,45 @@ protected Task OnTick()
7777
}
7878

7979
public void Emit(PointData[] points)
80+
{
81+
if (!CheckState())
82+
{
83+
return;
84+
}
85+
86+
lock (_queueLock)
87+
{
88+
foreach (var point in points)
89+
_queue.Enqueue(point);
90+
}
91+
}
92+
93+
public void Emit(PointData point)
94+
{
95+
if (!CheckState())
96+
{
97+
return;
98+
}
99+
100+
lock (_queueLock)
101+
{
102+
_queue.Enqueue(point);
103+
}
104+
}
105+
106+
private bool CheckState()
80107
{
81108
lock (_stateLock)
82109
{
83-
if (_unloading) return;
110+
if (_unloading) return false;
84111
if (!_started)
85112
{
86113
_started = true;
87114
_timer.Start(TimeSpan.Zero);
88115
}
89116
}
90117

91-
lock (_queueLock)
92-
{
93-
foreach (var point in points)
94-
_queue.Enqueue(point);
95-
}
118+
return true;
96119
}
97120

98121
protected abstract void HandleBatch(IReadOnlyCollection<PointData> batch);

Diff for: src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs

+16-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace InfluxDB.Collector.Pipeline.Emit
77
{
8-
class HttpLineProtocolEmitter : IDisposable, IPointEmitter
8+
class HttpLineProtocolEmitter : IDisposable, IPointEmitter, ISinglePointEmitter
99
{
1010
readonly ILineProtocolClient _client;
1111

@@ -29,9 +29,23 @@ public void Emit(PointData[] points)
2929
payload.Add(new LineProtocolPoint(point.Measurement, point.Fields, point.Tags, point.UtcTimestamp));
3030
}
3131

32+
SendPayload(payload);
33+
}
34+
35+
public void Emit(PointData point)
36+
{
37+
var payload = new LineProtocolPayload();
38+
39+
payload.Add(new LineProtocolPoint(point.Measurement, point.Fields, point.Tags, point.UtcTimestamp));
40+
41+
SendPayload(payload);
42+
}
43+
44+
private void SendPayload(LineProtocolPayload payload)
45+
{
3246
var influxResult = _client.WriteAsync(payload).Result;
3347
if (!influxResult.Success)
3448
CollectorLog.ReportError(influxResult.ErrorMessage, null);
3549
}
3650
}
37-
}
51+
}
+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace InfluxDB.Collector.Pipeline
2+
{
3+
interface ISinglePointEmitter
4+
{
5+
void Emit(PointData point);
6+
}
7+
}

Diff for: src/InfluxDB.Collector/Pipeline/NullMetricsCollector.cs

+4
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,9 @@ class NullMetricsCollector : MetricsCollector
55
protected override void Emit(PointData[] points)
66
{
77
}
8+
9+
protected override void Emit(PointData point)
10+
{
11+
}
812
}
913
}

Diff for: src/InfluxDB.Collector/Pipeline/PipelinedMetricsCollector.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-

2-
using System;
1+
using System;
32

43
namespace InfluxDB.Collector.Pipeline
54
{
@@ -24,10 +23,24 @@ protected override void Emit(PointData[] points)
2423
_emitter.Emit(points);
2524
}
2625

26+
protected override void Emit(PointData point)
27+
{
28+
_enricher.Enrich(point);
29+
30+
if (_emitter is ISinglePointEmitter singlePointEmitter)
31+
{
32+
singlePointEmitter.Emit(point);
33+
}
34+
else
35+
{
36+
_emitter.Emit(new[] { point });
37+
}
38+
}
39+
2740
protected override void Dispose(bool disposing)
2841
{
2942
if (disposing)
3043
_dispose();
3144
}
3245
}
33-
}
46+
}

0 commit comments

Comments
 (0)