From 54b24748cb535ccab1aa3b5db33aba6d63426363 Mon Sep 17 00:00:00 2001 From: Matt Sayler <msayler@alertlogic.com> Date: Thu, 10 Nov 2022 13:04:35 +0100 Subject: [PATCH 1/3] Initial --- almdrlib/search.py | 124 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 almdrlib/search.py diff --git a/almdrlib/search.py b/almdrlib/search.py new file mode 100644 index 0000000..791ce66 --- /dev/null +++ b/almdrlib/search.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- + +import almdrlib +import datetime +import logging +from time import sleep + +""" + almdrlib.search + ~~~~~~~~~~~~~~~ + Wrapper around search API to support submit/execute workflow +""" + +SEARCH_TYPE = 'batch' # complete searches vs. return first results quickly +ROWS_PER_FETCH = 10000 # per request to get search results + + +def iso_to_epoch(t): + '''covert iso8601 time to POSIX/epoch timestamp''' + return int(datetime.timestamp(datetime.fromisoformat(t))) + + +class Search(): + def __init__(self, session=None): + if session: + self.search_client = almdrlib.client('search', session=session) + else: + self.search_client = almdrlib.client('search') + + self.search_id = None + self.logger = logging.getLogger(__name__) + + def submit(self, account_id, query_string, + search_type=SEARCH_TYPE, timeframe=None, start=None, end=None): + """submit + Submit the query for execution.""" + + if timeframe is not None: + time_args = {'timeframe': timeframe} + elif start is not None and end is not None: + time_args = {'start': iso_to_epoch(args.start), + 'end': iso_to_epoch(args.end)} + # TODO support non-ISO + else: + raise Exception('Specify timeframe or stand and end') + + # POST search/v2/:account_id/searches?search_type=... + # request body is the query + response = self.search_client.start_query_execution( + account_id=account_id, query_string=query_string, + search_type=SEARCH_TYPE, **time_args) + self.search_id = response.json()['search_uuid'] + self.next_token = None + self.account_id = account_id + self.remaining = None + + return self.search_id + + def fetch(self, block=True): + # Ask for results for this query, and process a single batch + + while True: + # GET search/v2/:account_id/searches/:search_id/status + response = self.search_client.get_search_status( + account_id=self.account_id, search_uuid=self.search_id) + + status = response.json() + search_status = status['search_status'] + self.logger.debug(f'Search status for {self.search_id} is {search_status}') + + # pending: no results yet + if search_status == 'pending': + search_progress = status.get('search_progress') + if search_progress is not None: + self.progress = status['progress'] + self.records = search_progress['input_scanned_records'] + self.bytes = search_progress['input_scanned_bytes'] + self.estimated_results = search_progress['estimated_output_records'] + self.logger.debug(f'Search progress {self.progress}%: {self.records} ' + f'records, {self.bytes} bytes, {self.estimated_results} output') + + if not block: + return None + else: + sleep(1) + + # failed: syntax errors, runtime problems, etc. + elif search_status == 'failed': + raise Exception(f'Search {search_id} failed: {status}') + + # suspended or completed: some or all results available (respectively) + elif search_status in ['complete', 'suspended']: + # GET search/v2/:account_id/searches/:search_id?limit=... + if self.next_token is None: + self.logger.debug(f'Fetching initial results for {self.search_id}') + response = self.search_client.get_search_results( + account_id=self.account_id, search_uuid=self.search_id, + limit=ROWS_PER_FETCH) + else: + self.logger.debug(f'Fetching next page of results for {self.search_id} ' + f'with token {self.next_token}') + response = self.search_client.get_search_results( + account_id=self.account_id, search_uuid=self.search_id, + starting_token=self.next_token) + results = response.json() + self.remaining = results['remaining'] + self.logger.debug(f'{self.remaining} remaining results for {self.search_id}') + + if self.remaining != 0: + # advance to the next page of results on the next query + self.next_token = results['next_token'] + self.logger.debug(results) + return results['results']['records'] + + def results_remaining(self): + return self.remaining is None or self.remaining > 0 + + def fetch_all(self): + # Ask for results for this query, and process all batches + + results = [] + while self.results_remaining(): + results += self.fetch(block=True) + return results From cad75c70ba98029e90d0a4c132557af3fdda431f Mon Sep 17 00:00:00 2001 From: Matt Sayler <msayler@alertlogic.com> Date: Tue, 29 Nov 2022 10:59:26 -0600 Subject: [PATCH 2/3] stylist support --- almdrlib/search.py | 102 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 18 deletions(-) diff --git a/almdrlib/search.py b/almdrlib/search.py index 791ce66..163d734 100644 --- a/almdrlib/search.py +++ b/almdrlib/search.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- +import time import almdrlib -import datetime +from datetime import datetime import logging from time import sleep @@ -20,29 +21,46 @@ def iso_to_epoch(t): return int(datetime.timestamp(datetime.fromisoformat(t))) +def local_tz_offset(): + '''approximate a GMT offset''' + offset = -time.timezone # -3600 = +01:00 + + hours = int(offset / 3600) + minutes = int((offset - hours*3600) / 60) + return "{:+03d}:{:02d}".format(hours, minutes) + + +def resolve_time(start_ts=None, end_ts=None, + start=None, end=None, + timeframe=None): + '''resolve_time + returns arguments to be passed to search as a time range. Prefer + relative time over ISO time over timestamps''' + if timeframe is not None: + return {'timeframe': timeframe} + elif start is not None and end is not None: + return {'start': iso_to_epoch(start), + 'end': iso_to_epoch(end)} + elif start_ts is not None and end_ts is not None: + return {'start': start_ts, 'end': end_ts} + else: + raise Exception('Specify timeframe or stand and end') + + class Search(): def __init__(self, session=None): - if session: - self.search_client = almdrlib.client('search', session=session) - else: - self.search_client = almdrlib.client('search') + self.search_client = almdrlib.client('search', session=session) + self.stylist_client = almdrlib.client('search_stylist', session=session) self.search_id = None self.logger = logging.getLogger(__name__) def submit(self, account_id, query_string, - search_type=SEARCH_TYPE, timeframe=None, start=None, end=None): + search_type=SEARCH_TYPE, **timekwargs): """submit Submit the query for execution.""" - if timeframe is not None: - time_args = {'timeframe': timeframe} - elif start is not None and end is not None: - time_args = {'start': iso_to_epoch(args.start), - 'end': iso_to_epoch(args.end)} - # TODO support non-ISO - else: - raise Exception('Specify timeframe or stand and end') + time_args = resolve_time(**timekwargs) # POST search/v2/:account_id/searches?search_type=... # request body is the query @@ -56,8 +74,46 @@ def submit(self, account_id, query_string, return self.search_id - def fetch(self, block=True): + def wait_for_complete(self, block=True) -> object: + # Ask for results for this query, and process a single batch + # TODO isolate polling form result fetching + + while True: + # GET search/v2/:account_id/searches/:search_id/status + response = self.search_client.get_search_status( + account_id=self.account_id, search_uuid=self.search_id) + + status = response.json() + search_status = status['search_status'] + self.logger.debug(f'Search status for {self.search_id} is {search_status}') + + # pending: no results yet + if search_status == 'pending': + search_progress = status.get('search_progress') + if search_progress is not None: + self.progress = status['progress'] + self.records = search_progress['input_scanned_records'] + self.bytes = search_progress['input_scanned_bytes'] + self.estimated_results = search_progress['estimated_output_records'] + self.logger.debug(f'Search progress {self.progress}%: {self.records} ' + f'records, {self.bytes} bytes, {self.estimated_results} output') + + if not block: + return None + else: + sleep(1) + + # failed: syntax errors, runtime problems, etc. + elif search_status == 'failed': + raise Exception(f'Search {self.search_id} failed: {status}') + + # suspended or completed: some or all results available (respectively) + elif search_status in ['complete', 'suspended']: + return True + + def fetch(self, block=True) -> object: # Ask for results for this query, and process a single batch + # TODO isolate polling form result fetching while True: # GET search/v2/:account_id/searches/:search_id/status @@ -86,7 +142,7 @@ def fetch(self, block=True): # failed: syntax errors, runtime problems, etc. elif search_status == 'failed': - raise Exception(f'Search {search_id} failed: {status}') + raise Exception(f'Search {self.search_id} failed: {status}') # suspended or completed: some or all results available (respectively) elif search_status in ['complete', 'suspended']: @@ -110,7 +166,7 @@ def fetch(self, block=True): # advance to the next page of results on the next query self.next_token = results['next_token'] self.logger.debug(results) - return results['results']['records'] + return results def results_remaining(self): return self.remaining is None or self.remaining > 0 @@ -120,5 +176,15 @@ def fetch_all(self): results = [] while self.results_remaining(): - results += self.fetch(block=True) + raw_results = self.fetch(block=True) + results.append(raw_results) return results + + def export(self, format, utc_offset=None): + self.wait_for_complete(block=True) + awful = {"from_epochtime.utc_offset": utc_offset} + out = self.stylist_client.export_transformed_search_results( + account_id=self.account_id, search_uuid=self.search_id, + result_format=format, **awful) + + return out.text From 3d0739e75a30ec36691157d2555487f029139040 Mon Sep 17 00:00:00 2001 From: Matt Sayler <msayler@alertlogic.com> Date: Wed, 1 Feb 2023 07:55:46 -0600 Subject: [PATCH 3/3] update --- almdrlib/search.py | 74 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/almdrlib/search.py b/almdrlib/search.py index 163d734..b4aae7b 100644 --- a/almdrlib/search.py +++ b/almdrlib/search.py @@ -55,6 +55,18 @@ def __init__(self, session=None): self.search_id = None self.logger = logging.getLogger(__name__) + def submit_v1(self, account_id, query, datatype='logmsgs', + search_type=SEARCH_TYPE, **timekwargs): + """submit + Submit V1 query for execution.""" + + # XXX fetch_v1 needed, but doesn't really work. + response = self.search_client.start_log_search_query_execution( + account_id=account_id, query=query, datatype=datatype, + **timekwargs) + self.load_results(account_id, response.json()['search_uuid']) + return self.search_id + def submit(self, account_id, query_string, search_type=SEARCH_TYPE, **timekwargs): """submit @@ -66,14 +78,19 @@ def submit(self, account_id, query_string, # request body is the query response = self.search_client.start_query_execution( account_id=account_id, query_string=query_string, - search_type=SEARCH_TYPE, **time_args) - self.search_id = response.json()['search_uuid'] + search_type=search_type, **time_args) + self.load_results(account_id, response.json()['search_uuid']) + + return self.search_id + + def load_results(self, account_id, result_id): + """load_results + Attempt to load existing search results by ID""" + self.search_id = result_id self.next_token = None self.account_id = account_id self.remaining = None - return self.search_id - def wait_for_complete(self, block=True) -> object: # Ask for results for this query, and process a single batch # TODO isolate polling form result fetching @@ -111,12 +128,11 @@ def wait_for_complete(self, block=True) -> object: elif search_status in ['complete', 'suspended']: return True - def fetch(self, block=True) -> object: + def fetch(self, block=True, rows=ROWS_PER_FETCH) -> dict: # Ask for results for this query, and process a single batch # TODO isolate polling form result fetching while True: - # GET search/v2/:account_id/searches/:search_id/status response = self.search_client.get_search_status( account_id=self.account_id, search_uuid=self.search_id) @@ -151,7 +167,7 @@ def fetch(self, block=True) -> object: self.logger.debug(f'Fetching initial results for {self.search_id}') response = self.search_client.get_search_results( account_id=self.account_id, search_uuid=self.search_id, - limit=ROWS_PER_FETCH) + limit=rows) else: self.logger.debug(f'Fetching next page of results for {self.search_id} ' f'with token {self.next_token}') @@ -168,6 +184,50 @@ def fetch(self, block=True) -> object: self.logger.debug(results) return results + def fetch_v1(self, block=True, rows=ROWS_PER_FETCH) -> dict: + # XXX can't page through results, probably need to do math vs tokens + while True: + response = self.search_client.get_log_search_results( + account_id=self.account_id, search_uuid=self.search_id) + + status = response.json() + search_status = status['search_status'] + self.logger.debug(f'Search status for {self.search_id} is {search_status}') + + # pending: no results yet + if search_status == 'pending': + if not block: + return None + else: + sleep(1) + + # failed: syntax errors, runtime problems, etc. + elif search_status == 'failed': + raise Exception(f'Search {self.search_id} failed: {status}') + + # suspended or completed: some or all results available (respectively) + elif search_status in ['complete', 'suspended']: + # GET search/v2/:account_id/searches/:search_id?limit=... + if self.next_token is None: + results = status + else: + self.logger.debug(f'Fetching next page of results for {self.search_id} ' + f'with token {self.next_token}') + response = self.search_client.get_log_search_results( + account_id=self.account_id, search_uuid=self.search_id, + starting_token=self.next_token) + results = response.json() + self.remaining = results['remaining'] + self.logger.debug(f'{self.remaining} remaining results for {self.search_id}') + + if self.remaining != 0: + # advance to the next page of results on the next query + self.next_token = results['next_token'] + elif search_status == 'suspended': + self.remaining = 1 + # self.logger.debug(results) + return results + def results_remaining(self): return self.remaining is None or self.remaining > 0