Skip to content

Commit

Permalink
Merge branch 'master' into consolidateZones
Browse files Browse the repository at this point in the history
  • Loading branch information
nkuehnel authored Apr 30, 2024
2 parents 564cf47 + d5db30a commit a36c1fc
Show file tree
Hide file tree
Showing 13 changed files with 603 additions and 510 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,97 +195,29 @@ public static void runJsprit(Scenario scenario) throws ExecutionException, Inter
carrierActivityCounterMap.put(carrier.getId(), carrierActivityCounterMap.getOrDefault(carrier.getId(), 0) + 2*carrier.getShipments().size());
}

HashMap<Id<Carrier>, Integer> sortedMap = carrierActivityCounterMap.entrySet().stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2, LinkedHashMap::new));

AtomicInteger startedVRPCounter = new AtomicInteger(0);
AtomicInteger solvedVRPCounter = new AtomicInteger(0);

ArrayList<Id<Carrier>> tempList = new ArrayList<>(sortedMap.keySet());

int nThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
int nThreads = Runtime.getRuntime().availableProcessors();
log.info("Starting VRP solving for {} carriers in parallel with {} threads.", carriers.getCarriers().size(), nThreads);

List<List<Id<Carrier>>> splitList = splitListAlternating(nThreads, tempList);
log.info("Distribution of carriers on threads: {}", splitList.stream().map(List::size).toList());
List<Future<?>> futureList = new ArrayList<>();
for (List<Id<Carrier>> subList : splitList) {
futureList.add(executorService.submit(() -> subList.forEach(carrierId -> {
Carrier carrier = carriers.getCarriers().get(carrierId);

double start = System.currentTimeMillis();
if (!carrier.getServices().isEmpty())
log.info("Start tour planning for {} which has {} services", carrier.getId(), carrier.getServices().size());
else if (!carrier.getShipments().isEmpty())
log.info("Start tour planning for {} which has {} shipments", carrier.getId(), carrier.getShipments().size());

startedVRPCounter.incrementAndGet();
log.info("started VRP solving for carrier number {} out of {} carriers. Current thread id: {}", startedVRPCounter.get(), carriers.getCarriers()
.size(), Thread.currentThread()
.getId());

VehicleRoutingProblem problem = MatsimJspritFactory.createRoutingProblemBuilder(carrier, scenario.getNetwork())
.setRoutingCost(netBasedCosts).build();
VehicleRoutingAlgorithm algorithm = MatsimJspritFactory.loadOrCreateVehicleRoutingAlgorithm(scenario, freightCarriersConfigGroup, netBasedCosts, problem);

algorithm.getAlgorithmListeners().addListener(new StopWatch(), VehicleRoutingAlgorithmListeners.Priority.HIGH);
int jspritIterations = getJspritIterations(carrier);
try {
if (jspritIterations > 0) {
algorithm.setMaxIterations(jspritIterations);
} else {
throw new InvalidAttributeValueException(
"Carrier has invalid number of jsprit iterations. They must be positive! Carrier id: "
+ carrier.getId().toString());
}
} catch (Exception e) {
throw new RuntimeException(e);
// e.printStackTrace();
}

VehicleRoutingProblemSolution solution = Solutions.bestOf(algorithm.searchSolutions());

log.info("tour planning for carrier {} took {} seconds.", carrier.getId(), (System.currentTimeMillis() - start) / 1000);
ThreadPoolExecutor executor = new JspritTreadPoolExecutor(new PriorityBlockingQueue<>(), nThreads);

CarrierPlan newPlan = MatsimJspritFactory.createPlan(carrier, solution);
// yy In principle, the carrier should know the vehicle types that it can deploy.
List<Future<?>> futures = new ArrayList<>();
List<Map.Entry<Id<Carrier>, Integer>> sorted = carrierActivityCounterMap.entrySet().stream()
.sorted(Map.Entry.comparingByValue((o1, o2) -> o2 - o1))
.toList();

log.info("routing plan for carrier {}", carrier.getId());
NetworkRouter.routePlan(newPlan, netBasedCosts);
solvedVRPCounter.incrementAndGet();
double timeForPlanningAndRouting = (System.currentTimeMillis() - start) / 1000;
log.info("routing for carrier {} finished. Tour planning plus routing took {} seconds.", carrier.getId(),
timeForPlanningAndRouting);
log.info("solved {} out of {} carriers.", solvedVRPCounter.get(), carriers.getCarriers().size());

carrier.setSelectedPlan(newPlan);
setJspritComputationTime(carrier, timeForPlanningAndRouting);
})));
for (Map.Entry<Id<Carrier>, Integer> entry : sorted){
JspritCarrierTask task = new JspritCarrierTask(entry.getValue(), carriers.getCarriers().get(entry.getKey()), scenario, netBasedCosts, startedVRPCounter, carriers.getCarriers().size());
log.info("Adding task for carrier {} with priority {}", entry.getKey(), entry.getValue());
futures.add(executor.submit(task));
}

for (Future<?> future : futureList) {
for (Future<?> future : futures) {
future.get();
}
}

// split tempList in nThreads parts such that it is split to 1,2,...,n,1,2,..
private static List<List<Id<Carrier>>> splitListAlternating(int nThreads, ArrayList<Id<Carrier>> tempList) {
List<List<Id<Carrier>>> splitList = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
List<Id<Carrier>> subList = new ArrayList<>();
for (int j = i; j < tempList.size(); j += nThreads) {
subList.add(tempList.get(j));
}
splitList.add(subList);
}

assert splitList.size() == nThreads;
assert splitList.stream().mapToInt(List::size).sum() == tempList.size();
return splitList;
}

/**
* Creates a new {@link Carriers} container only with {@link CarrierShipment}s
* for creating a new VRP. As consequence of the transformation of
Expand Down Expand Up @@ -667,4 +599,100 @@ public static void writeCarriers(Carriers carriers, String filename ) {
new CarrierPlanWriter( carriers ).write( filename );
}

static class JspritCarrierTask implements Runnable{
private final int priority;
private final Carrier carrier;
private final Scenario scenario;
private final NetworkBasedTransportCosts netBasedCosts;
private final AtomicInteger startedVRPCounter;
private final int taskCount;

public JspritCarrierTask(int priority, Carrier carrier, Scenario scenario, NetworkBasedTransportCosts netBasedCosts, AtomicInteger startedVRPCounter, int taskCount) {
this.priority = priority;
this.carrier = carrier;
this.scenario = scenario;
this.netBasedCosts = netBasedCosts;
this.startedVRPCounter = startedVRPCounter;
this.taskCount = taskCount;
}

public int getPriority() {
return priority;
}

@Override
public void run() {
FreightCarriersConfigGroup freightCarriersConfigGroup = ConfigUtils.addOrGetModule( scenario.getConfig(), FreightCarriersConfigGroup.class );

double start = System.currentTimeMillis();
if (!carrier.getServices().isEmpty())
log.info("Start tour planning for {} which has {} services", carrier.getId(), carrier.getServices().size());
else if (!carrier.getShipments().isEmpty())
log.info("Start tour planning for {} which has {} shipments", carrier.getId(), carrier.getShipments().size());

startedVRPCounter.incrementAndGet();
log.info("started VRP solving for carrier number {} out of {} carriers. Thread id: {}. Priority: {}", startedVRPCounter.get(), taskCount, Thread.currentThread().getId(), this.priority);

VehicleRoutingProblem problem = MatsimJspritFactory.createRoutingProblemBuilder(carrier, scenario.getNetwork())
.setRoutingCost(netBasedCosts).build();
VehicleRoutingAlgorithm algorithm = MatsimJspritFactory.loadOrCreateVehicleRoutingAlgorithm(scenario, freightCarriersConfigGroup, netBasedCosts, problem);

algorithm.getAlgorithmListeners().addListener(new StopWatch(), VehicleRoutingAlgorithmListeners.Priority.HIGH);
int jspritIterations = getJspritIterations(carrier);
try {
if (jspritIterations > 0) {
algorithm.setMaxIterations(jspritIterations);
} else {
throw new InvalidAttributeValueException(
"Carrier has invalid number of jsprit iterations. They must be positive! Carrier id: "
+ carrier.getId().toString());
}
} catch (Exception e) {
throw new RuntimeException(e);
}

VehicleRoutingProblemSolution solution = Solutions.bestOf(algorithm.searchSolutions());

log.info("tour planning for carrier {} took {} seconds.", carrier.getId(), (System.currentTimeMillis() - start) / 1000);

CarrierPlan newPlan = MatsimJspritFactory.createPlan(carrier, solution);
// yy In principle, the carrier should know the vehicle types that it can deploy.

log.info("routing plan for carrier {}", carrier.getId());
NetworkRouter.routePlan(newPlan, netBasedCosts);
double timeForPlanningAndRouting = (System.currentTimeMillis() - start) / 1000;
log.info("routing for carrier {} finished. Tour planning plus routing took {} seconds. Thread id: {}", carrier.getId(),
timeForPlanningAndRouting, Thread.currentThread().getId());

carrier.setSelectedPlan(newPlan);
setJspritComputationTime(carrier, timeForPlanningAndRouting);
}
}

// we need this class because otherwise there is a runtime error in the PriorityBlockingQueue
// https://jvmaware.com/priority-queue-and-threadpool/
private static class JspritTreadPoolExecutor extends ThreadPoolExecutor{
public JspritTreadPoolExecutor(BlockingQueue<Runnable> workQueue, int nThreads) {
super(nThreads, nThreads, 0, TimeUnit.SECONDS, workQueue);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new CustomFutureTask<>(runnable);
}
}

private static class CustomFutureTask<T> extends FutureTask<T> implements Comparable<CustomFutureTask<T>>{
private final JspritCarrierTask task;

public CustomFutureTask(Runnable task) {
super(task, null);
this.task = (JspritCarrierTask) task;
}

@Override
public int compareTo(CustomFutureTask that) {
return that.task.getPriority() - this.task.getPriority();
}
}
}
Loading

0 comments on commit a36c1fc

Please sign in to comment.