mysqlsh: Safe Switchover with set_primary_instance

2020-07-23

Author: sufei

Version: MySQL Shell 8.0.19


This article, together with the upcoming piece 《force_primary_instance强制切换实现逻辑》 (the implementation logic of forced switchover via force_primary_instance), explains the execution logic mysqlsh follows when switching the primary of a managed asynchronous replication cluster (InnoDB ReplicaSet), as a reference for later analysis work.

This article focuses on mysqlsh's safe-switchover logic, i.e. the internal implementation of set_primary_instance.

1. set_primary_instance Processing Logic

The entry point for the entire safe switchover is Replica_set_impl::set_primary_instance:

void Replica_set_impl::set_primary_instance(const std::string &instance_def,
                                            uint32_t timeout, bool dry_run)

Parameters:

instance_def: the new primary instance to switch to;

timeout: the timeout for replicas to catch up (sync) during the switchover;

dry_run: if true, no actual switchover is performed, only the related checks.
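For context, this switchover is normally driven through the AdminAPI rather than by calling the C++ entry point directly. Below is a minimal sketch in mysqlsh's Python mode, assuming a ReplicaSet deployment like the one used later in this article; the host:port and option values are just examples.

# Inside mysqlsh (\py mode), connected to any member of the ReplicaSet.
rs = dba.get_replica_set()

# dryRun only runs the validation steps analyzed below;
# timeout bounds how long replicas may take to catch up on GTIDs.
rs.set_primary_instance("10.142.90.82:8018", {"dryRun": True, "timeout": 60})

# The actual switchover, once the dry run succeeds.
rs.set_primary_instance("10.142.90.82:8018", {"timeout": 60})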

The main processing flow is outlined below:

[Figure: set_primary_instance logic]

2. Step-by-Step Details

  1. Acquire and connect to the primary; the connection is released when the function exits
acquire_primary();
auto finally = shcore::on_leave_scope([this]() { release_primary(); });

Connecting to the primary up front serves two main purposes.

In particular, if the connection to the primary fails at this point, an error is thrown, which guarantees that set_primary_instance can only proceed while the primary is alive.

  2. Retrieve the cluster information
topology::Server_global_topology *srv_topology = nullptr;
auto topology = setup_topology_manager(&srv_topology);

The concrete process is as follows:

mysql> SELECT * FROM (
    -> SELECT cluster_type, primary_mode, cluster_id, cluster_name,
    ->       description, NULL as group_name, async_topology_type
    -> FROM mysql_innodb_cluster_metadata.v2_ar_clusters
    -> UNION ALL
    -> SELECT cluster_type, primary_mode, cluster_id, cluster_name,
    ->       description, group_name, NULL as async_topology_type
    -> FROM mysql_innodb_cluster_metadata.v2_gr_clusters
    -> ) as c\G
*************************** 1. row ***************************
       cluster_type: ar
       primary_mode: pm
         cluster_id: f28d671b-c67d-11ea-a41d-74a063f38881
       cluster_name: set123
        description: Default ReplicaSet
         group_name: NULL
async_topology_type: SINGLE-PRIMARY-TREE
1 row in set (0.00 sec)
mysql> SELECT i.instance_id, i.cluster_id, am.master_instance_id, am.master_member_id, am.member_role, am.view_id, i.label, i.mysql_server_uuid, i.address, i.endpoint, i.xendpoint
    -> FROM mysql_innodb_cluster_metadata.v2_instances i LEFT JOIN mysql_innodb_cluster_metadata.v2_ar_members am
    -> ON am.instance_id = i.instance_id
    -> WHERE i.cluster_id = 'f28d671b-c67d-11ea-a41d-74a063f38881'\G  
    # cluster_id is the cluster id returned by the previous query
*************************** 1. row ***************************
       instance_id: 1
        cluster_id: f28d671b-c67d-11ea-a41d-74a063f38881
master_instance_id: NULL
  master_member_id: NULL
       member_role: PRIMARY
           view_id: 9
             label: 10.142.90.80:8035
 mysql_server_uuid: e1b6bb96-c1aa-11ea-86a2-74a063f38881
           address: 10.142.90.80:8018
          endpoint: 10.142.90.80:8018
         xendpoint: NULL
*************************** 2. row ***************************
       instance_id: 2
        cluster_id: f28d671b-c67d-11ea-a41d-74a063f38881
master_instance_id: 1
  master_member_id: e1b6bb96-c1aa-11ea-86a2-74a063f38881
       member_role: SECONDARY
           view_id: 9
             label: 10.142.90.82:8035
 mysql_server_uuid: e24d525d-c1aa-11ea-a3e3-74a063f3899f
           address: 10.142.90.82:8018
          endpoint: 10.142.90.82:8018
         xendpoint: NULL
2 rows in set (0.00 sec)
After the metadata is loaded, the status of each server in the topology is checked:

topo->check_servers(deep);
  3. Check whether the replica to be promoted is already the current primary; if it is, no switchover is needed
/*
   Connect to the specified replica; if the connection fails, throw
   "Could not connect to target instance". Also verify that the instance
   is part of the cluster, otherwise throw SHERR_DBA_CLUSTER_METADATA_MISSING.
*/
const topology::Server *promoted =
      check_target_member(srv_topology, instance_def);
/*
   Get the cluster's current primary and check that it is not in the
   "invalidated" state, otherwise throw "The global topology has no valid
   PRIMARY defined".
*/
const topology::Server *demoted = dynamic_cast<const topology::Server *>(
      srv_topology->get_primary_master_node());

if (promoted == demoted) {
    console->print_info("Target instance " + promoted->label +
                        " is already the PRIMARY.");
    return;
}
  4. Check the state of the replica to be promoted; if it is not ONLINE, throw the corresponding exception
validate_node_status(promoted);
  5. Connect to every instance in the cluster; if any instance is unreachable, throw an exception. Among the reachable instances, locate the old primary and the new primary
// instances holds all reachable instances, unreachable holds the ones that could not be reached
std::list<Scoped_instance> instances(
      connect_all_members(0, false, &unreachable));
if (!unreachable.empty()) {
    throw shcore::Exception("One or more instances are unreachable",
                            SHERR_DBA_ASYNC_MEMBER_UNREACHABLE);
}
Scoped_instance master;  // the old primary
Scoped_instance new_master;  // the new primary
{
    auto it = std::find_if(instances.begin(), instances.end(),
                           [promoted](const Scoped_instance &i) {
                             return i->get_uuid() ==
                                    promoted->get_primary_member()->uuid;
                           });
    if (it == instances.end())  // error out if the new primary cannot be found
      throw shcore::Exception::runtime_error(promoted->label +
                                             " cannot be promoted");

    new_master = *it;

    it = std::find_if(instances.begin(), instances.end(),
                      [demoted](const Scoped_instance &i) {
                        return i->get_uuid() ==
                               demoted->get_primary_member()->uuid;
                      });
    if (it == instances.end())    // internal error if the old primary cannot be found
      throw std::logic_error("Internal error: couldn't find primary");
    master = *it;
}
  6. First pre-sync of the instance to be promoted against the primary

This is done before any locks are taken, so that the locked phase later on stays as short as possible. The main call is:

sync_transactions(*new_master, k_async_cluster_channel_name, timeout);

Its underlying logic is:

// SELECT @@GLOBAL.GTID_EXECUTED to get the GTIDs executed on the primary
std::string gtid_set =
      mysqlshdk::mysql::get_executed_gtid_set(*m_primary_master);
// SELECT WAIT_FOR_EXECUTED_GTID_SET to wait for the replica to apply that GTID set
bool sync_res = wait_for_gtid_set_safe(target_instance, gtid_set,
                                         channel_name, timeout, true);
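To make this pre-sync concrete, here is a small self-contained sketch of the same pattern using mysql-connector-python. It only illustrates the SQL involved and is not mysqlsh's own code; hosts, ports and credentials are placeholders borrowed from the earlier metadata output.

import mysql.connector

# Placeholder connections: the current primary and the instance to promote.
primary = mysql.connector.connect(host="10.142.90.80", port=8018,
                                  user="root", password="***")
replica = mysql.connector.connect(host="10.142.90.82", port=8018,
                                  user="root", password="***")

# 1. GTIDs already executed on the primary.
cur = primary.cursor()
cur.execute("SELECT @@GLOBAL.GTID_EXECUTED")
gtid_set = cur.fetchone()[0]

# 2. Block on the replica until that GTID set is applied (timeout in seconds).
cur = replica.cursor()
cur.execute("SELECT WAIT_FOR_EXECUTED_GTID_SET(%s, %s)", (gtid_set, 60))
timed_out = cur.fetchone()[0]   # 0 = caught up, 1 = timed out
print("replica caught up" if timed_out == 0 else "GTID sync timed out")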
  7. Update the metadata

The first step here is to refresh the password of the replication account that the old primary will use to replicate from the new primary. The account is named "mysql_innodb_rs_" + get_sysvar_int("server_id") and gets a new random password.

Async_replication_options ar_options;
// internally resets the password via SET PASSWORD FOR ?@? = ?
ar_options.repl_credentials = refresh_replication_user(
      master.get(), k_async_cluster_user_name, dry_run);
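As a rough illustration of what this refresh amounts to, here is a hedged Python sketch (not the mysqlsh implementation): the account's host part is assumed to be '%', the name is built from the server_id of the instance whose connection is passed in, and the password is simply a fresh random string.

import secrets

def refresh_replication_user(conn):
    # conn: an open mysql-connector-python connection to the current primary.
    cur = conn.cursor()
    cur.execute("SELECT @@GLOBAL.server_id")
    user = "mysql_innodb_rs_%d" % cur.fetchone()[0]
    password = secrets.token_urlsafe(32)   # new random password
    # mysqlsh parametrizes this as SET PASSWORD FOR ?@? = ?; host '%' is an assumption
    cur.execute("SET PASSWORD FOR %s@%s = %s", (user, "%", password))
    return user, password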

Next, the metadata in the mysql_innodb_cluster_metadata.async_cluster_members table is updated. The main steps are:

// insert the row for the new primary
execute_sqlf(
            "INSERT INTO"
            " mysql_innodb_cluster_metadata.async_cluster_members"
            " (cluster_id, view_id, instance_id, master_instance_id, "
            "   primary_master, attributes)"
            " SELECT cluster_id, ?, instance_id, NULL, 1, attributes"
            " FROM mysql_innodb_cluster_metadata.async_cluster_members"
            " WHERE cluster_id = ? AND view_id = ? AND instance_id = ?",
            aclvid, cluster_id, last_aclvid, new_primary_id);
// insert the row for the old primary
execute_sqlf(
            "INSERT INTO"
            " mysql_innodb_cluster_metadata.async_cluster_members"
            " (cluster_id, view_id, instance_id, master_instance_id, "
            "   primary_master, attributes)"
            " SELECT cluster_id, ?, instance_id, ?, 0, attributes"
            " FROM mysql_innodb_cluster_metadata.async_cluster_members"
            " WHERE cluster_id = ? AND view_id = ? AND instance_id = ?",
            aclvid, new_primary_id, cluster_id, last_aclvid, old_primary_id);
// repoint the remaining replicas to the new primary
execute_sqlf(
            "INSERT INTO "
            "mysql_innodb_cluster_metadata.async_cluster_members"
            " (cluster_id, view_id, instance_id, master_instance_id, "
            "   primary_master, attributes)"
            " SELECT cluster_id, ?, instance_id, ?,"
            "     IF(instance_id = ?, 1, 0), attributes"
            " FROM mysql_innodb_cluster_metadata.async_cluster_members"
            " WHERE cluster_id = ? AND view_id = ?"
            "   AND instance_id NOT IN (?, ?)",
            aclvid, new_primary_id, new_primary_id, cluster_id, last_aclvid,
            new_primary_id, old_primary_id);
[Figure: mysql_innodb_cluster_metadata.async_cluster_members after the update]
  8. Sync all replicas and lock all instances
global_locks.acquire(lock_instances, demoted->get_primary_member()->uuid,
                         timeout, dry_run);

Only at this point is the service actually locked so that no updates can be made. The logic is:

// 1. Pre-sync all replicas in parallel, again to shorten the time the primary stays locked
std::string master_gtid_set =
      mysqlshdk::mysql::get_executed_gtid_set(*m_master);
std::list<shcore::Dictionary_t> errors = execute_in_parallel(……)

// 2. Lock the primary, so that no new transactions can be written and so that
//    only one process can perform a switchover at a time
m_master->execute("FLUSH TABLES WITH READ LOCK");
m_master->execute("SET global super_read_only=1");
m_master->execute("FLUSH BINARY LOGS");

// 3. Sync all replicas serially
  for (const auto &inst : instances) {
    if (m_master->get_uuid() != inst->get_uuid()) {
      try {
        // wait for the replica to catch up
        wait_pending_master_transactions(master_gtid_set, inst.get(),
                                         gtid_sync_timeout);
      } catch (const shcore::Exception &e) {
        console->print_error(shcore::str_format("%s: GTID sync failed: %s",
                                                inst->descr().c_str(),
                                                e.format().c_str()));
        throw;
      }
      try {
        // lock the replica
        inst->execute("FLUSH TABLES WITH READ LOCK");
      } catch (const shcore::Exception &e) {
        console->print_error(
            inst->descr() +
            ": FLUSH TABLES WITH READ LOCK failed: " + e.format());
        throw;
      }
    }
  }
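Continuing the connector sketch from step 6, the lock phase boils down to roughly the statements below. This is an illustration only, under the same assumptions; the real code additionally handles dry_run, parallel execution, error reporting and later unlocking.

def lock_primary(primary_conn):
    cur = primary_conn.cursor()
    # Freeze writes on the current primary for the rest of the switchover.
    cur.execute("FLUSH TABLES WITH READ LOCK")
    cur.execute("SET GLOBAL super_read_only = 1")
    # Rotate the binary log so the switchover point is easy to locate.
    cur.execute("FLUSH BINARY LOGS")

def lock_replica(replica_conn, master_gtid_set, timeout=60):
    cur = replica_conn.cursor()
    # Wait until the replica has applied everything the primary executed...
    cur.execute("SELECT WAIT_FOR_EXECUTED_GTID_SET(%s, %s)",
                (master_gtid_set, timeout))
    if cur.fetchone()[0] != 0:
        raise RuntimeError("GTID sync timed out")
    # ...then freeze it as well.
    cur.execute("FLUSH TABLES WITH READ LOCK")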

  9. Perform the low-level primary/replica switchover

The entry point is Replica_set_impl::do_set_primary_instance.

Every operation from here on first registers an undo action in a rollback queue, so that if any step fails, the recorded undo actions are replayed to roll the cluster back to a consistent state. A generic sketch of this pattern is shown below; the actual operations follow after it.
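The undo-queue pattern itself can be sketched generically as follows; the actions here are just printed placeholders for illustration, not mysqlsh's actual operations.

undo_actions = []

def do_with_undo(action, undo):
    undo_actions.append(undo)   # record the compensation first
    action()                    # then apply the change

try:
    do_with_undo(lambda: print("stop replication channel on the promoted instance"),
                 lambda: print("restart the replication channel"))
    do_with_undo(lambda: print("repoint the old primary to the new primary"),
                 lambda: print("point it back to the old topology"))
except Exception:
    # On any failure, replay the recorded undo actions in reverse order
    # to restore the previous topology, then propagate the error.
    for undo in reversed(undo_actions):
        undo()
    raise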

instance->set_sysvar("SUPER_READ_ONLY", true,
                         mysqlshdk::mysql::Var_qualifier::PERSIST);
stop_channel(promoted, k_channel_name, dry_run);

The replication account used here is the "mysql_innodb_rs_" + server_id account whose password was refreshed in step 7:

setup_slave(promoted, current_primary, k_channel_name, repl_options, dry_run);
instance->set_sysvar("SUPER_READ_ONLY", false,
                         mysqlshdk::mysql::Var_qualifier::PERSIST);
// Setting SUPER_READ_ONLY=1 also sets READ_ONLY, so clear both
instance->set_sysvar("READ_ONLY", false,
                         mysqlshdk::mysql::Var_qualifier::PERSIST);
// make sure super_read_only is enabled
if (!dry_run) fence_instance(slave.get());

// repoint to the new primary
change_master_instance(slave_ptr, primary, k_channel_name, dry_run);
// internally executes RESET SLAVE ALL
reset_channel(new_master, true, dry_run);
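In plain SQL terms, repointing an instance to the new primary roughly corresponds to the sketch below (again via mysql-connector-python, for illustration only); the real setup_slave / change_master_instance also configure SSL, retry options and the managed channel name, all omitted here.

def repoint_replica(replica_conn, new_primary_host, new_primary_port,
                    repl_user, repl_password):
    cur = replica_conn.cursor()
    cur.execute("STOP SLAVE")
    cur.execute(
        "CHANGE MASTER TO MASTER_HOST = %s, MASTER_PORT = %s,"
        " MASTER_USER = %s, MASTER_PASSWORD = %s, MASTER_AUTO_POSITION = 1",
        (new_primary_host, new_primary_port, repl_user, repl_password))
    cur.execute("START SLAVE")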