MySQL source code analysis: the THD data structure


I. What is THD

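I really did not want to write about this data structure. Judged by the usual design principles and design ideas, this class leaves one speechless. But it exists for a reason, and there is no way around it. Start with its inheritance structure: THD derives from MDL_context_owner, Query_arena, and Open_tables_state, as the class definition in the next section shows.

This class came up earlier: it is the data structure that describes a thread. MySQL uses threads in many places, so THD naturally shows up in many places too. It already appeared in the earlier analysis of network operations. Frankly, I did not want to analyze it, not because it is hard but because it carries so much. A thread definition by itself already carries a lot of related information, and THD, an abstraction layered on top of it, necessarily carries even more. Still, skipping it is not an option, so here is a reluctant attempt.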

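II. Data structure definition and analysis

This class was covered briefly in the earlier "Thread pool and threads" article; this time it gets a deeper look.
The first impression is that the class is huge, with far too many members: the definition runs from line 811 to line 4374. Programming guidelines commonly suggest keeping a class under about 1000 lines, yet foreign open-source projects have plenty of classes that run to several thousand lines. A guideline is only a guideline; whether it is followed in real work is another matter.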


class THD : public MDL_context_owner,
            public Query_arena,
            public Open_tables_state {
 private:
  inline bool is_stmt_prepare() const {
    assert(0);
    return Query_arena::is_stmt_prepare();
  }

  inline bool is_stmt_prepare_or_first_sp_execute() const {
    assert(0);
    return Query_arena::is_stmt_prepare_or_first_sp_execute();
  }

  inline bool is_stmt_prepare_or_first_stmt_execute() const {
    assert(0);
    return Query_arena::is_stmt_prepare_or_first_stmt_execute();
  }

  inline bool is_regular() const {
    assert(0);
    return Query_arena::is_regular();
  }
  // ......

 public:
  Transactional_ddl_context m_transactional_ddl{this};
  bool m_inside_system_variable_global_update;

 public:
  PS_PARAM *bind_parameter_values;

  unsigned long bind_parameter_values_count;
};

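The comment at the top of the class explains that a separate thread-class (THD) object is created for each client connection to the server, and that it serves as the overall descriptor of the thread <=> connection pair (the two effectively form a mapping). A few important groups of members are briefly described here:
1. In terms of inheritance, three base classes stand out:
MDL_context_owner: an abstract interface that decouples the MDL module from THD and the rest of the server code. It mainly declares the metadata-lock interfaces, including control and notification.
Query_arena: judging from its member Item *m_item_list, it maintains a potentially very large list. Since Item in turn derives from Parse_tree_node, the arena evidently holds the elements produced by parsing a query statement (or stored procedure), that is, the nodes of the abstract syntax tree.
Open_tables_state: stores the thread's state regarding opened and locked tables, maintaining both the table information and the lock information. It provides push-style and pop-style interface functions to operate on these two kinds of state.
2. It contains several nested classes:
Readers unfamiliar with nested classes can consult "Thinking in C++". In short, a nested class is a class defined inside another class, used to control access.
Attachable_trx: represents a read-only attachable transaction. It encapsulates the knowledge of how to back up the state of the current transaction, start a read-only attachable transaction in the SE (storage engine), finish it, and then restore the original transaction's state. It also serves as the base class for the read-write attachable transaction implementation.
Attachable_trx_rw: a class derived from Attachable_trx that allows updates inside an attachable transaction.
Query_plan: the query plan for EXPLAINable commands; it should be locked with LOCK_query_plan before use.
Transaction_state: a utility structure for Attachable_trx.
3. Threads generally cannot do without locks, so this class has many lock-related classes, functions, and interfaces, for example MDL_context, User_level_lock, and Global_read_lock.
4. Log handling, such as Gtid, Log, and Binlog.
5. Context-handling classes and interfaces, such as sp_rcontext, Resource_group_ctx, and Security_context.
6. Transaction-related members, such as Transaction_ctx and the nested classes mentioned above.
7. Classes and interfaces for processing and analyzing related data, such as Item_change_record, sql_digest_state (the current statement digest), and sp_cache.
8. Other data structures plus protocol and syntax/semantics interfaces, such as Vio for communication, LEX for parsing, the internal state machine Parser_state, and the nested class Query_plan mentioned above. There is also protocol handling: the classic protocol (Protocol_classic), the text protocol (Protocol_text), and the binary protocol (Protocol_binary).
9. Other auxiliary objects, such as plugin-related information.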
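
The backup, run, and restore behaviour described for Attachable_trx in item 2 can be sketched as a nested scope-guard class. This is only a minimal illustration under my own assumptions, not MySQL's implementation: `Session`, `TrxState`, `AttachableTrx`, and `run_attached` are hypothetical names, and the isolation strings are placeholder values.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for THD; nothing here is MySQL's real API.
class Session {
 public:
  // Illustrative transaction state (compare THD::Transaction_state).
  struct TrxState {
    bool read_only = false;
    std::string isolation = "REPEATABLE-READ";
  };

  // Nested class: backs up the enclosing session's transaction state,
  // switches to a read-only state, and restores the backup on destruction.
  class AttachableTrx {
   public:
    explicit AttachableTrx(Session &session)
        : session_(session), saved_(session.trx_) {
      session_.trx_.read_only = true;
      session_.trx_.isolation = "READ-COMMITTED";  // illustrative value
    }
    ~AttachableTrx() { session_.trx_ = saved_; }

    AttachableTrx(const AttachableTrx &) = delete;
    AttachableTrx &operator=(const AttachableTrx &) = delete;

   private:
    Session &session_;
    TrxState saved_;
  };

  const TrxState &trx() const { return trx_; }

 private:
  TrxState trx_;  // private, yet reachable from the nested AttachableTrx
};

// Runs fn inside an attachable transaction and reports whether the session
// was read-only while fn ran.
template <typename Fn>
bool run_attached(Session &session, Fn fn) {
  Session::AttachableTrx guard(session);
  fn(session);
  return session.trx().read_only;
}
```

The nested class can touch the private `trx_` member directly, which is the access-control point made about nested classes above; once the guard leaves scope the session is back in its original state.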

The official documentation for the class is at:
https://dev.mysql.com/doc/dev/mysql-server/latest/classTHD.html

III. Usage

1. Initialization
As analyzed earlier, initialization starts from the connection event loop:

void connection_event_loop() {
  Connection_handler_manager * mgr =
      Connection_handler_manager::get_instance();
  while (!connection_events_loop_aborted()) {
    Channel_info * channel_info = m_listener->listen_for_connection_event();
    if (channel_info != nullptr) mgr->process_new_connection(channel_info);
  }
}

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  DBUG_TRACE;

  // Simulate thread creation for test case before we check thread cache
  DBUG_EXECUTE_IF("fail_thread_create", error = 1; goto handle_error;);

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
#ifndef NDEBUG
handle_error:
#endif  // !NDEBUG

  if (error) {
    connection_errors_internal++;
    if (!create_thd_err_log_throttle.log())
      LogErr(ERROR_LEVEL, ER_CONN_PER_THREAD_NO_THREAD, error);
    channel_info->send_error_and_close_channel(ER_CANT_CREATE_THREAD, error,
                                               true);
    Connection_handler_manager::dec_connection_count();
    return true;
  }

  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

uint Per_thread_connection_handler::get_max_threads() const {
  return max_connections;
}

add_connection starts handle_connection on a new thread, and that is where the thread's THD gets initialized:

extern "C" {
static void * handle_connection(void * arg) {
 Global_THD_manager * thd_manager = Global_THD_manager::get_instance();
 Connection_handler_manager * handler_manager =
     Connection_handler_manager::get_instance();
 Channel_info * channel_info = static_cast<Channel_info *>(arg);
 bool pthread_reused MY_ATTRIBUTE((unused)) = false;

 if (my_thread_init()) {
   connection_errors_internal++;
   channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
   handler_manager->inc_aborted_connects();
   Connection_handler_manager::dec_connection_count();
   delete channel_info;
   my_thread_exit(nullptr);
   return nullptr;
 }

 for (;;) {
   // Initialize the THD for this connection.
   THD * thd = init_new_thd(channel_info);
   if (thd == nullptr) {
     connection_errors_internal++;
     handler_manager->inc_aborted_connects();
     Connection_handler_manager::dec_connection_count();
     break;  // We are out of resources, no sense in continuing.
   }

#ifdef HAVE_PSI_THREAD_INTERFACE
   if (pthread_reused) {

     PSI_thread * psi = PSI_THREAD_CALL(new_thread)(key_thread_one_connection,
                                                   thd, thd->thread_id());
     PSI_THREAD_CALL(set_thread_os_id)(psi);
     PSI_THREAD_CALL(set_thread)(psi);
   }
#endif

#ifdef HAVE_PSI_THREAD_INTERFACE

   PSI_thread * psi = PSI_THREAD_CALL(get_thread)();

   thd->set_psi(psi);
#endif
   mysql_thread_set_psi_id(thd->thread_id());
   mysql_thread_set_psi_THD(thd);
   MYSQL_SOCKET socket = thd->get_protocol_classic()->get_vio()->mysql_socket;
   mysql_socket_set_thread_owner(socket);
   thd_manager->add_thd(thd);

   if (thd_prepare_connection(thd))
     handler_manager->inc_aborted_connects();
   else {
     while (thd_connection_alive(thd)) {
       if (do_command(thd)) break;
     }
     end_connection(thd);
   }
   close_connection(thd, 0, false, false);

   thd->get_stmt_da()->reset_diagnostics_area();
   thd->release_resources();

   // Clean up errors now, before possibly waiting for a new connection.
#if OPENSSL_VERSION_NUMBER < 0x10100000L
   ERR_remove_thread_state(0);
#endif  
   thd_manager->remove_thd(thd);
   Connection_handler_manager::dec_connection_count();

#ifdef HAVE_PSI_THREAD_INTERFACE

   thd->set_psi(nullptr);
   PSI_THREAD_CALL(delete_current_thread)();
#endif  

   delete thd;

   // Server is shutting down so end the pthread.
   if (connection_events_loop_aborted()) break;

   channel_info = Per_thread_connection_handler::block_until_new_connection();
   if (channel_info == nullptr) break;
   pthread_reused = true;
   if (connection_events_loop_aborted()) {
     // Close the channel and exit as server is undergoing shutdown.
     channel_info->send_error_and_close_channel(ER_SERVER_SHUTDOWN, 0, false);
     delete channel_info;
     channel_info = nullptr;
     Connection_handler_manager::dec_connection_count();
     break;
   }
 }

 my_thread_end();
 my_thread_exit(nullptr);
 return nullptr;
}
}

It in turn calls the initialization function init_new_thd, which relies on Channel_info::create_thd:

static THD *init_new_thd(Channel_info *channel_info) {
  THD * thd = channel_info->create_thd();
  if (thd == nullptr) {
    channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
    delete channel_info;
    return nullptr;
  }

  thd->set_new_thread_id();

  if (channel_info->get_prior_thr_create_utime() != 0) {

    ulonglong launch_time =
        thd->start_utime - channel_info->get_prior_thr_create_utime();
    if (launch_time >= slow_launch_time * 1000000ULL)
      Per_thread_connection_handler::slow_launch_threads++;
  }
  delete channel_info;


  thd_set_thread_stack(thd, (char *)&thd);
  thd->store_globals();

  return thd;
}
THD *Channel_info::create_thd() {
  DBUG_EXECUTE_IF("simulate_resource_failure", return nullptr;);

  Vio * vio_tmp = create_and_init_vio();
  if (vio_tmp == nullptr) return nullptr;

  THD * thd = new (std::nothrow) THD;
  if (thd == nullptr) {
    vio_delete(vio_tmp);
    return nullptr;
  }

  thd->get_protocol_classic()->init_net(vio_tmp);

  return thd;
}
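
The allocation order in create_thd (build the Vio first, allocate the THD with `new (std::nothrow)`, free the Vio by hand if that fails) can be tried out in isolation. The sketch below is a toy under my own assumptions: `Channel` and `Session` are hypothetical stand-ins for Vio and THD, and `simulate_oom` is a hook invented here to force the failure path.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical stand-in for Vio; counts how many objects are alive.
struct Channel {
  static int live;
  Channel() { ++live; }
  ~Channel() { --live; }
};
int Channel::live = 0;

// Hypothetical stand-in for THD.
struct Session {
  static bool simulate_oom;  // test hook: make the nothrow allocation fail
  Channel *channel = nullptr;

  static void *operator new(std::size_t size, const std::nothrow_t &) noexcept {
    return simulate_oom ? nullptr : ::operator new(size, std::nothrow);
  }
  static void operator delete(void *p) noexcept { ::operator delete(p); }
  // Matching placement delete, used only if the constructor throws.
  static void operator delete(void *p, const std::nothrow_t &) noexcept {
    ::operator delete(p);
  }
  ~Session() { delete channel; }
};
bool Session::simulate_oom = false;

// Same order as create_thd(): channel first, then the session, and manual
// cleanup of the channel if the session cannot be allocated.
Session *create_session() {
  Channel *channel = new (std::nothrow) Channel;
  if (channel == nullptr) return nullptr;

  Session *session = new (std::nothrow) Session;
  if (session == nullptr) {
    delete channel;  // nothing owns the channel yet, so free it here
    return nullptr;
  }
  session->channel = channel;  // the session owns the channel from here on
  return session;
}
```

Returning nullptr instead of throwing lets the caller, as init_new_thd does, report ER_OUT_OF_RESOURCES through the channel and carry on.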

2. Usage
The PSI_THREAD_CALL macro resolves to the function calls declared below:

//pfs_thread_provider.h
#ifdef __cplusplus
class THD;
#endif 



#define PSI_THREAD_CALL(M) pfs_##M##_vc

void pfs_register_thread_vc(const char *category, PSI_thread_info *info,
                            int count);

int pfs_spawn_thread_vc(PSI_thread_key key, my_thread_handle *thread,
                        const my_thread_attr_t *attr,
                        void *(*start_routine)(void *), void *arg);

PSI_thread *pfs_new_thread_vc(PSI_thread_key key, const void *identity,
                              ulonglong processlist_id);

void pfs_set_thread_id_vc(PSI_thread *thread, ulonglong processlist_id);

ulonglong pfs_get_current_thread_internal_id_vc();

ulonglong pfs_get_thread_internal_id_vc(PSI_thread *thread);

PSI_thread *pfs_get_thread_by_id_vc(ulonglong processlist_id);

#ifdef __cplusplus
void pfs_set_thread_THD_vc(PSI_thread *thread, THD *thd);
#endif 

void pfs_set_thread_os_id_vc(PSI_thread *thread);

PSI_thread *pfs_get_thread_vc(void);

void pfs_set_thread_user_vc(const char *user, int user_len);

void pfs_set_thread_account_vc(const char *user, int user_len, const char *host,
                               int host_len);

void pfs_set_thread_db_vc(const char *db, int db_len);

void pfs_set_thread_command_vc(int command);

void pfs_set_thread_start_time_vc(time_t start_time);

void pfs_set_thread_state_vc(const char *state);

void pfs_set_connection_type_vc(opaque_vio_type conn_type);

void pfs_set_thread_info_vc(const char *info, uint info_len);

int pfs_set_thread_resource_group_vc(const char *group_name, int group_name_len,
                                     void *user_data);

int pfs_set_thread_resource_group_by_id_vc(PSI_thread *thread,
                                           ulonglong thread_id,
                                           const char *group_name,
                                           int group_name_len, void *user_data);

void pfs_set_thread_vc(PSI_thread *thread);

void pfs_set_thread_peer_port_vc(PSI_thread *thread, uint port);

void pfs_aggregate_thread_status_vc(PSI_thread *thread);

void pfs_delete_current_thread_vc(void);

void pfs_delete_thread_vc(PSI_thread *thread);

int pfs_set_thread_connect_attrs_vc(const char *buffer, uint length,
                                    const void *from_cs);

void pfs_get_current_thread_event_id_vc(ulonglong *internal_thread_id,
                                        ulonglong *event_id);

void pfs_get_thread_event_id_vc(PSI_thread *thread,
                                ulonglong *internal_thread_id,
                                ulonglong *event_id);

int pfs_get_thread_system_attrs_vc(PSI_thread_attrs *thread_attrs);

int pfs_get_thread_system_attrs_by_id_vc(PSI_thread *thread,
                                         ulonglong thread_id,
                                         PSI_thread_attrs *thread_attrs);

int pfs_register_notification_vc(const PSI_notification *callbacks,
                                 bool with_ref_count);

int pfs_unregister_notification_vc(int handle);

void pfs_notify_session_connect_vc(PSI_thread *thread);

void pfs_notify_session_disconnect_vc(PSI_thread *thread);

void pfs_notify_session_change_user_vc(PSI_thread *thread);

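// The header ends with the #endif lines that close its feature and include
// guards; the matching #if/#ifndef lines sit above this excerpt.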
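
How `PSI_THREAD_CALL(M)` turns a short method name into a `pfs_..._vc` function is plain preprocessor token pasting. The sketch below reuses the macro definition from the excerpt but pairs it with two toy functions, `pfs_describe_vc` and `pfs_add_vc`, which are hypothetical and do not exist in the real header.

```cpp
#include <cassert>
#include <string>

// Hypothetical provider functions that follow the pfs_<name>_vc naming scheme.
// They are toys for this sketch, not functions from the real header.
inline std::string pfs_describe_vc() { return "describe"; }
inline int pfs_add_vc(int a, int b) { return a + b; }

// Same definition as in the header excerpt: paste the method name between
// the "pfs_" prefix and the "_vc" suffix to form the function name.
#define PSI_THREAD_CALL(M) pfs_##M##_vc

// PSI_THREAD_CALL(describe)() expands to pfs_describe_vc().
inline std::string call_describe() { return PSI_THREAD_CALL(describe)(); }

// PSI_THREAD_CALL(add)(a, b) expands to pfs_add_vc(a, b).
inline int call_add(int a, int b) { return PSI_THREAD_CALL(add)(a, b); }
```

So a call such as `PSI_THREAD_CALL(set_thread)(psi)` in handle_connection is, after preprocessing, a direct call to `pfs_set_thread_vc(psi)`.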

Next, look at how the THD is added to the global manager:

void Global_THD_manager::add_thd(THD *thd) {
  DBUG_PRINT("info", ("Global_THD_manager::add_thd %p", thd));
  // Should have an assigned ID before adding to the list.
  assert(thd->thread_id() != reserved_thread_id);
  const int partition = thd_partition(thd->thread_id());
  MUTEX_LOCK(lock_list, &LOCK_thd_list[partition]);
  // Technically it is not supported to compare pointers, but it works.
  std::pair<THD_array::iterator, bool> insert_result =
      thd_list[partition].insert_unique(thd);
  if (insert_result.second) ++atomic_global_thd_count;
  // Adding the same THD twice is an error.
  assert(insert_result.second);
}
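
The idea behind add_thd, picking a partition from the thread id, locking only that partition, and keeping one global atomic count, can be sketched with standard containers. This is a simplified model under my own assumptions: `SessionRegistry` and `Session` are hypothetical, the partition count of 8 is arbitrary, and `std::set` stands in for whatever container MySQL actually uses.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <set>

// Hypothetical stand-in for THD: only the id matters here.
struct Session {
  std::uint32_t id;
};

class SessionRegistry {
 public:
  static constexpr std::size_t kPartitions = 8;  // arbitrary for the sketch

  // Returns false if the session was already registered.
  bool add(Session *s) {
    Partition &p = partitions_[s->id % kPartitions];
    std::lock_guard<std::mutex> lock(p.mutex);
    const bool inserted = p.sessions.insert(s).second;
    if (inserted) ++count_;
    return inserted;
  }

  // Returns false if the session was never registered.
  bool remove(Session *s) {
    Partition &p = partitions_[s->id % kPartitions];
    std::lock_guard<std::mutex> lock(p.mutex);
    const bool erased = p.sessions.erase(s) == 1;
    if (erased) --count_;
    return erased;
  }

  std::size_t count() const { return count_.load(); }

 private:
  struct Partition {
    std::mutex mutex;              // guards only this partition's set
    std::set<Session *> sessions;  // unique by pointer value
  };
  std::array<Partition, kPartitions> partitions_;
  std::atomic<std::size_t> count_{0};
};
```

Because each partition has its own mutex, sessions whose ids land in different partitions can register without contending with each other, while the atomic counter still gives a cheap global total.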

After that, command processing begins:

bool do_command(THD *thd) {
  bool return_value;
  int rc;
  NET *net = nullptr;
  enum enum_server_command command;
  COM_DATA com_data;
  DBUG_TRACE;
  assert(thd->is_classic_protocol());

  
  thd->lex->set_current_query_block(nullptr);

  
  thd->clear_error();  // Clear error message
  thd->get_stmt_da()->reset_diagnostics_area();

  
  net = thd->get_protocol_classic()->get_net();
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
  net_new_transaction(net);

  
  DEBUG_SYNC(thd, "before_do_command_net_read");

  
  thd->m_server_idle = true;
  rc = thd->get_protocol()->get_command(&com_data, &command);
  thd->m_server_idle = false;

  if (rc) {
#ifndef NDEBUG
    char desc[VIO_DESCRIPTION_SIZE];
    vio_description(net->vio, desc);
    DBUG_PRINT("info", ("Got error %d reading command from socket %s",
                        net->error, desc));
#endif  // NDEBUG
    
    thd->m_statement_psi = MYSQL_REFINE_STATEMENT(
        thd->m_statement_psi, com_statement_info[COM_END].m_key);

    

    
    assert(thd->is_error());
    thd->send_statement_status();

    
    MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
    thd->m_statement_psi = nullptr;
    thd->m_digest = nullptr;

    if (rc < 0) {
      return_value = true;  // We have to close it.
      goto out;
    }
    net->error = NET_ERROR_UNSET;
    return_value = false;
    goto out;
  }

#ifndef NDEBUG
  char desc[VIO_DESCRIPTION_SIZE];
  vio_description(net->vio, desc);
  DBUG_PRINT("info", ("Command on %s = %d (%s)", desc, command,
                      command_name[command].str));
#endif  // NDEBUG
  DBUG_PRINT("info", ("packet: '%*.s'; command: %d",
                      (int)thd->get_protocol_classic()->get_packet_length(),
                      thd->get_protocol_classic()->get_raw_packet(), command));
  if (thd->get_protocol_classic()->bad_packet)
    assert(0);  // Should be caught earlier

  // Reclaim some memory
  thd->get_protocol_classic()->get_output_packet()->shrink(
      thd->variables.net_buffer_length);
  
  my_net_set_read_timeout(net, thd->variables.net_read_timeout);

  DEBUG_SYNC(thd, "before_command_dispatch");

  return_value = dispatch_command(thd, &com_data, command);
  thd->get_protocol_classic()->get_output_packet()->shrink(
      thd->variables.net_buffer_length);

out:
  
  assert(thd->m_digest == nullptr);
  assert(thd->m_statement_psi == nullptr);
  return return_value;
}

This all-important starting point of command handling thus unfolds inside this thread's processing.
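
The control flow around do_command is easier to see in a stripped-down form: handle_connection keeps calling it while the connection is alive, and a true return value ends the loop. The sketch below models only that convention. Everything in it (`Session`, `Command`, `dispatch`, `do_one_command`, `serve`) is hypothetical, and a queue stands in for packets read from the socket.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

enum class Command { Query, Ping, Quit };

// Hypothetical stand-in for THD plus its network buffer.
struct Session {
  std::deque<Command> incoming;  // stand-in for packets read from the socket
  std::vector<std::string> log;  // records what the dispatcher did
  bool alive = true;
};

// Returns true when the connection must be closed.
bool dispatch(Session &s, Command c) {
  switch (c) {
    case Command::Query:
      s.log.push_back("query");
      return false;
    case Command::Ping:
      s.log.push_back("ping");
      return false;
    case Command::Quit:
      s.log.push_back("quit");
      return true;
  }
  return true;  // unknown command: close the connection
}

// Reads one command and dispatches it; true means "stop serving".
bool do_one_command(Session &s) {
  if (s.incoming.empty()) return true;  // treat "nothing to read" as fatal
  const Command c = s.incoming.front();
  s.incoming.pop_front();
  return dispatch(s, c);
}

// Same shape as the loop in handle_connection.
void serve(Session &s) {
  while (s.alive) {
    if (do_one_command(s)) break;
  }
  s.alive = false;
}
```

Anything queued after the quit command is never read, just as the real loop stops calling do_command once it returns true.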

3. Destruction
Called after end_connection:

void close_connection(THD *thd, uint sql_errno, bool server_shutdown,
                      bool generate_event) {
  DBUG_TRACE;

  if (sql_errno) net_send_error(thd, sql_errno, ER_DEFAULT_NONCONST(sql_errno));
  thd->disconnect(server_shutdown);

  if (generate_event) {
    mysql_audit_notify(thd, AUDIT_EVENT(MYSQL_AUDIT_CONNECTION_DISCONNECT),
                       sql_errno);
#ifdef HAVE_PSI_THREAD_INTERFACE
    PSI_THREAD_CALL(notify_session_disconnect)(thd->get_psi());
#endif  
  }

  thd->security_context()->logout();
}

Called before delete thd:

void THD::release_resources() {
  assert(m_release_resources_done == false);

  Global_THD_manager::get_instance()->release_thread_id(m_thread_id);


  mysql_mutex_lock(&LOCK_thd_data);
  mysql_mutex_lock(&LOCK_query_plan);


  if (is_classic_protocol() && get_protocol_classic()->get_vio()) {
    vio_delete(get_protocol_classic()->get_vio());
    get_protocol_classic()->end_net();
  }


  assert(query_plan.get_modification_plan() == nullptr);
  mysql_mutex_unlock(&LOCK_query_plan);
  mysql_mutex_unlock(&LOCK_thd_data);
  mysql_mutex_lock(&LOCK_thd_query);
  mysql_mutex_unlock(&LOCK_thd_query);

  stmt_map.reset();  
  if (!cleanup_done) cleanup();

  mdl_context.destroy();
  ha_close_connection(this);


#if defined(ENABLED_DEBUG_SYNC)

  debug_sync_end_thread(this);
#endif  

  plugin_thdvar_cleanup(this, m_enable_plugins);

  assert(timer == nullptr);

  if (timer_cache) thd_timer_destroy(timer_cache);

  if (rli_fake) {
    rli_fake->end_info();
    delete rli_fake;
    rli_fake = nullptr;
  }
  mysql_audit_free_thd(this);

  if (current_thd == this) restore_globals();

  mysql_mutex_lock(&LOCK_status);

  add_to_status(&global_status_var, &status_var);
#ifdef HAVE_PSI_THREAD_INTERFACE

  if (m_psi != nullptr) {
    PSI_THREAD_CALL(aggregate_thread_status)(m_psi);
  }
#endif

  status_var_aggregated = true;

  mysql_mutex_unlock(&LOCK_status);

  m_release_resources_done = true;
}
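
Two things in release_resources are worth isolating: the `m_release_resources_done` flag that makes the release a one-time step, and the folding of per-thread status counters into the global ones under LOCK_status. The sketch below models just those two points. `Totals` and `Session` are hypothetical, and having the destructor act as a safety net is an assumption of this sketch.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical global counters, playing the role of global_status_var.
struct Totals {
  std::mutex mutex;
  long queries = 0;
};

// Hypothetical stand-in for THD.
class Session {
 public:
  explicit Session(Totals &totals) : totals_(totals) {}

  // Safety net (assumed for this sketch): release on destruction if the
  // owner did not do it explicitly.
  ~Session() {
    if (!released_) release_resources();
  }

  void record_query() { ++queries_; }

  // Must run exactly once, before the session leaves the registry.
  void release_resources() {
    assert(!released_);
    {
      std::lock_guard<std::mutex> lock(totals_.mutex);
      totals_.queries += queries_;  // fold per-session counters into totals
    }
    released_ = true;
  }

  bool release_resources_done() const { return released_; }

 private:
  Totals &totals_;
  long queries_ = 0;
  bool released_ = false;
};
```

The flag is what lets a later step, like the assert in remove_thd, check that the release already happened before the object is unlinked and deleted.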

Next, remove_thd is called:

void Global_THD_manager::remove_thd(THD *thd) {
  DBUG_PRINT("info", ("Global_THD_manager::remove_thd %p", thd));
  const int partition = thd_partition(thd->thread_id());
  MUTEX_LOCK(lock_remove, &LOCK_thd_remove[partition]);
  MUTEX_LOCK(lock_list, &LOCK_thd_list[partition]);

  assert(unit_test || thd->release_resources_done());

  DBUG_EXECUTE_IF("sleep_after_lock_thread_count_before_delete_thd", sleep(5););

  const size_t num_erased = thd_list[partition].erase_unique(thd);
  if (num_erased == 1) --atomic_global_thd_count;
  // Removing a THD that was never added is an error.
  assert(1 == num_erased);
  mysql_cond_broadcast(&COND_thd_list[partition]);
}

my_thread_id Global_THD_manager::get_new_thread_id() {
  my_thread_id new_id;
  MUTEX_LOCK(lock, &LOCK_thread_ids);
  do {
    new_id = thread_id_counter++;
  } while (!thread_ids.insert_unique(new_id).second);
  return new_id;
}
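
get_new_thread_id keeps incrementing a counter until the value can be inserted into the set of ids in use, so an id that is still held is skipped once the counter wraps around. A small model makes this visible. `IdAllocator` is a hypothetical class, and it is a template only so that an 8-bit id type can show the wrap-around quickly.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>

// Hypothetical allocator modelled on the counter-plus-set loop above.
// Id should be an unsigned integer type so that overflow wraps to zero.
template <typename Id>
class IdAllocator {
 public:
  explicit IdAllocator(Id first) : next_(first) {}

  // Hands out the next id that is not currently in use. Note: this loops
  // forever if every possible id is taken.
  Id acquire() {
    std::lock_guard<std::mutex> lock(mutex_);
    Id id;
    do {
      id = next_++;
    } while (!in_use_.insert(id).second);
    return id;
  }

  // Makes the id available again (compare release_thread_id).
  void release(Id id) {
    std::lock_guard<std::mutex> lock(mutex_);
    in_use_.erase(id);
  }

 private:
  std::mutex mutex_;
  Id next_;
  std::set<Id> in_use_;
};
```

With `IdAllocator<std::uint8_t> ids(254)`, the first three calls to `acquire()` return 254, 255, and then 0 after the counter wraps.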

As mentioned earlier when discussing the thread pool, the thread itself is not actually destroyed; it is reused:

Channel_info *Per_thread_connection_handler::block_until_new_connection() {
  Channel_info * new_conn = nullptr;
  mysql_mutex_lock(&LOCK_thread_cache);
  if (blocked_pthread_count < max_blocked_pthreads && !shrink_cache) {

    DBUG_PRINT("info", ("Blocking pthread for reuse"));

    DBUG_POP();
    assert(!_db_is_pushed_());

    // Block pthread
    blocked_pthread_count++;
    while (!connection_events_loop_aborted() && !wake_pthread && !shrink_cache)
      mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
    blocked_pthread_count--;

    if (shrink_cache && blocked_pthread_count <= max_blocked_pthreads) {
      mysql_cond_signal(&COND_flush_thread_cache);
    }

    if (wake_pthread) {
      wake_pthread--;
      if (!waiting_channel_info_list->empty()) {
        new_conn = waiting_channel_info_list->front();
        waiting_channel_info_list->pop_front();
        DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
      } else {
        assert(0);  // We should not get here.
      }
    }
  }
  mysql_mutex_unlock(&LOCK_thread_cache);
  return new_conn;
}
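
The reuse mechanism comes down to a condition variable. A finished worker parks itself in the cache, and the listener hands the next connection to a parked worker instead of creating a new thread. The sketch below is a reduced model under my own assumptions: `ThreadCache`, its members, and `reuse_demo` are hypothetical, an `int` stands in for Channel_info, and the cache-shrinking path is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

class ThreadCache {
 public:
  explicit ThreadCache(std::size_t max_blocked) : max_blocked_(max_blocked) {}

  // Listener side: hand the connection to an idle worker if one is parked.
  // Returns false when nobody is idle, so the caller must start a new thread.
  bool enqueue_if_idle(int conn) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (blocked_ <= wake_) return false;
    waiting_.push_back(conn);
    ++wake_;
    cond_.notify_one();
    return true;
  }

  // Worker side: park until a connection arrives. Returns false when the
  // cache is full or shutting down, meaning the worker thread should exit.
  bool block_until_new_connection(int *conn) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (blocked_ >= max_blocked_) return false;
    ++blocked_;
    cond_.wait(lock, [this] { return shutdown_ || wake_ > 0; });
    --blocked_;
    if (wake_ == 0) return false;  // woken by shutdown only
    --wake_;
    *conn = waiting_.front();
    waiting_.pop_front();
    return true;
  }

  void shutdown() {
    std::lock_guard<std::mutex> lock(mutex_);
    shutdown_ = true;
    cond_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::deque<int> waiting_;
  std::size_t max_blocked_;
  std::size_t blocked_ = 0;  // workers currently parked
  std::size_t wake_ = 0;     // hand-offs not yet picked up
  bool shutdown_ = false;
};

// Parks one worker, hands it `conn`, and returns what the worker received
// (-1 if it received nothing).
int reuse_demo(int conn) {
  ThreadCache cache(1);
  int received = -1;
  std::thread worker([&] {
    int c = 0;
    if (cache.block_until_new_connection(&c)) received = c;
  });
  // Retry until the worker is parked and accepts the hand-off.
  while (!cache.enqueue_if_idle(conn)) std::this_thread::yield();
  worker.join();
  return received;
}
```

Comparing the number of parked workers with the number of pending wake-ups keeps the listener from handing out more connections than there are idle threads to pick them up.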
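That completes one full flow, and basic usage is as described. Pinning down where each specific thread uses THD, however, still requires reading further into the internal code.

## IV. Summary
The MySQL source code is very complex and looks somewhat daunting. Read it slowly and chew off a piece whenever there is time: "nibble away at the toffee bit by bit", turn small wins into big ones, and let quantitative change produce qualitative change.
Keep at it, returning youth!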

Original article: http://outofmemory.cn/zaji/5691888.html
