Skip to content

Commit

Permalink
Improve few existing comments, add more comments for existing functio…
Browse files Browse the repository at this point in the history
…ns, delete few unnecessary/irrelevant methods
  • Loading branch information
Karthick Duraisamy Soundararaj authored and Jessica Hartog committed Jul 28, 2016
1 parent f5e02a3 commit d9f788f
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 68 deletions.
30 changes: 15 additions & 15 deletions storm/src/main/storm/mesos/MesosNimbus.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.concurrent.TimeUnit;

import static storm.mesos.util.PrettyProtobuf.offerIDListToString;
import static storm.mesos.util.PrettyProtobuf.offerToString;
import static storm.mesos.util.PrettyProtobuf.offerMapToString;
import static storm.mesos.util.PrettyProtobuf.taskInfoListToString;

Expand Down Expand Up @@ -308,16 +309,14 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
for (Protos.Offer offer : offers) {
if (isHostAccepted(offer.getHostname())) {
// TODO(ksoundararaj): Should we record the following as info instead of debug
LOG.debug("resourceOffers: Recording offer from host: {}, offerId: {}",
offer.getHostname(), offer.getId().getValue());
LOG.info("resourceOffers: Recording offer: {}", offerToString(offer));
_offers.put(offer.getId(), offer);
} else {
LOG.debug("resourceOffers: Declining offer from host: {}, offerId: {}",
offer.getHostname(), offer.getId().getValue());
LOG.info("resourceOffers: Declining offer: {}", offerToString(offer));
driver.declineOffer(offer.getId());
}
}
LOG.debug("resourceOffers: After processing offers, now have {} offers buffered: {}",
LOG.info("resourceOffers: After processing offers, now have {} offers buffered: {}",
_offers.size(), offerMapToString(_offers));
}
}
Expand Down Expand Up @@ -385,10 +384,6 @@ public Collection<WorkerSlot> allSlotsAvailableForScheduling(
}
}

private String getLogViewerConfig() {
return String.format(" -c %s=true", MesosCommon.AUTO_START_LOGVIEWER_CONF);
}

/**
* This method is invoked after IScheduler.schedule assigns the worker slots to the topologies that need assignments
*
Expand All @@ -399,17 +394,22 @@ private String getLogViewerConfig() {
@Override
public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> slotsForTopologiesNeedingAssignments) {
if (slotsForTopologiesNeedingAssignments.isEmpty()) {
LOG.debug("assignSlots: no slots passed in, nothing to do");
LOG.info("assignSlots: no slots passed in, nothing to do");
return;
}

// This is purely to print the debug information. Otherwise, the following for loop is unnecessary.
for (Map.Entry<String, Collection<WorkerSlot>> topologyToSlots : slotsForTopologiesNeedingAssignments.entrySet()) {
String topologyId = topologyToSlots.getKey();
List<String> topologySlotAssignmentStrings = new ArrayList<String>();
String info = "assignSlots: " + topologyId + " being assigned to " + topologyToSlots.getValue().size() + " slots (worker:port, cpu, mem) as follows: ";
for (WorkerSlot slot : topologyToSlots.getValue()) {
TopologyDetails details = topologies.getById(topologyId);
LOG.debug("assignSlots: topologyId: {} worker being assigned to slot: {} with workerCpu: {} workerMem: {}",
topologyId, slot, MesosCommon.topologyWorkerCpu(mesosStormConf, details), MesosCommon.topologyWorkerMem(mesosStormConf, details));
topologySlotAssignmentStrings.add("(" + slot + ", " + MesosCommon.topologyWorkerCpu(mesosStormConf, details) + ", " + MesosCommon.topologyWorkerMem(mesosStormConf, details) + ")");
}
if (!topologyToSlots.getValue().isEmpty()) {
info += StringUtils.join(topologySlotAssignmentStrings, ", ");
LOG.info(info);
}
}

Expand All @@ -427,7 +427,7 @@ public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot
List<OfferID> offerIDList = aggregatedOffersPerNode.get(node).getOfferIDList();
List<TaskInfo> taskInfoList = tasksToLaunchPerNode.get(node);

LOG.info("Using offerIDs: " + offerIDListToString(offerIDList) + " on host: " + node + " to launch tasks: " + taskInfoListToString(taskInfoList));
LOG.info("Using offerIDs: {} on host: {} to launch tasks: {}", offerIDListToString(offerIDList), node, taskInfoListToString(taskInfoList));

_driver.launchTasks(offerIDList, taskInfoList);
for (OfferID offerID: offerIDList) {
Expand Down Expand Up @@ -601,8 +601,8 @@ public Map<String, List<TaskInfo>> getTasksToLaunch(Topologies topologies,
String extraConfig = "";

if (!aggregatedOffers.isFit(mesosStormConf, topologyDetails, workerPort, hostsWithSupervisors.contains(workerHost))) {
LOG.error(String.format("Unable to launch worker %s. Required cpu: %f, Required mem: %f, Required port: %d. Available aggregatedOffers : %s",
workerHost, requiredCpu, requiredMem, workerPort, aggregatedOffers));
LOG.error(String.format("Unable to launch worker %s for topology %s. Required cpu: %f, Required mem: %f, Required port: %d. Available aggregatedOffers : %s",
workerHost, topologyDetails.getId(), requiredCpu, requiredMem, workerPort, aggregatedOffers));
continue;
}

Expand Down
38 changes: 21 additions & 17 deletions storm/src/main/storm/mesos/resources/AggregatedOffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class AggregatedOffers {

private List<Protos.Offer> offerList = new ArrayList<Protos.Offer>();

private final String hostName;
private final String hostname;

private Protos.SlaveID slaveID;

Expand All @@ -47,25 +47,20 @@ private void initializeAvailableResources() {
availableResources.put(ResourceType.PORTS, new RangeResource(ResourceType.PORTS));
}

public AggregatedOffers(String hostName) {
this.hostName = hostName;
initializeAvailableResources();
}

public AggregatedOffers(Protos.Offer offer) {
initializeAvailableResources();
this.slaveID = offer.getSlaveId();
this.hostName = offer.getHostname();
this.hostname = offer.getHostname();
add(offer);
}

public String getHostName() {
return hostName;
public String getHostname() {
return hostname;
}

public void add(Protos.Offer offer) {
// We are unable to aggregate offers if they are from different workers
assert offer.getSlaveId().equals(slaveID) && offer.getHostname().equals(hostName);
// We are unable to aggregate offers if they are from different mesos slaves/workers/agents
assert offer.getSlaveId().equals(slaveID) && offer.getHostname().equals(hostname);
offerList.add(offer);

for (Protos.Resource r : offer.getResourcesList()) {
Expand Down Expand Up @@ -103,6 +98,11 @@ public boolean isAvailable(ResourceType resourceType, ResourceEntry<?> resource)
return availableResources.get(resourceType).isAvailable(resource);
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public boolean isAvailable(ResourceType resourceType, ReservationType reservationType, ResourceEntry<?> resource) {
return availableResources.get(resourceType).isAvailable(resource, reservationType);
}
Expand All @@ -111,6 +111,11 @@ public <T extends ResourceEntry> List<T> getAllAvailableResources(ResourceType r
return availableResources.get(resourceType).getAllAvailableResources();
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public <T extends ResourceEntry> List<T> getAllAvailableResources(ResourceType resourceType, ReservationType reservationType) {
return availableResources.get(resourceType).getAllAvailableResources(reservationType);
}
Expand All @@ -128,6 +133,11 @@ public List<ResourceEntry> reserveAndGet(ResourceType resourceType, ResourceEntr
return new ArrayList<>();
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public List<ResourceEntry> reserveAndGet(ResourceType resourceType, ReservationType reservationType, ResourceEntry<?> resource) throws
ResourceNotAvailableException {
if (availableResources.get(resourceType).isAvailable(resource, reservationType)) {
Expand All @@ -136,10 +146,6 @@ public List<ResourceEntry> reserveAndGet(ResourceType resourceType, ReservationT
return new ArrayList<>();
}

public List<Protos.Offer> getOfferList() {
return offerList;
}

public List<Protos.OfferID> getOfferIDList() {
List<Protos.OfferID> offerIDList = new ArrayList<>();
for (Protos.Offer offer: offerList) {
Expand All @@ -162,7 +168,6 @@ public String toString() {


public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, boolean supervisorExists) {

double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);

Expand All @@ -175,7 +180,6 @@ public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, boolea
}

public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, Long port, boolean supervisorExists) {

double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);

Expand Down
10 changes: 9 additions & 1 deletion storm/src/main/storm/mesos/resources/RangeResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,15 @@ public List<ResourceEntry> removeAndGet(RangeResourceEntry rangeResourceEntry) t


/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*
* Remove/Reserve range from available ranges.
* {@param rangeResourceEntry} range resource to removeAndGet
* {@param reservationType} reservation type of resource that needs to be removed. If the resource represented by rangeResourceEntry
* of the reservation type specified by this parameter is not available, then {@link ResourceNotAvailableException}
* is thrown
* is thrown.
*/
@Override
public List<ResourceEntry> removeAndGet(RangeResourceEntry rangeResourceEntry, ReservationType reservationType) throws ResourceNotAvailableException {
Expand All @@ -129,6 +133,10 @@ public List<ResourceEntry> removeAndGet(RangeResourceEntry rangeResourceEntry, R
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*
* Remove/Reserve range from available ranges
* {@param rangeResourceEntry} range resource to removeAndGet
* {@param reservationTypeComparator} comparator like {@link storm.mesos.resources.DefaultReservationTypeComparator}
Expand Down
10 changes: 10 additions & 0 deletions storm/src/main/storm/mesos/resources/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ public interface Resource<T extends ResourceEntry<? extends Number>> {

public List<ResourceEntry> removeAndGet(T resourceEntry) throws ResourceNotAvailableException;

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public List<ResourceEntry> removeAndGet(T value, ReservationType reservationType) throws ResourceNotAvailableException;

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public List<ResourceEntry> removeAndGet(T value, Comparator<ReservationType> reservationTypeComparator) throws ResourceNotAvailableException;

}
5 changes: 5 additions & 0 deletions storm/src/main/storm/mesos/resources/ResourceEntries.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public ReservationType getReservationType() {
}

/**
* Unused Method - Exists for the sake of completeness in terms of implementing ResourceEntry<T>.
*
* Lets say, we have a range [u,v]. Using this add function, we can expand the range to [w,x] if and
* only if one of the following conditions are satisfied
* `w < u`
Expand All @@ -107,6 +109,9 @@ public RangeResourceEntry add(ResourceEntry<Long> resourceEntry) {
return this;
}

/**
* Unused Method - Exists for the sake of completeness in terms of implementing ResourceEntry<T>.
*/
public RangeResourceEntry remove(ResourceEntry<Long> resourceEntry) {
RangeResourceEntry rangeResourceEntry = (RangeResourceEntry) resourceEntry;
if (this.begin < rangeResourceEntry.getBegin()) {
Expand Down
17 changes: 7 additions & 10 deletions storm/src/main/storm/mesos/resources/ScalarResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public boolean isAvailable(ScalarResourceEntry scalarResourceEntry, ReservationT
return (availableResourcesByReservationType.get(reservationType).getValue() >= scalarResourceEntry.getValue());
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public Double getTotalAvailableResource(ReservationType reservationType) {
return availableResourcesByReservationType.get(reservationType).getValue();
}
Expand Down Expand Up @@ -82,14 +87,6 @@ public List<ResourceEntry> removeAndGet(ScalarResourceEntry scalarResourceEntry)
return removeAndGet(scalarResourceEntry, availableResourcesByReservationType.keySet());
}

public List<ResourceEntry> reserveScalarResource(ResourceType resourceType, ScalarResourceEntry requiredValue) throws ResourceNotAvailableException {
if (totalAvailableResource < requiredValue.getValue()) {
throw new ResourceNotAvailableException(String.format("resourceType: {} is not available. Requested {} Available {}",
resourceType, requiredValue, totalAvailableResource));
}
return removeAndGet(requiredValue);
}

/**
* Removes/Reserves scalar resource from available resources.
* {@param scalarResourceEntry} amount of scalar resource to removeAndGet/decrement
Expand Down Expand Up @@ -128,10 +125,10 @@ public List<ResourceEntry> removeAndGet(ScalarResourceEntry scalarResourceEntry,
public String toString() {
List<String> availableResourcesByResourceTypeList = new ArrayList<>();
for (Map.Entry<ReservationType, ScalarResourceEntry> entry: availableResourcesByReservationType.entrySet()) {
availableResourcesByResourceTypeList.add(String.format("%s: %f", entry.getKey(), entry.getValue().getValue()));
availableResourcesByResourceTypeList.add(String.format("%s: %s", entry.getKey(), Double.toString(entry.getValue().getValue())));
}
String tmp = StringUtils.join(availableResourcesByResourceTypeList, ", ");
return String.format("%s: %f (%s)", resourceType.toString(), totalAvailableResource, tmp);
return String.format("%s: %s (%s)", resourceType.toString(), Double.toString(totalAvailableResource), tmp);
}

private List<ResourceEntry> removeAndGet(ScalarResourceEntry scalarResourceEntry, Collection<ReservationType> reservationTypesListByPriority) throws
Expand Down
Loading

0 comments on commit d9f788f

Please sign in to comment.