Skip to content

Commit

Permalink
Fixes HDFS-17181 by routing all CREATE requests to the BlockManager (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
lfrancke authored Feb 20, 2024
1 parent 1336c36 commit d1daf26
Showing 1 changed file with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import javax.servlet.ServletContext;
Expand Down Expand Up @@ -278,6 +279,9 @@ protected void queueExternalCall(ExternalCall call)
namenode.queueExternalCall(call);
}

/**
* Chooses a Datanode to redirect a request to.
*/
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
Expand All @@ -288,18 +292,18 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
throw new IOException("Namesystem has not been initialized yet.");
}
final BlockManager bm = fsn.getBlockManager();
HashSet<Node> excludes = new HashSet<Node>();

Set<Node> excludes = new HashSet<>();
if (excludeDatanodes != null) {
for (String host : StringUtils
.getTrimmedStringCollection(excludeDatanodes)) {
int idx = host.indexOf(":");
int idx = host.indexOf(':');
Node excludeNode = null;
if (idx != -1) {
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr(
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)));
} else {
if (idx == -1) {
excludeNode = bm.getDatanodeManager().getDatanodeByHost(host);
} else {
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr(
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)));
}

if (excludeNode != null) {
Expand All @@ -311,25 +315,15 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
}
}

if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(remoteAddr);
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, excludes, blocksize);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}
}
} else if (op == GetOpParam.Op.OPEN
// For these operations choose a datanode containing a replica
if (op == GetOpParam.Op.OPEN
|| op == GetOpParam.Op.GETFILECHECKSUM
|| op == PostOpParam.Op.APPEND) {
//choose a datanode containing a replica
final NamenodeProtocols np = getRPCServer(namenode);
if (status == null) {
throw new FileNotFoundException("File " + path + " not found.");
}

final long len = status.getLen();
if (op == GetOpParam.Op.OPEN) {
if (openOffset < 0L || (openOffset >= len && len > 0)) {
Expand All @@ -344,10 +338,22 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
final int count = locations.locatedBlockCount();
if (count > 0) {
return bestNode(locations.get(0).getLocations(), excludes);
} else {
throw new IOException("Block could not be located. Path=" + path + ", offset=" + offset);
}
}
}

// All other operations don't affect a specific node so let the BlockManager pick a target
DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(remoteAddr);

DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, excludes, blocksize);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}

return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
).chooseRandom(NodeBase.ROOT, excludes);
}
Expand All @@ -358,13 +364,13 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
* to return the first element of the node here.
*/
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
HashSet<Node> excludes) throws IOException {
Set<Node> excludes) throws IOException {
for (DatanodeInfo dn: nodes) {
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
if (!dn.isDecommissioned() && !excludes.contains(dn)) {
return dn;
}
}
throw new IOException("No active nodes contain this block");
throw new IOException("No active and not excluded nodes contain this block");
}

public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
Expand Down

0 comments on commit d1daf26

Please sign in to comment.