forked from jaegertracing/jaeger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathe2e_integration.go
190 lines (166 loc) · 6.32 KB
/
e2e_integration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0
package integration
import (
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"gopkg.in/yaml.v3"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
)
// otlpPort is the port number that e2eInitialize passes to createSpanWriter.
// NOTE(review): 4317 is the conventional OTLP/gRPC port, so this presumably
// has to match the OTLP receiver port in the Jaeger-v2 config file — confirm.
const otlpPort = 4317
// E2EStorageIntegration holds components for e2e mode of Jaeger-v2
// storage integration test. The intended usage is as follows:
// - A specific storage implementation declares its own test functions
// and initializes whatever it needs (e.g. starts remote-storage).
// - Then, it calls e2eInitialize() to run the Jaeger-v2 collector
// and to set up the SpanWriter and SpanReader.
// - After that, it calls RunSpanStoreTests().
// - It cleans up with e2eCleanUp() to close the SpanReader and SpanWriter connections.
// - At last, it cleans up anything declared in its own test functions
// (e.g. closes remote-storage).
type E2EStorageIntegration struct {
// StorageIntegration is embedded; e2eInitialize assigns its SpanWriter
// and SpanReader fields.
integration.StorageIntegration
// ConfigFile is the path of the Jaeger-v2 YAML config file that
// e2eInitialize uses as the base for the config it starts the binary with.
ConfigFile string
}
// e2eInitialize starts the Jaeger-v2 binary in the background with a config
// derived from s.ConfigFile (see createStorageCleanerConfig), waits until its
// query HTTP endpoint answers with 200 OK, and then initializes the
// SpanWriter and SpanReader.
// This function should be called before any of the tests start.
//
// The stdout and stderr of the binary are captured into files in a test temp
// directory. The process is killed via t.Cleanup; if the test has failed by
// then, the captured logs are printed inside a GitHub Actions log group.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
	logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
	configFile := createStorageCleanerConfig(t, s.ConfigFile, storage)
	t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

	outFile, err := os.OpenFile(
		filepath.Join(t.TempDir(), "jaeger_output_logs.txt"),
		os.O_CREATE|os.O_WRONLY,
		os.ModePerm,
	)
	require.NoError(t, err)
	t.Logf("Writing the Jaeger-v2 output logs into %s", outFile.Name())

	errFile, err := os.OpenFile(
		filepath.Join(t.TempDir(), "jaeger_error_logs.txt"),
		os.O_CREATE|os.O_WRONLY,
		os.ModePerm,
	)
	require.NoError(t, err)
	t.Logf("Writing the Jaeger-v2 error logs into %s", errFile.Name())

	cmd := exec.Cmd{
		Path: "./cmd/jaeger/jaeger",
		Args: []string{"jaeger", "--config", configFile},
		// Change the working directory to the root of this project
		// since the binary config file jaeger_query's ui_config points to
		// "./cmd/jaeger/config-ui.json"
		Dir:    "../../../..",
		Stdout: outFile,
		Stderr: errFile,
	}
	require.NoError(t, cmd.Start())

	// Register the cleanup right after the process has started, so that it is
	// killed and its logs are dumped even when the readiness check below
	// fails. The cleanup avoids require.* so that one failing step does not
	// prevent the remaining steps from running.
	t.Cleanup(func() {
		if err := cmd.Process.Kill(); err != nil {
			t.Errorf("Failed to kill Jaeger-v2 process: %v", err)
		}
		// Reap the process so that it has fully exited before its logs are
		// read. The error is ignored on purpose: a killed process always
		// reports a non-zero exit status.
		_ = cmd.Wait()
		for _, f := range []*os.File{outFile, errFile} {
			if err := f.Close(); err != nil {
				t.Logf("Failed to close %s: %v", f.Name(), err)
			}
		}
		if !t.Failed() {
			return
		}
		// A Github Actions special annotation to create a foldable section
		// in the Github runner output.
		// https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines
		fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs")
		// End of Github Actions foldable section annotation; deferred so the
		// group is always closed, even if reading a log file fails.
		defer fmt.Println("::endgroup::")
		if outLogs, err := os.ReadFile(outFile.Name()); err != nil {
			t.Errorf("Failed to read Jaeger-v2 output logs: %v", err)
		} else {
			fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)
		}
		if errLogs, err := os.ReadFile(errFile.Name()); err != nil {
			t.Errorf("Failed to read Jaeger-v2 error logs: %v", err)
		} else {
			fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
		}
	})

	require.Eventually(t, func() bool {
		url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
		t.Logf("Checking if Jaeger-v2 is available on %s", url)
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			// require.* must not be used here: the condition runs on a
			// goroutine other than the test's, where FailNow is not allowed.
			t.Logf("HTTP request creation failed: %v", err)
			return false
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Log(err)
			return false
		}
		defer resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start")
	t.Log("Jaeger-v2 is ready")

	s.SpanWriter, err = createSpanWriter(logger, otlpPort)
	require.NoError(t, err)
	s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
	require.NoError(t, err)
}
// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
// This function should be called after all the tests are finished.
//
// Both SpanReader and SpanWriter must implement io.Closer. Both are closed
// before either error is asserted, so a failure to close the reader does not
// leave the writer open.
func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
	readerErr := s.SpanReader.(io.Closer).Close()
	writerErr := s.SpanWriter.(io.Closer).Close()
	require.NoError(t, readerErr)
	require.NoError(t, writerErr)
}
// createStorageCleanerConfig reads the YAML config at configFile, enables the
// storage_cleaner extension in it, writes the result to a new file in a test
// temp directory, and returns the path of that file.
//
// Specifically it:
//   - appends "storage_cleaner" to service.extensions;
//   - adds extensions.storage_cleaner with the same trace_storage value that
//     extensions.jaeger_query uses;
//   - for "elasticsearch" and "opensearch" storage, sets service_cache_ttl to
//     "1ms" on extensions.jaeger_storage.backends.some_storage.<storage>.
//
// A missing or unexpectedly typed entry fails the test through require
// instead of panicking, which would abort the whole test binary.
func createStorageCleanerConfig(t *testing.T, configFile string, storage string) string {
	data, err := os.ReadFile(configFile)
	require.NoError(t, err)
	var config map[string]any
	require.NoError(t, yaml.Unmarshal(data, &config))

	// asMap checks that v is a YAML mapping and returns it; name is only
	// used to identify the entry in the failure message.
	asMap := func(v any, name string) map[string]any {
		m, ok := v.(map[string]any)
		require.True(t, ok, "expecting '%s' to be a map, found %T", name, v)
		return m
	}

	serviceAny, ok := config["service"]
	require.True(t, ok, "expecting 'service' entry")
	service := asMap(serviceAny, "service")
	serviceExtensions, ok := service["extensions"].([]any)
	require.True(t, ok, "expecting 'service.extensions' to be a list, found %T", service["extensions"])
	service["extensions"] = append(serviceExtensions, "storage_cleaner")

	extensionsAny, ok := config["extensions"]
	require.True(t, ok, "expecting 'extensions' entry")
	extensions := asMap(extensionsAny, "extensions")

	queryAny, ok := extensions["jaeger_query"]
	require.True(t, ok, "expecting 'jaeger_query' entry")
	traceStorageAny, ok := asMap(queryAny, "jaeger_query")["trace_storage"]
	require.True(t, ok, "expecting 'trace_storage' entry")
	traceStorage, ok := traceStorageAny.(string)
	require.True(t, ok, "expecting 'trace_storage' to be a string, found %T", traceStorageAny)
	// The cleaner must operate on the same storage the query extension reads from.
	extensions["storage_cleaner"] = map[string]string{"trace_storage": traceStorage}

	jaegerStorageAny, ok := extensions["jaeger_storage"]
	require.True(t, ok, "expecting 'jaeger_storage' entry")
	jaegerStorage := asMap(jaegerStorageAny, "jaeger_storage")
	backendsAny, ok := jaegerStorage["backends"]
	require.True(t, ok, "expecting 'backends' entry")
	backends := asMap(backendsAny, "backends")

	switch storage {
	case "elasticsearch", "opensearch":
		someStoreAny, ok := backends["some_storage"]
		require.True(t, ok, "expecting 'some_storage' entry, found: %v", jaegerStorage)
		someStore := asMap(someStoreAny, "some_storage")
		esMainAny, ok := someStore[storage]
		require.True(t, ok, "expecting '%s' entry, found %v", storage, someStore)
		esMain := asMap(esMainAny, storage)
		esMain["service_cache_ttl"] = "1ms"
	default:
		// Other storage types need no extra settings.
	}

	newData, err := yaml.Marshal(config)
	require.NoError(t, err)
	tempFile := filepath.Join(t.TempDir(), "storageCleaner_config.yaml")
	require.NoError(t, os.WriteFile(tempFile, newData, 0o600))
	return tempFile
}
// purge sends an HTTP POST to the storage cleaner endpoint built from
// storagecleaner.Port and storagecleaner.URL, and fails the test unless the
// response status is 200 OK.
func purge(t *testing.T) {
	// Bound the request so that an unresponsive endpoint fails this test
	// instead of blocking until the global `go test` timeout.
	const purgeTimeout = time.Minute
	ctx, cancel := context.WithTimeout(context.Background(), purgeTimeout)
	defer cancel()

	addr := fmt.Sprintf("http://0.0.0.0:%s%s", storagecleaner.Port, storagecleaner.URL)
	r, err := http.NewRequestWithContext(ctx, http.MethodPost, addr, nil)
	require.NoError(t, err)

	resp, err := http.DefaultClient.Do(r)
	require.NoError(t, err)
	defer resp.Body.Close()
	require.Equal(t, http.StatusOK, resp.StatusCode)
}