Java Multithreading: The Two-Phase Termination Pattern

  • 2021-12-05 06:00:13
  • OfStack

Contents
1. Introduction to the two-phase termination pattern
2. Terminator code demonstration
3. TerminationRequester
4. Example: a service that either the client or the server may terminate
5. Simulating client input with telnet on macOS

1. Introduction to the two-phase termination pattern

Sometimes we want to end a thread early, but stopping a thread safely and reliably is not easy. If a thread is stopped abruptly, shared data structures may be left in an inconsistent state. This is why the stop method of the Thread class is deprecated: it makes the target thread throw java.lang.ThreadDeath and terminate, even in the middle of a synchronized method. It is better to let the thread finish its termination processing first and only then terminate. This is the Two-phase Termination pattern.

This pattern has two roles:

  • Terminator: receives the termination request, performs the termination processing, and terminates itself once the processing is complete.
  • TerminationRequester: the originator of the termination request, which sends the request to the Terminator.

2. Terminator code demonstration

The sample code of this pattern is as follows:


import java.util.Random;

public class CounterIncrement extends Thread {

    // Stop flag; volatile so that the write in close() is visible to run()
    private volatile boolean terminated = false;

    private int counter = 0;

    private final Random random = new Random(System.currentTimeMillis());

    @Override
    public void run() {

        try {
            while (!terminated) {
                System.out.println(Thread.currentThread().getName() + " " + counter++);
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            // Expected when close() interrupts sleep(); continue with the cleanup below
        } finally {
            // Phase two: termination processing
            this.clean();
        }
    }

    private void clean() {
        System.out.println("do some clean work for the second phase, current counter " + counter);
    }

    // Phase one: request termination
    public void close() {
        this.terminated = true;
        this.interrupt();
    }
}

3. TerminationRequester


public class CounterTest {
    public static void main(String[] args) throws InterruptedException {
        CounterIncrement counterIncrement = new CounterIncrement();
        counterIncrement.start();

        Thread.sleep(15_000L);
        // Request termination; the thread cleans up and then stops
        counterIncrement.close();
    }
}

This code shows what you must pay attention to when implementing the two-phase termination pattern:

Use both the thread stop flag and the interrupt method; neither one can be left out.


    public void close() {
        this.terminated = true;
        this.interrupt();
    }

Here, terminated serves as the thread stop flag. The variable is declared volatile, which avoids the overhead of an explicit lock while still guaranteeing memory visibility. The thread's run method checks terminated and stops when it is true. The flag alone is not enough, however: if the thread has called a blocking method and is waiting, it may never get to check the flag; if it is in sleep(), it notices the flag only after the sleep time has elapsed, which makes the program less responsive. Calling interrupt() wakes the thread from such blocking calls right away. To see the difference, change the close method as follows and run the program again; the thread stops noticeably more slowly:


    public void close() {
        terminated = true;
    }
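
The difference can be measured with a small, self-contained sketch. The class StopLatencyDemo, its nested SleepyWorker, and the fixed 1000 ms sleep are made up for illustration and are not part of the example above; the sketch only compares how long a sleeping thread takes to stop when close sets the flag alone and when it also calls interrupt().

```java
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class StopLatencyDemo {

    // A worker that sleeps for a fixed time on every loop iteration.
    static class SleepyWorker extends Thread {
        private volatile boolean terminated = false;
        private final long sleepMillis;

        SleepyWorker(long sleepMillis) {
            this.sleepMillis = sleepMillis;
        }

        @Override
        public void run() {
            try {
                while (!terminated) {
                    Thread.sleep(sleepMillis);
                }
            } catch (InterruptedException e) {
                // Interrupted by close(true): leave the loop immediately.
            }
        }

        // Sets the stop flag and, if requested, also interrupts the thread.
        void close(boolean interrupt) {
            terminated = true;
            if (interrupt) {
                interrupt();
            }
        }
    }

    // Returns how many milliseconds the worker needs to stop after close().
    static long measureStopMillis(boolean interrupt) throws InterruptedException {
        SleepyWorker worker = new SleepyWorker(1000);
        worker.start();
        Thread.sleep(100); // give the worker time to enter sleep()
        long begin = System.nanoTime();
        worker.close(interrupt);
        worker.join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("flag only:        " + measureStopMillis(false) + " ms");
        System.out.println("flag + interrupt: " + measureStopMillis(true) + " ms");
    }
}
```

With the flag alone, the worker has to finish the rest of its sleep before it sees the flag, so the first measurement should be close to the remaining sleep time; with the interrupt, the second should be close to zero.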

4. Example: a service that either the client or the server may terminate


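import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AppServer extends Thread {

    private static final int DEFAULT_PORT = 12722;
    private static final ExecutorService executor = Executors.newFixedThreadPool(10);
    private final int port;
    private volatile boolean start = true;
    // Modified only by the server thread in run()
    private final List<ClientHandler> clientHandlers = new ArrayList<>();
    // Written by the server thread, read by the thread that calls shutdown()
    private volatile ServerSocket server;

    public AppServer() {
        this(DEFAULT_PORT);
    }

    public AppServer(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        try {
            server = new ServerSocket(port);
            while (start) {
                Socket client = server.accept();
                ClientHandler clientHandler = new ClientHandler(client);
                executor.submit(clientHandler);
                this.clientHandlers.add(clientHandler);
            }

        } catch (IOException e) {
            // accept() fails with an IOException once shutdown() closes the server socket;
            // fall through to dispose()
        } finally {
            this.dispose();
        }
    }

    // Phase two: stop every client handler, then shut down the thread pool
    public void dispose() {
        System.out.println("dispose");
        this.clientHandlers.forEach(ClientHandler::stop);
        executor.shutdown();
    }

    // Phase one: request termination
    public void shutdown() throws IOException {
        this.start = false;
        this.interrupt();
        // Closing the socket makes a blocked accept() throw, so the loop in run() exits
        if (this.server != null) {
            this.server.close();
        }
    }
}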


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

public class ClientHandler implements Runnable {

    private final Socket socket;

    private volatile boolean running = true;

    public ClientHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream();
             BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
             PrintWriter printWriter = new PrintWriter(outputStream)) {
            while (running) {
                String message = br.readLine();
                if (message == null) {
                    // The client closed the connection
                    break;
                }
                System.out.println("Come from client >" + message);
                printWriter.write("echo " + message + "\n");
                printWriter.flush();
            }
        } catch (IOException e) {
            // The connection was closed, for example by stop(); mark the handler as stopped
            this.running = false;
        } finally {
            this.stop();
        }
    }

    public void stop() {
        if (!running) {
            return;
        }
        this.running = false;
        try {
            // Closing the socket also unblocks a readLine() that is waiting for input
            this.socket.close();
        } catch (IOException e) {
            // Nothing more can be done while shutting down
        }
    }
}


import java.io.IOException;

public class AppServerClient {
    public static void main(String[] args) throws InterruptedException, IOException {
        AppServer server = new AppServer(12135);
        server.start();

        Thread.sleep(20_000L);
        server.shutdown();
    }
}
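
In AppServer.shutdown() above, closing the ServerSocket is what releases a thread that is blocked in accept(); the ServerSocket.close() documentation states that any thread blocked in accept() will throw a SocketException. The following sketch isolates that behaviour. The class AcceptUnblockDemo and its method name are hypothetical, and port 0 is used only so that the operating system picks a free port.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class AcceptUnblockDemo {

    // Returns true if closing the server socket made accept() fail with a SocketException.
    static boolean closeUnblocksAccept() throws IOException, InterruptedException {
        ServerSocket server = new ServerSocket(0); // port 0: any free port
        CountDownLatch unblocked = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                server.accept(); // blocks, because no client ever connects
            } catch (SocketException e) {
                unblocked.countDown(); // the socket was closed underneath accept()
            } catch (IOException e) {
                // Some other I/O failure; not counted as "unblocked by close"
            }
        });
        acceptor.start();

        Thread.sleep(200); // give the acceptor time to block in accept()
        server.close();    // phase one of the shutdown: release the blocked thread
        return unblocked.await(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accept() released by close(): " + closeUnblocksAccept());
    }
}
```

The same idea applies to ClientHandler.stop(): closing the client socket is what ends a readLine() call that is waiting for input.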

5. Simulating client input with telnet on macOS


bogon:~ kpioneer$ telnet localhost 12135
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hello 
echo hello 
I love you
echo I love you
Connection closed by foreign host.

Server-side output:

Come from client > hello
Come from client > I love you
dispose

Summary:

As you can see, a subclass that uses the two-phase termination pattern only needs to implement the task it performs and keep the count of pending tasks up to date. In some cases the count does not need to be updated at all, for example when the thread does not care how many tasks remain to be executed at termination time.
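
The examples above do not include a base class with a task count, so the following is only a hypothetical sketch of what the summary describes. TerminatableWorker, RecordingWorker, and every method name in it are assumptions made for illustration. The base class owns the stop flag and the pending-task counter; the subclass implements its task and updates the counter. As a simplification, tasks submitted after terminate() are not rejected.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical base class; the names are illustrative.
abstract class TerminatableWorker extends Thread {
    private volatile boolean toShutdown = false;
    // Number of tasks that have been submitted but not yet processed.
    protected final AtomicInteger pendingTasks = new AtomicInteger();

    // One unit of work, implemented by the subclass; it may block.
    protected abstract void doRun() throws InterruptedException;

    // Phase two hook; subclasses may override it to release resources.
    protected void doCleanup() {
    }

    @Override
    public final void run() {
        try {
            // Keep going until termination was requested AND nothing is pending.
            while (!toShutdown || pendingTasks.get() > 0) {
                doRun();
            }
        } catch (InterruptedException e) {
            // Interrupted while idle: fall through to the cleanup.
        } finally {
            doCleanup();
        }
    }

    // Phase one: request termination.
    public void terminate() {
        toShutdown = true;
        // Interrupt only an idle worker so that pending tasks are not abandoned.
        if (pendingTasks.get() <= 0) {
            interrupt();
        }
    }
}

// Hypothetical subclass: implements its task and maintains the pending count.
class RecordingWorker extends TerminatableWorker {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> processed = new CopyOnWriteArrayList<>();

    void submit(String message) {
        pendingTasks.incrementAndGet();
        queue.add(message);
    }

    @Override
    protected void doRun() throws InterruptedException {
        String message = queue.take(); // blocks while the queue is empty
        try {
            processed.add(message);
        } finally {
            pendingTasks.decrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RecordingWorker worker = new RecordingWorker();
        worker.submit("a");
        worker.submit("b");
        worker.submit("c");
        worker.start();
        worker.terminate(); // the three pending tasks are still processed
        worker.join();
        System.out.println(worker.processed);
    }
}
```

A worker that does not care about unfinished work could skip the counter entirely; terminate() would then always interrupt, which corresponds to the case where the task count is never updated.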

