一、c++跨线程间的“击鼓传花”-事务触发
线程间事务机制本质上就是线程 *** 作共享资源,最简单的实现,例如利用一个全局变量+全局互斥锁,各个线程就可以简单实现相互消息传递。
在实际应用中,引入事件对象,本质就是将这些共享资源包装成一个事件对象,在调用的过程中,所有线程都可以在一个等待函数中指定事件对象句柄。当指定的事务对象的状态被置为有信号状态时,事务对象等待函数将返回,线程将获得通知而被触发。
在win下,Windows API函数CreateEvent、WaitForSingleObject、SetEvent、ResetEvent、CloseHandle等实现事件对象的创建、设置、取值、重设、删除等 *** 作,其内部使用了线程锁进行对象管理,实现跨线程事务相互调用。
在 linux下,没有提供现成API函数调用,但基于类似实现原理,采用线程互斥锁对象(ptread_mutex_t),通过其相关的pthread_mutex_init、pthread_mutex_lock、pthread_mutex_unlock、pthread_cond_wait、pthread_mutex_destroy等配套函数,实现实现跨线程间事务相互调用。
二、线程事务对象c++代码设计
工程项目如下,测试层逻辑简单设计,创建一个event_handle,将其传递给线程对象,并进行事件等待命令,在主线程进行命令输入来触发事件信号,线程对象获得事件返回来执行后续 *** 作。
event_test
bin
build_win
build_linux
src
event.h
event.cpp
myThread.h
myThread.cpp
win32Thread.h
win32Thread.cpp
testThread.h
testThrad.cpp
main.cpp
CMakeLists.txt
event.h:
#ifndef _HIK_EVENT_H_
#define _HIK_EVENT_H_
#ifdef WIN32
#include
#define event_handle HANDLE
#else
#include
typedef struct
{
bool state;
bool manual_reset;
pthread_mutex_t mutex;
pthread_cond_t cond;
}event_t;
#define event_handle event_t*
#endif
//返回值:NULL 出错
event_handle event_create(bool manual_reset, bool init_state);
//返回值:0 等到事件,-1出错,句柄必须被触发信号后,函数才会返回
int event_wait(event_handle hevent);
//返回值:0 等到事件,1 超时,-1出错,建议比event_wait优先使用
int event_timedwait(event_handle hevent, long milliseconds);
//返回值:0 成功,-1出错
int event_set(event_handle hevent);
//返回值:0 成功,-1出错
int event_reset(event_handle hevent);
//返回值:无
void event_destroy(event_handle hevent);
#endif
event.cpp
#include "event.h"
#ifdef __linux
#include
#include
#endif
#include
event_handle event_create(bool manual_reset, bool init_state)
{
#ifdef WIN32
HANDLE hevent = CreateEvent(NULL, manual_reset, init_state, NULL);
#else
event_handle hevent = new(std::nothrow) event_t;
if (hevent == NULL)
{
return NULL;
}
hevent->state = init_state;
hevent->manual_reset = manual_reset;
if (pthread_mutex_init(&hevent->mutex, NULL))
{
delete hevent;
return NULL;
}
if (pthread_cond_init(&hevent->cond, NULL))
{
pthread_mutex_destroy(&hevent->mutex);
delete hevent;
return NULL;
}
#endif
return hevent;
}
int event_wait(event_handle hevent)
{
#ifdef WIN32
DWORD ret = WaitForSingleObject(hevent, INFINITE);
if (ret == WAIT_OBJECT_0)
{
return 0;
}
return -1;
#else
if (pthread_mutex_lock(&hevent->mutex))
{
return -1;
}
while (!hevent->state)
{
if (pthread_cond_wait(&hevent->cond, &hevent->mutex))
{
pthread_mutex_unlock(&hevent->mutex);
return -1;
}
}
if (!hevent->manual_reset)
{
hevent->state = false;
}
if (pthread_mutex_unlock(&hevent->mutex))
{
return -1;
}
return 0;
#endif
}
int event_timedwait(event_handle hevent, long milliseconds)
{
#ifdef WIN32
DWORD ret = WaitForSingleObject(hevent, milliseconds);
if (ret == WAIT_OBJECT_0)
{
return 0;
}
if (ret == WAIT_TIMEOUT)
{
return 1;
}
return -1;
#else
int rc = 0;
struct timespec abstime;
struct timeval tv;
gettimeofday(&tv, NULL);
abstime.tv_sec = tv.tv_sec + milliseconds / 1000;
abstime.tv_nsec = tv.tv_usec*1000 + (milliseconds % 1000)*1000000;
if (abstime.tv_nsec >= 1000000000)
{
abstime.tv_nsec -= 1000000000;
abstime.tv_sec++;
}
if (pthread_mutex_lock(&hevent->mutex) != 0)
{
return -1;
}
while (!hevent->state)
{
if ((rc = pthread_cond_timedwait(&hevent->cond, &hevent->mutex, &abstime)))
{
if (rc == ETIMEDOUT) break;
pthread_mutex_unlock(&hevent->mutex);
return -1;
}
}
if (rc == 0 && !hevent->manual_reset)
{
hevent->state = false;
}
if (pthread_mutex_unlock(&hevent->mutex) != 0)
{
return -1;
}
if (rc == ETIMEDOUT)
{
//timeout return 1
return 1;
}
//wait event success return 0
return 0;
#endif
}
int event_set(event_handle hevent)
{
#ifdef WIN32
return !SetEvent(hevent);
#else
if (pthread_mutex_lock(&hevent->mutex) != 0)
{
return -1;
}
hevent->state = true;
if (hevent->manual_reset)
{
if(pthread_cond_broadcast(&hevent->cond))
{
return -1;
}
}
else
{
if(pthread_cond_signal(&hevent->cond))
{
return -1;
}
}
if (pthread_mutex_unlock(&hevent->mutex) != 0)
{
return -1;
}
return 0;
#endif
}
int event_reset(event_handle hevent)
{
#ifdef WIN32
//ResetEvent 返回非零表示成功
if (ResetEvent(hevent))
{
return 0;
}
return -1;
#else
if (pthread_mutex_lock(&hevent->mutex) != 0)
{
return -1;
}
hevent->state = false;
if (pthread_mutex_unlock(&hevent->mutex) != 0)
{
return -1;
}
return 0;
#endif
}
void event_destroy(event_handle hevent)
{
if(hevent){
#ifdef WIN32
CloseHandle(hevent);
#else
pthread_cond_destroy(&hevent->cond);
pthread_mutex_destroy(&hevent->mutex);
delete hevent;
#endif
}
}
win32Thread.h,win下父线程类实现
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef WIN32THREAD_H
#define WIN32THREAD_H
#include
#include
typedef void *HANDLE;
class MyThread
{
public:
MyThread();
~MyThread();
void start();
virtual int Run();
HANDLE getThread();
private:
HANDLE hThread;
static void agent(void *p);
};
#endif
win32Thread.cpp
#include "win32Thread.h"
#include
MyThread::MyThread()
{
}
MyThread::~MyThread()
{
WaitForSingleObject(hThread, INFINITE);
}
void MyThread::start()
{
hThread =(HANDLE)_beginthread(agent, 0, (void *)this);
}
int MyThread::Run()
{
printf("Base Thread\n");
return 0;
}
void MyThread::agent(void *p)
{
MyThread *agt = (MyThread *)p;
agt->Run();
}
HANDLE MyThread::getThread()
{
return hThread;
}
myThread.h,Linux下父线程类实现
/*
* add arg in linux system and compile: -lpthread
*/
#ifndef _MYTHREAD_H
#define _MYTHREAD_H
#include
#include
class MyThread
{
private:
//current thread ID
pthread_t tid;
//thread status
int threadStatus;
//get manner pointer of execution
static void* run0(void* pVoid);
//manner of execution inside
void* run1();
public:
//threadStatus-new create
static const int THREAD_STATUS_NEW = 0;
//threadStatus-running
static const int THREAD_STATUS_RUNNING = 1;
//threadStatus-end
static const int THREAD_STATUS_EXIT = -1;
// constructed function
MyThread();
~MyThread();
//the entity for thread running
virtual int Run()=0;
//start thread
bool start();
//gte thread ID
pthread_t getThreadID();
//get thread status
int getState();
//wait for thread end
void join();
//wait for thread end in limit time
void join(unsigned long millisTime);
};
#endif /* _MYTHREAD_H */
myThread.cpp
#include "myThread.h"
#include
void* MyThread::run0(void* pVoid)
{
MyThread* p = (MyThread*) pVoid;
p->run1();
return p;
}
void* MyThread::run1()
{
threadStatus = THREAD_STATUS_RUNNING;
tid = pthread_self();
Run();
threadStatus = THREAD_STATUS_EXIT;
tid = 0;
pthread_exit(NULL);
}
MyThread::MyThread()
{
tid = 0;
threadStatus = THREAD_STATUS_NEW;
}
MyThread::~MyThread()
{
join(10);
}
int MyThread::Run()
{
while(true){
printf("thread is running!\n");
sleep(100);
}
return 0;
}
bool MyThread::start()
{
return pthread_create(&tid, NULL, run0, this) == 0;
}
pthread_t MyThread::getThreadID()
{
return tid;
}
int MyThread::getState()
{
return threadStatus;
}
void MyThread::join()
{
if (tid > 0)
{
pthread_join(tid, NULL);
}
}
void MyThread::join(unsigned long millisTime)
{
if (tid == 0)
{
return;
}
if (millisTime >0)
{
unsigned long k = 0;
while (threadStatus != THREAD_STATUS_EXIT && k <= millisTime)
{
usleep(100);
k++;
}
}
join();
}
testThread.h,基于自定义win父线程类或linux父线程类为基类,实现等待事件响应输出
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#ifndef TIME_UP_INFO_H
#define TIME_UP_INFO_H
/*
测试线程,测试事件类跨线程通信机制.
*/
#ifdef WIN32
#include "win32Thread.h"
#endif
#ifdef linux
#include "myThread.h"
#endif
#include "event.h"
class TestThread : public MyThread
{
public:
TestThread(event_handle handle_);
~TestThread();
int Run();
private:
bool running;
event_handle handle_test;
};
#endif
testThread.cpp
#include "testThread.h"
#include
#ifdef WIN32
#include
#endif // WIN32
#ifdef linux
#include
#endif
TestThread::TestThread(event_handle handle_)
: running(true)
, handle_test(handle_)
{
};
TestThread::~TestThread()
{
running = false;
};
int TestThread::Run()
{
int i=0,j=0;
while (running)
{
//超时等待及持续等待测试
if(0==event_timedwait(handle_test,10000))
//if(0==event_wait(handle_test))
{
printf("event_wait success %d \n",++i);
}else{
printf("event_wait failed %d \n",++j);
}
#ifdef WIN32
Sleep(10);
#else
usleep(10000);
#endif
}
return 0;
};
main.cpp,创建一个事件对象,并传给测试线程类创建一个线程对象,在主线程捕获用户输入来触发事件设置,测试线程对象是否顺便获得事务通知,实现线程间消息传递。
#ifdef WIN32
#include
#endif
#ifdef linux
#include
#endif
#include "event.h"
#include "testThread.h"
int main(int argc, char* argv[])
{
event_handle handle_ = event_create(false,true);
//event_handle handle_ = event_create(true,true);
if(NULL==handle_)
{
printf("event_create failed!\n");
return 0;
}
TestThread th(handle_);
th.start();
bool bExit = false;
char ch = '0';
int i=1, j=0;
while(!bExit)
{
ch = getchar();
switch(ch)
{
case 'q':
bExit = true;
break;
case 'w':
if(0==event_set(handle_))
printf("event_set success %d \n",++i);
break;
case 's':
if(0==event_reset(handle_))
printf("event_reset success %d \n",++j);
break;
default:
break;
}
}
event_destroy(handle_);
printf("test exit!");
return 0;
}
三、代码编译及测试
win下采用cmake+vs2010,linux下采用cmake+gcc编译,CMakeLists.txt
# CMake 最低版本号要求
cmake_minimum_required (VERSION 2.8)
# 项目信息
project (event_test)
#
if(WIN32)
message(STATUS "windows compiling...")
add_definitions(-D_PLATFORM_IS_WINDOWS_)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /MT")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
set(WIN_OS true)
else(WIN32)
message(STATUS "linux compiling...")
add_definitions( -D_PLATFORM_IS_LINUX_)
add_definitions("-Wno-invalid-source-encoding")
# add_definitions("-O2")
set(UNIX_OS true)
set(_DEBUG true)
endif(WIN32)
#
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
# 指定源文件的目录,并将名称保存到变量
SET(source_h
#
${PROJECT_SOURCE_DIR}/src/event.h
${PROJECT_SOURCE_DIR}/src/testThread.h
)
SET(source_cpp
#
${PROJECT_SOURCE_DIR}/src/event.cpp
${PROJECT_SOURCE_DIR}/src/testThread.cpp
${PROJECT_SOURCE_DIR}/src/main.cpp
)
#头文件目录
#include_directories(${PROJECT_SOURCE_DIR}/include)
if (${UNIX_OS})
SET(source_h_linux
${PROJECT_SOURCE_DIR}/src/myThread.h
)
SET(source_cpp_linux
${PROJECT_SOURCE_DIR}/src/myThread.cpp
)
add_definitions(
"-W"
"-fPIC"
"-Wall"
# "-Wall -g"
"-Werror"
"-Wshadow"
"-Wformat"
"-Wpointer-arith"
"-D_REENTRANT"
"-D_USE_FAST_MACRO"
"-Wno-long-long"
"-Wuninitialized"
"-D_POSIX_PTHREAD_SEMANTICS"
"-DACL_PREPARE_COMPILE"
"-Wno-unused-parameter"
"-fexceptions"
)
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
link_directories()
# 指定生成目标
add_executable(event_test ${source_h} ${source_cpp} ${source_h_linux} ${source_cpp_linux})
#link
target_link_libraries(event_test
-lpthread -pthread -lz -lrt -ldl
)
endif(${UNIX_OS})
if (${WIN_OS})
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4819")
SET(source_h_win
${PROJECT_SOURCE_DIR}/src/win32Thread.h
)
SET(source_cpp_win
${PROJECT_SOURCE_DIR}/src/win32Thread.cpp
)
add_definitions(
"-D_CRT_SECURE_NO_WARNINGS"
"-D_WINSOCK_DEPRECATED_NO_WARNINGS"
"-DNO_WARN_MBCS_MFC_DEPRECATION"
"-DWIN32_LEAN_AND_MEAN"
)
link_directories()
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${PROJECT_SOURCE_DIR}/bin)
# 指定生成目标
add_executable(event_testd ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
else(CMAKE_BUILD_TYPE)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE ${PROJECT_SOURCE_DIR}/bin)
# 指定生成目标
add_executable(event_test ${source_h} ${source_cpp} ${source_cpp_win})
endif (CMAKE_BUILD_TYPE)
endif(${WIN_OS})
编译命令如下
进入event_test目录
win:
mkdir build_win
cmake -G "Visual Studio 10 2010 Win64" -DCMAKE_BUILD_TYPE=Release ..
msbuild event_test.sln /p:Configuration="Release" /p:Platform="x64"
Linux:
mkdir build_linux
cmake ..
make
省略编译过程,以下是输出程序运行效果,左边是linux程序:
四、补充
完整的示例代码已经上传CSDN:
c++代码如何实现在win/linux下跨线程间事务触发实现完整示例代码-C++文档类资源-CSDN下载
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)