diff --git a/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java b/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java index f22a906054d9..d4038d4ceeb4 100644 --- a/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java +++ b/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java @@ -37,4 +37,6 @@ public interface UsageJobDao extends GenericDao { UsageJobVO isOwner(String hostname, int pid); void updateJobSuccess(Long jobId, long startMillis, long endMillis, long execTime, boolean success); + + void removeLastOpenJobsOwned(String hostname, int pid); } diff --git a/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java b/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java index 065dc309ebea..4c58062413de 100644 --- a/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java +++ b/engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java @@ -22,6 +22,7 @@ import java.util.List; +import org.apache.commons.collections.CollectionUtils; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; @@ -116,7 +117,7 @@ public Long checkHeartbeat(String hostname, int pid, int aggregationDuration) { public UsageJobVO isOwner(String hostname, int pid) { TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB); try { - if ((hostname == null) || (pid <= 0)) { + if (hostname == null || pid <= 0) { return null; } @@ -176,7 +177,7 @@ public UsageJobVO getNextImmediateJob() { SearchCriteria sc = createSearchCriteria(); sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0)); sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE)); - sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0)); + sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_NOT_SCHEDULED)); List jobs = search(sc, filter); if ((jobs == null) || jobs.isEmpty()) { @@ -196,4 +197,36 @@ public Date getLastHeartbeat() { } return jobs.get(0).getHeartbeat(); } + + private List getLastOpenJobsOwned(String hostname, int pid) { + SearchCriteria sc = createSearchCriteria(); + sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0)); + sc.addAnd("host", SearchCriteria.Op.EQ, hostname); + if (pid > 0) { + sc.addAnd("pid", SearchCriteria.Op.EQ, Integer.valueOf(pid)); + } + return listBy(sc); + } + + @Override + public void removeLastOpenJobsOwned(String hostname, int pid) { + if (hostname == null) { + return; + } + + TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB); + try { + List jobs = getLastOpenJobsOwned(hostname, pid); + if (CollectionUtils.isNotEmpty(jobs)) { + s_logger.info(String.format("Found %s opens job, to remove", jobs.size())); + for (UsageJobVO job : jobs) { + s_logger.debug(String.format("Removing job - id: %d, pid: %d, job type: %d, scheduled: %d, heartbeat: %s", + job.getId(), job.getPid(), job.getJobType(), job.getScheduled(), job.getHeartbeat())); + remove(job.getId()); + } + } + } finally { + txn.close(); + } + } } diff --git a/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java b/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java index 63624cdc3c07..3dde335258ab 100644 --- a/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java +++ b/usage/src/main/java/com/cloud/usage/UsageManagerImpl.java @@ -319,6 +319,9 @@ public boolean start() { s_logger.info("Starting Usage Manager"); } + _usageJobDao.removeLastOpenJobsOwned(_hostname, 0); + Runtime.getRuntime().addShutdownHook(new AbandonJob()); + // use the configured exec time and aggregation duration for scheduling the job _scheduledFuture = _executor.scheduleAtFixedRate(this, _jobExecTime.getTimeInMillis() - System.currentTimeMillis(), _aggregationDuration * 60 * 1000, TimeUnit.MILLISECONDS); @@ -331,7 +334,6 @@ public boolean start() { _sanity = _sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, _sanityCheckInterval, TimeUnit.DAYS); } - Runtime.getRuntime().addShutdownHook(new AbandonJob()); TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB); try { if (_heartbeatLock.lock(3)) { // 3 second timeout @@ -2197,17 +2199,17 @@ protected void runInContext() { // the aggregation range away from executing the next job long now = System.currentTimeMillis(); long timeToJob = _jobExecTime.getTimeInMillis() - now; - long timeSinceJob = 0; + long timeSinceLastSuccessJob = 0; long aggregationDurationMillis = _aggregationDuration * 60L * 1000L; long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis(); if (lastSuccess > 0) { - timeSinceJob = now - lastSuccess; + timeSinceLastSuccessJob = now - lastSuccess; } - if ((timeSinceJob > 0) && (timeSinceJob > (aggregationDurationMillis - 100))) { + if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) { if (timeToJob > (aggregationDurationMillis / 2)) { if (s_logger.isDebugEnabled()) { - s_logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob + + s_logger.debug("it's been " + timeSinceLastSuccessJob + " ms since last usage job and " + timeToJob + " ms until next job, scheduling an immediate job to catch up (aggregation duration is " + _aggregationDuration + " minutes)"); } scheduleParse(); @@ -2294,17 +2296,12 @@ protected void runInContext() { } } } + private class AbandonJob extends Thread { @Override public void run() { - s_logger.info("exitting Usage Manager"); - deleteOpenjob(); - } - private void deleteOpenjob() { - UsageJobVO job = _usageJobDao.isOwner(_hostname, _pid); - if (job != null) { - _usageJobDao.remove(job.getId()); - } + s_logger.info("exiting Usage Manager"); + _usageJobDao.removeLastOpenJobsOwned(_hostname, _pid); } } }