Skip to content

Commit

Permalink
MDEV-35465 Async replication stops working on Galera async replica no…
Browse files Browse the repository at this point in the history
…de when parallel replication is enabled

Parallel slave failed to retry in retry_event_group() with error

    WSREP: Parallel slave worker failed at wsrep_before_command() hook

Fix wsrep transaction cleanup/restart in retry_event_group() to properly
clean up previous transaction by calling wsrep_after_statement().
Also move call to reset error after call to wsrep_after_statement()
to make sure that it remains effective.

Add a MTR test galera_as_slave_parallel_retry to reproduce the error
when the fix is not present.

Other issues which were detected when testing with sysbench:

Check if parallel slave is killed for retry before waiting for prior
commits in THD::wsrep_parallel_slave_wait_for_prior_commit(). This
is required with slave-parallel-mode=optimistic to avoid deadlock
when a slave later in commit order manages to reach prepare phase
before a lock conflict is detected.

Suppress wsrep applier specific warning for slave threads.
  • Loading branch information
temeo committed Nov 22, 2024
1 parent cf2d49d commit 6fefc7c
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 14 deletions.
22 changes: 22 additions & 0 deletions mysql-test/suite/galera/r/galera_as_slave_parallel_retry.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
connection node_2;
connection node_1;
connect master, 127.0.0.1, root, , test, $NODE_MYPORT_3;
connect node_1_ctrl, 127.0.0.1, root, , test, $NODE_MYPORT_1;
connection node_1;
START SLAVE;
connection master;
CREATE TABLE t1 (f1 INT PRIMARY KEY) ENGINE=InnoDB;
connection node_1;
SET GLOBAL debug_dbug = '+d,rpl_parallel_simulate_temp_err_xid,sync.wsrep_retry_event_group';
connection master;
INSERT INTO t1 VALUES (1);
connection node_1_ctrl;
SET debug_sync = 'now WAIT_FOR sync.wsrep_retry_event_group_reached';
SET GLOBAL debug_dbug = '';
SET debug_sync = 'now SIGNAL signal.wsrep_retry_event_group';
connection node_1;
SET debug_sync = 'RESET';
connection master;
DROP TABLE t1;
connection node_1;
STOP SLAVE;
10 changes: 10 additions & 0 deletions mysql-test/suite/galera/t/galera_as_slave_parallel_retry.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
!include ../galera_2nodes_as_slave.cnf

[mysqld]
log-bin=mysqld-bin
log-slave-updates
binlog-format=ROW

[mysqld.1]
slave-parallel-threads=2
slave-parallel-mode=optimistic
52 changes: 52 additions & 0 deletions mysql-test/suite/galera/t/galera_as_slave_parallel_retry.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# MDEV-35465 Async replication stops working on Galera async replica node
# when parallel replication is enabled

--source include/have_innodb.inc
--source include/have_log_bin.inc
--source include/galera_cluster.inc
--source include/have_debug_sync.inc

# Node 3 is not a Galera node, use it as a master
--connect master, 127.0.0.1, root, , test, $NODE_MYPORT_3
--connect node_1_ctrl, 127.0.0.1, root, , test, $NODE_MYPORT_1

--connection node_1
--disable_query_log
--eval CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_USER='root', MASTER_PORT=$NODE_MYPORT_3;
--enable_query_log
START SLAVE;

--connection master
CREATE TABLE t1 (f1 INT PRIMARY KEY) ENGINE=InnoDB;

--connection node_1
--let $wait_condition = SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'
--source include/wait_condition.inc

#
--let debug_dbug_orig = `SELECT @@GLOBAL.debug_dbug`
SET GLOBAL debug_dbug = '+d,rpl_parallel_simulate_temp_err_xid,sync.wsrep_retry_event_group';

--connection master
INSERT INTO t1 VALUES (1);

--connection node_1_ctrl
SET debug_sync = 'now WAIT_FOR sync.wsrep_retry_event_group_reached';
--eval SET GLOBAL debug_dbug = '$debug_dbug_orig'
SET debug_sync = 'now SIGNAL signal.wsrep_retry_event_group';

--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
--source include/wait_condition.inc


--connection node_1
SET debug_sync = 'RESET';

--connection master
DROP TABLE t1;

--connection node_1
--let $wait_condition = SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1'
--source include/wait_condition.inc

STOP SLAVE;
2 changes: 1 addition & 1 deletion sql/log_event_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5511,7 +5511,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (unlikely(open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0)))
{
#ifdef WITH_WSREP
if (WSREP(thd))
if (WSREP(thd) && !thd->slave_thread)
{
WSREP_WARN("BF applier thread=%lu failed to open_and_lock_tables for "
"%s, fatal: %d "
Expand Down
30 changes: 20 additions & 10 deletions sql/rpl_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
asynchronously, we need to be sure they will be completed before starting a
new transaction. Otherwise the new transaction might suffer a spurious kill.
*/
static void
void
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
{
PSI_stage_info old_stage;
Expand Down Expand Up @@ -831,8 +831,13 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
err= 0;
errmsg= NULL;
#ifdef WITH_WSREP
thd->wsrep_cs().reset_error();
WSREP_DEBUG("retrying async replication event");
DBUG_EXECUTE_IF("sync.wsrep_retry_event_group", {
WSREP_INFO("sync.wsrep_retry_event_group");
const char act[]= "now "
"SIGNAL sync.wsrep_retry_event_group_reached "
"WAIT_FOR signal.wsrep_retry_event_group";
debug_sync_set_action(thd, STRING_WITH_LEN(act));
};);
#endif /* WITH_WSREP */

/*
Expand Down Expand Up @@ -981,15 +986,20 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
*/
thd->reset_killed();
#ifdef WITH_WSREP
if (wsrep_before_command(thd))
if (WSREP(thd))
{
WSREP_WARN("Parallel slave worker failed at wsrep_before_command() hook");
err= 1;
goto err;
/* Exec after statement hook to make sure that the failed transaction
* gets cleared and reset error state. */
if (wsrep_after_statement(thd))
{
WSREP_WARN("Parallel slave worker failed at wsrep_after_statement() hook");
err= 1;
goto err;
}
thd->wsrep_cs().reset_error();
wsrep_start_trx_if_not_started(thd);
WSREP_DEBUG("parallel slave retry, after trx start");
}
wsrep_start_trx_if_not_started(thd);
WSREP_DEBUG("parallel slave retry, after trx start");

#endif /* WITH_WSREP */
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
Expand Down
1 change: 1 addition & 0 deletions sql/rpl_parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ struct rpl_parallel {
extern struct rpl_parallel_thread_pool global_rpl_thread_pool;


extern void wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi);
extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
Expand Down
11 changes: 8 additions & 3 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3426,11 +3426,16 @@ enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit)

bool THD::wsrep_parallel_slave_wait_for_prior_commit()
{
if (rgi_slave && rgi_slave->is_parallel_exec && wait_for_prior_commit())
if (rgi_slave && rgi_slave->is_parallel_exec)
{
return 1;
wait_for_pending_deadlock_kill(this, rgi_slave);
if (rgi_slave->killed_for_retry) {
my_error(ER_LOCK_DEADLOCK, MYF(0));
return true;
}
return wait_for_prior_commit();
}
return 0;
return false;
}

/***** callbacks for wsrep service ************/
Expand Down

0 comments on commit 6fefc7c

Please sign in to comment.