diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java index bc69f095642e2..03f44c3c29520 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java @@ -40,6 +40,7 @@ import java.io.OutputStream; import java.nio.channels.FileChannel; import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -334,10 +335,11 @@ public boolean exists() { public void remove() throws IOException { lock.writeLock().lock(); try { + long sizeBeforeRemove = fileExists ? getFileLength() : 0; close(); FileUtils.deleteFileOrDirectory(file); if (fileExists) { - updateModFileMetric(-1, -getFileLength()); + updateModFileMetric(-1, -sizeBeforeRemove); } fileExists = false; removed = true; @@ -394,12 +396,12 @@ public String toString() { public void compact() throws IOException { long originFileSize = getFileLength(); if (originFileSize > COMPACT_THRESHOLD && !hasCompacted) { + File compactedFile = new File(getFile().getPath() + COMPACT_SUFFIX); try { Map> pathModificationMap = getAllMods().stream().collect(Collectors.groupingBy(ModEntry::keyOfPatternTree)); - String newModsFileName = getFile().getPath() + COMPACT_SUFFIX; try (ModificationFile compactedModificationFile = - new ModificationFile(newModsFileName, false)) { + new ModificationFile(compactedFile, false)) { Set>> modificationsEntrySet = pathModificationMap.entrySet(); for (Map.Entry> modificationEntry : modificationsEntrySet) { @@ -408,12 +410,15 @@ public void compact() throws IOException { } } catch (IOException e) { LOGGER.error("compact mods file exception of {}", file, e); + throw e; } - // remove origin mods file - this.remove(); - fileExists = true; - // rename new mods file to origin name - Files.move(new File(newModsFileName).toPath(), file.toPath()); + long compactedFileSize = compactedFile.length(); + close(); + Files.move(compactedFile.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING); + if (updateMetrics) { + FileMetrics.getInstance().increaseModFileSize(compactedFileSize - originFileSize); + } + fileExists = compactedFileSize > 0; LOGGER.info("{} settle successful", file); if (getFileLength() > COMPACT_THRESHOLD) { @@ -424,6 +429,7 @@ public void compact() throws IOException { } } catch (IOException e) { LOGGER.error("remove origin file or rename new mods file error.", e); + throw e; } hasCompacted = true; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUtilsTest.java index 1fa1c7339449c..2972c0ee929be 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUtilsTest.java @@ -20,10 +20,15 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.utils; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; @@ -32,6 +37,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; public class CompactionUtilsTest extends AbstractCompactionTest { @Override @@ -54,4 +60,34 @@ public void testCompactionPathUtils() { Assert.fail(); } } + + @Test + public void testDeleteSourceTsFileUpdatesModMetrics() throws Exception { + int modFileNumBefore = FileMetrics.getInstance().getModFileNum(); + long modFileSizeBefore = FileMetrics.getInstance().getModFileSize(); + + createFiles(2, 1, 1, 10, 0, 0, 10, 10, false, true); + + long totalModFileSize = 0; + for (int i = 0; i < seqResources.size(); i++) { + try (ModificationFile modificationFile = seqResources.get(i).getModFileForWrite()) { + modificationFile.write( + new TreeDeletionEntry( + new MeasurementPath(new String[] {COMPACTION_TEST_SG, "d0", "s0"}), + Long.MIN_VALUE, + i + 10)); + totalModFileSize += modificationFile.getFileLength(); + } + } + + Assert.assertEquals( + modFileNumBefore + seqResources.size(), FileMetrics.getInstance().getModFileNum()); + Assert.assertEquals( + modFileSizeBefore + totalModFileSize, FileMetrics.getInstance().getModFileSize()); + + CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(new ArrayList<>(seqResources), true); + + Assert.assertEquals(modFileNumBefore, FileMetrics.getInstance().getModFileNum()); + Assert.assertEquals(modFileSizeBefore, FileMetrics.getInstance().getModFileSize()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java index a0a9885ecf08e..fb4ce06fc33ca 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager; import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; @@ -50,6 +51,67 @@ public class ModificationFileTest { + @Test + public void testRemoveUpdatesMetrics() throws IOException { + String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.remove.metrics.temp"); + int modFileNumBefore = FileMetrics.getInstance().getModFileNum(); + long modFileSizeBefore = FileMetrics.getInstance().getModFileSize(); + try (ModificationFile modificationFile = new ModificationFile(tempFileName, true)) { + modificationFile.write( + new TreeDeletionEntry( + new MeasurementPath(new String[] {"root", "sg", "d1", "s1"}), 1, 10)); + long fileLength = modificationFile.getFileLength(); + assertEquals(modFileNumBefore + 1, FileMetrics.getInstance().getModFileNum()); + assertEquals(modFileSizeBefore + fileLength, FileMetrics.getInstance().getModFileSize()); + + modificationFile.remove(); + assertEquals(modFileNumBefore, FileMetrics.getInstance().getModFileNum()); + assertEquals(modFileSizeBefore, FileMetrics.getInstance().getModFileSize()); + } finally { + Files.deleteIfExists(new File(tempFileName).toPath()); + } + } + + @Test + public void testCompactUpdatesMetricsAndAllowFurtherWrite() throws IOException { + String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.compact.metrics.temp"); + int modFileNumBefore = FileMetrics.getInstance().getModFileNum(); + long modFileSizeBefore = FileMetrics.getInstance().getModFileSize(); + long time = 1000; + try (ModificationFile modificationFile = new ModificationFile(tempFileName, true)) { + while (modificationFile.getFileLength() < 1024 * 1024) { + modificationFile.write( + new TreeDeletionEntry( + new MeasurementPath(new String[] {"root", "sg", "d1", "s1"}), + Long.MIN_VALUE, + time += 5000)); + } + + assertEquals(modFileNumBefore + 1, FileMetrics.getInstance().getModFileNum()); + modificationFile.compact(); + assertEquals(modFileNumBefore + 1, FileMetrics.getInstance().getModFileNum()); + assertEquals( + modFileSizeBefore + modificationFile.getFileLength(), + FileMetrics.getInstance().getModFileSize()); + + modificationFile.write( + new TreeDeletionEntry( + new MeasurementPath(new String[] {"root", "sg", "d1", "s2"}), + Long.MIN_VALUE, + time + 5000)); + assertEquals(modFileNumBefore + 1, FileMetrics.getInstance().getModFileNum()); + assertEquals( + modFileSizeBefore + modificationFile.getFileLength(), + FileMetrics.getInstance().getModFileSize()); + + modificationFile.remove(); + assertEquals(modFileNumBefore, FileMetrics.getInstance().getModFileNum()); + assertEquals(modFileSizeBefore, FileMetrics.getInstance().getModFileSize()); + } finally { + Files.deleteIfExists(new File(tempFileName).toPath()); + } + } + @Test public void readMyWrite() { String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");