Skip to content

Commit

Permalink
Merge pull request #2064 from akto-api-security/hotfix/fix_consumer_s…
Browse files Browse the repository at this point in the history
…tuck_case

Error handling for timed out exception
  • Loading branch information
notshivansh authored Feb 7, 2025
2 parents 4336e96 + b36f92c commit 92daa5d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import static com.akto.testing.Utils.readJsonContentFromFile;
import static com.akto.testing.Utils.writeJsonContentInFile;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -10,6 +11,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.*;
Expand All @@ -21,14 +23,19 @@
import com.akto.DaoInit;
import com.akto.crons.GetRunningTestsStatus;
import com.akto.dao.context.Context;
import com.akto.dao.testing.TestingRunResultDao;
import com.akto.dao.testing.TestingRunResultSummariesDao;
import com.akto.dto.ApiInfo;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.test_editor.TestConfig;
import com.akto.dto.testing.TestingRunResult;
import com.akto.dto.testing.TestingRunResultSummary;
import com.akto.dto.testing.TestResult.TestError;
import com.akto.dto.testing.info.SingleTestPayload;
import com.akto.notifications.slack.CustomTextAlert;
import com.akto.testing.Main;
import com.akto.testing.TestExecutor;
import com.akto.testing.Utils;
import com.akto.util.Constants;
import com.akto.util.DashboardMode;
import com.alibaba.fastjson2.JSON;
Expand All @@ -37,6 +44,8 @@
import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
Expand Down Expand Up @@ -91,6 +100,25 @@ public void runTestFromMessage(String message){
executor.insertResultsAndMakeIssues(Collections.singletonList(runResult), singleTestPayload.getTestingRunResultSummaryId());
}
}

private void createTimedOutResultFromMessage(String message){
SingleTestPayload singleTestPayload = parseTestMessage(message);
Context.accountId.set(singleTestPayload.getAccountId());

String subCategory = singleTestPayload.getSubcategory();
TestConfig testConfig = TestingConfigurations.getInstance().getTestConfigMap().get(subCategory);

String testSuperType = testConfig.getInfo().getCategory().getName();
String testSubType = testConfig.getInfo().getSubCategory();

TestingRunResult runResult = Utils.generateFailedRunResultForMessage(singleTestPayload.getTestingRunId(), singleTestPayload.getApiInfoKey(), testSuperType, testSubType, singleTestPayload.getTestingRunResultSummaryId(), new ArrayList<>(), TestError.TEST_TIMED_OUT.getMessage());
TestExecutor.trim(runResult);
TestingRunResultSummariesDao.instance.getMCollection().withWriteConcern(WriteConcern.W1).findOneAndUpdate(
Filters.eq(Constants.ID, singleTestPayload.getTestingRunResultSummaryId()),
Updates.inc(TestingRunResultSummary.TEST_RESULTS_COUNT, 1)
);
TestingRunResultDao.instance.insertOne(runResult);
}

public void init(int maxRunTimeInSeconds) {
executor = Executors.newFixedThreadPool(100);
Expand Down Expand Up @@ -141,10 +169,15 @@ public void init(int maxRunTimeInSeconds) {
Future<?> future = executor.submit(() -> runTestFromMessage(message));
firstRecordRead.set(true);
try {
future.get(4, TimeUnit.MINUTES);
future.get(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
logger.error("Task timed out: " + message);
logger.error("Task timed out");
future.cancel(true);
createTimedOutResultFromMessage(message);
} catch(TimeoutException e){
logger.error("Task timed out");
future.cancel(true);
createTimedOutResultFromMessage(message);
} catch (Exception e) {
logger.error("Error in task execution: " + message, e);
}
Expand Down
3 changes: 2 additions & 1 deletion libs/dao/src/main/java/com/akto/dto/testing/TestResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public enum TestError {
SKIPPING_EXECUTION_BECAUSE_FILTERS("Request API failed to satisfy api_selection_filters block, skipping execution", true),
DEACTIVATED_ENDPOINT("This is a deactivated endpoint", true),
USAGE_EXCEEDED("You have exceeded the limit of this feature, skipping execution", true),
ROLE_NOT_FOUND("config doesn't exist, skipping execution", false);
ROLE_NOT_FOUND("config doesn't exist, skipping execution", false),
TEST_TIMED_OUT("Test took too long for execution, exited out", true);
private final String message;
private final boolean skipTest;

Expand Down
5 changes: 3 additions & 2 deletions libs/utils/src/main/java/com/akto/testing/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.util.StringUtils;

import com.akto.dao.ApiCollectionsDao;
import com.akto.dao.context.Context;
Expand Down Expand Up @@ -561,9 +562,9 @@ public static TestingRunResult generateFailedRunResultForMessage(ObjectId testin
List<GenericTestResult> testResults = new ArrayList<>();
String failMessage = errorMessage;

if(deactivatedCollections.contains(apiInfoKey.getApiCollectionId())){
if(!StringUtils.hasLength(errorMessage) && deactivatedCollections.contains(apiInfoKey.getApiCollectionId())){
failMessage = TestError.DEACTIVATED_ENDPOINT.getMessage();
}else if(messages == null || messages.isEmpty()){
}else if(!StringUtils.hasLength(errorMessage) && (messages == null || messages.isEmpty())){
failMessage = TestError.NO_PATH.getMessage();
}

Expand Down

0 comments on commit 92daa5d

Please sign in to comment.