Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log improvements for Websocket throttling with updating unit tests #12716

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.apimgt.gateway.dto;

/**
* Interface for defining inbound response error information
*/
public interface InboundProcessorResponseError {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.apimgt.gateway.dto;

/**
* DTO class which holds information on throttle response of websocket frames.
*/
public class WebSocketThrottleResponseDTO implements InboundProcessorResponseError {

private boolean isThrottled;
private String throttledOutReason;
private String apiContext;
private String user;

public WebSocketThrottleResponseDTO() {
}

public WebSocketThrottleResponseDTO(boolean throttled, String throttledOutReason) {
isThrottled = throttled;
this.throttledOutReason = throttledOutReason;
}

public WebSocketThrottleResponseDTO(boolean throttled, String throttledOutReason, String apiContext, String user) {
isThrottled = throttled;
this.throttledOutReason = throttledOutReason;
this.apiContext = apiContext;
this.user = user;
}

public boolean isThrottled() {
return isThrottled;
}

public void setThrottled(boolean throttled) {
isThrottled = throttled;
}

public String getThrottledOutReason() {
return throttledOutReason;
}

public void setThrottledOutReason(String throttledOutReason) {
this.throttledOutReason = throttledOutReason;
}

public String getApiContext() {
return apiContext;
}

public void setApiContext(String apiContext) {
this.apiContext = apiContext;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.analytics.Constants;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketAnalyticsMetricsHandler;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketApiConstants;
Expand Down Expand Up @@ -152,8 +153,12 @@ private void handleSubscribeFrameErrorEvent(ChannelHandlerContext ctx, InboundPr
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP) {
if (log.isDebugEnabled()) {
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. " +
ctx.channel().toString());
WebSocketThrottleResponseDTO throttleResponseDTO =
((WebSocketThrottleResponseDTO) responseDTO.getInboundProcessorResponseError());
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. "
+ ctx.channel().toString() + " API Context: " + throttleResponseDTO.getApiContext()
+ ", " + "User: " + throttleResponseDTO.getUser() + ", Reason: "
+ throttleResponseDTO.getThrottledOutReason());
}
} else if (responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_GENERAL_ERROR
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_INVALID_CREDENTIALS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.wso2.carbon.apimgt.common.gateway.constants.HealthCheckConstants;
import org.wso2.carbon.apimgt.common.gateway.dto.JWTConfigurationDto;
import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.analytics.Constants;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityConstants;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityException;
Expand Down Expand Up @@ -312,8 +313,12 @@ private void handlePublishFrameErrorEvent(ChannelHandlerContext ctx, InboundProc
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP) {
if (log.isDebugEnabled()) {
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. " +
ctx.channel().toString());
WebSocketThrottleResponseDTO throttleResponseDTO =
((WebSocketThrottleResponseDTO) responseDTO.getInboundProcessorResponseError());
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. "
+ ctx.channel().toString() + " API Context: " + throttleResponseDTO.getApiContext()
+ ", " + "User: " + throttleResponseDTO.getUser() + ", Reason: "
+ throttleResponseDTO.getThrottledOutReason());
}
} else if (responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_GENERAL_ERROR
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_INVALID_CREDENTIALS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.apimgt.api.APIManagementException;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.inbound.InboundMessageContext;
import org.wso2.carbon.apimgt.gateway.inbound.websocket.InboundProcessorResponseDTO;
import org.wso2.carbon.apimgt.gateway.internal.ServiceReferenceHolder;
Expand Down Expand Up @@ -195,6 +196,22 @@ public static boolean isThrottled(String resourceLevelThrottleKey, String subscr
return (isApiLevelThrottled || isApplicationLevelThrottled || isSubscriptionLevelThrottled);
}

public static WebSocketThrottleResponseDTO getThrottleStatus(String resourceLevelThrottleKey,
String subscriptionLevelThrottleKey,
String applicationLevelThrottleKey) {
// Check each level and record reason if throttling occurs
if (ServiceReferenceHolder.getInstance().getThrottleDataHolder().isAPIThrottled(resourceLevelThrottleKey)) {
return new WebSocketThrottleResponseDTO(true, "Throttled due to resource-level constraints");
} else if (ServiceReferenceHolder.getInstance().getThrottleDataHolder().isThrottled(
subscriptionLevelThrottleKey)) {
return new WebSocketThrottleResponseDTO(true, "Throttled due to subscription-level constraints");
} else if (ServiceReferenceHolder.getInstance().getThrottleDataHolder().isThrottled(
applicationLevelThrottleKey)) {
return new WebSocketThrottleResponseDTO(true, "Throttled due to application-level constraints");
}
return null;
}

public static String getAccessTokenCacheKey(String accessToken, String apiContext, String matchingResource) {
return accessToken + ':' + apiContext + ':' + matchingResource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import org.json.JSONObject;
import org.wso2.carbon.apimgt.common.gateway.constants.GraphQLConstants;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketApiConstants;
import org.wso2.carbon.apimgt.gateway.dto.InboundProcessorResponseError;

/**
* Extended DTO class to hold response information during execution of GraphQL subscription Inbound processors.
*/
public class GraphQLProcessorResponseDTO extends InboundProcessorResponseDTO {

String id; // operation ID
InboundProcessorResponseError inboundProcessorResponseError;

public String getId() {
return id;
Expand All @@ -34,4 +36,12 @@ public String getErrorResponseString() {
jsonObject.put(GraphQLConstants.SubscriptionConstants.PAYLOAD_FIELD_NAME_PAYLOAD, errorPayloads);
return jsonObject.toString();
}

public InboundProcessorResponseError getInboundProcessorResponseError() {
return inboundProcessorResponseError;
}

public void setInboundProcessorResponseError(InboundProcessorResponseError inboundProcessorResponseError) {
this.inboundProcessorResponseError = inboundProcessorResponseError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.apimgt.gateway.inbound.websocket;

import org.wso2.carbon.apimgt.gateway.dto.InboundProcessorResponseError;
/**
* DTO class to hold response information during execution of Inbound processors.
*/
Expand All @@ -26,6 +28,7 @@ public class InboundProcessorResponseDTO {
int errorCode;
String errorMessage;
boolean closeConnection = false; // whether to close the connection if during frame validation
InboundProcessorResponseError inboundProcessorResponseError;

public boolean isError() {
return isError;
Expand Down Expand Up @@ -62,4 +65,12 @@ public void setErrorCode(int errorCode) {
public String getErrorResponseString() {
return "Error code: " + errorCode + " reason: " + errorMessage;
}

public InboundProcessorResponseError getInboundProcessorResponseError() {
return inboundProcessorResponseError;
}

public void setInboundProcessorResponseError(InboundProcessorResponseError inboundProcessorResponseError) {
this.inboundProcessorResponseError = inboundProcessorResponseError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.wso2.carbon.apimgt.common.gateway.graphql.QueryValidator;
import org.wso2.carbon.apimgt.gateway.dto.GraphQLOperationDTO;
import org.wso2.carbon.apimgt.common.gateway.graphql.GraphQLProcessorUtil;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.WebsocketUtil;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityException;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketApiConstants;
Expand Down Expand Up @@ -298,6 +299,10 @@ private GraphQLProcessorResponseDTO validateQueryComplexity(QueryAnalyzer queryA
responseDTO.setErrorCode(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX);
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX_MESSAGE
+ " : " + queryAnalyzerResponseDTO.getErrorList().toString());
WebSocketThrottleResponseDTO throttleResponseDTO = new WebSocketThrottleResponseDTO(true, "Throttled "
+ "due to subscription-level query complexity constraint.", inboundMessageContext.getApiContext()
, inboundMessageContext.getInfoDTO().getSubscriber());
responseDTO.setInboundProcessorResponseError(throttleResponseDTO);
return responseDTO;
}
} catch (ParseException e) {
Expand Down Expand Up @@ -337,8 +342,12 @@ private GraphQLProcessorResponseDTO validateQueryDepth(QueryAnalyzer queryAnalyz
log.error("Query depth validation failed for: " + payload + " errors: " + errorList.toString());
responseDTO.setError(true);
responseDTO.setErrorCode(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP);
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP_MESSAGE
+ " : " + queryAnalyzerResponseDTO.getErrorList().toString());
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP_MESSAGE + " : "
+ queryAnalyzerResponseDTO.getErrorList().toString());
WebSocketThrottleResponseDTO throttleResponseDTO = new WebSocketThrottleResponseDTO(true, "Throttled due "
+ "to subscription-level query depth constraint.", inboundMessageContext.getApiContext()
, inboundMessageContext.getInfoDTO().getSubscriber());
responseDTO.setInboundProcessorResponseError(throttleResponseDTO);
return responseDTO;
}
return responseDTO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.json.JSONObject;
import org.wso2.carbon.apimgt.api.APIManagementException;
import org.wso2.carbon.apimgt.common.gateway.constants.GraphQLConstants;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.DataPublisherUtil;
import org.wso2.carbon.apimgt.gateway.handlers.Utils;
import org.wso2.carbon.apimgt.gateway.handlers.WebsocketUtil;
import org.wso2.carbon.apimgt.gateway.handlers.security.APIKeyValidator;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityConstants;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityException;
import org.wso2.carbon.apimgt.gateway.handlers.security.AuthenticationContext;
import org.wso2.carbon.apimgt.gateway.handlers.security.jwt.JWTValidator;
Expand All @@ -56,7 +56,6 @@

import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -247,12 +246,17 @@ public static InboundProcessorResponseDTO doThrottle(int msgSize, VerbInfoDTO ve
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
inboundMessageContext.getTenantDomain(), true);
boolean isThrottled = WebsocketUtil.isThrottled(resourceLevelThrottleKey, subscriptionLevelThrottleKey,
applicationLevelThrottleKey);
if (isThrottled) {
WebSocketThrottleResponseDTO throttleResponseDTO = WebsocketUtil.getThrottleStatus(resourceLevelThrottleKey,
subscriptionLevelThrottleKey,
applicationLevelThrottleKey);
if (throttleResponseDTO != null) {
responseDTO.setError(true);
responseDTO.setErrorCode(WebSocketApiConstants.FrameErrorConstants.THROTTLED_OUT_ERROR);
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.THROTTLED_OUT_ERROR_MESSAGE);

throttleResponseDTO.setUser(authorizedUser);
throttleResponseDTO.setApiContext(inboundMessageContext.getApiContext());
responseDTO.setInboundProcessorResponseError(throttleResponseDTO);
}
} finally {
PrivilegedCarbonContext.endTenantFlow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public void testIsThrottled() {
Assert.assertTrue(WebsocketUtil.isThrottled(resourceKey, subscriptionKey, apiKey));
}

@Test
public void testGetThrottleStatus() {
ThrottleDataHolder throttleDataHolder = Mockito.mock(ThrottleDataHolder.class);
Mockito.when(serviceReferenceHolder.getThrottleDataHolder()).thenReturn(throttleDataHolder);
Mockito.when(throttleDataHolder.isAPIThrottled(apiKey)).thenReturn(true);
Mockito.when(throttleDataHolder.isAPIThrottled(resourceKey)).thenReturn(true);
Mockito.when(throttleDataHolder.isAPIThrottled(subscriptionKey)).thenReturn(true);
Assert.assertTrue(WebsocketUtil.getThrottleStatus(resourceKey, subscriptionKey, apiKey).isThrottled());
}

@Test
public void testGetAccessTokenCacheKey() {
Assert.assertEquals("235erwytgtkyb:/ishara:/resource",
Expand Down
Loading
Loading