MSMQ 3.0 中的病毒消息处理

本示例演示如何在服务中执行病毒消息处理。本示例基于已经过事务处理的 MSMQ 绑定示例。其中使用到了 netMsmqBinding。该服务是自承载控制台应用程序,通过它可以观察服务接收排队消息。

在排队通信中,客户端使用队列与服务进行通信。更确切地说,客户端向队列发送消息。服务从队列接收消息。因此不必同时运行服务和客户端便可使用队列进行通信。

病毒消息是一类当服务读取消息时不能对消息进行处理,并因此终止从中读取消息的事务时从队列重复读取的消息。在这种情况下,将再次重试消息。如果消息出现问题,这种情况在理论上将永远继续下去。注意,这种情况仅在您使用事务从队列中读取消息并调用服务操作时发生。

根据 MSMQ 版本,NetMsmqBinding 支持对病毒消息进行有限检测和完全检测。在已经将消息检测为病毒后,可以通过多种方式对消息进行处理。

本示例演示 Windows Server 2003 和 Windows XP 平台上提供的有限病毒功能和 Windows Vista 上提供的完全病毒功能。这两个示例的目的是将病毒消息从一个队列移出到另一个队列,病毒消息服务随后即可为后者提供服务。

MSMQ v3.0(Windows Server 2003 和 Windows XP)病毒处理示例

MSMQ 3.0 版不提供病毒子队列支持。因此,在此示例中,我们创建一个用于保存病毒消息的队列。这是 MSMQ 3.0 版中处理病毒消息以阻止数据丢失的首选方法。

Windows Server 2003 和 Windows XP 中的病毒消息检测限制为配置单个属性 ReceiveRetryCountReceiveRetryCount 是可以从队列中重复读取消息的次数。Windows Communication Foundation (WCF) 在内存中保存此计数。达到该计数后(即,消息已中毒),我们立即转换到如何处理病毒消息。绑定上的枚举 ReceiveErrorHandling 属性提供它可以采用的可能值。枚举值为:

  • Fault(默认值):将侦听器和服务主机视为出现故障。
  • Drop:将删除消息。
  • Move:将消息移到病毒消息子队列中。此值仅在 Windows Vista 中可用。
  • Reject:要通过将消息发送回发送方死信队列来拒绝消息。此值仅在 Windows Vista 中可用。

当使用 MSMQ 3.0 版时,在这些值中仅 Fault 和 Drop 是有效值。本示例演示如何使用 MSMQ 3.0 版处理病毒消息。本示例也可以运行在 MSMQ 4.0 版中,运行方式与使用 MSMQ 3.0 版时完全一样。

服务协定是 IOrderProcessor,它定义了适合与队列一起使用的单向服务。

[ServiceContract(Namespace="http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
    [OperationContract(IsOneWay = true)]
    void SubmitPurchaseOrder(PurchaseOrder po);
}

该服务操作显示一条指示它正在处理订单的消息。若要演示病毒消息功能,SubmitPurchaseOrder 服务操作将引发异常,用于回滚服务的某些随机调用上的事务。这将导致消息被放回队列中。并最终将消息标记为病毒。该配置在病毒消息上设置为 fault。一旦消息中毒,队列通道将引发 MsmqPoisonMessageException 并将 ServiceHost 视为故障,其中该异常包含病毒消息的 MessageLookupId。错误处理程序附加到此异常的监视器上并采取相应的措施。为了添加错误处理程序,我们将创建 PoisonErrorBehaviorAttribute 并使用此行为对 OrderProcessorService 进行批注。

    // Service class that implements the service contract.
    // Added code to write output to the console window.
    [PoisonErrorBehavior]
    public class OrderProcessorService : IOrderProcessor
    {
        static Random r = new Random(137);
        public static string QueueName;
        public static string PoisonQueueName;

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {

            int randomNumber = r.Next(10);

            if (randomNumber % 2 == 0)
            {
                Orders.Add(po);
                Console.WriteLine("Processing {0} ", po);
            }
            else
            {
                Console.WriteLine("Aborting transaction, cannot process purchase order: " + po.PONumber);
                Console.WriteLine();
                throw new Exception("Cannot process purchase order: " + po.PONumber);
            }
        }

        public static void StartThreadProc(object stateInfo)
        {
            StartService();
        }

        public static void StartService()
        {
            // Get MSMQ queue name from app settings in configuration
            QueueName = ConfigurationManager.AppSettings["queueName"];

            // Get MSMQ queue name for the final poison queue
            PoisonQueueName = ConfigurationManager.AppSettings["poisonQueueName"];

            // Create the transacted MSMQ queue if necessary.
            if (!System.Messaging.MessageQueue.Exists(QueueName))
                System.Messaging.MessageQueue.Create(QueueName, true);

            // Create the transacted poison message MSMQ queue if necessary.
            if (!System.Messaging.MessageQueue.Exists(PoisonQueueName))
                System.Messaging.MessageQueue.Create(PoisonQueueName, true);


            // Get the base address that is used to listen for WS-MetaDataExchange requests
            // This is useful to generate a proxy for the client
            string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

            // Create a ServiceHost for the OrderProcessorService type.
            ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService), new Uri(baseAddress));

            // Hook on to the service host faulted events
            serviceHost.Faulted += new EventHandler(OnServiceFaulted);

            // Open the ServiceHostBase to create listeners and start listening for messages.
            serviceHost.Open();
            
        }


        public static void OnServiceFaulted(object sender, EventArgs e) 
        {
            Console.WriteLine("Service Faulted");
        }
        

        // Host the service within this EXE console application.
        public static void Main()
        {

            OrderProcessorService.StartService();

            // The service can now be accessed.
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press <ENTER> to stop the service");
            Console.ReadLine();
            Console.WriteLine("Exiting service");

        }
    }

PoisonErrorBehaviorAttribute 安装 PoisonErrorHandlerPoisonErrorHandlerIErrorHandler 的一个实现。每当系统中引发异常时都将调用 IErrorHandler。在我们的实现中,我们将查找指示已发现病毒消息的 MsmqPoisonMessageException。我们从异常中检索 MSMQ 消息查找 ID 并使用 System.Messaging API 从队列中接收病毒消息,然后将病毒消息移到特定的病毒消息队列,供以后处理。因为消息可能在 ServiceModel 事务中仍处于锁定状态,我们将等待以得知读取是否成功并在不成功时再次重试。一旦将消息移到其他队列,就可以使用队列中的其他消息,这时将创建新的 ServiceHost 并打开该 ServiceHost 用于处理。下面的代码示例演示此行为:

    public class PoisonErrorHandler : IErrorHandler
    {
        static WaitCallback orderProcessingCallback = new WaitCallback(OrderProcessorService.StartThreadProc);

        public void ProvideFault(Exception error, MessageVersion version, ref Message fault)
        {
            // no-op -we are not interested in this.
        }
        
        // Handle poison message exception by moving the offending message out of the way for regular processing to go on.
        public bool HandleError(Exception error)
        {
            MsmqPoisonMessageException poisonException = error as MsmqPoisonMessageException;
            if (null != poisonException)
            {
                long lookupId = poisonException.MessageLookupId;
                Console.WriteLine(" Poisoned message -message look up id = {0}", lookupId);

                // Get MSMQ queue name from app settings in configuration.

                System.Messaging.MessageQueue orderQueue = new System.Messaging.MessageQueue(OrderProcessorService.QueueName);
                System.Messaging.MessageQueue poisonMessageQueue = new System.Messaging.MessageQueue(OrderProcessorService.PoisonQueueName);
                System.Messaging.Message message = null;
                
                // Use a new transaction scope to remove the message from the main application queue and add it to the poison queue.
                // The poison message service processes messages from the poison queue.
                using(TransactionScope txScope= new TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    int retryCount = 0;
                    while(retryCount < 3) 
                    {
                        retryCount++;
                    
                        try 
                        {
                            // Look up the poison message using the look up id.
                            message = orderQueue.ReceiveByLookupId(lookupId);
                            if(message != null) 
                            {
                                // Send the message to the poison message queue.
                                poisonMessageQueue.Send(message, System.Messaging.MessageQueueTransactionType.Automatic);
                                
                                // complete transaction scope
                                txScope.Complete();

                                Console.WriteLine("Moved poisoned message with look up id: {0} to poison queue: {1} ", lookupId, OrderProcessorService.PoisonQueueName);
                                break;
                            }
                        
                        }
                        catch(InvalidOperationException ) 
                        {
                            //Code for the case when the message may still not be available in the queue because of a race condition in transaction or 
                            //another node in the farm may actually have taken the message.
                            if (retryCount < 3) 
                            {
                                Console.WriteLine("Trying to move poison message but message is not available, sleeping for 10 seconds before retrying");
                                Thread.Sleep(TimeSpan.FromSeconds(10));
                            }
                            else 
                            {
                                Console.WriteLine("Giving up on trying to move the message");
                            }
                        }
                    
                    }
                }

                // Restart the service host.
                Console.WriteLine();
                Console.WriteLine("Restarting the service to process rest of the messages in the queue");
                Console.WriteLine("Press <ENTER> to stop the service");
                ThreadPool.QueueUserWorkItem(orderProcessingCallback);
                return true;
            }

            return false;
        }
    }

服务配置包括下面的病毒消息属性:receiveRetryCountreceiveErrorHandling,如下面的配置文件所示。当运行在 Windows Server 2003 和 Windows XP 上时,将忽略属性 maxRetryCyclesretryCycleDelay

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <!-- Use appSetting to configure the MSMQ queue name. -->
    <add key="queueName" value=".\private$\ServiceModelSamplesPoison" />
    <add key="poisonQueueName" value=".\private$\OrderProcessorPoisonQueue" />
    <add key="baseAddress" value="https://localhost:8000/orderProcessor/poisonSample"/>
  </appSettings>
  <system.serviceModel>
    <services>
      <service 
              name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison"
                  binding="netMsmqBinding"
                  bindingConfiguration="PoisonBinding" 
                  contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
      </service>
    </services>

    <bindings>
      <netMsmqBinding>
        <binding name="PoisonBinding" 
                 receiveRetryCount="0"
                 maxRetryCycles="1"
                 retryCycleDelay="00:00:05"                      
                 receiveErrorHandling="Fault">
        </binding>
      </netMsmqBinding>
    </bindings>
  </system.serviceModel>
</configuration>

处理病毒消息队列中的消息

病毒消息服务从最终病毒消息队列中读取消息并处理这些消息。

病毒消息队列中的消息是指发送到正在处理消息的服务的消息,这可能不同于病毒消息服务终结点。因此,当病毒消息服务从队列中读取消息时,WCF 通道层可发现与终结点的不匹配之处,并且不会调度消息。在这种情况下,消息将发送到订单处理服务,但将由病毒消息服务接收。如果无论消息是否发送到不同的终结点都要继续接收消息,则我们必须添加 ServiceBehavior 来筛选地址,其中匹配标准是要匹配消息发送到的任何服务终结点。这是成功处理从病毒消息队列中读取的消息所必需的。

病毒消息服务实现本身非常类似于服务实现。它实现协定并处理订单。代码示例如下所示。

    // Service class that implements the service contract.
    // Added code to write output to the console window.
    [ServiceBehavior(AddressFilterMode=AddressFilterMode.Any)]
    public class OrderProcessorService : IOrderProcessor
    {
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {
            Orders.Add(po);
            Console.WriteLine("Processing {0} ", po);
        }

        public static void OnServiceFaulted(object sender, EventArgs e) 
        {
            Console.WriteLine("Service Faulted...exiting app");
            Environment.Exit(1);
        }


        // Host the service within this EXE console application.
        public static void Main()
        {
   
            // Create a ServiceHost for the OrderProcessorService type.
            ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService));

            // Hook on to the service host faulted events.
            serviceHost.Faulted += new EventHandler(OnServiceFaulted);
            
            serviceHost.Open();

            // The service can now be accessed.
            Console.WriteLine("The poison message service is ready.");
            Console.WriteLine("Press <ENTER> to terminate service.");
            Console.WriteLine();
            Console.ReadLine();

            // Close the ServiceHostBase to shutdown the service.
            if(serviceHost.State != CommunicationState.Faulted)
            {
                serviceHost.Close();
            }
        }

与从订单队列中读取消息的订单处理服务不同,病毒消息服务从病毒子队列中读取消息。病毒队列是主队列的子队列,其名称为“poison”,由 MSMQ 自动生成。若要访问该队列,请提供主队列名称,后接“;”并在子队列为“poison”的情况下提供子队列名称,如下面的示例配置中所示。

提示

在 MSMQ 3.0 版的示例中,病毒队列名称不是子队列,而是将消息移动到的队列。

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <services>
      <service name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison;poison"
                  binding="netMsmqBinding"
                  contract="Microsoft.ServiceModel.Samples.IOrderProcessor" >
        </endpoint>
      </service>
    </services>

  </system.serviceModel>
</configuration> 

当运行示例时,客户端、服务和病毒消息服务活动将显示在控制台窗口中。您可以看到服务从客户端接收消息。在每个控制台窗口中按 Enter 可以关闭服务。

服务开始运行、处理订单并随机开始终止处理。如果消息指示服务已经处理完订单,则可以再次运行客户端来发送其他消息,直到您看到服务确实已经终止某条消息。根据配置的病毒设置,在将消息移到最终病毒队列中之前将再次重试处理消息。

The service is ready.
Press <ENTER> to terminate service.

Processing Purchase Order: 0f063b71-93e0-42a1-aa3b-bca6c7a89546
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Processing Purchase Order: 5ef9a4fa-5a30-4175-b455-2fb1396095fa
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Aborting transaction, cannot process purchase order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89

启动病毒消息服务以从病毒队列中读取病毒消息。在本示例中,病毒消息服务读取消息并处理这些消息。您可以看到病毒消息服务读取已终止并已中毒的采购订单。

The service is ready.
Press <ENTER> to terminate service.

Processing Purchase Order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

设置、生成和运行示例

  1. 请确保已经执行了 Windows Communication Foundation 示例的一次性安装过程

  2. 若要生成 C# 或 Visual Basic .NET 版本的解决方案,请按照生成 Windows Communication Foundation 示例中的说明进行操作。

  3. 若要用单一计算机配置或跨计算机配置来运行示例,请更改队列名称,以映射实际的主机名而不是本地主机,并按照运行 Windows Communication Foundation 示例中的说明操作。

默认情况下对 netMsmqBinding 绑定传输启用了安全性。MsmqAuthenticationModeMsmqProtectionLevel 这两个属性共同确定了传输安全性的类型。默认情况下,身份验证模式设置为 Windows,保护级别设置为 Sign。MSMQ 必须是域的成员才可以提供身份验证和签名功能。如果在不是域成员的计算机上运行此示例,则会接收以下错误:“用户的内部消息队列证书不存在”。

在加入到工作组的计算机上运行此示例

  1. 如果计算机不是域成员,请将身份验证模式和保护级别设置为 None 以禁用传输安全性,如下面的示例配置所示:

    <bindings>
        <netMsmqBinding>
            <binding name="TransactedBinding">
                <security mode="None"/>
            </binding>
        </netMsmqBinding>
    </bindings>
    

    确保通过设置终结点的 bindingConfiguration 属性将终结点与绑定关联。

  2. 确保在运行示例前更改 PoisonMessageServer、服务器和客户端上的配置。

    提示

    security mode 设置为 None 等效于将 MsmqAuthenticationModeMsmqProtectionLevelMessage 安全设置为 None

Send comments about this topic to Microsoft.
© 2007 Microsoft Corporation. All rights reserved.