Skip to content

Commit

Permalink
feat(api): Support Parquet as a query response format. (#15408)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
This changes adds support for the header `Accept: application/vnd.apacher.parquet`. If this header is set the response for metric and stream queries will response with a Parquet file.

A metrics response has the columns `timestamp`, `labels` and `value`. A stream response has the columns `timestamp`, `labels` and `line`.

Co-authored-by: Christian Haudum <[email protected]>
  • Loading branch information
jeschkies and chaudum authored Dec 18, 2024
1 parent 04f621e commit d0c11a6
Show file tree
Hide file tree
Showing 318 changed files with 309,229 additions and 10 deletions.
36 changes: 36 additions & 0 deletions docs/sources/reference/loki-http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,24 @@ The items in the `values` array are sorted by timestamp.
The most recent item is first when using `direction=backward`.
The oldest item is first when using `direction=forward`.

Parquet can be request as a response format by setting the `Accept` header to `application/vnd.apache.parquet`.

The schema is the following for streams:

| column_name | column_type |
|-------------|--------------------------|
| timestamp | TIMESTAMP WITH TIME ZONE |
| labels | MAP(VARCHAR, VARCHAR) |
| line |VARCHAR |

and for metrics:

| column_name | column_type |
|-------------|--------------------------|
| timestamp | TIMESTAMP WITH TIME ZONE |
| labels | MAP(VARCHAR, VARCHAR) |
| value | DOUBLE |

See [statistics](#statistics) for information about the statistics returned by Loki.

### Examples
Expand Down Expand Up @@ -518,6 +536,24 @@ The items in the `values` array are sorted by timestamp.
The most recent item is first when using `direction=backward`.
The oldest item is first when using `direction=forward`.

Parquet can be request as a response format by setting the `Accept` header to `application/vnd.apache.parquet`.

The schema is the following for streams:

| column_name | column_type |
|-------------|--------------------------|
| timestamp | TIMESTAMP WITH TIME ZONE |
| labels | MAP(VARCHAR, VARCHAR) |
| line |VARCHAR |

and for metrics:

| column_name | column_type |
|-------------|--------------------------|
| timestamp | TIMESTAMP WITH TIME ZONE |
| labels | MAP(VARCHAR, VARCHAR) |
| value | DOUBLE |

See [statistics](#statistics) for information about the statistics returned by Loki.

### Examples
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,9 @@ The `frontend` block configures the Loki query-frontend.
# The TLS configuration.
# The CLI flags prefix for this block configuration is: frontend.tail-tls-config
[tail_tls_config: <tls_config>]
# Support 'application/vnd.apache.parquet' content type in HTTP responses.
[support_parquet_encoding: <boolean>]
```

### frontend_worker
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ require (
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/ncw/swift/v2 v2.0.3
github.com/parquet-go/parquet-go v0.24.0
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
Expand Down Expand Up @@ -161,6 +162,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.1 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.48.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/ebitengine/purego v0.8.1 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
Expand All @@ -172,9 +174,11 @@ require (
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
github.com/pkg/xattr v0.4.10 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=
github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI=
Expand Down Expand Up @@ -2140,6 +2142,7 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
Expand Down Expand Up @@ -2269,6 +2272,8 @@ github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNs
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -2333,6 +2338,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI=
github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c/go.mod h1:otzZQXgoO96RTzDB/Hycg0qZcXZsWJGJRSXbmEIJ+4M=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/parquet-go/parquet-go v0.24.0 h1:VrsifmLPDnas8zpoHmYiWDZ1YHzLmc7NmNwPGkI2JM4=
github.com/parquet-go/parquet-go v0.24.0/go.mod h1:OqBBRGBl7+llplCvDMql8dEKaDqjaFA/VAPw+OJiNiw=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,13 +1119,14 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "no query frontend configured")
}

roundTripper := queryrange.NewSerializeRoundTripper(t.QueryFrontEndMiddleware.Wrap(frontendTripper), queryrange.DefaultCodec)
roundTripper := queryrange.NewSerializeRoundTripper(t.QueryFrontEndMiddleware.Wrap(frontendTripper), queryrange.DefaultCodec, t.Cfg.Frontend.SupportParquetEncoding)

frontendHandler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace)
if t.Cfg.Frontend.CompressResponses {
frontendHandler = gziphandler.GzipHandler(frontendHandler)
}

// TODO: add SerializeHTTPHandler
toMerge := []middleware.Interface{
httpreq.ExtractQueryTagsMiddleware(),
httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader),
Expand Down Expand Up @@ -1855,7 +1856,6 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
logger,
prometheus.DefaultRegisterer,
)

if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/lokifrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Config struct {

TailProxyURL string `yaml:"tail_proxy_url"`
TLS tls.ClientConfig `yaml:"tail_tls_config"`

SupportParquetEncoding bool `yaml:"support_parquet_encoding" doc:"description=Support 'application/vnd.apache.parquet' content type in HTTP responses."`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -32,4 +34,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", true, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Loki.")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")

f.BoolVar(&cfg.CompressResponses, "frontend.support-parquet-encoding", false, "Support 'application/vnd.apache.parquet' content type in HTTP responses.")
}
2 changes: 2 additions & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,8 @@ func decodeResponseProtobuf(r *http.Response, req queryrangebase.Request) (query
func (Codec) EncodeResponse(ctx context.Context, req *http.Request, res queryrangebase.Response) (*http.Response, error) {
if req.Header.Get("Accept") == ProtobufType {
return encodeResponseProtobuf(ctx, res)
} else if req.Header.Get("Accept") == ParquetType {
return encodeResponseParquet(ctx, res)
}

// Default to JSON.
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/queryrange/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

const (
JSONType = `application/json; charset=utf-8`
ParquetType = `application/vnd.apache.parquet`
ProtobufType = `application/vnd.google.protobuf`
)

Expand Down
112 changes: 112 additions & 0 deletions pkg/querier/queryrange/parquet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package queryrange

import (
"bytes"
"context"
"io"
"net/http"

"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/prometheus/promql/parser"

serverutil "github.com/grafana/loki/v3/pkg/util/server"

"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
)

func encodeResponseParquet(ctx context.Context, res queryrangebase.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "codec.EncodeResponse")
defer sp.Finish()

var buf bytes.Buffer

err := encodeResponseParquetTo(ctx, res, &buf)
if err != nil {
return nil, err
}

resp := http.Response{
Header: http.Header{
"Content-Type": []string{ParquetType},
},
Body: io.NopCloser(&buf),
StatusCode: http.StatusOK,
}
return &resp, nil
}

func encodeResponseParquetTo(_ context.Context, res queryrangebase.Response, w io.Writer) error {
switch response := res.(type) {
case *LokiPromResponse:
return encodeMetricsParquetTo(response, w)
case *LokiResponse:
return encodeLogsParquetTo(response, w)
default:
return serverutil.UserError("request does not support Parquet responses")
}
}

type MetricRowType struct {
Timestamp int64 `parquet:"timestamp,timestamp(millisecond),delta"`
Labels map[string]string `parquet:"labels"`
Value float64 `parquet:"value"`
}

type LogStreamRowType struct {
Timestamp int64 `parquet:"timestamp,timestamp(nanosecond),delta"`
Labels map[string]string `parquet:"labels"`
Line string `parquet:"line,lz4"`
}

func encodeMetricsParquetTo(response *LokiPromResponse, w io.Writer) error {
schema := parquet.SchemaOf(new(MetricRowType))
writer := parquet.NewGenericWriter[MetricRowType](w, schema)

for _, stream := range response.Response.Data.Result {
lbls := make(map[string]string)
for _, keyValue := range stream.Labels {
lbls[keyValue.Name] = keyValue.Value
}
for _, sample := range stream.Samples {
row := MetricRowType{
Timestamp: sample.TimestampMs,
Labels: lbls,
Value: sample.Value,
}
if _, err := writer.Write([]MetricRowType{row}); err != nil {
return err
}
}
}
return writer.Close()
}

func encodeLogsParquetTo(response *LokiResponse, w io.Writer) error {
schema := parquet.SchemaOf(new(LogStreamRowType))
writer := parquet.NewGenericWriter[LogStreamRowType](w, schema)

for _, stream := range response.Data.Result {
lbls, err := parser.ParseMetric(stream.Labels)
if err != nil {
return err
}
lblsMap := make(map[string]string)
for _, keyValue := range lbls {
lblsMap[keyValue.Name] = keyValue.Value
}

for _, entry := range stream.Entries {
row := LogStreamRowType{
Timestamp: entry.Timestamp.UnixNano(),
Labels: lblsMap,
Line: entry.Line,
}
if _, err := writer.Write([]LogStreamRowType{row}); err != nil {
return err
}
}
}

return writer.Close()
}
63 changes: 63 additions & 0 deletions pkg/querier/queryrange/parquet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package queryrange

import (
"os"
"testing"

"github.com/parquet-go/parquet-go"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
)

func TestEncodeMetricsParquet(t *testing.T) {
resp := &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStreams,
},
},
}

f, err := os.CreateTemp("", "metrics-*.parquet")
defer f.Close() // nolint:staticcheck

require.NoError(t, err)
err = encodeMetricsParquetTo(resp, f)
require.NoError(t, err)

rows, err := parquet.ReadFile[MetricRowType](f.Name())
require.NoError(t, err)

require.Len(t, rows, 3)
}

func TestEncodeLogsParquet(t *testing.T) {
resp := &LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.FORWARD,
Limit: 100,
Version: uint32(loghttp.VersionV1),
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: logStreams,
},
}

f, err := os.CreateTemp("", "logs-*.parquet")
defer f.Close() // nolint:staticcheck

require.NoError(t, err)
err = encodeLogsParquetTo(resp, f)
require.NoError(t, err)

rows, err := parquet.ReadFile[LogStreamRowType](f.Name())
require.NoError(t, err)

require.Len(t, rows, 3)
}
Empty file.
Loading

0 comments on commit d0c11a6

Please sign in to comment.