diff --git a/snakebite/client.py b/snakebite/client.py
index db4c939..572b5f3 100644
--- a/snakebite/client.py
+++ b/snakebite/client.py
@@ -88,7 +88,7 @@ class Client(object):
     def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION,
                  use_trash=False, effective_user=None, use_sasl=False, hdfs_namenode_principal=None,
-                 sock_connect_timeout=10000, sock_request_timeout=10000):
+                 sock_connect_timeout=10000, sock_request_timeout=10000, use_datanode_hostname=False):
         '''
         :param host: Hostname or IP address of the NameNode
         :type host: string
@@ -108,6 +108,8 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
         :type sock_connect_timeout: int
         :param sock_request_timeout: Request timeout in seconds
         :type sock_request_timeout: int
+        :param use_datanode_hostname: Use hostname instead of IP address to communicate with datanodes
+        :type use_datanode_hostname: boolean
         '''
         if hadoop_version < 9:
             raise FatalException("Only protocol versions >= 9 supported")
@@ -123,6 +125,7 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
         self.use_trash = use_trash
         self.trash = self._join_user_path(".Trash")
         self._server_defaults = None
+        self.use_datanode_hostname = use_datanode_hostname

         log.debug("Created client for %s:%s with trash=%s and sasl=%s" % (host, port, use_trash, use_sasl))
@@ -1143,7 +1146,7 @@ def _read_file(self, path, node, tail_only, check_crc, tail_length=1024):
         successful_read = False
         while not locations_queue.empty():
             location = locations_queue.get()[1]
-            host = location.id.ipAddr
+            host = location.id.hostName if self.use_datanode_hostname else location.id.ipAddr
             port = int(location.id.xferPort)
             data_xciever = DataXceiverChannel(host, port)
             if data_xciever.connect():
@@ -1385,7 +1388,7 @@ def _reset_retries(self):
     def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=False,
                  hdfs_namenode_principal=None, max_failovers=15, max_retries=10,
                  base_sleep=500, max_sleep=15000, sock_connect_timeout=10000,
-                 sock_request_timeout=10000):
+                 sock_request_timeout=10000, use_datanode_hostname=False):
         '''
         :param namenodes: Set of namenodes for HA setup
         :type namenodes: list
@@ -1409,6 +1412,8 @@ def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=Fal
         :type sock_connect_timeout: int
         :param sock_request_timeout: Request timeout in seconds
         :type sock_request_timeout: int
+        :param use_datanode_hostname: Use hostname instead of IP address to communicate with datanodes
+        :type use_datanode_hostname: boolean
         '''
         self.use_trash = use_trash
         self.effective_user = effective_user
@@ -1420,6 +1425,7 @@ def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=Fal
         self.max_sleep = max_sleep
         self.sock_connect_timeout = sock_connect_timeout
         self.sock_request_timeout = sock_request_timeout
+        self.use_datanode_hostname = use_datanode_hostname
         self.failovers = -1
         self.retries = -1
@@ -1458,7 +1464,8 @@ def _switch_namenode(self, namenodes):
                                  self.use_sasl,
                                  self.hdfs_namenode_principal,
                                  self.sock_connect_timeout,
-                                 self.sock_request_timeout)
+                                 self.sock_request_timeout,
+                                 self.use_datanode_hostname)

     def __calculate_exponential_time(self, time, retries, cap):
@@ -1573,8 +1580,10 @@ def __init__(self, hadoop_version=Namenode.DEFAULT_VERSION, effective_user=None,
         nns = [Namenode(nn['namenode'], nn['port'], hadoop_version) for nn in configs['namenodes']]
         if not nns:
             raise InvalidInputException("List of namenodes is empty - couldn't create the client")
         super(AutoConfigClient, self).__init__(nns, configs.get('use_trash', False), effective_user,
                                                configs.get('use_sasl', False), configs.get('hdfs_namenode_principal', None),
                                                configs.get('failover_max_attempts'), configs.get('client_retries'),
                                                configs.get('client_sleep_base_millis'), configs.get('client_sleep_max_millis'),
-                                               10000, configs.get('socket_timeout_millis'))
+                                               10000, configs.get('socket_timeout_millis'),
+                                               use_datanode_hostname=configs.get('use_datanode_hostname', False))
diff --git a/snakebite/commandlineparser.py b/snakebite/commandlineparser.py
index 66c66cb..90306bd 100644
--- a/snakebite/commandlineparser.py
+++ b/snakebite/commandlineparser.py
@@ -446,7 +446,7 @@ def setup_client(self):
             self.client = HAClient(self.namenodes, use_trash, self.user, self.use_sasl,
                                    self.configs['hdfs_namenode_principal'],
                                    self.configs['failover_max_attempts'], self.configs['client_retries'],
                                    self.configs['client_sleep_base_millis'], self.configs['client_sleep_max_millis'],
-                                   self.configs['socket_timeout_millis'])
+                                   self.configs['socket_timeout_millis'], use_datanode_hostname=self.configs['use_datanode_hostname'])

     def execute(self):
         if self.args.help:
diff --git a/snakebite/config.py b/snakebite/config.py
index c48fdff..3a060d3 100644
--- a/snakebite/config.py
+++ b/snakebite/config.py
@@ -113,6 +113,9 @@ def read_hdfs_config(cls, hdfs_site_path):
             if property.findall('name')[0].text == 'dfs.client.failover.max.attempts':
                 configs['failover_max_attempts'] = int(property.findall('value')[0].text)

+            if property.findall('name')[0].text == 'dfs.client.use.datanode.hostname':
+                # bool() on a non-empty string is always True, so compare against
+                # the literal 'true' to honor dfs.client.use.datanode.hostname=false
+                configs['use_datanode_hostname'] = property.findall('value')[0].text.lower() == 'true'
+
         if namenodes:
             configs['namenodes'] = namenodes
@@ -161,7 +164,8 @@ def get_external_config(cls):
             'client_sleep_base_millis' : hdfs_configs.get('client_sleep_base_millis', 500),
             'client_sleep_max_millis'  : hdfs_configs.get('client_sleep_max_millis', 15000),
             'socket_timeout_millis'    : hdfs_configs.get('socket_timeout_millis', 60000),
-            'failover_max_attempts'    : hdfs_configs.get('failover_max_attempts', 15)
+            'failover_max_attempts'    : hdfs_configs.get('failover_max_attempts', 15),
+            'use_datanode_hostname'    : hdfs_configs.get('use_datanode_hostname', False)
         }

         return configs
diff --git a/test/commandlineparser_test.py b/test/commandlineparser_test.py
index deac538..4f630ab 100644
--- a/test/commandlineparser_test.py
+++ b/test/commandlineparser_test.py
@@ -1093,6 +1093,20 @@ def test_cl_skiptrash_setting_preserved_after_external_nontrash_config(self, env
         finally:
             self._revert_hdfs_try_paths()

+    @patch('os.environ.get')
+    def test_use_datanode_hostname(self, environ_get):
+        environ_get.return_value = False
+        # no snakebiterc
+        # read external config (hdfs-site, core-site)
+        self.parser.args = MockParseArgs()
+        try:
+            HDFSConfig.core_try_paths = (ConfigTest.get_config_path('ha-core-site.xml'),)
+            HDFSConfig.hdfs_try_paths = (ConfigTest.get_config_path('use-datanode-hostname-hdfs-site.xml'),)
+            self.parser.init()
+
+            self.assertTrue(self.parser.client.use_datanode_hostname)
+        finally:
+            self._revert_hdfs_try_paths()

 class CommandLineParserExecuteTest(unittest2.TestCase):
diff --git a/test/config_test.py b/test/config_test.py
index 546026b..09c449c 100644
--- a/test/config_test.py
+++ b/test/config_test.py
@@ -138,3 +138,8 @@ def test_retry_configs(self):
         self.assertEquals(config['client_sleep_max_millis'], 14000)
         self.assertEquals(config['socket_timeout_millis'], 25000)
         self.assertEquals(config['failover_max_attempts'], 7)
+
+    def test_use_datanode_hostname_configs(self):
+        conf_path = self.get_config_path('use-datanode-hostname-hdfs-site.xml')
+        config = HDFSConfig.read_hdfs_config(conf_path)
+        self.assertTrue(config['use_datanode_hostname'])
diff --git a/test/testconfig/conf/use-datanode-hostname-hdfs-site.xml b/test/testconfig/conf/use-datanode-hostname-hdfs-site.xml
new file mode 100644
index 0000000..17831d7
--- /dev/null
+++ b/test/testconfig/conf/use-datanode-hostname-hdfs-site.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+  <property>
+    <name>dfs.client.use.datanode.hostname</name>
+    <value>true</value>
+  </property>
+</configuration>
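For context, a minimal usage sketch of the new flag (not part of the patch). The NameNode host, port, and file path are placeholders, and client.text() is used only as a convenient read path that exercises the datanode connection:

    from snakebite.client import Client

    # Dial datanodes by hostname rather than IP address; useful when datanode
    # IPs are not routable from the client (NAT, Docker, split-horizon DNS).
    # Host and port below are hypothetical.
    client = Client('namenode.example.com', 8020, use_datanode_hostname=True)

    # Block reads now connect to location.id.hostName instead of location.id.ipAddr.
    for content in client.text(['/tmp/example.txt']):
        print(content)

The same behaviour is picked up automatically from hdfs-site.xml: with dfs.client.use.datanode.hostname set to true, HDFSConfig.get_external_config() exposes use_datanode_hostname, which AutoConfigClient and the CLI pass through to the underlying Client.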