From 1461a411820b2b6329b217bb87f68877f4f5ec8c Mon Sep 17 00:00:00 2001 From: Dario Gieselaar Date: Wed, 22 Jan 2025 14:59:58 +0100 Subject: [PATCH 1/2] [Streams] Centralize error handling --- .../src/helpers/type_guards.ts | 4 +- .../streams/server/lib/streams/client.ts | 25 +- .../component_template_not_found_error.ts | 15 + .../errors/definition_id_invalid_error.ts | 15 + .../errors/definition_not_found_error.ts | 15 + .../errors/detected_mapping_failure_error.ts | 15 + .../streams/errors/fork_condition_missing.ts | 13 - .../server/lib/streams/errors/index.ts | 15 - .../errors/index_template_not_found.ts | 13 - .../errors/ingest_pipeline_not_found.ts | 13 - .../lib/streams/errors/malformed_children.ts | 13 - ...failure.ts => malformed_children_error.ts} | 8 +- .../lib/streams/errors/malformed_fields.ts | 13 - ...d_invalid.ts => malformed_fields_error.ts} | 8 +- .../lib/streams/errors/malformed_stream.ts | 13 - ...not_found.ts => malformed_stream_error.ts} | 8 +- .../lib/streams/errors/malformed_stream_id.ts | 13 - .../errors/malformed_stream_id_error.ts | 15 + .../streams/errors/non_additive_processor.ts | 13 - .../errors/non_additive_processor_error.ts | 15 + .../errors/root_stream_immutability_error.ts | 15 + .../root_stream_immutability_exception.ts | 13 - ...emplate_not_found.ts => security_error.ts} | 8 +- .../lib/streams/errors/security_exception.ts | 13 - ...n_failed.ts => simulation_failed_error.ts} | 8 +- .../{id_conflict_error.ts => status_error.ts} | 5 +- .../server/lib/streams/helpers/retry.ts | 6 - .../lib/streams/helpers/validate_fields.ts | 6 +- .../lib/streams/helpers/validate_stream.ts | 16 +- .../streams/server/lib/streams/stream_crud.ts | 5 +- .../server/routes/create_server_route.ts | 35 +- .../server/routes/streams/crud/route.ts | 166 +++----- .../server/routes/streams/enablement/route.ts | 30 +- .../server/routes/streams/ingest/route.ts | 100 ++--- .../server/routes/streams/management/route.ts | 170 +++----- .../server/routes/streams/processing/route.ts | 69 ++-- .../server/routes/streams/schema/route.ts | 370 +++++++++--------- 37 files changed, 553 insertions(+), 744 deletions(-) create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found_error.ts create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid_error.ts create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found_error.ts create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure_error.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index_template_not_found.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children.ts rename x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/{detected_mapping_failure.ts => malformed_children_error.ts} (63%) delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields.ts rename x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/{definition_id_invalid.ts => malformed_fields_error.ts} (63%) delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream.ts rename x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/{definition_not_found.ts => malformed_stream_error.ts} (63%) delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id_error.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor_error.ts create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_error.ts delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts rename x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/{component_template_not_found.ts => security_error.ts} (65%) delete mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_exception.ts rename x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/{simulation_failed.ts => simulation_failed_error.ts} (72%) rename x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/{id_conflict_error.ts => status_error.ts} (71%) diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/type_guards.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/type_guards.ts index 14588be802d8f..acd3b33644344 100644 --- a/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/type_guards.ts +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/helpers/type_guards.ts @@ -8,7 +8,7 @@ import { ZodSchema, z } from '@kbn/zod'; export function createIsNarrowSchema( - base: TBaseSchema, + _base: TBaseSchema, narrow: TNarrowSchema ) { return >( @@ -19,7 +19,7 @@ export function createIsNarrowSchema( - base: TBaseSchema, + _base: TBaseSchema, narrow: TNarrowSchema ) { return >( diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts index 970dcc3b7497a..ef009f29d5710 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts @@ -30,8 +30,6 @@ import { } from '@kbn/streams-schema'; import { cloneDeep, keyBy, omit, orderBy } from 'lodash'; import { AssetClient } from './assets/asset_client'; -import { DefinitionNotFound, SecurityException } from './errors'; -import { MalformedStreamId } from './errors/malformed_stream_id'; import { syncUnwiredStreamDefinitionObjects, syncWiredStreamDefinitionObjects, @@ -50,6 +48,9 @@ import { deleteStreamObjects, deleteUnmanagedStreamObjects, } from './stream_crud'; +import { DefinitionNotFoundError } from './errors/definition_not_found_error'; +import { MalformedStreamIdError } from './errors/malformed_stream_id_error'; +import { SecurityError } from './errors/security_error'; interface AcknowledgeResponse { acknowledged: true; @@ -70,8 +71,8 @@ function isElasticsearch404(error: unknown): error is errors.ResponseError & { s return isResponseError(error) && error.statusCode === 404; } -function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFound { - return error instanceof DefinitionNotFound; +function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFoundError { + return error instanceof DefinitionNotFoundError; } export class StreamsClient { @@ -359,7 +360,7 @@ export class StreamsClient { parentDefinition && !isWiredStreamDefinition(parentDefinition) ) { - throw new MalformedStreamId('Cannot fork a stream that is not managed'); + throw new MalformedStreamIdError('Cannot fork a stream that is not managed'); } validateAncestorFields({ @@ -381,7 +382,7 @@ export class StreamsClient { continue; } if (!isChildOf(definition.name, item.destination)) { - throw new MalformedStreamId( + throw new MalformedStreamIdError( `The ID (${item.destination}) from the child stream must start with the parent's id (${definition.name}), followed by a dot and a name` ); } @@ -432,12 +433,12 @@ export class StreamsClient { // check whether root stream has a child of the given name already if (parentDefinition.ingest.routing.some((item) => item.destination === childDefinition.name)) { - throw new MalformedStreamId( + throw new MalformedStreamIdError( `The stream with ID (${name}) already exists as a child of the parent stream` ); } if (!isChildOf(parentDefinition.name, childDefinition.name)) { - throw new MalformedStreamId( + throw new MalformedStreamIdError( `The ID (${name}) from the new stream must start with the parent's id (${parentDefinition.name}), followed by a dot and a name` ); } @@ -480,7 +481,7 @@ export class StreamsClient { checkAccess({ id: name, scopedClusterClient: this.dependencies.scopedClusterClient }).then( (privileges) => { if (!privileges.read) { - throw new DefinitionNotFound(`Stream definition for ${name} not found`); + throw new DefinitionNotFoundError(`Stream definition for ${name} not found`); } } ), @@ -497,7 +498,7 @@ export class StreamsClient { }) .catch(async (error) => { if (isElasticsearch404(error)) { - throw new DefinitionNotFound(`Cannot find stream ${name}`); + throw new DefinitionNotFoundError(`Cannot find stream ${name}`); } throw error; }); @@ -708,7 +709,7 @@ export class StreamsClient { ]); if (!access.write) { - throw new SecurityException(`Cannot delete stream, insufficient privileges`); + throw new SecurityError(`Cannot delete stream, insufficient privileges`); } if (!definition) { @@ -717,7 +718,7 @@ export class StreamsClient { const parentId = getParentId(name); if (isWiredStreamDefinition(definition) && !parentId) { - throw new MalformedStreamId('Cannot delete root stream'); + throw new MalformedStreamIdError('Cannot delete root stream'); } await this.deleteStreamFromDefinition(definition); diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found_error.ts new file mode 100644 index 0000000000000..4f9cf34b81b39 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class ComponentTemplateNotFoundError extends StatusError { + constructor(message: string) { + super(message, 404); + this.name = 'ComponentTemplateNotFoundError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid_error.ts new file mode 100644 index 0000000000000..56730ba879ee7 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class DefinitionIdInvalidError extends StatusError { + constructor(message: string) { + super(message, 400); + this.name = 'DefinitionIdInvalidError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found_error.ts new file mode 100644 index 0000000000000..77dddc64ea670 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class DefinitionNotFoundError extends StatusError { + constructor(message: string) { + super(message, 404); + this.name = 'DefinitionNotFoundError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure_error.ts new file mode 100644 index 0000000000000..532a96d26adb1 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class DetectedMappingFailureError extends StatusError { + constructor(message: string) { + super(message, 400); + this.name = 'DetectedMappingFailureError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts deleted file mode 100644 index 713751dbe4363..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class ForkConditionMissing extends Error { - constructor(message: string) { - super(message); - this.name = 'ForkConditionMissing'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts deleted file mode 100644 index eb03a5367d247..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index.ts +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export * from './definition_id_invalid'; -export * from './definition_not_found'; -export * from './id_conflict_error'; -export * from './security_exception'; -export * from './index_template_not_found'; -export * from './fork_condition_missing'; -export * from './component_template_not_found'; -export * from './root_stream_immutability_exception'; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index_template_not_found.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index_template_not_found.ts deleted file mode 100644 index 4f4735dd15fa1..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/index_template_not_found.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class IndexTemplateNotFound extends Error { - constructor(message: string) { - super(message); - this.name = 'IndexTemplateNotFound'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts deleted file mode 100644 index 8bf9bbd4933ce..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class IngestPipelineNotFound extends Error { - constructor(message: string) { - super(message); - this.name = 'IngestPipelineNotFound'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children.ts deleted file mode 100644 index 699c4cdd5b1ef..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class MalformedChildren extends Error { - constructor(message: string) { - super(message); - this.name = 'MalformedChildren'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children_error.ts similarity index 63% rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts rename to x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children_error.ts index b026b150b64e8..dfc5eaee1e7f4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/detected_mapping_failure.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_children_error.ts @@ -5,9 +5,11 @@ * 2.0. */ -export class DetectedMappingFailure extends Error { +import { StatusError } from './status_error'; + +export class MalformedChildrenError extends StatusError { constructor(message: string) { - super(message); - this.name = 'DetectedMappingFailure'; + super(message, 400); + this.name = 'MalformedChildrenError'; } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields.ts deleted file mode 100644 index b8f7ac1392610..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class MalformedFields extends Error { - constructor(message: string) { - super(message); - this.name = 'MalformedFields'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields_error.ts similarity index 63% rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts rename to x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields_error.ts index 817e8f67bf25d..58793302aa7b4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_fields_error.ts @@ -5,9 +5,11 @@ * 2.0. */ -export class DefinitionIdInvalid extends Error { +import { StatusError } from './status_error'; + +export class MalformedFieldsError extends StatusError { constructor(message: string) { - super(message); - this.name = 'DefinitionIdInvalid'; + super(message, 400); + this.name = 'MalformedFieldsError'; } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream.ts deleted file mode 100644 index 1e468d9ded7bf..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class MalformedStream extends Error { - constructor(message: string) { - super(message); - this.name = 'MalformedStream'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_error.ts similarity index 63% rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found.ts rename to x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_error.ts index f7e60193baa5f..8c23fe24862ff 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/definition_not_found.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_error.ts @@ -5,9 +5,11 @@ * 2.0. */ -export class DefinitionNotFound extends Error { +import { StatusError } from './status_error'; + +export class MalformedStreamError extends StatusError { constructor(message: string) { - super(message); - this.name = 'DefinitionNotFound'; + super(message, 400); + this.name = 'MalformedStreamError'; } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts deleted file mode 100644 index 2f988204c74b0..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class MalformedStreamId extends Error { - constructor(message: string) { - super(message); - this.name = 'MalformedStreamId'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id_error.ts new file mode 100644 index 0000000000000..5ce049f470d9e --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/malformed_stream_id_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class MalformedStreamIdError extends StatusError { + constructor(message: string) { + super(message, 400); + this.name = 'MalformedStreamIdError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts deleted file mode 100644 index 5a237fcf42f1e..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class NonAdditiveProcessor extends Error { - constructor(message: string) { - super(message); - this.name = 'NonAdditiveProcessor'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor_error.ts new file mode 100644 index 0000000000000..9e509fe8ca9f4 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class NonAdditiveProcessorError extends StatusError { + constructor(message: string) { + super(message, 400); + this.name = 'NonAdditiveProcessorError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_error.ts new file mode 100644 index 0000000000000..e62e71b68acb0 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class RootStreamImmutabilityError extends StatusError { + constructor(message: string) { + super(message, 400); + this.name = 'RootStreamImmutabilityError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts deleted file mode 100644 index 4b1573f0ff01b..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/root_stream_immutability_exception.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class RootStreamImmutabilityException extends Error { - constructor(message: string) { - super(message); - this.name = 'RootStreamImmutabilityException'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_error.ts similarity index 65% rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found.ts rename to x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_error.ts index a7e9cebf98507..cbe273b394a4d 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/component_template_not_found.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_error.ts @@ -5,9 +5,11 @@ * 2.0. */ -export class ComponentTemplateNotFound extends Error { +import { StatusError } from './status_error'; + +export class SecurityError extends StatusError { constructor(message: string) { - super(message); - this.name = 'ComponentTemplateNotFound'; + super(message, 403); + this.name = 'SecurityError'; } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_exception.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_exception.ts deleted file mode 100644 index 0b4ae450c2530..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/security_exception.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -export class SecurityException extends Error { - constructor(message: string) { - super(message); - this.name = 'SecurityException'; - } -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/simulation_failed.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/simulation_failed_error.ts similarity index 72% rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/simulation_failed.ts rename to x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/simulation_failed_error.ts index 28140b377403f..651500cf88fb4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/simulation_failed.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/simulation_failed_error.ts @@ -6,15 +6,17 @@ */ import { errors } from '@elastic/elasticsearch'; +import { StatusError } from './status_error'; -export class SimulationFailed extends Error { +export class SimulationFailedError extends StatusError { constructor(error: errors.ResponseError) { super( error.body?.error?.reason || error.body?.error?.caused_by?.reason || error.message || - 'Unknown error' + 'Unknown error', + 400 ); - this.name = 'SimulationFailed'; + this.name = 'SimulationFailedError'; } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/id_conflict_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/status_error.ts similarity index 71% rename from x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/id_conflict_error.ts rename to x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/status_error.ts index a24c7357379fa..a510aae623acc 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/id_conflict_error.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/status_error.ts @@ -5,9 +5,8 @@ * 2.0. */ -export class IdConflict extends Error { - constructor(message: string) { +export class StatusError extends Error { + constructor(message: string, public readonly statusCode: number) { super(message); - this.name = 'IdConflict'; } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/retry.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/retry.ts index 32604a22bf9be..8d57191476177 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/retry.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/retry.ts @@ -8,7 +8,6 @@ import { setTimeout } from 'timers/promises'; import { errors as EsErrors } from '@elastic/elasticsearch'; import type { Logger } from '@kbn/logging'; -import { SecurityException } from '../errors'; const MAX_ATTEMPTS = 5; @@ -48,11 +47,6 @@ export const retryTransientEsErrors = async ( await setTimeout(retryDelaySec * 1000); return retryTransientEsErrors(esCall, { logger, attempt: retryCount }); } - - if (e.meta?.body?.error?.type === 'security_exception') { - throw new SecurityException(e.meta.body.error.reason); - } - throw e; } }; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_fields.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_fields.ts index f54f438d0705e..a9ff3235fa760 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_fields.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_fields.ts @@ -6,7 +6,7 @@ */ import { FieldDefinition, WiredStreamDefinition } from '@kbn/streams-schema'; -import { MalformedFields } from '../errors/malformed_fields'; +import { MalformedFieldsError } from '../errors/malformed_fields_error'; export function validateAncestorFields({ ancestors, @@ -24,7 +24,7 @@ export function validateAncestorFields({ attr.type !== fields[fieldName].type && ancestorFieldName === fieldName ) ) { - throw new MalformedFields( + throw new MalformedFieldsError( `Field ${fieldName} is already defined with incompatible type in the parent stream ${ancestor.name}` ); } @@ -48,7 +48,7 @@ export function validateDescendantFields({ attr.type !== fields[fieldName].type && descendantFieldName === fieldName ) ) { - throw new MalformedFields( + throw new MalformedFieldsError( `Field ${fieldName} is already defined with incompatible type in the child stream ${descendant.name}` ); } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts index 305e5aeffbb79..67828477c4bf4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts @@ -12,9 +12,9 @@ import { isWiredStreamDefinition, } from '@kbn/streams-schema'; import { difference, isEqual } from 'lodash'; -import { RootStreamImmutabilityException } from '../errors'; -import { MalformedStream } from '../errors/malformed_stream'; -import { MalformedChildren } from '../errors/malformed_children'; +import { MalformedChildrenError } from '../errors/malformed_children_error'; +import { MalformedStreamError } from '../errors/malformed_stream_error'; +import { RootStreamImmutabilityError } from '../errors/root_stream_immutability_error'; /* * Changes to mappings (fields) and processing rules are not allowed on the root stream. @@ -30,7 +30,7 @@ export function validateRootStreamChanges( ); if (hasFieldChanges) { - throw new RootStreamImmutabilityException('Root stream fields cannot be changed'); + throw new RootStreamImmutabilityError('Root stream fields cannot be changed'); } const hasProcessingChanges = !isEqual( @@ -39,7 +39,7 @@ export function validateRootStreamChanges( ); if (hasProcessingChanges) { - throw new RootStreamImmutabilityException('Root stream processing rules cannot be changed'); + throw new RootStreamImmutabilityError('Root stream processing rules cannot be changed'); } } @@ -55,7 +55,7 @@ export function validateStreamTypeChanges( isWiredStreamDefinition(nextStreamDefinition); if (fromUnwiredToWired) { - throw new MalformedStream('Cannot change unwired stream to wired stream'); + throw new MalformedStreamError('Cannot change unwired stream to wired stream'); } const fromWiredToUnwired = @@ -63,7 +63,7 @@ export function validateStreamTypeChanges( isUnwiredStreamDefinition(nextStreamDefinition); if (fromWiredToUnwired) { - throw new MalformedStream('Cannot change wired stream to unwired stream'); + throw new MalformedStreamError('Cannot change wired stream to unwired stream'); } } @@ -85,6 +85,6 @@ export function validateStreamChildrenChanges( const removedChildren = difference(existingChildren, nextChildren); if (removedChildren.length) { - throw new MalformedChildren('Cannot remove children from a stream via updates'); + throw new MalformedChildrenError('Cannot remove children from a stream via updates'); } } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts index 3508559f22940..5aebb745f727e 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts @@ -12,11 +12,11 @@ import { IngestStreamLifecycle } from '@kbn/streams-schema/src/models/ingest/com import { deleteComponent } from './component_templates/manage_component_templates'; import { getComponentTemplateName } from './component_templates/name'; import { deleteDataStream } from './data_streams/manage_data_streams'; -import { DefinitionNotFound } from './errors'; import { deleteTemplate } from './index_templates/manage_index_templates'; import { getIndexTemplateName } from './index_templates/name'; import { deleteIngestPipeline } from './ingest_pipelines/manage_ingest_pipelines'; import { getProcessingPipelineName, getReroutePipelineName } from './ingest_pipelines/name'; +import { DefinitionNotFoundError } from './errors/definition_not_found_error'; interface BaseParams { scopedClusterClient: IScopedClusterClient; @@ -278,8 +278,9 @@ async function getDataStream({ throw e; } } + if (!dataStream) { - throw new DefinitionNotFound(`Stream definition for ${name} not found.`); + throw new DefinitionNotFoundError(`Stream definition for ${name} not found.`); } return dataStream; } diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts index 94d85a71c82bb..83eec795f015d 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts @@ -6,6 +6,39 @@ */ import { createServerRouteFactory } from '@kbn/server-route-repository'; +import { CreateServerRouteFactory } from '@kbn/server-route-repository-utils/src/typings'; +import { badRequest, forbidden, internal, notFound } from '@hapi/boom'; +import { errors } from '@elastic/elasticsearch'; import { StreamsRouteHandlerResources } from './types'; +import { StatusError } from '../lib/streams/errors/status_error'; -export const createServerRoute = createServerRouteFactory(); +const createPlainStreamsServerRoute = createServerRouteFactory(); + +export const createServerRoute: CreateServerRouteFactory< + StreamsRouteHandlerResources, + undefined +> = ({ handler, ...config }) => { + return createPlainStreamsServerRoute({ + ...config, + handler: (options) => { + return handler(options).catch((error) => { + if (error instanceof StatusError || error instanceof errors.ResponseError) { + switch (error.statusCode) { + case 400: + throw badRequest(error); + + case 403: + throw forbidden(error); + + case 404: + throw notFound(error); + + case 500: + throw internal(error); + } + } + throw error; + }); + }, + }); +}; diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts index c6fb60260a5d0..ee2deb2c63115 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts @@ -6,22 +6,13 @@ */ import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types'; -import { badRequest, internal, notFound } from '@hapi/boom'; -import { isResponseError } from '@kbn/es-errors'; import { StreamDefinition, StreamGetResponse, streamUpsertRequestSchema, } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; -import { - DefinitionNotFound, - ForkConditionMissing, - IndexTemplateNotFound, - RootStreamImmutabilityException, - SecurityException, -} from '../../../lib/streams/errors'; -import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id'; +import { UpsertStreamResponse } from '../../../lib/streams/client'; import { createServerRoute } from '../../create_server_route'; import { readStream } from './read_stream'; @@ -41,26 +32,18 @@ export const readStreamRoute = createServerRoute({ path: z.object({ id: z.string() }), }), handler: async ({ params, request, getScopedClients }): Promise => { - try { - const { assetClient, streamsClient, scopedClusterClient } = await getScopedClients({ - request, - }); - - const body = await readStream({ - name: params.path.id, - assetClient, - scopedClusterClient, - streamsClient, - }); - - return body; - } catch (e) { - if (e instanceof DefinitionNotFound || (isResponseError(e) && e.statusCode === 404)) { - throw notFound(e); - } - - throw internal(e); - } + const { assetClient, streamsClient, scopedClusterClient } = await getScopedClients({ + request, + }); + + const body = await readStream({ + name: params.path.id, + assetClient, + scopedClusterClient, + streamsClient, + }); + + return body; }, }); @@ -90,41 +73,33 @@ export const streamDetailRoute = createServerRoute({ }), }), handler: async ({ params, request, getScopedClients }): Promise => { - try { - const { scopedClusterClient, streamsClient } = await getScopedClients({ request }); - const streamEntity = await streamsClient.getStream(params.path.id); - - // check doc count - const docCountResponse = await scopedClusterClient.asCurrentUser.search({ - index: streamEntity.name, - body: { - track_total_hits: true, - query: { - range: { - '@timestamp': { - gte: params.query.start, - lte: params.query.end, - }, + const { scopedClusterClient, streamsClient } = await getScopedClients({ request }); + const streamEntity = await streamsClient.getStream(params.path.id); + + // check doc count + const docCountResponse = await scopedClusterClient.asCurrentUser.search({ + index: streamEntity.name, + body: { + track_total_hits: true, + query: { + range: { + '@timestamp': { + gte: params.query.start, + lte: params.query.end, }, }, - size: 0, }, - }); - - const count = (docCountResponse.hits.total as SearchTotalHits).value; + size: 0, + }, + }); - return { - details: { - count, - }, - }; - } catch (e) { - if (e instanceof DefinitionNotFound) { - throw notFound(e); - } + const count = (docCountResponse.hits.total as SearchTotalHits).value; - throw internal(e); - } + return { + details: { + count, + }, + }; }, }); @@ -142,18 +117,10 @@ export const listStreamsRoute = createServerRoute({ }, params: z.object({}), handler: async ({ request, getScopedClients }): Promise<{ streams: StreamDefinition[] }> => { - try { - const { streamsClient } = await getScopedClients({ request }); - return { - streams: await streamsClient.listStreams(), - }; - } catch (e) { - if (e instanceof DefinitionNotFound) { - throw notFound(e); - } - - throw internal(e); - } + const { streamsClient } = await getScopedClients({ request }); + return { + streams: await streamsClient.listStreams(), + }; }, }); @@ -175,30 +142,13 @@ export const editStreamRoute = createServerRoute({ }), body: streamUpsertRequestSchema, }), - handler: async ({ params, request, getScopedClients }) => { - try { - const { streamsClient } = await getScopedClients({ request }); - - return await streamsClient.upsertStream({ - request: params.body, - name: params.path.id, - }); - } catch (e) { - if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { - throw notFound(e); - } - - if ( - e instanceof SecurityException || - e instanceof ForkConditionMissing || - e instanceof MalformedStreamId || - e instanceof RootStreamImmutabilityException - ) { - throw badRequest(e); - } + handler: async ({ params, request, getScopedClients }): Promise => { + const { streamsClient } = await getScopedClients({ request }); - throw internal(e); - } + return await streamsClient.upsertStream({ + request: params.body, + name: params.path.id, + }); }, }); @@ -220,29 +170,11 @@ export const deleteStreamRoute = createServerRoute({ }), }), handler: async ({ params, request, getScopedClients }): Promise<{ acknowledged: true }> => { - try { - const { streamsClient } = await getScopedClients({ - request, - }); - - await streamsClient.deleteStream(params.path.id); - - return { acknowledged: true }; - } catch (e) { - if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { - throw notFound(e); - } - - if ( - e instanceof SecurityException || - e instanceof ForkConditionMissing || - e instanceof MalformedStreamId - ) { - throw badRequest(e); - } + const { streamsClient } = await getScopedClients({ + request, + }); - throw internal(e); - } + return await streamsClient.deleteStream(params.path.id); }, }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts index a7021bbddd8c8..7eafbeb4e7056 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts @@ -5,11 +5,9 @@ * 2.0. */ -import { badRequest, internal } from '@hapi/boom'; import { z } from '@kbn/zod'; -import { SecurityException } from '../../../lib/streams/errors'; -import { createServerRoute } from '../../create_server_route'; import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/streams/client'; +import { createServerRoute } from '../../create_server_route'; export const enableStreamsRoute = createServerRoute({ endpoint: 'POST /api/streams/_enable', @@ -25,18 +23,11 @@ export const enableStreamsRoute = createServerRoute({ }, }, handler: async ({ request, getScopedClients }): Promise => { - try { - const { streamsClient } = await getScopedClients({ - request, - }); + const { streamsClient } = await getScopedClients({ + request, + }); - return await streamsClient.enableStreams(); - } catch (e) { - if (e instanceof SecurityException) { - throw badRequest(e); - } - throw internal(e); - } + return await streamsClient.enableStreams(); }, }); @@ -52,16 +43,9 @@ export const disableStreamsRoute = createServerRoute({ }, }, handler: async ({ request, getScopedClients }): Promise => { - try { - const { streamsClient } = await getScopedClients({ request }); + const { streamsClient } = await getScopedClients({ request }); - return await streamsClient.disableStreams(); - } catch (e) { - if (e instanceof SecurityException) { - throw badRequest(e); - } - throw internal(e); - } + return await streamsClient.disableStreams(); }, }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/ingest/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/ingest/route.ts index 83def3e1248f5..29753fd100c10 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/ingest/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/ingest/route.ts @@ -5,8 +5,7 @@ * 2.0. */ -import { badRequest, internal, notFound } from '@hapi/boom'; -import { isResponseError } from '@kbn/es-errors'; +import { badRequest } from '@hapi/boom'; import { IngestGetResponse, StreamUpsertRequest, @@ -15,14 +14,6 @@ import { isWiredStreamDefinition, } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; -import { - DefinitionNotFound, - ForkConditionMissing, - IndexTemplateNotFound, - RootStreamImmutabilityException, - SecurityException, -} from '../../../lib/streams/errors'; -import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id'; import { createServerRoute } from '../../create_server_route'; const readIngestRoute = createServerRoute({ @@ -41,31 +32,23 @@ const readIngestRoute = createServerRoute({ path: z.object({ id: z.string() }), }), handler: async ({ params, request, getScopedClients }): Promise => { - try { - const { streamsClient } = await getScopedClients({ - request, - }); - - const name = params.path.id; + const { streamsClient } = await getScopedClients({ + request, + }); - const definition = await streamsClient.getStream(name); + const name = params.path.id; - if (isWiredStreamDefinition(definition)) { - return { ingest: definition.ingest }; - } + const definition = await streamsClient.getStream(name); - if (isUnwiredStreamDefinition(definition)) { - return { ingest: definition.ingest }; - } - - throw badRequest(`Stream is not an ingest stream`); - } catch (e) { - if (e instanceof DefinitionNotFound || (isResponseError(e) && e.statusCode === 404)) { - throw notFound(e); - } + if (isWiredStreamDefinition(definition)) { + return { ingest: definition.ingest }; + } - throw internal(e); + if (isUnwiredStreamDefinition(definition)) { + return { ingest: definition.ingest }; } + + throw badRequest(`Stream is not an ingest stream`); }, }); @@ -88,49 +71,32 @@ const upsertIngestRoute = createServerRoute({ body: ingestUpsertRequestSchema, }), handler: async ({ params, request, getScopedClients }) => { - try { - const { streamsClient, assetClient } = await getScopedClients({ - request, - }); - - const name = params.path.id; + const { streamsClient, assetClient } = await getScopedClients({ + request, + }); - const assets = await assetClient.getAssets({ - entityId: name, - entityType: 'stream', - }); + const name = params.path.id; - const ingestUpsertRequest = params.body; + const assets = await assetClient.getAssets({ + entityId: name, + entityType: 'stream', + }); - const dashboards = assets - .filter((asset) => asset.assetType === 'dashboard') - .map((asset) => asset.assetId); + const ingestUpsertRequest = params.body; - const upsertRequest = { - dashboards, - stream: ingestUpsertRequest, - } as StreamUpsertRequest; + const dashboards = assets + .filter((asset) => asset.assetType === 'dashboard') + .map((asset) => asset.assetId); - return await streamsClient.upsertStream({ - request: upsertRequest, - name: params.path.id, - }); - } catch (e) { - if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { - throw notFound(e); - } + const upsertRequest = { + dashboards, + stream: ingestUpsertRequest, + } as StreamUpsertRequest; - if ( - e instanceof SecurityException || - e instanceof ForkConditionMissing || - e instanceof MalformedStreamId || - e instanceof RootStreamImmutabilityException - ) { - throw badRequest(e); - } - - throw internal(e); - } + return await streamsClient.upsertStream({ + request: upsertRequest, + name: params.path.id, + }); }, }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/management/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/management/route.ts index c9f659d4f8d51..3f20d18541f30 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/management/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/management/route.ts @@ -5,23 +5,14 @@ * 2.0. */ -import { z } from '@kbn/zod'; -import { badRequest, internal, notFound } from '@hapi/boom'; import { conditionSchema } from '@kbn/streams-schema'; -import { errors } from '@elastic/elasticsearch'; -import { - DefinitionNotFound, - ForkConditionMissing, - IndexTemplateNotFound, - RootStreamImmutabilityException, - SecurityException, -} from '../../../lib/streams/errors'; -import { createServerRoute } from '../../create_server_route'; -import { checkAccess } from '../../../lib/streams/stream_crud'; -import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id'; -import { conditionToQueryDsl } from '../../../lib/streams/helpers/condition_to_query_dsl'; -import { getFields } from '../../../lib/streams/helpers/condition_fields'; +import { z } from '@kbn/zod'; import { ResyncStreamsResponse } from '../../../lib/streams/client'; +import { getFields } from '../../../lib/streams/helpers/condition_fields'; +import { conditionToQueryDsl } from '../../../lib/streams/helpers/condition_to_query_dsl'; +import { checkAccess } from '../../../lib/streams/stream_crud'; +import { createServerRoute } from '../../create_server_route'; +import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error'; export const forkStreamsRoute = createServerRoute({ endpoint: 'POST /api/streams/{id}/_fork', @@ -42,36 +33,15 @@ export const forkStreamsRoute = createServerRoute({ body: z.object({ stream: z.object({ name: z.string() }), if: conditionSchema }), }), handler: async ({ params, request, getScopedClients }): Promise<{ acknowledged: true }> => { - try { - if (!params.body.if) { - throw new ForkConditionMissing('You must provide a condition to fork a stream'); - } - - const { streamsClient } = await getScopedClients({ - request, - }); - - return await streamsClient.forkStream({ - parent: params.path.id, - if: params.body.if, - name: params.body.stream.name, - }); - } catch (e) { - if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { - throw notFound(e); - } - - if ( - e instanceof SecurityException || - e instanceof ForkConditionMissing || - e instanceof MalformedStreamId || - e instanceof RootStreamImmutabilityException - ) { - throw badRequest(e); - } - - throw internal(e); - } + const { streamsClient } = await getScopedClients({ + request, + }); + + return await streamsClient.forkStream({ + parent: params.path.id, + if: params.body.if, + name: params.body.stream.name, + }); }, }); @@ -136,73 +106,63 @@ export const sampleStreamRoute = createServerRoute({ }), }), handler: async ({ params, request, getScopedClients }): Promise<{ documents: unknown[] }> => { - try { - const { scopedClusterClient } = await getScopedClients({ request }); + const { scopedClusterClient } = await getScopedClients({ request }); - const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); - if (!read) { - throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`); - } + const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); - const { if: condition, start, end, size } = params.body; - const searchBody = { - query: { - bool: { - must: [ - condition ? conditionToQueryDsl(condition) : { match_all: {} }, - { - range: { - '@timestamp': { - gte: start, - lte: end, - format: 'epoch_millis', - }, + if (!read) { + throw new DefinitionNotFoundError(`Stream definition for ${params.path.id} not found`); + } + + const { if: condition, start, end, size } = params.body; + const searchBody = { + query: { + bool: { + must: [ + condition ? conditionToQueryDsl(condition) : { match_all: {} }, + { + range: { + '@timestamp': { + gte: start, + lte: end, + format: 'epoch_millis', }, }, - ], - }, - }, - // Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as. - // Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during - // ingest in the painless condition checks. - // This is less efficient than it could be - in some cases, these fields _are_ indexed with the right type and we could use them directly. - // This can be optimized in the future. - runtime_mappings: condition - ? Object.fromEntries( - getFields(condition).map((field) => [ - field.name, - { type: field.type === 'string' ? 'keyword' : 'double' }, - ]) - ) - : undefined, - sort: [ - { - '@timestamp': { - order: 'desc', }, + ], + }, + }, + // Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as. + // Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during + // ingest in the painless condition checks. + // This is less efficient than it could be - in some cases, these fields _are_ indexed with the right type and we could use them directly. + // This can be optimized in the future. + runtime_mappings: condition + ? Object.fromEntries( + getFields(condition).map((field) => [ + field.name, + { type: field.type === 'string' ? 'keyword' : 'double' }, + ]) + ) + : undefined, + sort: [ + { + '@timestamp': { + order: 'desc', }, - ], - terminate_after: size, - track_total_hits: false, - size, - }; - const results = await scopedClusterClient.asCurrentUser.search({ - index: params.path.id, - allow_no_indices: true, - ...searchBody, - }); - - return { documents: results.hits.hits.map((hit) => hit._source) }; - } catch (error) { - if (error instanceof errors.ResponseError && error.meta.statusCode === 404) { - throw notFound(error); - } - if (error instanceof DefinitionNotFound) { - throw notFound(error); - } + }, + ], + terminate_after: size, + track_total_hits: false, + size, + }; + const results = await scopedClusterClient.asCurrentUser.search({ + index: params.path.id, + allow_no_indices: true, + ...searchBody, + }); - throw internal(error); - } + return { documents: results.hits.hits.map((hit) => hit._source) }; }, }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts index b5c64adc4b0bb..67e8fd268c515 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/route.ts @@ -7,7 +7,6 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import { badRequest, internal, notFound } from '@hapi/boom'; import { IScopedClusterClient } from '@kbn/core/server'; import { calculateObjectDiff, flattenObject } from '@kbn/object-utils'; import { @@ -17,13 +16,13 @@ import { } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; import { isEmpty } from 'lodash'; -import { DefinitionNotFound } from '../../../lib/streams/errors'; -import { DetectedMappingFailure } from '../../../lib/streams/errors/detected_mapping_failure'; -import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor'; -import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed'; import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing'; import { checkAccess } from '../../../lib/streams/stream_crud'; import { createServerRoute } from '../../create_server_route'; +import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error'; +import { SimulationFailedError } from '../../../lib/streams/errors/simulation_failed_error'; +import { DetectedMappingFailureError } from '../../../lib/streams/errors/detected_mapping_failure_error'; +import { NonAdditiveProcessorError } from '../../../lib/streams/errors/non_additive_processor_error'; const paramsSchema = z.object({ path: z.object({ id: z.string() }), @@ -50,41 +49,27 @@ export const simulateProcessorRoute = createServerRoute({ }, params: paramsSchema, handler: async ({ params, request, getScopedClients }) => { - try { - const { scopedClusterClient } = await getScopedClients({ request }); - - const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); - if (!read) { - throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`); - } - - const simulationBody = prepareSimulationBody(params); - - const simulationResult = await executeSimulation(scopedClusterClient, simulationBody); - - const simulationDiffs = prepareSimulationDiffs(simulationResult, simulationBody.docs); - - assertSimulationResult(simulationResult, simulationDiffs); - - return prepareSimulationResponse( - simulationResult, - simulationBody.docs, - simulationDiffs, - params.body.detected_fields - ); - } catch (error) { - if (error instanceof DefinitionNotFound) { - throw notFound(error); - } - if ( - error instanceof SimulationFailed || - error instanceof NonAdditiveProcessor || - error instanceof DetectedMappingFailure - ) { - throw badRequest(error); - } - throw internal(error); + const { scopedClusterClient } = await getScopedClients({ request }); + + const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); + if (!read) { + throw new DefinitionNotFoundError(`Stream definition for ${params.path.id} not found.`); } + + const simulationBody = prepareSimulationBody(params); + + const simulationResult = await executeSimulation(scopedClusterClient, simulationBody); + + const simulationDiffs = prepareSimulationDiffs(simulationResult, simulationBody.docs); + + assertSimulationResult(simulationResult, simulationDiffs); + + return prepareSimulationResponse( + simulationResult, + simulationBody.docs, + simulationDiffs, + params.body.detected_fields + ); }, }); @@ -137,7 +122,7 @@ const executeSimulation = async ( body: simulationBody, }); } catch (error) { - throw new SimulationFailed(error); + throw new SimulationFailedError(error); } }; @@ -148,14 +133,14 @@ const assertSimulationResult = ( // Assert mappings are compatible with the documents const entryWithError = simulationResult.docs.find(isMappingFailure); if (entryWithError) { - throw new DetectedMappingFailure( + throw new DetectedMappingFailureError( `The detected field types might not be compatible with these documents. ${entryWithError.doc.error.reason}` ); } // Assert that the processors are purely additive to the documents const updatedFields = computeUpdatedFields(simulationDiffs); if (!isEmpty(updatedFields)) { - throw new NonAdditiveProcessor( + throw new NonAdditiveProcessorError( `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]` ); } diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/route.ts index 1bc0b5ff6fa51..50fd327684444 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/route.ts @@ -5,12 +5,11 @@ * 2.0. */ import { z } from '@kbn/zod'; -import { internal, notFound } from '@hapi/boom'; import { getFlattenedObject } from '@kbn/std'; import { fieldDefinitionConfigSchema, isWiredStreamDefinition } from '@kbn/streams-schema'; -import { DefinitionNotFound } from '../../../lib/streams/errors'; import { checkAccess } from '../../../lib/streams/stream_crud'; import { createServerRoute } from '../../create_server_route'; +import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error'; const UNMAPPED_SAMPLE_SIZE = 500; @@ -30,60 +29,52 @@ export const unmappedFieldsRoute = createServerRoute({ path: z.object({ id: z.string() }), }), handler: async ({ params, request, getScopedClients }): Promise<{ unmappedFields: string[] }> => { - try { - const { scopedClusterClient, streamsClient } = await getScopedClients({ request }); - - const searchBody = { - sort: [ - { - '@timestamp': { - order: 'desc', - }, + const { scopedClusterClient, streamsClient } = await getScopedClients({ request }); + + const searchBody = { + sort: [ + { + '@timestamp': { + order: 'desc', }, - ], - size: UNMAPPED_SAMPLE_SIZE, - }; + }, + ], + size: UNMAPPED_SAMPLE_SIZE, + }; + + const [streamDefinition, ancestors, results] = await Promise.all([ + streamsClient.getStream(params.path.id), + streamsClient.getAncestors(params.path.id), + scopedClusterClient.asCurrentUser.search({ + index: params.path.id, + ...searchBody, + }), + ]); - const [streamDefinition, ancestors, results] = await Promise.all([ - streamsClient.getStream(params.path.id), - streamsClient.getAncestors(params.path.id), - scopedClusterClient.asCurrentUser.search({ - index: params.path.id, - ...searchBody, - }), - ]); - - const sourceFields = new Set(); - - results.hits.hits.forEach((hit) => { - Object.keys(getFlattenedObject(hit._source as Record)).forEach((field) => { - sourceFields.add(field); - }); - }); + const sourceFields = new Set(); - // Mapped fields from the stream's definition and inherited from ancestors - const mappedFields = new Set(); + results.hits.hits.forEach((hit) => { + Object.keys(getFlattenedObject(hit._source as Record)).forEach((field) => { + sourceFields.add(field); + }); + }); - if (isWiredStreamDefinition(streamDefinition)) { - Object.keys(streamDefinition.ingest.wired.fields).forEach((name) => mappedFields.add(name)); - } + // Mapped fields from the stream's definition and inherited from ancestors + const mappedFields = new Set(); - for (const ancestor of ancestors) { - Object.keys(ancestor.ingest.wired.fields).forEach((name) => mappedFields.add(name)); - } + if (isWiredStreamDefinition(streamDefinition)) { + Object.keys(streamDefinition.ingest.wired.fields).forEach((name) => mappedFields.add(name)); + } - const unmappedFields = Array.from(sourceFields) - .filter((field) => !mappedFields.has(field)) - .sort(); + for (const ancestor of ancestors) { + Object.keys(ancestor.ingest.wired.fields).forEach((name) => mappedFields.add(name)); + } - return { unmappedFields }; - } catch (e) { - if (e instanceof DefinitionNotFound) { - throw notFound(e); - } + const unmappedFields = Array.from(sourceFields) + .filter((field) => !mappedFields.has(field)) + .sort(); - throw internal(e); - } + return { unmappedFields }; }, }); @@ -118,165 +109,158 @@ export const schemaFieldsSimulationRoute = createServerRoute({ simulationError: string | null; documentsWithRuntimeFieldsApplied: unknown[] | null; }> => { - try { - const { scopedClusterClient } = await getScopedClients({ request }); - - const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); - if (!read) { - throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`); - } - - const propertiesForSample = Object.fromEntries( - params.body.field_definitions.map((field) => [field.name, { type: 'keyword' }]) - ); - - const documentSamplesSearchBody = { - // Add keyword runtime mappings so we can pair with exists, this is to attempt to "miss" less documents for the simulation. - runtime_mappings: propertiesForSample, - query: { - bool: { - filter: Object.keys(propertiesForSample).map((field) => ({ - exists: { field }, - })), - }, - }, - sort: [ - { - '@timestamp': { - order: 'desc', - }, - }, - ], - size: FIELD_SIMILATION_SAMPLE_SIZE, - }; + const { scopedClusterClient } = await getScopedClients({ request }); - const sampleResults = await scopedClusterClient.asCurrentUser.search({ - index: params.path.id, - ...documentSamplesSearchBody, - }); + const { read } = await checkAccess({ id: params.path.id, scopedClusterClient }); - if ( - (typeof sampleResults.hits.total === 'object' && sampleResults.hits.total?.value === 0) || - sampleResults.hits.total === 0 || - !sampleResults.hits.total - ) { - return { - status: 'unknown', - simulationError: null, - documentsWithRuntimeFieldsApplied: null, - }; - } - - const propertiesForSimulation = Object.fromEntries( - params.body.field_definitions.map((field) => [ - field.name, - { type: field.type, ...(field.format ? { format: field.format } : {}) }, - ]) - ); - - const fieldDefinitionKeys = Object.keys(propertiesForSimulation); - - const sampleResultsAsSimulationDocs = sampleResults.hits.hits.map((hit) => ({ - _index: params.path.id, - _id: hit._id, - _source: Object.fromEntries( - Object.entries(getFlattenedObject(hit._source as Record)).filter( - ([k]) => fieldDefinitionKeys.includes(k) || k === '@timestamp' - ) - ), - })); - - const simulationBody = { - docs: sampleResultsAsSimulationDocs, - component_template_substitutions: { - [`${params.path.id}@stream.layer`]: { - template: { - mappings: { - dynamic: 'strict', - properties: propertiesForSimulation, - }, - }, + if (!read) { + throw new DefinitionNotFoundError(`Stream definition for ${params.path.id} not found.`); + } + + const propertiesForSample = Object.fromEntries( + params.body.field_definitions.map((field) => [field.name, { type: 'keyword' }]) + ); + + const documentSamplesSearchBody = { + // Add keyword runtime mappings so we can pair with exists, this is to attempt to "miss" less documents for the simulation. + runtime_mappings: propertiesForSample, + query: { + bool: { + filter: Object.keys(propertiesForSample).map((field) => ({ + exists: { field }, + })), + }, + }, + sort: [ + { + '@timestamp': { + order: 'desc', }, }, + ], + size: FIELD_SIMILATION_SAMPLE_SIZE, + }; + + const sampleResults = await scopedClusterClient.asCurrentUser.search({ + index: params.path.id, + ...documentSamplesSearchBody, + }); + + if ( + (typeof sampleResults.hits.total === 'object' && sampleResults.hits.total?.value === 0) || + sampleResults.hits.total === 0 || + !sampleResults.hits.total + ) { + return { + status: 'unknown', + simulationError: null, + documentsWithRuntimeFieldsApplied: null, }; + } - // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() but the ES JS lib currently has a bug. The types also aren't available yet, so we use any. - const simulation = (await scopedClusterClient.asCurrentUser.transport.request({ - method: 'POST', - path: `_ingest/_simulate`, - body: simulationBody, - })) as any; - - const hasErrors = simulation.docs.some((doc: any) => doc.doc.error !== undefined); - - if (hasErrors) { - const documentWithError = simulation.docs.find((doc: any) => { - return doc.doc.error !== undefined; - }); - - return { - status: 'failure', - simulationError: JSON.stringify( - // Use the first error as a representative error - documentWithError.doc.error - ), - documentsWithRuntimeFieldsApplied: null, - }; - } - - // Convert the field definitions to a format that can be used in runtime mappings (match_only_text -> keyword) - const propertiesCompatibleWithRuntimeMappings = Object.fromEntries( - params.body.field_definitions.map((field) => [ - field.name, - { - type: field.type === 'match_only_text' ? 'keyword' : field.type, - ...(field.format ? { format: field.format } : {}), - }, - ]) - ); - - const runtimeFieldsSearchBody = { - runtime_mappings: propertiesCompatibleWithRuntimeMappings, - sort: [ - { - '@timestamp': { - order: 'desc', + const propertiesForSimulation = Object.fromEntries( + params.body.field_definitions.map((field) => [ + field.name, + { type: field.type, ...(field.format ? { format: field.format } : {}) }, + ]) + ); + + const fieldDefinitionKeys = Object.keys(propertiesForSimulation); + + const sampleResultsAsSimulationDocs = sampleResults.hits.hits.map((hit) => ({ + _index: params.path.id, + _id: hit._id, + _source: Object.fromEntries( + Object.entries(getFlattenedObject(hit._source as Record)).filter( + ([k]) => fieldDefinitionKeys.includes(k) || k === '@timestamp' + ) + ), + })); + + const simulationBody = { + docs: sampleResultsAsSimulationDocs, + component_template_substitutions: { + [`${params.path.id}@stream.layer`]: { + template: { + mappings: { + dynamic: 'strict', + properties: propertiesForSimulation, }, }, - ], - size: FIELD_SIMILATION_SAMPLE_SIZE, - fields: params.body.field_definitions.map((field) => field.name), - _source: false, - }; + }, + }, + }; - // This gives us a "fields" representation rather than _source from the simulation - const runtimeFieldsResult = await scopedClusterClient.asCurrentUser.search({ - index: params.path.id, - ...runtimeFieldsSearchBody, + // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() but the ES JS lib currently has a bug. The types also aren't available yet, so we use any. + const simulation = (await scopedClusterClient.asCurrentUser.transport.request({ + method: 'POST', + path: `_ingest/_simulate`, + body: simulationBody, + })) as any; + + const hasErrors = simulation.docs.some((doc: any) => doc.doc.error !== undefined); + + if (hasErrors) { + const documentWithError = simulation.docs.find((doc: any) => { + return doc.doc.error !== undefined; }); return { - status: 'success', - simulationError: null, - documentsWithRuntimeFieldsApplied: runtimeFieldsResult.hits.hits - .map((hit) => { - if (!hit.fields) { - return {}; - } - return Object.keys(hit.fields).reduce>((acc, field) => { - acc[field] = hit.fields![field][0]; - return acc; - }, {}); - }) - .filter((doc) => Object.keys(doc).length > 0), + status: 'failure', + simulationError: JSON.stringify( + // Use the first error as a representative error + documentWithError.doc.error + ), + documentsWithRuntimeFieldsApplied: null, }; - } catch (e) { - if (e instanceof DefinitionNotFound) { - throw notFound(e); - } - - throw internal(e); } + + // Convert the field definitions to a format that can be used in runtime mappings (match_only_text -> keyword) + const propertiesCompatibleWithRuntimeMappings = Object.fromEntries( + params.body.field_definitions.map((field) => [ + field.name, + { + type: field.type === 'match_only_text' ? 'keyword' : field.type, + ...(field.format ? { format: field.format } : {}), + }, + ]) + ); + + const runtimeFieldsSearchBody = { + runtime_mappings: propertiesCompatibleWithRuntimeMappings, + sort: [ + { + '@timestamp': { + order: 'desc', + }, + }, + ], + size: FIELD_SIMILATION_SAMPLE_SIZE, + fields: params.body.field_definitions.map((field) => field.name), + _source: false, + }; + + // This gives us a "fields" representation rather than _source from the simulation + const runtimeFieldsResult = await scopedClusterClient.asCurrentUser.search({ + index: params.path.id, + ...runtimeFieldsSearchBody, + }); + + return { + status: 'success', + simulationError: null, + documentsWithRuntimeFieldsApplied: runtimeFieldsResult.hits.hits + .map((hit) => { + if (!hit.fields) { + return {}; + } + return Object.keys(hit.fields).reduce>((acc, field) => { + acc[field] = hit.fields![field][0]; + return acc; + }, {}); + }) + .filter((doc) => Object.keys(doc).length > 0), + }; }, }); From d8c4696b729e162047b59f8c90e1d11dcc2582d7 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:28:15 +0000 Subject: [PATCH 2/2] [CI] Auto-commit changed files from 'node scripts/styled_components_mapping' --- x-pack/solutions/observability/plugins/streams/tsconfig.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/solutions/observability/plugins/streams/tsconfig.json b/x-pack/solutions/observability/plugins/streams/tsconfig.json index 4166e8aa60b4f..748e722f66f9f 100644 --- a/x-pack/solutions/observability/plugins/streams/tsconfig.json +++ b/x-pack/solutions/observability/plugins/streams/tsconfig.json @@ -35,6 +35,7 @@ "@kbn/std", "@kbn/safer-lodash-set", "@kbn/streams-schema", - "@kbn/es-errors" + "@kbn/es-errors", + "@kbn/server-route-repository-utils" ] }