Implementing the observer pattern with Java 8 (Part 2)

  • 2020-05-05 11:18:00
  • OfStack

The previous article introduced how to implement the observer pattern with Java 8 (Part 1). This article continues the topic; the details are as follows:

Thread-safe implementation

The previous section presented a simple but complete implementation of the observer pattern in modern Java, but it ignored a key issue: thread safety. Most Java applications are multi-threaded, and the observer pattern is often used in multi-threaded or asynchronous systems. For example, when an external service updates its database, the application may receive a message asynchronously and then use the observer pattern to notify its internal components, rather than having each internal component register directly with the external service.

Thread safety in the observer pattern mainly concerns the subject, because conflicts are likely when the collection of registered listeners is modified. Suppose one thread tries to register a new listener while another thread adds a new animal, which triggers a notification to all registered listeners. Depending on the interleaving, the first thread may or may not have finished registering the new listener before the registered listeners are notified of the new animal. This is a classic race condition, and it is the reason developers need a mechanism that ensures thread safety.

The simplest solution is to require every operation that accesses or modifies the list of registered listeners to use Java's synchronization mechanism, for example:


public synchronized AnimalAddedListener registerAnimalAddedListener (AnimalAddedListener listener) { /*...*/ } 
public synchronized void unregisterAnimalAddedListener (AnimalAddedListener listener) { /*...*/ } 
public synchronized void notifyAnimalAddedListeners (Animal animal) { /*...*/ } 

As a result, only one thread at a time can modify or access the list of registered listeners, which avoids the race condition, but this constraint is too strict (see the official Java documentation for more on the synchronized keyword and the Java concurrency model). Method synchronization serializes every access to the listener list, yet registering and unregistering listeners are write operations on the list, while notifying listeners only reads it. Because notification is a read operation, multiple notifications could safely run at the same time.

Therefore, as long as no listener is being registered or unregistered, any number of notifications can run concurrently without causing a race on the listener list. Races can still occur in other situations, which are discussed later. ReadWriteLock was designed for exactly this case: it manages separate locks for read and write operations. A thread-safe implementation of the Zoo class, ThreadSafeZoo, looks like this:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ThreadSafeZoo {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    protected final Lock readLock = readWriteLock.readLock();
    protected final Lock writeLock = readWriteLock.writeLock();
    private List<Animal> animals = new ArrayList<>();
    private List<AnimalAddedListener> listeners = new ArrayList<>();

    public void addAnimal (Animal animal) {
        // Add the animal to the list of animals
        // (this list is not guarded by the lock; see the discussion below)
        this.animals.add(animal);
        // Notify the list of registered listeners
        this.notifyAnimalAddedListeners(animal);
    }

    public AnimalAddedListener registerAnimalAddedListener (AnimalAddedListener listener) {
        // Lock the list of listeners for writing
        this.writeLock.lock();
        try {
            // Add the listener to the list of registered listeners
            this.listeners.add(listener);
        }
        finally {
            // Unlock the writer lock
            this.writeLock.unlock();
        }
        return listener;
    }

    public void unregisterAnimalAddedListener (AnimalAddedListener listener) {
        // Lock the list of listeners for writing
        this.writeLock.lock();
        try {
            // Remove the listener from the list of the registered listeners
            this.listeners.remove(listener);
        }
        finally {
            // Unlock the writer lock
            this.writeLock.unlock();
        }
    }

    public void notifyAnimalAddedListeners (Animal animal) {
        // Lock the list of listeners for reading
        this.readLock.lock();
        try {
            // Notify each of the listeners in the list of registered listeners
            this.listeners.forEach(listener -> listener.updateAnimalAdded(animal));
        }
        finally {
            // Unlock the reader lock
            this.readLock.unlock();
        }
    }
}
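
The read/write split that ThreadSafeZoo relies on can be observed in isolation. The following is a minimal sketch, not part of the original zoo code; the class and method names (ReadWriteLockDemo, probeWhileReading) are made up for illustration. While one thread holds the read lock, a second thread is admitted as a reader but refused as a writer:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; not part of the article's zoo example
class ReadWriteLockDemo {

    // Holds the read lock on the calling thread and lets a second thread
    // probe the lock. Returns { readerAdmitted, writerAdmitted }.
    static boolean[] probeWhileReading() {
        ReadWriteLock lock = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        lock.readLock().lock();
        try {
            Thread prober = new Thread(() -> {
                // tryLock() returns immediately instead of blocking
                result[0] = lock.readLock().tryLock();
                if (result[0]) {
                    lock.readLock().unlock();
                }
                result[1] = lock.writeLock().tryLock();
                if (result[1]) {
                    lock.writeLock().unlock();
                }
            });
            prober.start();
            // join() also makes the prober's writes visible to this thread
            prober.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] result = probeWhileReading();
        System.out.println("second reader admitted: " + result[0]); // true
        System.out.println("writer admitted: " + result[1]);        // false
    }
}
```

This is why notifications (readers) can overlap with each other, while register and unregister (writers) wait until no notification is in progress.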

With this implementation, the subject is thread-safe and multiple threads can issue notifications at the same time. Even so, two significant race conditions remain:

Concurrent access to each listener. Multiple threads can notify the listeners of a new animal at the same time, which means a single listener may be invoked by several threads simultaneously.

Concurrent access to the animal list. Multiple threads may add animals to the list at the same time, and if the order of notifications matters, this can lead to a race that requires a concurrency mechanism to avoid. The race shows up when the registered listeners are notified of animal2 before they are notified of animal1, even though animal1 was added first. This can happen when animal1 and animal2 are added by different threads: thread 1 adds animal1 but is suspended before it notifies the listeners, thread 2 adds animal2 and notifies the listeners, and only then does thread 1 notify the listeners that animal1 was added. The race can be ignored if ordering does not matter, but the problem is real.

Concurrent access to listeners

Concurrent access to a listener is handled by making the listener itself thread-safe. In keeping with the principle that each class is responsible for itself, listeners have an obligation to ensure their own thread safety. For example, in the counting listener shown earlier, incrementing the animal count from multiple threads can cause a thread-safety problem. To avoid it, the count must be updated atomically, either with an atomic variable or with method synchronization. The atomic-variable version looks like this:


import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeCountingAnimalAddedListener implements AnimalAddedListener {
    private static final AtomicLong animalsAddedCount = new AtomicLong(0);

    @Override
    public void updateAnimalAdded (Animal animal) {
        // Atomically increment the number of animals and keep the new value,
        // so the printed total is the one produced by this call
        long total = animalsAddedCount.incrementAndGet();
        // Print the number of animals
        System.out.println("Total animals added: " + total);
    }
}

The method-synchronization version is as follows:


public class CountingAnimalAddedListener implements AnimalAddedListener {
    // Note: the counter is static, while the method below synchronizes on the
    // instance, so the counter is only protected if one instance is shared
    private static int animalsAddedCount = 0;

    @Override
    public synchronized void updateAnimalAdded (Animal animal) {
        // Increment the number of animals
        animalsAddedCount++;
        // Print the number of animals
        System.out.println("Total animals added: " + animalsAddedCount);
    }
}

The point of making listeners responsible for their own thread safety is that otherwise the subject would need to understand the internal logic of each listener, instead of simply guaranteeing thread-safe access to and modification of the listener list. Moreover, if several subjects share the same listener, each subject class would have to repeat the thread-safety code, which is clearly not clean. Thread safety therefore belongs in the listener class.
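
As a rough illustration of a listener that protects itself, the sketch below calls one listener instance from several threads and checks that no increment is lost. It is only a sketch under stated assumptions: Animal and AnimalAddedListener are minimal stand-ins for the types from the previous article, and SharedCountingListener and SharedListenerDemo are hypothetical names. Unlike the listing above, the counter here is an instance field with a getter so that the result can be inspected.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal stand-ins for the article's types (assumed shapes)
class Animal {
    private final String name;
    Animal(String name) { this.name = name; }
    String getName() { return name; }
}

interface AnimalAddedListener {
    void updateAnimalAdded(Animal animal);
}

// Hypothetical listener that guards its own state with an atomic counter
class SharedCountingListener implements AnimalAddedListener {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void updateAnimalAdded(Animal animal) {
        count.incrementAndGet();
    }

    long getCount() {
        return count.get();
    }
}

class SharedListenerDemo {

    // Invokes a single listener from several threads and returns its final count
    static long notifyFromThreads(int threads, int callsPerThread) {
        SharedCountingListener listener = new SharedCountingListener();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    listener.updateAnimalAdded(new Animal("animal"));
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return listener.getCount();
    }

    public static void main(String[] args) {
        System.out.println(notifyFromThreads(4, 10_000)); // prints 40000
    }
}
```

Because the guarantee lives inside the listener, the same instance could be registered with any number of subjects without those subjects adding locking of their own.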

Ordered notification of listeners

When listeners must be notified in order, a plain read-write lock is not sufficient, and a further mechanism is needed to ensure that the notify function is called in the same order in which animals were added to the zoo. Method synchronization might seem to be an option, but according to the Oracle documentation on synchronized methods, it does not manage the order in which operations execute. It only guarantees mutual exclusion, that is, invocations on the same object do not interleave; it does not guarantee first-come, first-served (FIFO) ordering among threads. A ReentrantReadWriteLock constructed in fair mode can provide this ordering:


public class OrderedThreadSafeZoo {
    // Passing true selects fair mode: waiting threads acquire the lock in FIFO order
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    protected final Lock readLock = readWriteLock.readLock();
    protected final Lock writeLock = readWriteLock.writeLock();
    private List<Animal> animals = new ArrayList<>();
    private List<AnimalAddedListener> listeners = new ArrayList<>();

    public void addAnimal (Animal animal) {
        // Add the animal to the list of animals
        this.animals.add(animal);
        // Notify the list of registered listeners
        this.notifyAnimalAddedListeners(animal);
    }

    public AnimalAddedListener registerAnimalAddedListener (AnimalAddedListener listener) {
        // Lock the list of listeners for writing
        this.writeLock.lock();
        try {
            // Add the listener to the list of registered listeners
            this.listeners.add(listener);
        }
        finally {
            // Unlock the writer lock
            this.writeLock.unlock();
        }
        return listener;
    }

    public void unregisterAnimalAddedListener (AnimalAddedListener listener) {
        // Lock the list of listeners for writing
        this.writeLock.lock();
        try {
            // Remove the listener from the list of the registered listeners
            this.listeners.remove(listener);
        }
        finally {
            // Unlock the writer lock
            this.writeLock.unlock();
        }
    }

    public void notifyAnimalAddedListeners (Animal animal) {
        // Lock the list of listeners for reading
        this.readLock.lock();
        try {
            // Notify each of the listeners in the list of registered listeners
            this.listeners.forEach(listener -> listener.updateAnimalAdded(animal));
        }
        finally {
            // Unlock the reader lock
            this.readLock.unlock();
        }
    }
}

With this change, the register, unregister, and notify functions acquire the read and write locks in first-in, first-out (FIFO) order. For example, thread 1 registers a listener; thread 2 tries to notify the registered listeners after that registration has started; and thread 3 tries to notify the registered listeners while thread 2 is still waiting for the read lock. With fair ordering, thread 1 completes the registration first, then thread 2 notifies the listeners, and finally thread 3 notifies the listeners. This ensures that the operations execute in the same order in which they were started.

With method synchronization, even though thread 2 was queued first, thread 3 may still acquire the lock before thread 2, so there is no guarantee that thread 2 notifies the listeners before thread 3. The crux is that fair ordering grants the lock to threads in the order in which they requested it. The ordering rules for read and write locks are complex, so consult the official ReentrantReadWriteLock documentation to confirm that the lock's behavior is sufficient for the problem at hand.
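
The only difference between ThreadSafeZoo and OrderedThreadSafeZoo is the constructor argument that selects fair mode. As a small check that assumes nothing beyond the JDK (the FairnessCheck class name is hypothetical), the mode of a lock can be inspected with isFair():

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical helper; only inspects how the two locks are configured
class FairnessCheck {

    // Returns { defaultLockIsFair, fairLockIsFair }
    static boolean[] fairnessFlags() {
        ReentrantReadWriteLock defaultLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);
        return new boolean[] { defaultLock.isFair(), fairLock.isFair() };
    }

    public static void main(String[] args) {
        boolean[] flags = fairnessFlags();
        System.out.println("default lock fair: " + flags[0]); // false
        System.out.println("fair lock fair: " + flags[1]);    // true
    }
}
```

Fair mode governs the order in which blocked threads are granted the lock. According to the ReentrantReadWriteLock documentation, the untimed tryLock() methods do not honor the fairness setting, which is one of the details worth checking before relying on it.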

Thread safety is now in place. The following sections describe how to extract the subject logic and encapsulate it in a mixin class as a reusable unit of code, along with the advantages and disadvantages of doing so.

Encapsulating subject logic in a mixin class

Encapsulating the subject logic implemented above in a mixin class is an attractive idea. In general, the subject in the observer pattern contains a collection of registered listeners, a register function for adding new listeners, an unregister function for removing them, and a notify function for notifying the listeners. In the zoo example above, everything the Zoo class does is subject logic, except for maintaining the list of animals that the problem itself requires.

An example mixin class is shown below. Note that, to keep the code short, the thread-safety code has been removed here:


import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public abstract class ObservableSubjectMixin<ListenerType> {
    private List<ListenerType> listeners = new ArrayList<>();

    public ListenerType registerListener (ListenerType listener) {
        // Add the listener to the list of registered listeners
        this.listeners.add(listener);
        return listener;
    }

    public void unregisterListener (ListenerType listener) {
        // Remove the listener from the list of the registered listeners
        this.listeners.remove(listener);
    }

    public void notifyListeners (Consumer<? super ListenerType> algorithm) {
        // Execute some function on each of the listeners
        this.listeners.forEach(algorithm);
    }
}

Because the mixin has no information about the interface of the listener type being registered, it cannot call a specific listener method directly. Notification therefore has to be generic: the client supplies a function that accepts a parameter of the generic listener type, and that function is applied to each listener. A concrete implementation looks like this:


public class ZooUsingMixin extends ObservableSubjectMixin<AnimalAddedListener> {
    private List<Animal> animals = new ArrayList<>();

    public void addAnimal (Animal animal) {
        // Add the animal to the list of animals
        this.animals.add(animal);
        // Notify the list of registered listeners
        this.notifyListeners((listener) -> listener.updateAnimalAdded(animal));
    }
}

The biggest advantage of the mixin technique is that it encapsulates the subject of the observer pattern in a reusable class, so the logic does not have to be repeated in every subject class. It also makes the Zoo class much more concise: it only has to store the animals, not store and notify the listeners.
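
Since the thread-safety code was left out of the mixin for brevity, here is one way it might be put back, reusing the read-write lock approach from ThreadSafeZoo. This is a sketch rather than the article's implementation: ThreadSafeObservableSubjectMixin, NameZoo, and MixinDemo are hypothetical names, and the listener type is simply Consumer<String> to keep the snippet self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical thread-safe variant of the mixin
abstract class ThreadSafeObservableSubjectMixin<ListenerType> {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();
    private final List<ListenerType> listeners = new ArrayList<>();

    public ListenerType registerListener(ListenerType listener) {
        writeLock.lock(); // writers get exclusive access to the list
        try {
            listeners.add(listener);
        } finally {
            writeLock.unlock();
        }
        return listener;
    }

    public void unregisterListener(ListenerType listener) {
        writeLock.lock();
        try {
            listeners.remove(listener);
        } finally {
            writeLock.unlock();
        }
    }

    public void notifyListeners(Consumer<? super ListenerType> algorithm) {
        readLock.lock(); // several notifications may hold the read lock at once
        try {
            listeners.forEach(algorithm);
        } finally {
            readLock.unlock();
        }
    }
}

// Hypothetical subject whose listeners simply receive the animal's name
class NameZoo extends ThreadSafeObservableSubjectMixin<Consumer<String>> {
    public void addAnimal(String name) {
        notifyListeners(listener -> listener.accept(name));
    }
}

class MixinDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        NameZoo zoo = new NameZoo();
        Consumer<String> listener = zoo.registerListener(seen::add);
        zoo.addAnimal("lion");
        zoo.unregisterListener(listener);
        zoo.addAnimal("tiger"); // no listener is registered any more
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [lion]
    }
}
```

One caveat with this design: ReentrantReadWriteLock does not support upgrading a read lock to a write lock, so a listener that registers or unregisters on the same subject from inside its own callback would block forever.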

However, the mixin class is not without drawbacks. For example, what if the subject needs to store more than one type of listener, such as an AnimalRemovedListener as well? The mixin is an abstract class, and a Java class cannot extend more than one class. Nor can the mixin be turned into an interface, because interfaces cannot hold state, and the subject in the observer pattern needs state to hold the list of registered listeners.

One solution is to create a single listener type, ZooListener, that is notified both when an animal is added and when one is removed, as shown below:


public interface ZooListener {
    public void onAnimalAdded (Animal animal);
    public void onAnimalRemoved (Animal animal);
}

This allows you to use the interface to listen for changes in zoo state with a single listener type:


public class ZooUsingMixin extends ObservableSubjectMixin<ZooListener> {
    private List<Animal> animals = new ArrayList<>();

    public void addAnimal (Animal animal) {
        // Add the animal to the list of animals
        this.animals.add(animal);
        // Notify the list of registered listeners
        this.notifyListeners((listener) -> listener.onAnimalAdded(animal));
    }

    public void removeAnimal (Animal animal) {
        // Remove the animal from the list of animals
        this.animals.remove(animal);
        // Notify the list of registered listeners
        this.notifyListeners((listener) -> listener.onAnimalRemoved(animal));
    }
}

Merging multiple listener types into a single listener interface does solve the problems mentioned above, but there are still shortcomings, which are discussed in more detail in the following sections.

Multi-Method listeners and adapters

With the approach above, a listener interface can accumulate too many functions and become bloated. For example, Swing's MouseListener declares five functions. Even if only one of them is needed, all five must be implemented whenever a mouse click is handled, and the rest usually end up with empty bodies, which adds needless clutter to the code.

One solution is to create an adapter (a concept derived from the Adapter pattern described by the GoF) that implements every function of the listener interface with an empty default body, for concrete listener classes to extend. A concrete listener can then override only the functions it needs and inherit the default behavior for the rest. For the ZooListener interface in the example above, we create ZooAdapter (adapters follow the same naming rule as listeners; just replace Listener in the class name with Adapter):


public class ZooAdapter implements ZooListener {
    @Override
    public void onAnimalAdded (Animal animal) {}

    @Override
    public void onAnimalRemoved (Animal animal) {}
}

At first glance this adapter class looks trivial, but the convenience it brings should not be underestimated. A concrete listener that extends it only has to override the function it actually uses. For example (the class name here is illustrative, and Animal is assumed to expose getName()):


public class NamePrinterZooAdapter extends ZooAdapter {
    @Override
    public void onAnimalAdded (Animal animal) {
        // Print the name of the animal that was added
        // (getName() is assumed to exist on Animal)
        System.out.println("Added animal named " + animal.getName());
    }
}

There are two alternatives to writing an adapter class: one is to use default methods; the other is to merge the listener interface and the adapter class into a single concrete class. Default methods are a feature introduced in Java 8 that lets developers provide a default (defender) implementation in an interface.

This language feature was added mainly so that library interfaces could be extended without breaking existing code, so it should be used with care here. Some developers regard this use of default methods as unprofessional, while others see it simply as a Java 8 feature. Either way, it is worth understanding the original intent of the feature before deciding whether to use it. The ZooListener interface implemented with default methods is as follows:


public interface ZooListener {
    default void onAnimalAdded (Animal animal) {}
    default void onAnimalRemoved (Animal animal) {}
}

With default methods, a concrete class that implements the interface does not have to implement every function; it can override only the ones it needs. Although this is a simple solution to interface bloat, developers should use it with care.
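
To make the selective overriding concrete, the sketch below implements a listener that only cares about additions. The interface is repeated so the snippet is self-contained; Animal is a minimal stand-in for the class from the previous article, and AddedNamesListener and DefaultMethodDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the article's Animal class (assumed shape)
class Animal {
    private final String name;
    Animal(String name) { this.name = name; }
    String getName() { return name; }
}

// Listener interface whose functions all have empty default bodies
interface ZooListener {
    default void onAnimalAdded(Animal animal) {}
    default void onAnimalRemoved(Animal animal) {}
}

// Hypothetical listener that overrides only the function it needs
class AddedNamesListener implements ZooListener {
    private final List<String> addedNames = new ArrayList<>();

    @Override
    public void onAnimalAdded(Animal animal) {
        addedNames.add(animal.getName());
    }

    List<String> getAddedNames() {
        return addedNames;
    }
}

class DefaultMethodDemo {
    static List<String> run() {
        AddedNamesListener listener = new AddedNamesListener();
        listener.onAnimalAdded(new Animal("lion"));
        listener.onAnimalRemoved(new Animal("tiger")); // inherited no-op
        return listener.getAddedNames();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [lion]
    }
}
```

Compared with the adapter, the listener still implements an interface, so it remains free to extend some other class.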

The second approach is to simplify the observer pattern by omitting the listener interface altogether and implementing the listener as a concrete class. For example, ZooListener would become:


public class ZooListener {
    public void onAnimalAdded (Animal animal) {}
    public void onAnimalRemoved (Animal animal) {}
}

This approach simplifies the hierarchy of the observer pattern, but it does not suit every case: once the listener interface is merged into a concrete class, a concrete listener can no longer implement multiple listener types. For example, if AnimalAddedListener and AnimalRemovedListener are both written as concrete classes, a single concrete listener cannot extend both. In addition, the intent of a listener interface is more obvious than that of a concrete class: an interface is plainly meant to be implemented by other classes, whereas a concrete class is not.

Without proper documentation, a developer will not know that a class is acting as an interface and already provides an implementation of every function. Nor does the class name contain Adapter, because the class does not adapt any interface, so the name gives no hint either. In short, each problem calls for a careful choice of approach; no single approach fits all cases.

Before moving on to the next section, note that adapters are common in the observer pattern, especially in older Java code. The Swing API is built around adapters, and many older applications written for Java 5 and Java 6 use the observer pattern this way. The listeners in the zoo example may not need an adapter, but it is worth understanding why adapters exist and how they are used, because you will encounter them in existing code. The following section covers time-consuming listeners, which may perform lengthy operations or make asynchronous calls and so do not return immediately.

Complex and blocking listeners

One assumption of the observer pattern is that executing a function invokes a series of listeners, and that this process is completely transparent to the caller. For example, when client code adds an animal to the Zoo, it does not know that a series of listeners is invoked before the call returns. If the listeners take a long time to execute (the total depends on the number of listeners and the execution time of each one), the client code will notice the side effect: a simple add-animal operation takes longer.

This article won't cover this topic in detail, but here are a few things developers should be aware of when invoking complex listeners:

The listener starts a new thread. The listener function returns as soon as the new thread has been started, the listener's logic runs in that thread, and the subject can move on to the remaining listeners.

The subject starts a new thread. Instead of iterating over the list of registered listeners in the calling thread, the subject's notify function starts a new thread and iterates over the listener list there. This lets the notify function return immediately while the listeners are still being executed. Note that a thread-safety mechanism is needed to ensure that the listener list is not modified concurrently.

The listener invocations are queued and executed by a set of threads. Instead of simply iterating over the list of listeners, the subject wraps each listener invocation in a function object and places it in a queue. Once the function objects are in the queue, a thread can take one from the queue at a time and execute its listener logic. This resembles the producer-consumer problem: the notify function produces a queue of executable function objects, which consumer threads take from the queue and execute in turn. Each function object must capture the arguments for the listener call when it is created, not when it is executed. In Java, such a function object might look like this:


public class AnimalAddedFunctor {
    private final AnimalAddedListener listener;
    private final Animal parameter;

    public AnimalAddedFunctor (AnimalAddedListener listener, Animal parameter) {
        this.listener = listener;
        this.parameter = parameter;
    }

    public void execute () {
        // Execute the listener with the parameter provided during creation
        this.listener.updateAnimalAdded(this.parameter);
    }
}

The function object is created and stored in the queue and can be executed at any later time, so the listener does not have to be invoked while the listener list is being traversed. Once a function object for each listener has been pushed onto the queue, the producer thread returns control to the client code. At some later point a consumer thread executes the function objects, just as if the listeners had been invoked directly by the notify function. This technique, known in other languages as parameter binding, fits the example above: its essence is to save the listener's arguments so that execute() can later be called with no arguments. If the listener takes multiple arguments, the approach is the same.

Note that if the order in which listeners execute must be preserved, a comprehensive ordering mechanism is required. When each listener starts its own thread, the threads are started in the normal iteration order, that is, the order in which the listeners were registered. With the queue-based approach, the queue preserves ordering, and the function objects are taken for execution in the order in which they were enqueued. In short, developers need to be aware of the complexity of executing listeners on multiple threads and take care that the required behavior is actually achieved.
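
The queue-based approach can be sketched with the JDK's own executor framework, which already provides a task queue and consumer threads. This is one possible arrangement, not the article's implementation: QueuedNotificationZoo and QueuedNotificationDemo are hypothetical names, Animal and AnimalAddedListener are minimal stand-ins, and a lambda that captures the animal plays the role of AnimalAddedFunctor. A single-threaded executor is used so that queued notifications run one at a time in submission order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Minimal stand-ins for the article's types (assumed shapes)
class Animal {
    private final String name;
    Animal(String name) { this.name = name; }
    String getName() { return name; }
}

interface AnimalAddedListener {
    void updateAnimalAdded(Animal animal);
}

// Hypothetical subject that queues listener invocations instead of running them inline
class QueuedNotificationZoo {
    // CopyOnWriteArrayList permits registration while another thread iterates
    private final List<AnimalAddedListener> listeners = new CopyOnWriteArrayList<>();
    // One consumer thread: queued tasks run sequentially, in submission order
    private final ExecutorService notifier = Executors.newSingleThreadExecutor();

    public void register(AnimalAddedListener listener) {
        listeners.add(listener);
    }

    public void addAnimal(Animal animal) {
        for (AnimalAddedListener listener : listeners) {
            // The lambda binds listener and animal now; it executes later
            notifier.execute(() -> listener.updateAnimalAdded(animal));
        }
        // Control returns to the caller without waiting for the listeners
    }

    // Stops accepting work and waits for queued notifications to finish
    public boolean shutdownAndWait(long timeoutMillis) {
        notifier.shutdown();
        try {
            return notifier.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class QueuedNotificationDemo {
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        QueuedNotificationZoo zoo = new QueuedNotificationZoo();
        zoo.register(animal -> received.add(animal.getName()));
        zoo.addAnimal(new Animal("lion"));
        zoo.addAnimal(new Animal("tiger"));
        zoo.addAnimal(new Animal("zebra"));
        zoo.shutdownAndWait(5000);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [lion, tiger, zebra]
    }
}
```

Replacing the single-threaded executor with a larger pool would let slow listeners run in parallel, at the cost of the ordering guarantee discussed above.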

Conclusion

The observer pattern was already a mainstay of software design before it was catalogued in the 1994 Design Patterns book, and it offers satisfactory solutions to many problems that recur in software design. Java has long been a leader in using the pattern and builds it into its standard library, but now that Java has reached version 8, it is worth re-examining how the classic patterns are used. With lambda expressions and other new constructs, this "old" pattern has found new life. Whether maintaining legacy programs or applying this time-honored approach to new problems, the observer pattern remains a primary tool for experienced Java developers.


That concludes this introduction to implementing the observer pattern with Java 8 (Part 2). I hope it is helpful to you!

