Skip to content

Commit

Permalink
Added timeout parameter to write_jsonl() (#4026)
Browse files Browse the repository at this point in the history
This ensures JSON files are flushed periodically, so that the data on disk
is valid even before the file is fully closed.
  • Loading branch information
scudette authored Jan 22, 2025
1 parent 93f8a1a commit cf6226c
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 28 deletions.
25 changes: 13 additions & 12 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,19 @@ func (self *ApiServer) CollectArtifact(

// Build a request based on user input.
request := &flows_proto.ArtifactCollectorArgs{
ClientId: in.ClientId,
Artifacts: in.Artifacts,
Specs: in.Specs,
Creator: user_record.Name,
OpsPerSecond: in.OpsPerSecond,
CpuLimit: in.CpuLimit,
IopsLimit: in.IopsLimit,
Timeout: in.Timeout,
MaxRows: in.MaxRows,
MaxUploadBytes: in.MaxUploadBytes,
Urgent: in.Urgent,
TraceFreqSec: in.TraceFreqSec,
ClientId: in.ClientId,
Artifacts: in.Artifacts,
Specs: in.Specs,
Creator: user_record.Name,
OpsPerSecond: in.OpsPerSecond,
CpuLimit: in.CpuLimit,
IopsLimit: in.IopsLimit,
Timeout: in.Timeout,
ProgressTimeout: in.ProgressTimeout,
MaxRows: in.MaxRows,
MaxUploadBytes: in.MaxUploadBytes,
Urgent: in.Urgent,
TraceFreqSec: in.TraceFreqSec,
}

acl_manager := acl_managers.NewServerACLManager(
Expand Down
4 changes: 4 additions & 0 deletions api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ type logWriter struct {
}

func (self *logWriter) Write(b []byte) (int, error) {
// Sometimes the channel becomes closed for some reason and this
// tends to panic.
defer utils.CheckForPanic("logWriter.Write")

select {
case <-self.ctx.Done():
return 0, io.EOF
Expand Down
6 changes: 6 additions & 0 deletions docs/references/vql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,9 @@
required: true
- name: really_do_it
type: bool
- name: sync
type: bool
description: If specified we ensure data is available immediately
category: server
metadata:
permissions: DELETE_RESULTS
Expand Down Expand Up @@ -11164,6 +11167,9 @@
- name: buffer_size
type: int
description: Maximum size of buffer before flushing to file.
- name: max_time
type: int
description: Maximum time before flushing the buffer (10 sec).
metadata:
permissions: FILESYSTEM_WRITE
platforms:
Expand Down
12 changes: 6 additions & 6 deletions gui/velociraptor/src/components/flows/new-collection.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class NewCollectionResources extends React.Component {
<Col sm="8">
<ValidatedInteger
placeholder={this.getCpuLimit(this.props.artifacts)}
value={resources.cpu_limit}
value={resources.cpu_limit || undefined}
valid_func={value=>value >= 0 && value <=100}
setInvalid={value => this.setState({
invalid_1: value})}
Expand All @@ -562,7 +562,7 @@ class NewCollectionResources extends React.Component {
<Col sm="8">
<ValidatedInteger
placeholder={this.getIopsLimit(this.props.artifacts)}
value={resources.iops_limit}
value={resources.iops_limit || undefined}
setInvalid={value => this.setState({invalid_1: value})}
setValue={value => this.props.setResources({
iops_limit: value
Expand All @@ -575,7 +575,7 @@ class NewCollectionResources extends React.Component {
<Col sm="8">
<ValidatedInteger
placeholder={this.getTimeout(this.props.artifacts)}
value={resources.timeout}
value={resources.timeout || undefined}
setInvalid={value => this.setState({invalid_2: value})}
setValue={value => this.props.setResources({timeout: value})} />
</Col>
Expand All @@ -586,7 +586,7 @@ class NewCollectionResources extends React.Component {
<Col sm="8">
<ValidatedInteger
placeholder={T("If set collection will be terminated after this many seconds with no progress.")}
value={resources.progress_timeout}
value={resources.progress_timeout || undefined}
setInvalid={value => this.setState({invalid_3: value})}
setValue={value => this.props.setResources({
progress_timeout: value})} />
Expand All @@ -598,7 +598,7 @@ class NewCollectionResources extends React.Component {
<Col sm="8">
<ValidatedInteger
placeholder={this.getMaxRows(this.props.artifacts)}
value={resources.max_rows}
value={resources.max_rows || undefined}
setInvalid={value => this.setState({invalid_4: value})}
setValue={value => this.props.setResources({max_rows: value})} />
</Col>
Expand All @@ -609,7 +609,7 @@ class NewCollectionResources extends React.Component {
<Col sm="8">
<ValidatedInteger
placeholder={this.getMaxUploadBytes(this.props.artifacts)}
value={resources.max_mbytes}
value={resources.max_mbytes || undefined}
setInvalid={value => this.setState({invalid_5: value})}
setValue={value => this.props.setResources({max_mbytes: value})} />
</Col>
Expand Down
12 changes: 9 additions & 3 deletions services/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,8 @@ sources:
// Specifying timeout in the request overrides all defaults.
request.Timeout = 20
request.MaxRows = 100
request.ProgressTimeout = 21

compiled, err = launcher.CompileCollectorArgs(
ctx, self.ConfigObj, acl_manager, repository,
services.CompilerOptions{}, request)
Expand All @@ -1225,6 +1227,7 @@ sources:

assert.Equal(self.T(), getReqName(compiled[1]), "Test.Artifact.MaxRows")
assert.Equal(self.T(), compiled[1].Timeout, uint64(20))
assert.Equal(self.T(), compiled[1].ProgressTimeout, float32(21))

// Specifying MaxRows in the request overrides the setting.
assert.Equal(self.T(), request.MaxRows, uint64(100))
Expand Down Expand Up @@ -1378,9 +1381,12 @@ func (self *LauncherTestSuite) TestDelete() {
// However, GetFlows omits the deleted flow immediately because it
// cannot find it (the actual flow object is removed but the
// index is out of step).
res, err = launcher.GetFlows(self.Ctx, self.ConfigObj, "server",
result_sets.ResultSetOptions{}, 0, 10)
assert.NoError(self.T(), err)
vtesting.WaitUntil(time.Second, self.T(), func() bool {
res, err = launcher.GetFlows(self.Ctx, self.ConfigObj, "server",
result_sets.ResultSetOptions{}, 0, 10)
assert.NoError(self.T(), err)
return len(res.Items) == 0
})
assert.Equal(self.T(), len(res.Items), 0)

// Create the flow again
Expand Down
36 changes: 29 additions & 7 deletions vql/parsers/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"
"strconv"
"strings"
"time"

"github.com/Velocidex/ordereddict"
"google.golang.org/protobuf/reflect/protoreflect"
Expand Down Expand Up @@ -544,6 +545,7 @@ type WriteJSONPluginArgs struct {
Accessor string `vfilter:"optional,field=accessor,doc=The accessor to use"`
Query vfilter.StoredQuery `vfilter:"required,field=query,doc=query to write into the file."`
BufferSize int `vfilter:"optional,field=buffer_size,doc=Maximum size of buffer before flushing to file."`
MaxTime int `vfilter:"optional,field=max_time,doc=Maximum time before flushing the buffer (10 sec)."`
}

type WriteJSONPlugin struct{}
Expand All @@ -569,6 +571,11 @@ func (self WriteJSONPlugin) Call(
arg.BufferSize = BUFF_SIZE
}

max_time := 10 * time.Second
if arg.MaxTime > 0 {
max_time = time.Duration(arg.MaxTime) * time.Second
}

var writer *bufio.Writer

switch arg.Accessor {
Expand Down Expand Up @@ -605,18 +612,33 @@ func (self WriteJSONPlugin) Call(

lf := []byte("\n")

for row := range arg.Query.Eval(ctx, scope) {
serialized, err := json.Marshal(row)
if err == nil {
writer.Write(serialized)
writer.Write(lf)
}
events_chan := arg.Query.Eval(ctx, scope)

for {
select {
case <-ctx.Done():
return

case output_chan <- row:
case <-utils.GetTime().After(max_time):
writer.Flush()

case row, ok := <-events_chan:
if !ok {
return
}

serialized, err := json.Marshal(row)
if err == nil {
writer.Write(serialized)
writer.Write(lf)
}

select {
case <-ctx.Done():
return

case output_chan <- row:
}
}
}
}()
Expand Down

0 comments on commit cf6226c

Please sign in to comment.