其他线程如何正确关闭阻塞的Socket线程

其他线程如何正确关闭阻塞的Socket线程,第1张

UINT DataRecvThreadProc(LPVOID lpParam)
{
C工具Dlg aDlg = (C工具Dlg)lpParam;//获取主对话窗的句柄!
int len;
CString str;
extern struct _TestNumRange TestNumRange;
if (!AfxSocketInit())
{
AfxMessageBox(IDP_SOCKETS_INIT_FAILED);
return 1;
}
int nPort = TestNumRangePortID;;
CSocket aSocket;
CSocket serverSocket;
if (!aSocketSocket())
{
AfxMessageBox(_T("创建套接字错误!"));
return 1;
}
BOOL bOptVal = TRUE;
int bOptLen = sizeof(BOOL);
aSocketSetSockOpt(SO_REUSEADDR, (void )&bOptVal, bOptLen, SOL_SOCKET);
if (!aSocketBind(nPort))
{
AfxMessageBox(_T("绑定端口失败!"));
return 1;
}
if (!aSocketListen(5))
{
AfxMessageBox(_T("监听失败!"));
return 1;
}
while (m_connect)
{
if (!aSocketAccept(serverSocket))
{
continue;
}
else
{
unsigned char szRecvMsg[17412] = { 0 };
while (1)
{
len = serverSocketReceive(szRecvMsg, 17412);
if (!m_connect) break;
if (len == 0 || len == -1)
{
m_connect = false; Recv_Signal = false;
if (Recv_Signal)  break;
serverSocketClose();
aSocketClose();
aDlg->m_pThread = NULL;        //在这里把指针NULL掉会不会出错?
AfxMessageBox(_T("探头掉线!"));
return 0;
}
if (Recv_Signal)
{
global_CriticalSectionLock();
frashfifo = true;
signalFIFOInput(szRecvMsg, len);
global_CriticalSectionUnlock();
}
}
serverSocketClose();
}
}
aSocketClose();
return 0;
}
在界面上有两个按钮,启动按钮就是开启这个工作线程,而结束按钮我是这样写的:
m_connect = false; Recv_Signal = false;
::WaitForSingleObject(m_pThread->m_hThread, 1000); 
m_pThread = NULL;
return;
但是这样关闭的话,有一种情况会出错:
当工作线程阻塞在
len = serverSocketReceive(szRecvMsg, 17412);
的时候,若我按下启动按钮而在这1000ms内Socket没有接收到新的信号,则工作线程无法关闭,而只是把指针NULL了。
当我再开启工作线程时,就会出现两个Socket监听同一个端口的情况,这就会导致严重的后果。。。
因此,我想知道在这样的情况下我要怎么实现UI去控制工作线程的开关。
注:在实际使用中,serverSocketReceive会有几种接收的情况:5ms一个包,一分钟一个包,因此如果用CEvent的话,我不知道应该怎么设计才能保证不掉包的同时能实现对工作线程的开关控制。

一、通讯框架类图
二、框架说明
上图是通讯框架静态类图,其抽象模型是:服务器在指定的IP和端口上进行监听,当收到一个连接请求时就会创建一个连接,然后把这个连接交给一个执行器执行处理该连接,一个连接包含一个或多个会话,每个会话在一个线程上执行,不同的会话间互相不影响,只要客户端不主动关闭连接,服务器就可以在同一连接上处理多个会话。
XServiceHost是服务主机,即监听者,它负责在指定的IP和端口上监听来自客户端的请求,它运行在一个单独的线程中;当收到来自客户端的请求时就调用XconnectionCreateor的create接口创建一个连接(XConnection的派生类,不同的服务派生不同),然后把该connection对象添加到Xconnectionmanager中,xconnectionmanager负责创建一个执行器对象即XExecuter去执行请求,XExecuter对象是一个单独的线程;XServiceHost继续监听接收来自其它客户端的请求,如此往复持续提供服务。
所有的应用服务只需派生XConnectionCreator和XConnection即可,前者负责创建具体的业务connection对象,如果逻辑在XConnection的派生类中处理较为复杂,则在connection中创建不同的XRequest派生类对象去执行具体的业务逻辑。
XHostContext是对服务框架所运行的主机对象的抽象,目前提供了一个退出方法即Exit,该接口在XServiceHost调用监听失败时调用,通知主机退出监听,结束服务。
21、XServiceHost类
1、构造函数
XServiceHost(XHostContext context,XConnectionCreator connectioncreator, string ip, int port)
说明:
Context 入参表示的是调用此方法的上下文,即XServiceHost对象的所有者;
Connectioncreator XConnectionCreator类的对象
Ip 表示要监听的IP
Port 表示要监听的端口;
2、Start() 函数功能是启动对应的监听线程;
3、Stop() 函数功能是停止对应的监听线程;
4、Listen() 表示此监听线程开始执行时要调用的方法;
此函数会调用C#封装好的Plasterer类来进行对指定端口的监听;使用此方法避免直接调用Socket底层的方法,简化代码,降低复杂度;需要使用到的函数AcceptSocket(),使用此函数接受挂起的连接请求;
22、XConnectionCreator接口
XConnectionCreator提供对连接创建的一个约束,不同的服务可以实现不同功能,从而实现封装和抽象,可以不改变XServiceHost的情况下进行良好的运行;
XConnection Create(Socket socket,string clientip)方法
声明创建连接的约定,具体的服务可以继承此接口进行相应功能的开发;并返回对应的连接实例对象;
23、XConnection类
XConnection类为对连接的一个抽象,为基类,具体的服务继承此基类进行对应的 *** 作;
1、XConnection(Socket socket, string clientip) 构造函数;
2、virtual void Process(Command cmd, MemoryStream packet)为抽象函数;具体的子类进行实现;cmd是命令的枚举对象;packet是对应需要 *** 作的数据;
3、public void Execute() 此函数会生成一个XSession类的对象,用来异步的获取各个系统的Socket连接;获取完数据后将数据传送到对应的XConnection实例对象,调用Process(Command cmd, MemoryStream packet)方法对消息进行相应的处理;
24、XconnectionManager类
XconnectionManager是一个使用单例模式设计的连接管理者,
Add(XConnection connection)方法使用XExecuter类的实例对象为每个连接生成一个执行器,该执行器会单独起一个线程对对应的连接进行处理;
25、XExecuter类
XExecuter用来执行请求,XExecuter对象是一个单独的线程;此类的对象会维护一个List<XConnection>队列;
1、public void Accept(XConnection con)方法该方法对外提供为该队列增加连接的功能;
2、拥有的独立线程会挂起,直到队列中出现连接,此时会唤起该线程进行该线程,并调用获取的连接对象的Execute()方法进行处理;
26、XSession类
XSession类使用Socket类的BeginReceive和EndReceive方法异步获取连接发送的数据,获取数据后使用XConnection类的Parse方法将数据传回连接对象,从而对数据进行相应的处理;
27、XRequest类
XRequest类主要负责不同消息的处理,此为一个基类,具体的响应需要子类完成,来负责收到不同消息的处理;
三、获取数据后的界面处理
对应界面各个模块来说,都相当于是一个单例,所以可以使用单例模式的方式对各个模块的控件进行管理,如Beam模块,可以抽象一个管理者来负责对其的控制;此单例负责所有针对此模块的逻辑;
目前有两种方案来进行界面响应:AsyncOperation和BackgroundWorker
31 AsyncOperation
AsyncOperation类通过回调函数可以将子线程处理完的数据传到UI线程,从而进行对应的UI *** 作;可以使用Post 函数来切换子线程到UI线程
具体实现:
1、当具体的消息传到具体的XRequest子类的实体对象后,XRequest子类对象可以调用对应消息的单例,在对应的单例里声明了对应的委托,并进行声明如:
public delegate void PushMessage(object msg);
PushMessagem_RouteMapSaved;
2、并且会提供具体的注册和取消注册的函数,在界面生成的时候就会调用注册函数进行注册,具体的注册和取消注册的函数如下:
public void BeamChangedMsg(PushMessage callback)
{
m_BeamChanged += callback;
if (null == m_Messager)
{
m_Messager = AsyncOperationManagerCreateOperation(null);
}
}
public void UnBeamChangedMsg(PushMessage callback)
{
m_BeamChanged -= callback;
}
注:m_BeamChanged为声明的PushMessage 委托对象;m_Messager为AsyncOperation类的对象。
1、进行完上述的步骤之后,就可以再具体的位置调用AsyncOperation类的 Post函数进行线程上下文切换,进入到UI线程进行对应的界面刷新,如:
m_MessagerPost(newSendOrPostCallback(m_BeamChanged), parameters);
2、当退出界面的时候可以调用UnBeamChangedMsg函数进行取消注册,避免发生异常;
32、BackgroundWorker类
BackgroundWorker类其实是对AsyncOperation类的使用的一次封装,更加简洁方便的使用异步回调的方式进行多线程编程,主要涉及到三个事件:
public event DoWorkEventHandlerDoWork; //实现开辟子线程完成耗时 *** 作
public event ProgressChangedEventHandlerProgressChanged;//实现返回 *** 作进度或自定义参数
public event RunWorkerCompletedEventHandlerRunWorkerCompleted;//完成后返回数据的处理;
需要注意的是DoWork事件是在子线程完成,具体函数不能包含对UI控件的 *** 作;
具体的UI *** 作可以放入到ProgressChanged和RunWorkerCompleted事件中完成。
具体实现:
与使用AsyncOperation基本一致:
首先在对应的单例中声明BackgroundWorker,在创建UI的时候对BackgroundWorker对象进行各个事件的注册,最后使用RunWorkerAsync方法进行调用;
四、通信工具
通信工具类
该工具类提供网络通信的两个接口,一个是只发送请求的SendRequest函数,另一个是先发送请求然后等待服务端返回结果的GetResponse函数;SendRequestByLongConnection用于建立长连接的通讯,ReleaseConnection函数用来释放连接;

socket_client
---------------------------------------------------------
using System;
using SystemCollectionsGeneric;
using SystemComponentModel;
using SystemData;
using SystemDrawing;
using SystemText;
using SystemWindowsForms;
using SystemNet;
using SystemNetSockets;
using SystemThreading;
namespace socket_client
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
IPEndPoint end;
Socket s;
Thread td;
private void button1_Click(object sender, EventArgs e)
{
try
{
int port = Int32Parse(textBox2TextSubstring(textBox2TextLastIndexOf(":") + 1));
IPAddress ip = IPAddressParse(textBox1TextSubstring(textBox1TextLastIndexOf(":") + 1));
end = new IPEndPoint(ip, port);

}
catch
{
MessageBoxShow("输入有误!!");
textBox1Text = "IP:";
textBox2Text = "端口:";
return;
}
try
{
s = new Socket(AddressFamilyInterNetwork, SocketTypeStream, ProtocolTypeTcp);
sConnect(end);
label1Text = "成功联接上服务器 " + textBox1Text + ":" + textBox2Text;
td = new Thread(new ThreadStart(bb));
tdIsBackground = true;
tdStart();
}
catch
{
label1Text = "联接失败服务器!! ";
}

}
void bb()
{
while (true)
{

byte[] bb = new byte[1024];
int i= sReceive(bb); //接收数据,返回每次接收的字节总数
string removeMsg = EncodingUnicodeGetString(bb,0,i);

if (removeMsg == "cmd---exit")//收到的是退出通知
{
richTextBox1Text = "";
label1Text = "无连接";
DialogResult re=MessageBoxShow("服务器已经关闭\n\"确定\"后退出程序,\n\"取消\"继续停留!", "消息提示:", MessageBoxButtonsOKCancel, MessageBoxIconWarning);
MessageBoxShow(reToString());
if (re == DialogResultOK)
{
sendExit();//告诉服务器我退出了
ApplicationExit();
}
return;
}
richTextBox1AppendText(removeMsg) ;
richTextBox1ScrollToCaret();
}
}
private void button2_Click(object sender, EventArgs e)
{
string msg = "客户端说:" + richTextBox2Text+"\n";
richTextBox1AppendText(msg);
byte[] by = EncodingUnicodeGetBytes(msg);
sSend(by);
richTextBox2Text = "";
richTextBox2Focus();
richTextBox1ScrollToCaret();

}
void sendExit()
{
string msg = "cmd---exit";
byte[] by = EncodingUnicodeGetBytes(msg);
sSend(by);
}
}
}
socket_server
-----------------------------------------------------------------
using System;
using SystemCollectionsGeneric;
using SystemComponentModel;
using SystemData;
using SystemDrawing;
using SystemText;
using SystemWindowsForms;
using SystemNet;
using SystemNetSockets;
using SystemThreading;
namespace socket_server
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
label1Text = "监听端口" + DnsGetHostByName(DnsGetHostName())AddressList[0] + ":" + Int32Parse(textBox1TextSubstring(textBox1TextLastIndexOf(":") + 1)) + "中";
Thread td = new Thread(new ThreadStart(aa));
tdStart();
if (button1Text == "开始监听")
{
button1Text = "停止监听";

return;
}
else
{
sendExit();
ssShutdown(SocketShutdownBoth);
sClose();
tdAbort();
label1Text = "停止监听!";
richTextBox1Text = "";
button1Text = "开始监听";

}
}
void sendExit()
{
string msg = "cmd---exit";
byte[] by = EncodingUnicodeGetBytes(msg);
ssSend(by);
}
Socket s;
Socket ss;
void aa()
{

int port = Int32Parse(textBox1TextSubstring(textBox1TextLastIndexOf(":") + 1));

IPEndPoint end = new IPEndPoint(DnsGetHostByName(DnsGetHostName())AddressList[0], port);
s = new Socket(AddressFamilyInterNetwork, SocketTypeStream, ProtocolTypeTcp);
sBind(end);
sListen(5);
ss=sAccept();
if (ssConnected)
{
label1Text = "有客户端联接上端口:"+textBox1Text;
while (true)
{
byte[] by = new byte[1024];
int i = ssReceive(by);
string msg = EncodingUnicodeGetString(by, 0, i);
if (msg == "cmd---exit")
{
label1Text = "客户端已经取消连接!";
return;
}
richTextBox1AppendText(msg);
richTextBox1ScrollToCaret();
}
}

}
private void button2_Click(object sender, EventArgs e)
{
string m = "服务器说:" + richTextBox2Text + "\n";
richTextBox1AppendText(m);
byte[] by = EncodingUnicodeGetBytes(m);
ssSend(by);
richTextBox2Text = "";
richTextBox2Focus();
richTextBox1ScrollToCaret();
}
private void button3_Click(object sender, EventArgs e)
{
string w="123455";
MessageBoxShow(wSubstring(0));
}
}
}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/10540154.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-09
下一篇 2023-05-09

发表评论

登录后才能评论

评论列表(0条)

保存