Merge pull request #436 from HubSpot/logfetch_polish
more logfetch features (WIP)
ssalinas committed Feb 17, 2015
2 parents 4251f15 + 95b9b88 commit 98c2daa
Showing 11 changed files with 301 additions and 171 deletions.
49 changes: 37 additions & 12 deletions scripts/README.md
@@ -14,22 +14,28 @@ pip install singularity-logfetch
- Any arguments specified in the config file can be overridden on the command line
- You can store a number of configuration files for different clusters in the config directory (`~/.logfetch` by default) and choose which config to use with the -c option (see the example config below)
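
A config file is just a set of `key=value` lines whose keys match the underscored option names. A hypothetical `~/.logfetch/default` (values shown for illustration only):

```
singularity_uri_base=localhost:8080/singularity/v2/api
dest=~/.logfetch_cache
```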

#Logfetch and Logcat
Two commands exist for downloading logs.
- `logfetch` will download and optionally output a grep command for the logs
- `logcat` will download logs and pipe the contents to stdout

##Options
|Flags|Description|Default|
|:---:|:---------|:-----:|
|-f , --conf_folder|Folder to look for configuration files|`~/.logfetch`|
|-c , --conf_file|configuration file to use(path relative to conf_folder)|default|
|-t , --taskId|TaskId to fetch logs for|
|-r , --requestId|RequestId to fetch logs for|
|--task-count|Number of recent tasks (belonging to a request) to fetch live logs (on machine not s3)|1|
|-d , --deployId|DeployId to fetch logs for (Must also specify requestId when using this option)|
|--dest|Destination folder for downloaded log files|`~/.logfetch_cache`|
|-f , --conf-folder|Folder to look for configuration files|`~/.logfetch`|
|-c , --conf-file|Configuration file to use (path relative to conf_folder)|default|
|-t , --task-id|Task Id to fetch logs for|
|-r , --request-id|Request Id to fetch logs for|
|-tc, --task-count|Number of recent tasks (belonging to a request) to fetch live logs for (on machine, not S3)|1|
|-d , --deploy-id|Deploy Id to fetch logs for (must also specify request-id when using this option)|
|-o, --dest|Destination folder for download output|`~/.logfetch_cache`|
|-n, --num-parallel-fetches|Max number of log fetches to make at once|5
|-cs, --chunk_size|Chunk size for writing responses to file system|8192
|-u, --singularity-uri-base|Base url for singularity (e.g. `localhost:8080/singularity/v2/api`), This MUST be set|
|-cs, --chunk-size|Chunk size for writing responses to file system|8192
|-u, --singularity-uri-base|Base url for singularity (e.g. `localhost:8080/singularity/v2/api`)| Must be set!|
|-s , --start-days|Search for logs no older than this many days|7
|-e , --end-days|Search for logs no newer than this many days| None (today)
|-g, --grep|Grep string for searching log files|
|-g, --grep|Grep string for searching log files (only for `logfetch`)|
|-l, --logtype|Glob matcher for type of log file to download|None (match all)|

##Grep and Log Files
When the `-g` option is set, the log fetcher will grep the downloaded files for the provided regex.
@@ -64,9 +70,14 @@ When the `-g` option is set, the log fetcher will grep the downloaded files for

- Don't search, just download logs

`logfetch -r 'My_Jobs_Id'`
`logfetch -r 'My_Request_Id'`

- Only get logs that match a glob or logfile name with the `-l` option

##Tailing Logs
`logfetch -r 'My_Request_Id' -l '*.out'`
`logfetch -r 'My_Request_Id' -l 'access.log'`

#Logtail
You can tail live log files using `logtail`. Just provide the request, task, or request and deploy along with a log file path.

For example, to tail the `service.log` file for all tasks for a request named `MyRequest`, you would use the command:
@@ -76,3 +87,17 @@ For example, to tail the `service.log` file for all tasks for a request named `M
- The path for the log file is relative to the base path for that task's sandbox. For example, to tail a file in `(sandbox path)/logs/access.log`, the argument to -l would be `logs/access.log`

You can also provide the `-g` option, which will pass the grep string to the Singularity API and search the results. You cannot provide a full grep command as in some of the above examples, just a string to match on.
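
For example, to follow only lines containing `ERROR` in each task's access log for a hypothetical request id:

`logtail -r 'My_Request_Id' -l 'logs/access.log' -g 'ERROR'`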

##Options
|Flags|Description|Default|
|:---:|:---------|:-----:|
|-f , --conf-folder|Folder to look for configuration files|`~/.logfetch`|
|-c , --conf-file|Configuration file to use (path relative to conf_folder)|default|
|-t , --task-id|Task Id to fetch logs for|
|-r , --request-id|Request Id to fetch logs for|
|-d , --deploy-id|Deploy Id to fetch logs for (must also specify request-id when using this option)|
|-u, --singularity-uri-base|Base url for singularity (e.g. `localhost:8080/singularity/v2/api`)|Must be set!|
|-g, --grep|Grep string for searching log files|
|-l, --logfile|Log file path to tail (e.g. `logs/access.log`)|Must be set!|
|-v, --verbose|Include the task id associated with each log line in the output|False|

2 changes: 1 addition & 1 deletion scripts/logfetch/callbacks.py
@@ -10,7 +10,7 @@ def callback(response, **kwargs):
with open(path, 'wb') as f:
for chunk in response.iter_content(chunk_size):
f.write(chunk)
sys.stderr.write(colored('finished downloading {0}'.format(path), 'green') + '\n')
sys.stderr.write(colored('Downloaded ', 'green') + colored(path, 'white') + '\n')

return callback
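
A quick illustration of how the returned callback is wired into grequests, mirroring `live_logs.py` below; the endpoint, paths, filename, and import path here are hypothetical:

```python
# Sketch only: mirrors how live_logs.py pairs generate_callback with grequests.
# The uri, destination folder, filename and sandbox path below are hypothetical.
import grequests
from logfetch.callbacks import generate_callback  # assumed import path

uri = 'http://slave-host:5051/sandbox/download'
dest = '/home/user/.logfetch_cache'
request = grequests.AsyncRequest(
    'GET', uri,
    callback=generate_callback(uri, dest, 'task1-service.log', 8192),
    params={'path': '/full/sandbox/path/service.log'}
)
# stream=True lets the callback write each chunk to disk as the response arrives
grequests.map([request], stream=True, size=5)
```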

23 changes: 23 additions & 0 deletions scripts/logfetch/cat.py
@@ -0,0 +1,23 @@
import os
import sys
from termcolor import colored

CAT_COMMAND_FORMAT = 'xargs -n {0} cat < {1}'
def cat_files(args, all_logs):
if all_logs:
catlist_filename = '{0}/.catlist'.format(args.dest)
create_catlist(args, all_logs, catlist_filename)
command = CAT_COMMAND_FORMAT.format(len(all_logs), catlist_filename)
sys.stdout.write(os.popen(command).read() + '\n')
remove_catlist(catlist_filename)
else:
sys.stderr.write(colored('No log files found\n', 'magenta'))

def create_catlist(args, all_logs, catlist_filename):
catlist_file = open(catlist_filename, 'wb')
for log in all_logs:
catlist_file.write('{0}\n'.format(log))
catlist_file.close()

def remove_catlist(catlist_filename):
if os.path.isfile(catlist_filename):
os.remove(catlist_filename)
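
For reference, `cat_files` just hands the downloaded file list to `xargs`; a sketch of the command it builds, with hypothetical paths:

```python
# Illustrative only: the command cat_files builds for two hypothetical downloads
CAT_COMMAND_FORMAT = 'xargs -n {0} cat < {1}'
all_logs = ['/home/user/.logfetch_cache/task1-service.log',
            '/home/user/.logfetch_cache/task2-service.log']
catlist = '/home/user/.logfetch_cache/.catlist'  # written with one path per line
command = CAT_COMMAND_FORMAT.format(len(all_logs), catlist)
# command == 'xargs -n 2 cat < /home/user/.logfetch_cache/.catlist'
```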

144 changes: 106 additions & 38 deletions scripts/logfetch/entrypoint.py
@@ -3,44 +3,61 @@
import sys
import os
from termcolor import colored

from fake_section_head import FakeSectionHead
from live_logs import download_live_logs
from s3_logs import download_s3_logs
from tail import start_tail
from grep import grep_files
from cat import cat_files

CONF_READ_ERR_FORMAT = 'Could not load config from {0} due to {1}'
DEFAULT_CONF_DIR = os.path.expanduser('~/.logfetch')
DEFAULT_CONF_FILE = 'default'
DEFAULT_PARALLEL_FETCHES = 5
DEFAULT_PARALLEL_FETCHES = 10
DEFAULT_CHUNK_SIZE = 8192
DEFAULT_DEST = os.path.expanduser('~/.logfetch_cache')
DEFAULT_TASK_COUNT = 10
DEFAULT_DAYS = 7

def exit(reason):
sys.stderr.write(colored(reason, 'red') + '\n')
def exit(reason, color='red'):
sys.stderr.write(colored(reason, color) + '\n')
sys.exit(1)

def tail_logs(args):
start_tail(args)
try:
start_tail(args)
except KeyboardInterrupt:
exit('Stopping logtail...', 'magenta')

def fetch_logs(args):
check_dest(args)
all_logs = []
all_logs += download_s3_logs(args)
all_logs += download_live_logs(args)
grep_files(args, all_logs)
try:
check_dest(args)
all_logs = []
all_logs += download_s3_logs(args)
all_logs += download_live_logs(args)
grep_files(args, all_logs)
except KeyboardInterrupt:
exit('Stopping logfetch...', 'magenta')

def cat_logs(args):
try:
check_dest(args)
all_logs = []
all_logs += download_s3_logs(args)
all_logs += download_live_logs(args)
cat_files(args, all_logs)
except KeyboardInterrupt:
exit('Stopping logcat...', 'magenta')


def check_dest(args):
if not os.path.exists(args.dest):
os.makedirs(args.dest)

def fetch():
conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
conf_parser.add_argument("-f", "--conf-folder", dest='conf_folder', help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf-file", dest='conf_file', help="Specify config file within the conf folder", metavar="FILE")
args, remaining_argv = conf_parser.parse_known_args()
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
@@ -61,36 +78,87 @@ def fetch():
sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')

parser = argparse.ArgumentParser(parents=[conf_parser], description="Fetch log files from Singularity. One can specify either a TaskId, RequestId and DeployId, or RequestId",
prog="log_fetcher")
prog="logfetch")

parser.set_defaults(**defaults)
parser.add_argument("-t", "--taskId", help="TaskId of task to fetch logs for", metavar="taskId")
parser.add_argument("-r", "--requestId", help="RequestId of request to fetch logs for", metavar="requestId")
parser.add_argument("--task-count", help="Number of recent tasks per request to fetch logs from", metavar="taskCount")
parser.add_argument("-d", "--deployId", help="DeployId of task to fetch logs for", metavar="deployId")
parser.add_argument("--dest", help="Destination directory", metavar="DIR")
parser.add_argument("-n", "--num-parallel-fetches", help="Number of fetches to make at once", type=int, metavar="INT")
parser.add_argument("-cs", "--chunk-size", help="Chunk size for writing from response to filesystem", type=int, metavar="INT")
parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI")
parser.add_argument("-s", "--start-days", help="Search for logs no older than this many days", type=int, metavar="start_days")
parser.add_argument("-e", "--end-days", help="Search for logs no new than this many days (defaults to None/today)", type=int, metavar="end_days")
parser.add_argument("-g", "--grep", help="Regex to grep for (normal grep syntax) or a full grep command", metavar='grep')
parser.add_argument("-t", "--task-id", dest="taskId", help="TaskId of task to fetch logs for")
parser.add_argument("-r", "--request-id", dest="requestId", help="RequestId of request to fetch logs for (can be a glob)")
parser.add_argument("-tc","--task-count", dest="task_count", help="Number of recent tasks per request to fetch logs from")
parser.add_argument("-d", "--deploy-id", dest="deployId", help="DeployId of task to fetch logs for (can be a glob)")
parser.add_argument("-o", "--dest", dest="dest", help="Destination directory")
parser.add_argument("-n", "--num-parallel-fetches", dest="num_parallel_fetches", help="Number of fetches to make at once", type=int)
parser.add_argument("-cs", "--chunk-size", dest="chunk_size", help="Chunk size for writing from response to filesystem", type=int)
parser.add_argument("-u", "--singularity-uri-base", dest="singularity_uri_base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)")
parser.add_argument("-s", "--start-days", dest="start_days", help="Search for logs no older than this many days", type=int)
parser.add_argument("-e", "--end-days", dest="end_days", help="Search for logs no new than this many days (defaults to None/today)", type=int)
parser.add_argument("-l", "--log-type", dest="logtype", help="Logfile type to downlaod (ie 'access.log'), can be a glob (ie *.log)")
parser.add_argument("-g", "--grep", dest="grep", help="Regex to grep for (normal grep syntax) or a full grep command")

args = parser.parse_args(remaining_argv)

if args.deployId and not args.requestId:
exit("Must specify requestId (-r) when specifying deployId")
exit("Must specify request-id (-r) when specifying deploy-id")
elif not args.requestId and not args.deployId and not args.taskId:
exit('Must specify one of\n -t taskId\n -r requestId and -d deployId\n -r requestId')
exit('Must specify one of\n -t task-id\n -r request-id and -d deploy-id\n -r request-id')

args.dest = os.path.expanduser(args.dest)

fetch_logs(args)

def cat():
conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
conf_parser.add_argument("-f", "--conf-folder", dest="conf_folder", help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf-file", dest="conf_file", help="Specify config file within the conf folder", metavar="FILE")
args, remaining_argv = conf_parser.parse_known_args()
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
config = ConfigParser.SafeConfigParser()

defaults = {
"num_parallel_fetches" : DEFAULT_PARALLEL_FETCHES,
"chunk_size" : DEFAULT_CHUNK_SIZE,
"dest" : DEFAULT_DEST,
"task_count" : DEFAULT_TASK_COUNT,
"start_days" : DEFAULT_DAYS
}

try:
config.readfp(FakeSectionHead(open(os.path.expanduser(conf_file))))
defaults.update(dict(config.items("Defaults")))
except Exception, err:
sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')

parser = argparse.ArgumentParser(parents=[conf_parser], description="Fetch log files from Singularity and cat to stdout. One can specify either a TaskId, RequestId and DeployId, or RequestId",
prog="logcat")

parser.set_defaults(**defaults)
parser.add_argument("-t", "--task-id", dest="taskId", help="TaskId of task to fetch logs for")
parser.add_argument("-r", "--request-id", dest="requestId", help="RequestId of request to fetch logs for (can be a glob)")
parser.add_argument("-tc","--task-count", dest="taskCount", help="Number of recent tasks per request to fetch logs from")
parser.add_argument("-d", "--deploy-id", dest="deployId", help="DeployId of tasks to fetch logs for (can be a glob)")
parser.add_argument("-o", "--dest", dest="dest", help="Destination directory")
parser.add_argument("-n", "--num-parallel-fetches", dest="num_parallel_fetches", help="Number of fetches to make at once", type=int)
parser.add_argument("-cs", "--chunk-size", dest="chunk_size", help="Chunk size for writing from response to filesystem", type=int)
parser.add_argument("-u", "--singularity-uri-base", dest="singularity_uri_base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)")
parser.add_argument("-s", "--start-days", dest="start_days", help="Search for logs no older than this many days", type=int)
parser.add_argument("-e", "--end-days", dest="end_days", help="Search for logs no new than this many days (defaults to None/today)", type=int)
parser.add_argument("-l", "--logtype", dest="logtype", help="Logfile type to downlaod (ie 'access.log'), can be a glob (ie *.log)")

args = parser.parse_args(remaining_argv)

if args.deployId and not args.requestId:
exit("Must specify requestId (-r) when specifying deploy-id")
elif not args.requestId and not args.deployId and not args.taskId:
exit('Must specify one of\n -t task-id\n -r request-id and -d deploy-id\n -r request-id')

args.dest = os.path.expanduser(args.dest)

cat_logs(args)

def tail():
conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
conf_parser.add_argument("-f", "--conf-folder", dest="conf_folder", help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf-file", dest="conf_file", help="Specify config file within the conf folder", metavar="FILE")
args, remaining_argv = conf_parser.parse_known_args()
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
@@ -105,23 +173,23 @@ def tail():
sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')

parser = argparse.ArgumentParser(parents=[conf_parser], description="Tail log files from Singularity. One can specify either a TaskId, RequestId and DeployId, or RequestId",
prog="log_fetcher")
prog="logtail")

parser.set_defaults(**defaults)
parser.add_argument("-t", "--taskId", help="TaskId of task to fetch logs for", metavar="taskId")
parser.add_argument("-r", "--requestId", help="RequestId of request to fetch logs for", metavar="requestId")
parser.add_argument("-d", "--deployId", help="DeployId of task to fetch logs for", metavar="deployId")
parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI")
parser.add_argument("-g", "--grep", help="String to grep for", metavar='grep')
parser.add_argument("-l", "--logfile", help="Logfile path/name to tail (ie 'logs/access.log')", metavar="logfile")
parser.add_argument("-v", "--verbose", help="more verbose output", action='store_true')
parser.add_argument("-t", "--task-id", dest="taskId", help="TaskId of task to fetch logs for")
parser.add_argument("-r", "--request-id", dest="requestId", help="RequestId of request to fetch logs for (can be a glob)")
parser.add_argument("-d", "--deploy-id", dest="deployId", help="DeployId of tasks to fetch logs for (can be a glob)")
parser.add_argument("-u", "--singularity-uri-base", dest="singularity_uri_base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)")
parser.add_argument("-g", "--grep", dest="grep", help="String to grep for")
parser.add_argument("-l", "--logfile", dest="logfile", help="Logfile path/name to tail (ie 'logs/access.log')")
parser.add_argument("-v", "--verbose", dest="verbose", help="more verbose output", action='store_true')

args = parser.parse_args(remaining_argv)

if args.deployId and not args.requestId:
exit("Must specify requestId (-r) when specifying deployId")
exit("Must specify request-id (-r) when specifying deploy-id")
elif not args.requestId and not args.deployId and not args.taskId:
exit('Must specify one of\n -t taskId\n -r requestId and -d deployId\n -r requestId')
exit('Must specify one of\n -t task-id\n -r request-id and -d deploy-id\n -r request-id')
elif not args.logfile:
exit("Must specify logfile to tail (-l)")

23 changes: 13 additions & 10 deletions scripts/logfetch/grep.py
@@ -2,18 +2,21 @@
import sys
from termcolor import colored

GREP_COMMAND_FORMAT = 'xargs -n {0} {1} < {2}'
GREP_COMMAND_FORMAT = 'while read file; do {0} "$file"; done < {1}'
DEFAULT_GREP_COMMAND = 'grep --color=always \'{0}\''

def grep_files(args, all_logs):
if args.grep:
greplist_filename = '{0}/.greplist'.format(args.dest)
create_greplist(args, all_logs, greplist_filename)
command = grep_command(args, all_logs, greplist_filename)
sys.stderr.write(colored('Running "{0}" this might take a minute'.format(command), 'blue') + '\n')
sys.stdout.write(os.popen(command).read() + '\n')
remove_greplist(greplist_filename)
sys.stderr.write(colored('Finished grep, exiting', 'green') + '\n')
if all_logs:
greplist_filename = '{0}/.greplist'.format(args.dest)
create_greplist(args, all_logs, greplist_filename)
command = grep_command(args, all_logs, greplist_filename)
sys.stderr.write(colored('Running "{0}", this might take a minute'.format(command), 'blue') + '\n')
sys.stdout.write(os.popen(command).read() + '\n')
remove_greplist(greplist_filename)
sys.stderr.write(colored('Finished grep, exiting', 'green') + '\n')
else:
sys.stderr.write(colored('No logs found\n', 'magenta'))

def create_greplist(args, all_logs, greplist_filename):
greplist_file = open(greplist_filename, 'wb')
@@ -27,6 +30,6 @@ def remove_greplist(greplist_filename):

def grep_command(args, all_logs, greplist_filename):
if 'grep' in args.grep:
return GREP_COMMAND_FORMAT.format(len(all_logs), args.grep, greplist_filename)
return GREP_COMMAND_FORMAT.format(args.grep, greplist_filename)
else:
return GREP_COMMAND_FORMAT.format(len(all_logs), DEFAULT_GREP_COMMAND.format(args.grep), greplist_filename)
return GREP_COMMAND_FORMAT.format(DEFAULT_GREP_COMMAND.format(args.grep), greplist_filename)
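
The switch from `xargs` to a `while read` loop greps each downloaded file individually; what the generated command looks like for `-g 'ERROR'` (the cache path is hypothetical):

```python
# Illustrative only: the shell command grep_command builds for -g 'ERROR'
GREP_COMMAND_FORMAT = 'while read file; do {0} "$file"; done < {1}'
DEFAULT_GREP_COMMAND = 'grep --color=always \'{0}\''
greplist = '/home/user/.logfetch_cache/.greplist'  # hypothetical cache path
command = GREP_COMMAND_FORMAT.format(DEFAULT_GREP_COMMAND.format('ERROR'), greplist)
# command == 'while read file; do grep --color=always \'ERROR\' "$file"; done < /home/user/.logfetch_cache/.greplist'
```
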
87 changes: 50 additions & 37 deletions scripts/logfetch/live_logs.py
@@ -1,7 +1,6 @@
import os
import sys
import fnmatch
import grequests
from glob import glob
from termcolor import colored
from callbacks import generate_callback
from singularity_request import get_json_response
@@ -15,50 +14,50 @@ def download_live_logs(args):
async_requests = []
zipped_files = []
all_logs = []
sys.stderr.write(colored('Removing old service.log files', 'blue') + '\n')
for f in glob('{0}/*service.log'.format(args.dest)):
os.remove(f)
sys.stderr.write(colored('Downloading current live log files', 'blue') + '\n')
sys.stderr.write(colored('Finding current live log files', 'cyan') + '\n')
for task in tasks:
metadata = files_json(args, task)
uri = DOWNLOAD_FILE_FORMAT.format(metadata['slaveHostname'])
service_log = '{0}-service.log'.format(task)
tail_log = '{0}-tail_of_finished_service.log'.format(task)
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, service_log, args.chunk_size),
params={'path' : '{0}/{1}/service.log'.format(metadata['fullPathToRoot'], metadata['currentDirectory'])}
)
)
all_logs.append('{0}/{1}'.format(args.dest, service_log))
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, tail_log, args.chunk_size),
params={'path' : '{0}/{1}/tail_of_finished_service.log'.format(metadata['fullPathToRoot'], metadata['currentDirectory'])}
)
)
all_logs.append('{0}/{1}'.format(args.dest, service_log))
for log_file in logs_folder_files(args, task):
logfile_name = '{0}-{1}'.format(task, log_file)
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size),
params={'path' : '{0}/{1}/logs/{1}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)}
)
)
if logfile_name.endswith('.gz'):
zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))
if 'slaveHostname' in metadata:
uri = DOWNLOAD_FILE_FORMAT.format(metadata['slaveHostname'])
for log_file in base_directory_files(args, task, metadata):
logfile_name = '{0}-{1}'.format(task, log_file)
if (args.logtype and logfetch_base.log_matches(log_file, args.logtype)) or not args.logtype:
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size),
params={'path' : '{0}/{1}/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)}
)
)
if logfile_name.endswith('.gz'):
zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))

grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
logfetch_base.unpack_logs(zipped_files)
for log_file in logs_folder_files(args, task):
logfile_name = '{0}-{1}'.format(task, log_file)
if (args.logtype and logfetch_base.log_matches(log_file, args.logtype)) or not args.logtype:
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size),
params={'path' : '{0}/{1}/logs/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)}
)
)
if logfile_name.endswith('.gz'):
zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))

if async_requests:
sys.stderr.write(colored('Starting live logs downloads\n', 'cyan'))
grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
if zipped_files:
sys.stderr.write(colored('Unpacking logs\n', 'cyan'))
logfetch_base.unpack_logs(zipped_files)
return all_logs

def tasks_to_check(args):
if args.taskId:
return [args.taskId]
else:
return logfetch_base.tasks_for_request(args)
return logfetch_base.tasks_for_requests(args)

def files_json(args, task):
uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
@@ -72,3 +71,17 @@ def logs_folder_files(args, task):
return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
else:
return [f['path'].rsplit('/')[-1] for f in files_json if logfetch_base.is_in_date_range(args, f['mtime'])]

def base_directory_files(args, task, files_json):
if 'files' in files_json:
files = files_json['files']
return [f['name'] for f in files if valid_logfile(args, f)]
else:
return [f['path'].rsplit('/')[-1] for f in files_json if valid_logfile(args, f)]

def valid_logfile(args, fileData):
is_in_range = logfetch_base.is_in_date_range(args, fileData['mtime'])
not_a_directory = not fileData['mode'].startswith('d')
is_a_logfile = fnmatch.fnmatch(fileData['name'], '*.log') or fnmatch.fnmatch(fileData['name'], '*.out') or fnmatch.fnmatch(fileData['name'], '*.err')
return is_in_range and not_a_directory and is_a_logfile

56 changes: 33 additions & 23 deletions scripts/logfetch/logfetch_base.py
@@ -1,11 +1,13 @@
import os
import sys
import gzip
import fnmatch
from datetime import datetime
from termcolor import colored
from singularity_request import get_json_response

BASE_URI_FORMAT = '{0}{1}'
ALL_REQUESTS = '/requests'
REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'

@@ -20,32 +22,38 @@ def unpack_logs(logs):
file_out.close()
file_in.close()
os.remove(zipped_file)
sys.stderr.write(colored('Unpacked {0}'.format(zipped_file), 'green') + '\n')
sys.stderr.write(colored('Unpacked ', 'green') + colored(zipped_file, 'white') + '\n')
except:
if os.path.isfile(zipped_file):
os.remove(zipped_file)
sys.stderr.write(colored('Could not unpack {0}'.format(zipped_file), 'red') + '\n')
continue

def base_uri(args):
if not args.singularity_uri_base:
exit("Specify a base uri for Singularity (-u)")
uri_prefix = "" if args.singularity_uri_base.startswith(("http://", "https://")) else "http://"
uri = BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base)
return uri
return BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base)

def tasks_for_request(args):
if args.requestId and args.deployId:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
else:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)]
if hasattr(args, 'task_count'):
tasks = tasks[0:args.task_count]
return tasks
def tasks_for_requests(args):
all_tasks = []
for request in all_requests(args):
if args.requestId and args.deployId:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args, request) if log_matches(task["taskId"]["deployId"], args.deployId)]
else:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args, request)]
tasks = tasks[0:args.task_count] if hasattr(args, 'task_count') else tasks
all_tasks = all_tasks + tasks
return all_tasks

def log_matches(inputString, pattern):
return fnmatch.fnmatch(inputString, pattern) or fnmatch.fnmatch(inputString, pattern + '*.gz')

def all_tasks_for_request(args):
uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
def all_tasks_for_request(args, request):
uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(request))
active_tasks = get_json_response(uri)
if hasattr(args, 'start_days'):
uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(request))
historical_tasks = get_json_response(uri)
if len(historical_tasks) == 0:
return active_tasks
@@ -56,16 +64,18 @@ def all_tasks_for_request(args):
else:
return active_tasks

def all_requests(args):
uri = '{0}{1}'.format(base_uri(args), ALL_REQUESTS)
requests = get_json_response(uri)
included_requests = []
for request in requests:
if fnmatch.fnmatch(request['request']['id'], args.requestId):
included_requests.append(request['request']['id'])
return included_requests

def is_in_date_range(args, timestamp):
timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
if args.end_days:
if timedelta.days > args.start_days or timedelta.days <= args.end_days:
return False
else:
return True
return False if timedelta.days > args.start_days or timedelta.days <= args.end_days else True
else:
if timedelta.days > args.start_days:
return False
else:
return True

return False if timedelta.days > args.start_days else True
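
With this change request ids, deploy ids, and `--logtype` values are all treated as `fnmatch`-style globs; a few illustrative matches (the names are hypothetical):

```python
# Illustrative only: glob matching as used by tasks_for_requests / log_matches
import fnmatch

fnmatch.fnmatch('MyRequest-canary', 'MyRequest*')                  # True -> request id glob
fnmatch.fnmatch('access.log', '*.log')                             # True -> --logtype glob
# log_matches also tries pattern + '*.gz', so rotated/zipped files still match:
fnmatch.fnmatch('access.log.1424131200000.gz', '*.log' + '*.gz')   # True
```
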
69 changes: 27 additions & 42 deletions scripts/logfetch/s3_logs.py
@@ -2,69 +2,54 @@
import sys
import re
import grequests
from datetime import datetime
from termcolor import colored

import logfetch_base
from singularity_request import get_json_response
from termcolor import colored
from callbacks import generate_callback
from singularity_request import get_json_response

TASK_FORMAT = '/task/{0}'
DEPLOY_FORMAT = '/request/{0}/deploy/{1}'
REQUEST_FORMAT = '/request/{0}'
S3LOGS_URI_FORMAT = '{0}/logs{1}'

def download_s3_logs(args):
sys.stderr.write(colored('Checking for S3 log files', 'blue') + '\n')
logs = get_json_response(singularity_s3logs_uri(args))
sys.stderr.write(colored('Checking for S3 log files', 'cyan') + '\n')
logs = logs_for_all_requests(args)
async_requests = []
all_logs = []
for log_file in logs:
filename = log_file['key'].rsplit("/", 1)[1]
full_log_path = '{0}/{1}'.format(args.dest, filename.replace('.gz', '.log'))
full_gz_path = '{0}/{1}'.format(args.dest, filename)
if in_date_range(args, filename):
if not (os.path.isfile(full_log_path) or os.path.isfile(full_gz_path)):
if logfetch_base.is_in_date_range(args, time_from_filename(filename)):
if not already_downloaded(args.dest, filename):
async_requests.append(
grequests.AsyncRequest('GET', log_file['getUrl'],
callback=generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size)
)
grequests.AsyncRequest('GET', log_file['getUrl'], callback=generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size))
)
all_logs.append('{0}/{1}'.format(args.dest, filename.replace('.gz', '.log')))
grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
if async_requests:
sys.stderr.write(colored('Starting S3 Downloads\n', 'cyan'))
grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
zipped_files = ['{0}/{1}'.format(args.dest, log_file['key'].rsplit("/", 1)[1]) for log_file in logs]
sys.stderr.write(colored('Unpacking S3 logs\n', 'cyan'))
logfetch_base.unpack_logs(zipped_files)
sys.stderr.write(colored('All S3 logs up to date', 'blue') + '\n')
sys.stderr.write(colored('All S3 logs up to date', 'cyan') + '\n')
return all_logs

def in_date_range(args, filename):
timedelta = datetime.utcnow() - time_from_filename(filename)
if args.end_days:
if timedelta.days > args.start_days or timedelta.days <= args.end_days:
return False
else:
return True
def already_downloaded(dest, filename):
return (os.path.isfile('{0}/{1}'.format(dest, filename.replace('.gz', '.log'))) or os.path.isfile('{0}/{1}'.format(dest, filename)))

def logs_for_all_requests(args):
if args.taskId:
return get_json_response(singularity_s3logs_uri(args, args.taskId))
else:
if timedelta.days > args.start_days:
return False
else:
return True
tasks = logfetch_base.tasks_for_requests(args)
logs = []
for task in tasks:
s3_logs = get_json_response(singularity_s3logs_uri(args, task))
logs = logs + s3_logs if s3_logs else logs
return logs

def time_from_filename(filename):
time_string = re.search('(\d{13})', filename).group(1)
return datetime.utcfromtimestamp(int(time_string[0:-3]))


def singularity_s3logs_uri(args):
if args.taskId:
singularity_path = TASK_FORMAT.format(args.taskId)
elif args.deployId and args.requestId:
singularity_path = DEPLOY_FORMAT.format(args.requestId, args.deployId)
elif args.requestId:
singularity_path = REQUEST_FORMAT.format(args.requestId)
else:
exit("Specify one of taskId, requestId and deployId, or requestId")
singularity_uri = S3LOGS_URI_FORMAT.format(logfetch_base.base_uri(args), singularity_path)
return int(time_string[0:-3])

return singularity_uri
def singularity_s3logs_uri(args, idString):
return S3LOGS_URI_FORMAT.format(logfetch_base.base_uri(args), TASK_FORMAT.format(idString))
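
S3 key names embed a 13-digit millisecond timestamp, and `time_from_filename` now returns plain epoch seconds so the shared `logfetch_base.is_in_date_range` check can be reused; a quick illustration with a hypothetical key:

```python
# Illustrative only: pulling the millisecond timestamp out of a hypothetical S3 key
import re
filename = 'MyRequest-task1-service.log.1424131200000.gz'
time_string = re.search('(\d{13})', filename).group(1)  # '1424131200000'
seconds = int(time_string[0:-3])                         # 1424131200, epoch seconds
```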

3 changes: 2 additions & 1 deletion scripts/logfetch/singularity_request.py
@@ -1,5 +1,6 @@
import sys
import requests
from termcolor import colored

ERROR_STATUS_FORMAT = 'Singularity responded with an invalid status code ({0})'

@@ -8,6 +9,6 @@ def get_json_response(uri, params={}):
if singularity_response.status_code < 199 or singularity_response.status_code > 299:
sys.stderr.write(uri + '\n')
sys.stderr.write(str(params) + '\n')
exit(ERROR_STATUS_FORMAT.format(singularity_response.status_code))
sys.stderr.write(colored(ERROR_STATUS_FORMAT.format(singularity_response.status_code), 'red') + '\n')
return singularity_response.json()

11 changes: 6 additions & 5 deletions scripts/logfetch/tail.py
@@ -12,13 +12,13 @@
def start_tail(args):
if args.requestId:
sys.stderr.write('Fetching tasks\n')
tasks = [str(t) for t in logfetch_base.tasks_for_request(args)]
tasks = [str(t) for t in logfetch_base.tasks_for_requests(args)]
else:
tasks = [args.taskId]
if args.verbose:
sys.stderr.write(colored('Tailing logs for tasks:\n', 'green'))
for t in tasks:
sys.stderr.write(colored('{0}\n'.format(t), 'blue'))
sys.stderr.write(colored('{0}\n'.format(t), 'yellow'))
sys.stderr.write(colored('ctrl+c to exit\n', 'cyan'))
try:
threads = []
@@ -31,7 +31,7 @@ def start_tail(args):
if not t.isAlive():
break
except KeyboardInterrupt:
sys.stderr.write(colored('Stopping tail', 'cyan'))
sys.stderr.write(colored('Stopping tail', 'magenta'))
sys.exit(0)

class LogStreamer(threading.Thread):
@@ -73,6 +73,7 @@ def fetch_new_log_data(self, uri, path, offset, args, task):
if args.grep:
params['grep'] = args.grep
response = requests.get(uri, params=params).json()
prefix = '({0}) =>'.format(task) if args.verbose else ''
sys.stdout.write('{0}{1}\n'.format(colored(prefix, 'blue'), response['data']))
prefix = '({0}) =>\n'.format(task) if args.verbose else ''
if response['data'] != '':
sys.stdout.write('{0}{1}'.format(colored(prefix, 'cyan'), response['data']))
return offset + len(response['data'].encode('utf-8'))
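
The next read offset advances by the UTF-8 byte length of what was just written rather than the character count, so multi-byte characters do not desynchronize the tail; a tiny illustration:

```python
# Illustrative only: byte length, not character count, drives the next read offset
data = u'caf\xe9 served\n'                          # 12 characters, 13 bytes in UTF-8
offset = 100
next_offset = offset + len(data.encode('utf-8'))    # 113, matches the bytes actually read
```
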
5 changes: 3 additions & 2 deletions scripts/setup.py
@@ -10,7 +10,7 @@

setup(
name='singularity-logfetch',
version='0.0.8',
version='0.0.9',
description='Singularity log fetching and searching',
author="HubSpot",
author_email='singularity-users@googlegroups.com',
@@ -22,7 +22,8 @@
entry_points={
'console_scripts':[
'logfetch=logfetch.entrypoint:fetch',
'logtail=logfetch.entrypoint:tail'
'logtail=logfetch.entrypoint:tail',
'logcat=logfetch.entrypoint:cat'
],
}
)
