Analysis of Application Example of ActiveMQ in C

  • 2021-11-01 04:20:32
  • OfStack

This paper describes the application of ActiveMQ in C # with examples. Share it for your reference, as follows:

ActiveMQ is a good thing, needless to say. ActiveMQ provides multiple language support, such as Java, C, C + +, C #, Ruby, Perl, Python, PHP, etc. Since I develop GUI under windows, I care more about C + + and C #, among which ActiveMQ of C # is very simple, and Apache provides NMS (. Net Messaging Service) support. Net development, which only needs the following steps to establish a simple implementation. The application of C + + is relatively troublesome, which will be introduced in the following article.

1. Download the latest version of ActiveMQ from the official website of ActiveMQ at http://activemq.apache.org/download.html. I played 5.3. 1 before, and 5.3. 2 has come out now.

2. Go to the official website of ActiveMQ to download the latest version of Apache.NMS, website: http://activemq.apache.org/nms/download.html. You need to download two bin packages, Apache.NMS and Apache.NMS.ActiveMQ. If you are interested in the source code, you can also download src package. Here, I want to remind you that if you download version 1.2. 0 of NMS. ActiveMQ, Apache. NMS. ActiveMQ has an bug in actual use, that is, when you stop ActiveMQ application, you will throw WaitOne function exception. Looking at the source code in src package, it is found that it is caused by the following code in Apache. NMS. ActiveMQ-1. 2.0-src\ src\ main\ csharp\ Transport\ InactivityMonitor. The latest version 1.3. 0 has fixed this bug, so download the latest version.


private void StopMonitorThreads()
{
  lock(monitor)
  {
    if(monitorStarted.CompareAndSet(true, false))
    {
      AutoResetEvent shutdownEvent = new AutoResetEvent(false);
      // Attempt to wait for the Timers to shutdown, but don't wait
      // forever, if they don't shutdown after two seconds, just quit.
      this.readCheckTimer.Dispose(shutdownEvent);
      shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
      this.writeCheckTimer.Dispose(shutdownEvent);
      shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
      //WaitOne Definition of: public virtual bool WaitOne(TimeSpan timeout,bool exitContext)
      this.asyncTasks.Shutdown();
      this.asyncTasks = null;
      this.asyncWriteTask = null;
      this.asyncErrorTask = null;
    }
  }
}

3. Run ActiveMQ, find the bin folder after ActiveMQ decompression:...\ apache-activemq-5. 3.1\ bin, and execute activemq. bat batch file to start ActiveMQ server. The default port is 61616, which can be modified in the configuration file.

4. Write C # program to realize the simple application of ActiveMQ. New C # project (1 Producter project and 1 Consumer project), WinForm or Console program can be built. The Console project is built here, and references to Apache. NMS. dll and Apache. NMS. ActiveMQ. dll are added, and then the implementation code can be written. Simple Producer and Consumer implementation codes are as follows:

producer:


using System;
using System.Collections.Generic;
using System.Text;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.IO;
using System.Xml.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
namespace Publish
{
  class Program
  {
    static void Main(string[] args)
    {
      try
      {
        //Create the Connection Factory
        IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
        using (IConnection connection = factory.CreateConnection())
        {
          //Create the Session
          using (ISession session = connection.CreateSession())
          {
            //Create the Producer for the topic/queue
            IMessageProducer prod = session.CreateProducer(
              new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
            //Send Messages
            int i = 0;
            while (!Console.KeyAvailable)
            {
              ITextMessage msg = prod.CreateTextMessage();
              msg.Text = i.ToString();
              Console.WriteLine("Sending: " + i.ToString());
              prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
              System.Threading.Thread.Sleep(5000);
              i++;
            }
          }
        }
        Console.ReadLine();
      }
      catch (System.Exception e)
      {
        Console.WriteLine("{0}",e.Message);
        Console.ReadLine();
      }
    }
  }
}

consumer:


using System;
using System.Collections.Generic;
using System.Text;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.IO;
using System.Xml.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
namespace Subscribe
{
  class Program
  {
    static void Main(string[] args)
    {
      try
      {
        //Create the Connection factory
        IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
        //Create the connection
        using (IConnection connection = factory.CreateConnection())
        {
          connection.ClientId = "testing listener";
          connection.Start();
          //Create the Session
          using (ISession session = connection.CreateSession())
          {
            //Create the Consumer
            IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);
            consumer.Listener += new MessageListener(consumer_Listener);
            Console.ReadLine();
          }
          connection.Stop();
          connection.Close();
        }
      }
      catch (System.Exception e)
      {
        Console.WriteLine(e.Message);
      }
    }
    static void consumer_Listener(IMessage message)
    {
      try
      {
        ITextMessage msg = (ITextMessage)message;
        Console.WriteLine("Receive: " + msg.Text);
      }
      catch (System.Exception e)
      {
        Console.WriteLine(e.Message);
      }
    }
  }
}

Program functions: Producer producer establishes a topic named testing, and sends messages to the topic every 5 seconds, consumer consumer subscribes to the topic of testing, so as long as the producer sends the message of testing topic to ActiveMQ server, the server sends the message to the consumer who subscribes to the topic of testing.

Compile and generate producer. exe and consumer. exe, and execute two exe to see the message sent and received.

This example is a built theme (Topic), and ActiveMQ also supports another way: Queue, that is, P2P. What is the difference between the two? The difference is that Topic is broadcast, that is, if an Topic is subscribed by multiple consumers, as long as a message arrives at the server, the server will send the message to all consumers; While Queue is point-to-point, that is, one message can only be sent to one consumer. If an Queue is subscribed by multiple consumers, messages will be sent to different consumers one by one in turn without special circumstances, such as:

msg1-- > consumer A

msg2-- > consumer B

msg3-- > consumer C

msg4-- > consumer A

msg5-- > consumer B

msg6-- > consumer C

The special case means that ActiveMQ supports filtering mechanism, that is, the producer can set the attribute of the message (Properties), which corresponds to the Selector of the consumer. Only when the selector set by the consumer matches the Properties of the message will the message be sent to the consumer. Both Topic and Queue support Selector.

How do I set up Properties and Selector? Look at the following code:

producer:


ITextMessage msg = prod.CreateTextMessage();
msg.Text = i.ToString();
msg.Properties.SetString("myFilter", "test1");
Console.WriteLine("Sending: " + i.ToString());
prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

consumer:


// Generate consumer Setting by parameter when Selector
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");

More readers interested in C # can check the topic of this site: "C # Form Operation Skills Summary", "C # Common Control Usage Tutorial", "WinForm Control Usage Summary", "C # Programming Thread Use Skills Summary", "C # Operating Excel Skills Summary", "C # XML File Operation Skills Summary", "C # Data Structure and Algorithm Tutorial", "C # Array Operation Skills Summary" and "C # Object-Oriented Programming Introduction Tutorial"

I hope this article is helpful to everyone's C # programming.


Related articles: