Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examples updated, README enhanced #15

Merged
merged 7 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Added the logic to automatically limit the request sizes for `BulkUpsert`, to avoid the ingestion errors
* Added the saving extra input fields as the (optional) additional JSON-formatted field named `.other`
* Supported the flexible schema parsers like `logfmt` on input
* Fixed the lost of same-time messages by (optionally) adding extra `.hash` field containing the `Cityhash64` computed over the record
* Fixed the loss of same-time messages by (optionally) adding extra `.hash` field containing the `Cityhash64` computed over the record's data fields

## v1.1.1
* Fixed Dockerfile for build with go1.22
Expand Down
58 changes: 50 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,59 @@

## Build

```makefile
Build prerequisites:

* [Golang](https://go.dev/dl/) v1.21 or later
* C compiler and linker suitable for the operating system used (needed to build the plugin's shared library)
* `make` utility

To build the plugin, run the following command:

```bash
BIN=out_ydb.so make build
```

# Usage
## Configuration

`fluent-bit -e out_ydb.so -c examples/tail2ydb.conf`
The plugin supports the following configuration settings:

Configuration file (there is an example in files):
```
ConnectionURL - connection url for YDB
Certificates - path to file with certificates or certificate content
TablePath - relative table path
| Parameter | Description |
|---------------|-------------|
| ConnectionURL | YDB connection URL, including the protocol, endpoint and database path (see the [documentation](https://ydb.tech/docs/en/concepts/connect)) |
| TablePath | Relative table path, may include the schema in form `SchemaName/TableName` |
| Columns | JSON structure mapping the fields of FluentBit record to the columns of target YDB table. May include the pseudo-fields listed below |
| CredentialsAnonymous | Configure as `1` for anonymous YDB authentication |
| CredentialsYcServiceAccountKeyFile | Set to the path of file containing the service account (SA) key, to use the SA key YDB authentication |
| CredentialsYcServiceAccountKeyJson | Set to the JSON data of the service account (SA) key instead of the filename (useful in K8s environment) |
| CredentialsYcMetadata | Configure as `1` for virtual machine metadata YDB authentication |
| CredentialsStatic | Username and password for YDB authentication, specified in the following format: `username:password@` |
| CredentialsToken | Custom token value, to use the token authentication YDB mode |
| Certificates | Path to the certificate authority (CA) trusted certificates file, or the literal trusted CA certificate value |
| LogLevel | Plugin specific logging level, should be one of `disabled`, `trace`, `debug`, `info`, `warn`, `error`, `fatal` or `panic` |

The following pseudo-fields are available, in addition to those available in the FluentBit record, to be mapped into the YDB table columns:

* `.timestamp` - record's timestamp, mandatory
* `.input` - record's input stream name, mandatory
* `.hash` - uint64 hash value computed over all the data fields (except the pseudo-fields), optional
* `.other` - the JSON document containing all the data fields which were not explicitly mapped to a field in the table, optional

## Usage example

YDB database should be available, either in the form of a local single-node setup (see the [Quickstart](https://ydb.tech/docs/en/quickstart) section in YDB Documentation), a fully [managed service](https://yandex.cloud/en/services/ydb), or as part of the YDB cluster installed on self-hosted resources.

FluentBit should be installed, either version 2 or 3.

In the [examples](./examples/) directory the following files are provided:

* [tail2ydb.sql](./examples/tail2ydb.sql) - example of YDB table structure to capture the log;
* [tail2ydb.conf](./examples/tail2ydb.conf) - example of FluentBit configuration to read from `/var/log/syslog` and write to YDB table;
* [docker-compose.yml](./examples/docker-compose.yml) - Docker Compose setup to run the single-node YDB instance for development or testing purposes.

Target table should be created in the YDB database prior to running FluentBit with the configuraton referencing it.

To run the example configuration, customize the YDB connection settings in the `tail2ydb.conf` file, and run the following command:

```bash
fluent-bit -e out_ydb.so -c examples/tail2ydb.conf
```
26 changes: 15 additions & 11 deletions examples/tail2ydb.conf
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
[SERVICE]
Flush 1

[INPUT]
Name tail
Path tail.log
Name tail
Path /var/log/syslog
Read_from_Head true

[OUTPUT]
Name ydb
ConnectionURL grpc://ydb-olap-perf-002.search.yandex.net:2135/olap-perf/deploy_logs
TablePath fluent/bit/log
Columns {".timestamp":"timestamp"},".input":"input","log":"message"}
# CredentialsYcServiceAccountKeyFile
# CredentialsYcServiceAccountKeyJson
# CredentialsYcMetadata
# CredentialsStatic
# CredentialsToken
Name ydb
ConnectionURL grpc://localhost:2136/Root/test
TablePath fluentbit/log
Columns {".timestamp":"timestamp", ".input":"input", ".hash":"datahash", "log":"message"}
CredentialsAnonymous 1
# CredentialsYcServiceAccountKeyFile sa-key.json
# CredentialsYcServiceAccountKeyJson json-data
# CredentialsYcMetadata 1
# CredentialsStatic username:password@
# CredentialsToken token-value
# Certificates ydb-ca.crt
LogLevel disabled # optional parameter. Value must be one of "disabled", "trace", "debug", "info", "warn", "error", "fatal" or "panic"
12 changes: 6 additions & 6 deletions examples/tail2ydb.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
CREATE TABLE `fluent/bit/log` (
CREATE TABLE `fluentbit/log` (
`timestamp` Timestamp NOT NULL,
`input` Text NOT NULL,
`input` Text NOT NULL,
`datahash` Uint64 NOT NULL,
`message` Text NOT NULL,
PRIMARY KEY (
`timestamp`, `input`
`timestamp`, `input`, `datahash`
)
)
PARTITION BY HASH(`timestamp`, `input`)
) PARTITION BY HASH(`timestamp`, `input`)
WITH (
STORE = COLUMN
)
);
5 changes: 0 additions & 5 deletions examples/tail2ydb_columns.json

This file was deleted.

32 changes: 13 additions & 19 deletions internal/storage/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,6 @@ func (s *YDB) ConvertRows(events []*model.Event) ([]types.Value, int, error) {
if hashUsed {
hashValue[field] = value
}
} else {
log.Debug(fmt.Sprintf("column for message key: %s (value: %v) not found, skipped", field, value))
}

continue
Expand Down Expand Up @@ -380,39 +378,33 @@ func (s *YDB) Write(events []*model.Event) error {
if portion > sz {
portion = sz
}
log.Debug(fmt.Sprintf("Got events block of size %d with portion %d and %d max bytes per row...",
sz, portion, maxrowbytes))
position := 0
for position < sz {
finish := position + portion
if finish > sz {
finish = sz
}
part := rows[position:finish]
log.Debug(fmt.Sprintf("...Processing positions [%d:%d], size %d", position, finish, len(part)))
err = s.db.Table().Do(context.Background(),
func(ctx context.Context, sess table.Session) error {
return sess.BulkUpsert(ctx, path.Join(s.db.Name(), s.cfg.TablePath), types.ListValue(part...))
},
)
if err != nil {
log.Debug(fmt.Sprintf("...BulkUpsert failed: %v", err))
if ydb.IsOperationErrorSchemeError(err) {
log.Warn("Detected scheme error, trying to resolve field mapping from table description")
resolveErr := s.resolveFieldMapping(context.Background())
if resolveErr != nil {
return errors.Join(err, resolveErr)
}
}

break
return err
}
log.Debug("...BulkUpsert succeeded")
position = finish
}

if ydb.IsOperationErrorSchemeError(err) {
log.Warn("Detected scheme error, trying to resolve field mapping from table description")
resolveErr := s.resolveFieldMapping(context.Background())
if resolveErr != nil {
return errors.Join(err, resolveErr)
}
}

return err
return nil
}

func (s *YDB) Exit() error {
Expand Down Expand Up @@ -467,12 +459,14 @@ func convertValueIfOptional(optional bool, v types.Value) types.Value {
}

const (
LenTimestamp3339 = 24
// Number of numerical characters after dot may be different.
// The longest one is probably this: 2024-05-02T12:36:13.395105207Z
LenTimestamp3339 = 22
)

func convertTimestamp(optional bool, v string) types.Value {
var err error
if len(v) == LenTimestamp3339 {
if len(v) >= LenTimestamp3339 {
var tv time.Time
tv, err = time.Parse(time.RFC3339, v)
if err == nil {
Expand Down
Loading