The Principle and Application of Netty Direct Memory

  • 2021-09-16 06:51:30
  • OfStack

Directory:
1. Overview of Common Memory Models
2. Principle of Direct Memory in Java
3. Use of Direct Memory in Netty
4. Summary

1. Overview of Common Memory Models

In general, to guarantee the security and robustness of the system itself, memory is logically isolated into a kernel area and a user area. This is easy to understand: user behavior is too unpredictable and too exposed, and it easily leads to all kinds of creative usage that lies beyond the system's control. Of course, some languages do support direct control of memory. In C, a pointer can reach almost any location in memory (a few hardware addresses excepted), and assembly can address anything at all. But these low-level languages have drifted further and further from everyday work and have little to do with most programmers.

Most of a user's programming happens in the user area. Arithmetic such as Integer a = 2; Integer b = 3; Integer c = a * b;, for example, is performed entirely in user space and never touches the kernel. Some operations, however, must be carried out by the kernel, such as reading and writing files, that is, data exchange between devices: I/O operations. These are hard to implement directly, so they must be completed by the operating system underneath, which means the data passes through the kernel area first. Our code, meanwhile, runs in the user area, so there is usually a step that copies data from the kernel area into the user area. That is the read path; the write path is the mirror image: data is copied from the user area into the kernel area, and the kernel then completes the I/O.
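To make that copy concrete, here is a minimal sketch of the conventional read path using a plain heap buffer (the file name data.bin is just an illustration): the kernel reads the file into its own buffers first, and the bytes are then copied out into user space.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class HeapReadDemo {
        public static void main(String[] args) throws IOException {
            try (FileChannel ch = FileChannel.open(Paths.get("data.bin"), StandardOpenOption.READ)) {
                // The heap buffer lives in user space; the kernel fills its own
                // buffers first, then the data is copied across into this array.
                ByteBuffer heap = ByteBuffer.allocate(4096);
                int n = ch.read(heap);
                System.out.println("read " + n + " bytes (kernel -> user copy)");
            }
        }
    }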

Dividing memory into just a kernel area and a user area is too coarse to be called wrong, but it also feels like saying nothing at all.

So the division of memory needs to be finer, which is where a memory model, or set of memory areas, comes in. Naturally every language and every scenario has its own school of thought, but the rough pattern is the same: memory is partitioned into areas with different purposes according to agreed rules; memory is allocated from an area when needed and recorded in the corresponding tables or identifiers, so it can be found later and not handed out twice. Just as important, a model must know not only how to allocate memory but also how to reclaim it. And how to guarantee memory visibility is another major topic any memory model has to address.

We won't go deeper here: no single account is definitive, and I won't pretend to settle the matter. Fill in the rest of the picture yourself.

2. Principle of Direct Memory in Java

First of all, why does Java have the concept of direct memory at all? As we know, Java has one supremely important memory area, the heap. Almost all objects are allocated on the heap, so most GC work targets the heap too. Connecting this with the previous section, heap memory falls inside user-space memory. As long as the JVM manages this one block, it can essentially manage the life cycle of every Java object. So what exactly is direct memory, and what does it have to do with the heap?

Direct memory sits apart from the heap: it belongs neither to the Java heap nor to any other JVM-managed area. In other words, direct memory is not controlled by the JVM; it is a segment of memory controlled directly by the operating system.

Why take direct memory out of the JVM's control? Because the JVM governs user space, while some scenarios need kernel space to take part before the whole operation can complete. When user space wants data, that data must first be copied out of the kernel before it becomes visible to user space. In many such scenarios the copy exists only so the data can be used once; after the corresponding conversion it is never touched again, as in streaming access. Each of those copies costs real performance, and this is why direct memory exists: to avoid meaningless data copying between kernel space and user space and thereby improve program performance.

If the JVM does not control direct memory, who does? The operating system underneath. When an allocation is requested, the system sets up a shared region: data written by the kernel can be read directly by user code, and data written by user code can be used directly by the kernel. At the bottom, such shared memory is realized through the mmap function interface.
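To see such sharing in action, here is a minimal sketch using a memory-mapped file (the file name shared.dat is illustrative); FileChannel.map() is backed by mmap, so reads and writes through the buffer go straight to the shared pages without an extra copy:

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class MmapDemo {
        public static void main(String[] args) throws IOException {
            try (FileChannel ch = FileChannel.open(Paths.get("shared.dat"),
                    StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // map() calls mmap underneath: the region is shared between the
                // kernel page cache and this process, so no kernel/user copy occurs.
                MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
                mapped.putInt(0, 42);
                System.out.println(mapped.getInt(0));
            }
        }
    }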

At the Java level, direct memory is surfaced as DirectByteBuffer. Its creation, use, and cleanup look like this:


// Create a direct memory buffer
    ByteBuffer buffer = ByteBuffer.allocateDirect(1600);
    for (int i = 0; i < 90_0000; i++) {
        for (int j = 0; j < 199; j++) {
            // write an int into direct memory
            buffer.putInt(j);
        }
        buffer.flip();
        for (int j = 0; j < 199; j++) {
            // read the ints back (getInt matches the putInt above)
            buffer.getInt();
        }
        // reset indices for the next round
        buffer.clear();
    }
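One caveat worth knowing: direct memory lies outside the heap and is capped by the JVM option -XX:MaxDirectMemorySize (by default roughly the maximum heap size). Exhausting the cap raises an OutOfMemoryError even though the heap itself may be nearly empty. A small demonstration, with an illustrative class name and cap:

    // Run with a small cap to see the limit in action:
    //   java -XX:MaxDirectMemorySize=16m DirectLimitDemo
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class DirectLimitDemo {
        public static void main(String[] args) {
            List<ByteBuffer> pin = new ArrayList<>();
            try {
                while (true) {
                    pin.add(ByteBuffer.allocateDirect(1024 * 1024)); // 1 MB each
                }
            } catch (OutOfMemoryError e) {
                // Reported as "Direct buffer memory" once the cap is exhausted.
                System.out.println(e.getMessage() + " after " + pin.size() + " MB");
            }
        }
    }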

3. Use of Direct Memory in Netty

Knowing how direct memory is used, we next need a scene where it truly pays off. As a high-performance network communication framework, Netty's central job is handling network I/O, and in that scene direct memory is the perfect weapon. So how does Netty use it?

There are two scenarios: 1. when network data is delivered to the application (the read path); 2. when the application sends data to the remote end (the write path).


//  Write path: encode msg into binary data backed by direct memory
    // io.netty.handler.codec.MessageToByteEncoder#write
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                //  Default  preferDirect = true;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // Delegate to the subclass's encode() to implement the private protocol
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    //  Write data to the remote end 
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
    // io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
    /**
     * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
     * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
     */
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception {
        if (preferDirect) {
            // PooledByteBufAllocator
            return ctx.alloc().ioBuffer();
        } else {
            return ctx.alloc().heapBuffer();
        }
    }
    // io.netty.buffer.AbstractByteBufAllocator#ioBuffer()
    @Override
    public ByteBuf ioBuffer() {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(DEFAULT_INITIAL_CAPACITY);
        }
        return heapBuffer(DEFAULT_INITIAL_CAPACITY);
    }
    // io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)
    @Override
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }
    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }
    // io.netty.buffer.PooledByteBufAllocator#newDirectBuffer
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    // io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }
        // io.netty.buffer.PoolArena.DirectArena#newByteBuf
        @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
    // io.netty.util.internal.PlatformDependent0#newDirectBuffer
    static ByteBuffer newDirectBuffer(long address, int capacity) {
        ObjectUtil.checkPositiveOrZero(capacity, "capacity");

        try {
            return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
        } catch (Throwable cause) {
            // Not expected to ever throw!
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new Error(cause);
        }
    }
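For completeness, here is what a concrete subclass of MessageToByteEncoder might look like; LongEncoder is a made-up example, not part of Netty itself. Its encode() receives exactly the buf that allocateBuffer() obtained above, which with the default preferDirect = true is a pooled direct buffer:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;

    public class LongEncoder extends MessageToByteEncoder<Long> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) {
            // "out" is the buffer allocated by allocateBuffer(); the encoded
            // bytes land in direct memory and never pass through the Java heap.
            out.writeLong(msg);
        }
    }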

Writing data into this ByteBuf, that is, writing into direct memory, is not quite as simple as writing to an ordinary heap object.


// io.netty.buffer.AbstractByteBuf#writeBytes(byte[])
    @Override
    public ByteBuf writeBytes(byte[] src) {
        writeBytes(src, 0, src.length);
        return this;
    }

    @Override
    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureWritable(length);
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
        return this;
    }
    
    // io.netty.buffer.PooledUnsafeDirectByteBuf#setBytes(int, byte[], int, int)
    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        // addr() computes the absolute native memory address for this index
        UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
        return this;
    }
    // io.netty.buffer.PooledUnsafeDirectByteBuf#addr
    private long addr(int index) {
        return memoryAddress + index;
    }

    // io.netty.buffer.UnsafeByteBufUtil#setBytes(io.netty.buffer.AbstractByteBuf, long, int, byte[], int, int)
    static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) {
        buf.checkIndex(index, length);
        if (length != 0) {
            // Copy the byte data into the DirectByteBuffer
            PlatformDependent.copyMemory(src, srcIndex, addr, length);
        }
    }
    // io.netty.util.internal.PlatformDependent#copyMemory(byte[], int, long, long)
    public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) {
        PlatformDependent0.copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddr, length);
    }
    // io.netty.util.internal.PlatformDependent0#copyMemory(java.lang.Object, long, java.lang.Object, long, long)
    static void copyMemory(Object src, long srcOffset, Object dst, long dstOffset, long length) {
        //UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, length);
        while (length > 0) {
            long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
            // Ultimately a native memory copy performed by the JVM. Here dst is null,
            // so dstOffset is treated as an absolute address rather than an offset
            // from a base such as ARRAY_OBJECT_BASE_OFFSET.
            UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
            length -= size;
            srcOffset += size;
            dstOffset += size;
        }
    }

As you can see, the final write to direct memory goes through the Unsafe class, which copies the bytes straight to a native address managed by the operating system.
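To demystify that final step, here is a rough standalone sketch of the same Unsafe.copyMemory call, obtaining the Unsafe instance reflectively (illustration only, not production code):

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    public class CopyMemoryDemo {
        public static void main(String[] args) throws Exception {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            Unsafe unsafe = (Unsafe) f.get(null);

            byte[] src = {1, 2, 3, 4};
            long addr = unsafe.allocateMemory(src.length); // raw native memory
            try {
                // The same call Netty ends up making: heap array -> native address.
                unsafe.copyMemory(src, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, addr, src.length);
                System.out.println(unsafe.getByte(addr + 2)); // prints 3
            } finally {
                unsafe.freeMemory(addr); // native memory is not GC-managed
            }
        }
    }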

Finally, let's look at how it writes data to the remote end:


// io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }
    
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
        // io.netty.channel.DefaultChannelPipeline.HeadContext#write
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
        // io.netty.channel.AbstractChannel.AbstractUnsafe#write
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                // Convert msg to a direct buffer if necessary
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            // Queue msg in the outboundBuffer; from the caller's point of view the data is now written
            outboundBuffer.addMessage(msg, size, promise);
        }
    // io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
    @Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    // io.netty.channel.ChannelOutboundBuffer#addMessage
    /**
     * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
     * the message was written.
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        // May immediately trigger the fireChannelWritabilityChanged event, so the data can be flushed to the network promptly
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

In other words, once the data sits in direct memory, sending it to the remote end only requires handing that buffer down to the kernel's access interface; no further user-space copy is needed.
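From the perspective of handler code, all of that machinery collapses into a few lines. A sketch, assuming we are inside a channel handler with a ChannelHandlerContext ctx in scope:

    ByteBuf out = ctx.alloc().ioBuffer();                 // a direct buffer when Unsafe is available
    out.writeBytes("hello".getBytes(CharsetUtil.UTF_8));  // user data lands in direct memory
    ctx.writeAndFlush(out);                               // queued in the ChannelOutboundBuffer and
                                                          // handed to the socket without a heap copy;
                                                          // Netty releases the buffer once written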

Finally, let's take a brief look at Netty's read path for incoming network data, to see whether and how direct memory is used there.


// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // Allocate the ByteBuf -- this is where direct memory comes into play
                    byteBuf = allocHandle.allocate(allocator);
                    // Read the socket data into the ByteBuf
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    // Propagate each chunk down the pipeline as soon as it is read, rather than waiting for all the data to arrive
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    // io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }
    // io.netty.buffer.AbstractByteBufAllocator#ioBuffer(int)
    @Override
    public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

As you can see, incoming data is likewise received into direct memory, again achieving kernel/user sharing without copying.
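Which allocator backs those receive buffers can also be configured explicitly on the bootstrap. A typical, illustrative setup (bossGroup, workerGroup, and MyChannelInitializer are assumed to exist elsewhere):

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     // Use the pooled allocator so read buffers come from pooled direct memory.
     .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
     .childHandler(new MyChannelInitializer()); // hypothetical initializer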

That is how Netty operates on direct memory end to end. It looks somewhat involved, mainly because Netty is saturated with its design philosophy: whether a write event, a read event, or a state-change event, everything becomes a long chain of pipelined operations. As far as direct memory itself is concerned, though, the story is simple: Netty uses PooledUnsafeDirectByteBuf (and its relatives) and ultimately leans on the JDK's DirectByteBuffer, the same mechanism behind ByteBuffer.allocateDirect(...), creating direct-memory objects through the constructor DirectByteBuffer(long addr, int cap).

4. Summary

On the whole, direct memory removes the memory copy during I/O, but only the copy between kernel and user space; copies within user space cannot always be avoided, because data must eventually be turned into a binary stream that programs in different spaces can read. On the other hand, creating a direct-memory object is more expensive than creating an ordinary one, since a more complex supporting environment may have to be maintained. Direct memory can in fact be shared between processes, which ordinary object memory cannot do (a JVM being a single process, it usually does not care). Java's direct memory support is simply a convenient interface over what the system provides, suited to the scenarios where it fits.

Direct memory of this kind can also be called shared memory: it enables communication between different processes, in the sense that each process can see the modifications other processes make to the same block of memory. This is an efficient inter-process communication mechanism and very helpful for multi-process applications, though unnecessary for multithreaded ones, since threads already share memory. Applications like nginx benefit greatly: global counters that must be kept consistent across worker processes are solved perfectly by shared memory.

As a network communication framework, Netty uses direct memory deliberately to handle its specific scenarios better; this underlies the so-called zero copy and is one of the cornerstones of its performance. A good framework must be a leader in solving a certain class of problems: it need not pioneer the functionality, but it must carry it forward well.

Beyond that, memory management is a genuinely complicated problem, but an important one, well worth the time spent studying it.

This concludes the analysis of the principle and application of direct memory in Netty.

