A Look at Flink's MemorySegment
Published: 2019-06-23


This article takes a look at Flink's MemorySegment.

MemorySegment

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java

@Internal
public abstract class MemorySegment {

    @SuppressWarnings("restriction")
    protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

    @SuppressWarnings("restriction")
    protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);

    // ------------------------------------------------------------------------

    protected final byte[] heapMemory;

    protected long address;

    protected final long addressLimit;

    protected final int size;

    private final Object owner;

    MemorySegment(byte[] buffer, Object owner) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }

        this.heapMemory = buffer;
        this.address = BYTE_ARRAY_BASE_OFFSET;
        this.size = buffer.length;
        this.addressLimit = this.address + this.size;
        this.owner = owner;
    }

    MemorySegment(long offHeapAddress, int size, Object owner) {
        if (offHeapAddress <= 0) {
            throw new IllegalArgumentException("negative pointer or size");
        }
        if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
            // this is necessary to make sure the collapsed checks are safe against numeric overflows
            throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
                    + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
        }

        this.heapMemory = null;
        this.address = offHeapAddress;
        this.addressLimit = this.address + size;
        this.size = size;
        this.owner = owner;
    }

    // ------------------------------------------------------------------------
    // Memory Segment Operations
    // ------------------------------------------------------------------------

    public int size() {
        return size;
    }

    public boolean isFreed() {
        return address > addressLimit;
    }

    public void free() {
        // this ensures we can place no more data and trigger
        // the checks for the freed segment
        address = addressLimit + 1;
    }

    public boolean isOffHeap() {
        return heapMemory == null;
    }

    public byte[] getArray() {
        if (heapMemory != null) {
            return heapMemory;
        } else {
            throw new IllegalStateException("Memory segment does not represent heap memory");
        }
    }

    public long getAddress() {
        if (heapMemory == null) {
            return address;
        } else {
            throw new IllegalStateException("Memory segment does not represent off heap memory");
        }
    }

    public abstract ByteBuffer wrap(int offset, int length);

    public Object getOwner() {
        return owner;
    }

    // ------------------------------------------------------------------------
    //                    Random Access get() and put() methods
    // ------------------------------------------------------------------------

    //------------------------------------------------------------------------
    // Notes on the implementation: We try to collapse as many checks as
    // possible. We need to obey the following rules to make this safe
    // against segfaults:
    //
    //  - Grab mutable fields onto the stack before checking and using. This
    //    guards us against concurrent modifications which invalidate the
    //    pointers
    //  - Use subtractions for range checks, as they are tolerant
    //------------------------------------------------------------------------

    public abstract byte get(int index);

    public abstract void put(int index, byte b);

    public abstract void get(int index, byte[] dst);

    public abstract void put(int index, byte[] src);

    public abstract void get(int index, byte[] dst, int offset, int length);

    public abstract void put(int index, byte[] src, int offset, int length);

    public abstract boolean getBoolean(int index);

    public abstract void putBoolean(int index, boolean value);

    @SuppressWarnings("restriction")
    public final char getChar(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            return UNSAFE.getChar(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("This segment has been freed.");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final char getCharLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getChar(index);
        } else {
            return Character.reverseBytes(getChar(index));
        }
    }

    public final char getCharBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Character.reverseBytes(getChar(index));
        } else {
            return getChar(index);
        }
    }

    @SuppressWarnings("restriction")
    public final void putChar(int index, char value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            UNSAFE.putChar(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putCharLittleEndian(int index, char value) {
        if (LITTLE_ENDIAN) {
            putChar(index, value);
        } else {
            putChar(index, Character.reverseBytes(value));
        }
    }

    public final void putCharBigEndian(int index, char value) {
        if (LITTLE_ENDIAN) {
            putChar(index, Character.reverseBytes(value));
        } else {
            putChar(index, value);
        }
    }

    public final short getShort(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            return UNSAFE.getShort(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final short getShortLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getShort(index);
        } else {
            return Short.reverseBytes(getShort(index));
        }
    }

    public final short getShortBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Short.reverseBytes(getShort(index));
        } else {
            return getShort(index);
        }
    }

    public final void putShort(int index, short value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 2) {
            UNSAFE.putShort(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putShortLittleEndian(int index, short value) {
        if (LITTLE_ENDIAN) {
            putShort(index, value);
        } else {
            putShort(index, Short.reverseBytes(value));
        }
    }

    public final void putShortBigEndian(int index, short value) {
        if (LITTLE_ENDIAN) {
            putShort(index, Short.reverseBytes(value));
        } else {
            putShort(index, value);
        }
    }

    public final int getInt(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 4) {
            return UNSAFE.getInt(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final int getIntLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getInt(index);
        } else {
            return Integer.reverseBytes(getInt(index));
        }
    }

    public final int getIntBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Integer.reverseBytes(getInt(index));
        } else {
            return getInt(index);
        }
    }

    public final void putInt(int index, int value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 4) {
            UNSAFE.putInt(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putIntLittleEndian(int index, int value) {
        if (LITTLE_ENDIAN) {
            putInt(index, value);
        } else {
            putInt(index, Integer.reverseBytes(value));
        }
    }

    public final void putIntBigEndian(int index, int value) {
        if (LITTLE_ENDIAN) {
            putInt(index, Integer.reverseBytes(value));
        } else {
            putInt(index, value);
        }
    }

    public final long getLong(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 8) {
            return UNSAFE.getLong(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final long getLongLittleEndian(int index) {
        if (LITTLE_ENDIAN) {
            return getLong(index);
        } else {
            return Long.reverseBytes(getLong(index));
        }
    }

    public final long getLongBigEndian(int index) {
        if (LITTLE_ENDIAN) {
            return Long.reverseBytes(getLong(index));
        } else {
            return getLong(index);
        }
    }

    public final void putLong(int index, long value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 8) {
            UNSAFE.putLong(heapMemory, pos, value);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    public final void putLongLittleEndian(int index, long value) {
        if (LITTLE_ENDIAN) {
            putLong(index, value);
        } else {
            putLong(index, Long.reverseBytes(value));
        }
    }

    public final void putLongBigEndian(int index, long value) {
        if (LITTLE_ENDIAN) {
            putLong(index, Long.reverseBytes(value));
        } else {
            putLong(index, value);
        }
    }

    public final float getFloat(int index) {
        return Float.intBitsToFloat(getInt(index));
    }

    public final float getFloatLittleEndian(int index) {
        return Float.intBitsToFloat(getIntLittleEndian(index));
    }

    public final float getFloatBigEndian(int index) {
        return Float.intBitsToFloat(getIntBigEndian(index));
    }

    public final void putFloat(int index, float value) {
        putInt(index, Float.floatToRawIntBits(value));
    }

    public final void putFloatLittleEndian(int index, float value) {
        putIntLittleEndian(index, Float.floatToRawIntBits(value));
    }

    public final void putFloatBigEndian(int index, float value) {
        putIntBigEndian(index, Float.floatToRawIntBits(value));
    }

    public final double getDouble(int index) {
        return Double.longBitsToDouble(getLong(index));
    }

    public final double getDoubleLittleEndian(int index) {
        return Double.longBitsToDouble(getLongLittleEndian(index));
    }

    public final double getDoubleBigEndian(int index) {
        return Double.longBitsToDouble(getLongBigEndian(index));
    }

    public final void putDouble(int index, double value) {
        putLong(index, Double.doubleToRawLongBits(value));
    }

    public final void putDoubleLittleEndian(int index, double value) {
        putLongLittleEndian(index, Double.doubleToRawLongBits(value));
    }

    public final void putDoubleBigEndian(int index, double value) {
        putLongBigEndian(index, Double.doubleToRawLongBits(value));
    }

    // -------------------------------------------------------------------------
    //                     Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    public abstract void get(DataOutput out, int offset, int length) throws IOException;

    public abstract void put(DataInput in, int offset, int length) throws IOException;

    public abstract void get(int offset, ByteBuffer target, int numBytes);

    public abstract void put(int offset, ByteBuffer source, int numBytes);

    public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes) {
        final byte[] thisHeapRef = this.heapMemory;
        final byte[] otherHeapRef = target.heapMemory;
        final long thisPointer = this.address + offset;
        final long otherPointer = target.address + targetOffset;

        if ((numBytes | offset | targetOffset) >= 0 &&
                thisPointer <= this.addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes) {
            UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes);
        }
        else if (this.address > this.addressLimit) {
            throw new IllegalStateException("this memory segment has been freed.");
        }
        else if (target.address > target.addressLimit) {
            throw new IllegalStateException("target memory segment has been freed.");
        }
        else {
            throw new IndexOutOfBoundsException(
                    String.format("offset=%d, targetOffset=%d, numBytes=%d, address=%d, targetAddress=%d",
                    offset, targetOffset, numBytes, this.address, target.address));
        }
    }

    // -------------------------------------------------------------------------
    //                      Comparisons & Swapping
    // -------------------------------------------------------------------------

    public final int compare(MemorySegment seg2, int offset1, int offset2, int len) {
        while (len >= 8) {
            long l1 = this.getLongBigEndian(offset1);
            long l2 = seg2.getLongBigEndian(offset2);

            if (l1 != l2) {
                return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
            }

            offset1 += 8;
            offset2 += 8;
            len -= 8;
        }
        while (len > 0) {
            int b1 = this.get(offset1) & 0xff;
            int b2 = seg2.get(offset2) & 0xff;
            int cmp = b1 - b2;
            if (cmp != 0) {
                return cmp;
            }
            offset1++;
            offset2++;
            len--;
        }
        return 0;
    }

    public final void swapBytes(byte[] tempBuffer, MemorySegment seg2, int offset1, int offset2, int len) {
        if ((offset1 | offset2 | len | (tempBuffer.length - len)) >= 0) {
            final long thisPos = this.address + offset1;
            final long otherPos = seg2.address + offset2;

            if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) {
                // this -> temp buffer
                UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, BYTE_ARRAY_BASE_OFFSET, len);

                // other -> this
                UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len);

                // temp buffer -> other
                UNSAFE.copyMemory(tempBuffer, BYTE_ARRAY_BASE_OFFSET, seg2.heapMemory, otherPos, len);
                return;
            }
            else if (this.address > this.addressLimit) {
                throw new IllegalStateException("this memory segment has been freed.");
            }
            else if (seg2.address > seg2.addressLimit) {
                throw new IllegalStateException("other memory segment has been freed.");
            }
        }

        // index is in fact invalid
        throw new IndexOutOfBoundsException(
                    String.format("offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d",
                            offset1, offset2, len, tempBuffer.length, this.address, seg2.address));
    }
}
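  • MemorySegment is somewhat similar to java.nio.ByteBuffer. It has a heapMemory field of type byte[] and two constructors: the one that takes a byte[] assigns it to heapMemory, while the one that takes an off-heap address instead leaves heapMemory null. isOffHeap reports whether the segment is heap or off-heap by checking heapMemory; null means off-heap. The class also provides compare, swapBytes and copyTo, and explicitly offers BigEndian and LittleEndian variants of the get and put methods.
  • The BigEndian methods are get/putCharBigEndian, get/putShortBigEndian, get/putIntBigEndian, get/putLongBigEndian, get/putFloatBigEndian and get/putDoubleBigEndian. The LittleEndian methods are get/putCharLittleEndian, get/putShortLittleEndian, get/putIntLittleEndian, get/putLongLittleEndian, get/putFloatLittleEndian and get/putDoubleLittleEndian.
  • MemorySegment declares wrap, get, put, getBoolean and putBoolean as abstract methods that subclasses must implement. free is not abstract: the base implementation sets address to addressLimit + 1, so that isFreed and the range checks report the segment as freed, and subclasses may override it. MemorySegment has two subclasses, HeapMemorySegment and HybridMemorySegment.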
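The endian-specific accessors listed above all follow one pattern: read or write in the platform's native byte order, then call reverseBytes when the native order differs from the requested one. The following is a minimal sketch of that pattern for int values. It is not Flink code: the class name EndianAccessSketch is hypothetical, and a native-order ByteBuffer view stands in for the UNSAFE.getInt / UNSAFE.putInt calls of the real class.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch class, not part of Flink.
final class EndianAccessSketch {

    private static final boolean LITTLE_ENDIAN =
            ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

    // Stand-in for MemorySegment.getInt: reads in native byte order.
    static int getInt(byte[] mem, int index) {
        return ByteBuffer.wrap(mem).order(ByteOrder.nativeOrder()).getInt(index);
    }

    // Stand-in for MemorySegment.putInt: writes in native byte order.
    static void putInt(byte[] mem, int index, int value) {
        ByteBuffer.wrap(mem).order(ByteOrder.nativeOrder()).putInt(index, value);
    }

    // Same branching as getIntBigEndian in the listing above.
    static int getIntBigEndian(byte[] mem, int index) {
        return LITTLE_ENDIAN ? Integer.reverseBytes(getInt(mem, index)) : getInt(mem, index);
    }

    static void putIntBigEndian(byte[] mem, int index, int value) {
        putInt(mem, index, LITTLE_ENDIAN ? Integer.reverseBytes(value) : value);
    }

    // Same branching as getIntLittleEndian in the listing above.
    static int getIntLittleEndian(byte[] mem, int index) {
        return LITTLE_ENDIAN ? getInt(mem, index) : Integer.reverseBytes(getInt(mem, index));
    }

    static void putIntLittleEndian(byte[] mem, int index, int value) {
        putInt(mem, index, LITTLE_ENDIAN ? value : Integer.reverseBytes(value));
    }

    public static void main(String[] args) {
        byte[] mem = new byte[8];
        putIntBigEndian(mem, 0, 0x01020304);
        putIntLittleEndian(mem, 4, 0x01020304);
        // The byte layout is the same on every platform.
        System.out.println(java.util.Arrays.toString(mem));
    }
}
```

Because the byte swap happens only when the native order differs from the requested one, the plain getInt/putInt path stays as fast as possible while the explicit variants give a portable layout.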

HeapMemorySegment

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java

@SuppressWarnings("unused")
@Internal
public final class HeapMemorySegment extends MemorySegment {

    private byte[] memory;

    HeapMemorySegment(byte[] memory) {
        this(memory, null);
    }

    HeapMemorySegment(byte[] memory, Object owner) {
        super(Objects.requireNonNull(memory), owner);
        this.memory = memory;
    }

    // -------------------------------------------------------------------------
    //  MemorySegment operations
    // -------------------------------------------------------------------------

    @Override
    public void free() {
        super.free();
        this.memory = null;
    }

    @Override
    public ByteBuffer wrap(int offset, int length) {
        try {
            return ByteBuffer.wrap(this.memory, offset, length);
        }
        catch (NullPointerException e) {
            throw new IllegalStateException("segment has been freed");
        }
    }

    public byte[] getArray() {
        return this.heapMemory;
    }

    // ------------------------------------------------------------------------
    //                    Random Access get() and put() methods
    // ------------------------------------------------------------------------

    @Override
    public final byte get(int index) {
        return this.memory[index];
    }

    @Override
    public final void put(int index, byte b) {
        this.memory[index] = b;
    }

    @Override
    public final void get(int index, byte[] dst) {
        get(index, dst, 0, dst.length);
    }

    @Override
    public final void put(int index, byte[] src) {
        put(index, src, 0, src.length);
    }

    @Override
    public final void get(int index, byte[] dst, int offset, int length) {
        // system arraycopy does the boundary checks anyways, no need to check extra
        System.arraycopy(this.memory, index, dst, offset, length);
    }

    @Override
    public final void put(int index, byte[] src, int offset, int length) {
        // system arraycopy does the boundary checks anyways, no need to check extra
        System.arraycopy(src, offset, this.memory, index, length);
    }

    @Override
    public final boolean getBoolean(int index) {
        return this.memory[index] != 0;
    }

    @Override
    public final void putBoolean(int index, boolean value) {
        this.memory[index] = (byte) (value ? 1 : 0);
    }

    // -------------------------------------------------------------------------
    //                     Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    @Override
    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }

    @Override
    public final void put(DataInput in, int offset, int length) throws IOException {
        in.readFully(this.memory, offset, length);
    }

    @Override
    public final void get(int offset, ByteBuffer target, int numBytes) {
        // ByteBuffer performs the boundary checks
        target.put(this.memory, offset, numBytes);
    }

    @Override
    public final void put(int offset, ByteBuffer source, int numBytes) {
        // ByteBuffer performs the boundary checks
        source.get(this.memory, offset, numBytes);
    }

    // -------------------------------------------------------------------------
    //                             Factoring
    // -------------------------------------------------------------------------

    /**
     * A memory segment factory that produces heap memory segments. Note that this factory does not
     * support to allocate off-heap memory.
     */
    public static final class HeapMemorySegmentFactory  {

        public HeapMemorySegment wrap(byte[] memory) {
            return new HeapMemorySegment(memory);
        }

        public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
            return new HeapMemorySegment(new byte[size], owner);
        }

        public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
            return new HeapMemorySegment(memory, owner);
        }

        /**
         * Prevent external instantiation.
         */
        HeapMemorySegmentFactory() {}
    }

    public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
}
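  • HeapMemorySegment extends MemorySegment and adds its own byte[] field named memory. Its constructor requires a non-null memory array, which is passed to the parent's heapMemory field and also stored in memory, so both refer to the same array. free calls super.free() and then sets memory to null; wrap uses the memory field and turns the resulting NullPointerException into an IllegalStateException once the segment has been freed. The class also defines HeapMemorySegmentFactory, which provides wrap, allocateUnpooledSegment and wrapPooledHeapMemory.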
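To make the free and wrap behaviour described above concrete, here is a minimal sketch of the same idea outside Flink. The class name TinyHeapSegment is hypothetical, and address starts at 0 as a simplification where the real class uses BYTE_ARRAY_BASE_OFFSET.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical sketch class, not part of Flink.
final class TinyHeapSegment {

    private byte[] memory;
    private long address;            // simplification: 0 instead of BYTE_ARRAY_BASE_OFFSET
    private final long addressLimit;

    TinyHeapSegment(byte[] memory) {
        this.memory = Objects.requireNonNull(memory);
        this.address = 0;
        this.addressLimit = this.address + memory.length;
    }

    boolean isFreed() {
        return address > addressLimit;
    }

    void free() {
        // Moving address past the limit marks the segment as freed.
        address = addressLimit + 1;
        // Dropping the reference lets the garbage collector reclaim the array.
        memory = null;
    }

    ByteBuffer wrap(int offset, int length) {
        try {
            return ByteBuffer.wrap(memory, offset, length);
        } catch (NullPointerException e) {
            // memory is null only after free()
            throw new IllegalStateException("segment has been freed");
        }
    }

    public static void main(String[] args) {
        TinyHeapSegment seg = new TinyHeapSegment(new byte[16]);
        System.out.println(seg.wrap(1, 2).remaining());
        seg.free();
        System.out.println(seg.isFreed());
    }
}
```

Catching the NullPointerException, rather than testing memory for null on every call, keeps the common path free of an extra branch; the sketch mirrors that choice from the listing above.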

HybridMemorySegment

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java

@Internal
public final class HybridMemorySegment extends MemorySegment {

    /**
     * The direct byte buffer that allocated the off-heap memory. This memory segment holds a
     * reference to that buffer, so as long as this memory segment lives, the memory will not be
     * released.
     */
    private final ByteBuffer offHeapBuffer;

    /**
     * Creates a new memory segment that represents the memory backing the given direct byte buffer.
     * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
     * otherwise this method with throw an IllegalArgumentException.
     *
     * The owner referenced by this memory segment is null.
     *
     * @param buffer The byte buffer whose memory is represented by this memory segment.
     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
     */
    HybridMemorySegment(ByteBuffer buffer) {
        this(buffer, null);
    }

    /**
     * Creates a new memory segment that represents the memory backing the given direct byte buffer.
     * Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
     * otherwise this method with throw an IllegalArgumentException.
     *
     * The memory segment references the given owner.
     *
     * @param buffer The byte buffer whose memory is represented by this memory segment.
     * @param owner The owner references by this memory segment.
     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
     */
    HybridMemorySegment(ByteBuffer buffer, Object owner) {
        super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
        this.offHeapBuffer = buffer;
    }

    /**
     * Creates a new memory segment that represents the memory of the byte array.
     *
     * The owner referenced by this memory segment is null.
     *
     * @param buffer The byte array whose memory is represented by this memory segment.
     */
    HybridMemorySegment(byte[] buffer) {
        this(buffer, null);
    }

    /**
     * Creates a new memory segment that represents the memory of the byte array.
     *
     * The memory segment references the given owner.
     *
     * @param buffer The byte array whose memory is represented by this memory segment.
     * @param owner The owner references by this memory segment.
     */
    HybridMemorySegment(byte[] buffer, Object owner) {
        super(buffer, owner);
        this.offHeapBuffer = null;
    }

    // -------------------------------------------------------------------------
    //  MemorySegment operations
    // -------------------------------------------------------------------------

    /**
     * Gets the buffer that owns the memory of this memory segment.
     *
     * @return The byte buffer that owns the memory of this memory segment.
     */
    public ByteBuffer getOffHeapBuffer() {
        if (offHeapBuffer != null) {
            return offHeapBuffer;
        } else {
            throw new IllegalStateException("Memory segment does not represent off heap memory");
        }
    }

    @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            }
            else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                }
                catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }

    // ------------------------------------------------------------------------
    //  Random Access get() and put() methods
    // ------------------------------------------------------------------------

    @Override
    public final byte get(int index) {
        final long pos = address + index;
        if (index >= 0 && pos < addressLimit) {
            return UNSAFE.getByte(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final void put(int index, byte b) {
        final long pos = address + index;
        if (index >= 0 && pos < addressLimit) {
            UNSAFE.putByte(heapMemory, pos, b);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final void get(int index, byte[] dst) {
        get(index, dst, 0, dst.length);
    }

    @Override
    public final void put(int index, byte[] src) {
        put(index, src, 0, src.length);
    }

    @Override
    public final void get(int index, byte[] dst, int offset, int length) {
        // check the byte array offset and length and the status
        if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - length) {
            final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
            UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, length);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final void put(int index, byte[] src, int offset, int length) {
        // check the byte array offset and length
        if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final long pos = address + index;

        if (index >= 0 && pos <= addressLimit - length) {
            final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
            UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public final boolean getBoolean(int index) {
        return get(index) != 0;
    }

    @Override
    public final void putBoolean(int index, boolean value) {
        put(index, (byte) (value ? 1 : 0));
    }

    // -------------------------------------------------------------------------
    //  Bulk Read and Write Methods
    // -------------------------------------------------------------------------

    @Override
    public final void get(DataOutput out, int offset, int length) throws IOException {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                out.write(heapMemory, offset, length);
            }
            else {
                while (length >= 8) {
                    out.writeLong(getLongBigEndian(offset));
                    offset += 8;
                    length -= 8;
                }

                while (length > 0) {
                    out.writeByte(get(offset));
                    offset++;
                    length--;
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }

    @Override
    public final void put(DataInput in, int offset, int length) throws IOException {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                in.readFully(heapMemory, offset, length);
            }
            else {
                while (length >= 8) {
                    putLongBigEndian(offset, in.readLong());
                    offset += 8;
                    length -= 8;
                }
                while (length > 0) {
                    put(offset, in.readByte());
                    offset++;
                    length--;
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }

    @Override
    public final void get(int offset, ByteBuffer target, int numBytes) {
        // check the byte array offset and length
        if ((offset | numBytes | (offset + numBytes)) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final int targetOffset = target.position();
        final int remaining = target.remaining();

        if (remaining < numBytes) {
            throw new BufferOverflowException();
        }

        if (target.isDirect()) {
            if (target.isReadOnly()) {
                throw new ReadOnlyBufferException();
            }

            // copy to the target memory directly
            final long targetPointer = getAddress(target) + targetOffset;
            final long sourcePointer = address + offset;

            if (sourcePointer <= addressLimit - numBytes) {
                UNSAFE.copyMemory(heapMemory, sourcePointer, null, targetPointer, numBytes);
                target.position(targetOffset + numBytes);
            }
            else if (address > addressLimit) {
                throw new IllegalStateException("segment has been freed");
            }
            else {
                throw new IndexOutOfBoundsException();
            }
        }
        else if (target.hasArray()) {
            // move directly into the byte array
            get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);

            // this must be after the get() call to ensue that the byte buffer is not
            // modified in case the call fails
            target.position(targetOffset + numBytes);
        }
        else {
            // neither heap buffer nor direct buffer
            while (target.hasRemaining()) {
                target.put(get(offset++));
            }
        }
    }

    @Override
    public final void put(int offset, ByteBuffer source, int numBytes) {
        // check the byte array offset and length
        if ((offset | numBytes | (offset + numBytes)) < 0) {
            throw new IndexOutOfBoundsException();
        }

        final int sourceOffset = source.position();
        final int remaining = source.remaining();

        if (remaining < numBytes) {
            throw new BufferUnderflowException();
        }

        if (source.isDirect()) {
            // copy to the target memory directly
            final long sourcePointer = getAddress(source) + sourceOffset;
            final long targetPointer = address + offset;

            if (targetPointer <= addressLimit - numBytes) {
                UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);
                source.position(sourceOffset + numBytes);
            }
            else if (address > addressLimit) {
                throw new IllegalStateException("segment has been freed");
            }
            else {
                throw new IndexOutOfBoundsException();
            }
        }
        else if (source.hasArray()) {
            // move directly into the byte array
            put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);

            // this must be after the get() call to ensue that the byte buffer is not
            // modified in case the call fails
            source.position(sourceOffset + numBytes);
        }
        else {
            // neither heap buffer nor direct buffer
            while (source.hasRemaining()) {
                put(offset++, source.get());
            }
        }
    }

    //......
}

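  • HybridMemorySegment extends MemorySegment and adds an offHeapBuffer field of type ByteBuffer. Because the parent class already has the byte[] heapMemory field, the memory managed by a HybridMemorySegment can be on-heap (created through the constructor that takes a byte[]) or off-heap (created through the constructor that takes a direct ByteBuffer). wrap first checks that the segment has not been freed; if heapMemory is not null it wraps heapMemory, otherwise it returns a duplicate of offHeapBuffer with position and limit set to the requested range.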
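The two paths through wrap can be tried out with plain JDK classes. The sketch below is an assumption-laden stand-in, not Flink code: HybridWrapSketch is a hypothetical class, it omits the freed check and the address bookkeeping, and a simple isDirect test stands in for checkBufferAndGetAddress.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch class, not part of Flink.
final class HybridWrapSketch {

    private final byte[] heapMemory;        // non-null for on-heap segments
    private final ByteBuffer offHeapBuffer; // non-null for off-heap segments

    HybridWrapSketch(byte[] buffer) {
        this.heapMemory = buffer;
        this.offHeapBuffer = null;
    }

    HybridWrapSketch(ByteBuffer buffer) {
        // Stand-in for checkBufferAndGetAddress: only direct buffers are accepted.
        if (!buffer.isDirect()) {
            throw new IllegalArgumentException("buffer must be direct");
        }
        this.heapMemory = null;
        this.offHeapBuffer = buffer;
    }

    boolean isOffHeap() {
        return heapMemory == null;
    }

    ByteBuffer wrap(int offset, int length) {
        if (heapMemory != null) {
            return ByteBuffer.wrap(heapMemory, offset, length);
        }
        try {
            // duplicate() shares the memory but has its own position and limit
            ByteBuffer wrapper = offHeapBuffer.duplicate();
            wrapper.limit(offset + length);
            wrapper.position(offset);
            return wrapper;
        } catch (IllegalArgumentException e) {
            throw new IndexOutOfBoundsException();
        }
    }

    public static void main(String[] args) {
        HybridWrapSketch onHeap = new HybridWrapSketch(new byte[16]);
        HybridWrapSketch offHeap = new HybridWrapSketch(ByteBuffer.allocateDirect(16));
        System.out.println(onHeap.wrap(2, 4).isDirect());
        System.out.println(offHeap.wrap(2, 4).isDirect());
    }
}
```

Setting the limit before the position matters: a position beyond the current limit would be rejected, so the order used in the listing above is kept here.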

Summary

  • MemorySegment is somewhat similar to java.nio.ByteBuffer. It has a heapMemory field of type byte[] and two constructors: the one that takes a byte[] assigns it to heapMemory, while the one that takes an off-heap address leaves heapMemory null. isOffHeap checks whether heapMemory is null to tell heap from off-heap. The class provides compare, swapBytes and copyTo, plus explicit BigEndian and LittleEndian get and put methods. It declares wrap, get, put, getBoolean and putBoolean as abstract methods for subclasses to implement, while free has a base implementation that subclasses may override. Its two subclasses are HeapMemorySegment and HybridMemorySegment.
  • HeapMemorySegment extends MemorySegment and adds its own byte[] field named memory. Its constructor requires a non-null array, which is stored both in the parent's heapMemory field and in memory (the same reference). free sets memory to null, and wrap uses the memory field. It also defines HeapMemorySegmentFactory, which provides wrap, allocateUnpooledSegment and wrapPooledHeapMemory.
  • HybridMemorySegment extends MemorySegment and adds an offHeapBuffer field of type ByteBuffer. Together with the parent's heapMemory field, this lets it manage either on-heap memory (byte[] constructor) or off-heap memory (ByteBuffer constructor). wrap uses heapMemory if it is not null and offHeapBuffer otherwise.
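One detail of compare deserves a closer look: it reads 8 bytes at a time as big-endian longs and orders them with the expression (l1 < l2) ^ (l1 < 0) ^ (l2 < 0), which turns Java's signed comparison into an unsigned one, so the result matches a byte-by-byte lexicographic comparison. The following sketch reproduces that logic on plain byte arrays. The class name SegmentCompareSketch is hypothetical, and ByteBuffer.getLong (big-endian by default) stands in for getLongBigEndian.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch class, not part of Flink.
final class SegmentCompareSketch {

    static int compare(byte[] a, int offset1, byte[] b, int offset2, int len) {
        ByteBuffer bufA = ByteBuffer.wrap(a); // ByteBuffer is big-endian by default
        ByteBuffer bufB = ByteBuffer.wrap(b);

        // Compare 8 bytes at a time.
        while (len >= 8) {
            long l1 = bufA.getLong(offset1);
            long l2 = bufB.getLong(offset2);
            if (l1 != l2) {
                // The signed result is flipped when exactly one value is negative,
                // which yields the unsigned ordering.
                return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
            }
            offset1 += 8;
            offset2 += 8;
            len -= 8;
        }

        // Compare the remaining bytes as unsigned values.
        while (len > 0) {
            int cmp = (a[offset1] & 0xff) - (b[offset2] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
            offset1++;
            offset2++;
            len--;
        }
        return 0;
    }

    public static void main(String[] args) {
        byte[] small = {0, 0, 0, 0, 0, 0, 0, 1};
        byte[] large = {(byte) 0x80, 0, 0, 0, 0, 0, 0, 0};
        // 0x80 is negative as a signed byte but larger as an unsigned one.
        System.out.println(compare(small, 0, large, 0, 8));
    }
}
```

Reading the longs as big-endian is what makes the first differing byte the most significant one, so the 8-byte fast path and the byte-wise tail agree on the ordering.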

doc

Reposted from: http://rknqa.baihongyu.com/
