This repository was archived by the owner on Apr 19, 2022. It is now read-only.

Support dfs.client.use.datanode.hostname config property
When reading blocks, the client by default talks to a datanode using the
datanode's IP address as returned by the namenode. The problem with this
approach is that the IP address is a private address within the cluster, so
the client cannot connect if it is on a different network. This commit updates
the config module to support the dfs.client.use.datanode.hostname config
property, and has the clients use it to determine whether to talk to datanodes
by IP address or by hostname.
guhehehe committed Jul 15, 2016
1 parent 0a3aa9c commit 3430c80
Showing 6 changed files with 48 additions and 7 deletions.
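
For context, a minimal sketch of opting in from client code (the namenode address and file path are illustrative, not part of the change):

    from snakebite.client import Client

    # The namenode still returns block locations as before; the new flag only
    # changes which field of each location the client dials.
    client = Client('namenode.example.com', 8020, use_datanode_hostname=True)

    # Reads now connect to datanodes by hostname, which DNS outside the
    # cluster can resolve to a reachable address.
    for file_contents in client.cat(['/tmp/some_file.txt']):
        for chunk in file_contents:
            print(chunk)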
19 changes: 14 additions & 5 deletions snakebite/client.py
@@ -88,7 +88,7 @@ class Client(object):

def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION,
use_trash=False, effective_user=None, use_sasl=False, hdfs_namenode_principal=None,
sock_connect_timeout=10000, sock_request_timeout=10000):
sock_connect_timeout=10000, sock_request_timeout=10000, use_datanode_hostname=False):
'''
:param host: Hostname or IP address of the NameNode
:type host: string
@@ -108,6 +108,8 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
:type sock_connect_timeout: int
:param sock_request_timeout: Request timeout in milliseconds
:type sock_request_timeout: int
:param use_datanode_hostname: Use hostname instead of IP address to communicate with datanodes
:type use_datanode_hostname: boolean
'''
if hadoop_version < 9:
raise FatalException("Only protocol versions >= 9 supported")
@@ -123,6 +125,7 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
self.use_trash = use_trash
self.trash = self._join_user_path(".Trash")
self._server_defaults = None
self.use_datanode_hostname = use_datanode_hostname

log.debug("Created client for %s:%s with trash=%s and sasl=%s" % (host, port, use_trash, use_sasl))

@@ -1143,7 +1146,7 @@ def _read_file(self, path, node, tail_only, check_crc, tail_length=1024):
successful_read = False
while not locations_queue.empty():
location = locations_queue.get()[1]
host = location.id.ipAddr
host = location.id.hostName if self.use_datanode_hostname else location.id.ipAddr
port = int(location.id.xferPort)
data_xciever = DataXceiverChannel(host, port)
if data_xciever.connect():
@@ -1385,7 +1388,7 @@ def _reset_retries(self):

def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=False, hdfs_namenode_principal=None,
max_failovers=15, max_retries=10, base_sleep=500, max_sleep=15000, sock_connect_timeout=10000,
sock_request_timeout=10000):
sock_request_timeout=10000, use_datanode_hostname=False):
'''
:param namenodes: Set of namenodes for HA setup
:type namenodes: list
@@ -1409,6 +1412,8 @@ def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=Fal
:type sock_connect_timeout: int
:param sock_request_timeout: Request timeout in milliseconds
:type sock_request_timeout: int
:param use_datanode_hostname: Use hostname instead of IP address to communicate with datanodes
:type use_datanode_hostname: boolean
'''
self.use_trash = use_trash
self.effective_user = effective_user
@@ -1420,6 +1425,7 @@ def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=Fal
self.max_sleep = max_sleep
self.sock_connect_timeout = sock_connect_timeout
self.sock_request_timeout = sock_request_timeout
self.use_datanode_hostname = use_datanode_hostname

self.failovers = -1
self.retries = -1
@@ -1458,7 +1464,8 @@ def _switch_namenode(self, namenodes):
self.use_sasl,
self.hdfs_namenode_principal,
self.sock_connect_timeout,
self.sock_request_timeout)
self.sock_request_timeout,
self.use_datanode_hostname)


def __calculate_exponential_time(self, time, retries, cap):
@@ -1573,8 +1580,10 @@ def __init__(self, hadoop_version=Namenode.DEFAULT_VERSION, effective_user=None,
nns = [Namenode(nn['namenode'], nn['port'], hadoop_version) for nn in configs['namenodes']]
if not nns:
raise InvalidInputException("List of namenodes is empty - couldn't create the client")

super(AutoConfigClient, self).__init__(nns, configs.get('use_trash', False), effective_user,
configs.get('use_sasl', False), configs.get('hdfs_namenode_principal', None),
configs.get('failover_max_attempts'), configs.get('client_retries'),
configs.get('client_sleep_base_millis'), configs.get('client_sleep_max_millis'),
10000, configs.get('socket_timeout_millis'))
10000, configs.get('socket_timeout_millis'),
use_datanode_hostname=configs.get('use_datanode_hostname', False))
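
The flag threads through the HA and auto-configured clients as well. A hedged sketch (hostnames are illustrative; AutoConfigClient assumes hdfs-site.xml and core-site.xml are discoverable, e.g. via HADOOP_CONF_DIR):

    from snakebite.client import AutoConfigClient, HAClient
    from snakebite.namenode import Namenode

    # Explicit HA setup: pass the flag directly.
    namenodes = [Namenode('nn1.example.com', 8020), Namenode('nn2.example.com', 8020)]
    ha_client = HAClient(namenodes, use_datanode_hostname=True)

    # Auto-configured setup: the flag is read from hdfs-site.xml
    # (dfs.client.use.datanode.hostname) and defaults to False.
    auto_client = AutoConfigClient()
    print(auto_client.use_datanode_hostname)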
2 changes: 1 addition & 1 deletion snakebite/commandlineparser.py
@@ -446,7 +446,7 @@ def setup_client(self):
self.client = HAClient(self.namenodes, use_trash, self.user, self.use_sasl, self.configs['hdfs_namenode_principal'],
self.configs['failover_max_attempts'], self.configs['client_retries'],
self.configs['client_sleep_base_millis'], self.configs['client_sleep_max_millis'],
self.configs['socket_timeout_millis'])
self.configs['socket_timeout_millis'], use_datanode_hostname=self.configs['use_datanode_hostname'])

def execute(self):
if self.args.help:
6 changes: 5 additions & 1 deletion snakebite/config.py
@@ -113,6 +113,9 @@ def read_hdfs_config(cls, hdfs_site_path):
if property.findall('name')[0].text == 'dfs.client.failover.max.attempts':
configs['failover_max_attempts'] = int(property.findall('value')[0].text)

if property.findall('name')[0].text == 'dfs.client.use.datanode.hostname':
configs['use_datanode_hostname'] = (property.findall('value')[0].text or '').strip().lower() == 'true'

if namenodes:
configs['namenodes'] = namenodes

@@ -161,7 +164,8 @@ def get_external_config(cls):
'client_sleep_base_millis' : hdfs_configs.get('client_sleep_base_millis', 500),
'client_sleep_max_millis' : hdfs_configs.get('client_sleep_max_millis', 15000),
'socket_timeout_millis' : hdfs_configs.get('socket_timeout_millis', 60000),
'failover_max_attempts' : hdfs_configs.get('failover_max_attempts', 15)
'failover_max_attempts' : hdfs_configs.get('failover_max_attempts', 15),
'use_datanode_hostname' : hdfs_configs.get('use_datanode_hostname', False)
}

return configs
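
Note the explicit comparison against 'true' in read_hdfs_config above: Python's bool() returns True for any non-empty string, including 'false', so casting the property text directly would silently enable the flag. A standalone sketch of the parsing (the helper name is illustrative):

    import xml.etree.ElementTree as ET

    def read_use_datanode_hostname(hdfs_site_path):
        # Scan hdfs-site.xml for dfs.client.use.datanode.hostname and return a
        # real boolean. bool('false') is True, so compare the text explicitly.
        for prop in ET.parse(hdfs_site_path).getroot().findall('property'):
            if prop.findall('name')[0].text == 'dfs.client.use.datanode.hostname':
                value = prop.findall('value')[0].text or ''
                return value.strip().lower() == 'true'
        return False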
14 changes: 14 additions & 0 deletions test/commandlineparser_test.py
@@ -1093,6 +1093,20 @@ def test_cl_skiptrash_setting_preserved_after_external_nontrash_config(self, env
finally:
self._revert_hdfs_try_paths()

@patch('os.environ.get')
def test_use_datanode_hostname(self, environ_get):
environ_get.return_value = False
# no snakebiterc
# read external config (hdfs-site, core-site)
self.parser.args = MockParseArgs()
try:
HDFSConfig.core_try_paths = (ConfigTest.get_config_path('ha-core-site.xml'),)
HDFSConfig.hdfs_try_paths = (ConfigTest.get_config_path('use-datanode-hostname-hdfs-site.xml'),)
self.parser.init()

self.assertTrue(self.parser.client.use_datanode_hostname)
finally:
self._revert_hdfs_try_paths()


class CommandLineParserExecuteTest(unittest2.TestCase):
5 changes: 5 additions & 0 deletions test/config_test.py
@@ -138,3 +138,8 @@ def test_retry_configs(self):
self.assertEquals(config['client_sleep_max_millis'], 14000)
self.assertEquals(config['socket_timeout_millis'], 25000)
self.assertEquals(config['failover_max_attempts'], 7)

def test_use_datanode_hostname_configs(self):
conf_path = self.get_config_path('use-datanode-hostname-hdfs-site.xml')
config = HDFSConfig.read_hdfs_config(conf_path)
self.assertTrue(config['use_datanode_hostname'])
9 changes: 9 additions & 0 deletions test/testconfig/conf/use-datanode-hostname-hdfs-site.xml
@@ -0,0 +1,9 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>
