到 Google 资讯主页   
EasyJF首页   资料   源码   软件    论坛   网站    
   使用帮助    
    该信息为本站MyRSS系统缓存内容,部分图片及附件有可能无法正常使用.easyjf.comIBM developerWorks无关,不对该信息负责.通过http://kb.csdn.net/keyword/java//../../java/Articles/200206/549a5cdc-36a3-403d-84f0-c33c49b94c65.html访问该信息的原始内容.
页面功能  【加入收藏】 【推荐给朋友】 【字体:  】 【关闭】   
如何获取消息?使用 JMS 技术作为数据复制的解决方案
作者:Daniel Drasin 来源:IBM developerWorks  发布时间:2002-06-04 00:00:00.0

背景
在思考消息传递解决方案时,您可能会想到一个通过远程消息调用机制来集成两个不同应用程序的系统。一般来讲,对于不常通信的分布式实体以及数据传输量不是很多这样的情况,常常使用这种耦合。较经典的示例是,连接到异构后端和入口的同构接口,这些后端和入口指派进行用户请求的后端处理,然后为最终用户表示而对那些请求进行重新格式化。

消息传递方法中的公共线程一直有这样的假定:虽然消息传递解决方案在系统之间提供健壮、高度可用的通信,但它基本上效率很低,只用来作为在无法避免与外部系统通信时的最后一种手段。在出现远程方法调用(RMC)时关于消息传递的这种观点就开始流行一直到出现了更现代的象 CORBA 和 DCOM 那样的消息传递解决方案,而且,通常所应用的消息传递只局限于解决几类问题。

目标
在过去的十年中,人们对分布式系统需求有了更深入的理解。新兴技术(象 Java 和 .NET)已经包含了代码分布来作为它们基本编程模型的一部分。通过这样做,这些技术已将高度可用性和容错性融入到消息传递中,同时鼓励那些提供解决方案的供应商交付一些系统,这些系统在更广范围的问题上考虑性能。

近来我们公司被要求实现文件分布和复制的解决方案,在以前这样的方案需要集成安全的 FTP、数据库复制和其它一次性解决方案的定制系统。我们没有一味地埋头按照定制开发的道路前进,而是研究了将最新的消息传递解决方案应用到这个问题的可能性。我们发现 JMS 不仅为信息传送提供必要的基础结构,而且它还能处理我们客户要求的、与服务质量、安全性、可靠性和性能有关的所有基础结构问题。本文描述了我们团队面临的挑战,以及 JMS(以 MQSeries 的形式)如何让我们满足并超越客户的要求。

问题
我们的客户面临一个重大的分布式数据难题,在全国范围内有许多呼叫中心,在全国各地的呼叫中心里接线员要记录与客户之间的交互。必须快速可靠地在远程数据中心为这些记录建立索引并存档。建立索引和存档的存储过程不能影响接线员的系统记录和存储接线员正在与客户交互的信息的能力。该客户已经有了一个包含组合起来的代码、VPN 和其它技术的系统。但是,现有的解决方案远远达不到性能和可靠性上的目标,并且它是一种拙劣的技术,难以理解并且维护费用很高。

在开发替代客户原有系统时,我们考虑了 JMS 和多种非 JMS 的解决方案,尤其是那些基于 FTP 和安全复制(SCP)的解决方案。然而,非 JMS 解决方案有两个主要缺点:

  • 它们对于安全性方面的缺陷一筹莫展。(FTP 上的安全性漏洞已经人人皆知,并且人们对此已广泛地作了记载。如果需要这方面的例子,请参阅 参考资料。)
  • 它们提供的基础结构只适用于实际的数据传送,而对于处理可靠性、容错性、安全性、平台独立性以及性能优化等问题,需要定制开发来解决。

我们团队最后得出结论,对于添加这些额外的特性所需的开发工作是让人望而却步的,因此我们决定选用 JMS 解决方案,它可以摆脱这些问题。

解决方案
我们开发了一个基于 JMS 的系统,它:

  • 为已记录的多媒体文件提供可靠存档
  • 支持可扩展性,可以使多个数据中心接收文件
  • 支持对其它数据类型进行存档

我们这里正讨论的文件比以前那些涉及消息传递解决方案的项目中传送的数据还要大(50K - 500K)。我们第一个任务是确保数据大小不会影响 JMS 解决方案。通过测试系统传递各种大小的消息有效负载时的性能,我们评估了包括 IBM MQSeries 在内的许多 JMS 解决方案。结果显示:经过适当配置,大小达到 1 兆的消息不会对整个系统性能产生显著影响。因为常识认为消息传递解决方案只适用于定期的、小的有效负载,所以我们的结果是一个重大发现。我们继续分析系统的体系结构( 图 1中概述了此体系结构),它可以提供客户需要的安全性、高可用性和可靠性。

图 1. 高级系统体系结构
图 1:高级系统体系结构

现有的基础结构在每个客户机上有一个系统,当接线员与用户之间进行交互时,它创建多媒体文件,以此作为响应。此外,还需对这些文件进行存档。我们的系统启动一个进程(运行在每个机器上)并在已知目录中查找这些文件。当检测到新文件时,进程将它们打包成 JMS 有效负载并发送到其中一个数据中心的 JMS 服务器以便传递。一旦 JMS 服务器确认收到,则除去发送方中的这些文件。JMS 服务器将该数据传送到数据中心内的一个可用处理程序上,进行存档。

主要概念
JMS 是特定于 Java 的消息传递和排队的实现。在消息传递和排队中有两个基本思想:

  • 系统通过使用不连续的数据包进行通信,这些数据包都有一个 有效负载(即要传送的信息)和 属性(即该信息的特征以及它应如何通信)。这个数据包称为 消息
  • 消息 是被发送给系统,而是被发送到一个独立的保存区域。可以根据您的需要确定保存区域的数量,通过唯一的名称,可以标识并定位它们。每个保存区域都可以接收消息,并且根据配置的不同,该区域将每个消息要么传递给所有感兴趣的系统(发布-订阅),要么传递给第一个感兴趣的系统(点对点)。这个保存区域称为 目的地

我们构建的系统采用点对点的目的地,在 JMS 中称为 队列。排队是 图 1 中显示的系统设计的一个重要方面。该图显示了消息正从 JMS 代理直接传送到接收方的客户机上,但这并不十分准确。实际上,消息被传送到一个队列中,接收方客户机从队列中检索它们。稍后我们研究实现细节时,这个区别将变得非常重要,因为它让系统并行地处理收到的消息。

跨平台和交叉供应商
对我们客户机来说尽量减少对某家供应商的依赖,这意味着,我们所设计的代码应该使由于更改了 JMS 供应商而带来的影响降至最低,这是十分重要的。JMS 的一个主要优点是它以广泛的业界支持和开放标准为基础,因此有了正确设计的代码,我们就可以让系统使用任何 JMS 系统。(可以对现有系统进行直接改进,专门设计来使系统在某套硬件上运行并能与特定于供应商的解决方案相匹配。)

通过将所有特定于供应商的调用封装在称为 JMSProvider 的类中,就可以轻松实现平台独立性。这些 Provider 类处理特定于供应商的问题,例如工厂查询、错误处理、连接创建和消息特性设置等。请参阅下面 清单 1 中的示例代码。

清单 1. 在类 ar.jms.JmsProvider 中
public QueueConnection createConnection() throws JMSException {
     return getConnectionFactory().createQueueConnection(getUserName(), 
          getPassword());
}

通过利用“Java 命名和目录接口(JNDI)”,我们将特定于供应商的设置存储在一个资源库(例如,LDAP 库)中,这样实际代码就几乎不需要特定于供应商的引用。只需要少量特定于供应商的代码来处理一些特性,但是可以将这样的代码限定于一些“适配器”类,并将它保存在应用程序代码之外。请参阅下面 清单 2 中的示例代码。因为 JMS 被设计用来方便地使用 JNDI,所以与其它解决方案相比,这是另一个直接优点 ― 配置信息的集中存储不仅可以保存基于文本的信息,而且还可以存储已配置的对象。

清单 2. 在类 ar.jms.JmsProvider 中
public final static String 
 CONNECTION_FACTORY_LOOKUP_NAME_KEY = "CONNECTION_FACTORY_LOOKUP_NAME";
public final static 
 String FILE_TRANSFER_QUEUE_LOOKUP_NAME_KEY = 
  "FILE_TRANSFER_QUEUE_LOOKUP_NAME";
public final static String 
 JMS_PROVIDER_CLASS_KEY = "JMS_PROVIDER_CLASS";

public void init() throws NamingException {
    InitialContext jndi = createInitialContext();
    initConnectionFactory(jndi);
    initFileTransferQueue(jndi);
}

public QueueConnection createConnection() throws JMSException {return 
     getConnectionFactory().createQueueConnection(getUserName(), 
         getPassword());
}

public void initConnectionFactory(InitialContext jndi) throws 
     NamingException {
          setConnectionFactory((QueueConnectionFactory)jndi.lookup
               (getProperties().getProperty(CONNECTION_FACTORY_LOOKUP_NAME_KEY)));
}

public void initFileTransferQueue(InitialContext jndi) throws 
     NamingException {
          setFileTransferQueue((Queue) jndi.lookup
               (getProperties().getProperty(FILE_TRANSFER_QUEUE_LOOKUP_NAME_KEY)));
}

跳出传统模式,JMS 解决方案允许以可靠的方式传送消息,即一旦确认已将消息传送到 JMS 服务器,就将它传送至寻址到的目的地(队列)。MQSeries 也不例外。一旦成功执行了将消息发送到服务器的代码,客户机就保证目的地最终会接收到消息,即使所讨论的服务器在处理过程中出现故障(如果目的地暂时不可用,或者 JMS 服务器死机等等)。请参阅下面 清单 3 中的示例代码。下面代码中的类实际上负责一旦它确定需要发送文件,就执行数据的发送。

通过将消息配置为持久消息,我们可以保证一旦目的地(队列)接收到消息,那么消息将保留在那里直到它在该队列中被检索为止 ― 即使在系统有故障期间。因此,一旦安全地将消息传送到本地 JMS 服务器,就可以删除它了。不能过高估计克服系统故障的能力;对周期性系统故障的处理是开发分布式存档解决方案最重要的问题之一。客户现有系统上处理故障情况的代码很复杂很脆弱,而且对这些故障的处理和维护费用很高。通过一个健壮的、经测试成功的商业解决方案,JMS 使我们能解决所有这些问题。

>清单 3. 来自类 ar.jms.file.send.ConnectionElement
public void sendMessage(byte[] payload, boolean persistent) throws 
     SendFailedException {
     QueueSender sender = null;
     try {
     Message message = createMessage(payload);
     sender = createSender
     (persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
     sender.send(message);
         getClient().getLogService().logInfo(getName() + 
         " sent message " + message.getJMSMessageID() + ".");
     } catch (JMSException exception) {
         getClient().getLogService().logError
         ("JMS exception processing " +  getName(),exception);
         stop();
         throw new SendFailedException("JMS Message Send Failed");
    }
     try {
         sender.close();
     } catch (JMSException ignore) {
         getClient().getLogService().logInfo(getName() + " failed to 
         close sender.  Processing will continue.");
    }
}

这个解决方案的关键是配置 JMS 消息和服务器来 同时提供令人满意的性能和服务质量。JMS 规范定义了配置选项,并通过所有商业解决方案实现它们。但是,配置的确切方法根据不同的供应商而有所不同。

设置
我们创建的体系结构和系统具有通用性且很强大。但是,对于一些移动部件,必须使用正确的方式配置并钩连它们。以下内容是有关将 MQSeries 成功地设置为 JMS 服务器的概述、一些潜在缺陷和实际的指示信息。

对于 MQSeries,首先设置 JNDI 服务器来检索特定于实现的设置,在这种情况下是 JMS 连接工厂(JMS Connection Factory)。有许多不同方法来实现这个操作,但适宜的通用选项是轻量级目录访问协议(LDAP)服务器。我们选择使用 Qualcomm SLAPD。一旦安装好并运行该服务器,就可以用 MQSeries 管理工具(JMSAdmin.bat)来设置该服务器并将其作为 MQ 对象信息库来使用。请参阅 参考资料,获取有关讲述该过程的实用书籍的链接。同时,在设置期间,要特别注意在 IBM MQSeries 之上设置 JMS 的 IBM 文档,这很重要。这个过程涉及创建一些队列和其它对象,这些队列和对象是特定于 JMS 使用并且不属于标准 MQSeries 安装的。

完成 JNDI/LDAP 和 JMS 服务器的设置后,就可以准备配置客户机了。第一步是理解 JMS 如何与 IBM 的标准 MQSeries 实现交互。MQSeries 的 Java 客户机能以两种模式之一进行交互: 客户机绑定模式。只能通过 Java applet 来使用客户机模式,而绑定模式取决于客户机上的 DLL 或者对象库。因为实现的特性,当使用用于 JMS 连接信息的 LDAP 服务器时,只能使用绑定模式。(不清楚为什么有这个限制,但它确实存在。)因此,将用户登录和密码存储在一个全局位置( com.ibm.mq.MQEnvironment.class )而不是在连接时传递它们。要解决这些供应商问题,我们创建了标准 JmsProvider 类(称为 MQSeriesProvider )的子类。这个类将完成的唯一操作是覆盖如何创建连接。不象 清单 1 中那样

newQueueConnection = 
getConnectionFactory().createQueueConnection(getUserName(),getPassword));
  

进行调用,我们必须调用

newQueueConnection = getConnectionFactory().createQueueConnection();
  

最后,您需要将特定于 JMS 的元素(如队列、队列管理器、队列工厂等等)提供给客户机。现在,使用 LDAP 和 JNDI 的原因就变得很明显:我们使用 LDAP 服务器来存储这些元素并使用外部文件保存那些 LDAP 对象的键。LDAP 服务器可以作为 JNDI 服务器并通过返回存储的对象来响应名称查询。这就是 清单 2 中代码的工作原理。JMS 元素的名称可从类的静态变量(对于缺省名称)或者外部文件(使用非缺省的其它名称)中获取。简而言之,向 LDAP 服务器请求键(我们正讨论的)中存储的对象,并返回我们感兴趣的 JMS 对象(在这种情况下)。

我们基于 JMS 的解决方案通过使用现有的组件更方便地实现统一的、跨平台和交叉供应商的配置环境。现在我们的代码已尽可能地成为独立于特定于平台和特定于供应商的设置。

应用程序
应用程序中有两个关键组件: 发送器接收器。发送器启动一个后台程序,它在目录中轮询需要存档的文件,而接收器只是等待将要传递的 JMS 消息,然后将该消息中包含的文件存档。JMS API 使我们几乎无需关注正使用的特定 JMS 实现就可以定义这些组件。

发送器由三个主要部件组成:

  • JMSProvider ,用于创建连接
  • ConnectionPool ,用于获取现有的空闲连接(我们称之为 JMSConnection)
  • 轮询程序,监视需要传送的文件。

在启动时,使用 JMSProvider 来创建一些到 JMS 服务器的准备就绪的连接。这些连接放置在池中,然后启动轮询程序。当轮询程序检测到需要传送文件时,它就创建一个独立线程来处理这个文件。(可以通过派生(forking),创建一个独立的线程来创建消息和进行传送操作,描述该过程非常简单。但在实际应用中,常常将合用(pooling)与循环组合起来使用,这样可以确保很少创建新线程,而是重用线程。但是,那个过程相当复杂,过多的说明会分散本文的中心主题 ― JMS。)

在独立线程中,轮询程序接着从连接池中获取 JMSConnection,用它来创建一个 BytesMessage,并将这个文件的二进制内容放入那个消息中。最后这个消息查找到接收器,并发送到 JMS 服务器,接着将 JMSConnection 返回给 ConnectionPool。这个发送过程的部分步骤显示在下面的 图 2中。

图 2. 发送器过程
图 2:发送过程。

接收器是一个较简单的组件;它启动一些 FileListener 来等待将要放置在接收器队列中的消息。下面的 清单 4 中的代码显示了 FileListener 设置处理过程。图 6 中的类实际上负责从队列中检索消息并对它们进行存档。JMS 保证队列发送每个消息的次数仅一次,所以我们可以安全启动许多不同的 FileListener 线程并且知道每个消息(因此每个文件)只处理一次。这个保证是使用基于 JMS 解决方案的另一个重要优点。在自己设计的解决方案中开发这样的功能(比如基于 FTP 的功能),花销很大且易出错。

清单 4:来自类 ar.jms.file.receive.FileListener
public void startOn(Queue queue) {
    setQueue(queue);
    createConnection();
    try {
        createSession();
        createReceiver();
        getConnection().start();  // this starts 
        the queue listener
    } catch (JMSException exception) {
        // Handle the exception
    }
}

public void createReceiver() throws javax.jms.JMSException {
    try {
        QueueReceiver receiver = getSession().
createReceiver(getQueue());
        receiver.setMessageListener(this);
    } catch (JMSException exception) {
        // Handle the exception
    }
}
public void createSession() throws JMSException {
    setSession(getConnection().
createQueueSession(false, Session.AUTO_ACKNOWLEDGE));
}

public void createConnection() {
    while (!hasConnection()) {
        try {
            setConnection(getClient().createConnection());
        } catch (JMSException exception) {
            // Connections drop periodically on the 
            internet, log and try again.
            try {
                Thread.sleep(2000);
            } catch 
            (java.lang.InterruptedException ignored) {
            }
        }
    }
}
  

以回调的方式编写消息处理代码,回调是当将消息传递给 FileListener 时,JMS 自动调用的方法。这个消息的代码显示在下面的 清单 5中。

清单 5. 来自类 ar.jms.file.receive.FileListener
public void onMessage(Message message) {
    BytesMessage byteMessage = ((BytesMessage) message);
    OutputStream stream = 
new BufferedOutputStream(
new FileOutputStream(getFilenameFor(message)));
    byte[] buffer = new byte[getFileBufferSize()];
    int length = 0;
    try {
        while ((length = byteMessage.readBytes(buffer)) != -1) {
            stream.write(buffer, 0, length); 
        }
        stream.close();
    } catch (JMSException exception) {
        // Handle the JMSException
    } catch (IOException exception) {
        // Handle the IOException
    }
}
  

在设置接收器时要记住一条诀窍:在所有 FileListener 启动后,确保启动这些 FileListener 的原始线程继续运行。这是必要的,因为某些 JMS 实现在守护程序的线程中启动 QueueListener。所以,如果正在运行的唯一线程是守护程序的线程,那么 Java 虚拟机(JVM)可能会过早地意外退出。下面的 清单 6显示了一些防止这种情况发生的简单代码。

清单 6. 至少使一个非守护程序的线程保持运行
public static void main(String[] args) {
   ReceiverClient newReceiverClient = new ReceiverClient();
   newReceiverClient.init();
   setSoleInstance(newReceiverClient);
   while(!finished) {   // This prevents the VM from exiting early
      try {
          Thread.sleep(1000);
      } catch (InterruptedException ex) {
      }
   }
}
  

结束语
在该项目的最初实现之后,我们添加了一些功能,象消息压缩、当位置无法到达时的自动恢复、联合消息代理、安全性、健壮的日志记录、管理等等。添加这些功能很容易,因为 JMS 提供了开放模型,而且我们的体系结构也很健壮。构建整个系统花了六个星期的时间,并且还很快地替换了客户一直使用的现有的、劳动密集型的系统。在这些天里,系统已经超出了所有的基准测试程序的标准并且已更正了原来系统遗留下来的错误。这个项目不单超出了客户的期望,还证明了 JMS 是一个可行的解决方案,不仅适用于小型、面向消息的应用程序,而且还适用于大规模的、重要任务的数据传送操作。

参考资料

关于作者
Daniel DrasinDan Drasin 自 1989 年起一直致力于面向对象的技术。在 Applied Reasoning(www.appliedreasoning.com)中,他负责许多不同商业产品的开发和交付。他帮助众多财富 500 强的客户开发分布式 J2EE 系统。近来,Dan 一直在 Applied Reasoning 从事基于 Java 的移动计算解决方案的开发工作。可以通过 drasin@appliedreasoning.com与 Dan 联系。

 
相关文章
 
页面功能  【加入收藏】 【推荐给朋友】 【字体:  】 【关闭】   


EasyJF.com 2006 隐私政策 使用EasyJF前必读