Skip to content

Commit

Permalink
[#31927,#31928][prism] Support StringSet and Gauge metrics. (#32184)
Browse files Browse the repository at this point in the history
* Update Generated Go Protos

* [prism] Add stringset and gauge metrics

* lint + tests

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Aug 15, 2024
1 parent cbe2b9e commit 11befd3
Show file tree
Hide file tree
Showing 19 changed files with 1,799 additions and 1,476 deletions.
7 changes: 5 additions & 2 deletions sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2,177 changes: 1,094 additions & 1,083 deletions sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/endpoints.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

206 changes: 165 additions & 41 deletions sdks/go/pkg/beam/model/pipeline_v1/external_transforms.pb.go

Large diffs are not rendered by default.

698 changes: 362 additions & 336 deletions sdks/go/pkg/beam/model/pipeline_v1/metrics.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/model/pipeline_v1/schema.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 122 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ func buildUrnToOpsMap(mUrn2Spec map[string]*pipepb.MonitoringInfoSpec) map[strin
// Defaults should be safe since the metric only exists if we get any values at all.
return &distributionInt64{dist: metrics.DistributionValue{Min: math.MaxInt64, Max: math.MinInt64}}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_INT64_TYPE): func() metricAccumulator {
// Initializes the gauge so any new value will override it.
return &gaugeInt64{millisSinceEpoch: math.MinInt64}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_DOUBLE_TYPE): func() metricAccumulator {
// Initializes the gauge so any new value will override it.
return &gaugeFloat64{millisSinceEpoch: math.MinInt64}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE): func() metricAccumulator {
return &stringSet{set: map[string]struct{}{}}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_PROGRESS_TYPE): func() metricAccumulator { return &progress{} },
}

Expand Down Expand Up @@ -347,6 +358,116 @@ func (m *distributionInt64) toProto(key metricKey) *pipepb.MonitoringInfo {
}
}

type gaugeInt64 struct {
millisSinceEpoch int64
val int64
}

func (m *gaugeInt64) accumulate(pyld []byte) error {
buf := bytes.NewBuffer(pyld)

timestamp, err := coder.DecodeVarInt(buf)
if err != nil {
return err
}
if m.millisSinceEpoch > timestamp {
// Drop values that are older than what we have already.
return nil
}
val, err := coder.DecodeVarInt(buf)
if err != nil {
return err
}
m.millisSinceEpoch = timestamp
m.val = val
return nil
}

func (m *gaugeInt64) toProto(key metricKey) *pipepb.MonitoringInfo {
var buf bytes.Buffer
coder.EncodeVarInt(m.millisSinceEpoch, &buf)
coder.EncodeVarInt(m.val, &buf)
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_INT64_TYPE),
Payload: buf.Bytes(),
Labels: key.Labels(),
}
}

type gaugeFloat64 struct {
millisSinceEpoch int64
val float64
}

func (m *gaugeFloat64) accumulate(pyld []byte) error {
buf := bytes.NewBuffer(pyld)

timestamp, err := coder.DecodeVarInt(buf)
if err != nil {
return err
}
if m.millisSinceEpoch > timestamp {
// Drop values that are older than what we have already.
return nil
}
val, err := coder.DecodeDouble(buf)
if err != nil {
return err
}
m.millisSinceEpoch = timestamp
m.val = val
return nil
}

func (m *gaugeFloat64) toProto(key metricKey) *pipepb.MonitoringInfo {
var buf bytes.Buffer
coder.EncodeVarInt(m.millisSinceEpoch, &buf)
coder.EncodeDouble(m.val, &buf)
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_LATEST_DOUBLE_TYPE),
Payload: buf.Bytes(),
Labels: key.Labels(),
}
}

type stringSet struct {
set map[string]struct{}
}

func (m *stringSet) accumulate(pyld []byte) error {
buf := bytes.NewBuffer(pyld)

n, err := coder.DecodeInt32(buf)
if err != nil {
return err
}
// Assume it's a fixed iterator size.
for i := int32(0); i < n; i++ {
val, err := coder.DecodeStringUTF8(buf)
if err != nil {
return err
}
m.set[val] = struct{}{}
}
return nil
}

func (m *stringSet) toProto(key metricKey) *pipepb.MonitoringInfo {
var buf bytes.Buffer
coder.EncodeInt32(int32(len(m.set)), &buf)
for k := range m.set {
coder.EncodeStringUTF8(k, &buf)
}
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE),
Payload: buf.Bytes(),
Labels: key.Labels(),
}
}

type durability int

const (
Expand Down Expand Up @@ -507,7 +628,7 @@ func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte)
a = ops.newAccum()
}
if err := a.accumulate(payload); err != nil {
panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a))
panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v :%v", key.Urn(), key, a, err))
}
accums[key] = a
switch u := key.Urn(); u {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,44 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_DISTRIBUTION_INT64, []byte{4, 19, 2, 7}),
},
}, {
name: "int64Gauge",
input: []map[string][]byte{
{"a": []byte{3, 5}},
{"a": []byte{14, 2}},
{"a": []byte{10, 18}},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_LATEST_INT64),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_LATEST_INT64, []byte{14, 2}),
},
}, {
name: "float64Gauge",
input: []map[string][]byte{
{"a": append([]byte{2}, doubleBytes(45)...)},
{"a": append([]byte{17}, doubleBytes(2)...)},
{"a": append([]byte{16}, doubleBytes(200)...)},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_LATEST_DOUBLE),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_LATEST_DOUBLE, append([]byte{17}, doubleBytes(2)...)),
},
}, {
name: "stringSet",
input: []map[string][]byte{
{"a": []byte{0, 0, 0, 1, 1, 63}},
{"a": []byte{0, 0, 0, 2, 1, 63, 1, 63}},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_SET_STRING),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SET_STRING, []byte{0, 0, 0, 1, 1, 63}),
},
},
}

Expand Down

0 comments on commit 11befd3

Please sign in to comment.