Skip to content

Commit

Permalink
Re-factor tagblock parsing (#35)
Browse files Browse the repository at this point in the history
* new tagblock decode and encode tools

* replace parseTagblock with split_tagblock + decode_tagblock

* fix CI that is breaking for python 3.6 for #36

* improve test coverage
  • Loading branch information
pwoods25443 authored Jan 11, 2023
1 parent 5037391 commit 3252442
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 33 deletions.
24 changes: 18 additions & 6 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,35 @@ on:
jobs:
build:

runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]

os:
- macos-11
- ubuntu-latest
python-version:
- "3.7"
- "3.8"
- "3.9"
- "3.10"
include:
- os: "ubuntu-20.04"
python-version: "3.6"
steps:
- uses: actions/checkout@v2
- name: Checkout source
uses: actions/checkout@v3

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
pip install -e .\[dev\]
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip install -r requirements.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
Expand Down
2 changes: 1 addition & 1 deletion ais_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Tools for managing AIS messages
"""

__version__ = 'v0.1.4'
__version__ = 'v0.1.5.dev1'
__author__ = 'Paul Woods'
__email__ = '[email protected]'
__source__ = 'https://github.com/GlobalFishingWatch/ais-tools'
Expand Down
24 changes: 9 additions & 15 deletions ais_tools/nmea.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

from ais import DecodeError
from ais_tools.checksum import is_checksum_valid
from ais_tools.tagblock import parseTagBlock
from ais_tools.tagblock import split_tagblock
from ais_tools.tagblock import decode_tagblock

REGEX_BANG = re.compile(r'(![^!]+)')
REGEX_BACKSLASH = re.compile(r'(\\[^\\]+\\![^!\\]+)')
REGEX_BACKSLASH_BANG = re.compile(r'(\\![^!\\]+)')


def expand_nmea(line, validate_checksum=False):
try:
tagblock, nmea = parseTagBlock(line)
except ValueError as e:
raise DecodeError('Failed to parse tagblock (%s) %s' % (str(e), line))
tagblock_str, nmea = split_tagblock(line)
tagblock = decode_tagblock(tagblock_str, validate_checksum=validate_checksum)

nmea = nmea.strip()
fields = nmea.split(',')
Expand All @@ -26,22 +25,17 @@ def expand_nmea(line, validate_checksum=False):
raise DecodeError('Invalid checksum')

try:
tagblock['tagblock_groupsize'] = int(fields[1])
tagblock['tagblock_sentence'] = int(fields[2])
if fields[3] != '':
tagblock['tagblock_id'] = int(fields[3])
if 'tagblock_groupsize' not in tagblock:
tagblock['tagblock_groupsize'] = int(fields[1])
tagblock['tagblock_sentence'] = int(fields[2])
if fields[3] != '':
tagblock['tagblock_id'] = int(fields[3])
tagblock['tagblock_channel'] = fields[4]
body = fields[5]
pad = int(nmea.split('*')[0][-1])
except ValueError:
raise DecodeError('Unable to convert field to int in nmea message')

if 'tagblock_group' in tagblock:
tagblock_group = tagblock.get('tagblock_group', {})
del tagblock['tagblock_group']
group_fields = {'tagblock_' + k: v for k, v in tagblock_group.items()}
tagblock.update(group_fields)

return tagblock, body, pad


Expand Down
81 changes: 76 additions & 5 deletions ais_tools/tagblock.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from datetime import datetime
from datetime import timezone

from ais import DecodeError
from ais_tools.checksum import checksumstr
from ais_tools.checksum import is_checksum_valid

import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from ais.stream import parseTagBlock # noqa: F401
# import warnings
# with warnings.catch_warnings():
# warnings.simplefilter("ignore")
# from ais.stream import parseTagBlock # noqa: F401

TAGBLOCK_T_FORMAT = '%Y-%m-%d %H.%M.%S'

Expand Down Expand Up @@ -56,7 +58,9 @@ def split_tagblock(nmea):
"""
tagblock = ''
if nmea.startswith("\\") and not nmea.startswith("\\!"):
tagblock, nmea = nmea[1:].split("\\", 1)
parts = nmea[1:].split("\\", 1)
if len(parts) == 2:
tagblock, nmea = parts
return tagblock, nmea


Expand All @@ -81,3 +85,70 @@ def add_tagblock(tagblock, nmea, overwrite=True):
tagblock = existing_tagblock

return join_tagblock(tagblock, nmea)


tagblock_fields = {
'c': 'tagblock_timestamp',
'n': 'tagblock_line_count',
'r': 'tagblock_relative_time',
'd': 'tagblock_destination',
's': 'tagblock_station',
't': 'tagblock_text',
}

tagblock_fields_reversed = {v: k for k, v in tagblock_fields.items()}

tagblock_group_fields = ["tagblock_sentence", "tagblock_groupsize", "tagblock_id"]


def encode_tagblock(**kwargs):
group_fields = {}
fields = {}

for k, v in kwargs.items():
if k in tagblock_group_fields:
group_fields[k] = str(v)
elif k in tagblock_fields_reversed:
fields[tagblock_fields_reversed[k]] = v
else:
fields[k.replace('tagblock_', '')] = v

if len(group_fields) == 3:
fields['g'] = '-'.join([group_fields[k] for k in tagblock_group_fields])

base_str = ','.join(["{}:{}".format(k, v) for k, v in fields.items()])
return '{}*{}'.format(base_str, checksumstr(base_str))


def decode_tagblock(tagblock_str, validate_checksum=False):

tagblock = tagblock_str.rsplit("*", 1)[0]

fields = {}

if not tagblock:
return fields

if validate_checksum and not is_checksum_valid(tagblock_str):
raise DecodeError('Invalid checksum')

for field in tagblock.split(","):
try:
key, value = field.split(":")

if key == 'g':
fields.update(dict(zip(tagblock_group_fields,
[int(part) for part in value.split("-")])))
else:
if key in ['n', 'r']:
value = int(value)
elif key == 'c':
value = int(value)
if value > 40000000000:
value = value / 1000.0

fields[tagblock_fields.get(key, key)] = value
except ValueError:
raise DecodeError('Unable to decode tagblock string')

return fields
8 changes: 6 additions & 2 deletions tests/test_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from ais_tools.checksum import checksum
from ais_tools.checksum import is_checksum_valid
from ais_tools.checksum import checksumstr
from ais.stream.checksum import checksumStr

import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from ais.stream.checksum import checksumStr as libais_checksumstr


@pytest.mark.parametrize("str,expected", [
Expand All @@ -27,7 +31,7 @@ def test_checksum_str(str, expected):

assert actual == expected
if len(str) > 1:
assert actual == checksumStr(str)
assert actual == libais_checksumstr(str)


@pytest.mark.parametrize("str,expected", [
Expand Down
8 changes: 6 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from ais_tools.cli import encode
from ais_tools.cli import join_multipart
from ais_tools.cli import cli
from ais_tools.tagblock import parseTagBlock
from ais_tools.tagblock import split_tagblock
from ais_tools.tagblock import decode_tagblock
# from ais_tools.tagblock import parseTagBlock


def test_help():
Expand All @@ -27,7 +29,9 @@ def test_add_tagblock():
result = runner.invoke(add_tagblock, input=input, args=args)
assert not result.exception

tagblock, nmea = parseTagBlock(result.output.strip())
tagblock_str, nmea = split_tagblock(result.output.strip())
tagblock = decode_tagblock(tagblock_str)
# tagblock, nmea = parseTagBlock(result.output.strip())
assert nmea == input
assert tagblock['tagblock_station'] == 'test'

Expand Down
4 changes: 2 additions & 2 deletions tests/test_nmea.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ def test_expand_nmea(line, expected):
'!AIVDM,1,1,'
"\\s:bad-nmea,q:u,c:1509502436,T:2017-11-01 02.13.56*50\\!AIVDM,1,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,0*00",
"\\s:missing-tagblock-separator,q:u,c:1509502436,T:2017-11-01 02.13.56*50!AIVDM,1,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,0*00",
"\\s:missing-tagblock-checksum,q:u,c:1509502436,T:2017-11-01 02.13.56\\!AIVDM,1,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,0*00",
"\\s:missing_field_delimiter,q:u,c1509502436,T:2017-11-01 02.13.56*50\\!AIVDM,1,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,0*7B",
"\\s:bad_group,q:u,c:1509502436,T:2017-11-01 02.13.56*50\\!AIVDM,BAD,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,0*0D",
"\\s:missing_checksum,q:u,c:1509502436,T:2017-11-01 02.13.56\\!AIVDM,BAD,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,0*0D",
"\\s:bad-pad-value,q:u,c:1509502436,T:2017-11-01 02.13.56\\!AIVDM,1,1,,A,13`el0gP000H=3JN9jb>4?wb0>`<,BAD*0D",
])
def test_expand_nmea_fail(nmea):
with pytest.raises(DecodeError):
tagblock, body, pad = expand_nmea(nmea)
tagblock, body, pad = expand_nmea(nmea, validate_checksum=False)


@pytest.mark.parametrize("nmea", [
Expand Down
59 changes: 59 additions & 0 deletions tests/test_tagblock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest

from ais_tools import tagblock
from ais_tools.tagblock import DecodeError


@pytest.mark.parametrize("line,expected", [
Expand Down Expand Up @@ -56,3 +57,61 @@ def test_join_tagblock(t, nmea, expected):
])
def test_add_tagblock(t, nmea, overwrite, expected):
assert expected == tagblock.add_tagblock(t, nmea, overwrite)


@pytest.mark.parametrize("fields,expected", [
({}, '*00'),
({'z': 123}, 'z:123*70'),
({'tagblock_relative_time': 123}, 'r:123*78'),
({'tagblock_timestamp': 123456789}, 'c:123456789*68'),
({'tagblock_timestamp': 123456789, 'tagblock_station': 'test'}, 'c:123456789,s:test*1B'),
({'tagblock_timestamp': 123456789,
'tagblock_station': 'test',
'tagblock_sentence': 1}, 'c:123456789,s:test*1B'),
({'tagblock_timestamp': 123456789,
'tagblock_station': 'test',
'tagblock_sentence': 1,
'tagblock_groupsize': 2,
'tagblock_id': 3}, 'c:123456789,s:test,g:1-2-3*5A'),
])
def test_encode_tagblock(fields, expected):
assert expected == tagblock.encode_tagblock(**fields)


@pytest.mark.parametrize("tagblock_str,expected", [
('*00', {}),
('z:123*70', {'z': '123'}),
('r:123*78', {'tagblock_relative_time': 123}),
('c:123456789*68', {'tagblock_timestamp': 123456789}),
('c:123456789,s:test,g:1-2-3*5A',
{'tagblock_timestamp': 123456789,
'tagblock_station': 'test',
'tagblock_sentence': 1,
'tagblock_groupsize': 2,
'tagblock_id': 3}),
])
def test_decode_tagblock(tagblock_str, expected):
assert expected == tagblock.decode_tagblock(tagblock_str)
assert expected == tagblock.decode_tagblock(tagblock_str, validate_checksum=True)


@pytest.mark.parametrize("tagblock_str", [
('z:123*00'),
('c:123456789,s:invalid,g:1-2-3*5A'),
('c:123456789,s:invalid,g:1-2-3'),
('c:123456789,s:invalid,g:1-2-3*ZZ'),
('s:missing-tagblock-checksum,q:u,c:1509502436,T:2017-11-01 02.13.56')
])
def test_decode_tagblock_invalid_checksum(tagblock_str):
with pytest.raises(DecodeError, match='Invalid checksum'):
tagblock.decode_tagblock(tagblock_str, validate_checksum=True)


@pytest.mark.parametrize("tagblock_str", [
('invalid'),
('c:invalid'),
('c:123456789,z'),
])
def test_decode_tagblock_invalid(tagblock_str):
with pytest.raises(DecodeError, match='Unable to decode tagblock string'):
tagblock.decode_tagblock(tagblock_str, validate_checksum=False)

0 comments on commit 3252442

Please sign in to comment.