Skip to content

Commit c1d46ad

Browse files
committed
Fix multiprocessing bug #313
1 parent 0628d62 commit c1d46ad

File tree

2 files changed

+19
-6
lines changed

2 files changed

+19
-6
lines changed

couchdb/http.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from base64 import b64encode
1515
from datetime import datetime
16+
import multiprocessing
1617
import errno
1718
import socket
1819
import time
@@ -482,17 +483,20 @@ class ConnectionPool(object):
482483
def __init__(self, timeout, disable_ssl_verification=False):
483484
self.timeout = timeout
484485
self.disable_ssl_verification = disable_ssl_verification
485-
self.conns = {} # HTTP connections keyed by (scheme, host)
486+
self.conns = {} # HTTP connections keyed by (current_process_pid, scheme, host)
486487
self.lock = Lock()
487488

488-
def get(self, url):
489+
@property
490+
def _current_process_id(self):
491+
return multiprocessing.current_process().pid
489492

493+
def get(self, url):
490494
scheme, host = util.urlsplit(url, 'http', False)[:2]
491495

492496
# Try to reuse an existing connection.
493497
self.lock.acquire()
494498
try:
495-
conns = self.conns.setdefault((scheme, host), [])
499+
conns = self.conns.setdefault((self._current_process_id, scheme, host), [])
496500
if conns:
497501
conn = conns.pop(-1)
498502
else:
@@ -520,7 +524,7 @@ def release(self, url, conn):
520524
scheme, host = util.urlsplit(url, 'http', False)[:2]
521525
self.lock.acquire()
522526
try:
523-
self.conns.setdefault((scheme, host), []).append(conn)
527+
self.conns.setdefault((self._current_process_id, scheme, host), []).append(conn)
524528
finally:
525529
self.lock.release()
526530

couchdb/tests/client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# you should have received as part of this distribution.
88

99
from datetime import datetime
10+
import multiprocessing
1011
import os
1112
import os.path
1213
import shutil
@@ -19,6 +20,10 @@
1920
from couchdb.tests import testutil
2021

2122

23+
def _current_pid():
24+
return multiprocessing.current_process().pid
25+
26+
2227
class ServerTestCase(testutil.TempDatabaseMixin, unittest.TestCase):
2328

2429
def test_init_with_resource(self):
@@ -481,7 +486,9 @@ def test_changes_releases_conn(self):
481486
# that the HTTP connection made it to the pool.
482487
list(self.db.changes(feed='continuous', timeout=0))
483488
scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2]
484-
self.assertTrue(self.db.resource.session.connection_pool.conns[(scheme, netloc)])
489+
current_pid = _current_pid()
490+
key = (current_pid, scheme, netloc)
491+
self.assertTrue(self.db.resource.session.connection_pool.conns[key])
485492

486493
def test_changes_releases_conn_when_lastseq(self):
487494
# Consume a changes feed, stopping at the 'last_seq' item, i.e. don't
@@ -490,8 +497,10 @@ def test_changes_releases_conn_when_lastseq(self):
490497
for obj in self.db.changes(feed='continuous', timeout=0):
491498
if 'last_seq' in obj:
492499
break
500+
current_pid = _current_pid()
493501
scheme, netloc = util.urlsplit(client.DEFAULT_BASE_URL)[:2]
494-
self.assertTrue(self.db.resource.session.connection_pool.conns[(scheme, netloc)])
502+
key = (current_pid, current_pid, scheme, netloc)
503+
self.assertTrue(self.db.resource.session.connection_pool.conns[key])
495504

496505
def test_changes_conn_usable(self):
497506
# Consume a changes feed to get a used connection in the pool.

0 commit comments

Comments
 (0)