LCSSMatcher.match_trace_batch() Fails with Multiprocessing #187

Open
joshuacroff opened this issue Jul 18, 2024 · 2 comments
joshuacroff commented Jul 18, 2024

Problem Description

I've encountered an issue when using the LCSSMatcher.match_trace_batch() method for batch matching with multiprocessing. Below is the method definition for reference:

def match_trace_batch(
        self,
        trace_batch: List[Trace],
        processes: int = 1,
    ) -> List[MatchResult]:
        if processes > 1:
            results = [self.match_trace(t) for t in trace_batch]
        else:
            with Pool(processes=processes) as p:
                results = p.map(self.match_trace, trace_batch)

        return results

The method is defined here: lcss.py#L150
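
If I'm reading the intent right, the two branches appear to be swapped. A minimal sketch of what I'd expect the corrected logic to look like, using a stand-in match function in place of self.match_trace (this is my sketch, not the library's code):

```python
from multiprocessing import Pool
from typing import Callable, List

def match_trace_batch(match_trace: Callable, trace_batch: List,
                      processes: int = 1) -> List:
    # Expected behavior: use a worker pool only when processes > 1;
    # otherwise match synchronously in the current process.
    if processes > 1:
        with Pool(processes=processes) as p:
            results = p.map(match_trace, trace_batch)
    else:
        results = [match_trace(t) for t in trace_batch]
    return results
```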

When I run the following code with the default single process:

matcher = LCSSMatcher(nx_map)
match_results = matcher.match_trace_batch(trace_list)

I get this error message:

ValueError: No roads found for Coordinate(coordinate_id=0, x=-13604638.834547484, y=4558638.110141623, crs=('EPSG', '3857'))

Observations

  • The error occurs with the default processes=1, which, because the condition is inverted, actually takes the multiprocessing (Pool) branch.
  • When I set processes to a higher number (e.g., 8), the method falls back to the synchronous list comprehension and runs without errors, but it's slow:
matcher = LCSSMatcher(nx_map)
match_results = matcher.match_trace_batch(trace_list, processes=8)
  • Attempting multiprocessing with concurrent.futures also results in the same error:
matcher = LCSSMatcher(nx_map)
matched_traces = []
with ProcessPoolExecutor(max_workers=7) as executor:
    futures = [executor.submit(process_trace, trace_dict, matcher) for trace_dict in traces]
    for future in as_completed(futures):
        matched_traces.append(future.result())
return matched_traces

Hypothesis

It seems the failure may stem from multiprocessing's need to serialize ('pickle') the matcher object before sending it to worker processes.
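
One quick way to test this hypothesis is to try round-tripping the matcher through pickle directly. This is a generic helper I wrote for checking, not part of mappymatch:

```python
import pickle

def is_picklable(obj) -> bool:
    """Return True if `obj` survives a pickle round trip without raising."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

# e.g. is_picklable(matcher) -- if this returns False, Pool.map cannot
# ship the matcher (or its bound methods) to worker processes.
```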

Questions

  1. Could the need for pickling or serialization in multiprocessing be causing this issue?
  2. Should the method logic be updated to process synchronously if processes=1 and use multiprocessing otherwise?
  3. Is this a known issue, or could it be specific to my environment?

Any insights or suggestions would be greatly appreciated.

@nreinicke (Collaborator)

Thanks for opening this and for providing so much detail! Sorry you're having issues with it.

Should the method logic be updated to process synchronously if processes=1 and use multiprocessing otherwise?

Yes, you're absolutely right: we had the logic mixed up here. I'll put in a quick fix so the matcher uses multiprocessing only when processes > 1. Good find!

Could the need for pickling or serialization in multiprocessing be causing this issue?

Yes, I think so. I traced this back to the rtree spatial index that our road map uses to quickly look up road geometry. From the docs, it seems there is a known issue where the rtree cannot be pickled and unpickled correctly.

So, unfortunately, it won't be possible to use multiprocessing with this matcher until the pickling issue gets fixed or we find a suitable workaround. Sorry we don't have a quick fix in this situation, but I'll open an issue to explore multiprocessing alternatives that don't run into errors with the rtree.

@joshuacroff (Author) commented Jul 22, 2024

@nreinicke thanks for your responsiveness and for the quick logic fix! No worries on the multiprocessing issue; I totally understand. I burned quite a few hours trying to find a workaround myself and couldn't quite make it work.

I did have some logic that worked, but it required creating a couple of new functions wrapping existing mappymatch functions/methods to allow for asynchronous processing. However, that approach hammers the Overpass API, and after about 1,000 records I saw a significant slowdown. You can see my attempt here.

I created two functions:

  • batch_process_traces_parallel()
    • This function implements the multiprocessing logic
    • It calls process_trace()
  • process_trace()
    • This function creates a geofence from a single trace,
    • creates a networkx map from the geofence,
    • and creates an LCSSMatcher

The problem with this approach seemed to be API throttling. I refactored to pull the entire network down within my area of interest, to see whether removing the constant API calls would help. That's when I ran into the issues with pickling a matcher object. If you're curious, my current, non-working attempt is here.

I also tried synchronous logic against the locally cached network. My thinking was that, without the API calls, I might get enough of a speed-up that I wouldn't need asynchronous logic at all. Somehow, that was slower than the earlier approach that made repeated calls to the API, perhaps because the network graph is so much larger. I'm now looking into ways to 'clip' the locally cached network to a trace buffer to see if that speeds things up. This has been a huge learning experience for me, I'll say. :)
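
For the clipping idea, here's a rough stand-alone sketch of what I have in mind: pad the trace's bounding box and keep only edges that touch it. This is pure Python for illustration; the real version would work on the cached network's geometries (e.g., via shapely/geopandas):

```python
from typing import List, Tuple

Point = Tuple[float, float]
Edge = Tuple[Point, Point]

def trace_bbox(points: List[Point], padding: float = 0.0):
    # Padded bounding box around a trace, in the trace's coordinate units.
    xs = [p[0] for p in points]
    ys = [p[1] for p in points]
    return (min(xs) - padding, min(ys) - padding,
            max(xs) + padding, max(ys) + padding)

def clip_edges(edges: List[Edge], bbox) -> List[Edge]:
    # Keep edges with at least one endpoint inside the padded box.
    minx, miny, maxx, maxy = bbox

    def inside(p: Point) -> bool:
        return minx <= p[0] <= maxx and miny <= p[1] <= maxy

    return [e for e in edges if inside(e[0]) or inside(e[1])]
```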
