diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 944b4e8e34491..6bc2227dd2dba 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -81,6 +81,8 @@ public class FunctionMetaDataManager implements AutoCloseable { @Getter private CompletableFuture isInitialized = new CompletableFuture<>(); + private boolean isProducerFenced = true; + public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, PulsarClient pulsarClient, @@ -243,6 +245,10 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat needsScheduling = processUpdate(functionMetaData); } } catch (Exception e) { + if (e.getCause() instanceof PulsarClientException.ProducerFencedException) { + log.error("Function worker status has been set to false due to ProducerFencedException."); + this.isProducerFenced = false; + } log.error("Could not write into Function Metadata topic", e); throw new IllegalStateException("Internal Error updating function at the leader", e); } @@ -500,4 +506,8 @@ private void initializeTailer() throws PulsarClientException { this.functionMetaDataTopicTailer.start(); log.info("MetaData Manager Tailer started"); } + + public boolean checkLiveliness() { + return this.isProducerFenced; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index ea5517e0fd4eb..842c5417ba77c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1885,4 +1885,10 @@ protected ValidatableFunctionPackage getBuiltinFunctionPackage(String archive) { } return null; } + + @Override + public boolean checkLiveliness() { + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + return functionMetaDataManager.checkLiveliness(); + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java index 7bdc86d5fae3e..e6d69370fb22a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java @@ -35,6 +35,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.functions.FunctionConfig; @@ -429,4 +430,24 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream, delete, uri.getRequestUri(), authParams()); } + + @GET + @Path("/healthz") + @ApiOperation(value = "Run a healthCheck against the function worker") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Everything is OK"), + @ApiResponse(code = 503, message = "Service not available") + }) + public Response healthCheck() { + boolean isAlive = functions().checkLiveliness(); + if (!isAlive) { + return Response.status(Response.Status.SERVICE_UNAVAILABLE) + .entity("There is IllegalStateException, Service is not running. Need to restart.") + .build(); + } else { + return Response.status(Response.Status.OK) + .entity("There is no IllegalStateException, Service is running.") + .build(); + } + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java index 770cced1731da..55125a1494958 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java @@ -90,4 +90,6 @@ StreamingOutput downloadFunction(String tenant, String namespace, String compone List getListOfConnectors(); void reloadConnectors(AuthenticationParameters authParams); + + boolean checkLiveliness(); }