ProcessPoolExecutor并行编程

ProcessPoolExecutor并行编程,第1张

你有个程序要执行CPU密集型工作,你想让他利用多核CPU的优势来运行的快一点。

concurrentfutures 库提供了一个 ProcessPoolExecutor 类, 可被用来在一个单独的Python解释器中执行计算密集型函数。 不过,要使用它,你首先要有一些计算密集型的任务。 我们通过一个简单而实际的例子来演示它。假定你有个Apache web服务器日志目录的gzip压缩包:

进一步假设每个日志文件内容类似下面这样:

下面是一个脚本,在这些日志文件中查找出所有访问过robotstxt文件的主机:

前面的程序使用了通常的map-reduce风格来编写。 函数 find_robots() 在一个文件名集合上做map *** 作,并将结果汇总为一个单独的结果, 也就是 find_all_robots() 函数中的 all_robots 集合。 现在,假设你想要修改这个程序让它使用多核CPU。 很简单——只需要将map() *** 作替换为一个 concurrentfutures 库中生成的类似 *** 作即可。 下面是一个简单修改版本:

通过这个修改后,运行这个脚本产生同样的结果,但是在四核机器上面比之前快了35倍。 实际的性能优化效果根据你的机器CPU数量的不同而不同。

ProcessPoolExecutor 的典型用法如下:

其原理是,一个 ProcessPoolExecutor 创建N个独立的Python解释器, N是系统上面可用CPU的个数。你可以通过提供可选参数给 ProcessPoolExecutor(N) 来修改 处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成, 然后处理池被关闭。不过,程序会一直等待直到所有提交的工作被处理完成。

被提交到池中的工作必须被定义为一个函数。有两种方法去提交。 如果你想让一个列表推导或一个 map() *** 作并行执行的话,可使用 poolmap() :

另外,你可以使用 poolsubmit() 来手动的提交单个任务:

如果你手动提交一个任务,结果是一个 Future 实例。 要获取最终结果,你需要调用它的 result() 方法。 它会阻塞进程直到结果被返回来。

如果不想阻塞,你还可以使用一个回调函数,例如:

回调函数接受一个 Future 实例,被用来获取最终的结果(比如通过调用它的result()方法)。 尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:

一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。

它会克隆Python解释器,包括fork时的所有程序状态。 而在Windows上,克隆解释器时不会克隆状态。 实际的fork *** 作会在第一次调用 poolmap() 或 poolsubmit() 后发生。

你应该在创建任何线程之前先创建并激活进程池(比如在程序启动的main线程中创建进程池)。

ESP32具有两个 32 位 Tensilica Xtensa LX6 微处理器,这使其成为功能强大的双核(core0 和 core1)微控制器。它有单核和双核两种变体。但双核版本更受欢迎,因为没有明显的价格差异。

ESP32 可以使用 Arduino IDE、Espressif IDF、Lua RTOS 等进行编程。使用 Arduino IDE 进行编程时,代码只能在 Core1 上运行,因为 Core0 已经针对射频通信进行了编程。但这是本教程,我们将展示如何使用 ESP32的两个内核同时执行两个 *** 作。这里的第一个任务是闪烁板载 LED,第二个任务是从 DHT11 传感器获取温度数据。

让我们首先看看多核处理器相对于单核的优势。

多核处理器的优势

当有两个以上的进程同时工作时,多核处理器很有用。

由于工作分布在不同的内核之间,它的速度会提高,并且可以同时完成多个进程。

可以降低功耗,因为当任何内核处于空闲模式时,它都可以用来关闭当时未使用的外围设备。

双核处理器必须比单核处理器更少地在不同线程之间切换,因为它们可以一次处理两个而不是一次处理一个。

ESP32 和 FreeRTOS

ESP32 板上已经安装了 FreeRTOS 固件。FreeRTOS 是一个开源实时 *** 作系统,在多任务处理中非常有用。RTOS 有助于管理资源和最大化系统性能。FreeRTOS 有许多用于不同目的的 API 函数,使用这些 API,我们可以创建任务并使它们在不同的内核上运行。

可以在此处找到 FreeRTOS API 的完整文档。我们将尝试在代码中使用一些 API 来构建将在两个内核上运行的多任务应用程序。

查找 ESP32 内核 ID

在这里,我们将使用Arduino IDE 将代码上传到 ESP32中。要知道运行代码的Core ID,有一个API函数

xPortGetCoreID()

可以从void setup()和void loop()函数调用此函数,以了解运行这些函数的核心 ID。

您可以通过上传以下草图来测试此 API:

无效设置() {

Serialbegin(115200);

Serialprint("setup() 函数在核心上运行:");

Serialprintln(xPortGetCoreID());

}

void loop() {

Serialprint("loop() 函数在核心上运行:");

Serialprintln(xPortGetCoreID());

}

上传上面的草图后,打开串口监视器,你会发现这两个功能都在 core1 上运行,如下图所示。

从以上观察可以得出结论,默认的 Arduino 草图始终在 core1 上运行。

ESP32 双核编程

Arduino IDE 支持 ESP32 的 FreeRTOS,FreeRTOS API 允许我们创建可以在两个内核上独立运行的任务。任务是在板上执行一些 *** 作的代码,例如闪烁的 LED、发送温度等。

以下函数用于创建可以在两个内核上运行的任务。在这个函数中,我们必须给出一些参数,比如优先级、核心 ID 等。

现在,按照以下步骤创建任务和任务功能。

1首先在void setup函数中创建任务。在这里,我们将创建两个任务,一个用于每 05 秒后闪烁 LED,另一个任务是每 2 秒后获取温度读数。

xTaskCreatePinnedToCore() 函数有 7 个参数:

实现任务的函数名(task1)

任务的任何名称(“task1”等)

以字为单位分配给任务的堆栈大小(1 个字=2 字节)

任务输入参数(可以为NULL)

任务的优先级(0为最低优先级)

任务句柄(可以为 NULL)

任务将运行的核心 ID(0 或 1)

现在,通过在 xTaskCreatePinnedToCore() 函数中提供所有参数来创建用于闪烁 LED 的 Task1 。

xTaskCreatePinnedToCore(Task1code, "Task1", 10000, NULL, 1, NULL, 0);

同样,为 Task2创建 Task2并在第 7个参数中设置 core id 1。

xTaskCreatePinnedToCore(Task2code, "Task2", 10000, NULL, 1, NULL, 1);

您可以根据任务的复杂性更改优先级和堆栈大小。

2 现在,我们将实现Task1code和Task2code函数。这些函数包含所需任务的代码。在我们的例子中,第一个任务将闪烁 LED,另一个任务将获取温度。因此,在 void setup 函数之外为每个任务创建两个单独的函数。

Task1code 功能 实现了 05 秒后闪烁板载 LED,如下所示。

void Task1code( void parameter) {

Serialprint("Task1 在核心上运行");

Serialprintln(xPortGetCoreID());

for(;;) {//无限循环

digitalWrite(led, HIGH);

延迟(500);

digitalWrite(led, LOW);

延迟(500);

}

}

同样,实现获取温度的Task2code函数。

void Task2code( void pvParameters ){

Serialprint("Task2 在核心上运行");

Serialprintln(xPortGetCoreID());

for(;;){

浮动 t = dhtreadTemperature();

Serialprint("温度:");

序列号print(t);

延迟(2000);

}

}

3 这里的void 循环函数将保持为空。我们已经知道循环和设置函数在 core1 上运行,因此您也可以在void 循环函数中实现 core1 任务。

现在编码部分已经结束,所以只需在工具菜单中选择 ESP32 板,使用 Arduino IDE 上传代码。确保您已将 DHT11 传感器连接到 ESP32 的引脚 D13。

现在可以在 Serial Monitor 或 Arduino IDE 上监控结果,如下所示:

通过使用 ESP32 的双核同时运行多个任务,可以构建像实时系统这样的复杂应用。

#include "DHTh"

#define DHTPIN 13

#define DHTTYPE DHT11

const int led = 2;

DHT dht(DHTPIN, DHTTYPE);

无效设置() {

Serialbegin(115200);

pinMode(LED,输出);

dhtbegin();

xTaskCreatePinnedToCore(Task1code, "Task1", 10000, NULL, 1, NULL, 1);

延迟(500);

xTaskCreatePinnedToCore(Task1code, "Task1", 10000, NULL, 1, NULL, 0);

延迟(500);

}

void Task1code( void pvParameters ){

Serialprint("Task1 在核心上运行");

Serialprintln(xPortGetCoreID());

for(;;){

digitalWrite(led, HIGH);

延迟(300);

数字写入(领导,低);

延迟(300);

}

}

void Task2code( void pvParameters ){

Serialprint("Task2 在核心上运行");

Serialprintln(xPortGetCoreID());

for(;;){

浮动 h = dhtreadHumidity();

浮动 t = dhtreadTemperature();

浮动 f = dhtreadTemperature(true);

Serialprint("温度:");

序列号print(t);

Serialprint(" C \n ");

if (isnan(h) || isnan(t) || isnan(f)) {

Serialprintln("读取 DHT 传感器失败!");

返回;

}

延迟(2000);

}

}

无效循环() {

}

以上就是关于ProcessPoolExecutor并行编程全部的内容,包括:ProcessPoolExecutor并行编程、esp32如何并行、等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/10109398.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存