Skip to content

Commit

Permalink
Support connection lost
Browse files Browse the repository at this point in the history
  • Loading branch information
julien-duponchelle committed Nov 10, 2012
1 parent 21190e9 commit 1f3a001
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion TODO
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
* Test transaction support
* MySQL 5.6 support: http://dev.mysql.com/doc/internals/en/row-based-replication.html#rows-event
* Support binlog_row_image=minimal or binlog_row_image=noblob
* Support connection lost
* Raise exception if too much connection lost
* Test log file change
24 changes: 17 additions & 7 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
blocking: Read on stream is blocking
only_events: Array of allowed events
'''
connection_settings['charset'] = 'utf8'
self._stream_connection = pymysql.connect(**connection_settings)
ctl_connection_settings = copy.copy(connection_settings)
self.__connection_settings = connection_settings
self.__connection_settings['charset'] = 'utf8'
ctl_connection_settings = copy.copy(self.__connection_settings)
ctl_connection_settings['db'] = 'information_schema'
ctl_connection_settings['cursorclass'] = pymysql.cursors.DictCursor
self.__ctl_connection = pymysql.connect(**ctl_connection_settings)
Expand All @@ -34,10 +34,13 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
self.table_map = {}

def close(self):
self._stream_connection.close()
if self.__connected:
self._stream_connection.close()
self.__connected = False
self.__ctl_connection.close()

def __connect_to_stream(self):
self._stream_connection = pymysql.connect(**self.__connection_settings)
cur = self._stream_connection.cursor()
cur.execute("SHOW MASTER STATUS")
(log_file, log_pos) = cur.fetchone()[:2]
Expand Down Expand Up @@ -68,17 +71,24 @@ def __connect_to_stream(self):
self.__connected = True

def fetchone(self):
if self.__connected == False:
self.__connect_to_stream()
while True:
pkt = self._stream_connection.read_packet()
if self.__connected == False:
self.__connect_to_stream()
pkt = None
try:
pkt = self._stream_connection.read_packet()
except pymysql.OperationalError as (code, message):
if code == 2013: #2013: Connection Lost
self.__connected = False
continue
if not pkt.is_ok_packet():
return None
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self.__ctl_connection)
if binlog_event.event_type == TABLE_MAP_EVENT:
self.table_map[binlog_event.event.table_id] = binlog_event.event
if self.__filter_event(binlog_event.event):
continue
self.__log_pos = binlog_event.log_pos
return binlog_event.event

def __filter_event(self, event):
Expand Down
9 changes: 6 additions & 3 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,25 @@ def test_connection_lost_event(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
query2 = "INSERT INTO test (data) VALUES('a')";
for i in range(0, 1000):
for i in range(0, 10000):
self.execute(query2)
self.execute("COMMIT")

#RotateEvent
self.stream.fetchone()

self.conn_control.kill(self.stream._stream_connection.thread_id())

#FormatDescription
self.stream.fetchone()

event = self.stream.fetchone()
self.assertIsInstance(event, QueryEvent)
self.assertEqual(event.query, query)

self.conn_control.kill(self.stream._stream_connection.thread_id())
for i in range(0, 1000):
event = self.stream.fetchone()
self.assertEqual(event.query, query2)
self.assertIsNotNone(event)

def test_filtering_events(self):
self.stream.close()
Expand Down

0 comments on commit 1f3a001

Please sign in to comment.