Skip to content

Commit

Permalink
don't use safe caller for event delegation to subscribers (#861)
Browse files Browse the repository at this point in the history
Signed-off-by: Markus Rathgeb <[email protected]>
  • Loading branch information
maggu2810 authored and kaikreuzer committed Jun 19, 2019
1 parent fefbef5 commit 8e195d4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.common.SafeCaller;
import org.eclipse.smarthome.core.common.NamedThreadFactory;
import org.eclipse.smarthome.core.events.Event;
import org.eclipse.smarthome.core.events.EventFactory;
import org.eclipse.smarthome.core.events.EventFilter;
Expand All @@ -33,26 +38,36 @@
* @author Markus Rathgeb - Initial contribution
*/
@NonNullByDefault
public class EventHandler {
public class EventHandler implements AutoCloseable {

private static final long EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS = TimeUnit.SECONDS.toMillis(5);

private final Logger logger = LoggerFactory.getLogger(EventHandler.class);

private final Map<String, Set<EventSubscriber>> typedEventSubscribers;
private final Map<String, EventFactory> typedEventFactories;
private final SafeCaller safeCaller;

private final ScheduledExecutorService watcher = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("EventHandlerWatcher"));
private final ExecutorService executor = Executors
.newSingleThreadExecutor(new NamedThreadFactory("EventHandlerExecutor"));

/**
* Create a new event handler.
*
* @param typedEventSubscribers the event subscribers indexed by the event type
* @param typedEventFactories the event factories indexed by the event type
* @param safeCaller the safe caller to use
*/
public EventHandler(final Map<String, Set<EventSubscriber>> typedEventSubscribers,
final Map<String, EventFactory> typedEventFactories, final SafeCaller safeCaller) {
final Map<String, EventFactory> typedEventFactories) {
this.typedEventSubscribers = typedEventSubscribers;
this.typedEventFactories = typedEventFactories;
this.safeCaller = safeCaller;
}

@Override
public void close() {
watcher.shutdownNow();
executor.shutdownNow();
}

public void handleEvent(org.osgi.service.event.Event osgiEvent) {
Expand Down Expand Up @@ -131,17 +146,22 @@ private synchronized void dispatchESHEvent(final Set<EventSubscriber> eventSubsc
EventFilter filter = eventSubscriber.getEventFilter();
if (filter == null || filter.apply(event)) {
logger.trace("Delegate event to subscriber ({}).", eventSubscriber.getClass());
safeCaller.create(eventSubscriber, EventSubscriber.class).withAsync().onTimeout(() -> {
logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.",
eventSubscriber.toString(), SafeCaller.DEFAULT_TIMEOUT);
}).onException(e -> {
logger.error("Dispatching/filtering event for subscriber '{}' failed: {}",
EventSubscriber.class.getName(), e.getMessage(), e);
}).build().receive(event);
executor.submit(() -> {
ScheduledFuture<?> logTimeout = watcher.schedule(
() -> logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.",
eventSubscriber, EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS),
EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS, TimeUnit.MILLISECONDS);
try {
eventSubscriber.receive(event);
} catch (final Exception ex) {
logger.error("Dispatching/filtering event for subscriber '{}' failed: {}",
EventSubscriber.class.getName(), ex.getMessage(), ex);
}
logTimeout.cancel(false);
});
} else {
logger.trace("Skip event subscriber ({}) because of its filter.", eventSubscriber.getClass());
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import org.eclipse.smarthome.core.common.SafeCaller;
import org.eclipse.smarthome.core.events.Event;
import org.eclipse.smarthome.core.events.EventFactory;
import org.eclipse.smarthome.core.events.EventSubscriber;
Expand Down Expand Up @@ -51,11 +50,9 @@ public class OSGiEventManager implements EventHandler {

private ThreadedEventHandler eventHandler;

private SafeCaller safeCaller;

@Activate
protected void activate(ComponentContext componentContext) {
eventHandler = new ThreadedEventHandler(typedEventSubscribers, typedEventFactories, safeCaller);
eventHandler = new ThreadedEventHandler(typedEventSubscribers, typedEventFactories);
eventHandler.open();
}

Expand Down Expand Up @@ -116,15 +113,6 @@ protected void removeEventFactory(EventFactory eventFactory) {
}
}

@Reference
protected void setSafeCaller(SafeCaller safeCaller) {
this.safeCaller = safeCaller;
}

protected void unsetSafeCaller(SafeCaller safeCaller) {
this.safeCaller = null;
}

@Override
public void handleEvent(org.osgi.service.event.Event osgiEvent) {
eventHandler.handleEvent(osgiEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.smarthome.core.common.SafeCaller;
import org.eclipse.smarthome.core.events.EventFactory;
import org.eclipse.smarthome.core.events.EventSubscriber;
import org.osgi.service.event.Event;
Expand Down Expand Up @@ -50,31 +49,32 @@ public class ThreadedEventHandler implements Closeable {
*
* @param typedEventSubscribers the event subscribers
* @param typedEventFactories the event factories indexed by the event type
* @param safeCaller the safe caller to use
*/
ThreadedEventHandler(Map<String, Set<EventSubscriber>> typedEventSubscribers,
final Map<String, EventFactory> typedEventFactories, final SafeCaller safeCaller) {
final Map<String, EventFactory> typedEventFactories) {
thread = new Thread(() -> {
final EventHandler worker = new EventHandler(typedEventSubscribers, typedEventFactories, safeCaller);
while (running.get()) {
try {
logger.trace("wait for event");
final Event event = queue.poll(1, TimeUnit.HOURS);
logger.trace("inspect event: {}", event);
if (event == null) {
logger.debug("Hey, you have really very few events.");
} else if (event == notifyEvent) {
// received an internal notification
} else {
worker.handleEvent(event);
try (EventHandler worker = new EventHandler(typedEventSubscribers, typedEventFactories)) {
while (running.get()) {
try {
logger.trace("wait for event");
final Event event = queue.poll(1, TimeUnit.HOURS);
logger.trace("inspect event: {}", event);
if (event == null) {
logger.debug("Hey, you have really very few events.");
} else if (event == notifyEvent) {
// received an internal notification
} else {
worker.handleEvent(event);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (RuntimeException ex) {
logger.error("Error on event handling.", ex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (RuntimeException ex) {
logger.error("Error on event handling.", ex);
}
}
}, "ESH-OSGiEventManager");
}, "OH-OSGiEventManager");

}

void open() {
Expand Down

0 comments on commit 8e195d4

Please sign in to comment.