diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a224e5b2c3586..398cc2cfe6e53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -68,8 +68,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -155,6 +157,9 @@ public class FragmentInstanceContext extends QueryContext { private long closedUnseqFileNum = 0; private boolean highestPriority = false; + // accessed value columns on each referenced AlignedTVList. + private final Map> alignedTVListColumnAccessMap = new ConcurrentHashMap<>(); + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { FragmentInstanceContext instanceContext = @@ -205,6 +210,43 @@ public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { this.queryDataSourceType = queryDataSourceType; } + /** + * Record columns of the AlignedTVList accessed by the query. This method is called from + * prepareTvListMapForQuery with tvList.lockQueryList() held. Even though the HashSet inside + * alignedTVListColumnAccessMap is not thread-safe, the calling pattern guarantees thread safety + * without requiring additional synchronization. + * + * @param tvList the TVList being accessed + * @param columnIndexList list of column indices being accessed + */ + public void putAccessedColumns(TVList tvList, List columnIndexList) { + Set accessedColumns = + alignedTVListColumnAccessMap.computeIfAbsent(tvList, ignored -> new HashSet<>()); + columnIndexList.stream() + .filter(Objects::nonNull) + .forEach( + index -> { + if (index >= 0) { + accessedColumns.add(index); + } + }); + } + + /** + * Get columns of the AlignedTVList accessed by the query. This method is called from + * prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring that no other thread can + * change accessed columns for the same TVList concurrently. + * + * @param tvList the TVList being accessed + * @return set of column indices being accessed + */ + public Set getAccessedAlignedColumns(TVList tvList) { + Set accessedColumns = alignedTVListColumnAccessMap.get(tvList); + return accessedColumns == null + ? Collections.emptySet() + : Collections.unmodifiableSet(accessedColumns); + } + @TestOnly public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { @@ -871,12 +913,12 @@ public void releaseResourceWhenAllDriversAreClosed() { */ private void releaseTVListOwnedByQuery() { for (TVList tvList : tvListSet) { - long tvListRamSize = tvList.calculateRamSize().getRamSize(); tvList.lockQueryList(); Set queryContextSet = tvList.getQueryContextSet(); try { queryContextSet.remove(this); if (tvList.getOwnerQuery() == this) { + long tvListRamSize = tvList.calculateRamSize().getRamSize(); if (tvList.getReservedMemoryBytes() != tvListRamSize) { LOGGER.warn( "Release TVList owned by query: allocate size {}, release size {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index fa2f603d6facc..86e77d5819890 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.NodeRef; import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; @@ -38,6 +39,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.tsfile.enums.TSDataType; @@ -63,9 +65,11 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.iotdb.commons.path.AlignedPath.VECTOR_PLACEHOLDER; @@ -112,16 +116,18 @@ public abstract List getVisibleMetadataListFromWriter( * the data region write lock. At this moment, query thread holds the data region read lock. * * @param context query context - * @param memChunk writable memchunk + * @param memChunkRef writable memchunk reference * @param isWorkMemTable in working or flushing memtable * @param globalTimeFilter global time filter * @return Map */ protected Map prepareTvListMapForQuery( QueryContext context, - IWritableMemChunk memChunk, + NodeRef memChunkRef, boolean isWorkMemTable, - Filter globalTimeFilter) { + Filter globalTimeFilter, + List columnIndexList) { + IWritableMemChunk memChunk = memChunkRef.getNode(); // should copy globalTimeFilter because GroupByMonthFilter is stateful Filter copyTimeFilter = null; if (globalTimeFilter != null) { @@ -148,8 +154,15 @@ protected Map prepareTvListMapForQuery( // mutable tvlist TVList list = memChunk.getWorkingTVList(); - TVList cloneList = null; - TVList.RamInfo listRamInfo = list.calculateRamSize(); + TVList.RamInfo listRamInfo = null; + + // calculateRamSize (synchronized method on TVList) was previously called before + // lockQueryList to avoid deadlock concerns. For partial clone of AlignedTVList, however + // calculateRamSize must now be called inside the lockQueryList section because it depends on + // queries accessing the AlignedTVList. + // This is safe because the lock ordering — queryListLock must always be acquired before the + // TVList intrinsic lock (via synchronized methods like calculateRamSize, clone). So no AB-BA + // deadlock is possible. list.lockQueryList(); try { if (copyTimeFilter != null @@ -199,6 +212,11 @@ protected Map prepareTvListMapForQuery( tvListQueryMap.put(workingListForFlushSort, workingListForFlushSort.rowCount()); } } else { + // columnIndexList is to track column-level access for AlignedTVList. + // For TVList (primitive time series), it remains null and column tracking is not needed. + if (columnIndexList != null && context instanceof FragmentInstanceContext) { + ((FragmentInstanceContext) context).putAccessedColumns(list, columnIndexList); + } if (list.isSorted() || list.getQueryContextSet().isEmpty()) { LOGGER.debug( "Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it"); @@ -222,38 +240,64 @@ protected Map prepareTvListMapForQuery( LOGGER.debug( "Working MemTable - clone mutable TVList and replace old TVList in working MemTable"); QueryContext firstQuery = list.getQueryContextSet().iterator().next(); - // reserve query memory - if (firstQuery instanceof FragmentInstanceContext) { - MemoryReservationManager memoryReservationManager = - ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(listRamInfo.getRamSize()); - list.setReservedMemoryBytes(listRamInfo.getRamSize()); - } - list.setOwnerQuery(firstQuery); - // clone TVList - cloneList = list.clone(); - cloneList.getQueryContextSet().add(context); - tvListQueryMap.put(cloneList, cloneList.rowCount()); + // Synchronize on memChunk to prevent concurrent clone-and-replace races. + // Another query entering this same branch could call memChunk.setWorkingTVList(cloneList) + // between our check above and the clone below. By locking on memChunk we ensure only one + // query performs the clone-and-swap at a time. + synchronized (memChunk) { + // We re-fetch via getWorkingTVList() inside the lock because the working TVList may + // have already been replaced by a concurrent query that acquired the lock first — + // operating on the stale outer 'list' would clone an already-detached copy. + TVList workingTVList = memChunk.getWorkingTVList(); + Set columnsToClone = getAccessedColumnsForQuery(workingTVList); + listRamInfo = + (columnsToClone == null) + ? workingTVList.calculateRamSize() + : ((AlignedTVList) workingTVList).calculateRamSize(columnsToClone); + + // reserve query memory + if (firstQuery instanceof FragmentInstanceContext) { + MemoryReservationManager memoryReservationManager = + ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); + memoryReservationManager.reserveMemoryCumulatively(listRamInfo.getRamSize()); + workingTVList.setReservedMemoryBytes(listRamInfo.getRamSize()); + } + workingTVList.setOwnerQuery(firstQuery); + + // clone TVList + TVList cloneList = + (columnsToClone == null) + ? workingTVList.clone() + : ((AlignedTVList) workingTVList).clone(columnsToClone); + + cloneList.getQueryContextSet().add(context); + tvListQueryMap.put(cloneList, cloneList.rowCount()); + + memChunk.setWorkingTVList(cloneList); + } } } } catch (MemoryNotEnoughException ex) { - LOGGER.warn( - "Failed to reserve memory for TVList: ramSize {}, timestampsSize {}, arrayMemCost {}, rowCount {}, dataTypes {}", - listRamInfo.getRamSize(), - listRamInfo.getTimestampsSize(), - listRamInfo.getArrayMemCost(), - listRamInfo.getRowCount(), - listRamInfo.getDataTypes()); + if (listRamInfo != null) { + LOGGER.warn( + "Failed to reserve memory for TVList: ramSize {}, timestampsSize {}, arrayMemCost {}, rowCount {}, dataTypes {}", + listRamInfo.getRamSize(), + listRamInfo.getTimestampsSize(), + listRamInfo.getArrayMemCost(), + listRamInfo.getRowCount(), + listRamInfo.getDataTypes()); + } throw ex; } finally { list.unlockQueryList(); } - if (cloneList != null) { - memChunk.setWorkingTVList(cloneList); - } return tvListQueryMap; } + + protected Set getAccessedColumnsForQuery(TVList tvList) { + return null; + } } class AlignedResourceByPathUtils extends ResourceByPathUtils { @@ -400,15 +444,19 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( return null; } - // prepare AlignedTVList for query. It should clone TVList if necessary. - Map alignedTvListQueryMap = - prepareTvListMapForQuery( - context, alignedMemChunk, modsToMemtable == null, globalTimeFilter); - // column index list for the query List columnIndexList = alignedMemChunk.buildColumnIndexList(partialPath.getSchemaList()); + // prepare AlignedTVList for query. It should clone TVList if necessary. + Map alignedTvListQueryMap = + prepareTvListMapForQuery( + context, + NodeRef.of(alignedMemChunk), + modsToMemtable == null, + globalTimeFilter, + columnIndexList); + List> deletionList = null; if (modsToMemtable != null) { deletionList = @@ -419,6 +467,25 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( context, columnIndexList, getMeasurementSchema(), alignedTvListQueryMap, deletionList); } + /** + * This method is called from prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring + * thread-safe access to queryContextSet. + * + * @param tvList the TVList to get accessed columns for + * @return set of accessed column indices, or empty set if no columns are tracked + */ + @Override + protected Set getAccessedColumnsForQuery(TVList tvList) { + Set accessedColumns = new HashSet<>(); + for (QueryContext queryContext : tvList.getQueryContextSet()) { + if (queryContext instanceof FragmentInstanceContext) { + accessedColumns.addAll( + ((FragmentInstanceContext) queryContext).getAccessedAlignedColumns(tvList)); + } + } + return accessedColumns; + } + public VectorMeasurementSchema getMeasurementSchema() { List measurementList = partialPath.getMeasurementList(); TSDataType[] types = new TSDataType[measurementList.size()]; @@ -571,7 +638,8 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement()); // prepare TVList for query. It should clone TVList if necessary. Map tvListQueryMap = - prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter); + prepareTvListMapForQuery( + context, NodeRef.of(memChunk), modsToMemtable == null, globalTimeFilter, null); List deletionList = null; if (modsToMemtable != null) { deletionList = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 6c773942fb72b..13fa84d039e27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -101,7 +101,6 @@ protected void maybeReleaseTvList(TVList tvList) { } private void tryReleaseTvList(TVList tvList) { - long tvListRamSize = tvList.calculateRamSize().getRamSize(); tvList.lockQueryList(); try { if (tvList.getQueryContextSet().isEmpty()) { @@ -113,6 +112,7 @@ private void tryReleaseTvList(TVList tvList) { if (firstQuery instanceof FragmentInstanceContext) { MemoryReservationManager memoryReservationManager = ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); + long tvListRamSize = tvList.calculateRamSize().getRamSize(); memoryReservationManager.reserveMemoryCumulatively(tvListRamSize); tvList.setReservedMemoryBytes(tvListRamSize); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java index bb2ee311d3047..f54af9cfcbb4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java @@ -122,12 +122,12 @@ public void sortTvLists() { // We must update queryRowCount here, otherwise, it may be used later to build // BitMaps, causing bitmap array size mismatch and possible out of bound. entry.setValue(alignedTvList.sort()); - long alignedTvListRamSize = alignedTvList.calculateRamSize().getRamSize(); alignedTvList.lockQueryList(); try { FragmentInstanceContext ownerQuery = (FragmentInstanceContext) alignedTvList.getOwnerQuery(); if (ownerQuery != null) { + long alignedTvListRamSize = alignedTvList.calculateRamSize().getRamSize(); long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes(); if (deltaBytes > 0) { ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); @@ -367,12 +367,12 @@ public IPointReader getPointReader() { int queryLength = entry.getValue(); if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) { entry.setValue(alignedTvList.sort()); - long alignedTvListRamSize = alignedTvList.calculateRamSize().getRamSize(); alignedTvList.lockQueryList(); try { FragmentInstanceContext ownerQuery = (FragmentInstanceContext) alignedTvList.getOwnerQuery(); if (ownerQuery != null) { + long alignedTvListRamSize = alignedTvList.calculateRamSize().getRamSize(); long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes(); if (deltaBytes > 0) { ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 06bd5404be0f1..65902d6d61daf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -225,12 +225,12 @@ public void writeAlignedTablet( } @Override - public AlignedTVList getWorkingTVList() { + public synchronized AlignedTVList getWorkingTVList() { return list; } @Override - public void setWorkingTVList(TVList list) { + public synchronized void setWorkingTVList(TVList list) { this.list = (AlignedTVList) list; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java index c0a71bf7edcdc..223e9ebb81142 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java @@ -136,11 +136,11 @@ public void sortTvLists() { int queryRowCount = entry.getValue(); if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) { entry.setValue(tvList.sort()); - long tvListRamSize = tvList.calculateRamSize().getRamSize(); tvList.lockQueryList(); try { FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery(); if (ownerQuery != null) { + long tvListRamSize = tvList.calculateRamSize().getRamSize(); long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes(); if (deltaBytes > 0) { ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); @@ -288,11 +288,11 @@ public IPointReader getPointReader() { int queryLength = entry.getValue(); if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) { entry.setValue(tvList.sort()); - long tvListRamSize = tvList.calculateRamSize().getRamSize(); tvList.lockQueryList(); try { FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery(); if (ownerQuery != null) { + long tvListRamSize = tvList.calculateRamSize().getRamSize(); long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes(); if (deltaBytes > 0) { ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index b99e784faadb0..9bbe1a0575e66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -52,6 +52,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -151,32 +152,14 @@ public synchronized AlignedTVList cloneForFlushSort() { public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); cloneAs(cloneList); - System.arraycopy( - memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); - for (int i = 0; i < values.size(); i++) { - // Clone value - List columnValues = values.get(i); - for (Object valueArray : columnValues) { - cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); - } - // Clone bitmap in columnIndex - if (bitMaps != null && bitMaps.get(i) != null) { - List columnBitMaps = bitMaps.get(i); - if (cloneList.bitMaps == null) { - cloneList.bitMaps = new ArrayList<>(dataTypes.size()); - for (int j = 0; j < dataTypes.size(); j++) { - cloneList.bitMaps.add(null); - } - } - if (cloneList.bitMaps.get(i) == null) { - List cloneColumnBitMaps = new ArrayList<>(); - for (BitMap bitMap : columnBitMaps) { - cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); - } - cloneList.bitMaps.set(i, cloneColumnBitMaps); - } - } - } + cloneColumnDataTo(cloneList, null); + return cloneList; + } + + public synchronized AlignedTVList clone(Set columnsToClone) { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneColumnDataTo(cloneList, columnsToClone); return cloneList; } @@ -191,10 +174,13 @@ public synchronized void putAlignedValue(long timestamp, Object[] value) { timestamps.get(arrayIndex)[elementIndex] = timestamp; for (int i = 0; i < values.size(); i++) { Object columnValue = value[i]; - List columnValues = values.get(i); if (columnValue == null) { markNullValue(i, arrayIndex, elementIndex); } + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } switch (dataTypes.get(i)) { case TEXT: case BLOB: @@ -346,13 +332,7 @@ private TsPrimitiveType getAlignedValueByValueIndex( } public void extendColumn(TSDataType dataType) { - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(values.size()); - for (int i = 0; i < values.size(); i++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; - } + ensureBitMapsInitialized(); List columnValue = new ArrayList<>(); List columnBitMaps = new ArrayList<>(); for (int i = 0; i < timestamps.size(); i++) { @@ -572,16 +552,14 @@ public Pair delete(long lowerBound, long upperBound, int colum } public void deleteColumn(int columnIndex) { - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(dataTypes.size()); - for (int j = 0; j < dataTypes.size(); j++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; + List columnValues = values.get(columnIndex); + if (columnValues == null) { + return; } + ensureBitMapsInitialized(); if (bitMaps.get(columnIndex) == null) { List columnBitMaps = new ArrayList<>(); - for (int i = 0; i < values.get(columnIndex).size(); i++) { + for (int i = 0; i < columnValues.size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE)); } bitMaps.set(columnIndex, columnBitMaps); @@ -635,6 +613,64 @@ protected Object cloneValue(TSDataType type, Object value) { } } + private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToClone) { + boolean cloneAllColumns = columnsToClone == null; + System.arraycopy( + memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); + for (int i = 0; i < values.size(); i++) { + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } + boolean shouldCloneColumn = cloneAllColumns || columnsToClone.contains(i); + if (!shouldCloneColumn) { + cloneList.values.set(i, columnValues); + values.set(i, null); + if (bitMaps != null && bitMaps.get(i) != null) { + cloneList.ensureBitMapsInitialized(); + cloneList.bitMaps.set(i, bitMaps.get(i)); + bitMaps.set(i, null); + } + memoryBinaryChunkSize[i] = 0; + continue; + } + + for (Object valueArray : columnValues) { + cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); + } + if (bitMaps != null && bitMaps.get(i) != null) { + cloneList.ensureBitMapsInitialized(); + List cloneColumnBitMaps = new ArrayList<>(); + for (BitMap bitMap : bitMaps.get(i)) { + cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); + } + cloneList.bitMaps.set(i, cloneColumnBitMaps); + } + } + } + + /** + * Ensure the bitMaps list is initialized. This method is NOT thread-safe and relies on the caller + * to provide synchronization: + * + *
    + * - Called from synchronized clone methods, which is invoked by query/flush operations. + *
+ * + *
    + * - Called from getBitMap, which is invoked by write/delete operations (putAlignedValue, + * putAlignedValues, extendColumn, deleteColumn). DataRegion write lock is held at this moment. + *
+ */ + private void ensureBitMapsInitialized() { + if (bitMaps == null) { + bitMaps = new ArrayList<>(dataTypes.size()); + for (int i = 0; i < dataTypes.size(); i++) { + bitMaps.add(null); + } + } + } + @Override protected void clearValue() { for (int i = 0; i < dataTypes.size(); i++) { @@ -667,7 +703,11 @@ protected void expandValues() { indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); } for (int i = 0; i < dataTypes.size(); i++) { - values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i))); + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } + columnValues.add(getPrimitiveArraysByType(dataTypes.get(i))); if (bitMaps != null && bitMaps.get(i) != null) { bitMaps.get(i).add(null); } @@ -767,6 +807,9 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex continue; } List columnValues = values.get(i); + if (columnValues == null) { + continue; + } switch (dataTypes.get(i)) { case TEXT: case BLOB: @@ -809,19 +852,17 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex } private BitMap getBitMap(int columnIndex, int arrayIndex) { - // init BitMaps if doesn't have - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(dataTypes.size()); - for (int i = 0; i < dataTypes.size(); i++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; + List columnValues = values.get(columnIndex); + if (columnValues == null) { + return null; } + // init BitMaps if necessary + ensureBitMapsInitialized(); // if the bitmap in columnIndex is null, init the bitmap of this column from the beginning if (bitMaps.get(columnIndex) == null) { List columnBitMaps = new ArrayList<>(); - for (int i = 0; i < values.get(columnIndex).size(); i++) { + for (int i = 0; i < columnValues.size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE, new byte[ARRAY_SIZE])); } bitMaps.set(columnIndex, columnBitMaps); @@ -851,6 +892,14 @@ public synchronized RamInfo calculateRamSize() { timestamps.size(), alignedTvListArrayMemCost(), rowCount, new ArrayList<>(dataTypes)); } + public synchronized RamInfo calculateRamSize(Set columnsToClone) { + return new RamInfo( + timestamps.size(), + alignedTvListArrayMemCost(columnsToClone), + rowCount, + new ArrayList<>(dataTypes)); + } + /** * Get the single alignedTVList array mem cost by give types. * @@ -885,12 +934,15 @@ public static long alignedTvListArrayMemCost(TSDataType[] types) { * * @return AlignedTvListArrayMemSize */ - public long alignedTvListArrayMemCost() { + public long alignedTvListArrayMemCost(Set columnsToClone) { long size = 0; // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { + if (columnsToClone != null && !columnsToClone.contains(column)) { + continue; + } TSDataType type = dataTypes.get(column); - if (type != null) { + if (type != null && values.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); if (bitMaps != null && bitMaps.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; @@ -912,6 +964,10 @@ public long alignedTvListArrayMemCost() { return size; } + public long alignedTvListArrayMemCost() { + return alignedTvListArrayMemCost((Set) null); + } + /** * Get the single column array mem cost by give type. * @@ -1116,6 +1172,9 @@ public void serializeToWAL(IWALByteBufferView buffer) { // serialize value and bitmap by column for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) { List columnValues = values.get(columnIndex); + if (columnValues == null) { + continue; + } for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) { int arrayIndex = rowIndex / ARRAY_SIZE; int elementIndex = rowIndex % ARRAY_SIZE; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index cfc7f887dcfd3..53981a329dd8d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -49,6 +50,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import org.mockito.Mockito; @@ -57,8 +59,11 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -238,6 +243,70 @@ public void testTVListCloneForQuery() { } } + @Test + public void testAlignedTVListPartialColumnClone() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(2, "test-aligned-partial-clone"); + + try { + // Create MemTable with AlignedPath + List schemaList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + schemaList.add(new MeasurementSchema("sensor_" + i, TSDataType.INT64)); + } + String deviceId = "d1"; + IMemTable memTable = createMemTable(deviceId, schemaList); + + // Verify we have unsorted AlignedTVList + assertEquals(1, memTable.getMemTableMap().size()); + IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next(); + assertEquals(1, memChunkGroup.getMemChunkMap().size()); + IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); + TVList tvList = memChunk.getWorkingTVList(); + assertFalse(tvList.isSorted()); + assertEquals(6424, tvList.calculateRamSize().getRamSize()); + assertEquals(100, tvList.rowCount()); + + // FragmentInstance Context + FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1"); + FragmentInstanceStateMachine stateMachine1 = + new FragmentInstanceStateMachine(id1, instanceNotificationExecutor); + FragmentInstanceContext context1 = createFragmentInstanceContext(id1, stateMachine1); + + FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2"); + FragmentInstanceStateMachine stateMachine2 = + new FragmentInstanceStateMachine(id2, instanceNotificationExecutor); + FragmentInstanceContext context2 = createFragmentInstanceContext(id2, stateMachine2); + + // Query 1: sensor_2 and sensor_0 + List measurements1 = Arrays.asList("sensor_2", "sensor_0"); + List schemas1 = Arrays.asList(schemaList.get(2), schemaList.get(0)); + AlignedPath fullPath1 = new AlignedPath(deviceId, measurements1, schemas1); + + ReadOnlyMemChunk readOnlyMemChunk1 = + memTable.query(context1, fullPath1, Long.MIN_VALUE, null, null); + Set accessedColumnsForQuery1 = context1.getAccessedAlignedColumns(tvList); + assertEquals(new HashSet<>(Arrays.asList(0, 2)), accessedColumnsForQuery1); + + // Query 2: sensor_1 and sensor_3 + List measurements2 = Arrays.asList("sensor_1", "sensor_3"); + List schemas2 = Arrays.asList(schemaList.get(1), schemaList.get(3)); + AlignedPath fullPath2 = new AlignedPath(deviceId, measurements2, schemas2); + ReadOnlyMemChunk readOnlyMemChunk2 = + memTable.query(context2, fullPath2, Long.MIN_VALUE, null, null); + + // Only cloned sensor_2 and sensor_0 exist + assertEquals(3352, tvList.calculateRamSize().getRamSize()); + assertEquals(100, tvList.rowCount()); + + } catch (Exception e) { + fail(e.getMessage()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor) throws CpuNotEnoughException { IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); @@ -298,4 +367,24 @@ private IMemTable createMemTable(String deviceId, String measurementId) } return memTable; } + + private IMemTable createMemTable(String deviceId, List schemaList) + throws IllegalPathException { + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "1"); + + // Insert data in reverse order to make it unsorted + int rows = 100; + for (int i = rows - 1; i >= 0; i--) { + Object[] values = new Object[5]; + for (int j = 0; j < 5; j++) { + values[j] = (long) i * 100 + j; + } + memTable.writeAlignedRow( + DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)), + schemaList, + i, + values); + } + return memTable; + } }