diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostAgentDAO.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostAgentDAO.java index e61447dec7..b5ad61d8aa 100644 --- a/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostAgentDAO.java +++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostAgentDAO.java @@ -4,9 +4,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,11 +15,10 @@ */ package com.pinterest.deployservice.dao; -import com.pinterest.deployservice.bean.HostAgentBean; - -import java.util.Collection; +import java.sql.SQLException; import java.util.List; -import java.util.Set; + +import com.pinterest.deployservice.bean.HostAgentBean; /** * A collection of methods to help hosts and groups mapping @@ -35,9 +34,11 @@ public interface HostAgentDAO { HostAgentBean getHostById(String hostId) throws Exception; - List getStaleHosts(long after) throws Exception; + List getStaleHosts(long lastUpdateBefore) throws SQLException; + + List getStaleHosts(long lastUpdateAfter, long lastUpdateBefore) throws SQLException; - List getStaleEnvHosts(long after) throws Exception; + List getStaleEnvHosts(long lastUpdateBefore) throws Exception; List getHostsByAgent(String agentVersion, long pageIndex, int pageSize) throws Exception; } diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java index d55c28e4d8..556fba8516 100644 --- 
a/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java +++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java @@ -4,9 +4,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,7 @@ import com.pinterest.deployservice.bean.HostBean; +import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Set; @@ -55,6 +56,8 @@ public interface HostDAO { List getTerminatingHosts() throws Exception; + List getStaleAgentlessHostIds(long lastUpdateBefore, int limit) throws SQLException; + Collection getHostsByEnvId(String envId) throws Exception; HostBean getByEnvIdAndHostId(String envId, String hostId) throws Exception; diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostAgentDAOImpl.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostAgentDAOImpl.java index 2c25b1a426..f166686668 100644 --- a/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostAgentDAOImpl.java +++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostAgentDAOImpl.java @@ -4,9 +4,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,10 +15,8 @@ */ package com.pinterest.deployservice.db; -import com.pinterest.deployservice.bean.HostAgentBean; -import com.pinterest.deployservice.bean.HostState; -import com.pinterest.deployservice.bean.SetClause; -import com.pinterest.deployservice.dao.HostAgentDAO; +import java.sql.SQLException; +import java.util.List; import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbutils.QueryRunner; @@ -26,9 +24,9 @@ import org.apache.commons.dbutils.handlers.BeanHandler; import org.apache.commons.dbutils.handlers.BeanListHandler; -import java.util.Collection; -import java.util.List; -import java.util.Set; +import com.pinterest.deployservice.bean.HostAgentBean; +import com.pinterest.deployservice.bean.SetClause; +import com.pinterest.deployservice.dao.HostAgentDAO; public class DBHostAgentDAOImpl implements HostAgentDAO { private static final String INSERT_HOST_TEMPLATE = "INSERT INTO hosts_and_agents SET %s ON DUPLICATE KEY UPDATE %s"; @@ -36,7 +34,8 @@ public class DBHostAgentDAOImpl implements HostAgentDAO { private static final String DELETE_HOST_BY_ID = "DELETE FROM hosts_and_agents WHERE host_id=?"; private static final String GET_HOST_BY_NAME = "SELECT * FROM hosts_and_agents WHERE host_name=?"; private static final String GET_HOST_BY_HOSTID = "SELECT * FROM hosts_and_agents WHERE host_id=?"; - private static final String GET_STALE_HOST = "SELECT DISTINCT hosts_and_agents.* FROM hosts_and_agents WHERE hosts_and_agents.last_update? 
AND last_update getStaleHosts(long after) throws Exception { + public List getStaleHosts(long lastUpdateBefore) throws SQLException { + ResultSetHandler> h = new BeanListHandler<>(HostAgentBean.class); + return new QueryRunner(dataSource).query(GET_HOSTS_BY_LAST_UPDATE, h, lastUpdateBefore); + } + + @Override + public List getStaleHosts(long lastUpdateAfter, long lastUpdateBefore) throws SQLException { ResultSetHandler> h = new BeanListHandler<>(HostAgentBean.class); - return new QueryRunner(dataSource).query(GET_STALE_HOST, h, after); + return new QueryRunner(dataSource).query(GET_HOSTS_BY_LAST_UPDATES, h, lastUpdateAfter, lastUpdateBefore); } @Override diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java index 8c217d7957..14f16a1770 100644 --- a/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java +++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java @@ -29,6 +29,7 @@ import org.apache.commons.dbutils.handlers.BeanHandler; import org.apache.commons.dbutils.handlers.BeanListHandler; +import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Set; @@ -51,8 +52,7 @@ public class DBHostDAOImpl implements HostDAO { private static final String GET_HOST_BY_HOSTID = "SELECT * FROM hosts WHERE host_id=?"; private static final String GET_HOSTS_BY_STATES = "SELECT * FROM hosts WHERE state in (?, ?, ?) 
GROUP BY host_id ORDER BY last_update"; private static final String GET_GROUP_NAMES_BY_HOST = "SELECT group_name FROM hosts WHERE host_name=?"; - private static final String GET_STALE_ENV_HOST = "SELECT DISTINCT hosts.* FROM hosts INNER JOIN hosts_and_envs ON hosts.host_name=hosts_and_envs.host_name WHERE hosts.last_update getTerminatingHosts() throws Exception { HostState.TERMINATING.toString(), HostState.PENDING_TERMINATE_NO_REPLACE.toString()); } + @Override + public List getStaleAgentlessHostIds(long lastUpdateBefore, int limit) throws SQLException { + return new QueryRunner(dataSource).query(GET_STALE_AGENTLESS_HOST_IDS, + SingleResultSetHandlerFactory.newListObjectHandler(), lastUpdateBefore, limit); + } + @Override public List getAllActiveHostsByGroup(String groupName) throws Exception { ResultSetHandler> h = new BeanListHandler<>(HostBean.class); diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/handler/HostHandler.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/handler/HostHandler.java index a0d6d70d6e..2ef8e13db0 100644 --- a/deploy-service/common/src/main/java/com/pinterest/deployservice/handler/HostHandler.java +++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/handler/HostHandler.java @@ -1,17 +1,14 @@ package com.pinterest.deployservice.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.pinterest.deployservice.ServiceContext; -import com.pinterest.deployservice.bean.HostBean; -import com.pinterest.deployservice.common.CommonUtils; import com.pinterest.deployservice.dao.AgentDAO; -import com.pinterest.deployservice.dao.HostDAO; import com.pinterest.deployservice.dao.HostAgentDAO; +import com.pinterest.deployservice.dao.HostDAO; import com.pinterest.deployservice.dao.HostTagDAO; -import com.pinterest.deployservice.handler.HostHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class HostHandler { private static final Logger LOG = 
LoggerFactory.getLogger(HostHandler.class); @@ -27,15 +24,36 @@ public HostHandler(ServiceContext serviceContext) { hostTagDAO = serviceContext.getHostTagDAO(); } - public void removeHost(String hostId) throws Exception { + public void removeHost(String hostId) { + boolean hasException = false; try { - hostDAO.deleteAllById(hostId); agentDAO.deleteAllById(hostId); + } catch (Exception e) { + hasException = true; + LOG.error("Failed to remove host record from agent - " + hostId, e); + } + try { hostTagDAO.deleteByHostId(hostId); + } catch (Exception e) { + hasException = true; + LOG.error("Failed to remove host record from hostTag - " + hostId, e); + } + try { hostAgentDAO.delete(hostId); - LOG.info("Removed all records for the host {}", hostId); } catch (Exception e) { - LOG.error("Failed to remove all records for the host {}, exception: {}", hostId, e); + hasException = true; + LOG.error("Failed to remove host record from hostAgent - " + hostId, e); + } + try { + hostDAO.deleteAllById(hostId); + } catch (Exception e) { + hasException = true; + LOG.error("Failed to remove host record from host - " + hostId, e); + } + + if (!hasException) { + LOG.info("Removed all records for host {}", hostId); } } + } diff --git a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/ConfigHelper.java b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/ConfigHelper.java index af17ce0567..c33626fc30 100644 --- a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/ConfigHelper.java +++ b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/ConfigHelper.java @@ -89,9 +89,9 @@ public class ConfigHelper { private static final Logger LOG = LoggerFactory.getLogger(ConfigHelper.class); private static final int DEFAULT_PERIOD = 30; - private static final int DEFAULT_MAX_STALE_HOST_THRESHOLD = 600; // 10 mins - private static final int DEFAULT_MIN_STALE_HOST_THRESHOLD = 150; - private static final int 
DEFAULT_LAUNCH_LATENCY_THRESHOLD = 600; + private static final int DEFAULT_MAX_STALE_HOST_THRESHOLD_SECONDS = 600; // 10 min + private static final int DEFAULT_MIN_STALE_HOST_THRESHOLD_SECONDS = 150; // 2.5 min + private static final int DEFAULT_LAUNCH_LATENCY_THRESHOLD_SECONDS = 600; private static final String DEFAULT_DEPLOY_JANITOR_SCHEDULE = "0 30 3 * * ?"; private static final String DEFAULT_BUILD_JANITOR_SCHEDULE = "0 40 3 * * ?"; private static final int DEFAULT_MAX_DAYS_TO_KEEP = 180; @@ -255,8 +255,10 @@ public static void scheduleWorkers(TeletraanServiceConfiguration configuration, if (workerName.equalsIgnoreCase(SimpleAgentJanitor.class.getSimpleName())) { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - int minStaleHostThreshold = MapUtils.getIntValue(properties, "minStaleHostThreshold", DEFAULT_MIN_STALE_HOST_THRESHOLD); - int maxStaleHostThreshold = MapUtils.getIntValue(properties, "maxStaleHostThreshold", DEFAULT_MAX_STALE_HOST_THRESHOLD); + int minStaleHostThreshold = MapUtils.getIntValue(properties, "minStaleHostThreshold", + DEFAULT_MIN_STALE_HOST_THRESHOLD_SECONDS); + int maxStaleHostThreshold = MapUtils.getIntValue(properties, + "maxStaleHostThreshold", DEFAULT_MAX_STALE_HOST_THRESHOLD_SECONDS); Runnable worker = new SimpleAgentJanitor(serviceContext, minStaleHostThreshold, maxStaleHostThreshold); scheduler.scheduleAtFixedRate(worker, initDelay, period, TimeUnit.SECONDS); LOG.info("Scheduled SimpleAgentJanitor."); @@ -264,9 +266,12 @@ public static void scheduleWorkers(TeletraanServiceConfiguration configuration, if (workerName.equalsIgnoreCase(AgentJanitor.class.getSimpleName())) { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - int minStaleHostThreshold = MapUtils.getIntValue(properties, "minStaleHostThreshold", DEFAULT_MIN_STALE_HOST_THRESHOLD); - int maxStaleHostThreshold = MapUtils.getIntValue(properties, "maxStaleHostThreshold", DEFAULT_MAX_STALE_HOST_THRESHOLD); - int 
maxLaunchLatencyThreshold = MapUtils.getIntValue(properties, "maxLaunchLaencyThreshold", DEFAULT_LAUNCH_LATENCY_THRESHOLD); + int minStaleHostThreshold = MapUtils.getIntValue(properties, "minStaleHostThreshold", + DEFAULT_MIN_STALE_HOST_THRESHOLD_SECONDS); + int maxStaleHostThreshold = MapUtils.getIntValue(properties, "maxStaleHostThreshold", + DEFAULT_MAX_STALE_HOST_THRESHOLD_SECONDS); + int maxLaunchLatencyThreshold = MapUtils.getIntValue(properties, "maxLaunchLatencyThreshold", + DEFAULT_LAUNCH_LATENCY_THRESHOLD_SECONDS); Runnable worker = new AgentJanitor(serviceContext, minStaleHostThreshold, maxStaleHostThreshold, maxLaunchLatencyThreshold); scheduler.scheduleAtFixedRate(worker, initDelay, period, TimeUnit.SECONDS); LOG.info("Scheduled AgentJanitor."); diff --git a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/AgentJanitor.java b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/AgentJanitor.java index f151fd9a67..011e50ac8c 100644 --- a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/AgentJanitor.java +++ b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/AgentJanitor.java @@ -15,119 +15,198 @@ */ package com.pinterest.teletraan.worker; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.pinterest.deployservice.ServiceContext; -import com.pinterest.deployservice.bean.HostBean; import com.pinterest.deployservice.bean.HostAgentBean; +import com.pinterest.deployservice.bean.HostBean; import com.pinterest.deployservice.bean.HostState; -import com.pinterest.deployservice.group.HostGroupManager; import com.pinterest.deployservice.rodimus.RodimusManager; -import com.pinterest.deployservice.handler.HostHandler; - 
-import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; /** - * Housekeeping on stuck and dead agents - *

- * if an agent has not ping server for certain time, we will cross check with - * authoritive source to confirm if the host is terminated, and handle the agent - * status accordingly + * Housekeeping on stuck and dead agents and hosts + * + * If an agent has not ping server for certain time, we will cross check with + * authoritative source to confirm if the host is terminated, and handle the + * agent status accordingly. + * + * If a host doesn't have any agent for a while, we will handle the host + * accordingly. */ public class AgentJanitor extends SimpleAgentJanitor { private static final Logger LOG = LoggerFactory.getLogger(AgentJanitor.class); - private HostGroupManager hostGroupDAO; private final RodimusManager rodimusManager; - private long maxLaunchLatencyThreshold; + private final long maxLaunchLatencyThreshold; + private final long absoluteThreshold = TimeUnit.DAYS.toMillis(7); + private final int agentlessHostBatchSize = 300; + private long janitorStartTime; - public AgentJanitor(ServiceContext serviceContext, int minStaleHostThreshold, - int maxStaleHostThreshold, int maxLaunchLatencyThreshold) { - super(serviceContext, minStaleHostThreshold, maxStaleHostThreshold); - hostGroupDAO = serviceContext.getHostGroupDAO(); + public AgentJanitor(ServiceContext serviceContext, int minStaleHostThresholdSeconds, + int maxStaleHostThresholdSeconds, int maxLaunchLatencyThresholdSeconds) { + super(serviceContext, minStaleHostThresholdSeconds, maxStaleHostThresholdSeconds); rodimusManager = serviceContext.getRodimusManager(); - this.maxLaunchLatencyThreshold = maxLaunchLatencyThreshold * 1000; + maxLaunchLatencyThreshold = TimeUnit.SECONDS.toMillis(maxLaunchLatencyThresholdSeconds); } - private Collection getTerminatedHostsFromSource(Collection staleHostIds) throws Exception { - Collection terminatedHosts = new ArrayList<>(staleHostIds); - for (String hostId : staleHostIds) { - Collection resultIds = 
rodimusManager.getTerminatedHosts(Collections.singletonList(hostId)); - if (resultIds.isEmpty()) { - terminatedHosts.remove(hostId); + private Set getTerminatedHostsFromSource(List staleHostIds) { + int batchSize = 10; + Set terminatedHosts = new HashSet<>(); + for (int i = 0; i < staleHostIds.size(); i += batchSize) { + try { + terminatedHosts.addAll(rodimusManager + .getTerminatedHosts(staleHostIds.subList(i, Math.min(i + batchSize, staleHostIds.size())))); + } catch (Exception ex) { + LOG.error("Failed to get terminated hosts", ex); } } return terminatedHosts; } - private Long getInstanceLaunchGracePeriod(String clusterName) throws Exception { - Long launchGracePeriod = (clusterName != null) ? rodimusManager.getClusterInstanceLaunchGracePeriod(clusterName) : null; - return launchGracePeriod == null ? maxLaunchLatencyThreshold : launchGracePeriod * 1000; + private Long getInstanceLaunchGracePeriod(String clusterName) { + Long launchGracePeriod = null; + if (clusterName != null) { + try { + launchGracePeriod = rodimusManager.getClusterInstanceLaunchGracePeriod(clusterName); + } catch (Exception ex) { + LOG.error("failed to get launch grace period for cluster {}, exception: {}", clusterName, ex); + } + } + return launchGracePeriod == null ? 
maxLaunchLatencyThreshold : TimeUnit.SECONDS.toMillis(launchGracePeriod); } - // Process stale hosts (hosts which have not pinged for more than min threshold period) - // Removes hosts once confirmed with source - private void processLowWatermarkHosts() throws Exception { - long current_time = System.currentTimeMillis(); - // If host fails to ping for longer than min stale threshold, - // either mark them as UNREACHABLE, or remove if confirmed with source of truth - long minThreshold = current_time - minStaleHostThreshold; - List minStaleHosts = hostAgentDAO.getStaleHosts(minThreshold); - Set minStaleHostIds = new HashSet<>(); - for (HostAgentBean hostAgentBean: minStaleHosts) { - minStaleHostIds.add(hostAgentBean.getHost_id()); + private boolean isHostStale(HostAgentBean hostAgentBean) { + if (hostAgentBean == null || hostAgentBean.getLast_update() == null) { + return false; } - Collection terminatedHosts = getTerminatedHostsFromSource(minStaleHostIds); - for (String removedId: terminatedHosts) { - removeStaleHost(removedId); + + if (janitorStartTime - hostAgentBean.getLast_update() >= absoluteThreshold) { + return true; } - } - private boolean isHostStale(HostAgentBean hostAgentBean) throws Exception { - if (hostAgentBean == null || hostAgentBean.getLast_update() == null) { + HostBean hostBean; + try { + hostBean = hostDAO.getHostsByHostId(hostAgentBean.getHost_id()).get(0); + } catch (Exception ex) { + LOG.error("failed to get host bean for ({}), {}", hostAgentBean, ex); return false; } - HostBean hostBean = hostDAO.getHostsByHostId(hostAgentBean.getHost_id()).get(0); - long current_time = System.currentTimeMillis(); + Long launchGracePeriod = getInstanceLaunchGracePeriod(hostAgentBean.getAuto_scaling_group()); - if ((hostBean.getState() == HostState.PROVISIONED) && (current_time - hostAgentBean.getLast_update() >= launchGracePeriod)) { + if ((hostBean.getState() == HostState.PROVISIONED) + && (janitorStartTime - hostAgentBean.getLast_update() >= 
launchGracePeriod)) { return true; } if (hostBean.getState() != HostState.TERMINATING && !hostBean.isPendingTerminate() && - (current_time - hostAgentBean.getLast_update() >= maxStaleHostThreshold)) { + (janitorStartTime - hostAgentBean.getLast_update() >= maxStaleHostThreshold)) { return true; } return false; } - // Process stale hosts (hosts which have not pinged for more than max threshold period) - // Marks hosts unreachable if it's stale for max threshold - // Removes hosts once confirmed with source - private void processHighWatermarkHosts() throws Exception { - long current_time = System.currentTimeMillis(); - long maxThreshold = current_time - Math.min(maxStaleHostThreshold, maxLaunchLatencyThreshold); - List maxStaleHosts = hostAgentDAO.getStaleHosts(maxThreshold); - Set staleHostIds = new HashSet<>(); - - for (HostAgentBean hostAgentBean : maxStaleHosts) { - if (isHostStale(hostAgentBean)) { - staleHostIds.add(hostAgentBean.getHost_id()); + /** + * Process stale hosts which have not pinged since + * janitorStartTime - minStaleHostThreshold + * They will be candidates for stale hosts which will be removed in future + * executions. + * Either mark them as UNREACHABLE, or remove if confirmed with source of truth. 
+ */ + private void determineStaleHostCandidates() { + long minThreshold = janitorStartTime - minStaleHostThreshold; + long maxThreshold = janitorStartTime - maxLaunchLatencyThreshold; + List<HostAgentBean> unreachableHosts; + try { + unreachableHosts = hostAgentDAO.getStaleHosts(maxThreshold, minThreshold); + } catch (Exception ex) { + LOG.error("failed to get unreachable hosts", ex); + return; + } + ArrayList<String> unreachableHostIds = new ArrayList<>(); + unreachableHosts.forEach(hostAgent -> unreachableHostIds.add(hostAgent.getHost_id())); /* forEach, not stream().map(): map is a lazy intermediate operation and never runs without a terminal operation, which left this list empty */ + + Set<String> terminatedHosts = getTerminatedHostsFromSource(unreachableHostIds); + for (String unreachableId : unreachableHostIds) { + if (terminatedHosts.contains(unreachableId)) { + removeStaleHost(unreachableId); + } else { + markUnreachableHost(unreachableId); } } - Collection terminatedHosts = getTerminatedHostsFromSource(staleHostIds); - for (String staleId : staleHostIds) { + } + + /** + * Process stale hosts which have not pinged since + * janitorStartTime - maxStaleHostThreshold + * They are confirmed stale hosts, should be removed from Teletraan + */ + private void processStaleHosts() { + long maxThreshold = janitorStartTime - maxStaleHostThreshold; + List<HostAgentBean> staleHosts; + try { + staleHosts = hostAgentDAO.getStaleHosts(maxThreshold); + } catch (Exception ex) { + LOG.error("failed to get stale hosts", ex); + return; + } + + Map<String, HostAgentBean> staleHostMap = new HashMap<>(); + staleHosts.forEach(hostAgent -> staleHostMap.put(hostAgent.getHost_id(), hostAgent)); /* forEach, not stream().map(): the lazy map() never executed, so the map stayed empty and no stale host was ever processed */ + + Set<String> terminatedHosts = getTerminatedHostsFromSource(new ArrayList<>(staleHostMap.keySet())); + for (String staleId : staleHostMap.keySet()) { if (terminatedHosts.contains(staleId)) { removeStaleHost(staleId); } else { - markUnreachableHost(staleId); + HostAgentBean hostAgent = staleHostMap.get(staleId); + if (isHostStale(hostAgent)) { + LOG.warn("Agent ({}) is stale (not Pinging Teletraan), but might be running.", + hostAgent); + } + } + } + } + + /** + * Clean up hosts without any agents + 
* + * If a host is directly added to Teletraan, there will be no agent associated + * with it immediately. Hosts may stuck in this state so we should clean up + * here. We wait 10x maxLaunchLatencyThreshold before doing cleanup. + */ + private void cleanUpAgentlessHosts() { + long noUpdateSince = janitorStartTime - 10 * maxLaunchLatencyThreshold; + List agentlessHosts; + try { + agentlessHosts = hostDAO.getStaleAgentlessHostIds(noUpdateSince, agentlessHostBatchSize); + } catch (SQLException ex) { + LOG.error("failed to get agentless hosts", ex); + return; + } + + Set terminatedHosts = getTerminatedHostsFromSource(agentlessHosts); + for (String hostId : agentlessHosts) { + if (terminatedHosts.contains(hostId)) { + removeStaleHost(hostId); + } else { + LOG.warn("Agentless host {} is stale but might be running", hostId); } } } @Override - void processAllHosts() throws Exception { - processLowWatermarkHosts(); - processHighWatermarkHosts(); + void processAllHosts() { + janitorStartTime = System.currentTimeMillis(); + processStaleHosts(); + determineStaleHostCandidates(); + cleanUpAgentlessHosts(); } } diff --git a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/SimpleAgentJanitor.java b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/SimpleAgentJanitor.java index 483b155663..b6d83a20eb 100644 --- a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/SimpleAgentJanitor.java +++ b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/SimpleAgentJanitor.java @@ -4,9 +4,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,29 +15,28 @@ */ package com.pinterest.teletraan.worker; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.pinterest.deployservice.ServiceContext; import com.pinterest.deployservice.bean.AgentBean; import com.pinterest.deployservice.bean.AgentState; -import com.pinterest.deployservice.bean.HostBean; import com.pinterest.deployservice.bean.HostAgentBean; -import com.pinterest.deployservice.bean.HostState; -import com.pinterest.deployservice.common.Constants; import com.pinterest.deployservice.dao.AgentDAO; -import com.pinterest.deployservice.dao.GroupDAO; -import com.pinterest.deployservice.dao.HostDAO; import com.pinterest.deployservice.dao.HostAgentDAO; +import com.pinterest.deployservice.dao.HostDAO; import com.pinterest.deployservice.handler.HostHandler; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; /** * Housekeeping on stuck and dead agents *

* if an agent has not ping server for certain time, we will cross check with - * authoritive source to confirm if the host is terminated, and set agent status + * authoritative source to confirm if the host is terminated, and set agent status * accordingly */ public class SimpleAgentJanitor implements Runnable { @@ -45,33 +44,27 @@ public class SimpleAgentJanitor implements Runnable { private AgentDAO agentDAO; protected HostDAO hostDAO; protected HostAgentDAO hostAgentDAO; - private GroupDAO groupDAO; private HostHandler hostHandler; protected long maxStaleHostThreshold; protected long minStaleHostThreshold; public SimpleAgentJanitor(ServiceContext serviceContext, int minStaleHostThreshold, - int maxStaleHostThreshold) { + int maxStaleHostThreshold) { agentDAO = serviceContext.getAgentDAO(); hostDAO = serviceContext.getHostDAO(); hostAgentDAO = serviceContext.getHostAgentDAO(); - groupDAO = serviceContext.getGroupDAO(); hostHandler = new HostHandler(serviceContext); this.maxStaleHostThreshold = maxStaleHostThreshold * 1000; this.minStaleHostThreshold = minStaleHostThreshold * 1000; } // remove the stale host from db - void removeStaleHost(String id) throws Exception { - LOG.info(String.format("Delete records of stale host {}", id)); - try { - hostHandler.removeHost(id); - } catch (Exception e) { - LOG.error("Failed to delete all records for host {}. 
exception {}", id, e); - } + void removeStaleHost(String id) { + LOG.info("Delete records of stale host {}", id); + hostHandler.removeHost(id); } - void markUnreachableHost(String id) throws Exception { + void markUnreachableHost(String id) { try { // mark the agent as unreachable AgentBean updateBean = new AgentBean(); @@ -94,7 +87,7 @@ private void processStaleHosts(Collection staleHostIds, boolean needToDe } void processAllHosts() throws Exception { - LOG.info("Process explicite capacity hosts"); + LOG.info("Process explicit capacity hosts"); // If a host fails to ping for longer than max stale threshold, // then just remove it long current_time = System.currentTimeMillis(); @@ -105,8 +98,8 @@ void processAllHosts() throws Exception { maxStaleHostIds.add(host.getHost_id()); } if (!maxStaleHostIds.isEmpty()) { - LOG.info("Found the following hosts (Explicite capacity) exceeded maxStaleThreshold: ", - maxStaleHostIds); + LOG.info("Found the following hosts (Explicit capacity) exceeded maxStaleThreshold: {}", + maxStaleHostIds); /* the {} placeholder is required: without it SLF4J drops the argument and the host ids are never logged */ processStaleHosts(maxStaleHostIds, true); } @@ -120,8 +113,8 @@ void processAllHosts() throws Exception { minStaleHostIds.add(host.getHost_id()); } if (!minStaleHostIds.isEmpty()) { - LOG.info("Found following hosts (Explicite capacity) excceeded minStaleThreshold: ", - minStaleHostIds); + LOG.info("Found following hosts (Explicit capacity) exceeded minStaleThreshold: {}", + minStaleHostIds); /* {} placeholder added so the host ids actually appear in the log line */ processStaleHosts(minStaleHostIds, false); } } @@ -129,11 +122,11 @@ void processAllHosts() throws Exception { @Override public void run() { try { - LOG.info("Start simple agent janitor process..."); + LOG.info("Start agent janitor process..."); processAllHosts(); } catch (Throwable t) { // Catch all throwable so that subsequent job not suppressed - LOG.error("SimpleAgentJanitor Failed.", t); + LOG.error("AgentJanitor Failed.", t); } } }