This repository was archived by the owner on Apr 19, 2022. It is now read-only.

Release 2.3.0
Quite a few things in this release:
    - Trash functionality. Configuration will now look for trash
      settings and apply them. When trash is enabled, deleting files
      will move them to trash. The Client class now also takes a
      use_trash argument. The command line client's rm command now takes
      -S or --skiptrash to skip the trash.
    - Security fix for UIDs. Snakebite would set the realUser property
      for requests to the Namenode, which caused everything to be
      executed as the user the NN was running as. This is dangerous
      and completely circumvents security. It has been changed to use
      the effectiveUser property.
    - Config format: a new .snakebiterc format has been introduced that
      is versioned. The new format supports skiptrash. Config is backwards
      compatible with both single-line NN config and HA, but defaults to
      skiptrash=False in those cases.
    - ~/.snakebiterc isn't written anymore. This change was made because
      cluster config changes were not reflected in .snakebiterc, forcing
      users to remove the file by hand. It can still be used to override
      default settings.
    - Tests that use the Java MiniCluster will no longer run when an
      active Hadoop cluster is found (i.e. when `hadoop fs -ls /` can be
      run from the host running the tests). This is done because in
      certain cases the MiniCluster Java class still interacts with the
      Hadoop environment (e.g. formatting journal nodes).
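The interaction between the new trash settings can be sketched with a small illustration (the helper name below is hypothetical, not a function in snakebite):

```python
# Hypothetical helper showing how the release's trash options combine:
# the trash is used only when the config enables it and the user did
# not pass -S/--skiptrash on the command line.
def effective_use_trash(config_use_trash, cli_skiptrash):
    return config_use_trash and not cli_skiptrash
```

With trash enabled in the config, `effective_use_trash(True, False)` is `True`, while passing `-S` (`effective_use_trash(True, True)`) skips the trash.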

Change-Id: I50110c18cb895c5feae46c370d0184c59212054c
Wouter de Bie committed Mar 26, 2014
1 parent 8950612 commit b4768bf
Showing 14 changed files with 352 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -11,7 +11,7 @@ install:
- pip install protobuf --use-mirrors
- pip install unittest2 --use-mirrors
- pip install argparse --use-mirrors
script: nosetests
script: nosetests -v
env:
- HADOOP_DISTRO=cdh
- HADOOP_DISTRO=hdp
32 changes: 32 additions & 0 deletions debian/changelog
@@ -1,3 +1,35 @@
snakebite (2.3.0) unstable; urgency=low

Release 2.3.0

Quite a few things in this release:
- Trash functionality. Configuration will now look for trash
settings and apply them. When trash is enabled, deleting files
will move them to the trash. The Client class now also takes a
use_trash argument. The command line client's rm command now takes
-S or --skiptrash to skip the trash.
- Security fix for UIDs. Snakebite used to set the realUser property
for requests to the Namenode, which caused everything to be
executed as the user the NN was running as. This is dangerous
and completely circumvents security. This has been changed to use
the effectiveUser property.
- Config format: a new .snakebiterc format has been introduced that
is versioned. The new format supports skiptrash. Config is backwards
compatible with both single-line NN config and HA, but defaults to
skiptrash=False in those cases.
- ~/.snakebiterc isn't written anymore. This change was made, since
cluster config changes didn't get reflected in .snakebiterc, so
users had to remove their .snakebiterc. Right now, .snakebiterc
is used in cases that require a permanent override, or where no
Hadoop configuration is available.
- Tests that use the Java MiniCluster will no longer run when an
active Hadoop cluster is found (i.e. when `hadoop fs -ls /` can be
run from the host running the tests). This is done because in
certain cases the MiniCluster Java class still interacts with the
Hadoop environment (e.g. formatting journal nodes).

-- Wouter de Bie <[email protected]> Tue, 11 Mar 2014 12:01:12 +0000

snakebite (2.2.5) unstable; urgency=low

Fix bug in Client.test() when using globs
13 changes: 8 additions & 5 deletions doc/source/cli.rst
@@ -6,19 +6,22 @@ The CLI client first tries to parse the path and, in case it's in the form
``hdfs://namenode:port/path`` it will use that configuration.
Otherwise it will use -n and -p command line arguments.
If the previous aren't set it tries to read the config from ``~/.snakebiterc`` and
if that doesn't exist, it will check ``$HADOOP_HOME/core-site.xml`` and create a
``~/.snakebiterc`` from that.
if that doesn't exist, it will check ``$HADOOP_HOME/core-site.xml``.

A config looks like

::

{
"namenode": "<host/ip>",
"port": 54310,
"version": 7
"config_version": 2,
"skiptrash": true,
"namenodes": [
{"host": "mynamenode1", "port": 54310, "version": 9},
{"host": "mynamenode2", "port": 54310, "version": 9}
]
}


The version property denotes the protocol version used. CDH 4.1.3 uses protocol 7, while
HDP 2.0 uses protocol 8. Snakebite defaults to 7.
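As a sketch, a versioned config like the one above can be consumed with nothing more than the standard `json` module (field names follow the documented example; this is not snakebite's own parsing code, and the default protocol version of 9 assumed here matches this release's code rather than the older text above):

```python
import json

# A v2-style config mirroring the documented example above.
RC = """
{
  "config_version": 2,
  "skiptrash": true,
  "namenodes": [
    {"host": "mynamenode1", "port": 54310, "version": 9},
    {"host": "mynamenode2", "port": 54310, "version": 9}
  ]
}
"""

config = json.loads(RC)
# Each namenode entry carries its own protocol version; fall back to 9.
namenodes = [(nn["host"], nn["port"], nn.get("version", 9))
             for nn in config["namenodes"]]
skiptrash = config.get("skiptrash", False)
```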

3 changes: 2 additions & 1 deletion snakebite/channel.py
@@ -258,7 +258,8 @@ def create_rpc_request_header(self):
def create_connection_context(self):
'''Creates and serializes an IpcConnectionContextProto (not delimited)'''
context = IpcConnectionContextProto()
context.userInfo.realUser = pwd.getpwuid(os.getuid())[0]
local_user = pwd.getpwuid(os.getuid())[0]
context.userInfo.effectiveUser = local_user
context.protocol = "org.apache.hadoop.hdfs.protocol.ClientProtocol"

s_context = context.SerializeToString()
77 changes: 64 additions & 13 deletions snakebite/client.py
@@ -37,6 +37,7 @@
import inspect
import socket
import errno
import time

log = logging.getLogger(__name__)

@@ -47,7 +48,7 @@ class Client(object):
**Example:**
>>> from snakebite.client import Client
>>> client = Client("localhost", 54310)
>>> client = Client("localhost", 54310, use_trash=False)
>>> for x in client.ls(['/']):
... print x
@@ -74,14 +75,16 @@ class Client(object):
3: "s"
}

def __init__(self, host, port, hadoop_version=Namenode.DEFAULT_VERSION):
def __init__(self, host, port, hadoop_version=Namenode.DEFAULT_VERSION, use_trash=False):
'''
:param host: Hostname or IP address of the NameNode
:type host: string
:param port: RPC Port of the NameNode
:type port: int
:param hadoop_version: What hadoop protocol version should be used (default: 9)
:type hadoop_version: int
:param use_trash: Use a trash when removing files.
:type use_trash: boolean
'''
if hadoop_version < 9:
raise Exception("Only protocol versions >= 9 supported")
@@ -90,6 +93,10 @@ def __init__(self, host, port, hadoop_version=Namenode.DEFAULT_VERSION):
self.port = port
self.service_stub_class = client_proto.ClientNamenodeProtocol_Stub
self.service = RpcService(self.service_stub_class, self.port, self.host, hadoop_version)
self.use_trash = use_trash
self.trash = self._join_user_path(".Trash")

log.debug("Created client for %s:%s with trash=%s" % (host, port, use_trash))

def ls(self, paths, recurse=False, include_toplevel=False, include_children=True):
''' Issues 'ls' command and returns a list of maps that contain fileinfo
@@ -410,11 +417,51 @@ def _handle_delete(self, path, node, recurse):
if not recurse:
recurse = False

request = client_proto.DeleteRequestProto()
request.src = path
request.recursive = recurse
response = self.service.delete(request)
return {"path": path, "result": response.result}
if self.__should_move_to_trash(path):
if path.endswith("/"):
suffix_path = path[1:-1]
else:
suffix_path = path[1:]

trash_path = os.path.join(self.trash, "Current", suffix_path)
if trash_path.endswith("/"):
trash_path = trash_path[:-1]

base_trash_path = os.path.join(self.trash, "Current", os.path.dirname(suffix_path))
if base_trash_path.endswith("/"):
base_trash_path = base_trash_path[:-1]

# Try twice, in case checkpoint between mkdir() and rename()
for i in range(0, 2):
list(self.mkdir([base_trash_path], create_parent=True, mode=0700))

original_path = trash_path

while self.test(trash_path, exists=True):
unix_timestamp = str(int(time.time() * 1000))
trash_path = "%s%s" % (original_path, unix_timestamp)

result = self._handle_rename(path, node, trash_path)
if result['result']:
result['message'] = ". Moved %s to %s" % (path, trash_path)
return result
raise Exception("Failed to move to trash: %s" % path)
else:
request = client_proto.DeleteRequestProto()
request.src = path
request.recursive = recurse
response = self.service.delete(request)
return {"path": path, "result": response.result}

def __should_move_to_trash(self, path):
if not self.use_trash:
return False
if path.startswith(self.trash):
return False # Path already in trash
if os.path.dirname(self.trash).startswith(path):
raise Exception("Cannot move %s to the trash, as it contains the trash" % path)

return True
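The trash path construction in `_handle_delete` above can be condensed into a standalone sketch (a hypothetical helper, not snakebite's API; HDFS paths are POSIX-style, hence `posixpath`):

```python
import posixpath
import time

def trash_destination(trash_root, path, exists=lambda p: False):
    # Recreate the absolute path under <trash>/Current, dropping the
    # leading "/" and any trailing "/" as the code above does.
    suffix = path[1:].rstrip("/")
    dest = posixpath.join(trash_root, "Current", suffix)
    # On a name collision, append a millisecond timestamp to the
    # original destination, mirroring the while-loop in _handle_delete.
    while exists(dest):
        dest = "%s%s" % (posixpath.join(trash_root, "Current", suffix),
                         int(time.time() * 1000))
    return dest
```

For example, deleting `/data/logs/` with a trash root of `/user/alice/.Trash` would target `/user/alice/.Trash/Current/data/logs`.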

def rmdir(self, paths):
''' Delete a directory
@@ -1171,15 +1218,16 @@ class HAClient(Client):
>>> from snakebite.namenode import Namenode
>>> n1 = Namenode("namenode1.mydomain", 54310)
>>> n2 = Namenode("namenode2.mydomain", 54310)
>>> client = HAClient([n1, n2])
>>> client = HAClient([n1, n2], use_trash=True)
>>> for x in client.ls(['/']):
... print x
.. note::
Different Hadoop distributions use different protocol versions. Snakebite defaults to 9, but this can be set by passing
in the ``version`` parameter to the Namenode class constructor.
'''
def __init__(self, namenodes):
def __init__(self, namenodes, use_trash=False):
self.use_trash = use_trash
self.namenode = self._switch_namenode(namenodes)
self.namenode.next()

@@ -1195,9 +1243,11 @@ def _switch_namenode(self, namenodes):
def _switch_namenode(self, namenodes):
for namenode in namenodes:
log.debug("Switch to namenode: %s:%d" % (namenode.host, namenode.port))

yield super(HAClient, self).__init__(namenode.host,
namenode.port,
namenode.version)
namenode.version,
self.use_trash)
else:
msg = "Request tried and failed for all %d namenodes: " % len(namenodes)
for namenode in namenodes:
@@ -1269,6 +1319,7 @@ class AutoConfigClient(HAClient):
in the ``hadoop_version`` parameter to the constructor.
'''
def __init__(self, hadoop_version=Namenode.DEFAULT_VERSION):
nns = [Namenode(c['namenode'], c['port'], hadoop_version)
for c in HDFSConfig.get_external_config()]
super(AutoConfigClient, self).__init__(nns)
configs = HDFSConfig.get_external_config()
nns = [Namenode(c['namenode'], c['port'], hadoop_version, c['use_trash']) for c in configs]
use_trash = any([c['use_trash'] for c in configs])
super(AutoConfigClient, self).__init__(nns, use_trash)
59 changes: 31 additions & 28 deletions snakebite/commandlineparser.py
@@ -124,6 +124,11 @@ class CommandLineParser(object):
"long": '--summary',
"help": 'print summarized output',
"action": 'store_true'},
'S': {"short": '-S',
"long": '--skiptrash',
"help": 'skip the trash (when trash is enabled)',
"default": False,
"action": 'store_true'},
'z': {"short": '-z',
"long": '--zero',
"help": 'check for zero length',
@@ -243,16 +248,14 @@ def read_config(self):
else:
# Try to read the configuration for HDFS configuration files
configs = HDFSConfig.get_external_config()

# if configs exist and contain something
if configs:
for config in configs:
nn = Namenode(config['namenode'],
config['port'])
self.namenodes.append(nn)

# Save retrieved configuration to snakebite config file
self._write_hadoop_config(config_file)
self.args.skiptrash = not any([c['use_trash'] for c in configs])

if len(self.namenodes):
return
@@ -265,25 +268,42 @@
for hdfs_conf_path in HDFSConfig.hdfs_try_paths:
print " - %s" % hdfs_conf_path
print "\nYou can manually create ~/.snakebiterc with the following content:"
print '{"namenode": "ip/hostname", "port": 54310, "version": %d}' % Namenode.DEFAULT_VERSION
print '{'
print ' "config_version": 2,'
print ' "use_trash": true,'
print ' "namenodes": ['
print ' {"host": "namenode-ha1", "port": 54310, "version": %d},' % Namenode.DEFAULT_VERSION
print ' {"host": "namenode-ha2", "port": 54310, "version": %d}' % Namenode.DEFAULT_VERSION
print ' ]'
print '}'

sys.exit(1)

def _read_config_snakebiterc(self):
with open(os.path.join(os.path.expanduser('~'), '.snakebiterc')) as config_file:
configs = json.load(config_file)

if isinstance(configs, list):
# Version 1: List of namenodes
# config is a list of namenode(s) - possibly HA
for config in configs:
nn = Namenode(config['namenode'],
config['port'],
config.get('version', Namenode.DEFAULT_VERSION))
self.namenodes.append(nn)
elif isinstance(configs, dict):
# config is a single namenode - no HA
self.namenodes.append(Namenode(configs['namenode'],
configs['port'],
configs.get('version', Namenode.DEFAULT_VERSION)))
if configs.get("config_version"):
# Config version >= 2
for nn_config in configs['namenodes']:
nn = Namenode(nn_config['host'], nn_config['port'], nn_config.get('version', Namenode.DEFAULT_VERSION))
self.namenodes.append(nn)

self.args.skiptrash = configs.get("skiptrash")
else:
# config is a single namenode - no HA
self.namenodes.append(Namenode(configs['namenode'],
configs['port'],
configs.get('version', Namenode.DEFAULT_VERSION)))
else:
print "Config retrieved from ~/.snakebiterc is corrupted! Remove it!"
sys.exit(1)
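The three accepted shapes of ~/.snakebiterc handled by `_read_config_snakebiterc` — a legacy list of namenodes, a versioned dict, and a legacy single-namenode dict — can be sketched as one standalone function (hypothetical, with simplified field handling):

```python
def parse_snakebiterc(configs, default_version=9):
    """Return (namenodes, skiptrash); namenodes are (host, port, version)."""
    if isinstance(configs, list):
        # Legacy v1 format: a list of namenode dicts (possibly HA).
        return ([(c["namenode"], c["port"], c.get("version", default_version))
                 for c in configs], False)
    if isinstance(configs, dict) and configs.get("config_version"):
        # Versioned format: explicit namenode list plus a skiptrash flag.
        return ([(c["host"], c["port"], c.get("version", default_version))
                 for c in configs["namenodes"]],
                bool(configs.get("skiptrash")))
    if isinstance(configs, dict):
        # Legacy single-namenode dict (no HA).
        return ([(configs["namenode"], configs["port"],
                  configs.get("version", default_version))], False)
    raise ValueError("Config retrieved from ~/.snakebiterc is corrupted")
```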
@@ -318,24 +338,6 @@ def _read_config_cl(self):
else:
return False

def _write_hadoop_config(self, config_file_path):
# Write config to file
with open(config_file_path, "w") as config_file:
config = []
for namenode in self.namenodes:
config.append(namenode.toDict())

if len(config) > 1:
#If many NNs use list syntax
pretty_config = json.dumps(config).split('},')
config_file.write('},\n'.join(pretty_config))
elif len(config) == 1:
#If just one NN use old syntax
config_file.write(json.dumps(config[0]))
else:
print "Try to write configuration without NameNode configuration!"
sys.exit(-1)

def parse(self, non_cli_input=None): # Allow input for testing purposes
if not sys.argv[1:] and not non_cli_input:
self.parser.print_help()
@@ -364,7 +366,8 @@ def parse(self, non_cli_input=None): # Allow input for testing purposes
return self.args

def setup_client(self):
self.client = HAClient(self.namenodes)
use_trash = not self.args.skiptrash
self.client = HAClient(self.namenodes, use_trash)

def execute(self):
if self.args.help:
@@ -501,7 +504,7 @@ def mv(self):
for line in format_results(result, json_output=self.args.json):
print line

@command(args="[paths]", descr="remove paths", allowed_opts=["R"], req_args=['dir [dirs]'])
@command(args="[paths]", descr="remove paths", allowed_opts=["R", "S"], req_args=['dir [dirs]'])
def rm(self):
result = self.client.delete(self.args.dir, recurse=self.args.recurse)
for line in format_results(result, json_output=self.args.json):