Skip to content

Commit

Permalink
updates to allow running under Py3
Browse files Browse the repository at this point in the history
  • Loading branch information
bitsofbits committed Dec 17, 2019
1 parent 062682b commit c76b372
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:2.7
FROM python:3.7

# Configure the working directory
RUN mkdir -p /opt/project
Expand Down
6 changes: 3 additions & 3 deletions pipe_segment/io/gcp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from util import parse_gcp_path
from gcpsink import GCPSink
from gcpsource import GCPSource
from .util import parse_gcp_path
from .gcpsink import GCPSink
from .gcpsource import GCPSource
2 changes: 1 addition & 1 deletion pipe_segment/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def safe_dateFromTimestamp(ts):

def parse_date_range(s):
# parse a string YYYY-MM-DD,YYYY-MM-DD into 2 timestamps
return map(as_timestamp, s.split(',')) if s is not None else (None, None)
return list(map(as_timestamp, s.split(',')) if s is not None else (None, None))

def offset_timestamp(ts, **timedelta_args):
if ts is None:
Expand Down
4 changes: 2 additions & 2 deletions pipe_segment/transform/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from stdnum import imo as imo_validator

from shipdataprocess.normalize import normalize_callsign
from shipdataprocess.normalize import normalize_shipname_parts
from shipdataprocess.normalize import normalize_shipname

class NormalizeDoFn(beam.DoFn):
@staticmethod
Expand All @@ -13,7 +13,7 @@ def add_normalized_field(msg, fn, field):
msg['n_'+field] = normalized

def process(self, msg):
n_shipname = normalize_shipname_parts(msg.get('shipname'))['basename']
n_shipname = normalize_shipname(msg.get('shipname'))
if n_shipname:
msg['n_shipname'] = n_shipname

Expand Down
22 changes: 13 additions & 9 deletions pipe_segment/transform/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pipe_tools.timestamp import datetimeFromTimestamp
from pipe_tools.timestamp import timestampFromDatetime

from segment_implementation import SegmentImplementation
from .segment_implementation import SegmentImplementation

logger = logging.getLogger(__file__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -65,14 +65,17 @@ def _convert_message_out(msg):
@staticmethod
def _convert_segment_in(seg):
seg = dict(seg.items())
for k in ['timestamp', 'first_msg_timestamp', 'last_msg_timestamp']:
seg[k] = datetimeFromTimestamp(seg[k])
for k in ['timestamp', 'first_msg_timestamp', 'last_msg_timestamp',
'first_msg_of_day_timestamp', 'last_msg_of_day_timestamp']:
if seg[k] is not None:
seg[k] = datetimeFromTimestamp(seg[k])
return seg

@staticmethod
def _convert_segment_out(seg):
seg = dict(seg.items())
for k in ['timestamp', 'first_msg_timestamp', 'last_msg_timestamp',
'first_msg_of_day_timestamp', 'last_msg_of_day_timestamp',
'timestamp_first', 'timestamp_last', # Stats stuff TODO: clean out someday
'timestamp_min', 'timestamp_max']:
if k in seg and not seg[k] is None:
Expand Down Expand Up @@ -122,12 +125,13 @@ def add_field(name, field_type, mode='REQUIRED'):
add_field('closed', 'BOOLEAN')
add_field('message_count', 'INTEGER')
add_field('timestamp', 'TIMESTAMP')
for prefix in ['first_msg_', 'last_msg_', 'first_msg_on_day_', 'last_msg_on_day_']:
add_field(prefix + 'timestamp', 'TIMESTAMP')
add_field(prefix + 'lat', 'FLOAT')
add_field(prefix + 'lon', 'FLOAT')
add_field(prefix + 'course', 'FLOAT')
add_field(prefix + 'speed', 'FLOAT')
for prefix in ['first_msg_', 'last_msg_', 'first_msg_of_day_', 'last_msg_of_day_']:
mode = 'NULLABLE' if prefix.endswith('of_day_') else 'REQUIRED'
add_field(prefix + 'timestamp', 'TIMESTAMP', mode)
add_field(prefix + 'lat', 'FLOAT', mode)
add_field(prefix + 'lon', 'FLOAT', mode)
add_field(prefix + 'course', 'FLOAT', mode)
add_field(prefix + 'speed', 'FLOAT', mode)

def add_sig_field(name):
field = bigquery.TableFieldSchema()
Expand Down
2 changes: 1 addition & 1 deletion pipe_segment/transform/segment_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def _as_record_v1(self, record):
record['origin_ts'] = record.pop('first_msg_timestamp')
for k in ['lat', 'lon', 'course', 'speed']:
record.pop('first_msg_' + k)
for k in ['lat', 'lon', 'course', 'speed']:
for k in ['lat', 'lon', 'course', 'speed', 'timestamp']:
record.pop('first_msg_of_day_' + k)
record.pop('last_msg_of_day_' + k)
record['last_pos_ts'] = record.pop('last_msg_timestamp')
Expand Down
6 changes: 4 additions & 2 deletions pipe_segment/transform/stitcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ def __init__(self, stitcher_params=None, **kwargs):
@staticmethod
def _convert_segment_in(seg):
seg = dict(seg.items())
for k in ['timestamp', 'first_msg_timestamp', 'last_msg_timestamp']:
seg[k] = datetimeFromTimestamp(seg[k])
for k in ['timestamp', 'first_msg_timestamp', 'last_msg_timestamp',
'first_msg_of_day_timestamp', 'last_msg_of_day_timestamp']:
if seg[k] is not None:
seg[k] = datetimeFromTimestamp(seg[k])
return seg

def stitch(self, kv):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
https://codeload.github.com/GlobalFishingWatch/gpsdio-segment/tar.gz/v0.19-dev#egg=gpsdio-segment
https://codeload.github.com/GlobalFishingWatch/pipe-tools/tar.gz/v3.0.0#egg=pipe-tools
https://codeload.github.com/GlobalFishingWatch/pipe-tools/tar.gz/py3_compatibility#egg=pipe-tools
https://codeload.github.com/GlobalFishingWatch/ShipDataProcess/tar.gz/06b8495f55c6a44306ca9865b4c07d51d7ad5a7b#egg=shipdataprocess
8 changes: 2 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@
"python-stdnum",
"gpsdio-segment", #==0.19-dev",
"pipe-tools==3.0.0",
"shipdataprocess==0.5.8",
"shipdataprocess==0.6.9",
"jinja2-cli",
"google-apitools>=0.5.26,<0.5.27"
"google-apitools"
]



with codecs.open('README.md', encoding='utf-8') as f:
readme = f.read().strip()

with codecs.open('requirements.txt', encoding='utf-8') as f:
DEPENDENCY_LINKS=[line for line in f]

setup(
author=package.__author__,
author_email=package.__email__,
Expand All @@ -50,5 +47,4 @@
url=package.__source__,
version=package.__version__,
zip_safe=True,
dependency_links=DEPENDENCY_LINKS
)
5 changes: 4 additions & 1 deletion tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ def _run_pipeline (self, source, messages_sink, segments_sink, expected, args=[]

pipe_segment_run(args)

def listify(seq):
return sorted([sorted(x.items()) for x in seq])

with nlj.open(expected) as expected:
with open_shards('%s*' % messages_sink) as raw_output:
output = list(nlj.load(raw_output))
for x in output:
x.pop('msgid')
assert sorted(expected) == sorted(output)
assert listify(expected) == listify(output)

def test_Pipeline_basic_args(self, test_data_dir, temp_dir):
source = pp.join(test_data_dir, 'input.json')
Expand Down
12 changes: 11 additions & 1 deletion tests/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ def test_segment_segments_in(self, temp_dir):
'last_msg_course' : 0,
'last_msg_speed' : 0,
'last_msg_timestamp' : prev_ts,
'first_msg_of_day_lat': 0,
'first_msg_of_day_lon': 0,
'first_msg_of_day_course' : 0,
'first_msg_of_day_speed' : 0,
'first_msg_of_day_timestamp' : prev_ts,
'last_msg_of_day_lat': 0,
'last_msg_of_day_lon': 0,
'last_msg_of_day_course' : 0,
'last_msg_of_day_speed' : 0,
'last_msg_of_day_timestamp' : prev_ts,
'message_count': 1,
'shipnames' : [],
'callsigns' : [],
Expand Down Expand Up @@ -176,7 +186,7 @@ def test_segment_out_in(self, temp_dir):
@pytest.mark.parametrize("message, expected", [
({}, {}),
({'shipname': 'f/v boaty Mc Boatface'}, {'n_shipname': 'BOATYMCBOATFACE'}),
({'shipname': 'Bouy 42%'}, {'n_shipname': 'BOUY'}),
({'shipname': 'Bouy 42%'}, {'n_shipname': 'BOUY42'}),
({'callsign': '@@123'}, {'n_callsign': '123'}),
({'imo': 8814275}, {'n_imo': 8814275}),
])
Expand Down

0 comments on commit c76b372

Please sign in to comment.