Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streams] Centralize error handling #207858

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { ZodSchema, z } from '@kbn/zod';

export function createIsNarrowSchema<TBaseSchema extends z.Schema, TNarrowSchema extends z.Schema>(
base: TBaseSchema,
_base: TBaseSchema,
narrow: TNarrowSchema
) {
return <TValue extends z.input<TBaseSchema>>(
Expand All @@ -19,7 +19,7 @@ export function createIsNarrowSchema<TBaseSchema extends z.Schema, TNarrowSchema
}

export function createAsSchemaOrThrow<TBaseSchema extends z.Schema, TNarrowSchema extends z.Schema>(
base: TBaseSchema,
_base: TBaseSchema,
narrow: TNarrowSchema
) {
return <TValue extends z.input<TBaseSchema>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import {
} from '@kbn/streams-schema';
import { cloneDeep, keyBy, omit, orderBy } from 'lodash';
import { AssetClient } from './assets/asset_client';
import { DefinitionNotFound, SecurityException } from './errors';
import { MalformedStreamId } from './errors/malformed_stream_id';
import {
syncUnwiredStreamDefinitionObjects,
syncWiredStreamDefinitionObjects,
Expand All @@ -50,6 +48,9 @@ import {
deleteStreamObjects,
deleteUnmanagedStreamObjects,
} from './stream_crud';
import { DefinitionNotFoundError } from './errors/definition_not_found_error';
import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
import { SecurityError } from './errors/security_error';
miltonhultgren marked this conversation as resolved.
Show resolved Hide resolved

interface AcknowledgeResponse<TResult extends Result> {
acknowledged: true;
Expand All @@ -70,8 +71,8 @@ function isElasticsearch404(error: unknown): error is errors.ResponseError & { s
return isResponseError(error) && error.statusCode === 404;
}

function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFound {
return error instanceof DefinitionNotFound;
function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFoundError {
return error instanceof DefinitionNotFoundError;
}

export class StreamsClient {
Expand Down Expand Up @@ -362,7 +363,7 @@ export class StreamsClient {
parentDefinition &&
!isWiredStreamDefinition(parentDefinition)
) {
throw new MalformedStreamId('Cannot fork a stream that is not managed');
throw new MalformedStreamIdError('Cannot fork a stream that is not managed');
}

validateAncestorFields({
Expand All @@ -384,7 +385,7 @@ export class StreamsClient {
continue;
}
if (!isChildOf(definition.name, item.destination)) {
throw new MalformedStreamId(
throw new MalformedStreamIdError(
`The ID (${item.destination}) from the child stream must start with the parent's id (${definition.name}), followed by a dot and a name`
);
}
Expand Down Expand Up @@ -435,12 +436,12 @@ export class StreamsClient {

// check whether root stream has a child of the given name already
if (parentDefinition.ingest.routing.some((item) => item.destination === childDefinition.name)) {
throw new MalformedStreamId(
throw new MalformedStreamIdError(
`The stream with ID (${name}) already exists as a child of the parent stream`
);
}
if (!isChildOf(parentDefinition.name, childDefinition.name)) {
throw new MalformedStreamId(
throw new MalformedStreamIdError(
`The ID (${name}) from the new stream must start with the parent's id (${parentDefinition.name}), followed by a dot and a name`
);
}
Expand Down Expand Up @@ -483,7 +484,7 @@ export class StreamsClient {
checkAccess({ id: name, scopedClusterClient: this.dependencies.scopedClusterClient }).then(
(privileges) => {
if (!privileges.read) {
throw new DefinitionNotFound(`Stream definition for ${name} not found`);
throw new DefinitionNotFoundError(`Stream definition for ${name} not found`);
}
}
),
Expand All @@ -500,7 +501,7 @@ export class StreamsClient {
})
.catch(async (error) => {
if (isElasticsearch404(error)) {
throw new DefinitionNotFound(`Cannot find stream ${name}`);
throw new DefinitionNotFoundError(`Cannot find stream ${name}`);
}
throw error;
});
Expand Down Expand Up @@ -711,7 +712,7 @@ export class StreamsClient {
]);

if (!access.write) {
throw new SecurityException(`Cannot delete stream, insufficient privileges`);
throw new SecurityError(`Cannot delete stream, insufficient privileges`);
}

if (!definition) {
Expand All @@ -720,7 +721,7 @@ export class StreamsClient {

const parentId = getParentId(name);
if (isWiredStreamDefinition(definition) && !parentId) {
throw new MalformedStreamId('Cannot delete root stream');
throw new MalformedStreamIdError('Cannot delete root stream');
}

await this.deleteStreamFromDefinition(definition);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class ComponentTemplateNotFoundError extends StatusError {
constructor(message: string) {
super(message, 404);
this.name = 'ComponentTemplateNotFoundError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class DefinitionIdInvalidError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'DefinitionIdInvalidError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class DefinitionNotFoundError extends StatusError {
constructor(message: string) {
super(message, 404);
this.name = 'DefinitionNotFoundError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class DetectedMappingFailureError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'DetectedMappingFailureError';
}
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* 2.0.
*/

export class DetectedMappingFailure extends Error {
import { StatusError } from './status_error';

export class MalformedChildrenError extends StatusError {
constructor(message: string) {
super(message);
this.name = 'DetectedMappingFailure';
super(message, 400);
this.name = 'MalformedChildrenError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* 2.0.
*/

export class DefinitionIdInvalid extends Error {
import { StatusError } from './status_error';

export class MalformedFieldsError extends StatusError {
constructor(message: string) {
super(message);
this.name = 'DefinitionIdInvalid';
super(message, 400);
this.name = 'MalformedFieldsError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* 2.0.
*/

export class DefinitionNotFound extends Error {
import { StatusError } from './status_error';

export class MalformedStreamError extends StatusError {
constructor(message: string) {
super(message);
this.name = 'DefinitionNotFound';
super(message, 400);
this.name = 'MalformedStreamError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class MalformedStreamIdError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'MalformedStreamIdError';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class NonAdditiveProcessorError extends StatusError {
constructor(message: string) {
super(message, 400);
this.name = 'NonAdditiveProcessorError';
}
}
Loading