don't use safe caller for event delegation to subscribers #861

Merged
merged 1 commit into from
Jun 19, 2019
@@ -16,10 +16,15 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;

 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
-import org.eclipse.smarthome.core.common.SafeCaller;
+import org.eclipse.smarthome.core.common.NamedThreadFactory;
 import org.eclipse.smarthome.core.events.Event;
 import org.eclipse.smarthome.core.events.EventFactory;
 import org.eclipse.smarthome.core.events.EventFilter;
@@ -33,26 +38,36 @@
  * @author Markus Rathgeb - Initial contribution
  */
 @NonNullByDefault
-public class EventHandler {
+public class EventHandler implements AutoCloseable {

+    private static final long EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS = TimeUnit.SECONDS.toMillis(5);
+
     private final Logger logger = LoggerFactory.getLogger(EventHandler.class);

     private final Map<String, Set<EventSubscriber>> typedEventSubscribers;
     private final Map<String, EventFactory> typedEventFactories;
-    private final SafeCaller safeCaller;
+
+    private final ScheduledExecutorService watcher = Executors
+            .newSingleThreadScheduledExecutor(new NamedThreadFactory("EventHandlerWatcher"));
+    private final ExecutorService executor = Executors
+            .newSingleThreadExecutor(new NamedThreadFactory("EventHandlerExecutor"));
Member

Afair, the safeCaller does not use a single thread, but a pool of x threads. Are we safe here to reduce it to a single thread or might we run into some other issues?

I also remember a feature that we assigned events to bindings and made sure that if one binding exhausts all threads of a pool, that this does not negatively impact the others. I cannot find this in the code right now, so I assume this was not part of the internal event handling itself.

Contributor Author

@maggu2810, Jun 10, 2019

Events should be handled in order.
Using multiple threads for event handling can result in a different event order.
Assume changed events "1->2", "2->3".
If you are using different threads, how do you want to ensure that the submitted executions keep their order when one thread can get ahead of the other?
E.g. you submit the event handling for "1->2" to the executor, and after that you submit the event handling for "2->3" to the executor.
The executor uses thread1 for "1->2" and thread2 for "2->3".
Now, sometimes thread1 runs before thread2, other times thread2 runs before thread1 (or they run at the same time on different CPUs, or execution switches between them, etc.).
The event order will no longer be deterministic.

So, regardless of how it has been done, it should be done in order.
IIRC this was also discussed some years ago.

As written above already, we could interrupt the event subscriber if the limit has been exceeded.
But if you realize that an addon consumes too many resources, perhaps it would be better to fix that addon (and realize the bad behaviour).
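
The ordering argument above can be illustrated with a small, self-contained sketch. It is not code from this PR: the class and method names are hypothetical, and it only relies on the documented guarantee that `Executors.newSingleThreadExecutor()` runs submitted tasks sequentially, in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
public class OrderedDispatchDemo {

    /** Hands every event to a single worker thread and returns the order in which they were handled. */
    static List<String> dispatchInOrder(List<String> events) throws InterruptedException {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (String event : events) {
            // One worker thread means tasks run one after another, in submission order.
            executor.execute(() -> handled.add(event));
        }
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("events were not handled in time");
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [1->2, 2->3, 3->4]
        System.out.println(dispatchInOrder(List.of("1->2", "2->3", "3->4")));
    }
}
```

With `Executors.newFixedThreadPool(2)` in place of the single-threaded executor, the same loop would give no such guarantee, which is the non-determinism described in the comment.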

Contributor

Just to put one more data point out there...

On my test system (with a ~9 year old AMD CPU with each of 6 cores running at 3.7 GHz) I max out one core at ~90 events per second. I produce this load with 5 systeminfo things set to update all channels (except disk because I don't know the performance penalty of that function) at a refresh rate of interval_high=1, interval_medium=4.

The top cpu consuming threads are:

 735734    OH-EventHandlerExecutor-1
 20119     Timer-7
 19935     ESH-usb-serial-discovery-linux-sysfs-1
 18066     pool-4-thread-1
 17530     OH-OSGiEventManager
 11320     EventAdminAsyncThread #11
 11261     EventAdminAsyncThread #12
 9450      safeCall-40
 9211      OH-EventHandlerWatcher-1
 9129      safeCall-38
 8779      pool-11-thread-1
 8565      Start Level: Equinox Container: 7dcb28cd-c6e7-4135-a1f8-6d3d46449a09
 8531      safeCall-39
 6941      sshd-SshServer[5e8ba151]-nio2-thread-1
 6514      sshd-SshServer[5e8ba151]-nio2-thread-2
 6321      safeCall-37
 6212      pool-9-thread-1
 6205      ZWaveReceiveInputThread
 4087      safeCall-41
 2974      Finalizer
 2922      Reference Handler
 2681      items-67
 2461      pool-706-thread-1
 2097      fileinstall-/opt/openhab2/userdata/etc
 2078      Karaf Lock Monitor Thread
 1925      items-68
 1643      openHAB-job-scheduler_Worker-1
 1580      openHAB-job-scheduler_Worker-2
 1555      ResourcePublisher
 1516      Active Thread: Equinox Container: 7dcb28cd-c6e7-4135-a1f8-6d3d46449a09

Overall, openHAB is consuming a little less than 2 CPUs worth of load.

top - 08:03:15 up 10 days, 14:04,  2 users,  load average: 2.03, 1.43, 1.13
Tasks: 330 total,   2 running, 263 sleeping,   0 stopped,   0 zombie
%Cpu(s): 32.6 us,  1.8 sy,  0.0 ni, 65.3 id,  0.1 wa,  0.0 hi,  0.2 si,  0.0 st
KiB Mem :  8167104 total,  2072852 free,  2031328 used,  4062924 buff/cache
KiB Swap: 16876540 total, 16876540 free,        0 used.  5811896 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                
31984 root      20   0 7042648 747712  23360 S 181.7  9.2  65:11.77 java                                                                                                                                                   
10228 mark      20   0  819608 100000  53848 S   9.3  1.2  23:08.14 Xorg                                                                                                                                                   
15095 xrdp      20   0   59036  28976   8084 S   7.0  0.4  20:59.19 xrdp                                                                                                                                                   
10900 mark      20   0  542100  41572  25628 S   4.3  0.5   8:15.48 xfce4-terminal                                                                                                                                         
10323 mark      20   0  429236  26368  20684 S   0.7  0.3   0:41.54 xfwm4                                                                                                                                                  
 1932 root      20   0 1668112  66936  38988 S   0.3  0.8  15:09.72 dockerd                                                                                                                                                
 3341 gdm       20   0  589708  24896  18020 R   0.3  0.3   5:55.77 gsd-color                                                                                                                                              
27420 mark      20   0   13824    876    776 S   0.3  0.0   0:01.59 tail                                                                                                                                                   
31641 mark      20   0   42052   3800   2912 R   0.3  0.0   0:00.07 top                                                                                                                                                    
    1 root      20   0  226512  10252   6708 S   0.0  0.1   0:42.78 systemd                                                                                                                                                

Member

@maggu2810 We never had the requirement to respect a specific order and afair, older OSGi specs didn't ask for an ordered delivery either. I remember discussions where we found out that Equinox used single threaded delivery while Felix used a thread pool instead.

It seems that OSGi 7 has now introduced a specific setting for defining whether ordering is required or not.

> But if you realize that an addon consumes too many resources, perhaps it would be better to fix that addon (and realize the bad behaviour).

Sure, but the idea was that its effect is constrained to its own functionality and does not cause others to misbehave at the same time. I remember situations where a binding took too long to consume events, which delayed the delivery of events to other bindings; those then showed very odd behavior, and it was difficult to figure out which binding was the culprit. It is much better if only the binding that actually has the bug behaves badly.
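
Purely as an illustration of the isolation idea, and explicitly not something this PR implements, one could give each subscriber its own single-threaded executor. Events for one subscriber then stay ordered, while a slow subscriber only delays itself. All names below (`PerSubscriberDispatchDemo`, the string subscriber keys) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: one single-threaded queue per subscriber.
public class PerSubscriberDispatchDemo implements AutoCloseable {

    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();

    /** Queues the event on the executor that belongs to the given subscriber key. */
    void dispatch(String subscriber, Consumer<String> handler, String event) {
        executors.computeIfAbsent(subscriber, key -> Executors.newSingleThreadExecutor())
                .execute(() -> handler.accept(event));
    }

    /** Lets all queued events finish, then releases the threads. */
    @Override
    public void close() throws InterruptedException {
        for (ExecutorService executor : executors.values()) {
            executor.shutdown();
        }
        for (ExecutorService executor : executors.values()) {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Dispatches two events each to a slow and a fast subscriber and returns the completion log. */
    static List<String> demo() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> slow = event -> {
            sleep(200); // simulates a subscriber that blocks
            log.add("slow:" + event);
        };
        Consumer<String> fast = event -> log.add("fast:" + event);
        try (PerSubscriberDispatchDemo dispatcher = new PerSubscriberDispatchDemo()) {
            dispatcher.dispatch("slow", slow, "1->2");
            dispatcher.dispatch("slow", slow, "2->3");
            dispatcher.dispatch("fast", fast, "1->2");
            dispatcher.dispatch("fast", fast, "2->3");
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

The trade-off is one thread per subscriber, and ordering is only preserved per subscriber, not globally across subscribers.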

Contributor

IMO, this PR is a dramatic improvement over the current implementation. If there are no issues other than the ability to scale beyond the rates I've outlined above, can't this be merged? It resolves a very critical issue in the current snapshots.

If there's a desire to change the implementation to scale better, can't that be done in another PR?

Member

I had a look at the code and the changes in this PR don't impact how commands are sent to ThingHandlers using the SafeCaller:

    safeCaller.create(handler, ThingHandler.class)
            .withTimeout(CommunicationManager.THINGHANDLER_EVENT_TIMEOUT).onTimeout(() -> {
                logger.warn("Handler for thing '{}' takes more than {}ms for handling a command",
                        handler.getThing().getUID(), CommunicationManager.THINGHANDLER_EVENT_TIMEOUT);
            }).build().handleCommand(link.getLinkedUID(), command);

So commands will still be handled asynchronously which was my main concern. Commands will thus continue to be handled even if a couple of ThingHandlers hijack threads e.g. due to network timeouts or when waiting on locks.

Contributor

Member

I was just about to ask @mhilbush to add some command handling to the load generator (with e.g. 300ms processing time of the handler).
But @wborn's observation that this had actually moved to the ProfileCallback should hopefully answer it - this should not be related to the event dispatching itself anymore. I consider my second concern to be clarified then.

Thank you all for the hard work and your patience with me!

Contributor

> Thank you all for the hard work and your patience with me!

No worries. It's a change to a critical area, so I get the need for diligence.

> I was just about to ask @mhilbush to add some command handling to the load generator (with e.g. 300ms processing time of the handler).

I wrote the load generator in a hurry, so there's room for improvement. As I plan to use it in the future, and possibly extend it, what were you looking for here?

Member

> what were you looking for here?

A feature that makes the load generator create not only status updates but also commands, and that has the handleCommand method in the ThingHandler block for e.g. 300ms to simulate some heavy work (like accessing external devices). This would better simulate a setup where users can directly observe whether there is a delay in event processing: if switching on a device suddenly takes a few seconds because the command hung in a single-threaded queue, it is instantly visible and annoying to the user. But as mentioned above, this PR should not affect this behavior, so such an addition would only be interesting for future tests in other contexts.
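
The effect described here can be measured with a minimal sketch. It does not use the load generator or any openHAB API; the class name and the 300ms figure are only taken over as assumptions from the comment. A handler that blocks sits at the head of a single-threaded queue, and the code measures how long a quick command queued behind it has to wait before it starts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the load generator.
public class HeadOfLineBlockingDemo {

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns how many ms a quick command waits behind a handler that blocks for blockMillis. */
    static long delayBehindSlowHandler(long blockMillis) throws Exception {
        ExecutorService queue = Executors.newSingleThreadExecutor();
        try {
            // Simulated handleCommand doing "heavy work".
            queue.execute(() -> sleep(blockMillis));
            long submitted = System.nanoTime();
            // The quick command only records the moment it actually starts running.
            Callable<Long> quickCommand = System::nanoTime;
            Future<Long> started = queue.submit(quickCommand);
            return TimeUnit.NANOSECONDS.toMillis(started.get() - submitted);
        } finally {
            queue.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("quick command waited ~" + delayBehindSlowHandler(300) + " ms");
    }
}
```

A load generator extended along these lines would make such delays directly visible to whoever is testing.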


     /**
      * Create a new event handler.
      *
      * @param typedEventSubscribers the event subscribers indexed by the event type
      * @param typedEventFactories the event factories indexed by the event type
-     * @param safeCaller the safe caller to use
      */
     public EventHandler(final Map<String, Set<EventSubscriber>> typedEventSubscribers,
-            final Map<String, EventFactory> typedEventFactories, final SafeCaller safeCaller) {
+            final Map<String, EventFactory> typedEventFactories) {
         this.typedEventSubscribers = typedEventSubscribers;
         this.typedEventFactories = typedEventFactories;
-        this.safeCaller = safeCaller;
     }

+    @Override
+    public void close() {
+        watcher.shutdownNow();
+        executor.shutdownNow();
+    }
+
     public void handleEvent(org.osgi.service.event.Event osgiEvent) {
@@ -131,17 +146,22 @@ private synchronized void dispatchESHEvent(final Set<EventSubscriber> eventSubsc
             EventFilter filter = eventSubscriber.getEventFilter();
             if (filter == null || filter.apply(event)) {
                 logger.trace("Delegate event to subscriber ({}).", eventSubscriber.getClass());
-                safeCaller.create(eventSubscriber, EventSubscriber.class).withAsync().onTimeout(() -> {
-                    logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.",
-                            eventSubscriber.toString(), SafeCaller.DEFAULT_TIMEOUT);
-                }).onException(e -> {
-                    logger.error("Dispatching/filtering event for subscriber '{}' failed: {}",
-                            EventSubscriber.class.getName(), e.getMessage(), e);
-                }).build().receive(event);
+                executor.submit(() -> {
+                    ScheduledFuture<?> logTimeout = watcher.schedule(
+                            () -> logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.",
+                                    eventSubscriber, EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS),
+                            EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS, TimeUnit.MILLISECONDS);
+                    try {
+                        eventSubscriber.receive(event);
+                    } catch (final Exception ex) {
+                        logger.error("Dispatching/filtering event for subscriber '{}' failed: {}",
+                                EventSubscriber.class.getName(), ex.getMessage(), ex);
+                    }
+                    logTimeout.cancel(false);
+                });
             } else {
                 logger.trace("Skip event subscriber ({}) because of its filter.", eventSubscriber.getClass());
             }
         }
     }
-
 }
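
The pattern introduced in this hunk (a worker executor plus a second, scheduled "watcher" executor that only logs when a task runs too long) can be tried out in isolation. The following is a minimal sketch under assumptions: `WatchdogDemo`, `dispatchAndWait` and the warning counter are hypothetical stand-ins for the logger call, and the blocking `get()` exists only to make the demo deterministic; the PR itself does not wait for the task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class mirroring the watcher/executor pair from the diff.
public class WatchdogDemo implements AutoCloseable {

    private final ScheduledExecutorService watcher = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicInteger warnings = new AtomicInteger();
    private final long maxMillis;

    WatchdogDemo(long maxMillis) {
        this.maxMillis = maxMillis;
    }

    /** Runs the task on the worker thread; a warning is counted if it is still running after maxMillis. */
    void dispatchAndWait(Runnable task) throws Exception {
        executor.submit(() -> {
            // Stand-in for the logger.warn(...) call: fires only if not cancelled in time.
            ScheduledFuture<?> logTimeout = watcher.schedule(() -> {
                warnings.incrementAndGet();
            }, maxMillis, TimeUnit.MILLISECONDS);
            try {
                task.run();
            } catch (RuntimeException ex) {
                System.err.println("Task failed: " + ex.getMessage());
            } finally {
                logTimeout.cancel(false);
            }
        }).get(); // wait only so that the demo result is deterministic
    }

    int warningCount() {
        return warnings.get();
    }

    @Override
    public void close() {
        watcher.shutdownNow();
        executor.shutdownNow();
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        try (WatchdogDemo demo = new WatchdogDemo(50)) {
            demo.dispatchAndWait(() -> { });        // finishes before the limit
            demo.dispatchAndWait(() -> sleep(300)); // exceeds the limit
            System.out.println("warnings=" + demo.warningCount());
        }
    }
}
```

Note that the watcher only reports the slow task; it does not interrupt it, so a subscriber that never returns would still block the single worker thread.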

@@ -18,7 +18,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;

-import org.eclipse.smarthome.core.common.SafeCaller;
 import org.eclipse.smarthome.core.events.Event;
 import org.eclipse.smarthome.core.events.EventFactory;
 import org.eclipse.smarthome.core.events.EventSubscriber;
@@ -51,11 +50,9 @@ public class OSGiEventManager implements EventHandler {

     private ThreadedEventHandler eventHandler;

-    private SafeCaller safeCaller;
-
     @Activate
     protected void activate(ComponentContext componentContext) {
-        eventHandler = new ThreadedEventHandler(typedEventSubscribers, typedEventFactories, safeCaller);
+        eventHandler = new ThreadedEventHandler(typedEventSubscribers, typedEventFactories);
         eventHandler.open();
     }

@@ -116,15 +113,6 @@ protected void removeEventFactory(EventFactory eventFactory) {
         }
     }

-    @Reference
-    protected void setSafeCaller(SafeCaller safeCaller) {
-        this.safeCaller = safeCaller;
-    }
-
-    protected void unsetSafeCaller(SafeCaller safeCaller) {
-        this.safeCaller = null;
-    }
-
     @Override
     public void handleEvent(org.osgi.service.event.Event osgiEvent) {
         eventHandler.handleEvent(osgiEvent);

@@ -22,7 +22,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.smarthome.core.common.SafeCaller;
 import org.eclipse.smarthome.core.events.EventFactory;
 import org.eclipse.smarthome.core.events.EventSubscriber;
 import org.osgi.service.event.Event;
@@ -50,31 +49,32 @@ public class ThreadedEventHandler implements Closeable {
      *
      * @param typedEventSubscribers the event subscribers
      * @param typedEventFactories the event factories indexed by the event type
-     * @param safeCaller the safe caller to use
      */
     ThreadedEventHandler(Map<String, Set<EventSubscriber>> typedEventSubscribers,
-            final Map<String, EventFactory> typedEventFactories, final SafeCaller safeCaller) {
+            final Map<String, EventFactory> typedEventFactories) {
         thread = new Thread(() -> {
-            final EventHandler worker = new EventHandler(typedEventSubscribers, typedEventFactories, safeCaller);
-            while (running.get()) {
-                try {
-                    logger.trace("wait for event");
-                    final Event event = queue.poll(1, TimeUnit.HOURS);
-                    logger.trace("inspect event: {}", event);
-                    if (event == null) {
-                        logger.debug("Hey, you have really very few events.");
-                    } else if (event == notifyEvent) {
-                        // received an internal notification
-                    } else {
-                        worker.handleEvent(event);
+            try (EventHandler worker = new EventHandler(typedEventSubscribers, typedEventFactories)) {
+                while (running.get()) {
+                    try {
+                        logger.trace("wait for event");
+                        final Event event = queue.poll(1, TimeUnit.HOURS);
+                        logger.trace("inspect event: {}", event);
+                        if (event == null) {
+                            logger.debug("Hey, you have really very few events.");
+                        } else if (event == notifyEvent) {
+                            // received an internal notification
+                        } else {
+                            worker.handleEvent(event);
+                        }
+                    } catch (InterruptedException ex) {
+                        Thread.currentThread().interrupt();
+                    } catch (RuntimeException ex) {
+                        logger.error("Error on event handling.", ex);
                     }
-                } catch (InterruptedException ex) {
-                    Thread.currentThread().interrupt();
-                } catch (RuntimeException ex) {
-                    logger.error("Error on event handling.", ex);
                 }
             }
-        }, "ESH-OSGiEventManager");
+        }, "OH-OSGiEventManager");
+
     }

     void open() {