Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28915 fix:major compaction Select File Policy #6553

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1858,7 +1858,20 @@ public Optional<CompactionContext> requestCompaction(int priority,
}
// Before we do compaction, try to get rid of unneeded files to simplify things.
removeUnneededFiles();

Optional<CompactionContext> compaction = getRequestCompaction(priority, tracker, user, filesCompacting);
if (compaction.isPresent() && compaction.get() != null && compaction.get().getRequest() != null && compaction.get().getRequest().isMajor()) {
List<HStoreFile> extraFiles = getExtraFiles();
if (extraFiles.size() != 0){
addToStoreFiles(extraFiles);
filesCompacting.clear();
this.triggerMajorCompaction();
compaction = getRequestCompaction(priority, tracker, user, filesCompacting);
}
}
return compaction;
}
public Optional<CompactionContext> getRequestCompaction(int priority,
CompactionLifeCycleTracker tracker, User user, List<HStoreFile> filesCompacting) throws IOException {
final CompactionContext compaction = storeEngine.createCompaction();
CompactionRequestImpl request = null;
this.lock.readLock().lock();
Expand Down Expand Up @@ -1945,6 +1958,58 @@ public Optional<CompactionContext> requestCompaction(int priority,
return Optional.of(compaction);
}

private List<HStoreFile> addToStoreFiles(List<HStoreFile> storeFiles) throws IOException {
// Load dirty data files before performing compaction
if (storeFiles != null && storeFiles.size() != 0) {
LOG.info("add new " + storeFiles + " to storeFiles");
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(storeFiles);
} finally {
this.lock.writeLock().unlock();
}
notifyChangedReadersObservers(storeFiles);
}
return storeFiles;
}

private List<HStoreFile> getExtraFiles() throws IOException {
List<HStoreFile> extraFiles = new ArrayList<>();
//step.1 Get the compacted files
Collection<HStoreFile> compactedFiles =
this.getStoreEngine().getStoreFileManager().getCompactedfiles();
//step.2 Get all files under the data path
Collection<StoreFileInfo> storeFileInfos = fs.getStoreFiles(getColumnFamilyName());
if (storeFileInfos == null || storeFileInfos.size() != 0) {
return extraFiles;
}
List<String> compactedPathNames = new ArrayList<>();
if (compactedFiles != null && compactedFiles.size() != 0){
compactedPathNames = compactedFiles.stream().map(HStoreFile ::getPathName).collect(
Collectors.toList());
}
//step.3 Get the files referenced in memory
Collection<HStoreFile> memoryStoreFiles = this.getStorefiles();
List<String> memoryPathNames = memoryStoreFiles.stream().map(HStoreFile ::getPathName).collect(
Collectors.toList());
//step.4 Filter out files that have been compacted and in memory
Iterator<StoreFileInfo> iterator = storeFileInfos.iterator();
while (iterator.hasNext()) {
StoreFileInfo storeFileInfo = iterator.next();
if (memoryPathNames.contains(storeFileInfo.getPathName()) || compactedPathNames.contains(storeFileInfo.getPathName())) {
iterator.remove();
}
}
if (storeFileInfos.size() != 0) {
for (final StoreFileInfo storeFileInfo : storeFileInfos) {
// open each store file in parallel
HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
extraFiles.add(storeFile);
}
}
return extraFiles;
}

/** Adds the files to compacting files. filesCompacting must be locked. */
private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
if (CollectionUtils.isEmpty(filesToAdd)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ public Path getPath() {
return this.fileInfo.getPath();
}

@Override
public String getPathName() {
return this.fileInfo.getPath().getName();
}

@Override
public Path getEncodedPath() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public interface StoreFile {
*/
Path getPath();

/**
* @return Path name if this StoreFile was made with a Stream.
*/
String getPathName();

/**
* @return Encoded Path if this StoreFile was made with a Stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ public Path getPath() {
return initialPath;
}

/** @return The name of the file */
public String getPathName() {
return initialPath.getName();
}

/** @return The {@link FileStatus} of the file */
public FileStatus getFileStatus() throws IOException {
return getReferencedFileStatus(fs);
Expand Down