From 36d0c20a5e23df14f911c71f47ae62e8f70edc62 Mon Sep 17 00:00:00 2001 From: Martin Nyhus Date: Wed, 18 May 2022 20:11:58 +0200 Subject: [PATCH 1/5] filter: handle municipality id for Oslo Treat municipality id as a string since type=int doesn't work when the leading zero is important. --- filter_buildings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filter_buildings.py b/filter_buildings.py index 4ae9b2d..bdf71ed 100644 --- a/filter_buildings.py +++ b/filter_buildings.py @@ -41,7 +41,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument('--input', required=True) parser.add_argument('--output', required=True) - parser.add_argument('--municipality', required=True, type=int) + parser.add_argument('--municipality', required=True) args = parser.parse_args() with open(args.input, 'r', encoding='utf-8') as file: From 00416c720ef076be745e89d51771d2c757a8c617 Mon Sep 17 00:00:00 2001 From: Martin Nyhus Date: Sun, 20 Feb 2022 18:03:43 +0100 Subject: [PATCH 2/5] filter: refactor for testing Move the data parsing and filtering steps to functions that can be unit tested. Also adds a few tests for the filtering code to put the testing infrastructure in place. --- Makefile | 16 ++++++++++++ filter_buildings.py | 58 ++++++++++++++++++++++++++++---------------- tests/test_filter.py | 50 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 21 deletions(-) create mode 100644 Makefile create mode 100644 tests/test_filter.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..75db788 --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +.PHONY : all flake8 test + + +FLAKE8_FILES := \ + filter_buildings.py \ + tests/test_filter.py \ + ; + + +all : flake8 test + +flake8 : $(FLAKE8_FILES) + flake8 $? + +test : + python3 -m unittest discover -s tests diff --git a/filter_buildings.py b/filter_buildings.py index bdf71ed..e2f8714 100644 --- a/filter_buildings.py +++ b/filter_buildings.py @@ -5,6 +5,10 @@ import requests +def parse_cadastral_data(data): + return json.loads(data)['features'] + + def parse_ref(raw_ref): return {int(ref) for ref in raw_ref.split(';') if ref} @@ -17,17 +21,21 @@ def run_overpass_query(query): request = requests.get(overpass_url, params=params, headers=headers) - return request.json()['elements'] + return request.text -def load_osm_refs(municipality_id): +def load_osm_data(municipality_id): query_fmt = '''[out:json][timeout:60]; (area[ref={}][admin_level=7][place=municipality];)->.county; nwr["ref:bygningsnr"](area.county); out tags noids; ''' query = query_fmt.format(municipality_id) - elements = run_overpass_query(query) + return run_overpass_query(query) + + +def load_osm_refs(osm_raw): + elements = json.loads(osm_raw)['elements'] osm_refs = set() for element in elements: @@ -37,6 +45,24 @@ def load_osm_refs(municipality_id): return osm_refs +def format_geojson(features): + geojson = { + 'type': 'FeatureCollection', + 'generator': 'filter_buildings.py', + 'features': features, + } + return json.dumps(geojson) + + +def filter_buildings(cadastral_buildings, osm_refs): + def in_osm(building): + raw_ref = building['properties']['ref:bygningsnr'] + building_refs = parse_ref(raw_ref) + return bool(building_refs & osm_refs) + + return [b for b in cadastral_buildings if not in_osm(b)] + + def main(): parser = argparse.ArgumentParser() parser.add_argument('--input', required=True) @@ -45,28 +71,18 @@ def main(): args = parser.parse_args() with open(args.input, 'r', encoding='utf-8') as file: - data = json.load(file) - import_buildings = data['features'] - print('Loaded {} buildings'.format(len(import_buildings))) - - osm_refs = load_osm_refs(args.municipality) - print('Loaded {} unique references from OSM'.format(len(osm_refs))) + cadastral = parse_cadastral_data(file.read()) + print(f'Loaded {len(cadastral)} buildings') - def in_osm(building): - raw_ref = building['properties']['ref:bygningsnr'] - building_refs = parse_ref(raw_ref) - return bool(building_refs & osm_refs) + osm_raw = load_osm_data(args.municipality) + osm_refs = load_osm_refs(osm_raw) + print(f'Loaded {len(osm_refs)} unique references from OSM') - missing_in_osm = [b for b in import_buildings if not in_osm(b)] - print('Writing {} buildings missing from OSM'.format(len(missing_in_osm))) + output = filter_buildings(cadastral, osm_refs) + print(f'Writing {len(output)} buildings missing from OSM') with open(args.output, 'w', encoding='utf-8') as file: - geojson = { - 'type': 'FeatureCollection', - 'generator': 'filter_buildings.py', - 'features': missing_in_osm, - } - json.dump(geojson, file) + file.write(format_geojson(output)) return 0 diff --git a/tests/test_filter.py b/tests/test_filter.py new file mode 100644 index 0000000..38a9419 --- /dev/null +++ b/tests/test_filter.py @@ -0,0 +1,50 @@ +import json +import unittest + +import filter_buildings + + +def cadastral(ref): + return { + 'properties': { + 'ref:bygningsnr': str(ref), + }, + } + + +def osm(ref): + return { + 'tags': { + 'ref:bygningsnr': str(ref), + }, + } + + +class TestBuildingFilter(unittest.TestCase): + def _run_filter(self, cadastral_buildings, osm_ref): + return filter_buildings.filter_buildings(cadastral_buildings, + osm_ref) + + def test_remove_if_imported(self): + output = self._run_filter([cadastral(1)], {1}) + self.assertEqual([], output) + + def test_keep_if_not_in_osm(self): + cadastral_buildings = [cadastral(1)] + output = self._run_filter(cadastral_buildings, set()) + self.assertEqual(cadastral_buildings, output) + + +class TestOsmDataParsing(unittest.TestCase): + def _parse(self, osm_buildings): + return filter_buildings.load_osm_refs( + json.dumps({'elements': osm_buildings})) + + def test_parse_empty(self): + self.assertEqual(set(), self._parse([])) + + def test_parse_single_building(self): + self.assertEqual({1}, self._parse([osm(1)])) + + def test_parse_duplicate_id(self): + self.assertEqual({2}, self._parse([osm(2), osm(2)])) From 73e9842e1005ac66c6254cfe54c0342cd7d1bea0 Mon Sep 17 00:00:00 2001 From: Martin Nyhus Date: Tue, 17 May 2022 00:16:40 +0200 Subject: [PATCH 3/5] Add tool for finding lifecycle updates Adds a tool for finding buildings that are planned or under construction in OSM, but are finished in the cadastral data. Since IG doesn't mean that construction has actually started, planned and construction are treated as the same thing. --- Makefile | 3 + filter_buildings.py | 50 ++------------ find_lifecycle_updates.py | 97 ++++++++++++++++++++++++++++ shared.py | 44 +++++++++++++ tests/test_find_lifecycle_updates.py | 66 +++++++++++++++++++ 5 files changed, 216 insertions(+), 44 deletions(-) create mode 100644 find_lifecycle_updates.py create mode 100644 shared.py create mode 100644 tests/test_find_lifecycle_updates.py diff --git a/Makefile b/Makefile index 75db788..d8b4917 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,10 @@ FLAKE8_FILES := \ filter_buildings.py \ + find_lifecycle_updates.py \ + shared.py \ tests/test_filter.py \ + tests/test_find_lifecycle_updates.py \ ; diff --git a/filter_buildings.py b/filter_buildings.py index e2f8714..e074a27 100644 --- a/filter_buildings.py +++ b/filter_buildings.py @@ -2,36 +2,7 @@ import json import sys -import requests - - -def parse_cadastral_data(data): - return json.loads(data)['features'] - - -def parse_ref(raw_ref): - return {int(ref) for ref in raw_ref.split(';') if ref} - - -def run_overpass_query(query): - overpass_url = "https://overpass-api.de/api/interpreter" - params = {'data': query} - version = '0.8.0' - headers = {'User-Agent': 'building2osm/' + version} - request = requests.get(overpass_url, - params=params, - headers=headers) - return request.text - - -def load_osm_data(municipality_id): - query_fmt = '''[out:json][timeout:60]; - (area[ref={}][admin_level=7][place=municipality];)->.county; - nwr["ref:bygningsnr"](area.county); - out tags noids; - ''' - query = query_fmt.format(municipality_id) - return run_overpass_query(query) +import shared def load_osm_refs(osm_raw): @@ -40,24 +11,15 @@ def load_osm_refs(osm_raw): osm_refs = set() for element in elements: raw_ref = element['tags']['ref:bygningsnr'] - osm_refs |= parse_ref(raw_ref) + osm_refs |= shared.parse_ref(raw_ref) return osm_refs -def format_geojson(features): - geojson = { - 'type': 'FeatureCollection', - 'generator': 'filter_buildings.py', - 'features': features, - } - return json.dumps(geojson) - - def filter_buildings(cadastral_buildings, osm_refs): def in_osm(building): raw_ref = building['properties']['ref:bygningsnr'] - building_refs = parse_ref(raw_ref) + building_refs = shared.parse_ref(raw_ref) return bool(building_refs & osm_refs) return [b for b in cadastral_buildings if not in_osm(b)] @@ -71,10 +33,10 @@ def main(): args = parser.parse_args() with open(args.input, 'r', encoding='utf-8') as file: - cadastral = parse_cadastral_data(file.read()) + cadastral = shared.parse_cadastral_data(file.read()) print(f'Loaded {len(cadastral)} buildings') - osm_raw = load_osm_data(args.municipality) + osm_raw = shared.load_building_tags(args.municipality) osm_refs = load_osm_refs(osm_raw) print(f'Loaded {len(osm_refs)} unique references from OSM') @@ -82,7 +44,7 @@ def main(): print(f'Writing {len(output)} buildings missing from OSM') with open(args.output, 'w', encoding='utf-8') as file: - file.write(format_geojson(output)) + file.write(shared.format_geojson(output)) return 0 diff --git a/find_lifecycle_updates.py b/find_lifecycle_updates.py new file mode 100644 index 0000000..1627d58 --- /dev/null +++ b/find_lifecycle_updates.py @@ -0,0 +1,97 @@ +import argparse +import json +import re +import sys + +import shared + + +def osm_buildings_by_ref(osm_buildings): + by_ref = {} + for osm_building in osm_buildings: + tags = osm_building['tags'] + raw_ref = tags['ref:bygningsnr'] + for osm_ref in shared.parse_ref(raw_ref): + try: + by_ref[osm_ref].append(osm_building) + except KeyError: + by_ref[osm_ref] = [osm_building] + + return by_ref + + +def cadastral_construction_finished(building): + tags = building['properties'] + if 'STATUS' not in tags: + raise RuntimeError + + if re.match('#(RA|IG) .*', tags['STATUS']): + return False + + return True + + +def osm_construction_finished(building): + tags = building['tags'] + if 'planned:building' in tags: + return False + elif 'building' in tags and tags['building'] == 'construction': + return False + else: + return True + + +def has_lifecycle_update(cadastral_building, osm_buildings): + for osm_building in osm_buildings: + cadastral_done = cadastral_construction_finished(cadastral_building) + osm_done = osm_construction_finished(osm_building) + + if cadastral_done and not osm_done: + return True + + return False + + +def find_lifecycle_updates(cadastral_buildings, osm_by_ref): + updated = [] + for cadastral_building in cadastral_buildings: + cadastral_ref = int(cadastral_building['properties']['ref:bygningsnr']) + try: + osm_buildings = osm_by_ref[cadastral_ref] + except KeyError: + # Building is missing from OSM + continue + + if has_lifecycle_update(cadastral_building, osm_buildings): + updated.append(cadastral_building) + continue + + return updated + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--input', required=True) + parser.add_argument('--output', required=True) + parser.add_argument('--municipality', required=True) + args = parser.parse_args() + + with open(args.input, 'r', encoding='utf-8') as file: + cadastral = shared.parse_cadastral_data(file.read()) + print(f'Loaded {len(cadastral)} buildings') + + osm_raw = shared.load_building_tags(args.municipality) + osm_buildings = json.loads(osm_raw)['elements'] + osm_by_ref = osm_buildings_by_ref(osm_buildings) + print(f'Loaded {len(osm_buildings)} buildings from OSM') + + output = find_lifecycle_updates(cadastral, osm_by_ref) + print(f'Writing {len(output)} updated buildings') + with open(args.output, 'w', encoding='utf-8') as file: + file.write(shared.format_geojson(output)) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/shared.py b/shared.py new file mode 100644 index 0000000..65a6450 --- /dev/null +++ b/shared.py @@ -0,0 +1,44 @@ +import json + +import requests + + +def parse_ref(raw_ref): + return {int(ref) for ref in raw_ref.split(';') if ref} + + +def run_overpass_query(query): + overpass_url = "https://overpass-api.de/api/interpreter" + params = {'data': query} + version = '0.8.0' + headers = {'User-Agent': 'building2osm/' + version} + request = requests.get(overpass_url, + params=params, + headers=headers) + request.raise_for_status() + return request.text + + +def load_building_tags(municipality_id): + query = f'''[out:json][timeout:60]; + (area[ref={municipality_id}] + [admin_level=7] + [place=municipality]; + ) -> .county; + nwr["ref:bygningsnr"](area.county); + out tags noids; + ''' + return run_overpass_query(query) + + +def parse_cadastral_data(data): + return json.loads(data)['features'] + + +def format_geojson(features): + geojson = { + 'type': 'FeatureCollection', + 'generator': 'filter_buildings.py', + 'features': features, + } + return json.dumps(geojson) diff --git a/tests/test_find_lifecycle_updates.py b/tests/test_find_lifecycle_updates.py new file mode 100644 index 0000000..2939a15 --- /dev/null +++ b/tests/test_find_lifecycle_updates.py @@ -0,0 +1,66 @@ +import unittest + +import find_lifecycle_updates + + +def cadastral(ref, status): + if status == 'MB': + status = '#MB Midlertidig brukstillatelse' + elif status == 'IG': + status = '#IG Igangsettingstillatelse' + else: + raise RuntimeError + + return { + 'properties': { + 'ref:bygningsnr': str(ref), + 'STATUS': status, + }, + } + + +def osm(ref, planned=False, construction=False): + tags = { + 'ref:bygningsnr': str(ref), + } + + if planned: + tags['planned:building'] = 'yes' + elif construction: + tags['building'] = 'construction' + + return {'tags': tags} + + +class TestFindLifecycleUpdate(unittest.TestCase): + def _run_filter(self, cadastral_buildings, osm_buildings): + osm_by_ref = find_lifecycle_updates.osm_buildings_by_ref( + osm_buildings) + return find_lifecycle_updates.find_lifecycle_updates( + cadastral_buildings, + osm_by_ref) + + def test_provisional_use_permit_is_update_from_planned(self): + cadastral_buildings = [cadastral(1, status='MB')] + osm_buildings = [osm(1, planned=True)] + output = self._run_filter(cadastral_buildings, osm_buildings) + self.assertEqual(cadastral_buildings, output) + + def test_provisional_use_permit_is_update_from_construction(self): + cadastral_buildings = [cadastral(1, status='MB')] + osm_buildings = [osm(1, construction=True)] + output = self._run_filter(cadastral_buildings, osm_buildings) + self.assertEqual(cadastral_buildings, output) + + def test_dont_include_construction_permit_when_osm_has_planned(self): + # IG doesn't imply that construction has actually started, so planned + # might still be the correct OSM tagging + cadastral_buildings = [cadastral(1, status='IG')] + osm_buildings = [osm(1, planned=True)] + output = self._run_filter(cadastral_buildings, osm_buildings) + self.assertEqual([], output) + + def test_ignore_building_missing_from_osm(self): + cadastral_buildings = [cadastral(1, status='MB')] + output = self._run_filter(cadastral_buildings, []) + self.assertEqual([], output) From 6266a72ed384dde2660fb9069487edd256bbd9fa Mon Sep 17 00:00:00 2001 From: Martin Nyhus Date: Mon, 16 May 2022 17:06:26 +0200 Subject: [PATCH 4/5] Add tool for finding removed buildings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a tool for finding OSM buildings that have been removed from the cadastral registry. From testing in Lillestrøm this is unfortunately not a reliable indicator of the building actually having been removed, so the output format is intentionally not suitable for automatic uploading. --- Makefile | 2 + find_removed.py | 85 ++++++++++++++++++++++++++++++++++++++ shared.py | 5 ++- tests/test_find_removed.py | 71 +++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 find_removed.py create mode 100644 tests/test_find_removed.py diff --git a/Makefile b/Makefile index d8b4917..d743a1a 100644 --- a/Makefile +++ b/Makefile @@ -4,9 +4,11 @@ FLAKE8_FILES := \ filter_buildings.py \ find_lifecycle_updates.py \ + find_removed.py \ shared.py \ tests/test_filter.py \ tests/test_find_lifecycle_updates.py \ + tests/test_find_removed.py \ ; diff --git a/find_removed.py b/find_removed.py new file mode 100644 index 0000000..f6d9365 --- /dev/null +++ b/find_removed.py @@ -0,0 +1,85 @@ +import argparse +import json + +import shared + + +def collect_refs(buildings): + refs = set() + + for building in buildings: + try: + tags = building['tags'] + except KeyError: + tags = building['properties'] + + raw_ref = tags['ref:bygningsnr'] + for ref in shared.parse_ref(raw_ref): + refs.add(ref) + + return refs + + +def to_output(building): + if building['type'] == 'node': + lon = building['lon'] + lat = building['lat'] + else: + lon = building['center']['lon'] + lat = building['center']['lat'] + + return { + 'type': 'Feature', + 'geometry': { + 'type': 'Point', + 'coordinates': [ + lon, + lat, + ] + }, + 'properties': building['tags'], + } + + +def find_removed(cadastral_buildings, osm_buildings): + cadastral_refs = collect_refs(cadastral_buildings) + osm_refs = collect_refs(osm_buildings) + + removed_buildings = [] + for ref in osm_refs - cadastral_refs: + for osm_building in osm_buildings: + if ref in collect_refs([osm_building]): + try: + removed_buildings.append(to_output(osm_building)) + except Exception: + print(osm_building) + raise + + return removed_buildings + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--input', required=True) + parser.add_argument('--output', required=True) + parser.add_argument('--municipality', required=True) + args = parser.parse_args() + + with open(args.input, 'r', encoding='utf-8') as file: + cadastral = shared.parse_cadastral_data(file.read()) + print(f'Loaded {len(cadastral)} buildings') + + osm_raw = shared.load_building_tags(args.municipality, + with_position=True) + osm_buildings = json.loads(osm_raw)['elements'] + print(f'Loaded {len(osm_buildings)} buildings from OSM') + + output = find_removed(cadastral, osm_buildings) + print(f'Writing {len(output)} buildings that have been removed') + + with open(args.output, 'w', encoding='utf-8') as file: + file.write(shared.format_geojson(output)) + + +if __name__ == '__main__': + main() diff --git a/shared.py b/shared.py index 65a6450..2d00e1b 100644 --- a/shared.py +++ b/shared.py @@ -19,14 +19,15 @@ def run_overpass_query(query): return request.text -def load_building_tags(municipality_id): +def load_building_tags(municipality_id, with_position=False): + center = 'center' if with_position else '' query = f'''[out:json][timeout:60]; (area[ref={municipality_id}] [admin_level=7] [place=municipality]; ) -> .county; nwr["ref:bygningsnr"](area.county); - out tags noids; + out tags noids {center}; ''' return run_overpass_query(query) diff --git a/tests/test_find_removed.py b/tests/test_find_removed.py new file mode 100644 index 0000000..6b0dea6 --- /dev/null +++ b/tests/test_find_removed.py @@ -0,0 +1,71 @@ +import unittest + +import find_removed + + +expected_output_point = { + 'type': 'Feature', + 'geometry': { + 'type': 'Point', + 'coordinates': [ + 11.0, + 59.0, + ] + }, + 'properties': { + 'ref:bygningsnr': '1', + 'building': 'yes', + } + } + + +def cadastral(ref): + return {'properties': {'ref:bygningsnr': str(ref)}} + + +def osm_node(ref): + return { + 'type': 'node', + 'lat': 59.0, + 'lon': 11.0, + 'tags': { + 'building': 'yes', + 'ref:bygningsnr': str(ref), + } + } + + +def osm_way(ref): + return { + 'type': 'way', + 'center': { + 'lat': 59.0, + 'lon': 11.0, + }, + 'tags': { + 'building': 'yes', + 'ref:bygningsnr': str(ref), + } + } + + +class TestFindRemoved(unittest.TestCase): + def _find_removed(self, cadastral_buildings, osm_buildings): + return find_removed.find_removed(cadastral_buildings, + osm_buildings) + + def test_ignore_building_still_in_cadastral_data(self): + removed = self._find_removed([cadastral(1)], [osm_node(1)]) + self.assertEqual([], removed) + + def test_ignore_building_missing_from_osm(self): + removed = self._find_removed([cadastral(1)], []) + self.assertEqual([], removed) + + def test_output_removed_building_node(self): + removed = self._find_removed([], [osm_node(1)]) + self.assertEqual([expected_output_point], removed) + + def test_output_removed_building_way(self): + removed = self._find_removed([], [osm_way(1)]) + self.assertEqual([expected_output_point], removed) From b0fb41d863b9c151603d1a59b28f99f433d74d51 Mon Sep 17 00:00:00 2001 From: Martin Nyhus Date: Sun, 11 Jun 2023 00:40:48 +0200 Subject: [PATCH 5/5] Allow passing municipality by name to more tools --- Makefile | 1 + filter_buildings.py | 4 ++- find_lifecycle_updates.py | 4 ++- find_removed.py | 4 ++- shared.py | 65 +++++++++++++++++++++++++++++++++++++++ tests/test_shared.py | 61 ++++++++++++++++++++++++++++++++++++ 6 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 tests/test_shared.py diff --git a/Makefile b/Makefile index d743a1a..b72e69e 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ FLAKE8_FILES := \ tests/test_filter.py \ tests/test_find_lifecycle_updates.py \ tests/test_find_removed.py \ + tests/test_shared.py \ ; diff --git a/filter_buildings.py b/filter_buildings.py index e074a27..13aca60 100644 --- a/filter_buildings.py +++ b/filter_buildings.py @@ -32,11 +32,13 @@ def main(): parser.add_argument('--municipality', required=True) args = parser.parse_args() + muni_id = shared.handle_municipality_argument(args.municipality) + with open(args.input, 'r', encoding='utf-8') as file: cadastral = shared.parse_cadastral_data(file.read()) print(f'Loaded {len(cadastral)} buildings') - osm_raw = shared.load_building_tags(args.municipality) + osm_raw = shared.load_building_tags(muni_id) osm_refs = load_osm_refs(osm_raw) print(f'Loaded {len(osm_refs)} unique references from OSM') diff --git a/find_lifecycle_updates.py b/find_lifecycle_updates.py index 1627d58..9329872 100644 --- a/find_lifecycle_updates.py +++ b/find_lifecycle_updates.py @@ -76,11 +76,13 @@ def main(): parser.add_argument('--municipality', required=True) args = parser.parse_args() + muni_id = shared.handle_municipality_argument(args.municipality) + with open(args.input, 'r', encoding='utf-8') as file: cadastral = shared.parse_cadastral_data(file.read()) print(f'Loaded {len(cadastral)} buildings') - osm_raw = shared.load_building_tags(args.municipality) + osm_raw = shared.load_building_tags(muni_id) osm_buildings = json.loads(osm_raw)['elements'] osm_by_ref = osm_buildings_by_ref(osm_buildings) print(f'Loaded {len(osm_buildings)} buildings from OSM') diff --git a/find_removed.py b/find_removed.py index f6d9365..dedbaba 100644 --- a/find_removed.py +++ b/find_removed.py @@ -65,11 +65,13 @@ def main(): parser.add_argument('--municipality', required=True) args = parser.parse_args() + muni_id = shared.handle_municipality_argument(args.municipality) + with open(args.input, 'r', encoding='utf-8') as file: cadastral = shared.parse_cadastral_data(file.read()) print(f'Loaded {len(cadastral)} buildings') - osm_raw = shared.load_building_tags(args.municipality, + osm_raw = shared.load_building_tags(muni_id, with_position=True) osm_buildings = json.loads(osm_raw)['elements'] print(f'Loaded {len(osm_buildings)} buildings from OSM') diff --git a/shared.py b/shared.py index 2d00e1b..473348e 100644 --- a/shared.py +++ b/shared.py @@ -1,8 +1,19 @@ import json +import re +import sys import requests +class NoResults(Exception): + pass + + +class MultipleResults(Exception): + def __init__(self, *results): + self.results = list(results) + + def parse_ref(raw_ref): return {int(ref) for ref in raw_ref.split(';') if ref} @@ -43,3 +54,57 @@ def format_geojson(features): 'features': features, } return json.dumps(geojson) + + +def load_municipalities(): + url = ('https://ws.geonorge.no/kommuneinfo/v1/fylkerkommuner' + + '?filtrer=fylkesnummer%2Cfylkesnavn%2Ckommuner.kommunenummer' + + '%2Ckommuner.kommunenavnNorsk') + request = requests.get(url) + + municipalities = {} + for county in request.json(): + for municipality in county['kommuner']: + muni_number = municipality['kommunenummer'] + muni_name = municipality['kommunenavnNorsk'] + municipalities[muni_number] = muni_name + + return municipalities + + +def resolve_municipality_id(municipalities, lookup_name): + result = None + for muni_id in municipalities: + muni_name = municipalities[muni_id] + if lookup_name.casefold() in muni_name.casefold(): + current = { + 'id': muni_id, + 'name': muni_name, + } + + if result is not None: + raise MultipleResults(result, current) + else: + result = current + + if result is None: + raise NoResults + + return result['id'] + + +def handle_municipality_argument(municipality): + if re.match('[0-9]{4}', municipality): + return municipality + + municipalities = load_municipalities() + try: + return resolve_municipality_id( + municipalities, municipality) + except NoResults: + sys.exit(f'Municipality {municipality} not found') + except MultipleResults as e: + sys.exit('Found multiple matching municipalities: {}'.format( + ', '.join( + [f'{item["id"]}/{item["name"]}' for item in e.results] + ))) diff --git a/tests/test_shared.py b/tests/test_shared.py new file mode 100644 index 0000000..d51324b --- /dev/null +++ b/tests/test_shared.py @@ -0,0 +1,61 @@ +import unittest + +import shared + + +class TestMuncipalityResolution(unittest.TestCase): + def setUp(self): + self.municipalities = { + '0301': 'Oslo', + # '0231': 'Skedsmo', + '3018': 'Våler', + '3030': 'Lillestrøm', + '3419': 'Våler', + '4215': 'Lillesand', + '4637': 'Hyllestad', + } + + def _resolve(self, muni_name): + return shared.resolve_municipality_id( + self.municipalities, + muni_name) + + def _assert_resolves_to(self, muni_name, muni_id): + self.assertEqual(muni_id, self._resolve(muni_name)) + + def test_resolve_municipality(self): + self._assert_resolves_to('Lillestrøm', '3030') + + def test_resolve_zero_prefix(self): + self._assert_resolves_to('Oslo', '0301') + + def test_resolve_duplicate_name(self): + with self.assertRaises(shared.MultipleResults) as cm: + self._resolve('Våler') + + self.assertEqual(cm.exception.results, [ + {'id': '3018', 'name': 'Våler'}, + {'id': '3419', 'name': 'Våler'}, + ]) + + def test_resolve_missing(self): + with self.assertRaises(shared.NoResults): + self._resolve('Skedsmo') + + def test_resolve_with_different_case(self): + self._assert_resolves_to('lILLESTRØM', '3030') + + def test_resolve_using_prefix(self): + self._assert_resolves_to('Lillest', '3030') + + def test_prefix_resolution_to_multiple_results(self): + with self.assertRaises(shared.MultipleResults) as cm: + self._resolve('Lilles') + + self.assertEqual(cm.exception.results, [ + {'id': '3030', 'name': 'Lillestrøm'}, + {'id': '4215', 'name': 'Lillesand'}, + ]) + + def test_resolve_with_infix_match(self): + self._assert_resolves_to('llestr', '3030')