版权声明:任何获得Matrix授权的网站,转载时请务必以超链接形式标明文章原始出处和作者信息及本声明
作者:Jeff Hanson;steven_guo
原文地址:http://www.javaworld.com/javaworld/jw-01-2005/jw-0131-soa.html
中文地址:http://www.matrix.org.cn/resource/article/43/43929_SOA_Event_driven.html
关键词: SOA,Event-driven
摘要
及时响应实时的变化和事件成为了企业级架构的最重要需求。这篇文章讨论面向服务框架的技术和机制,这些技术使得该框架高效发送、接受那些跨越层级结构的同步和异步事件,而不需要知道产生这些事件的系统方面的细节
Internet事务,B2B系统,P2P程序,和实时工作流,这些系统有着非常高的动态性,复杂的系统处理,用传统的面向过程的处理方法不能有效地实现。
一个面向服务的框架代表了一个动态的运行时环境,在那里服务提供者和服务消费者松散耦合、更灵活的组件交互。建立一个具备所有这些优势的交互模型,成为软件开发中最优先考虑的。一个事件驱动的交互模型,比通常的请求/响应机制对实时变化和激励有着更好的应答效率。
面向服务的架构和事件驱动的架构天生就有着对分布式系统的适应性,这些架构都有着模块性、松散耦合,和适应性等特性。
在这篇文章里,讨论使用Mule实现一个高效的事件驱动和面向服务的平台,一个轻量级的事件-消息架构,企业信息总线(ESB)模式。组件和程序可以使用Mule通过公共的JMS或其他的消息处理技术去实现通信。
面向服务架构概述
“面向服务这个术语已经演变成一个架构,在那里服务作为一个软件组件嵌入在企业业务逻辑和特新的核心中,特性如下:
· 松散耦合:服务部与其它组件有着根深蒂固的关系
· 协议独立:多种协议透明访问
· 位置不可知:一个服务执行一组业务逻辑,针对这次调用返回一个结果
· 粗粒度:不论在什么位置均可访问该服务。
· 维护无用户状态
服务是典型地专注于解决业务领域的问题。
通常,服务使用端根据配置数据,注册项和软件工厂去决定给服务的位置,协议和公共接口。
应用程序通常被表述成他们有什么功能,而不强调这个应用程序是什么东西,包含什么。基于这个院应,更多直接描述一个应用程序通过使用动词(服务)而不是用名词(应用主体)。因为,一个名词(应用主体)是定义了了一个事务,而不是动作,当强制把一个组件有什么功能作为一个组件是什么来定义,那就会出现误解。在SOA领域,一个应用程序能很自然的被描述,因为每个应用程序的业务逻辑操作能被描述成为一个服务的执行选择。因此,SOA解决了这种误解,它允许应用程序和组件去访问一个服务所能实现的功能,例如,他们执行什么动作。依次,应用程序开发者能更容易匹配他们的需要与适当的服务,因为服务接口的描述更完整地说清了他们要解决的问题。
事件驱动架构概述
一个事件驱动框架(EDA)定义了一个设计和实现一个应用系统得方法学,在这个系统里事件可传输于松散耦合的软件组件和服务之间。一个事件驱动系统典型地由事件消费者和事件产生者组成。事件消费者向事件管理器订阅事件,事件产生者向事件管理器发布事件。当事件管理器从事件产生者那接收到一个事件时,事件管理把这个事件转送给相应的事件消费者。如果这个事件消费者是不可用的,事件管理这将保留这个事件,一段间隔之后再次转送该事件消费者。这种事件传送方法在基于消息的系统里就是:储存(store)和转送(forward)。
构建一个包含事件驱动构架的应用程序和系统,这样就使得这些应用程序和系统响应更灵敏,因为事件驱动的系统更适合应用在不可预知的和异步的环境里。
事件驱动设计和开发的优势:
事件驱动设计和开发所提供的优势如下:
· 可以更容易开发和维护大规模分布式应用程序和不可预知的服务或异步服务
· 可以很容易,低成本地集成、再集成、再配置新的和已存在的英勇程序和服务
· 促进远程组件和服务的再使用,拥有一个更灵敏、没有Bug的开发环境
· 短期利益:更容易定制。因为设计对动态处理又更好的响应。
· 长期利益:系统和组织的状态变得更精准,对实时变化的响应接近于同步。
EDA 和 SOA 整合
不象请求/响应系统,要求请求者必须明确发送请求信息,而一个事件驱动 架构提供一个机制去动态响应事件。在一个EDA系统里,事件产生者发布事件,事件消费者接受事件。
业务系统可以从SOA和EDA中受益匪浅,因为当事件发生时EDA能触发事件消费者,SOA服务可以快速地从相同的消费者中访问、查询。
系统要有最高的响应性,当事件触发时这个系统必须能快速决定必须的动作。到事件结束,事件应该被发布和消费,而且事件要穿越SOA所有的边界,包括整个体系结构和物理层。
图1演示了事件被激发和穿越体系结构的所有层
图1:事件穿越体系结构的层级
在图1的环境中,一个事件能被定义为任何系统的,平台的,组件的,业务的或英勇进程的变化。事件可能是高层的业务事件或底层的系统事件。因为事件能被传送和接收,订阅事件的英勇程序和服务能对这些变化做出响应。
事件分类和因果关系
理解一个事件的秘诀是知道这个事件发生的原因,这个就是通常说的因果关系。事件的因果关系典型地分为两类:
· 平行关系:时间源和触发在体系结构的同一层。
· 垂直关系:时间源和触发在体系结构的不同层。
垂直关系意味着一个事件的分类方法,这些事件保留了一些不变的东西而穿越一个系统不同的层,事件分类如下:
· 生命周期事件:一个实体生命周期的变化,例如一个进程的停止或启动
· 执行事件:运行时事件,例如服务或组件的调用
· 管理事件:当一个状态超过了预先的定义或一定范围时
平行关系意味着一个事件的分类方法,这些事件保留了一些不变的东西而穿越一个系统不同的层,事件分类如下:
· 系统层事件:系统级动作,例如创建一个文件或关闭一个端口
· 平台层事件:平台级动作,例如修改一个数据源或增加一个新的服务
· 组件层事件:组件级动作,例如视图对象的转换或状态机变化
· 业务层事件:业务级动作,例如创建用户或删除帐号
· 应用层事件:应用级动作,例如增加保险金或报价提交
许多ESB框架和平台意识到在SOA中包含基于事件驱动的通信有很多优势。在Java开发领域,Mule就是这些最有前景的平台之一。
介绍Mule
Mule是一个开源消息ESB框架,一个消息代理,一个分级事件驱动的框架(SEDA)。SEDA定义了一个依照分级队列、高度并行的企业级平台。Mule使用SED的概念增加事件处理的性能。
Mule支持同步、异步和请求响应事件,事件处理和传输实用不同的技术例如JMS,HTTP,电子邮件和基于XML的RPC。Mule能很容易地嵌入到任何应用框架中,明确支持Spring框架。Mule也支持动态的,预定义的,基于内容的和基于规则的消息路由。Mule使得预定义的和计划性的事务更容易,包括XA事务支持。Mule提供一个有代表性的状态调用(REST)API提供给与Web的事件访问。
Mule ESB模式驱动系统中所有服务,这个系统有着一个分离的消息通讯中枢。服务注册在总线上,但不知道其他任何被注册的消息;因此,每个服务只关心处理它收到的事件。Mule也把容器,传输,转换细节从服务中分离出来,允许任何对象作为服务注册到总线的。
我使用Mule框架去演示这篇文章所讨论的概念和思想
Mule框架
Mule框架主要包含下列内容:
通用消息对象(UMO)API
UMO API第一了所有被Mule 管理的服务和对象交互
UMO组件
在Mule系统中,UMO组件可以使任何在系统中接收、处理和发送事件消息的组件
Mule服务器
Mule服务器组件是一个在Mule应用环境中自动加载的服务器应用程序
描述器
描述器组件描述一个Mule UMO属性。新的Mule MUO对象能被它们所关联的描述器初始化。一个描述器包含:
· UMO组件名
· UMO组件版本
· UMO组件实现类
· 异常策略
· 入站和出站提供者
· 入站和出站路由器
· 拦截器
· 接收和发送切入点
· 入站和出站转换器
· 各种各样的特性
连接器
连接器是一些组件,它们可以连接到外部系统或其他协议、管理那些系统或协议的状态。一个连接器负责发送消息到外部消息接收器、管理消息接收器的注册和注销。
提供者
提供者是一些组件,管理把事件数据发送到外部系统、从外部系统接受事件数据和转换事件数据等事项。在Mule框架里,他们能连接到外部系统或其他组件。一个提供者就像一个从外部系统进入Mule或从Mule内部访问外部系统的桥接器。实际上,提供者有一组对象组成,可以与下层系统连接并与之通信。提供者的组成部件是:
· 连接器:负责连接到下层系统
· 消息接收器:从系统接收事件
· 连接调度者:传送系统到系统
· 转换器:转换从系统接收到的或要发送到系统的数据
· 终端:所建立连接的通道地址
· 事务配制:定义连接的事务属性
终端调解者
当UMO组件接收到一个事件时,终端调解者决定去调用它的什么方法
转换器
转换器组件负责双向转换消息或事件的有效载荷。当一个事件到达接收的对象之前,转换器可以链接到一起去执行一系列的装换操作。
消息适配器
消息适配器提供一中公共的方式去读外部系统的异构数据。
消息接收器
消息接收器是一些列终端监听线程,负责从外部系统接收数据。
消息调度者
消息调度者发送(同步)或派遣(异步)时间到下层系统。
消息路由器
消息路由器是一系列组件,可以使被配制的UMO组件依据消息或其他配制图路有一个消息到不同的提供者。
代理
代理是一些帮定到外部服务的组建,例如JME服务器。
Mule模型
一个Mule模型封装和管理一个Mule服务器实例的运行时行为。一个模型包含:
· 描述器
· UMO组件
· 一个终端调解者
· 一个生命周期适配器工厂
· 一个组件调解者
· 一个池化工厂
· 一个异常策略
Mule管理器
Mule管理器维护和提供以下服务 :
· 代理
· 提供者
· 连接器
· 终端
· 转换器
· 拦截器堆栈
· 一个Mule模型
· 一个Mule服务器
· 事务管理器
· 应用程序属性
· Mule配制
图2演示了Mule框架上层消息流视图
图2:Mule上层架构
Mule事件对象
Mule事件对象对象包含事件数据和被组件所感知和操控的属性。属性是任意的,在事件创建之后任何时间可被设置。
org.mule.umo.UMOEvent类代表了一个在Mule环境中出现的时间。所有在组件之间发送或接收的数据都是org.mule.umo.UMOEvent的一个实体。可以访问一个原始的或被转换的Mule事件对象中的数据能。一个Mule事件对象使用一个与提供者管理的提供者转换数据,提供者收到数据后把事件中的有效载荷转换成当前组件所识别的格式。
一个Mule事件对象的有效有效载荷能通过org.mule.umo.UMOMessage接口访问,一个org.mule.umo.UMOMessage实例由有效载荷和它的属性组成。这个接口是不同技术实现的消息对象的一个抽象。
org.mule.extras.client.MuleClient类定义了一个简单的借口,允许Mule客户端从Mule服务器接收和发送事件数据。在大多数Mule应用程序里,时间是被一些外部的并发行为所触发,例如一个主题上接收到消息或在目录里一个文件被删除。
下面演示了如何去发送一个同步事件到另外的Mule组件:
String componentName = "MyReceiver"; // The name of the receiving component.
String transformers = null; // A comma-separated list of transformers
// to apply to the result message.
String payload = "A test event"; // The payload of the event.
java.util.Map messageProperties = null; // Any properties to be associated
// with the payload.
MuleClient client = new MuleClient();
UMOMessage message = client.sendDirect(componentName,
transformers,
payload,
messageProperties);
System.out.println("Event result: " + message.getPayloadAsString());
MuleClient类需要一个服务器URL区定义它所连接的远程Mule服务器的终端。URL定义了传输协议、接收消息的地址,提供者在派遣一个事件时可以随时使用这些信息。终端例示如下:
· vm://com.jeffhanson.receivers.Default: 使用虚拟机的提供者派遣到一个com.jeffhanson.receivers.Default
· jms://jmsProvider/accounts.topic:使用全局注册的jmsProvider派遣一个JMS消息到ccounts.topic.
· jms://accounts.topic: 使用第一个(默认)的JMS提供者派遣JMS消息
Mule事件处理
Mule可以在三种不同的方式发送和节后艘事件:
1.异步方式:一个组件可通过不同的线程同时处理多个事件的发送和接收
2.同步方式:在一个组件重新工作之前,一个单一的事件必须被处理完。换言之,一个创建了事件的组建发送事件时将被阻断,直到发送任务完成,因此,一次只允许处理一个事件
3.请求-应答方式:一个组建专门请求一个事件,然后等待一个特定的时间去接收回应。
org.mule.impl.MuleComponent实现类提供了一个具体的组建类,它包括又有创建,发送和接收事件的功能。
执行同步动作的对象应该实现org.mule.umo.lifecycle.Callable接口,这个定义了一个简单的方法Object onCall(UMOEventContext eventContext)。Callable接口提供支持事件调用的UMO对象。虽然不是强制的,但这个接口提供了一个生命周期控制的方法,当实现这个接口的组建接收到一个消息时执行这个方法。下面展示了这个接口的简单实现。
import org.mule.umo.lifecycle.Callable;
public class EchoComponent
implements Callable
{
public Object onCall(UMOEventContext context) throws Exception
{
String msg = context.getMessageAsString();
// Print message to System.out
System.out.println("Received synchronous message: " + msg);
// Echo transformed message back to sender
return context.getTransformedMessage();
}
}
从onCall()方法可返回任何对象。当组件的UMOLifecycleAdapter接收这个对象时,它首先看看这个对象是否是一个UMOMessage;如果这个对象既不是UMOMessage也不是Null,那么以这个对象作为有效载荷去创建一个新的消息。这个新事件经由所配制的出站路有器发布,如果UMO对象已经配制了一个出站路由器,那么在UMOEventContext实例中不能调用setStopFurtherProcessing(true)方法。
Mule使用的一个简单的事件框架
让我们把这几段Mule的代码放到一起去构建一个简单的事件框架。这个框架包含一个负责注册和注销事件的管理器,可以接收事件,和负责路有同步和异步消息到他们相应的服务。
Mule的虚拟机协议要求有一个放置事件管理器工作目录META-INF/services/org/mule/providers/vm路径下的可配制文件,配制文件为协议定义了大量的组件,例如连接器和调度工厂。配制文件的内容如下:
connector=org.mule.providers.vm.VMConnector
dispatcher.factory=org.mule.providers.vm.VMMessageDispatcherFactory
message.receiver=org.mule.providers.vm.VMMessageReceiver
message.adapter=org.mule.providers.vm.VMMessageAdapter
endpoint.builder=org.mule.impl.endpoint.ResourceNameEndpointBuilder
一个简单的借口定义了事件管理器的公有结构:
package com.jeffhanson.mule;
import org.mule.umo.FutureMessageResult;
public interface EventManager
{
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName,
Object payload)
throws EventException;
/**
* Sends an event message asynchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any.
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName,
Object payload)
throws EventException;
/**
* Starts this event manager.
*/
public void start();
/**
* Stops this event manager.
*/
public void stop();
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol();
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service
* or a fully-qualified class name.
*/
public void registerService(String serviceName,
String implementation)
throws EventException;
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName)
throws EventException;
}
事件管理器类是被封装在一个工厂类里,因此,可以依据需要去改变它的实现而不会影响到它的客户端。事件管理器实现如下:
package com.jeffhanson.mule;
import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;
import java.util.HashMap;
import java.util.Map;
public class EventManagerFactory
{
private static HashMap instances = new HashMap();
/**
* Retrieves the event manager instance for a given protocol.
*
* @param protocol The protocol to use.
* @return EventManager The event manager instance.
*/
public static EventManager getInstance(String protocol)
{
EventManager instance = (EventManager)instances.get(protocol);
if (instance == null)
{
instance = new EventManagerImpl(protocol);
instances.put(protocol, instance);
}
return instance;
}
/**
* A concrete implementation for a simple event manager.
*/
private static class EventManagerImpl
implements EventManager
{
private UMOManager manager = null;
private QuickConfigurationBuilder builder = null;
private MuleClient eventClient = null;
private String protocol = null;
private MuleEndpoint receiveEndpoint = null;
private MuleEndpoint sendEndpoint = null;
private EventManagerImpl(String protocol)
{
this.protocol = protocol;
}
/**
* Starts this event manager.
*/
public void start()
{
try
{
builder = new QuickConfigurationBuilder();
manager = builder.createStartedManager(true,
protocol + "tmp/events");
eventClient = new MuleClient();
receiveEndpoint = new MuleEndpoint(protocol
+ "tmp/events/receive");
sendEndpoint = new MuleEndpoint(protocol + "tmp/events/send");
}
catch (UMOException e)
{
System.err.println(e);
}
}
/**
* Stops this event manager.
*/
public void stop()
{
try
{
manager.stop();
}
catch (UMOException e)
{
System.err.println(e);
}
}
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol()
{
return protocol;
}
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service
* or a fully-qualified class name
* to use as the component implementation.
*/
public void registerService(String serviceName,
String implementation)
throws EventException
{
if (!manager.getModel().isComponentRegistered(serviceName))
{
try
{
builder.registerComponent(implementation,
serviceName,
receiveEndpoint,
sendEndpoint);
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
}
}
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName)
throws EventException
{
try
{
builder.unregisterComponent(serviceName);
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
}
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName,
Object payload)
throws EventException
{
try
{
if (!manager.getModel().isComponentRegistered(serviceName))
{
throw new EventException("Service: " + serviceName
+ " is not registered.");
}
String transformers = null;
Map messageProperties = null;
UMOMessage result = eventClient.sendDirect(serviceName,
transformers,
payload,
messageProperties);
if (result == null)
{
return null;
}
return result.getPayload();
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
catch (Exception e)
{
throw new EventException(e.toString());
}
}
/**
* Sends an event message asynchronously.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName,
Object payload)
throws EventException
{
FutureMessageResult result = null;
try
{
if (!manager.getModel().isComponentRegistered(serviceName))
{
throw new EventException("Service: " + serviceName
+ " is not registered.");
}
String transformers = null;
Map messageProperties = null;
result = eventClient.sendDirectAsync(serviceName,
transformers,
payload,
messageProperties);
}
catch (UMOException e)
{
throw new EventException(e.toString());
}
return result;
}
}
}
Mule框架依据消息有效载荷的类型来派遣消息。事件框架使用基于有效载荷的派遣机制,这种派遣机制把注册到事件管理器中一般定义的事件方法作为事件接收器。下面的类定义了一个包含三个重载的receiveEvent()方法的服务:
package com.jeffhanson.mule;
import java.util.Date;
public class TestService
{
public void receiveEvent(String eventMessage)
{
System.out.println("nnTestService.receiveEvent(String) received "
+ "event message: " + eventMessage + "nn");
}
public void receiveEvent(Integer eventMessage)
{
System.out.println("nnTestService.receiveEvent(Integer) received "
+"event message: " + eventMessage + "nn");
}
public void receiveEvent(Date eventMessage)
{
System.out.println("nnTestService.receiveEvent(Date) received "
+ "event message: " + eventMessage + "nn");
}
}
事件管理器客户端应用程序发送三个事件到测试服务中,去测试每一个receiveEvent()方法。客户端应用程序如下:
package com.jeffhanson.mule;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.BasicConfigurator;
import java.util.Date;
public class EventClient
{
static Logger logger = Logger.getLogger(EventClient.class);
public static void main(String[] args)
{
// Set up a simple configuration that logs on the console.
BasicConfigurator.configure();
logger.setLevel(Level.ALL);
try
{
EventManager eventManager =
EventManagerFactory.getInstance("vm://");
eventManager.start();
String serviceName = TestService.class.getName();
String implementation = serviceName;
eventManager.registerService(serviceName, implementation);
Object result =
eventManager.sendSynchronousEvent(serviceName, "A test message");
if (result != null)
{
System.out.println("Event result: " + result.toString());
}
result =
eventManager.sendSynchronousEvent(serviceName, new Integer(23456));
if (result != null)
{
System.out.println("Event result: " + result.toString());
}
result =
eventManager.sendSynchronousEvent(serviceName, new Date());
if (result != null)
{
System.out.println("Event result: " + result.toString());
}
eventManager.stop();
}
catch (EventException e)
{
System.err.println(e.toString());
}
}
}
Mule平台简化和抽象了前面所叙述框架的事件方面的处理,使得你发送和接收穿越一个层级结构的同步和异步消息时,不需要知道下层系统的细节。工厂模式和SOA准则的应用,则使得这个框架有了一个松散耦合和可扩展的设计。
总结
当服务和进程需要穿越多层结构,使用多种协议去交互时,设计一个有效地事件驱动的软件系统可能变得复杂了。可是,一个使用标准模式包含适当事件管理层的面向服务架构能减少,甚至消除这些问题。
Mule 平台提供API,组件和抽象对象,这些都可以用于去建立一个强大,健壮,事件驱动的有着良好的伸缩性和可维护性的系统。
关于作者
Jeff Hanson 有着18年的软件行业从业经验,曾经作为高级软件工程师工作于Windows OpenDoc项目,作为主管架构师在Novell的Route66框架工作。现在,作为eReinsure.com的首席架构师,正在构建Web服务框架和基于J2EE再保险平台。Hanson已经写作了大量的文章和书籍,包括:《Pro JMX: Java Management Extensions》(Apress出版社,2003年11月; ISBN: 1590591011)和《Web Services Business Strategies and Architectures》(Wrox 出版社,2002年8月; ISBN: 1904284132)
资源
·javaworld.com:javaworld.com
·Matrix-Java开发者社区:http://www.matrix.org.cn/
·Mule主页: http://wiki.muleumo.org/display/MULEPROJ/Home
·Mule FAQ: http://wiki.muleumo.org/display/MULE/Mule+FAQ
Java, java, J2SE, j2se, J2EE, j2ee, J2ME, j2me, ejb, ejb3, JBOSS, jboss, spring, hibernate, jdo, struts, webwork, ajax, AJAX, mysql, MySQL, Oracle, Weblogic, Websphere, scjp, scjd
标签: