diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 60de4f001e43..3b63feddcc1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1858,7 +1858,20 @@ public Optional<CompactionContext> requestCompaction(int priority,
     }
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();
-
+    Optional<CompactionContext> compaction =
+        getRequestCompaction(priority, tracker, user, filesCompacting);
+    if (compaction.isPresent() && compaction.get().getRequest() != null
+        && compaction.get().getRequest().isMajor()) {
+      List<HStoreFile> extraFiles = getExtraFiles();
+      if (!extraFiles.isEmpty()) {
+        addToStoreFiles(extraFiles);
+        filesCompacting.clear();
+        this.triggerMajorCompaction();
+        compaction = getRequestCompaction(priority, tracker, user, filesCompacting);
+      }
+    }
+    return compaction;
+  }
+
+  public Optional<CompactionContext> getRequestCompaction(int priority,
+      CompactionLifeCycleTracker tracker, User user, List<HStoreFile> filesCompacting)
+      throws IOException {
     final CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequestImpl request = null;
     this.lock.readLock().lock();
@@ -1945,6 +1958,58 @@ public Optional<CompactionContext> requestCompaction(int priority,
     return Optional.of(compaction);
   }
 
+  private List<HStoreFile> addToStoreFiles(List<HStoreFile> storeFiles) throws IOException {
+    // Load dirty data files before performing compaction
+    if (storeFiles != null && !storeFiles.isEmpty()) {
+      LOG.info("add new " + storeFiles + " to storeFiles");
+      this.lock.writeLock().lock();
+      try {
+        this.storeEngine.getStoreFileManager().insertNewFiles(storeFiles);
+      } finally {
+        this.lock.writeLock().unlock();
+      }
+      notifyChangedReadersObservers(storeFiles);
+    }
+    return storeFiles;
+  }
+
+  private List<HStoreFile> getExtraFiles() throws IOException {
+    List<HStoreFile> extraFiles = new ArrayList<>();
+    // step.1 Get the compacted files
+    Collection<HStoreFile> compactedFiles =
+        this.getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // step.2 Get all files under the data path
+    Collection<StoreFileInfo> storeFileInfos = fs.getStoreFiles(getColumnFamilyName());
+    if (storeFileInfos == null || storeFileInfos.isEmpty()) {
+      return extraFiles;
+    }
+    List<String> compactedPathNames = new ArrayList<>();
+    if (compactedFiles != null && !compactedFiles.isEmpty()) {
+      compactedPathNames =
+          compactedFiles.stream().map(HStoreFile::getPathName).collect(Collectors.toList());
+    }
+    // step.3 Get the files referenced in memory
+    Collection<HStoreFile> memoryStoreFiles = this.getStorefiles();
+    List<String> memoryPathNames =
+        memoryStoreFiles.stream().map(HStoreFile::getPathName).collect(Collectors.toList());
+    // step.4 Filter out files that have already been compacted or are referenced in memory
+    Iterator<StoreFileInfo> iterator = storeFileInfos.iterator();
+    while (iterator.hasNext()) {
+      StoreFileInfo storeFileInfo = iterator.next();
+      if (memoryPathNames.contains(storeFileInfo.getPathName())
+          || compactedPathNames.contains(storeFileInfo.getPathName())) {
+        iterator.remove();
+      }
+    }
+    for (final StoreFileInfo storeFileInfo : storeFileInfos) {
+      // open a store file and reader for each remaining file
+      HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+      extraFiles.add(storeFile);
+    }
+    return extraFiles;
+  }
+
   /**
    * Adds the files to compacting files. filesCompacting must be locked.
    */
   private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
     if (CollectionUtils.isEmpty(filesToAdd)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index cf73f0f68871..9428586de5ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -242,6 +242,11 @@ public Path getPath() {
     return this.fileInfo.getPath();
   }
 
+  @Override
+  public String getPathName() {
+    return this.fileInfo.getPath().getName();
+  }
+
   @Override
   public Path getEncodedPath() {
     try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7cfdfee2d157..4e97018cdc71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -65,6 +65,11 @@ public interface StoreFile {
    */
   Path getPath();
 
+  /**
+   * @return The name (final path component) of this StoreFile's Path.
+   */
+  String getPathName();
+
   /**
    * @return Encoded Path if this StoreFile was made with a Stream.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index abfb44f967e4..65a44813b2ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -413,6 +413,11 @@ public Path getPath() {
     return initialPath;
   }
 
+  /** @return The name of the file */
+  public String getPathName() {
+    return initialPath.getName();
+  }
+
   /** @return The {@link FileStatus} of the file */
   public FileStatus getFileStatus() throws IOException {
     return getReferencedFileStatus(fs);
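Note for reviewers: the heart of getExtraFiles() above is a set difference over store-file names — files found on disk minus those still referenced in memory or already recorded as compacted. Below is a minimal standalone sketch of just that filtering step, assuming plain string names instead of StoreFileInfo/HStoreFile; the class and method names (ExtraFilesSketch, findExtraFileNames) are illustrative and not part of the patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ExtraFilesSketch {
      // Returns the on-disk names that are neither referenced in memory nor
      // already compacted; mirrors the filtering loop in HStore#getExtraFiles.
      static List<String> findExtraFileNames(Collection<String> onDisk,
          Collection<String> inMemory, Collection<String> compacted) {
        Set<String> tracked = new HashSet<>(inMemory);
        tracked.addAll(compacted);
        List<String> extra = new ArrayList<>();
        for (String name : onDisk) {
          if (!tracked.contains(name)) {
            extra.add(name);
          }
        }
        return extra;
      }

      public static void main(String[] args) {
        List<String> onDisk = Arrays.asList("hfile-a", "hfile-b", "hfile-c");
        List<String> inMemory = Arrays.asList("hfile-a");
        List<String> compacted = Arrays.asList("hfile-b");
        // Prints [hfile-c]: the only on-disk file the store does not track.
        System.out.println(findExtraFileNames(onDisk, inMemory, compacted));
      }
    }

Any file surviving this difference is then opened via createStoreFileAndReader and handed to addToStoreFiles, which inserts it under the store write lock and notifies readers before the major compaction is re-requested.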