using System; using System.Runtime.InteropServices; using System.Threading; using Unity.Collections.LowLevel.Unsafe; using Unity.Burst; using Unity.Jobs; using Unity.Jobs.LowLevel.Unsafe; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Collections; using System.Collections.Generic; using static Unity.Collections.AllocatorManager; namespace Unity.Collections { [StructLayout(LayoutKind.Sequential)] unsafe struct UnsafeQueueBlockHeader { public UnsafeQueueBlockHeader* m_NextBlock; public int m_NumItems; } [StructLayout(LayoutKind.Sequential)] [GenerateTestsForBurstCompatibility] internal unsafe struct UnsafeQueueData { internal const int m_BlockSize = 16 * 1024; public IntPtr m_FirstBlock; public IntPtr m_LastBlock; public int m_MaxItems; public int m_CurrentRead; public byte* m_CurrentWriteBlockTLS; [MethodImpl(MethodImplOptions.AggressiveInlining)] internal UnsafeQueueBlockHeader* GetCurrentWriteBlockTLS(int threadIndex) { var data = (UnsafeQueueBlockHeader**)&m_CurrentWriteBlockTLS[threadIndex * JobsUtility.CacheLineSize]; return *data; } [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void SetCurrentWriteBlockTLS(int threadIndex, UnsafeQueueBlockHeader* currentWriteBlock) { var data = (UnsafeQueueBlockHeader**)&m_CurrentWriteBlockTLS[threadIndex * JobsUtility.CacheLineSize]; *data = currentWriteBlock; } [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })] public static UnsafeQueueBlockHeader* AllocateWriteBlockMT(UnsafeQueueData* data, AllocatorHandle allocator, int threadIndex) where T : unmanaged { UnsafeQueueBlockHeader* currentWriteBlock = data->GetCurrentWriteBlockTLS(threadIndex); if (currentWriteBlock != null) { if (currentWriteBlock->m_NumItems != data->m_MaxItems) { return currentWriteBlock; } currentWriteBlock = null; } currentWriteBlock = (UnsafeQueueBlockHeader*)Memory.Unmanaged.Allocate(m_BlockSize, 16, allocator); currentWriteBlock->m_NextBlock = null; currentWriteBlock->m_NumItems = 0; UnsafeQueueBlockHeader* prevLast = (UnsafeQueueBlockHeader*)Interlocked.Exchange(ref data->m_LastBlock, (IntPtr)currentWriteBlock); if (prevLast == null) { data->m_FirstBlock = (IntPtr)currentWriteBlock; } else { prevLast->m_NextBlock = currentWriteBlock; } data->SetCurrentWriteBlockTLS(threadIndex, currentWriteBlock); return currentWriteBlock; } [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })] public unsafe static void AllocateQueue(AllocatorHandle allocator, out UnsafeQueueData* outBuf) where T : unmanaged { #if UNITY_2022_2_14F1_OR_NEWER int maxThreadCount = JobsUtility.ThreadIndexCount; #else int maxThreadCount = JobsUtility.MaxJobThreadCount; #endif var queueDataSize = CollectionHelper.Align(UnsafeUtility.SizeOf(), JobsUtility.CacheLineSize); var data = (UnsafeQueueData*)Memory.Unmanaged.Allocate( queueDataSize + JobsUtility.CacheLineSize * maxThreadCount , JobsUtility.CacheLineSize , allocator ); data->m_CurrentWriteBlockTLS = ((byte*)data) + queueDataSize; data->m_FirstBlock = IntPtr.Zero; data->m_LastBlock = IntPtr.Zero; data->m_MaxItems = (m_BlockSize - UnsafeUtility.SizeOf()) / UnsafeUtility.SizeOf(); data->m_CurrentRead = 0; for (int threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) { data->SetCurrentWriteBlockTLS(threadIndex, null); } outBuf = data; } public unsafe static void DeallocateQueue(UnsafeQueueData* data, AllocatorHandle allocator) { UnsafeQueueBlockHeader* firstBlock = (UnsafeQueueBlockHeader*)data->m_FirstBlock; while (firstBlock != null) { UnsafeQueueBlockHeader* next = firstBlock->m_NextBlock; Memory.Unmanaged.Free(firstBlock, allocator); firstBlock = next; } Memory.Unmanaged.Free(data, allocator); } } /// /// An unmanaged queue. /// /// The type of the elements. [StructLayout(LayoutKind.Sequential)] [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })] public unsafe struct UnsafeQueue : INativeDisposable where T : unmanaged { [NativeDisableUnsafePtrRestriction] internal UnsafeQueueData* m_Buffer; [NativeDisableUnsafePtrRestriction] internal AllocatorHandle m_AllocatorLabel; /// /// Initializes and returns an instance of UnsafeQueue. /// /// The allocator to use. public UnsafeQueue(AllocatorHandle allocator) { m_AllocatorLabel = allocator; UnsafeQueueData.AllocateQueue(allocator, out m_Buffer); } internal static UnsafeQueue* Alloc(AllocatorHandle allocator) { UnsafeQueue* data = (UnsafeQueue*)Memory.Unmanaged.Allocate(sizeof(UnsafeQueue), UnsafeUtility.AlignOf>(), allocator); return data; } internal static void Free(UnsafeQueue* data) { if (data == null) { throw new InvalidOperationException("UnsafeQueue has yet to be created or has been destroyed!"); } var allocator = data->m_AllocatorLabel; data->Dispose(); Memory.Unmanaged.Free(data, allocator); } /// /// Returns true if this queue is empty. /// /// True if this queue has no items or if the queue has not been constructed. public readonly bool IsEmpty() { if (IsCreated) { int count = 0; var currentRead = m_Buffer->m_CurrentRead; for (UnsafeQueueBlockHeader* block = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock ; block != null ; block = block->m_NextBlock ) { count += block->m_NumItems; if (count > currentRead) { return false; } } return count == currentRead; } return true; } /// /// Returns the current number of elements in this queue. /// /// Note that getting the count requires traversing the queue's internal linked list of blocks. /// Where possible, cache this value instead of reading the property repeatedly. /// The current number of elements in this queue. public readonly int Count { get { int count = 0; for (UnsafeQueueBlockHeader* block = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock ; block != null ; block = block->m_NextBlock ) { count += block->m_NumItems; } return count - m_Buffer->m_CurrentRead; } } /// /// Returns the element at the front of this queue without removing it. /// /// The element at the front of this queue. [MethodImpl(MethodImplOptions.AggressiveInlining)] public T Peek() { CheckNotEmpty(); UnsafeQueueBlockHeader* firstBlock = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock; return UnsafeUtility.ReadArrayElement(firstBlock + 1, m_Buffer->m_CurrentRead); } /// /// Adds an element at the back of this queue. /// /// The value to be enqueued. public void Enqueue(T value) { UnsafeQueueBlockHeader* writeBlock = UnsafeQueueData.AllocateWriteBlockMT(m_Buffer, m_AllocatorLabel, 0); UnsafeUtility.WriteArrayElement(writeBlock + 1, writeBlock->m_NumItems, value); ++writeBlock->m_NumItems; } /// /// Removes and returns the element at the front of this queue. /// /// Thrown if this queue is empty. /// The element at the front of this queue. public T Dequeue() { if (!TryDequeue(out T item)) { ThrowEmpty(); } return item; } /// /// Removes and outputs the element at the front of this queue. /// /// Outputs the removed element. /// True if this queue was not empty. public bool TryDequeue(out T item) { UnsafeQueueBlockHeader* firstBlock = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock; if (firstBlock != null) { var currentRead = m_Buffer->m_CurrentRead++; var numItems = firstBlock->m_NumItems; item = UnsafeUtility.ReadArrayElement(firstBlock + 1, currentRead); if (currentRead + 1 >= numItems) { m_Buffer->m_CurrentRead = 0; m_Buffer->m_FirstBlock = (IntPtr)firstBlock->m_NextBlock; if (m_Buffer->m_FirstBlock == IntPtr.Zero) { m_Buffer->m_LastBlock = IntPtr.Zero; } #if UNITY_2022_2_14F1_OR_NEWER int maxThreadCount = JobsUtility.ThreadIndexCount; #else int maxThreadCount = JobsUtility.MaxJobThreadCount; #endif for (int threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) { if (m_Buffer->GetCurrentWriteBlockTLS(threadIndex) == firstBlock) { m_Buffer->SetCurrentWriteBlockTLS(threadIndex, null); } } Memory.Unmanaged.Free(firstBlock, m_AllocatorLabel); } return true; } item = default(T); return false; } /// /// Returns an array containing a copy of this queue's content. /// /// The allocator to use. /// An array containing a copy of this queue's content. The elements are ordered in the same order they were /// enqueued, *e.g.* the earliest enqueued element is copied to index 0 of the array. public NativeArray ToArray(AllocatorManager.AllocatorHandle allocator) { UnsafeQueueBlockHeader* firstBlock = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock; var outputArray = CollectionHelper.CreateNativeArray(Count, allocator, NativeArrayOptions.UninitializedMemory); UnsafeQueueBlockHeader* currentBlock = firstBlock; var arrayPtr = (byte*)outputArray.GetUnsafePtr(); int size = UnsafeUtility.SizeOf(); int dstOffset = 0; int srcOffset = m_Buffer->m_CurrentRead * size; int srcOffsetElements = m_Buffer->m_CurrentRead; while (currentBlock != null) { int bytesToCopy = (currentBlock->m_NumItems - srcOffsetElements) * size; UnsafeUtility.MemCpy(arrayPtr + dstOffset, (byte*)(currentBlock + 1) + srcOffset, bytesToCopy); srcOffset = srcOffsetElements = 0; dstOffset += bytesToCopy; currentBlock = currentBlock->m_NextBlock; } return outputArray; } /// /// Removes all elements of this queue. /// public void Clear() { UnsafeQueueBlockHeader* firstBlock = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock; while (firstBlock != null) { UnsafeQueueBlockHeader* next = firstBlock->m_NextBlock; Memory.Unmanaged.Free(firstBlock, m_AllocatorLabel); firstBlock = next; } m_Buffer->m_FirstBlock = IntPtr.Zero; m_Buffer->m_LastBlock = IntPtr.Zero; m_Buffer->m_CurrentRead = 0; #if UNITY_2022_2_14F1_OR_NEWER int maxThreadCount = JobsUtility.ThreadIndexCount; #else int maxThreadCount = JobsUtility.MaxJobThreadCount; #endif for (int threadIndex = 0; threadIndex < maxThreadCount; ++threadIndex) { m_Buffer->SetCurrentWriteBlockTLS(threadIndex, null); } } /// /// Whether this queue has been allocated (and not yet deallocated). /// /// True if this queue has been allocated (and not yet deallocated). public readonly bool IsCreated { [MethodImpl(MethodImplOptions.AggressiveInlining)] get => m_Buffer != null; } /// /// Releases all resources (memory and safety handles). /// public void Dispose() { if (!IsCreated) { return; } UnsafeQueueData.DeallocateQueue(m_Buffer, m_AllocatorLabel); m_Buffer = null; } /// /// Creates and schedules a job that releases all resources (memory and safety handles) of this queue. /// /// The dependency for the new job. /// The handle of the new job. The job depends upon `inputDeps` and releases all resources (memory and safety handles) of this queue. public JobHandle Dispose(JobHandle inputDeps) { if (!IsCreated) { return inputDeps; } var jobHandle = new UnsafeQueueDisposeJob { Data = new UnsafeQueueDispose { m_Buffer = m_Buffer, m_AllocatorLabel = m_AllocatorLabel } }.Schedule(inputDeps); m_Buffer = null; return jobHandle; } /// /// An enumerator over the values of a container. /// /// /// In an enumerator's initial state, is invalid. /// The first call advances the enumerator to the first value. /// public struct Enumerator : IEnumerator { [NativeDisableUnsafePtrRestriction] internal UnsafeQueueBlockHeader* m_FirstBlock; [NativeDisableUnsafePtrRestriction] internal UnsafeQueueBlockHeader* m_Block; internal int m_Index; T value; /// /// Does nothing. /// public void Dispose() { } /// /// Advances the enumerator to the next value. /// /// True if `Current` is valid to read after the call. [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool MoveNext() { m_Index++; for (; m_Block != null ; m_Block = m_Block->m_NextBlock ) { var numItems = m_Block->m_NumItems; if (m_Index < numItems) { value = UnsafeUtility.ReadArrayElement(m_Block + 1, m_Index); return true; } m_Index -= numItems; } value = default; return false; } /// /// Resets the enumerator to its initial state. /// public void Reset() { m_Block = m_FirstBlock; m_Index = -1; } /// /// The current value. /// /// The current value. public T Current { [MethodImpl(MethodImplOptions.AggressiveInlining)] get => value; } object IEnumerator.Current => Current; } /// /// Returns a readonly version of this UnsafeQueue instance. /// /// ReadOnly containers point to the same underlying data as the UnsafeQueue it is made from. /// ReadOnly instance for this. public ReadOnly AsReadOnly() { return new ReadOnly(ref this); } /// /// A read-only alias for the value of a UnsafeQueue. Does not have its own allocated storage. /// public struct ReadOnly : IEnumerable { [NativeDisableUnsafePtrRestriction] UnsafeQueueData* m_Buffer; internal ReadOnly(ref UnsafeQueue data) { m_Buffer = data.m_Buffer; } /// /// Whether this container been allocated (and not yet deallocated). /// /// True if this container has been allocated (and not yet deallocated). public readonly bool IsCreated { [MethodImpl(MethodImplOptions.AggressiveInlining)] get { return m_Buffer != null; } } /// /// Returns true if this queue is empty. /// /// Note that getting the count requires traversing the queue's internal linked list of blocks. /// Where possible, cache this value instead of reading the property repeatedly. /// True if this queue has no items or if the queue has not been constructed. public readonly bool IsEmpty() { int count = 0; var currentRead = m_Buffer->m_CurrentRead; for (UnsafeQueueBlockHeader* block = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock ; block != null ; block = block->m_NextBlock ) { count += block->m_NumItems; if (count > currentRead) { return false; } } return count == currentRead; } /// /// Returns the current number of elements in this queue. /// /// Note that getting the count requires traversing the queue's internal linked list of blocks. /// Where possible, cache this value instead of reading the property repeatedly. /// The current number of elements in this queue. public readonly int Count { get { int count = 0; for (UnsafeQueueBlockHeader* block = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock ; block != null ; block = block->m_NextBlock ) { count += block->m_NumItems; } return count - m_Buffer->m_CurrentRead; } } /// /// The element at an index. /// /// An index. /// The element at the index. /// Thrown if the index is out of bounds. public readonly T this[int index] { get { T result; if (!TryGetValue(index, out result)) { ThrowIndexOutOfRangeException(index); } return result; } } readonly bool TryGetValue(int index, out T item) { if (index >= 0) { var idx = index; for (UnsafeQueueBlockHeader* block = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock ; block != null ; block = block->m_NextBlock ) { var numItems = block->m_NumItems; if (idx < numItems) { item = UnsafeUtility.ReadArrayElement(block + 1, idx); return true; } idx -= numItems; } } item = default; return false; } /// /// Returns an enumerator over the items of this container. /// /// An enumerator over the items of this container. public readonly Enumerator GetEnumerator() { return new Enumerator { m_FirstBlock = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock, m_Block = (UnsafeQueueBlockHeader*)m_Buffer->m_FirstBlock, m_Index = -1, }; } /// /// This method is not implemented. Use instead. /// /// Throws NotImplementedException. /// Method is not implemented. IEnumerator IEnumerable.GetEnumerator() { throw new NotImplementedException(); } /// /// This method is not implemented. Use instead. /// /// Throws NotImplementedException. /// Method is not implemented. IEnumerator IEnumerable.GetEnumerator() { throw new NotImplementedException(); } [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")] [MethodImpl(MethodImplOptions.AggressiveInlining)] readonly void ThrowIndexOutOfRangeException(int index) { throw new IndexOutOfRangeException($"Index {index} is out of bounds [0-{Count}]."); } } /// /// Returns a parallel writer for this queue. /// /// A parallel writer for this queue. public ParallelWriter AsParallelWriter() { ParallelWriter writer; writer.m_Buffer = m_Buffer; writer.m_AllocatorLabel = m_AllocatorLabel; writer.m_ThreadIndex = 0; return writer; } /// /// A parallel writer for a UnsafeQueue. /// /// /// Use to create a parallel writer for a UnsafeQueue. /// [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })] public unsafe struct ParallelWriter { [NativeDisableUnsafePtrRestriction] internal UnsafeQueueData* m_Buffer; internal AllocatorHandle m_AllocatorLabel; [NativeSetThreadIndex] internal int m_ThreadIndex; /// /// Adds an element at the back of the queue. /// /// The value to be enqueued. public void Enqueue(T value) { UnsafeQueueBlockHeader* writeBlock = UnsafeQueueData.AllocateWriteBlockMT(m_Buffer, m_AllocatorLabel, m_ThreadIndex); UnsafeUtility.WriteArrayElement(writeBlock + 1, writeBlock->m_NumItems, value); ++writeBlock->m_NumItems; } /// /// Adds an element at the back of the queue. /// /// The value to be enqueued. /// The thread index which must be set by a field from a job struct with the attribute. internal void Enqueue(T value, int threadIndexOverride) { UnsafeQueueBlockHeader* writeBlock = UnsafeQueueData.AllocateWriteBlockMT(m_Buffer, m_AllocatorLabel, threadIndexOverride); UnsafeUtility.WriteArrayElement(writeBlock + 1, writeBlock->m_NumItems, value); ++writeBlock->m_NumItems; } } [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")] [MethodImpl(MethodImplOptions.AggressiveInlining)] void CheckNotEmpty() { if (m_Buffer->m_FirstBlock == (IntPtr)0) { ThrowEmpty(); } } [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")] static void ThrowEmpty() { throw new InvalidOperationException("Trying to read from an empty queue."); } } [GenerateTestsForBurstCompatibility] internal unsafe struct UnsafeQueueDispose { [NativeDisableUnsafePtrRestriction] internal UnsafeQueueData* m_Buffer; internal AllocatorHandle m_AllocatorLabel; public void Dispose() { UnsafeQueueData.DeallocateQueue(m_Buffer, m_AllocatorLabel); } } [BurstCompile] struct UnsafeQueueDisposeJob : IJob { public UnsafeQueueDispose Data; public void Execute() { Data.Dispose(); } } }