Skip to content

Commit

Permalink
Merge pull request #47 from GlobalFishingWatch/PIPELINE-1531
Browse files Browse the repository at this point in the history
Refactor join to use station_id only when tagblock group id is not present
  • Loading branch information
pwoods25443 authored Oct 20, 2023
2 parents 516597b + 0012308 commit f101eb7
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 25 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
include:
- os: "ubuntu-20.04"
python-version: "3.6"
Expand Down
2 changes: 1 addition & 1 deletion ais_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Tools for managing AIS messages
"""

__version__ = 'v0.1.5.dev7'
__version__ = 'v0.1.5.dev8'
__author__ = 'Paul Woods'
__email__ = '[email protected]'
__source__ = 'https://github.com/GlobalFishingWatch/ais-tools'
Expand Down
8 changes: 2 additions & 6 deletions ais_tools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,9 @@ def encode(input, output):
help="Retain an unmatched message part in the buffer until at least max_count messages have"
"been seen after the message part was added to the buffer"
)
@click.option('--use-station-id/--no-use-station-id', default=True,
help="Only match message parts if the station_id from the tagblock also matches"
)
def join_multipart(input, output, max_time, max_count, use_station_id):
def join_multipart(input, output, max_time, max_count):
for nmea in safe_join_multipart_stream(input,
max_time_window=max_time,
max_message_window=max_count,
use_station_id=use_station_id):
max_message_window=max_count):
output.write(nmea)
output.write('\n')
26 changes: 17 additions & 9 deletions ais_tools/nmea.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def expand_nmea(line, validate_checksum=False):
tagblock['tagblock_sentence'] = int(fields[2])
if fields[3] != '':
tagblock['tagblock_id'] = int(fields[3])
else:
tagblock['tagblock_group_id'] = tagblock['tagblock_id']
tagblock['tagblock_channel'] = fields[4]
body = fields[5]
pad = int(nmea.split('*')[0][-1])
Expand Down Expand Up @@ -73,7 +75,7 @@ def join_multipart(lines):
raise DecodeError("all lines to be joined must start with the same character, either '\\' or '!'")


def safe_join_multipart_stream(lines, max_time_window=500, max_message_window=1000, use_station_id=True):
def safe_join_multipart_stream(lines, max_time_window=500, max_message_window=1000):
"""
Same as join_multipart_stream but for any message that cannot decoded, it will just emit
that message back out and not raise a DecodeError exception
Expand All @@ -82,8 +84,7 @@ def safe_join_multipart_stream(lines, max_time_window=500, max_message_window=10
lines,
max_time_window=max_time_window,
max_message_window=max_message_window,
ignore_decode_errors=True,
use_station_id=use_station_id
ignore_decode_errors=True
)
for line in lines:
yield line
Expand All @@ -92,8 +93,7 @@ def safe_join_multipart_stream(lines, max_time_window=500, max_message_window=10
def join_multipart_stream(lines,
max_time_window=500,
max_message_window=1000,
ignore_decode_errors=False,
use_station_id=True):
ignore_decode_errors=False):
"""
Takes a stream of nmea text lines and tries to find the matching parts of multi part messages
which may not be adjacent in the stream and may come out of order.
Expand Down Expand Up @@ -124,12 +124,20 @@ def join_multipart_stream(lines,
# make a key for matching message parts
# - tagblock_groupsize is the number of parts we are looking for
# - tagblock_station is the source of the message and may not have a value
# - tagblock_id is a sequence number that is the same for all message parts, but it is not unique
# - tagblock_id is a sequence number that is the same for all message parts, but it is
# a single digit only so not unique
# - tagblock_group_id if present, is a sequence number that is the same for all message parts, and it
# should be locally unique within the stream. It is a 4-digit number
# - tagblock_channel is the AIS RF channel (either A or B) that was used for transmission

station_id = tagblock.get('tagblock_station') if use_station_id else None
key = (total_parts, station_id, tagblock.get('tagblock_id'),
tagblock.get('tagblock_channel'))
tagblock_group_id = tagblock.get('tagblock_group_id')
if tagblock_group_id:
# only need this group id
key = (total_parts, None, tagblock_group_id, None)
else:
# no group id present, so use everything else we have to try to make a locally unique signature
key = (total_parts, tagblock.get('tagblock_station'), tagblock.get('tagblock_id'),
tagblock.get('tagblock_channel'))

# pack up the message part
# - tagblock_sentence is the index of this part relative to the other parts, where the first part is 1
Expand Down
9 changes: 7 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,21 @@
from ais_tools.cli import cli
from ais_tools.tagblock import split_tagblock
from ais_tools.tagblock import decode_tagblock
# from ais_tools.tagblock import parseTagBlock
import ais_tools


def test_help():
runner = CliRunner()
result = runner.invoke(cli)
assert not result.exception
print(result.output)
assert result.output.startswith('Usage')

def test_version():
runner = CliRunner()
args = '--version'
result = runner.invoke(cli, args=args)
assert not result.exception
assert result.output.strip() == f'Version: {ais_tools.__version__}'

def test_add_tagblock():

Expand Down
8 changes: 2 additions & 6 deletions tests/test_nmea.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
{'tagblock_timestamp': 1509502436, 'tagblock_sentence': 1}),
("\\g:1-2-4372,s:rORBCOMM109,c:1426032000,T:2015-03-11 00.00.00*32"
"\\!AIVDM,2,1,2,B,576u>F02>hOUI8AGR20tt<j104p4l62222222216H14@@Hoe0JPEDp1TQH88,0*16",
{'tagblock_sentence': 1, 'tagblock_groupsize': 2}),
{'tagblock_sentence': 1, 'tagblock_groupsize': 2, 'tagblock_id': 4372, 'tagblock_group_id': 4372}),
("\\g:2-2-4372,s:rORBCOMM109,c:1426032000,T:2015-03-11 00.00.00*31"
"\\!AIVDM,2,2,2,B,88888888880,2*25",
{'tagblock_sentence': 2, 'tagblock_groupsize': 2}),
Expand Down Expand Up @@ -129,11 +129,7 @@ def test_join_multipart_stream_triple(nmea):
'\\g:2-2-1786*55\\!AIVDM,2,2,6,B,88888888880,2*21']),
])
def test_join_multipart_stream_station_id_mismatch(nmea):
combined = list(join_multipart_stream(nmea, use_station_id=True))
assert len(combined) == 2
assert combined == nmea

combined = list(join_multipart_stream(nmea, use_station_id=False))
combined = list(join_multipart_stream(nmea))
assert len(combined) == 1
assert combined == [''.join(nmea)]

Expand Down
3 changes: 2 additions & 1 deletion tests/test_tagblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def test_encode_tagblock(fields, expected):
'tagblock_sentence': 1,
'tagblock_groupsize': 2,
'tagblock_id': 3}),
('s:rMT5858,*0E', {'tagblock_station': 'rMT5858'}) # test for issue #45
('s:rMT5858,*0E', {'tagblock_station': 'rMT5858'}), # test for issue #45
('g:1-2-3456*5A', {'tagblock_sentence': 1, 'tagblock_groupsize': 2, 'tagblock_id': 3456})
])
def test_decode_tagblock(tagblock_str, expected):
assert expected == tagblock.decode_tagblock(tagblock_str)
Expand Down

0 comments on commit f101eb7

Please sign in to comment.