Seastar源码阅读(一)启动过程

Seastar源码阅读(一)启动过程,第1张

Seastar源码阅读(一)启动过程 Seastar启动过程

Seastar程序的启动是通过调用run或run_deprecated函数来完成的,用户传入命令行参数和需要执行的主函数体即可。

run, run_deprecated

run是对run_deprecated的简单包装,所以我们只谈论后者。

run_deprecated主要做以下事情:

使用boost::program_options加载选项描述并解析命令行选项。设置logger配置,如log level,log type等。调用smp::configure()启动reactors。注册continuation,当主线程对应的reactor启动后执行传入的func。调用engine().run()启动主reactor(进入事件循环)。(事件循环结束后)调用smp::cleanup(),进行清理工作。返回退出码。

一部分对应代码:

int app_template::run_deprecated(int ac, char ** av, std::function&& func) noexcept {
    boost::program_options::options_description all_opts;
    all_opts.add(_app_opts);
    all_opts.add(_seastar_opts);

    bpo::variables_map configuration;
    try {
        bpo::store(bpo::command_line_parser(ac, av)
                    .options(all_opts)
                    .positional(_pos_opts)
                    .run()
            , configuration);
        _conf_reader(configuration);
    } catch (bpo::error& e) {
        fmt::print("error: {}nnTry --help.n", e.what());
        return 2;
    }
    if (configuration.count("help")) {
        if (!_opts.description.empty()) {
            std::cout << _opts.description << "n";
        }
        std::cout << _app_opts << "n";
        return 1;
    }
    
    // ...
    
    _opts.reactor_opts._argv0 = std::string(av[0]);
    _opts.reactor_opts._auto_handle_sigint_sigterm = _opts.auto_handle_sigint_sigterm;
    if (auto* native_stack = dynamic_cast(_opts.reactor_opts.network_stack.get_selected_candidate_opts())) {
        native_stack->_hugepages = _opts.smp_opts.hugepages;
    }

    // Needs to be before `smp::configure()`.
    try {
        apply_logging_settings(log_cli::extract_settings(_opts.log_opts));
    } catch (const std::runtime_error& exn) {
        std::cout << "logging configuration error: " << exn.what() << 'n';
        return 1;
    }

    try {
        _smp->configure(_opts.smp_opts, _opts.reactor_opts);
    } catch (...) {
        std::cerr << "Could not initialize seastar: " << std::current_exception() << std::endl;
        return 1;
    }
    _configuration = {std::move(configuration)};
    // No need to wait for this future.
    // func is waited on via engine().run()
    (void)engine().when_started().then([this] {
        return seastar::metrics::configure(_opts.metrics_opts).then([this] {
            // set scollectd use the metrics configuration, so the later
            // need to be set first
            scollectd::configure( _opts.scollectd_opts);
        });
    }).then(
        std::move(func)
    ).then_wrapped([] (auto&& f) {
        try {
            f.get();
        } catch (std::exception& ex) {
            std::cout << "program failed with uncaught exception: " << ex.what() << "n";
            engine().exit(1);
        }
    });

    auto exit_code = engine().run();
    _smp->cleanup();
    return exit_code;
}
smp::configure

该函数主要做以下事情:

屏蔽大部分信号,只保留SIGHUP, SIGQUIT, SIGILL, SIGABRT, SIGFPE, SIGSEGV等信号。

读取配置,决定是否要进行cpu亲和,内存空间绑定、保留内存等 *** 作;设置smp::count为CPU核心数或线程数(如果用户指定)。

使用mlockall系统调用来把所有已分配和将要分配的内存空间锁定在物理内存上。

使用pthread_setaffinity_np系统调用来让当前线程和第一个CPU核心绑定。

创建剩余的的smp::count - 1个线程,在这些线程内部各自进行内存分配,CPU核心绑定,屏蔽SIGSEGV之外的其他信号,初始化reactors等 *** 作,并等待IO队列、消息队列建立完成后启动reactor::do_run()进入事件循环。

主线程等待所有其他线程初始化reactor完成,然后在所有线程间两两建立消息队列。

主线程返回。

其中主副线程的协同初始化代码,使用三个barrier完成了同步初始化:

void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_opts) {
	// ...	
	boost::barrier reactors_registered(smp::count);
    boost::barrier smp_queues_constructed(smp::count);
    // We use shared_ptr since this thread can exit while other threads are still unlocking
    auto inited = std::make_shared(smp::count);

	// ...

	unsigned i;
    auto smp_tmain = smp::_tmain;
    for (i = 1; i < smp::count; i++) {
        auto allocation = allocations[i];
        create_thread([this, smp_tmain, inited, &reactors_registered, &smp_queues_constructed, &reactor_opts, &reactors, hugepages_path, i, allocation, assign_io_queues, alloc_io_queues, thread_affinity, heapprof_enabled, mbind, backend_selector, reactor_cfg] {
          try {
            // initialize thread_locals that are equal across all reacto threads of this smp instance
            smp::_tmain = smp_tmain;
            auto thread_name = seastar::format("reactor-{}", i);
            pthread_setname_np(pthread_self(), thread_name.c_str());
            if (thread_affinity) {
                smp::pin(allocation.cpu_id);
            }
            memory::configure(allocation.mem, mbind, hugepages_path);
            if (heapprof_enabled) {
                memory::set_heap_profiling_enabled(heapprof_enabled);
            }
            sigset_t mask;
            sigfillset(&mask);
            for (auto sig : { SIGSEGV }) {
                sigdelset(&mask, sig);
            }
            auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
            throw_pthread_error(r);
            init_default_smp_service_group(i);
            allocate_reactor(i, backend_selector, reactor_cfg);
            reactors[i] = &engine();
            alloc_io_queues(i);
            reactors_registered.wait();
            smp_queues_constructed.wait();
            // _qs_owner is only initialized here
            _qs = _qs_owner.get();
            start_all_queues();
            assign_io_queues(i);
            inited->wait();
            engine().configure(reactor_opts);
            engine().do_run();
          } catch (const std::exception& e) {
              seastar_logger.error(e.what());
              _exit(1);
          }
        });
    }

	// ...

	reactors_registered.wait();
    _qs_owner = decltype(smp::_qs_owner){new smp_message_queue* [smp::count], qs_deleter{}};
    _qs = _qs_owner.get();
    for(unsigned i = 0; i < smp::count; i++) {
        smp::_qs_owner[i] = reinterpret_cast(operator new[] (sizeof(smp_message_queue) * smp::count));
        for (unsigned j = 0; j < smp::count; ++j) {
            new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
        }
    }
    _alien._qs = alien::instance::create_qs(reactors);
    smp_queues_constructed.wait();
    start_all_queues();
    assign_io_queues(0);
    inited->wait();
	engine().configure(reactor_opts);
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5713377.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存