Adds support for Apache Beam 2.28.0 and increments Google SDK
smpiano committed Apr 29, 2021
1 parent 8a5a579 commit b92fe8b
Showing 9 changed files with 87 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -17,7 +17,7 @@ install:
- docker-compose build

script:
-  - docker-compose run py.test tests
+  - docker-compose run test tests

notifications:
slack:
9 changes: 9 additions & 0 deletions CHANGES.md
@@ -8,6 +8,15 @@ Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to

## [UNRELEASED]

+## v3.1.0 - 2021-04-29
+
+### Added
+
+* [Data Pipeline/PIPELINE-84](https://globalfishingwatch.atlassian.net/browse/PIPELINE-84):
+  Adds support for Apache Beam `2.28.0`.
+  Increments the Google SDK version to `338.0.0`.
+  Fixes tests after the update of Beam.

## v3.0.8 - 2020-11-13

### Added
2 changes: 1 addition & 1 deletion Dockerfile
@@ -7,7 +7,7 @@ WORKDIR /opt/project

# Download and install google cloud. See the dockerfile at
# https://hub.docker.com/r/google/cloud-sdk/~/dockerfile/
-ENV CLOUD_SDK_VERSION 268.0.0
+ENV CLOUD_SDK_VERSION 338.0.0
RUN \
export CLOUD_SDK_APT_DEPS="curl gcc python-dev python-setuptools apt-transport-https lsb-release openssh-client git" && \
export CLOUD_SDK_PIP_DEPS="crcmod" && \
48 changes: 24 additions & 24 deletions README.md
@@ -1,8 +1,8 @@
# Segment pipeline

This repository contains the segment pipeline, a dataflow pipeline which
divides vessel tracks into contiguous "segments", separating
out noise and signals that may come from two or more vessels which are
broadcasting using the same MMSI at the same time.

# Running
@@ -31,27 +31,27 @@ instructions there.

## Development and Testing

Run unit tests
Quick run
-`docker-compose run py.test tests`
+`docker-compose run test tests`

Run with all tests including ones that hit some GCP API
-`docker-compose run py.test tests --runslow`
+`docker-compose run test tests --runslow`

Re-build the docker environment (needed if you modify setup.py or other environmental change)
`docker-compose build`

You can run the unit tests outside of docker like this
`py.test tests`
which may be convenient when debugging stuff. If you do this then you will need
to clear out the `__pycache__` with
`rm -rf tests/__pycache__/`
or else you will get an error like this
`ImportMismatchError: ('conftest', '/opt/project/tests/conftest.py',
local('/Users/paul/github/pipe-segment/tests/conftest.py'))`

You can do a local run using a query from BQ in order to get more data to run through it.
Use the second command below to help view the output in sorted order

@@ -64,19 +64,19 @@

```console
...
cat local-output-00000-of-00001 | jq -s '. | sort_by(.mmsi + .timestamp)'
```
To get the schema for an existing bigquery table - use something like this

`bq show --format=prettyjson world-fishing-827:pipeline_measures_p_p516_daily.20170923 | jq '.schema'`

## Note on the gpsdio-segment dependency

This library depends on the python package [gpsdio-segment](https://github.com/SkyTruth/gpsdio-segment).
We would like to just specify the dependency in setup.py (see the comment in
that file). However, this does not work when installing on the remote workers
in dataflow, because there is no git executable there.

So instead we download the package tarball in setup.sh; for local
execution we pip install from that tarball, and for remote installs we pass
the tarball along via the extra_packages option in parser.py.
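
For context, handing such a tarball to Beam so that workers install it looks roughly like this (a sketch; the runner flag and tarball path are illustrative, not taken from parser.py):

```python
# Sketch: shipping a pre-built package tarball to remote workers.
# The runner flag and tarball path are illustrative placeholders.
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(['--runner=DataflowRunner'])
options.view_as(SetupOptions).extra_packages = ['./gpsdio-segment-0.20.2.tar.gz']
# Workers pip-install each listed tarball before executing the job.
```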

# License

Copyright 2017 Global Fishing Watch
2 changes: 1 addition & 1 deletion docker-compose.yaml
@@ -7,7 +7,7 @@ services:
- ".:/opt/project"
- "gcp:/root/.config/gcloud"

-  py.test:
+  test:
image: gfw/pipe-segment
build: .
entrypoint: py.test
2 changes: 1 addition & 1 deletion pipe_segment/__init__.py
@@ -3,7 +3,7 @@
"""


-__version__ = '3.0.8'
+__version__ = '3.1.0'
__author__ = 'Paul Woods'
__email__ = '[email protected]'
__source__ = 'https://github.com/GlobalFishingWatch/pipe-segment'
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,3 +1,3 @@
https://codeload.github.com/GlobalFishingWatch/gpsdio-segment/tar.gz/0.20.2#egg=gpsdio-segment
-https://codeload.github.com/GlobalFishingWatch/pipe-tools/tar.gz/v3.1.2#egg=pipe-tools
+https://codeload.github.com/GlobalFishingWatch/pipe-tools/tar.gz/dPIPELINE-84-1#egg=pipe-tools
https://codeload.github.com/GlobalFishingWatch/ShipDataProcess/tar.gz/06b8495f55c6a44306ca9865b4c07d51d7ad5a7b#egg=shipdataprocess
35 changes: 21 additions & 14 deletions tests/test_pipeline.py
@@ -1,23 +1,23 @@
-import pytest
-import posixpath as pp
-import newlinejson as nlj
-
-from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline
-from apache_beam import FlatMap
-from apache_beam import GroupByKey
-from apache_beam import Map
# rename the class to prevent py.test from trying to collect TestPipeline as a unit test class

-import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.testing.util import open_shards
+from apache_beam import Map
+from apache_beam import GroupByKey
+from apache_beam import FlatMap

-from pipe_tools.coders import JSONDictCoder
-
from pipe_segment.__main__ import run as pipe_segment_run
-from pipe_segment.transform.segment import Segment
+from pipe_segment.options.segment import SegmentOptions
from pipe_segment.pipeline import SegmentPipeline
+from pipe_segment.transform.segment import Segment

+from pipe_tools.coders import JSONDictCoder
+
+import apache_beam as beam
+import newlinejson as nlj
+import posixpath as pp
+import pytest


@pytest.mark.filterwarnings('ignore:Using fallback coder:UserWarning')
@@ -69,7 +69,14 @@ def test_Pipeline_parts(self, test_data_dir, temp_dir):
        expected_messages = pp.join(test_data_dir, 'expected_messages.json')
        expected_segments = pp.join(test_data_dir, 'expected_segments.json')

-        with _TestPipeline() as p:
+        args = [
+            f'--source={source}',
+            f'--msg_dest={messages_sink}',
+            f'--seg_dest={segments_sink}'
+        ]
+        segop = SegmentOptions(args)
+
+        with _TestPipeline(options=segop) as p:
            messages = (
                p
                | beam.io.ReadFromText(file_pattern=source, coder=JSONDictCoder())
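
This diff and the test_transforms.py diff below apply the same fix: instead of constructing `_TestPipeline()` with no options, the tests build a `SegmentOptions` from argv-style flags and pass it in. A minimal sketch of the pattern against Beam 2.28.0 — `DemoOptions` is an illustrative stand-in for the real `pipe_segment.options.segment.SegmentOptions`, and the paths are placeholders:

```python
# Sketch only: DemoOptions stands in for pipe_segment's SegmentOptions.
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline


class DemoOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # The same flags the updated tests pass.
        parser.add_argument('--source')
        parser.add_argument('--msg_dest')
        parser.add_argument('--seg_dest')


args = [
    '--source=./messages.json',  # placeholder input
    '--msg_dest=/tmp/messages',  # placeholder sink
    '--seg_dest=/tmp/segments',  # placeholder sink
]
with TestPipeline(options=DemoOptions(args)) as p:
    pass  # build the PCollections under test here; the run happens on exit
```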
48 changes: 28 additions & 20 deletions tests/test_transforms.py
@@ -1,30 +1,31 @@
-import pytest
-import unittest
-from copy import deepcopy
-from datetime import datetime
-import pytz
-import posixpath as pp
-import newlinejson as nlj
-from collections import Counter
-
-import apache_beam as beam
-
-from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline
# rename the class to prevent py.test from trying to collect TestPipeline as a unit test class

+from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline
+from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import open_shards

-from pipe_tools.timestamp import timestampFromDatetime
-from pipe_tools.timestamp import datetimeFromTimestamp
-from pipe_tools.utils.timestamp import as_timestamp
-from pipe_tools.coders import JSONDictCoder
+from collections import Counter
+from copy import deepcopy
+from datetime import datetime

-from pipe_segment.transform.segment import Segment
from pipe_segment.transform.normalize import NormalizeDoFn
+from pipe_segment.transform.segment import Segment
+from pipe_segment.options.segment import SegmentOptions

+from pipe_tools.coders import JSONDictCoder
+from pipe_tools.timestamp import datetimeFromTimestamp
+from pipe_tools.timestamp import timestampFromDatetime
+from pipe_tools.utils.timestamp import as_timestamp

+import apache_beam as beam
+import newlinejson as nlj
+import posixpath as pp
+import pytest
+import pytz
+import unittest

# >>> Note that cogroupByKey treats unicode and char values as distinct,
# so tests can sometimes fail unless all ssvid are unicode.
@@ -79,7 +80,14 @@ def _run_segment(self, messages_in, segments_in, temp_dir):
        messages_file = pp.join(temp_dir, '_run_segment', 'messages')
        segments_file = pp.join(temp_dir, '_run_segment', 'segments')

-        with _TestPipeline() as p:
+        args = [
+            f'--source={messages_in}',
+            f'--msg_dest={messages_file}',
+            f'--seg_dest={segments_file}'
+        ]
+        segop = SegmentOptions(args)
+
+        with _TestPipeline(options=segop) as p:
            messages = (
                p | 'CreateMessages' >> beam.Create(messages_in)
                | 'AddKeyMessages' >> beam.Map(self.groupby_fn)
