Skip to content

Commit

Permalink
feat: Support elasticsearch data stream
Browse files Browse the repository at this point in the history
Signed-off-by: zzzk1 <[email protected]>
  • Loading branch information
zzzk1 committed Jan 16, 2025
1 parent 2ee8e4c commit 35ae71b
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/es-rollover/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Bool(archive, false, "Handle archive indices")
flags.String(username, "", "The username required by storage")
flags.String(password, "", "The password required by storage")
flags.Bool(useILM, false, "Use ILM to manage jaeger indices")
flags.Bool(useILM, true, "Use ILM to manage jaeger indices")
flags.String(ilmPolicyName, "jaeger-ilm-policy", "The name of the ILM policy to use if ILM is active")
flags.Int(timeout, 120, "Number of seconds to wait for master node response")
flags.Bool(skipDependencies, false, "Disable rollover for dependencies index")
Expand Down
65 changes: 64 additions & 1 deletion cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
"strings"

"github.com/elastic/go-elasticsearch/v8/typedapi/types"

"github.com/jaegertracing/jaeger/cmd/es-rollover/app"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/client"
Expand Down Expand Up @@ -56,7 +58,10 @@ func (c Action) Do() error {
return err
}
if !policyExist {
return fmt.Errorf("ILM policy %s doesn't exist in Elasticsearch. Please create it and re-run init", c.Config.ILMPolicyName)
err := createDefaultILMPolicy(c.ILMClient, c.Config.ILMPolicyName)
if err != nil {
return fmt.Errorf("ILM policy %s Create/Update failed %w", c.Config.ILMPolicyName, err)
}
}
}
rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.Config.IndexPrefix)
Expand All @@ -68,6 +73,64 @@ func (c Action) Do() error {
return nil
}

const esILMPolicy = `
{
"policy": {
"phases": {
"hot": {
"min_age": "9999ms",
"actions": {
"rollover": {
"max_age": "1m"
},
"set_priority": {
"priority": 9999
}
}
},
"delete": {
"min_age": "9999m",
"actions": {
"delete": {}
}
}
}
}
}
`

func createDefaultILMPolicy(c client.IndexManagementLifecycleAPI, policy string) error {
ilmPolicy := types.NewIlmPolicy()
err := json.Unmarshal([]byte(esILMPolicy), ilmPolicy)
if err != nil {
return fmt.Errorf("error unmarshalling ILM policy: %w", err)
}
err = c.CreateOrUpdate(policy, client.Policy{ILMPolicy: *ilmPolicy})
if err != nil {
var esErr client.ResponseError
if errors.As(err, &esErr) {
if esErr.StatusCode != http.StatusBadRequest || esErr.Body == nil {
return esErr.Err
}
// check for the reason of the error
jsonError := map[string]any{}
err := json.Unmarshal(esErr.Body, &jsonError)
if err != nil {
// return unmarshal error
return err
}
errorMap := jsonError["error"].(map[string]any)
// check for reason, ignore already exist error
if strings.Contains(errorMap["type"].(string), "resource_already_exists_exception") {
return nil
}
}
// Return any other error unrelated to the response
return err
}
return nil
}

func createIndexIfNotExist(c client.IndexAPI, index string) error {
err := c.CreateIndex(index)
if err != nil {
Expand Down
32 changes: 32 additions & 0 deletions pkg/es/client/ilm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
package client

import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

var _ IndexManagementLifecycleAPI = (*ILMClient)(nil)
Expand All @@ -17,6 +20,35 @@ type ILMClient struct {
MasterTimeoutSeconds int
}

type Policy struct {
ILMPolicy types.IlmPolicy `json:"policy"`
}

// CreateOrUpdate Add or update a ILMPolicy
func (i ILMClient) CreateOrUpdate(name string, ilmPolicy Policy) error {
body, err := json.Marshal(ilmPolicy)
if err != nil {
return err
}
_, err = i.request(elasticRequest{
endpoint: "_ilm/policy/" + name,
body: body,
method: http.MethodPut,
})

var respError ResponseError
if errors.As(err, &respError) {
if respError.StatusCode == http.StatusNotFound {
return nil
}
}

if err != nil {
return fmt.Errorf("failed to create/update ILM policy: %s, %w", name, err)
}
return nil
}

// Exists verify if a ILM policy exists
func (i ILMClient) Exists(name string) (bool, error) {
_, err := i.request(elasticRequest{
Expand Down
77 changes: 77 additions & 0 deletions pkg/es/client/ilm_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
package client

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -64,3 +67,77 @@ func TestExists(t *testing.T) {
})
}
}

const esILMPolicy = `
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_age": "1m"
},
"set_priority": {
"priority": 100
}
}
},
"delete": {
"min_age": "2m",
"actions": {
"delete": {}
}
}
}
}
}
`

func TestCreate(t *testing.T) {
policy := "jaeger-ilm-policy"
tests := []struct {
name string
responseCode int
response string
errContains string
}{
{
name: "successful",
responseCode: http.StatusOK,
},
{
name: "client error",
responseCode: http.StatusBadRequest,
response: esErrResponse,
errContains: "failed to create/update ILM policy: jaeger-ilm-policy",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
assert.True(t, strings.HasSuffix(req.URL.String(), "_ilm/policy/jaeger-ilm-policy"))
assert.Equal(t, http.MethodPut, req.Method)
res.WriteHeader(test.responseCode)
res.Write([]byte(test.response))
}))
defer testServer.Close()

c := &ILMClient{
Client: Client{
Client: testServer.Client(),
Endpoint: testServer.URL,
BasicAuth: "foobar",
},
}
ilmPolicy := types.NewIlmPolicy()
json.Unmarshal([]byte(esILMPolicy), ilmPolicy)
fmt.Printf("%+v\n", *ilmPolicy)

err := c.CreateOrUpdate(policy, Policy{ILMPolicy: *ilmPolicy})
if test.errContains != "" {
require.ErrorContains(t, err, test.errContains)
}
})
}
}
1 change: 1 addition & 0 deletions pkg/es/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ type ClusterAPI interface {

type IndexManagementLifecycleAPI interface {
Exists(name string) (bool, error)
CreateOrUpdate(policy string, ilmPolicy Policy) error
}
23 changes: 22 additions & 1 deletion pkg/es/client/mocks/IndexManagementLifecycleAPI.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
dependenciesTemplateName = "jaeger-dependencies"
primaryNamespace = "es"
archiveNamespace = "es-archive"
ilmPolicy = "jaeger-ilm-policy"
)

type ESStorageIntegration struct {
Expand Down Expand Up @@ -110,7 +111,7 @@ func (*ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields b
fmt.Sprintf("--es.num-shards=%v", 5),
fmt.Sprintf("--es.num-replicas=%v", 1),
fmt.Sprintf("--es.index-prefix=%v", indexPrefix),
fmt.Sprintf("--es.use-ilm=%v", false),
fmt.Sprintf("--es.use-ilm=%v", true),
fmt.Sprintf("--es.service-cache-ttl=%v", 1*time.Second),
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.bulk.actions=%v", 1),
Expand Down Expand Up @@ -245,3 +246,37 @@ func (s *ESStorageIntegration) cleanESIndexTemplates(t *testing.T, prefix string
}
return nil
}

func TestElasticsearchStorage_ILMPolicy(t *testing.T) {
SkipUnlessEnv(t, "elasticsearch", "opensearch")
t.Cleanup(func() {
testutils.VerifyGoLeaksOnceForES(t)
})
c := getESHttpClient(t)
require.NoError(t, healthCheck(c))
s := &ESStorageIntegration{}
s.initializeES(t, c, true)
esVersion, err := s.getVersion()
require.NoError(t, err)
if esVersion == 8 {
request := s.v8Client.ILM.GetLifecycle.WithPolicy(ilmPolicy)
ilmPolicyExistsResponse, err := s.v8Client.ILM.GetLifecycle(request)
require.NoError(t, err)
assert.Equal(t, 200, ilmPolicyExistsResponse.StatusCode)
} else {

Check failure on line 266 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint

empty-block: this block is empty, you can remove it (revive)
// nothing
}
s.cleanEsILMPolicy(t, ilmPolicy)
}

func (s *ESStorageIntegration) cleanEsILMPolicy(t *testing.T, policy string) error {

Check failure on line 272 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'policy' seems to be unused, consider removing or renaming it as _ (revive)
version, err := s.getVersion()
require.NoError(t, err)
if version == 8 {
_, err := s.v8Client.ILM.RemovePolicy(ilmPolicy)
require.NoError(t, err)
} else {

Check failure on line 278 in plugin/storage/integration/elasticsearch_test.go

View workflow job for this annotation

GitHub Actions / lint

empty-block: this block is empty, you can remove it (revive)
// nothing
}
return nil
}

0 comments on commit 35ae71b

Please sign in to comment.