MySQL dump线程分析

2019-08-16  本文已影响0人  真之棒2016

author:sufei

版本:8.0.16


 本文主要分析MySQL复制中主库dump线程的执行过程。当然dump线程分为语句gtid的dump以及文件位置dump。这里主要分析基于gtid的dump线程。也就是从库发送COM_BINLOG_DUMP_GTID指令给主库,主库dump线程入口函数为com_binlog_dump_gtid

一、DUMP处理逻辑

dump线程逻辑

 以上是dump线程的处理逻辑,下面来逐步分析。

二、分步详解

2.1 检测阶段

  1. 权限检测,对于复制用户,需要有REPL_SLAVE_ACL权限,没有则直接拒绝
if (check_global_access(thd, REPL_SLAVE_ACL)) DBUG_RETURN(false);
  1. 获取从库相关信息,这里我们需要了解一下,dump指令格式

dump指令格式说明

 从库发送到主库dump命令字符串信息格式如下:

命令类型COM_BINLOG_DUMP_GTID (1字节)

flags (两字节)

serverid (4字节):从库server id

file_name_length (4字节) :binlog文件名长度

file_name (file_name_length 字节):binlog文件名

pos (8字节):复制开始位置点

data_size (4字节):表示后面从库executed gtid信息长度

data (data_size 字节):gtid信息

 从从库dump指令解析出上述信息。

  1. 如果该从库存在dump线程,则kill原来僵死dump线程

 从库相关dump线程标识为:对于5.5版本之前通过server id,之后的版本通过uuid来标识。具体查找代码如下:

//Find_zombie_dump_thread类运算符重载
virtual bool operator()(THD *thd) {
  THD *cur_thd = current_thd;
  //找到另一个dump线程
  if (thd != cur_thd && (thd->get_command() == COM_BINLOG_DUMP ||
                         thd->get_command() == COM_BINLOG_DUMP_GTID)) {
    String tmp_uuid;
    bool is_zombie_thread = false;
    get_slave_uuid(thd, &tmp_uuid); //获得该dump线程的从库uuid
    if (m_slave_uuid.length()) { 
      //自身m_slave_uuid与找到的dump线程tmp_uuid比较,如果相同,说明找到了原来的dump线程
      is_zombie_thread = 
            (tmp_uuid.length() &&
             !strncmp(m_slave_uuid.c_ptr(), tmp_uuid.c_ptr(), UUID_LENGTH));
    } else {
      // 5.5 之前通过serverid 找到相同的dump线程
      is_zombie_thread =
            ((thd->server_id == cur_thd->server_id) && !tmp_uuid.length());
    }
    if (is_zombie_thread) {
      mysql_mutex_lock(&thd->LOCK_thd_data);
      return true;
    }
  }
  return false;
}

 通过上述函数找到了僵死dump线程后,通过调用awake,关闭该线程。有关kill逻辑可参考《kill connection的实现简析》

  1. 构建binlog sender类,并进行相应检测。
if (!mysql_bin_log.is_open()) {
    set_fatal_error("Binary log is not open");
    DBUG_VOID_RETURN;
  }
if (DBUG_EVALUATE_IF("simulate_no_server_id", true, server_id == 0)) {
    set_fatal_error("Misconfigured master - master server_id is 0");
    DBUG_VOID_RETURN;
  }
  1. 从库GTID可行性检测,这一步主要包含如下两个检测操作:
if (!m_exclude_gtid->is_subset_for_sid(&gtid_executed_and_owned,
                                           gtid_state->get_server_sidno(),
                                           subset_sidno)) {
    errmsg = ER_THD(m_thd, ER_SLAVE_HAS_MORE_GTIDS_THAN_MASTER);
    global_sid_lock->unlock();
    set_fatal_error(errmsg);
    return 1;
}
if (!gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid)) {
      Gtid_set gtid_missing(gtid_state->get_lost_gtids()->get_sid_map());
      gtid_missing.add_gtid_set(gtid_state->get_lost_gtids());
      gtid_missing.remove_gtid_set(m_exclude_gtid);

      String tmp_uuid;
      get_slave_uuid(m_thd, &tmp_uuid);
      char *missing_gtids = NULL;
      gtid_missing.to_string(&missing_gtids, false, NULL);
      LogErr(WARNING_LEVEL, ER_FOUND_MISSING_GTIDS, tmp_uuid.ptr(),
             missing_gtids);
      my_free(missing_gtids);

      errmsg = ER_THD(m_thd, ER_MASTER_HAS_PURGED_REQUIRED_GTIDS);
      global_sid_lock->unlock();
      set_fatal_error(errmsg);
      return 1;
}
  1. binlog传输观察类的传输开始位置挂钩,调用transmit_start函数,半同步插件通过该接口,实现监听新从库的加入,以及相关状态更新;

2.2 binlog文件处理逻辑

  1. 进入binlog传输,只有当遇到错误或者被其他线程kill才退出binlog dump逻辑
while (!has_error() && !m_thd->killed) {
//dump 逻辑
}
  1. 在处理每个binlog文件开始前,首先发送一个rotate event(该event在binlog中不存在的),其作用就是告知从库接下来传输的binlog文件名以及位置。

  2. 发送一个format_description event,由于binlog格式有多个版本,该事件主要是说明格式,辅助从库解析binlog,所有不管有没有指定start pos,都会先发送一个format_description event。

  3. 检测文件头是否包含previous_gtid_event,没有就返回,说明binlog文件有问题。

  4. 进入单个文件event传输流程,这是一个循环过程,不断的从当前binlog文件中读取event,并发送给从库。具体过程在11-22详解讲解

  5. 处理完一个binlog文件之后,获取binlog index文件锁,并打开文件

  6. 更加偏移量找到下一个待处理的binlog文件,这里需要注意:由于是在dump线程中,查找下一个binlog文件是通过该文件在index文件中的便宜量来操作,具体代码如下:

  my_b_seek(&index_file, linfo->index_file_offset);  //偏移量设置

  linfo->index_file_start_offset = linfo->index_file_offset;
  if ((length = my_b_gets(&index_file, fname, FN_REFLEN)) <= 1) { //读取文件名字
    error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
    goto err;
  }

  if (fname[0] != 0) {
    if (normalize_binlog_name(full_fname, fname, is_relay_log)) {
      error = LOG_INFO_EOF;
      goto err;
    }
    length = strlen(full_fname);
  }

  linfo->index_file_offset = my_b_tell(&index_file);  //更新偏移量到新记录以备下一次使用

 所以如果手动修改index文件,由于dump线程中的index偏移量没有被修改,从而造成复制问题,所有如果手动修改了index文件,一定要先停止复制,然后执行flush binary logs语句,在开启复制,刷新dump线程中偏移量。

  1. 找到下一个文件后,重新设置开始位置,又跳转到第7步,处理下一个binlog文件
start_pos = BIN_LOG_HEADER_SIZE;

2.3 event处理流程

 下面详细说明event发送过程。

  1. 得到文件end pos点

 这一步是event传输的关键步骤,不同情况的处理不同,下面分别讨论:

 上述逻辑的具体代码如下:

int Binlog_sender::get_binlog_end_pos(File_reader *reader, my_off_t *end_pos) {
  DBUG_ENTER("Binlog_sender::get_binlog_end_pos()");
  my_off_t read_pos = reader->position();  //获得当前dump读取位置

  do {
    // 获取binlog文件的结束位置,注意:该位置可能是动态的
    *end_pos = mysql_bin_log.get_binlog_end_pos();

    //如果当前binlog为非活跃状态,则直接设置end_pos 为 0 ,并返回进入event发送逻辑
    if (unlikely(!mysql_bin_log.is_active(m_linfo.log_file_name))) {
      *end_pos = 0;
      DBUG_RETURN(0);
    }

    DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu",
                        m_linfo.log_file_name, read_pos, *end_pos));
    DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname()));
 //此时binlog为活跃状态,但是dump读取位置小于end pos,说明有新事物写入binlog,返回进入event发送逻辑
    if (read_pos < *end_pos) DBUG_RETURN(0);

    /* Some data may be in net buffer, it should be flushed before waiting */
    if (!m_wait_new_events || flush_net()) DBUG_RETURN(1);
    //执行到这里,说明没有新的binlog,进入等待逻辑
    if (unlikely(wait_new_events(read_pos))) DBUG_RETURN(1);
  } while (unlikely(!m_thd->killed));

  DBUG_RETURN(1);
}

等待逻辑

  1. 进入等待逻辑后,修改dump线程状态
m_thd->ENTER_COND(mysql_bin_log.get_log_cond(),
                    mysql_bin_log.get_binlog_end_pos_lock(),
                    &stage_master_has_sent_all_binlog_to_slave, &old_stage);
  1. 在等待过程中,根据从库心跳周期设置,会定期发送心跳包,实现如下:

等待逻辑

if (!timeout)  //从库心跳周期时间
    mysql_cond_wait(&update_cond, &LOCK_binlog_end_pos);
  else
    //设置了超时的条件等待
    ret = mysql_cond_timedwait(&update_cond, &LOCK_binlog_end_pos,
                               const_cast<struct timespec *>(timeout));

等待返回处理逻辑

do {
    set_timespec_nsec(&ts, m_heartbeat_period);
    ret = mysql_bin_log.wait_for_update(&ts); //等待逻辑
    if (!is_timeout(ret)) break; //如果不是超时返回,说明被唤醒,有新binlog,退出
    if (send_heartbeat_event(log_pos)) return 1; //超时返回,发送心跳event
} while (!m_thd->killed);

event发送逻辑

  1. 读取一个event
if (unlikely(read_event(reader, &event_ptr, &event_len))) DBUG_RETURN(1);
  1. binlog传输观察类的event传输开始位置挂钩,调用注册的before_send_event函数
  2. 跳过从库执行的gtid,这里的跳过是跳过该gtid下一组event
/*
参数:
    event_ptr 指向读取的event
    in_exclude_group 表明该event是否在exclude的gtid事务内
返回值:(调用是又赋值给参数in_exclude_group)
    ture 说明在exclude gtid内,应该过滤掉
该函数的处理逻辑就是:事务所有event循环进入该函数,通过in_exclude_group标识event是否应该被过滤掉,只有通过gtid event才会重置in_exclude_group。
*/
inline bool Binlog_sender::skip_event(const uchar *event_ptr,
                                      bool in_exclude_group) {
  DBUG_ENTER("Binlog_sender::skip_event");

  uint8 event_type = (Log_event_type)event_ptr[LOG_EVENT_OFFSET];
  switch (event_type) { //GTID EVENT需要判断以更新in_exclude_group
    case binary_log::GTID_LOG_EVENT: { 
      Format_description_log_event fd_ev;
      fd_ev.common_footer->checksum_alg = m_event_checksum_alg;
      Gtid_log_event gtid_ev(reinterpret_cast<const char *>(event_ptr), &fd_ev);
      Gtid gtid;
      gtid.sidno = gtid_ev.get_sidno(m_exclude_gtid->get_sid_map());
      gtid.gno = gtid_ev.get_gno();
      DBUG_RETURN(m_exclude_gtid->contains_gtid(gtid));
    }
    case binary_log::ROTATE_EVENT:  //rotate event 必然不属于任何事务,所以放回false
      DBUG_RETURN(false);
  }
  DBUG_RETURN(in_exclude_group);  //其他event是否过滤根据参数in_exclude_group
}
  1. 可能由于跳过的event很多,造成在心跳周期内没有event发送给从库,从库可能认为主库宕机,为避免这种情况,在跳过event过程中,也会进行周期时间检查,定期发送心跳事件。
  2. 发送event,通过了过滤检查,即可发送event到从库
if (unlikely(send_packet())) DBUG_RETURN(1);
  1. binlog传输观察类的event传输结束位置挂钩,调用注册的after_send_event函数
上一篇 下一篇

猜你喜欢

热点阅读