- IPC(Inter Process Communication),进程间通信,用于在不同进程间传递或同步信息。在RT-Thread中主要提供的方式有消息队列、互斥锁、信号量、事件、邮箱及信号()。
- IPC对象的基础数据结构
struct rt_ipc_object
{
struct rt_object parent; /**< 父对象节点 */
rt_list_t suspend_thread; /**< 挂起线程链表 */
};
- rt_ipc_object_init:主要用于IPC对象链表的初始化。此函数主要将rt_list_init做了封装。
- rt_ipc_list_suspend:
rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t *list,
struct rt_thread *thread,
rt_uint8_t flag)
{
/* suspend thread */
rt_thread_suspend(thread);
switch (flag)
{
case RT_IPC_FLAG_FIFO:
rt_list_insert_before(list, &(thread->tlist));
break;
case RT_IPC_FLAG_PRIO:
{
struct rt_list_node *n;
struct rt_thread *sthread;
/* find a suitable position */
for (n = list->next; n != list; n = n->next)
{
sthread = rt_list_entry(n, struct rt_thread, tlist);
/* find out */
if (thread->current_priority < sthread->current_priority)
{
/* insert this thread before the sthread */
rt_list_insert_before(&(sthread->tlist), &(thread->tlist));
break;
}
}
/*
* not found a suitable position,
* append to the end of suspend_thread list
*/
if (n == list)
rt_list_insert_before(list, &(thread->tlist));
}
break;
}
return RT_EOK;
}
- 此函数先将IPC所在线程挂起,后根据IPC初始化设置的模式:
- RT_IPC_FLAG_FIFO:FIFO(先入先出)模式。将IPC对象所在线程头插入线程表中。
- RT_IPC_FLAG_PRIO:优先级模式。根据线程优先级在挂起表中进行排序,若未找到合适的位置,则将其插入表尾。
- rt_ipc_list_resume:用于将IPC对象所在线程唤醒。主要通过rt_list_entry获取线程TCB指针,之后调用rt_thread_resume对其进行唤醒。
- rt_ipc_list_resume_all:对IPC对象挂起线程表进行遍历,唤醒每个挂起线程。
- 信号量通常用于防止多线程之间出现共享资源的竞争访问。这里做个比喻,一间房子同时住着父亲,母亲还有自己(这里房子相当于共享资源,能够同时被父亲,母亲及自己访问),父亲、母亲还有自己能够同时拥有房子门的钥匙(这里钥匙相当于信号量),三个人都有资格出入这间房子(即能够同时对其进行访问)。当然,并不是说需要访问共享资源的对象数必须与信号量最大数相等,也可以是不等的。好比,房子只有父母亲有钥匙,自己没有;或者大家都有房子的钥匙,并且还准备了若干钥匙备用。
- 有一点值得注意的是,信号量在获取一次之后,必须将其释放一次,即获取及释放必须成对出现,否则将出现个别对象无法访问资源而出现阻塞的情况。还是拿上面那个房子的比喻,假设房子有两把钥匙,某天自己出门忘带钥匙,而父亲出门时恰巧将自己的钥匙一并带出了(这里相当于A获取了两次信号量),当自己回家而父亲还未回家时(这里表示A未释放信号量,而B此时需要获取信号量访问资源),这时候自己只能在房子门前等着(这里相当于信号量的阻塞获取),或先离开一会再回来看看(这里相当于信号量的非阻塞获取)。
- 信号量能够配合互斥量(锁)实现读写锁(支持多对象同时读取资源,但在写入资源时将互斥);还能根据其特性用作一些逻辑应用(不推荐)。
- 信号量基础数据结构
struct rt_semaphore
{
struct rt_ipc_object parent; /**< 父对象节点 */
rt_uint16_t value; /**< 信号量最大值. */
rt_uint16_t reserved; /**< reserved field */
};
typedef struct rt_semaphore *rt_sem_t;
创建
- 创建包含静态创建和动态创建,这里的区别在于是否由用户分配对象内存,是否在堆上申请对象内存。
- 静态创建通过函数rt_sem_init执行,值得注意的是,这里会对信号量最大值进行限制,必须小于65536。这里设置的信号量最大值相当于信号量的能够占有的最大数值,也使初始能够支持对象占有的最大数值。
rt_err_t rt_sem_init(rt_sem_t sem,
const char *name,
rt_uint32_t value,
rt_uint8_t flag)
{
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(value < 0x10000U);
/* init object */
rt_object_init(&(sem->parent.parent), RT_Object_Class_Semaphore, name);
/* init ipc object */
rt_ipc_object_init(&(sem->parent));
/* set init value */
sem->value = (rt_uint16_t)value;
/* set parent */
sem->parent.parent.flag = flag;
return RT_EOK;
}
- 动态创建通过函数rt_sem_create执行。执行内容与rt_sem_init相差无几,不同之处在于是否在堆上申请对象内存。
- 在销毁时,动态创建与静态创建的信号量仍存在区别。动态创建的信号量从内核分离后会进行内存释放;静态创建的信号量仅仅是将其从内核中分离,不释放内存。但两者在销毁时均会调用rt_ipc_list_resume_all唤醒所有所有挂起表中的线程。这里贴出信号量删除函数的源码:
rt_err_t rt_sem_delete(rt_sem_t sem)
{
RT_DEBUG_NOT_IN_INTERRUPT;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_ASSERT(rt_object_is_systemobject(&sem->parent.parent) == RT_FALSE);
/* wakeup all suspend threads */
rt_ipc_list_resume_all(&(sem->parent.suspend_thread));
/* delete semaphore object */
rt_object_delete(&(sem->parent.parent));
return RT_EOK;
}
设置
rt_err_t rt_sem_control(rt_sem_t sem, int cmd, void *arg)
{
rt_ubase_t level;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
if (cmd == RT_IPC_CMD_RESET)
{
rt_uint32_t value;
/* get value */
value = (rt_uint32_t)arg;
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* resume all waiting thread */
rt_ipc_list_resume_all(&sem->parent.suspend_thread);
/* set new value */
sem->value = (rt_uint16_t)value;
/* enable interrupt */
rt_hw_interrupt_enable(level);
rt_schedule();
return RT_EOK;
}
return -RT_ERROR;
}
- 此函数仅支持对信号量进行重置。重置时能够重置其信号量最大值。首先失能中断,唤醒挂起表中的所有线程,重置其信号量最大值,最后使能中断,调度切换一次。
- RT-Thread中支持rt_sem_take及rt_sem_trytake两个函数,这里rt_sem_trytake为rt_sem_take的非阻塞版本,即将阻塞超时时间设为0,因此这里直接分析rt_sem_take函数:
rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time)
{
register rt_base_t temp;
struct rt_thread *thread;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(sem->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s take sem:%s, which value is: %d\n",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));
if (sem->value > 0)
{
/* semaphore is available */
sem->value --;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
}
else
{
/* no waiting, return with timeout */
if (time == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
else
{
/* current context checking */
RT_DEBUG_IN_THREAD_CONTEXT;
/* semaphore is unavailable, push to suspend list */
/* get current thread */
thread = rt_thread_self();
/* reset thread error number */
thread->error = RT_EOK;
RT_DEBUG_LOG(RT_DEBUG_IPC, ("sem take: suspend thread - %s\n",
thread->name));
/* suspend thread */
rt_ipc_list_suspend(&(sem->parent.suspend_thread),
thread,
sem->parent.parent.flag);
/* has waiting time, start thread timer */
if (time > 0)
{
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&time);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* do schedule */
rt_schedule();
if (thread->error != RT_EOK)
{
return thread->error;
}
}
}
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(sem->parent.parent)));
return RT_EOK;
}
- 此函数执行逻辑:
- 失能中断
- 判断信号量值,若大于0,则表示仍有信号量能够获取,则将信号量值减1,使能中断,调用对象获取钩子函数后退出。
- 若信号量值等于0,则表示无信号量能够获取,这里就需要判断信号量获取是阻塞还是非阻塞。
- 若信号量为非阻塞获取,则直接使能中断后退出;反之,则将当前线程挂起放入线程挂起表中,若阻塞时间小于0则表示获取信号量的线程若未获取到信号量则永久挂起。若阻塞时间大于0,则设置线程定时器,将阻塞时间设置为线程定时器超时时间(即挂起时间),启动定时器。最后使能中断,进行一次调度切换后退出。
rt_err_t rt_sem_release(rt_sem_t sem)
{
register rt_base_t temp;
register rt_bool_t need_schedule;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(sem->parent.parent)));
need_schedule = RT_FALSE;
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s releases sem:%s, which value is: %d\n",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));
if (!rt_list_isempty(&sem->parent.suspend_thread))
{
/* resume the suspended thread */
rt_ipc_list_resume(&(sem->parent.suspend_thread));
need_schedule = RT_TRUE;
}
else
sem->value ++; /* increase value */
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* resume a thread, re-schedule */
if (need_schedule == RT_TRUE)
rt_schedule();
return RT_EOK;
}
- 这里直接分析信号量释放的执行逻辑:
- 失能中断
- 若线程挂起表非空,则表示线程仍占有信号量,这里直接唤醒该占有线程,并执行一次调度切换;反之,表示线程已释放信号量,则将信号量当前值加1。
- 使能中断。
- 互斥锁相当于信号量最大值为1的信号量。主要用于共享资源不支持对象同时访问的场合,如通信端口对象的读写 *** 作。
- 基础数据结构,这里直接看源码:
struct rt_mutex
{
struct rt_ipc_object parent; /**< inherit from ipc_object */
rt_uint16_t value; /**< 互斥量值,为0或1 */
rt_uint8_t original_priority; /**< 持有互斥量的线程优先级 */
rt_uint8_t hold; /**< 持有互斥量的线程数 */
struct rt_thread *owner; /**< 当前持有互斥量的线程TCB指针 */
};
typedef struct rt_mutex *rt_mutex_t;
- 互斥量的创建、删除、获取及释放 *** 作与信号量相似,这里不多做赘述。
- RT-Thread中支持事件这一IPC。事件在发送及接收时最大能够接收4个字节的数据(即uint32_t),由此,单个事件对象在通信时最多支持32类事件同步,且支持一对多,多对一通信。
- 为了支持通信,在线程TCB中带有32位事件集及8位的事件接收标志。这样,在每次调用事件阻塞接收或事件发送函数时,则可以与之做比较,判断事件是否发送或接收完成。
/**
* Thread structure
*/
struct rt_thread
{
...
#if defined(RT_USING_EVENT)
/* thread event */
rt_uint32_t event_set;
rt_uint8_t event_info;
#endif
...
};
typedef struct rt_thread *rt_thread_t;
- 基础数据结构:
struct rt_event
{
struct rt_ipc_object parent; /**< 父对象节点 */
rt_uint32_t set; /**< 事件集。发送及接收均通过此成员进行 */
};
typedef struct rt_event *rt_event_t;
- 事件对象的创建与删除类似于信号量,此处不多做赘述,主要分析设置、发送及接收函数。
- 接收时支持的三种模式
/**
* flag defintions in event
*/
#define RT_EVENT_FLAG_AND 0x01 /**< logic and */
#define RT_EVENT_FLAG_OR 0x02 /**< logic or */
#define RT_EVENT_FLAG_CLEAR 0x04 /**< clear flag */
- RT_EVENT_FLAG_AND:与模式。一次接收多个事件。只有所有设定接收事件位置位,才认为接收成功。
- RT_EVENT_FLAG_OR:或模式。一次接收多个事件。只要其中一个设定接收事件位置位,则认为接收成功。
- RT_EVENT_FLAG_CLEAR:清除模式。接收一次后,则在对象中将对应事件位清除。
- rt_event_control仅支持对事件对象的重置 *** 作,而重置主要是将该事件的当前接收及发送事件集重置。以下为源码:
rt_err_t rt_event_control(rt_event_t event, int cmd, void *arg)
{
rt_ubase_t level;
/* parameter check */
RT_ASSERT(event != RT_NULL);
RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event);
if (cmd == RT_IPC_CMD_RESET)
{
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* resume all waiting thread */
rt_ipc_list_resume_all(&event->parent.suspend_thread);
/* init event set */
event->set = 0;
/* enable interrupt */
rt_hw_interrupt_enable(level);
rt_schedule();
return RT_EOK;
}
return -RT_ERROR;
}
发送
- 事件发送时,无论是否有线程正在阻塞进行事件接收,都会将其发送(即将事件集对应事件位置位),可以理解为无阻塞发送。这里若根据线程TCB中的事件接收标志完成所有发送工作后,会直接唤醒相应的事件接收线程。能够在下一次调度周期中,立即响应事件。
rt_err_t rt_event_send(rt_event_t event, rt_uint32_t set)
{
struct rt_list_node *n;
struct rt_thread *thread;
register rt_ubase_t level;
register rt_base_t status;
rt_bool_t need_schedule;
/* parameter check */
RT_ASSERT(event != RT_NULL);
RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event);
if (set == 0)
return -RT_ERROR;
need_schedule = RT_FALSE;
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* set event */
event->set |= set;
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(event->parent.parent)));
if (!rt_list_isempty(&event->parent.suspend_thread))
{
/* search thread list to resume thread */
n = event->parent.suspend_thread.next;
while (n != &(event->parent.suspend_thread))
{
/* get thread */
thread = rt_list_entry(n, struct rt_thread, tlist);
status = -RT_ERROR;
if (thread->event_info & RT_EVENT_FLAG_AND)
{
if ((thread->event_set & event->set) == thread->event_set)
{
/* received an AND event */
status = RT_EOK;
}
}
else if (thread->event_info & RT_EVENT_FLAG_OR)
{
if (thread->event_set & event->set)
{
/* save recieved event set */
thread->event_set = thread->event_set & event->set;
/* received an OR event */
status = RT_EOK;
}
}
/* move node to the next */
n = n->next;
/* condition is satisfied, resume thread */
if (status == RT_EOK)
{
/* clear event */
if (thread->event_info & RT_EVENT_FLAG_CLEAR)
event->set &= ~thread->event_set;
/* resume thread, and thread list breaks out */
rt_thread_resume(thread);
/* need do a scheduling */
need_schedule = RT_TRUE;
}
}
}
/* enable interrupt */
rt_hw_interrupt_enable(level);
/* do a schedule */
if (need_schedule == RT_TRUE)
rt_schedule();
return RT_EOK;
}
- 直接分析其逻辑:
- 判断需要发送的事件集是否为0,若为0则直接退出。
- 失能中断。
- 置位事件对象中相应事件位。
- 调用钩子函数。
- 判断线程挂起表是否为空,若为空,表示无线程正在阻塞接收事件,则直接使能中断,退出。
- 若线程挂起表非空,表示存在线程正在阻塞接收事件,则遍历线程挂起表,若挂起线程中接收标志为RT_EVENT_FLAG_AND或RT_EVENT_FLAG_OR,则根据其特性对事件线程中存储的集置位。若所有事件均以接收,则进行线程唤醒及一次调度切换,若挂起线程中接收标志带有RT_EVENT_FLAG_CLEAR,则将事件对象中存储的事件集重置对应事件位。
- 使能线程。
- 这里直接看源码,分析主要逻辑:
rt_err_t rt_event_recv(rt_event_t event,
rt_uint32_t set,
rt_uint8_t option,
rt_int32_t timeout,
rt_uint32_t *recved)
{
struct rt_thread *thread;
register rt_ubase_t level;
register rt_base_t status;
RT_DEBUG_IN_THREAD_CONTEXT;
/* parameter check */
RT_ASSERT(event != RT_NULL);
RT_ASSERT(rt_object_get_type(&event->parent.parent) == RT_Object_Class_Event);
if (set == 0)
return -RT_ERROR;
/* init status */
status = -RT_ERROR;
/* get current thread */
thread = rt_thread_self();
/* reset thread error */
thread->error = RT_EOK;
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(event->parent.parent)));
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* check event set */
if (option & RT_EVENT_FLAG_AND)
{
if ((event->set & set) == set)
status = RT_EOK;
}
else if (option & RT_EVENT_FLAG_OR)
{
if (event->set & set)
status = RT_EOK;
}
else
{
/* either RT_EVENT_FLAG_AND or RT_EVENT_FLAG_OR should be set */
RT_ASSERT(0);
}
if (status == RT_EOK)
{
/* set received event */
if (recved)
*recved = (event->set & set);
/* received event */
if (option & RT_EVENT_FLAG_CLEAR)
event->set &= ~set;
}
else if (timeout == 0)
{
/* no waiting */
thread->error = -RT_ETIMEOUT;
}
else
{
/* fill thread event info */
thread->event_set = set;
thread->event_info = option;
/* put thread to suspended thread list */
rt_ipc_list_suspend(&(event->parent.suspend_thread),
thread,
event->parent.parent.flag);
/* if there is a waiting timeout, active thread timer */
if (timeout > 0)
{
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(level);
/* do a schedule */
rt_schedule();
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* received an event, disable interrupt to protect */
level = rt_hw_interrupt_disable();
/* set received event */
if (recved)
*recved = thread->event_set;
}
/* enable interrupt */
rt_hw_interrupt_enable(level);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(event->parent.parent)));
return thread->error;
}
- 此函数主要工作,此处分接收模式说明:
- 非阻塞接收(即接收超时时间为0):每次会直接判断事件对象中事件集相应的事件位,若均置位,则接收完成;反之,则直接退出。
- 阻塞接收(即接收超时时间非0):会在超时时间到达之前,判断事件是否接收完全(事件集相应事件位是否均置位),若未接收完全,则将此线程挂起,进行调度切换。
- 邮箱用于线程中消息或数据的传递。主要特点是开销低、效率较高,这里是相较于消息队列来说的。邮箱中的每一封邮件只能容纳固定的 4 字节内容(针对 32 位处理系统,指针的大小即为 4 个字节,所以一封邮件恰好能够容纳一个指针)。支持一对多交互。
- 邮箱的创建及删除类似于信号量,值得注意的是,邮箱除了父对象节点中IPC线程挂起表,还有它自己拥有的发送线程挂起表。可以理解为一条发送线程挂起表及一条接收线程挂起表(IPC线程挂起表)
- 当邮箱发送及接收均选择阻塞模式时,个人认为可以理解为对共享内存的读写 *** 作。
- 基础数据结构
struct rt_mailbox
{
struct rt_ipc_object parent; /**< 父对象节点 */
rt_uint32_t *msg_pool; /**< 消息池 */
rt_uint16_t size; /**< 消息池大小,即消息池中存储消息数的最大值 */
rt_uint16_t entry; /**< 消息池中的新消息索引 */
rt_uint16_t in_offset; /**< 消息池中的入池索引偏移 */
rt_uint16_t out_offset; /**< 消息池中的出池索引偏移 */
rt_list_t suspend_sender_thread; /**< 消息发送线程挂起表 */
};
typedef struct rt_mailbox *rt_mailbox_t;
- 邮箱中的发送及接收依赖于入池索引以及出池索引,在读取消息后不会将对象直接释放,而是使用新消息将其覆盖。
- 这里的设置,同上面几种IPC一样,仅支持重置 *** 作,源码如下:
rt_err_t rt_mb_control(rt_mailbox_t mb, int cmd, void *arg)
{
rt_ubase_t level;
/* parameter check */
RT_ASSERT(mb != RT_NULL);
RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox);
if (cmd == RT_IPC_CMD_RESET)
{
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* resume all waiting thread */
rt_ipc_list_resume_all(&(mb->parent.suspend_thread));
/* also resume all mailbox private suspended thread */
rt_ipc_list_resume_all(&(mb->suspend_sender_thread));
/* re-init mailbox */
mb->entry = 0;
mb->in_offset = 0;
mb->out_offset = 0;
/* enable interrupt */
rt_hw_interrupt_enable(level);
rt_schedule();
return RT_EOK;
}
return -RT_ERROR;
}
- 主要是将线程挂起表及邮件发送线程挂起表中的所有线程唤醒,并且重置邮箱池中的邮件起始索引,入池索引及出池索引。
- RT-Thread中封装了阻塞发送和无阻塞发送函数接口,分别为rt_mb_send_wait及rt_mb_send,其中rt_mb_send为rt_mb_send_wait超时时间设为0的封装。
- 邮件每次仅支持4字节数据发送,大小等于一个指针,因此可以通过传递指针达到交互大批量数据的效果,但是需要注意发送数所占用的内存是否会使系统出现内存瓶颈。
- 邮件发送均以FIFO的方式进行,保证发送顺序。
- 此处仅分析rt_mb_send_wait函数,源码如下:
rt_err_t rt_mb_send_wait(rt_mailbox_t mb,
rt_uint32_t value,
rt_int32_t timeout)
{
struct rt_thread *thread;
register rt_ubase_t temp;
rt_uint32_t tick_delta;
/* parameter check */
RT_ASSERT(mb != RT_NULL);
RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox);
/* initialize delta tick */
tick_delta = 0;
/* get current thread */
thread = rt_thread_self();
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mb->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* for non-blocking call */
if (mb->entry == mb->size && timeout == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
}
/* mailbox is full */
while (mb->entry == mb->size)
{
/* reset error number in thread */
thread->error = RT_EOK;
/* no waiting, return timeout */
if (timeout == 0)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
}
RT_DEBUG_IN_THREAD_CONTEXT;
/* suspend current thread */
rt_ipc_list_suspend(&(mb->suspend_sender_thread),
thread,
mb->parent.parent.flag);
/* has waiting time, start thread timer */
if (timeout > 0)
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_send_wait: start timer of thread:%s\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* re-schedule */
rt_schedule();
/* resume from suspend state */
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
if (timeout > 0)
{
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
}
}
/* set ptr */
mb->msg_pool[mb->in_offset] = value;
/* increase input offset */
++ mb->in_offset;
if (mb->in_offset >= mb->size)
mb->in_offset = 0;
/* increase message entry */
mb->entry ++;
/* resume suspended thread */
if (!rt_list_isempty(&mb->parent.suspend_thread))
{
rt_ipc_list_resume(&(mb->parent.suspend_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();
return RT_EOK;
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return RT_EOK;
}
- 执行逻辑为:
- 失能中断
- 判断是否为无阻塞调用并且邮箱消息池已满,若是,则直接使能中断,退出
- 判断邮箱消息池是否已满,若已满,则进入循环,将发送线程挂起放入对象内的发送线程挂起表中。之后判断超时时间是否大于0,若大于0则设置并开启线程定时器,之后使能中断,进行一次调度切换。最后设置新的超时时间。若下一次循环,邮箱消息池中仍满,则继续上述 *** 作,直到超时时间到达,使能中断,退出。
- 退出循环后,说明邮箱消息池中的消息已被读取过。此时,根据当前入池索引更新消息池中的消息,更新入池索引(若入池索引达到消息池消息数最大值时则重置)(加1),更新消息池中的新消息索引(加1)。
- 若线程挂起表中非空,表示存在线程正在进行阻塞接收,此时将其唤醒,进行一次调度切换
- 使能中断
- 邮件接收的大小仅支持4字节。
rt_err_t rt_mb_recv(rt_mailbox_t mb, rt_uint32_t *value, rt_int32_t timeout)
{
struct rt_thread *thread;
register rt_ubase_t temp;
rt_uint32_t tick_delta;
/* parameter check */
RT_ASSERT(mb != RT_NULL);
RT_ASSERT(rt_object_get_type(&mb->parent.parent) == RT_Object_Class_MailBox);
/* initialize delta tick */
tick_delta = 0;
/* get current thread */
thread = rt_thread_self();
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mb->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* for non-blocking call */
if (mb->entry == 0 && timeout == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
/* mailbox is empty */
while (mb->entry == 0)
{
/* reset error number in thread */
thread->error = RT_EOK;
/* no waiting, return timeout */
if (timeout == 0)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
thread->error = -RT_ETIMEOUT;
return -RT_ETIMEOUT;
}
RT_DEBUG_IN_THREAD_CONTEXT;
/* suspend current thread */
rt_ipc_list_suspend(&(mb->parent.suspend_thread),
thread,
mb->parent.parent.flag);
/* has waiting time, start thread timer */
if (timeout > 0)
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("mb_recv: start timer of thread:%s\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* re-schedule */
rt_schedule();
/* resume from suspend state */
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
if (timeout > 0)
{
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
}
}
/* fill ptr */
*value = mb->msg_pool[mb->out_offset];
/* increase output offset */
++ mb->out_offset;
if (mb->out_offset >= mb->size)
mb->out_offset = 0;
/* decrease message entry */
mb->entry --;
/* resume suspended thread */
if (!rt_list_isempty(&(mb->suspend_sender_thread)))
{
rt_ipc_list_resume(&(mb->suspend_sender_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent)));
rt_schedule();
return RT_EOK;
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mb->parent.parent)));
return RT_EOK;
}
- 执行逻辑为:
- 失能中断
- 判断是否为无阻塞调用并且邮箱消息池空,若是,则直接使能中断,退出
- 判断邮箱消息池是否为空,若为空,则进入循环,将当前线程挂起放入线程挂起表中。之后判断超时时间是否大于0,若大于0则设置并开启线程定时器,之后使能中断,进行一次调度切换。最后设置新的超时时间。若下一次循环,邮箱消息池中仍空,则继续上述 *** 作,直到超时时间到达,使能中断,退出为止。
- 退出循环后,说明邮箱消息池中已存在未读取消息。此时,根据当前出池索引读取消息池中的消息,更新出池索引(若出池索引达到消息池消息数最大值时则重置)(加1),更新消息池中的新消息索引(减1)。
- 若发送线程挂起表中非空,表示存在线程正在进行阻塞发送,此时将其唤醒,进行一次调度切换
- 使能中断
- 消息队列不同于邮箱,单次数据传输上,它支持任意大小的数据传输。
- 基础数据结构
struct rt_messagequeue
{
struct rt_ipc_object parent; /**< 父对象节点 */
void *msg_pool; /**< 消息池 */
rt_uint16_t msg_size; /**< 消息大小 */
rt_uint16_t max_msgs; /**< 最大消息数,即消息池大小 */
rt_uint16_t entry; /**< 消息在消息队列中的起始索引 */
void *msg_queue_head; /**< 消息队列头指针 */
void *msg_queue_tail; /**< 消息队列尾指针 */
void *msg_queue_free; /**< 消息队列空闲位置指针 */
};
typedef struct rt_messagequeue *rt_mq_t;
- 以下主要分析发送及接收函数。
- RT-Thread中消息队列主要提供了rt_mq_send及rt_mq_urgent。区别在于前者将新消息进行尾插,而后者进行的是头插。
- 以下仅贴出rt_mq_send源码:
rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size)
{
register rt_ubase_t temp;
struct rt_mq_message *msg;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* greater than one message size */
if (size > mq->msg_size)
return -RT_ERROR;
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* get a free list, there must be an empty item */
msg = (struct rt_mq_message *)mq->msg_queue_free;
/* message queue is full */
if (msg == RT_NULL)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
}
/* move free list pointer */
mq->msg_queue_free = msg->next;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* the msg is the new tailer of list, the next shall be NULL */
msg->next = RT_NULL;
/* copy buffer */
rt_memcpy(msg + 1, buffer, size);
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* link msg to message queue */
if (mq->msg_queue_tail != RT_NULL)
{
/* if the tail exists, */
((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
}
/* set new tail */
mq->msg_queue_tail = msg;
/* if the head is empty, set head */
if (mq->msg_queue_head == RT_NULL)
mq->msg_queue_head = msg;
/* increase message entry */
mq->entry ++;
/* resume suspended thread */
if (!rt_list_isempty(&mq->parent.suspend_thread))
{
rt_ipc_list_resume(&(mq->parent.suspend_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();
return RT_EOK;
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return RT_EOK;
}
接收
- 主要工作同邮箱接收类似,这里的消息接收是从消息队列头进行接收的,保证接收的都是最新消息。
rt_err_t rt_mq_recv(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t timeout)
{
struct rt_thread *thread;
register rt_ubase_t temp;
struct rt_mq_message *msg;
rt_uint32_t tick_delta;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* initialize delta tick */
tick_delta = 0;
/* get current thread */
thread = rt_thread_self();
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* for non-blocking call */
if (mq->entry == 0 && timeout == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
/* message queue is empty */
while (mq->entry == 0)
{
RT_DEBUG_IN_THREAD_CONTEXT;
/* reset error number in thread */
thread->error = RT_EOK;
/* no waiting, return timeout */
if (timeout == 0)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
thread->error = -RT_ETIMEOUT;
return -RT_ETIMEOUT;
}
/* suspend current thread */
rt_ipc_list_suspend(&(mq->parent.suspend_thread),
thread,
mq->parent.parent.flag);
/* has waiting time, start thread timer */
if (timeout > 0)
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* re-schedule */
rt_schedule();
/* recv message */
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
if (timeout > 0)
{
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
}
}
/* get message from queue */
msg = (struct rt_mq_message *)mq->msg_queue_head;
/* move message queue head */
mq->msg_queue_head = msg->next;
/* reach queue tail, set to NULL */
if (mq->msg_queue_tail == msg)
mq->msg_queue_tail = RT_NULL;
/* decrease message entry */
mq->entry --;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* copy message */
rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* put message to free list */
msg->next = (struct rt_mq_message *)mq->msg_queue_free;
mq->msg_queue_free = msg;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));
return RT_EOK;
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)