@@ -2,21 +2,27 @@ package serve
2
2
3
3
import (
4
4
"context"
5
+ "encoding/json"
5
6
"fmt"
7
+ "reflect"
6
8
"strings"
7
9
"time"
8
10
9
11
"github.com/cloudquery/plugin-sdk/v4/plugin"
10
12
"github.com/rs/zerolog"
11
13
12
14
"go.opentelemetry.io/otel"
15
+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
13
16
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
14
17
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
15
18
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
19
+ otellog "go.opentelemetry.io/otel/log"
20
+ logglobal "go.opentelemetry.io/otel/log/global"
21
+ "go.opentelemetry.io/otel/sdk/log"
16
22
"go.opentelemetry.io/otel/sdk/metric"
17
23
"go.opentelemetry.io/otel/sdk/resource"
18
24
"go.opentelemetry.io/otel/sdk/trace"
19
- semconv "go.opentelemetry.io/otel/semconv/v1.25 .0"
25
+ semconv "go.opentelemetry.io/otel/semconv/v1.26 .0"
20
26
)
21
27
22
28
// newResource returns a resource describing this application.
@@ -116,9 +122,41 @@ func getMetricReader(ctx context.Context, opts otelConfig) (*metric.PeriodicRead
116
122
return reader , nil
117
123
}
118
124
125
+ func getLogsProcessor (ctx context.Context , opts otelConfig ) (* log.BatchProcessor , error ) {
126
+ if opts .endpoint == "" {
127
+ return nil , nil
128
+ }
129
+
130
+ logOptions := []otlploghttp.Option {
131
+ otlploghttp .WithEndpoint (opts .endpoint ),
132
+ otlploghttp .WithCompression (otlploghttp .GzipCompression ),
133
+ }
134
+
135
+ if opts .insecure {
136
+ logOptions = append (logOptions , otlploghttp .WithInsecure ())
137
+ }
138
+
139
+ if len (opts .headers ) > 0 {
140
+ headers := parseOtelHeaders (opts .headers )
141
+ logOptions = append (logOptions , otlploghttp .WithHeaders (headers ))
142
+ }
143
+
144
+ if opts .urlPath != "" {
145
+ logOptions = append (logOptions , otlploghttp .WithURLPath (opts .urlPath ))
146
+ }
147
+
148
+ exporter , err := otlploghttp .New (ctx , logOptions ... )
149
+ if err != nil {
150
+ return nil , fmt .Errorf ("creating OTLP log exporter: %w" , err )
151
+ }
152
+
153
+ processor := log .NewBatchProcessor (exporter )
154
+ return processor , nil
155
+ }
156
+
119
157
func setupOtel (ctx context.Context , logger zerolog.Logger , p * plugin.Plugin , otelEndpoint string , otelEndpointInsecure bool , otelEndpointHeaders []string , otelEndpointURLPath string ) (shutdown func (), err error ) {
120
158
if otelEndpoint == "" {
121
- return func () {} , nil
159
+ return nil , nil
122
160
}
123
161
opts := otelConfig {
124
162
endpoint : otelEndpoint ,
@@ -136,6 +174,11 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote
136
174
return nil , err
137
175
}
138
176
177
+ logsProcessor , err := getLogsProcessor (ctx , opts )
178
+ if err != nil {
179
+ return nil , err
180
+ }
181
+
139
182
pluginResource := newResource (p )
140
183
tp := trace .NewTracerProvider (
141
184
trace .WithBatcher (traceExporter ),
@@ -146,20 +189,116 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote
146
189
metric .WithReader (metricReader ),
147
190
metric .WithResource (pluginResource ),
148
191
)
192
+
193
+ lp := log .NewLoggerProvider (
194
+ log .WithProcessor (logsProcessor ),
195
+ log .WithResource (newResource (p )),
196
+ )
197
+
149
198
otel .SetErrorHandler (otel .ErrorHandlerFunc (func (err error ) {
150
199
logger .Warn ().Err (err ).Msg ("otel error" )
151
200
}))
152
201
otel .SetTracerProvider (tp )
153
202
otel .SetMeterProvider (mt )
203
+ logglobal .SetLoggerProvider (lp )
154
204
155
205
shutdown = func () {
156
206
if err := tp .Shutdown (context .Background ()); err != nil {
157
- logger .Error ().Err (err ).Msg ("failed to shutdown OTLP trace exporter " )
207
+ logger .Error ().Err (err ).Msg ("failed to shutdown OTLP trace provider " )
158
208
}
159
209
if err := mt .Shutdown (context .Background ()); err != nil {
160
- logger .Error ().Err (err ).Msg ("failed to shutdown OTLP metric exporter" )
210
+ logger .Error ().Err (err ).Msg ("failed to shutdown OTLP metric provider" )
211
+ }
212
+ if err := lp .Shutdown (context .Background ()); err != nil {
213
+ logger .Error ().Err (err ).Msg ("failed to shutdown OTLP logger provider" )
161
214
}
162
215
}
163
216
164
217
return shutdown , nil
165
218
}
219
+
220
+ // Similar to https://github.com/AkhigbeEromo/opentelemetry-go-contrib/blob/dedcf91a55a36a5a8589c56f2e43c188eb42f4f2/bridges/otelzerolog/hook.go
221
+ // but with `TraceLevel` and attributes support
222
+ type otelLoggerHook struct {
223
+ otellog.Logger
224
+ ctx context.Context
225
+ }
226
+
227
+ func (h * otelLoggerHook ) Run (e * zerolog.Event , level zerolog.Level , message string ) {
228
+ record := otellog.Record {}
229
+ record .SetTimestamp (time .Now ().UTC ())
230
+ record .SetSeverity (otellogSeverity (level ))
231
+ record .SetBody (otellog .StringValue (message ))
232
+ // See https://github.com/rs/zerolog/issues/493, this is ugly but it works
233
+ logData := make (map [string ]any )
234
+ eventBuffer := fmt .Sprintf ("%s}" , reflect .ValueOf (e ).Elem ().FieldByName ("buf" ))
235
+ err := json .Unmarshal ([]byte (eventBuffer ), & logData )
236
+ if err == nil {
237
+ recordAttributes := make ([]otellog.KeyValue , 0 , len (logData ))
238
+ for k , v := range logData {
239
+ if k == "level" {
240
+ continue
241
+ }
242
+ if k == "time" {
243
+ eventTimestamp , ok := v .(string )
244
+ if ! ok {
245
+ continue
246
+ }
247
+ t , err := time .Parse (time .RFC3339Nano , eventTimestamp )
248
+ if err == nil {
249
+ record .SetTimestamp (t )
250
+ continue
251
+ }
252
+ }
253
+ var attributeValue otellog.Value
254
+ switch v := v .(type ) {
255
+ case string :
256
+ attributeValue = otellog .StringValue (v )
257
+ case int :
258
+ attributeValue = otellog .IntValue (v )
259
+ case int64 :
260
+ attributeValue = otellog .Int64Value (v )
261
+ case float64 :
262
+ attributeValue = otellog .Float64Value (v )
263
+ case bool :
264
+ attributeValue = otellog .BoolValue (v )
265
+ case []byte :
266
+ attributeValue = otellog .BytesValue (v )
267
+ default :
268
+ attributeValue = otellog .StringValue (fmt .Sprintf ("%v" , v ))
269
+ }
270
+ recordAttributes = append (recordAttributes , otellog.KeyValue {
271
+ Key : k ,
272
+ Value : attributeValue ,
273
+ })
274
+ }
275
+ record .AddAttributes (recordAttributes ... )
276
+ }
277
+
278
+ h .Emit (h .ctx , record )
279
+ }
280
+
281
+ func otellogSeverity (level zerolog.Level ) otellog.Severity {
282
+ switch level {
283
+ case zerolog .DebugLevel :
284
+ return otellog .SeverityDebug
285
+ case zerolog .InfoLevel :
286
+ return otellog .SeverityInfo
287
+ case zerolog .WarnLevel :
288
+ return otellog .SeverityWarn
289
+ case zerolog .ErrorLevel :
290
+ return otellog .SeverityError
291
+ case zerolog .FatalLevel :
292
+ return otellog .SeverityFatal2
293
+ case zerolog .PanicLevel :
294
+ return otellog .SeverityFatal1
295
+ case zerolog .TraceLevel :
296
+ return otellog .SeverityTrace
297
+ default :
298
+ return otellog .SeverityUndefined
299
+ }
300
+ }
301
+
302
+ func newOTELLoggerHook () zerolog.Hook {
303
+ return & otelLoggerHook {logglobal .GetLoggerProvider ().Logger ("cloudquery" ), context .Background ()}
304
+ }
0 commit comments