Skip to content

Commit

Permalink
Implement a test for connection lost
Browse files Browse the repository at this point in the history
  • Loading branch information
julien-duponchelle committed Nov 9, 2012
1 parent 31da057 commit 21190e9
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
1 change: 1 addition & 0 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
* MySQL 5.6 support: http://dev.mysql.com/doc/internals/en/row-based-replication.html#rows-event
* Support binlog_row_image=minimal or binlog_row_image=noblob
* Support connection loss
* Test log file change
22 changes: 13 additions & 9 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
only_events: Array of allowed events
'''
connection_settings['charset'] = 'utf8'
self.__stream_connection = pymysql.connect(**connection_settings)
self._stream_connection = pymysql.connect(**connection_settings)
ctl_connection_settings = copy.copy(connection_settings)
ctl_connection_settings['db'] = 'information_schema'
ctl_connection_settings['cursorclass'] = pymysql.cursors.DictCursor
Expand All @@ -28,16 +28,17 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
self.__blocking = blocking
self.__only_events = only_events
self.__server_id = server_id
self.__log_pos = None

#Store table meta informations
self.table_map = {}

def close(self):
    """Release both MySQL connections held by the reader.

    Closes the binlog dump (stream) connection and the control
    connection (used for table metadata lookups). After this call the
    reader can no longer fetch events.
    """
    # NOTE(review): the scraped diff carried both the pre-rename
    # `__stream_connection` line and the post-rename one; only the
    # renamed attribute exists after this commit, so keep a single close.
    self._stream_connection.close()
    self.__ctl_connection.close()

def __connect_to_stream(self):
cur = self.__stream_connection.cursor()
cur = self._stream_connection.cursor()
cur.execute("SHOW MASTER STATUS")
(log_file, log_pos) = cur.fetchone()[:2]
cur.close()
Expand All @@ -50,24 +51,27 @@ def __connect_to_stream(self):
command = COM_BINLOG_DUMP
prelude = struct.pack('<i', len(log_file) + 11) \
+ int2byte(command)
if self.__resume_stream:
prelude += struct.pack('<I', log_pos)
if self.__log_pos is None:
if self.__resume_stream:
prelude += struct.pack('<I', log_pos)
else:
prelude += struct.pack('<I', 4)
else:
prelude += struct.pack('<I', 4)
prelude += struct.pack('<I', self.__log_pos)
if self.__blocking:
prelude += struct.pack('<h', 0)
else:
prelude += struct.pack('<h', 1)
prelude += struct.pack('<I', self.__server_id)
self.__stream_connection.wfile.write(prelude + log_file.encode())
self.__stream_connection.wfile.flush()
self._stream_connection.wfile.write(prelude + log_file.encode())
self._stream_connection.wfile.flush()
self.__connected = True

def fetchone(self):
if self.__connected == False:
self.__connect_to_stream()
while True:
pkt = self.__stream_connection.read_packet()
pkt = self._stream_connection.read_packet()
if not pkt.is_ok_packet():
return None
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self.__ctl_connection)
Expand Down
24 changes: 24 additions & 0 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,30 @@ def test_read_query_event(self):
self.assertIsInstance(event, QueryEvent)
self.assertEqual(event.query, query)

def test_connection_lost_event(self):
    """The reader must survive its dump connection being killed.

    A blocking reader is opened, a burst of events is written, and the
    binlog stream connection is then killed from the control
    connection. Subsequent fetchone() calls are expected to reconnect
    transparently and deliver every event exactly once.
    """
    # Re-open the reader in blocking mode so fetchone() waits for events.
    self.stream.close()
    self.stream = BinLogStreamReader(connection_settings=self.database, blocking=True)

    query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
    self.execute(query)
    query2 = "INSERT INTO test (data) VALUES('a')"
    for _ in range(1000):
        self.execute(query2)

    # First event of a fresh binlog dump: RotateEvent.
    self.stream.fetchone()

    # Simulate a lost connection by killing the dump thread from the
    # control connection; the next fetch must trigger a reconnect.
    self.conn_control.kill(self.stream._stream_connection.thread_id())
    # FormatDescriptionEvent arrives after the (re)connect.
    self.stream.fetchone()

    # No events may be lost: first the CREATE TABLE ...
    event = self.stream.fetchone()
    self.assertIsInstance(event, QueryEvent)
    self.assertEqual(event.query, query)
    # ... then all 1000 INSERTs, in order.
    for _ in range(1000):
        event = self.stream.fetchone()
        self.assertEqual(event.query, query2)

def test_filtering_events(self):
self.stream.close()
self.stream = BinLogStreamReader(connection_settings = self.database, only_events = [QueryEvent])
Expand Down

0 comments on commit 21190e9

Please sign in to comment.