From 34d39e9fdc3ffd38cb6088a9681b6972264314eb Mon Sep 17 00:00:00 2001 From: Kurisudes Date: Tue, 20 Dec 2022 19:37:25 +0100 Subject: [PATCH] use pyhafas --- schiene/schiene.py | 235 ++++++++++++++++++--------------------------- 1 file changed, 94 insertions(+), 141 deletions(-) diff --git a/schiene/schiene.py b/schiene/schiene.py index 87386bd..4819929 100755 --- a/schiene/schiene.py +++ b/schiene/schiene.py @@ -1,150 +1,103 @@ #!/usr/bin/env python3 -import requests -import json from datetime import datetime -from bs4 import BeautifulSoup - - -def parse_connections(html): - soup = BeautifulSoup(html, "html.parser") - connections = list() - - for row in soup.find_all("td", class_="overview timelink"): - columns = row.parent.find_all("td") - - try: - price_raw = columns[3].find("span", class_="bold").text.strip().replace(',', '.') - price = float(price_raw) - except: - price = None - - data = { - 'details': columns[0].a.get('href').replace('!details=opened!', '!details=opened!detailsVerbund=opened!'), - 'departure': columns[0].a.contents[0].string, - 'arrival': columns[0].a.contents[2].string, - 'transfers': int(columns[2].contents[0]), - 'time': columns[2].contents[2], - 'products': columns[3].contents[0].split(', '), - 'price': price - } - - if not columns[1].find('img') and not columns[1].find('span', class_='delay') and not columns[1].find('span', class_='delayOnTime'): - data['ontime'] = True - data['canceled'] = False - else: - data = parse_delay(data) - - connections.append(data) - return connections - -def parse_stations(html): - """ - Strips JS code, loads JSON - """ - html = html.replace('SLs.sls=', '').replace(';SLs.showSuggestion();', '') - html = json.loads(html) - return html['suggestions'] - -def parse_delay(data): - """ - Prase the delay - """ - # parse data from the details view - rsp = requests.get(data['details']) - soup = BeautifulSoup(rsp.text, "html.parser") - - # get departure delay - delay_departure_raw = None - try: - delay_departure_raw= soup.find('div', class_="routeStart").find('span', class_=["delay", "delayOnTime"]) - except: - pass - - if delay_departure_raw: - delay_departure = calculate_delay(data['departure'], - delay_departure_raw.text) - else: - delay_departure = 0 - - # get arrival delay - delay_arrival_raw = None - try: - delay_arrival_raw = soup.find('div', class_=["routeEnd","routeEndAdditional"]).find('span', class_=["delay", "delayOnTime"]) - except: - pass - - if delay_arrival_raw: - delay_arrival = calculate_delay(data['arrival'], - delay_arrival_raw.text) - else: - delay_arrival = 0 - - # save the parsed data - if delay_departure + delay_arrival == 0: - data['ontime'] = True - else: - data['ontime'] = False - data['delay'] = { - 'delay_departure': int(delay_departure), - 'delay_arrival': int(delay_arrival) - } - - # TODO: this should not be hardcoded! - data['canceled'] = False - - return data - -def calculate_delay(original, delay): - """ - Calculate the delay - """ - original = datetime.strptime(original, '%H:%M') - delayed = datetime.strptime(delay, '%H:%M') - # if the original time was before 0:00 and the delayed time is in the new day, we need to do a different calculation - if delayed < original: - fullday = 86400 - diff = fullday - original + delayed - else: - diff = delayed - original - return diff.total_seconds() // 60 class Schiene(): + def __format_timedelta(self, delta): + """Formats a timedelta duration to [N days] %H:%M:%S format""" + seconds = int(delta.total_seconds()) + + secs_in_a_day = 86400 + secs_in_a_hour = 3600 + secs_in_a_min = 60 + + days, seconds = divmod(seconds, secs_in_a_day) + hours, seconds = divmod(seconds, secs_in_a_hour) + minutes, seconds = divmod(seconds, secs_in_a_min) + + time_fmt = f"{hours:02d}:{minutes:02d}" + + if days > 0: + suffix = "s" if days > 1 else "" + return f"{days} day{suffix} {time_fmt}" + + return time_fmt + + def __parse_journeys(self, journeys, client): + connections = list() + for i in range(3): + journey = client.journey(journeys[i].id) + products = '' + cancel_info = '' + cancelled = False + + for leg in journey.legs: + product = str(leg.name) + + if len(products) == 0: + products = product + else: + products = products + " / " + product + + if leg.cancelled and len(cancel_info) == 0: + cancel_info = " (cancelled)" + cancelled = True + + time = self.__format_timedelta(journey.duration) + data = { + 'details': 'To be removed', + 'departure': (journey.legs[0].departure).strftime("%H:%M") + cancel_info, + 'arrival': (journey.legs[0].arrival).strftime("%H:%M") + cancel_info, + 'transfers': len(journey.legs)-1, + 'time': time, + 'products': products, + 'price': 'Not supported at the moment - issue shall be placed in pyhafas', + 'canceled': cancelled + } + + if journey.legs[0].departureDelay == None: + delay_departure = 0 + else: + delay_departure = journey.legs[0].departureDelay + + if journey.legs[len(journey.legs)-1].arrivalDelay == None: + delay_arrival = 0 + else: + delay_arrival = journey.legs[len(journey.legs)-1].arrivalDelay + + if delay_departure + delay_arrival == 0: + data['ontime'] = True + else: + data['ontime'] = False + data['delay'] = { + 'delay_departure': int(delay_departure.seconds/60), + 'delay_arrival': int(delay_arrival.seconds/60) + } + + connections.append(data) + return connections - def stations(self, station, limit=10): - """ - Find stations for given queries + def connections(self, origin, destination, dt=datetime.now(), only_direct=False): + from pyhafas import HafasClient + from pyhafas.profile import DBProfile + from pyhafas.types.fptf import Leg - Args: - station (str): search query - limit (int): limit number of results - """ - query = { - 'start': 1, - 'S': station + '?', - 'REQ0JourneyStopsB': limit - } - rsp = requests.get('http://reiseauskunft.bahn.de/bin/ajax-getstop.exe/dn', params=query) - return parse_stations(rsp.text) + client = HafasClient(DBProfile()) + origin_location = client.locations(origin)[0] + destination_location = client.locations(destination)[0] - def connections(self, origin, destination, dt=datetime.now(), only_direct=False): - """ - Find connections between two stations - - Args: - origin (str): origin station - destination (str): destination station - dt (datetime): date and time for query - only_direct (bool): only direct connections - """ - query = { - 'S': origin, - 'Z': destination, - 'date': dt.strftime("%d.%m.%y"), - 'time': dt.strftime("%H:%M"), - 'start': 1, - 'REQ0JourneyProduct_opt0': 1 if only_direct else 0 - } - rsp = requests.get('http://mobile.bahn.de/bin/mobil/query.exe/dox?', params=query) - return parse_connections(rsp.text) + if only_direct: + changes = 0 + else: + changes = 100 + + journeys = client.journeys( + origin=origin_location, + via=[], + destination=destination_location, + date=dt, + max_changes=changes, + ) + + return self.__parse_journeys(journeys, client) \ No newline at end of file