Skip to content

Commit

Permalink
Optimize software_titles query to use indexes (#25722)
Browse files Browse the repository at this point in the history
For #25160.

Measured improvement by splitting the MySQL query into two queries to
use the indexes more efficiently:
- ~8s vs ~100ms for ~30k entries in software_titles for ~1.7k incoming
software without bundle_identifier (linux software).
- ~1.64s vs ~2ms for 25k entries in software_titles and ~500 incoming
new software with bundle_identifier + ~200 new software without
bundle_identifier (macOS software).

---

- [X] Changes file added for user-visible changes in `changes/`,
`orbit/changes/` or `ee/fleetd-chrome/changes`.
See [Changes
files](https://github.com/fleetdm/fleet/blob/main/docs/Contributing/Committing-Changes.md#changes-files)
for more information.
- [X] Input data is properly validated, `SELECT *` is avoided, SQL
injection is prevented (using placeholders for values in statements)
- [X] Added support on fleet's osquery simulator `cmd/osquery-perf` for
new osquery data ingestion features.
- [X] Added/updated automated tests
- [x] A detailed QA plan exists on the associated ticket (if it isn't
there, work with the product group's QA engineer to add it)
- [X] Manual QA for all new/changed functionality
  • Loading branch information
lucasmrod authored Jan 23, 2025
1 parent 07416c2 commit 148d914
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 48 deletions.
2 changes: 2 additions & 0 deletions changes/25160-optimize-software-during-enrollment
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Optimized software ingestion queries to use existing DB indexes in the software titles table.
* Fixed a bug "software not found for checksum" in software ingestion transaction retries.
50 changes: 45 additions & 5 deletions cmd/osquery-perf/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
ubuntuSoftware []map[string]string

installerMetadataCache installer_cache.Metadata

linuxRandomBuildNumber = randomString(8)
)

func loadMacOSVulnerableSoftware() {
Expand Down Expand Up @@ -105,7 +107,7 @@ func loadExtraVulnerableSoftware() {
log.Printf("Loaded %d vulnerable vscode_extensions software", len(vsCodeExtensionsVulnerableSoftware))
}

func loadSoftwareItems(fs embed.FS, path string) []map[string]string {
func loadSoftwareItems(fs embed.FS, path string, source string) []map[string]string {
bz2, err := fs.Open(path)
if err != nil {
panic(err)
Expand All @@ -128,7 +130,7 @@ func loadSoftwareItems(fs embed.FS, path string) []map[string]string {
softwareRows = append(softwareRows, map[string]string{
"name": s.Name,
"version": s.Version,
"source": "programs",
"source": source,
})
}
return softwareRows
Expand All @@ -137,8 +139,8 @@ func loadSoftwareItems(fs embed.FS, path string) []map[string]string {
func init() {
loadMacOSVulnerableSoftware()
loadExtraVulnerableSoftware()
windowsSoftware = loadSoftwareItems(windowsSoftwareFS, "windows_11-software.json.bz2")
ubuntuSoftware = loadSoftwareItems(ubuntuSoftwareFS, "ubuntu_2204-software.json.bz2")
windowsSoftware = loadSoftwareItems(windowsSoftwareFS, "windows_11-software.json.bz2", "programs")
ubuntuSoftware = loadSoftwareItems(ubuntuSoftwareFS, "ubuntu_2204-software.json.bz2", "deb_packages")
}

type Stats struct {
Expand Down Expand Up @@ -498,6 +500,9 @@ type agent struct {

softwareInstaller softwareInstaller

linuxUniqueSoftwareVersion bool
linuxUniqueSoftwareTitle bool

// Software installed on the host via Fleet. Key is the software name + version + bundle identifier.
installedSoftware sync.Map

Expand Down Expand Up @@ -584,6 +589,8 @@ func newAgent(
disableScriptExec bool,
disableFleetDesktop bool,
loggerTLSMaxLines int,
linuxUniqueSoftwareVersion bool,
linuxUniqueSoftwareTitle bool,
) *agent {
var deviceAuthToken *string
if rand.Float64() <= orbitProb {
Expand Down Expand Up @@ -661,6 +668,9 @@ func newAgent(
softwareVSCodeExtensionsFailProb: softwareVSCodeExtensionsQueryFailureProb,
softwareInstaller: softwareInstaller,

linuxUniqueSoftwareVersion: linuxUniqueSoftwareVersion,
linuxUniqueSoftwareTitle: linuxUniqueSoftwareTitle,

macMDMClient: macMDMClient,
winMDMClient: winMDMClient,

Expand Down Expand Up @@ -2314,7 +2324,23 @@ func (a *agent) processQuery(name, query string, cachedResults *cachedResults) (
if ss == fleet.StatusOK {
switch a.os { //nolint:gocritic // ignore singleCaseSwitch
case "ubuntu":
results = ubuntuSoftware
results = make([]map[string]string, 0, len(ubuntuSoftware))
for _, s := range ubuntuSoftware {
softwareName := s["name"]
if a.linuxUniqueSoftwareTitle {
softwareName = fmt.Sprintf("%s-%d-%s", softwareName, a.agentIndex, linuxRandomBuildNumber)
}
version := s["version"]
if a.linuxUniqueSoftwareVersion {
version = fmt.Sprintf("1.2.%d-%s", a.agentIndex, linuxRandomBuildNumber)
}
m := map[string]string{
"name": softwareName,
"source": s["source"],
"version": version,
}
results = append(results, m)
}
a.installedSoftware.Range(func(key, value interface{}) bool {
results = append(results, value.(map[string]string))
return true
Expand Down Expand Up @@ -2684,6 +2710,18 @@ func main() {
uniqueSoftwareUninstallProb = flag.Float64("unique_software_uninstall_prob", 0.1, "Probability of uninstalling unique_software_uninstall_count common software/s")
uniqueVSCodeExtensionsSoftwareUninstallProb = flag.Float64("unique_vscode_extensions_software_uninstall_prob", 0.1, "Probability of uninstalling unique_vscode_extensions_software_uninstall_count common software/s")

// WARNING: This will generate massive amounts of entries in the software table,
// because linux devices report many individual software items, ~1600, compared to Windows around ~100s or macOS around ~500s.
//
// This flag can be used to load test software ingestion for Linux during enrollment (during enrollment all devices
// report software to Fleet, so the initial reads/inserts can be expensive).
linuxUniqueSoftwareVersion = flag.Bool("linux_unique_software_version", false, "Make version of software items on linux hosts unique. WARNING: This will generate massive amounts of entries in the software table, because linux devices report many individual software items (compared to Windows/macOS).")
// WARNING: This will generate massive amounts of entries in the software and software_titles tables,
//
// This flag can be used to load test software ingestion for Linux during enrollment (during enrollment all devices
// report software to Fleet, so the initial reads/inserts can be expensive).
linuxUniqueSoftwareTitle = flag.Bool("linux_unique_software_title", false, "Make name of software items on linux hosts unique. WARNING: This will generate massive amounts of titles which is not realistic but serves to test performance of software ingestion when processing large number of titles.")

vulnerableSoftwareCount = flag.Int("vulnerable_software_count", 10, "Number of vulnerable installed applications reported to fleet")
withLastOpenedSoftwareCount = flag.Int("with_last_opened_software_count", 10, "Number of applications that may report a last opened timestamp to fleet")
lastOpenedChangeProb = flag.Float64("last_opened_change_prob", 0.1, "Probability of last opened timestamp to be reported as changed [0, 1]")
Expand Down Expand Up @@ -2883,6 +2921,8 @@ func main() {
*disableScriptExec,
*disableFleetDesktop,
*loggerTLSMaxLines,
*linuxUniqueSoftwareVersion,
*linuxUniqueSoftwareTitle,
)
a.stats = stats
a.nodeKeyManager = nodeKeyManager
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ services:
# CAdvisor container allows monitoring other containers. Useful for
# development.
cadvisor:
image: google/cadvisor:latest
image: gcr.io/cadvisor/cadvisor:latest
ports:
- "5678:8080"
volumes:
Expand Down
35 changes: 33 additions & 2 deletions infrastructure/loadtesting/terraform/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ terraform apply -var tag=hosts-5k-test -var fleet_containers=5 -var db_instance_

# When adding loadtest containers.
terraform apply -var tag=hosts-5k-test -var fleet_containers=5 -var db_instance_type=db.t4g.medium -var redis_instance_type=cache.t4g.small -var -var loadtest_containers=10

```

### Deploying your code to the loadtesting environment

> IMPORTANT:
> - We advice to use a separate clone of the https://github.com/fleetdm/fleet repository because `terraform` operations are lengthy. Terraform uses the local files as the configuration files.
> - When performing a load test you target a specific branch and not `main` (referenced below as `$BRANCH_NAME`). The `main` branch changes often and it might trigger rebuilts of the images. The cloned repository that you will use to run the terraform operations doesn't need to be in `$BRANCH_NAME`, such `$BRANCH_NAME` is the Fleet version that will be deployed to the load test environment.
> - These scripts were tested with terraform 1.5.X.
> - These scripts were tested with terraform 1.10.4.
1. Push your `$BRANCH_NAME` branch to https://github.com/fleetdm/fleet and trigger a manual run of the [Docker publish](https://github.com/fleetdm/fleet/actions/workflows/goreleaser-snapshot-fleet.yaml) workflow (make sure to select the branch).
1. arm64 (M1/M2/etc) Mac Only: run `helpers/setup-darwin_arm64.sh` to build terraform plugins that lack arm64 builds in the registry. Alternatively, you can use the amd64 terraform binary, which works with Rosetta 2.
Expand Down Expand Up @@ -150,3 +149,35 @@ See https://www.terraform.io/internals/debugging for more details.
In a few instances, it is possible for an ECR repository to still have images left, preventing a full `terraform destroy` of a Loadtesting instance. Use the following one-liner to clean these up before re-running `terraform destroy`:

`REPOSITORY_NAME=fleet-$(terraform workspace show); aws ecr list-images --repository-name ${REPOSITORY_NAME} --query 'imageIds[*]' --output text | while read digest tag; do aws ecr batch-delete-image --repository-name ${REPOSITORY_NAME} --image-ids imageDigest=${digest}; done`

#### Errors with macOS Docker Desktop

If you are getting the following error when running `terraform apply`:
```sh
│ Error: Error pinging Docker server: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
│ with provider["registry.terraform.io/kreuzwerker/docker"],
│ on init.tf line 45, in provider "docker":
│ 45: provider "docker" {
```
Run:
```sh
$ docker context ls
NAME DESCRIPTION DOCKER ENDPOINT ERROR
default Current DOCKER_HOST based configuration unix:///var/run/docker.sock
desktop-linux * Docker Desktop unix:///Users/foobar/.docker/run/docker.sock
```
Then add the entry with `*`, in this case `host = unix:///Users/foobar/.docker/run/docker.sock` to `infrastructure/loadtesting/terraform/init.tf`:
```sh
[...]
provider "docker" {
# Configuration options
registry_auth {
address = "${data.aws_caller_identity.current.account_id}.dkr.ecr.us-east-2.amazonaws.com"
username = data.aws_ecr_authorization_token.token.user_name
password = data.aws_ecr_authorization_token.token.password
}
host = "unix:///Users/foobar/.docker/run/docker.sock"
}
[...]
```
8 changes: 6 additions & 2 deletions server/datastore/mysql/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ func (ds *Datastore) DBLocks(ctx context.Context) ([]*fleet.DBLock, error) {
// Even though this is a Read, use the writer as we want the db locks from
// the primary database (the read replica should have little to no trx locks).
if err := ds.writer(ctx).SelectContext(ctx, &locks, stmt); err != nil {
// To read innodb tables, DB user must have PROCESS privilege
// This can be set by DB admin like: GRANT PROCESS,SELECT ON *.* TO 'fleet'@'%';
// To read innodb tables, the DB user must have PROCESS and SELECT privileges.
//
// This can be set by a DB admin by running:
// GRANT PROCESS,SELECT ON *.* TO 'fleet'@'%';
// FLUSH PRIVILEGES;
// Make sure to restart fleet after running the commands above.
if isMySQLAccessDenied(err) {
return nil, &accessDeniedError{
Message: "select locking information: DB user must have global PROCESS and SELECT privilege",
Expand Down
119 changes: 90 additions & 29 deletions server/datastore/mysql/software.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,15 @@ func (ds *Datastore) applyChangesForNewSoftwareDB(
}
r.Deleted = deleted

// Copy incomingByChecksum because ds.insertNewInstalledHostSoftwareDB is modifying it and we
// are runnning inside ds.withRetryTxx.
incomingByChecksumCopy := make(map[string]fleet.Software, len(incomingByChecksum))
for key, value := range incomingByChecksum {
incomingByChecksumCopy[key] = value
}

inserted, err := ds.insertNewInstalledHostSoftwareDB(
ctx, tx, hostID, existingSoftware, incomingByChecksum, existingTitlesForNewSoftware,
ctx, tx, hostID, existingSoftware, incomingByChecksumCopy, existingTitlesForNewSoftware,
)
if err != nil {
return err
Expand Down Expand Up @@ -495,52 +502,105 @@ func (ds *Datastore) getExistingSoftware(
if !ok {
// This should never happen. If it does, we have a bug.
return nil, nil, nil, ctxerr.New(
ctx, fmt.Sprintf("software not found for checksum %s", hex.EncodeToString([]byte(s.Checksum))),
ctx, fmt.Sprintf("current software: software not found for checksum %s", hex.EncodeToString([]byte(s.Checksum))),
)
}
delete(newSoftware, s.Checksum)
}
}

// Get software titles for new software, if any
incomingChecksumToTitle = make(map[string]fleet.SoftwareTitle, len(newSoftware))
if len(newSoftware) > 0 {
totalToProcess := len(newSoftware)
const numberOfArgsPerSoftwareTitle = 4 // number of ? in each WHERE clause
if len(newSoftware) == 0 {
return currentSoftware, incomingChecksumToSoftware, incomingChecksumToTitle, nil
}

// There's new software, so we try to get the titles already stored in `software_titles` for them.
incomingChecksumToTitle, err = ds.getIncomingSoftwareChecksumsToExistingTitles(ctx, newSoftware, incomingChecksumToSoftware)
if err != nil {
return nil, nil, nil, ctxerr.Wrap(ctx, err, "get incoming software checksums to existing titles")
}

return currentSoftware, incomingChecksumToSoftware, incomingChecksumToTitle, nil
}

// getIncomingSoftwareChecksumsToExistingTitles loads the existing titles for the new incoming software.
// It returns a map of software checksums to existing software titles.
//
// To make best use of separate indexes, it runs two queries to get the existing titles from the DB:
// - One query for software with bundle_identifier.
// - One query for software without bundle_identifier.
func (ds *Datastore) getIncomingSoftwareChecksumsToExistingTitles(
ctx context.Context,
newSoftwareChecksums map[string]struct{},
incomingChecksumToSoftware map[string]fleet.Software,
) (map[string]fleet.SoftwareTitle, error) {
var (
incomingChecksumToTitle = make(map[string]fleet.SoftwareTitle, len(newSoftwareChecksums))
argsWithoutBundleIdentifier []interface{}
argsWithBundleIdentifier []interface{}
uniqueTitleStrToChecksum = make(map[string]string)
)
for checksum := range newSoftwareChecksums {
sw := incomingChecksumToSoftware[checksum]
if sw.BundleIdentifier != "" {
argsWithBundleIdentifier = append(argsWithBundleIdentifier, sw.BundleIdentifier)
} else {
argsWithoutBundleIdentifier = append(argsWithoutBundleIdentifier, sw.Name, sw.Source, sw.Browser)
}
// Map software title identifier to software checksums so that we can map checksums to actual titles later.
uniqueTitleStrToChecksum[UniqueSoftwareTitleStr(sw.Name, sw.Source, sw.Browser)] = checksum
}

// Get titles for software without bundle_identifier.
if len(argsWithoutBundleIdentifier) > 0 {
whereClause := strings.TrimSuffix(
strings.Repeat(`
(
(bundle_identifier = ?) OR
(name = ? AND source = ? AND browser = ? AND bundle_identifier IS NULL)
) OR`, totalToProcess), " OR",
(name = ? AND source = ? AND browser = ?)
) OR`, len(argsWithoutBundleIdentifier)/3), " OR",
)
stmt := fmt.Sprintf(
"SELECT id, name, source, browser, COALESCE(bundle_identifier, '') as bundle_identifier FROM software_titles WHERE %s",
"SELECT id, name, source, browser FROM software_titles WHERE %s",
whereClause,
)
args := make([]interface{}, 0, totalToProcess*numberOfArgsPerSoftwareTitle)
uniqueTitleStrToChecksum := make(map[string]string, totalToProcess)
for checksum := range newSoftware {
sw := incomingChecksumToSoftware[checksum]
args = append(args, sw.BundleIdentifier, sw.Name, sw.Source, sw.Browser)
// Map software title identifier to software checksums so that we can map checksums to actual titles later.
uniqueTitleStrToChecksum[UniqueSoftwareTitleStr(sw.Name, sw.Source, sw.Browser)] = checksum
}
var existingSoftwareTitlesForNewSoftware []fleet.SoftwareTitle
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &existingSoftwareTitlesForNewSoftware, stmt, args...); err != nil {
return nil, nil, nil, ctxerr.Wrap(ctx, err, "get existing titles")
var existingSoftwareTitlesForNewSoftwareWithoutBundleIdentifier []fleet.SoftwareTitle
if err := sqlx.SelectContext(ctx,
ds.reader(ctx),
&existingSoftwareTitlesForNewSoftwareWithoutBundleIdentifier,
stmt,
argsWithoutBundleIdentifier...,
); err != nil {
return nil, ctxerr.Wrap(ctx, err, "get existing titles without bundle identifier")
}
for _, title := range existingSoftwareTitlesForNewSoftwareWithoutBundleIdentifier {
checksum, ok := uniqueTitleStrToChecksum[UniqueSoftwareTitleStr(title.Name, title.Source, title.Browser)]
if ok {
incomingChecksumToTitle[checksum] = title
}
}
}

// Get titles for software with bundle_identifier
if len(argsWithBundleIdentifier) > 0 {
incomingChecksumToTitle = make(map[string]fleet.SoftwareTitle, len(newSoftwareChecksums))
stmtBundleIdentifier := `SELECT id, name, source, browser, bundle_identifier FROM software_titles WHERE bundle_identifier IN (?)`
stmtBundleIdentifier, argsWithBundleIdentifier, err := sqlx.In(stmtBundleIdentifier, argsWithBundleIdentifier)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "build query to existing titles with bundle_identifier")
}
var existingSoftwareTitlesForNewSoftwareWithBundleIdentifier []fleet.SoftwareTitle
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &existingSoftwareTitlesForNewSoftwareWithBundleIdentifier, stmtBundleIdentifier, argsWithBundleIdentifier...); err != nil {
return nil, ctxerr.Wrap(ctx, err, "get existing titles with bundle_identifier")
}
// Map software titles to software checksums.
for _, title := range existingSoftwareTitlesForNewSoftware {
for _, title := range existingSoftwareTitlesForNewSoftwareWithBundleIdentifier {
checksum, ok := uniqueTitleStrToChecksum[UniqueSoftwareTitleStr(title.Name, title.Source, title.Browser)]
if ok {
incomingChecksumToTitle[checksum] = title
}
}
}

return currentSoftware, incomingChecksumToSoftware, incomingChecksumToTitle, nil
return incomingChecksumToTitle, nil
}

// UniqueSoftwareTitleStr creates a unique string representation of the software title
Expand Down Expand Up @@ -594,9 +654,10 @@ func computeRawChecksum(sw fleet.Software) ([]byte, error) {
return h.Sum(nil), nil
}

// Insert host_software that is in softwareChecksums map, but not in existingSoftware.
// Also insert any new software titles that are needed.
// returns the inserted software on the host
// insertNewInstalledHostSoftwareDB inserts host_software that is in softwareChecksums map,
// but not in existingSoftware. It also inserts any new software titles that are needed.
//
// It returns the inserted software on the host.
func (ds *Datastore) insertNewInstalledHostSoftwareDB(
ctx context.Context,
tx sqlx.ExtContext,
Expand All @@ -613,7 +674,7 @@ func (ds *Datastore) insertNewInstalledHostSoftwareDB(
for _, s := range existingSoftware {
software, ok := softwareChecksums[s.Checksum]
if !ok {
return nil, ctxerr.New(ctx, fmt.Sprintf("software not found for checksum %s", hex.EncodeToString([]byte(s.Checksum))))
return nil, ctxerr.New(ctx, fmt.Sprintf("existing software: software not found for checksum %q", hex.EncodeToString([]byte(s.Checksum))))
}
software.ID = s.ID
insertsHostSoftware = append(insertsHostSoftware, hostID, software.ID, software.LastOpenedAt)
Expand Down Expand Up @@ -751,7 +812,7 @@ func (ds *Datastore) insertNewInstalledHostSoftwareDB(
for _, s := range updatedExistingSoftware {
software, ok := softwareChecksums[s.Checksum]
if !ok {
return nil, ctxerr.New(ctx, fmt.Sprintf("software not found for checksum %s", hex.EncodeToString([]byte(s.Checksum))))
return nil, ctxerr.New(ctx, fmt.Sprintf("updated existing software: software not found for checksum %s", hex.EncodeToString([]byte(s.Checksum))))
}
software.ID = s.ID
insertsHostSoftware = append(insertsHostSoftware, hostID, software.ID, software.LastOpenedAt)
Expand Down
Loading

0 comments on commit 148d914

Please sign in to comment.