@@ -24,6 +24,7 @@ use file_source::{FileServer, FileServerShutdown, Fingerprinter};
24
24
use futures:: { future:: FutureExt , sink:: Sink , stream:: StreamExt } ;
25
25
use k8s_openapi:: api:: core:: v1:: Pod ;
26
26
use serde:: { Deserialize , Serialize } ;
27
+ use std:: convert:: TryInto ;
27
28
use std:: path:: PathBuf ;
28
29
use std:: time:: Duration ;
29
30
@@ -73,6 +74,22 @@ pub struct Config {
73
74
74
75
/// A list of glob patterns to exclude from reading the files.
75
76
exclude_paths_glob_patterns : Vec < PathBuf > ,
77
+
78
+ /// Max amount of bytes to read from a single file before switching over
79
+ /// to the next file.
80
+ /// This allows distributing the reads more or less evenly across
81
+ /// the files.
82
+ #[ serde( default = "default_max_read_bytes" ) ]
83
+ max_read_bytes : usize ,
84
+
85
+ /// This value specifies not exactly the globbing, but the interval
86
+ /// between polls of the files to watch from the `paths_provider`.
87
+ /// This is quite efficient, yet might still create some load on the
88
+ /// file system; in addition, it is currently coupled with checksum dumping
89
+ /// in the underlying file server, so setting it too low may introduce
90
+ /// a significant overhead.
91
+ #[ serde( default = "default_glob_minimum_cooldown_ms" ) ]
92
+ glob_minimum_cooldown_ms : usize ,
76
93
}
77
94
78
95
inventory:: submit! {
@@ -138,6 +155,8 @@ struct Source {
138
155
field_selector : String ,
139
156
label_selector : String ,
140
157
exclude_paths : Vec < glob:: Pattern > ,
158
+ max_read_bytes : usize ,
159
+ glob_minimum_cooldown : Duration ,
141
160
}
142
161
143
162
impl Source {
@@ -166,6 +185,11 @@ impl Source {
166
185
} )
167
186
. collect :: < crate :: Result < Vec < _ > > > ( ) ?;
168
187
188
+ let glob_minimum_cooldown =
189
+ Duration :: from_millis ( config. glob_minimum_cooldown_ms . try_into ( ) . expect (
190
+ "unable to convert glob_minimum_cooldown_ms from usize to u64 without data loss" ,
191
+ ) ) ;
192
+
169
193
Ok ( Self {
170
194
client,
171
195
data_dir,
@@ -174,6 +198,8 @@ impl Source {
174
198
field_selector,
175
199
label_selector,
176
200
exclude_paths,
201
+ max_read_bytes : config. max_read_bytes ,
202
+ glob_minimum_cooldown,
177
203
} )
178
204
}
179
205
@@ -190,6 +216,8 @@ impl Source {
190
216
field_selector,
191
217
label_selector,
192
218
exclude_paths,
219
+ max_read_bytes,
220
+ glob_minimum_cooldown,
193
221
} = self ;
194
222
195
223
let watcher = k8s:: api_watcher:: ApiWatcher :: new ( client, Pod :: watch_pod_for_all_namespaces) ;
@@ -213,7 +241,7 @@ impl Source {
213
241
let paths_provider = K8sPathsProvider :: new ( state_reader. clone ( ) , exclude_paths) ;
214
242
let annotator = PodMetadataAnnotator :: new ( state_reader, fields_spec) ;
215
243
216
- // TODO: maybe some of the parameters have to be configurable.
244
+ // TODO: maybe more of the parameters have to be configurable.
217
245
218
246
// The 16KB is the maximum size of the payload at single line for both
219
247
// docker and CRI log formats.
@@ -224,8 +252,11 @@ impl Source {
224
252
let file_server = FileServer {
225
253
// Use our special paths provider.
226
254
paths_provider,
227
- // This is the default value for the read buffer size.
228
- max_read_bytes : 2048 ,
255
+ // Max amount of bytes to read from a single file before switching
256
+ // over to the next file.
257
+ // This allows distributing the reads more or less evenly across
258
+ // the files.
259
+ max_read_bytes,
229
260
// We want to use the checkpointing mechanism, and resume from where we
230
261
// left off.
231
262
start_at_beginning : false ,
@@ -242,10 +273,7 @@ impl Source {
242
273
data_dir,
243
274
// This value specifies not exactly the globbing, but interval
244
275
// between polls of the files to watch from the `paths_provider`.
245
- // This is quite efficient, yet might still create some load of the
246
- // file system, so this call is 10 times larger than the default for
247
- // the files.
248
- glob_minimum_cooldown : Duration :: from_secs ( 10 ) ,
276
+ glob_minimum_cooldown,
249
277
// The shape of the log files is well-known in the Kubernetes
250
278
// environment, so we pick a specially crafted fingerprinter
251
279
// for the log files.
@@ -368,6 +396,14 @@ fn default_self_node_name_env_template() -> String {
368
396
format ! ( "${{{}}}" , SELF_NODE_NAME_ENV_KEY . to_owned( ) )
369
397
}
370
398
399
/// Serde default for the `max_read_bytes` configuration field.
///
/// Returns the maximum number of bytes to read from a single file before
/// switching over to the next file, used when the field is omitted from
/// the configuration.
fn default_max_read_bytes() -> usize {
    2048
}
402
+
403
/// Serde default for the `glob_minimum_cooldown_ms` configuration field.
///
/// Returns the interval, in milliseconds, between polls of the files to
/// watch, used when the field is omitted from the configuration. The value
/// is 60 000 ms, i.e. one minute.
fn default_glob_minimum_cooldown_ms() -> usize {
    60_000
}
406
+
371
407
/// This function constructs the effective field selector to use, based on
372
408
/// the specified configuration.
373
409
fn prepare_field_selector ( config : & Config ) -> crate :: Result < String > {
0 commit comments