Skip to content

Commit

Permalink
NIFI-7073: This closes apache#4025. Route to failure when error on Pu…
Browse files Browse the repository at this point in the history
…tHDFS file system close

Signed-off-by: Joe Witt <[email protected]>
  • Loading branch information
mattyb149 authored and joewitt committed Jan 30, 2020
1 parent 850869c commit 76e8c51
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
Expand Down Expand Up @@ -353,16 +352,15 @@ public void process(InputStream in) throws IOException {
if (fos != null) {
fos.close();
}
} catch (RemoteException re) {
} catch (Throwable t) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
try {
hdfs.delete(createdFile, false);
} catch (Throwable ignore) {
}
}
throw re;
} catch (Throwable ignore) {
throw t;
}
fos = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,33 @@ public void testPutFilePermissionsWithNoConfiguredUmask() throws IOException {
fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
}

@Test
public void testPutFileWithCloseException() throws IOException {
mockFileSystem = new MockFileSystem(true);
String dirName = "target/testPutFileCloseException";
File file = new File(dirName);
file.mkdirs();
Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());

TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
runner.setProperty(PutHDFS.DIRECTORY, dirName);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");

try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
runner.enqueue(fis, attributes);
runner.run();
}

List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
assertFalse(failedFlowFiles.isEmpty());
assertTrue(failedFlowFiles.get(0).isPenalized());

mockFileSystem.delete(p, true);
}

private class TestablePutHDFS extends PutHDFS {

private KerberosProperties testKerberosProperties;
Expand Down Expand Up @@ -461,6 +488,15 @@ protected FileSystem getFileSystem() {

private class MockFileSystem extends FileSystem {
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
private final boolean failOnClose;

public MockFileSystem() {
failOnClose = false;
}

public MockFileSystem(boolean failOnClose) {
this.failOnClose = failOnClose;
}

@Override
public URI getUri() {
Expand All @@ -476,7 +512,17 @@ public FSDataInputStream open(final Path f, final int bufferSize) {
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) {
pathToStatus.put(f, newFile(f, permission));
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
@Override
public void close() throws IOException {
super.close();
throw new IOException("Fail on close");
}
};
} else {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
}
}

@Override
Expand Down

0 comments on commit 76e8c51

Please sign in to comment.