C#实现同Active MQ通讯的方法

C#实现同Active MQ通讯的方法,第1张

概述本文实例讲述了C#实现同ActiveMQ通讯方法。分享给大家供大家参考,具体如下:

本文实例讲述了C#实现同Active MQ通讯的方法。分享给大家供大家参考,具体如下:

内容概要:

主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础。

正文:

JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。
Message 由消息头,属性和消息体三部份组成。
Active MQ支持过滤机制,即生产者可以设置消息的属性(PropertIEs),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的PropertIEs匹配,消息才会发给该消费者。topic和Queue都支持Selector。

示例代码:

using System;using System.Collections.Generic;using System.linq;using System.Text;using System.windows;using System.windows.Controls;using System.windows.Data;using System.windows.documents;using System.windows.input;using System.windows.Media;using System.windows.Media.Imaging;using System.windows.Navigation;using System.windows.Shapes;using Apache.NMS;using System.Diagnostics;using Apache.NMS.Util;using System.windows.Threading;/* * 功能描述:C#使用ActiveMQ示例 * 修改次数:2 * 最后更新: by Kagula,2012-07-31 * * 前提条件: * [1]apache-activemq-5.4.2 * [2]Apache.NMS.ActiveMQ-1.5.6-bin * [3]WinXP SP3 * [4]VS2008 SP1 * [5]WPF工程 With .NET Framework 3.5 * * 启动 * * 不带安全控制方式启动 * [你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat * * 安全方式启动 * 添加环境变量:      ACTIVEMQ_ENCRYPTION_PASSWORD=activemq * [你的解压路径]\apache-activemq-5.4.2\bin>activemq xbean:file:../conf/activemq-security.xml * * Active MQ 管理地址 * http://127.0.0.1:8161/admin/ * 添加访问"http://127.0.0.1:8161/admin/"的限制 * * 第一步:添加访问限制 * 修改D:\apache\apache-activemq-5.4.2\conf\jetty.xml文件 * 下面这行编码,原 * <property name="authenticate" value="true" /> * 修改为 * <property name="authenticate" value="false" /> * * 第二步:修改登录用户名密码,缺省分别为admin,admin * D:\apache\apache-activemq-5.4.2\conf\jetty-realm.propertIEs * * 用户管理(前提:以安全方式启动ActiveMQ) * * 在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.propertIEs文件中修改默认的用户名密码 * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名 * e.g. 添加oa用户,密码同用户名。 * <authenticationUser username="oa" password="oa" groups="users,admins"/> * * 在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的topic或Queue * 只能被哪些用户组read 或 write。 * * * 配置C# with WPF项目 * 项目的[Application]->[TargetFramework]属性设置为[.NETFramework 3.5](这是VS2008WPF工程的默认设置) * 添加[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\lib\Apache.NMS\net-3.5\Apache.NMS.dll的引用 * Apache.NMS.dll相当于接口 * * 如果是以DeBUG方式调试 * 把[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\build\net-3.5\deBUG\目录下的 * Apache.NMS.ActiveMQ.dll文件复制到你项目的DeBUG目录下 * Apache.NMS.ActiveMQ.dll相当于实现 * * 如果是以Release方式调试 * 参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。 * * * 参考资料 * [1]《C#调用ActiveMQ官方示例》 http://activemq.apache.org/nms/examples.HTML * [2]《ActiveMQ NMS下载地址》http://activemq.apache.org/nms/activemq-downloads.HTML * [3]《Active MQ在C#中的应用示例》https://www.oudahe.com/p/30705/ * [4]《NMS API Reference》http://activemq.apache.org/nms/nms-API.HTML */namespace testActiveMQSubscriber{  /// <summary>  /// Interaction logic for Window1.xaml  /// </summary>  public partial class Window1 : Window  {    private static IConnectionFactory connFac;    private static IConnection connection;    private static ISession session;    private static IDestination destination;    private static IMessageProducer producer;    private static IMessageConsumer consumer;    protected static ITextMessage message = null;    public Window1()    {      InitializeComponent();      initAMQ("MyFirsttopic");    }    private voID initAMQ(String strtopicname)    {      try      {        connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));        //新建连接        //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码        //如果你要持久“订阅”,则需要设置ClIEntID,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!        connection.ClIEntID = "testing Listener";        connection = connFac.CreateConnection();//如果你是缺省方式启动Active MQ服务,则不需填用户名、密码        //创建Session        session = connection.CreateSession();        //发布/订阅模式,适合一对多的情况        destination = SessionUtil.GetDestination(session,"topic://" + strtopicname);        //新建生产者对象        producer = session.CreateProducer(destination);        producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留        //新建消费者对象:普通“订阅”模式        //consumer = session.CreateConsumer(destination);//不需要持久“订阅”        //新建消费者对象:持久"订阅"模式:        //  持久“订阅”后,如果你的程序被停止工作后,恢复运行,        //从第一次持久订阅开始,没收到的消息还可以继续收        consumer = session.CreateDurableConsumer(          session.Gettopic(strtopicname),connection.ClIEntID,null,false);        //设置消息接收事件        consumer.Listener += new MessageListener(OnMessage);        //启动来自Active MQ的消息侦听        connection.Start();      }      catch (Exception e)      {        //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!        DeBUG.Writeline(e.Message);      }    }    private voID SendMsg2topic_Click(object sender,RoutedEventArgs e)    {      //发送消息      ITextMessage request = session.CreateTextMessage(DateTime.Now.TolocalTime()+" "+tbMsg.Text);      producer.Send(request);    }    protected voID OnMessage(IMessage receivedMsg)    {      //接收消息      message = receivedMsg as ITextMessage;      //UI线程,显示收到的消息      dispatcher.Invoke(dispatcherPriority.normal,new Action(() =>      {        DateTime dt = new DateTime();        ListBoxItem lbi = new ListBoxItem();        lbi.Content = DateTime.Now.TolocalTime() + " " + message.Text;        lbR.Items.Add(lbi);      }));    }  }}

队列通讯方式,消费者例子

using System;using System.Collections.Generic;using System.linq;using System.Text;using Apache.NMS;using System.Diagnostics;using log4net;using Apache.NMS.Util;using System.Collections;namespace Cat8637autoCallServer{  public class SMTask  {    public String Callee { get; set; }    public String CheckNumber { get; set; }    public int Deadline { get; set; }    public overrIDe String ToString()    {      return String.Format("Callee={0},CheckNumber={1},Deadline={2}",Callee,CheckNumber,Deadline);    }  }  /*   * 负责接收任务,并把任务放在任务等待队列中。   */  public class MQClIEnt  {    private static Readonly ILog logger = LogManager.GetLogger(typeof(MQClIEnt));    private static IConnection connection = null;    private static ISession session = null;    Queue _voiceSMTasks = new Queue();    public MQClIEnt()    {      try      {        IConnectionFactory factory = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));        //新建连接        //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码        connection = factory.CreateConnection();        session = connection.CreateSession();        IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM"));        consumer.Listener += new MessageListener(OnMessage);        connection.Start();      }      catch (Exception ex)      {        DeBUG.Writeline(ex.Message);      }    }    protected voID OnMessage(IMessage receivedMsg)    {      IMessage message = receivedMsg as ITextMessage;      SMTask smTask = new SMTask();      smTask.Callee = message.PropertIEs["Callee"] as String;      smTask.CheckNumber = message.PropertIEs["Message"] as String;      smTask.Deadline = Convert.ToInt32(message.PropertIEs["deadline"] as String);      logger.Info("Received: "+smTask.ToString());      lock (_voiceSMTasks)      {        _voiceSMTasks.Enqueue(smTask);      }    }    public SMTask GetVoiceSMTask()    {      SMTask result = null;      lock (_voiceSMTasks)      {        if (_voiceSMTasks.Count > 0)        {          result = _voiceSMTasks.Dequeue() as SMTask;        }      }      return result;    }  }}

队列通讯方式,生产者例子

private voID Send_Click(object sender,RoutedEventArgs e){  try  {    IDestination destination = SessionUtil.GetDestination(session,"queue://TaskIssue_VoiceSM");    //新建生产者对象    IMessageProducer producer = session.CreateProducer(destination);    producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留    ITextMessage request = session.CreateTextMessage();    request.NMSCorrelationID = "TestVoiceSM";//这里我填了应用程序的名称。    request.PropertIEs["Callee"] = tbCallee.Text;    request.PropertIEs["Message"] = tbCheckNumber.Text;    request.PropertIEs["deadline"] = tbValIDDuration.Text;    producer.Send(request);  }  catch (Exception ex)  {    //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!    DeBUG.Writeline(ex.Message);  }}private voID Window_Closed(object sender,EventArgs e){  try  {    if (session == null)      return;    //if (connection == null)    //  return;    session.Close();    //connection.Close();  }  catch (Exception ex)  {    DeBUG.Writeline(ex.Message);  }}

更多关于C#相关内容感兴趣的读者可查看本站专题:《C#窗体 *** 作技巧汇总》、《C#常见控件用法教程》、《WinForm控件用法总结》、《C#程序设计之线程使用技巧总结》、《C# *** 作Excel技巧总结》、《C#中XML文件 *** 作技巧汇总》、《C#数据结构与算法教程》、《C#数组 *** 作技巧总结》及《C#面向对象程序设计入门教程》

希望本文所述对大家C#程序设计有所帮助。

总结

以上是内存溢出为你收集整理的C#实现同Active MQ通讯的方法全部内容,希望文章能够帮你解决C#实现同Active MQ通讯的方法所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1258620.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-07
下一篇 2022-06-07

发表评论

登录后才能评论

评论列表(0条)

保存