Add generic CLI and segment command
tomaslink committed Jun 14, 2024
1 parent a5e9256 commit e248382
Showing 15 changed files with 278 additions and 78 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
@@ -8,15 +8,15 @@ COPY --from=apache/beam_python3.8_sdk:2.56.0 /opt/apache/beam /opt/apache/beam

# Perform any additional customizations if desired
COPY ./requirements.txt ./
- RUN pip install -r requirements.txt
+ RUN pip install --no-cache-dir -r requirements.txt

# Temporary. TODO: Use a local test docker image with extra dependencies.
COPY ./requirements/test.txt ./
- RUN pip install -r test.txt
+ RUN pip install --no-cache-dir -r test.txt

# Temporary. TODO: Use a local dev docker image with extra dependencies.
COPY ./requirements/dev.txt ./
- RUN pip install -r dev.txt
+ RUN pip install --no-cache-dir -r dev.txt

# Setup local packages
COPY . /opt/project
2 changes: 1 addition & 1 deletion examples/example_segment.sh
@@ -18,7 +18,7 @@ docker compose run dev segment \
--labels=version=v3 \
--labels=step=segment \
--labels=stage=productive \
- --runner=direct \
+ --runner=DirectRunner \
--project=world-fishing-827 \
--temp_location=gs://pipe-temp-us-central-ttl7/dataflow_temp \
--staging_location=gs://pipe-temp-us-central-ttl7/dataflow_staging \
2 changes: 1 addition & 1 deletion examples/example_segment_identity.sh
@@ -13,7 +13,7 @@ docker compose run dev segment_identity_daily \
--labels=version=v3 \
--labels=step=segment \
--labels=stage=productive \
- --runner=direct \
+ --runner=DirectRunner \
--project=world-fishing-827 \
--temp_location=gs://pipe-temp-us-central-ttl7/dataflow_temp \
--staging_location=gs://pipe-temp-us-central-ttl7/dataflow_staging
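
Worth noting on the runner change above: Beam's Python SDK resolves --runner by runner class name, so DirectRunner (or DataflowRunner) is the expected spelling; the lowercase direct presumably failed to resolve, which is what these two script fixes address. A minimal sketch of how such a flag is parsed, using only public Beam APIs:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Parse the same flag the example scripts pass; Beam resolves the actual
# runner class from this name when the pipeline runs.
options = PipelineOptions(["--runner=DirectRunner"])
print(options.view_as(StandardOptions).runner)  # -> DirectRunner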
32 changes: 0 additions & 32 deletions pipe_segment/__main__.py

This file was deleted.

95 changes: 95 additions & 0 deletions pipe_segment/cli/cli.py
@@ -0,0 +1,95 @@
import sys
import logging
import argparse

from rich.logging import RichHandler

from .commands.segment import Segment

logger = logging.getLogger(__name__)


class CLI:
    """Generic command-line interface."""

    HELP_VERBOSE = 'whether to run with DEBUG log level (default: %(default)s).'
    HELP_LOG_FILE = "file to send logging output to."

    def __init__(self, args):
        self._args = args

        self._setup_logger()
        self._add_commands()
        self._parse_args()

    def execute(self):
        return self.COMMANDS[self.command].run(self.args, self.extra_args)

    def _add_commands(self):
        self.parser = argparse.ArgumentParser(
            prog=self.NAME, description=self.DESCRIPTION, formatter_class=self.formatter())

        self.parser.add_argument('-v', '--verbose', action='store_true', help=self.HELP_VERBOSE)
        self.parser.add_argument('-l', '--log_file', metavar='\b', help=self.HELP_LOG_FILE)

        self.subparsers = self.parser.add_subparsers(dest='command', help='available commands')

        for command in self.COMMANDS.values():
            command.add_to_subparsers(self.subparsers)

    def _parse_args(self):
        self.args, self.extra_args = self.parser.parse_known_args(args=self._args or ['--help'])

        if self.args.verbose:
            logging.getLogger().setLevel(logging.DEBUG)

        if self.args.log_file:
            logging.getLogger().addHandler(logging.FileHandler(self.args.log_file))

        self.command = self.args.command

        del self.args.verbose
        del self.args.log_file
        del self.args.command

    def _setup_logger(self):
        logging.basicConfig(
            level=logging.INFO,
            format=self.LOG_FORMAT, handlers=[RichHandler(level="NOTSET")], force=True)
        # force=True is needed because some other library is setting the root logger.

        for module in self.SUPPRESS_LOG:
            logging.getLogger(module).setLevel(logging.ERROR)


class PIPE(CLI):
    NAME = 'PIPE'
    DESCRIPTION = 'Executes a GFW pipeline.'
    LOG_FORMAT = '%(name)s - %(message)s'

    # Packages/modules for which to suppress any log level except ERROR.
    SUPPRESS_LOG = [
        "apache_beam.io.gcp",
    ]

    COMMANDS = {
        Segment.NAME: Segment
    }

    @staticmethod
    def formatter():
        def formatter(prog):
            return argparse.RawTextHelpFormatter(prog, max_help_position=50)

        return formatter


def run(args):
    PIPE(args).execute()


def main():
    run(sys.argv[1:])


if __name__ == '__main__':
    main()
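
A quick sketch of driving this CLI from Python, using only the module-level run defined above (whether a pipe console script is also wired up is not visible in this excerpt; the table names below are placeholders):

from pipe_segment.cli.cli import run

# With an empty argument list the parser falls back to ['--help']
# (see _parse_args), so this prints top-level usage and exits:
run([])

# A full invocation would look like:
# run(["-v", "segment",
#      "--source", "project.dataset.messages",
#      "--msg_dest", "project.dataset.messages_segmented",
#      "--fragment_tbl", "project.dataset.fragments",
#      "--segment_dest", "project.dataset.segments"])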
110 changes: 110 additions & 0 deletions pipe_segment/cli/commands/segment.py
@@ -0,0 +1,110 @@
import json
import logging
import argparse
from abc import ABC, abstractmethod

from pipe_segment import pipeline


logger = logging.getLogger(__name__)


EXAMPLE_SEGMENTER_PARAMS = {
    "max_hours": 24,
    "max_speed": 30,
    "noise_dist": 0,
    "reported_speed_multiplier": 1.1,
    "max_speed_multiplier": 15,
    "max_speed_exponent": 1.3,
}

EXAMPLE_MERGE_PARAMS = dict(buffer_hours=0.5)


class Command(ABC):
    @classmethod
    @abstractmethod
    def add_to_subparsers(cls, subparsers):
        pass

    @classmethod
    @abstractmethod
    def run(cls, args, extra_args):
        pass


class Segment(Command):
    NAME = "segment"

    HELP = "segmenter pipeline."
    HELP_SOURCE = "Table to read normalized messages from."
    HELP_MSG_DEST = "Table to write segmented messages to."
    HELP_FRAGMENT_TBL = "Table to read and write fragments."
    HELP_SEGMENT_DEST = "Table to write segment-days to."
    HELP_SAT_SOURCE = "Table, query or file to read normalized messages from. Subset of `source`."
    HELP_NORAD = "Table that links NORAD IDs and receivers."
    HELP_SAT_POSITIONS = "Table with distance to satellite by receiver at 1s resolution."
    HELP_SAT_OFFSET_DEST = "Table to write satellite offsets to."
    HELP_BAD_HOUR = "Hours on either side of an hour with bad satellite timing to suppress."
    HELP_MAX_OFFSET = "Max. offset (in seconds) of a satellite clock before we drop its messages."
    HELP_DATE_RANGE = "Range of dates to read from source. Format: 'YYYY-MM-DD,YYYY-MM-DD'."
    HELP_WAIT = "Wait until the job finishes before returning."

    HELP_SEGMENTER_PARAMS = (
        "JSON object with segmenter parameters, or filepath @path/to/file.json. "
        f"For example: \n {json.dumps(EXAMPLE_SEGMENTER_PARAMS)}"
    )

    HELP_MERGE_PARAMS = (
        "JSON object with merge parameters, or filepath @path/to/file.json. "
        f"For example: \n {json.dumps(EXAMPLE_MERGE_PARAMS)}"
    )
    HELP_SSVID_FILTER = (
        "Query that returns a list of ssvid to trim the source data down to. "
        "Note that the returned list is held in memory, so it should not be too large. "
        "This is meant for testing purposes. If tempted to use it in production, "
        "more work should be done so that the data is pruned on the way in."
    )

    HELP_BINS_PER_DAY = "Number of bins per day used to tag fragments and messages."

    EPILOG = "Example: pipe segment --help"

    @classmethod
    def add_to_subparsers(cls, subparsers):
        p = subparsers.add_parser(
            cls.NAME, help=cls.HELP, epilog=cls.EPILOG, formatter_class=cls.formatter())

        required = p.add_argument_group("Required")
        add = required.add_argument
        add("--source", required=True, metavar='\b', help=cls.HELP_SOURCE)
        add("--msg_dest", required=True, metavar='\b', help=cls.HELP_MSG_DEST)
        add("--fragment_tbl", required=True, metavar='\b', help=cls.HELP_FRAGMENT_TBL)
        add("--segment_dest", required=True, metavar='\b', help=cls.HELP_SEGMENT_DEST)

        optional = p.add_argument_group("Optional")
        add = optional.add_argument
        add("--sat_source", metavar=' ', help=cls.HELP_SAT_SOURCE)
        add("--norad_to_receiver_tbl", metavar=' ', help=cls.HELP_NORAD)
        add("--sat_positions_tbl", metavar=' ', help=cls.HELP_SAT_POSITIONS)
        add("--sat_offset_dest", metavar=' ', help=cls.HELP_SAT_OFFSET_DEST)
        add("--bad_hour_padding", type=int, default=1, metavar=' ', help=cls.HELP_BAD_HOUR)
        add("--max_timing_offset_s", type=int, default=30, metavar=' ', help=cls.HELP_MAX_OFFSET)
        add("--date_range", metavar=' ', help=cls.HELP_DATE_RANGE)
        add("--wait_for_job", default=False, action="store_true", help=cls.HELP_WAIT)
        add("--segmenter_params", default="{}", metavar=' ', help=cls.HELP_SEGMENTER_PARAMS)
        add("--merge_params", default="{}", metavar=' ', help=cls.HELP_MERGE_PARAMS)
        add("--ssvid_filter_query", metavar=' ', help=cls.HELP_SSVID_FILTER)
        add("--bins_per_day", default=4, metavar=' ', help=cls.HELP_BINS_PER_DAY)

    @classmethod
    def run(cls, args, extra_args):
        logger.info("Running pipe segment command...")
        pipeline.run(args, extra_args)

    @staticmethod
    def formatter():
        def argparse_formatter(prog):
            return argparse.HelpFormatter(prog, max_help_position=50)

        return argparse_formatter
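
The Command ABC above, together with the COMMANDS registry in cli.py, is the extension point for new pipeline steps: a new step only needs a subclass and a registry entry. A minimal sketch (the Stats command and its flag are hypothetical, shown only to illustrate the pattern):

from pipe_segment.cli.commands.segment import Command


class Stats(Command):
    """Hypothetical command illustrating the registration pattern."""

    NAME = "stats"
    HELP = "print basic message statistics."

    @classmethod
    def add_to_subparsers(cls, subparsers):
        p = subparsers.add_parser(cls.NAME, help=cls.HELP)
        p.add_argument("--source", required=True, help="Table to read messages from.")

    @classmethod
    def run(cls, args, extra_args):
        print(f"computing stats over {args.source}")


# Registration happens in cli.py by extending PIPE.COMMANDS:
#     COMMANDS = {Segment.NAME: Segment, Stats.NAME: Stats}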