Today we’ll revisit a common design pattern, the Observer, and one of its derivative for multi-threaded processing: the Asynchronous Dispatcher.
I Observer pattern
The classical Observer pattern features 2 actors: the subject, called Observable and one or more Observers
The classical UML diagram for such a pattern is:
Original picture credits: http://en.wikipedia.org/wiki/File:Observer.svg
This pattern is quite simple and powerfull, the only requirement is that the subject (Observable) being observed should:
- manage the registration/removal of its Observers
- be able to detect any state change and notify its Observers
We are here in a push model where state changes are pushed to the Observers. The latter are just passive event listeners.
This pattern is suitable for one subject having many Observers. If there are more than one subjecs that should be monitored, the Observers registration should be done on each of them.
II The Dispatcher
To relieve the limitations of the Observer pattern. We can look at the Dispatcher pattern.
This pattern introduces a third actor, the Dispatcher. It is responsible for:
- managing registration of EventManagers (equivalent to the Observers)
- dispatching the event to all EventManagers
Please notice that we define an enum for the event type (EventTypeEnum). Indeed there is one list of EventManagers for each distinct EventTypeEnum.
The routing based on the EventTypeEnum is done in the dispatch(EventTypeEnum eventType, Subject object) method body.
public void dispatch(EventTypeEnum event, Subject object) { switch (event) { case SAVE: for(EventManager manager: this.saveEventManagers) { manager.executeEvent(event,object); } break; case UPDATE: for(EventManager manager: this.updateEventManagers) { manager.executeEvent(event,object); } break; case DELETE: for(EventManager manager: this.deleteEventManagers) { manager.executeEvent(event,object); } break; default: logger.error("Unknow EventType of type " + event.name()); } }
The executeEvent(EventTypeEnum event, Subject object) method body in EventManager is quite straightforward
public void executeEvent(EventTypeEnum event, Subject object) { ... // Processing here ... }
Alternatively it is possible to delegate the routing of EventTypeEnum processing to each EventManager.
In this case, the Dispatcher only needs a single list of all EventManagers.
Now, in each EventManager executeEvent(EventTypeEnum event, Subject object) method body, the routing is done:
public void executeEvent(EventTypeEnum event, Subject object) { switch (event) { case SAVE: // Processing here break; case UPDATE, DELETE: // Do nothing break; default: logger.error("Unknow EventType of type " + event.name()); } }
The only drawback of this pattern is that if you introduces a new EventType, you will need to change the Dispatcher code (for the routing) or each existing EventManager code (if routing is done at EventManager level).
III Asynchronous Dispatcher
In real projects, some processing can take time. With the Dispatcher pattern, the main thread is blocked until all EventManagers finish their task.
If would be interesting if we could delegate the processing to a separate thread. There comes the Asynchronous Dispatcher pattern.
Here we rely on a ThreadPoolTaskExecutor in each EventManager to delegate the work to a new thread. Please notice that the dispatching task is done in the main thread. The new thread delegation is done at the last step in each EventManager.
One important point to take care when working in a multi-threaded context is concurrency issues. Indeed when we pass the Subject object to a new thread for processing, the main thread resumes its processing flow and the same Subject object may be changed in this main thread.
Either you ensure that the processing flow in the main thread never change the Subject object, or you make a deep copy. It is quite expensive but there is no other way around to guard agains concurrent modifications.
Below is the modified dispatch(EventTypeEnum eventType, Subject object, Object…args) method
public void dispatch(EventTypeEnum event, Subject object, Object...args) { Subject copy = ObjectCloner.deepCopy(object); for(EventManager manager: this.eventManagers) { manager.executeEvent(event,copy); } }
At line 3, the Subject object is copied and passed to the manager instead of the original.
Notice: it is strongly recommended not to use JPA entities as Subject object. Indeed making a deep copy of an JPA entity with its proxy and maybe its un-initialized collections is clearly not a good idea. It is better to pass the technical ID of the object as parameter and reload entirely the object in the new thread.
You have probably noticed that existing methods now have an additional Object … args vararg. These extra arguments are usefull to pass extra parameters for the processing. It also serves to pass technical ID of JPA entities to be reloaded in the new thread, if any.
Now let’s look at the new executeEvent(EventTypeEnum event, Subject object, Object…args) method body.
public class AsyncTaskManager { private ThreadPoolTaskExecutor taskExecutor; private MyService myService; @Override public void executeEvent(EventTypeEnum event, Subject object, Object... args) { switch (event) { case SAVE: this.saveSubject(object,args); break; case UPDATE,DELETE: // Do nothing break; default: logger.error("Unknow EventTypeEnum of type " + event.name()); } } private void executeEvent(Subject object, Object... args) { this.taskExecutor.execute(new Runnable() { @Override public void run() { try { AsyncTaskManager.this.myService.save(object); } catch (Exception ex) { logger.error(ex.getMessage(), ex); } } });
At line 23, we create a new Runnable object and pass it to the thread executor from the thread pool. At line 28, please notice the special reference to the enclosing class (AsyncTaskManager.this) to access the MyService service.
Hi Duyhai, in dispatcher diagram “composition” diamond should be in the “Dispatcher” side I guess.