Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Manik2708 <[email protected]>
  • Loading branch information
Manik2708 committed Jan 25, 2025
1 parent 96380d7 commit 2a4163b
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 16 deletions.
4 changes: 4 additions & 0 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,27 @@ type MultiSearchService interface {
Do(ctx context.Context) (*elastic.MultiSearchResult, error)
}

// AliasAddAction is an abstraction for elastic.AliasAddAction
type AliasAddAction interface {
Index(index ...string) AliasAddAction
IsWriteIndex(flag bool) AliasAddAction
Do(ctx context.Context) (*elastic.AliasResult, error)
}

// AliasRemoveAction is an abstraction for elastic.AliasRemoveAction
type AliasRemoveAction interface {
Index(index ...string) AliasRemoveAction
Do(ctx context.Context) (*elastic.AliasResult, error)
}

// XPackIlmPutLifecycle is an abstraction for elastic.XPackIlmPutLifecycle
type XPackIlmPutLifecycle interface {
BodyString(body string) XPackIlmPutLifecycle
Policy(policy string) XPackIlmPutLifecycle
Do(ctx context.Context) (*elastic.XPackIlmPutLifecycleResponse, error)
}

// IndicesGetService is an abstraction for elastic.IndicesGetService
type IndicesGetService interface {
Index(indices ...string) IndicesGetService
Do(ctx context.Context) (map[string]*elastic.IndicesGetResponse, error)
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *DependencyStore) getWriteIndex(ts time.Time) string {
return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
}

func (s *DependencyStore) CreatePolicy(version uint) error {
policyManager := ilm.NewPolicyManager(s.client, version, s.dependencyIndexPrefix)
func (s *DependencyStore) CreatePolicy() error {
policyManager := ilm.NewPolicyManager(s.client, s.dependencyIndexPrefix)
return policyManager.Init()
}
6 changes: 3 additions & 3 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
if f.primaryConfig.UseILM {
err = ilm.CreatePolicyIfNotExists(primaryClient, f.primaryConfig.IsOpenSearch, f.primaryConfig.Version)
if err != nil {
return fmt.Errorf("failed to create ILM policy: %w", err)
return fmt.Errorf("failed to create index management policy: %w", err)
}
}

Expand Down Expand Up @@ -321,7 +321,7 @@ func createSpanWriter(
}
}
if cfg.UseILM {
err := writer.CreatePolicy(cfg.Version)
err := writer.InitializePolicyManager()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store
}
}
if f.primaryConfig.UseILM {
err := store.CreatePolicy(f.primaryConfig.Version)
err := store.InitializePolicyManager()
if err != nil {
return nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions plugin/storage/es/ilm/policymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,13 @@ var ErrIlmNotSupported = errors.New("ILM is supported only for ES version 7+")
type PolicyManager struct {
client func() es.Client
prefixedIndexNameWithSeparator string
version uint
}

// Init makes the jaeger ready for automatic rollover by using ILM by creating
// initial rollover indices and read-write aliases
func (p *PolicyManager) Init() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if p.version < ilmVersionSupport {
return ErrIlmNotSupported
}
err := p.init(ctx)
if err != nil {
return err
Expand All @@ -52,10 +48,9 @@ func (p *PolicyManager) Init() error {
// NewPolicyManager creates the policy manager with appropriate version and prefixedIndexNameWithSeparator.
// prefixedIndexNameWithSeparator is the prefix with separator. For example if index prefix is jaeger-main
// and policy manager is called for span indices, then prefixedIndexNameWithSeparator will be jaeger-main-jaeger-span-
func NewPolicyManager(cl func() es.Client, version uint, prefixedIndexNameWithSeparator string) *PolicyManager {
func NewPolicyManager(cl func() es.Client, prefixedIndexNameWithSeparator string) *PolicyManager {
return &PolicyManager{
client: cl,
version: version,
prefixedIndexNameWithSeparator: prefixedIndexNameWithSeparator,
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/es/samplingstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time,
}).Add()
}

func (s *SamplingStore) CreatePolicy(version uint) error {
policyManager := ilm.NewPolicyManager(s.client, version, s.samplingIndexPrefix)
func (s *SamplingStore) InitializePolicyManager() error {
policyManager := ilm.NewPolicyManager(s.client, s.samplingIndexPrefix)
return policyManager.Init()
}

Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, index
return nil
}

func (s *SpanWriter) CreatePolicy(version uint) error {
func (s *SpanWriter) InitializePolicyManager() error {
spanPrefix := s.spanAndServicePrefixFn(spanIndexBaseName)
spanPolicyManager := ilm.NewPolicyManager(s.client, version, spanPrefix)
spanPolicyManager := ilm.NewPolicyManager(s.client, spanPrefix)
err := spanPolicyManager.Init()
if err != nil {
return err
}
servicePrefix := s.spanAndServicePrefixFn(serviceIndexBaseName)
servicePolicyManager := ilm.NewPolicyManager(s.client, version, servicePrefix)
servicePolicyManager := ilm.NewPolicyManager(s.client, servicePrefix)
return servicePolicyManager.Init()
}

Expand Down

0 comments on commit 2a4163b

Please sign in to comment.