The Cafe Sample(小賣部訂餐例子)
小賣部有一個訂飲料服務客戶可以通過訂單來訂購所需要飲料小賣部提供兩種咖啡飲料LATTE(拿鐵咖啡)和MOCHA(摩卡咖啡)每種又都分冷飲和熱飲整個流程如下
有一個下訂單模塊用戶可以按要求下一個或多個訂單
有一個訂單處理模塊處理訂單中那些是關於訂購飲料的
有一個飲料訂購處理模塊處理拆分訂購的具體是那些種類的飲料把具體需要生產的飲料要求發給生產模塊有一個生產模塊
這個例子利用Spring Integration實現了靈活的可配置化的模式集成了上述這些服務模塊
先來看一下配置文件
<beans:beans xmlns=
xmlns:xsi=instance
xmlns:beans=
xmlns:context=
xsi:schemaLocation=
beansxsd
integrationxsd
contextxsd>
<! 啟動Message bus 消息服務總線 支持四個屬性
autostartup[boolean是否自動啟動 default=true]如果設置false則需要手動調用applicationContextstart()方法
autocreatechannels[boolean是否自動注冊MessageChannel default=false]如果使用的MessagChannle不存在
errorchannel 設置錯誤時信息發送的MessageChannle如果不設置則使用DefaultErrorChannel
dispatcherpoolsize 使用的啟動線程數默認為>
<messagebus/>
<! 啟動支持元數據標記 >
<annotationdriven/>
<! 設置 @Component標識的元數據掃描包(package) >
<context:componentscan basepackage=orgspringframeworkintegrationsamplescafe/>
<! 下面啟動了四個 MessageChannel服務 處理接收發送端發過來的消息和把消息流轉到消息的消費端 >
<! 屬性說明 capacity 消息最大容量默認為 publishsubscribe是否是發布訂閱模式默認為否
id bean的id名稱 datatype ? >
<channel id=orders/> <! 訂單Channel >
<channel id=drinks/> <! 飲料訂單Channel處理飲料的類別 >
<channel id=coldDrinks/> <! 熱飲生產Channel >
<channel id=hotDrinks/> <! 冷飲生產Channel >
<! 消息處理終端 接收 channel coldDrinks的消息後執行baristaprepareColdDrink方法 生產冷飲 >
<! 屬性說明 inputchannel 接收消息的Channel必須 defaultoutputchannel設置默認回復消息Channel
handlerref 引用bean的id名稱 handlermethod Handler處理方法名(參數類型必須與發送消息的payLoad使用的一致)
errorhandler設置錯誤時信息發送的MessageChannle replyhandler 消息回復的Channel >
<endpoint inputchannel=coldDrinks handlerref=barista
handlermethod=prepareColdDrink/>
<! 消息處理終端 接收 channel hotDrinks的消息後執行baristaprepareHotDrink方法 生產熱飲 >
<endpoint inputchannel=hotDrinks handlerref=barista
handlermethod=prepareHotDrink/>
<! 定義一個啟動下定單操作的bean它通過 channel orders下定單 >
<beans:bean id=cafe class=orgspringframeworkintegrationsamplescafeCafe>
<beans:property name=orderChannel ref=orders/>
</beans:bean>
</beans:beans>
下面我們來看一下源代碼目錄
我們來看一下整體服務是怎麼啟動的
首先我們來看一下CafeDemo這個類它觸發下定單操作
public class CafeDemo {
public static void main(String[] args) {
//加載Spring 配置文件
AbstractApplicationContext context = null;
if(argslength > ) {
context = new FileSystemXmlApplicationContext(args);
}
else {
context = new ClassPathXmlApplicationContext(cafeDemoxml CafeDemoclass);
}
//啟動 Spring容器(啟動所有實現 orgntextLifecycle接口的實現類的start方法)
contextstart();
//從Spring容器 取得cafe實例
Cafe cafe = (Cafe) contextgetBean(cafe);
DrinkOrder order = new DrinkOrder();
//一杯熱飲 參數說明飲料類型 數量 是否是冷飲(true表示冷飲)
Drink hotDoubleLatte = new Drink(DrinkTypeLATTE false);
Drink icedTripleMocha = new Drink(DrinkTypeMOCHA true);
orderaddDrink(hotDoubleLatte);
orderaddDrink(icedTripleMocha);
//下個訂單
for (int i = ; i < ; i++) {
//調用cafe的placeOrder下訂單
cafeplaceOrder(order);
}
}
}
下面是Cafe的源代碼
public class Cafe {
private MessageChannel orderChannel;
public void setOrderChannel(MessageChannel orderChannel) {
thisorderChannel = orderChannel;
}
//其實下訂單操作調用的是orderChannel(orders channel)的send方法把消息發出去
public void placeOrder(DrinkOrder order) {
thisorderChannelsend(new GenericMessage<DrinkOrder>(order));
//GenericMessage有三個構建方法參考如下
//new GenericMessage<T>(Object id T payload);
//new GenericMessage<T>(T payload);
//new GenericMessage<T>(T payload MessageHeader headerToCopy)
}
}
下面我們來看一下哪個類標記有@MessageEndpoint(input=orders) 表示它會消費orders Channel的消息我們發現OrderSplitter類標記這個元數據下面是源代碼我們來分析
//標記 MessageEndpoint 元數據 input表示 設置後所有 orders Channel消息都會被OrderSplitter收到
@MessageEndpoint(input=orders)
public class OrderSplitter {
//@Splitter表示接收消息後調用這個類的該方法 其的參數類型必須與message的 payload屬性一致
//即在new GenericMessage<T>的泛型中指定
//元數據設置的 channel屬性表示方法執行完成後會把方法返回的結果保存到message的payload屬性後發送到指定的channel中去
//這裡指定發送到 drinks channel
@Splitter(channel=drinks)
public List<Drink> split(DrinkOrder order) {
return ordergetDrinks(); //方法中是把訂單中的飲料訂單取出來
}
}
接下來與找OrderSplitter方法相同我們要找哪個類標記有@MessageEndpoint(input=drinks) 表示它會消費drinks Channel的消息找到DrinkRouter這個類
@MessageEndpoint(input=drinks)
public class DrinkRouter {
//@Router表示接收消息後調用這個類的該方法 其的參數類型必須與message的 payload屬性一致
//方法執行完畢後其返回值為 在容器中定義的channel名稱channel名稱必須存在
@Router
public String resolveDrinkChannel(Drink drink) {
return (drinkisIced()) ? coldDrinks : hotDrinks; //方法中是根據處理飲料是否是冷飲送不同的channel處理
}
}
備注
@Router可以把消息路由到多個channel
實現方式如下
@Router
public MessageChannel route(Message message) {}
@Router
public List<MessageChannel> route(Message message) {}
@Router
public String route(Foo payload) {}
@Router
public List<String> route(Foo payload) {}
接下來我們就要找 MessageEndpoint 標記為處理 coldDrinks 和 hotDrinks 的類我們發現這個兩個類並不是通過元數據@MessageEndpoint來實現的而是通過容器配置(下面會演示如何用元數據配置但元數據配置有局限性這兩種配置方式看大家喜好系統中都是可以使用)
下面是容器配置信息
<! 消息處理終端 接收 channel coldDrinks的消息後執行baristaprepareColdDrink方法 生產冷飲 >
<endpoint inputchannel=coldDrinks handlerref=barista
handlermethod=prepareColdDrink/>
<! 消息處理終端 接收 channel hotDrinks的消息後執行baristaprepareHotDrink方法 生產熱飲 >
<endpoint inputchannel=hotDrinks handlerref=barista
handlermethod=prepareHotDrink/>
我們來看一下源代碼
@Component //這個必須要有表示是一個消息處理組件
public class Barista {
private long hotDrinkDelay = ;
private long coldDrinkDelay = ;
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();
public void setHotDrinkDelay(long hotDrinkDelay) {
thishotDrinkDelay = hotDrinkDelay;
}
public void setColdDrinkDelay(long coldDrinkDelay) {
ldDrinkDelay = coldDrinkDelay;
}
public void prepareHotDrink(Drink drink) {
try {
Threadsleep(thishotDrinkDelay);
} catch (InterruptedException e) {
ThreadcurrentThread()interrupt();
}
Systemoutprintln(prepared hot drink # + hotDrinkCounterincrementAndGet() + : + drink);
}
public void prepareColdDrink(Drink drink) {
try {
Threadsleep(ldDrinkDelay);
} catch (InterruptedException e) {
ThreadcurrentThread()interrupt();
}
Systemoutprintln(prepared cold drink # + coldDrinkCounterincrementAndGet() + : + drink);
}
}
如果要用元數據標識實現上述方法要用元數據配置它不像容器配置可以在一個類中支持多個不同的Handler方法以處理prepareColdDrink方法為例
@MessageEndpoint(input=coldDrinks) //加了該元數據它會自動掃描並作為@Componet標記處理
public class Barista {
private long hotDrinkDelay = ;
private long coldDrinkDelay = ;
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();
public void setHotDrinkDelay(long hotDrinkDelay) {
thishotDrinkDelay = hotDrinkDelay;
}
public void setColdDrinkDelay(long coldDrinkDelay) {
ldDrinkDelay = coldDrinkDelay;
}
public void prepareHotDrink(Drink drink) {
try {
Threadsleep(thishotDrinkDelay);
} catch (InterruptedException e) {
ThreadcurrentThread()interrupt();
}
Systemoutprintln(prepared hot drink # + hotDrinkCounterincrementAndGet() + : + drink);
}
@Handler//回調處理的方法
public void prepareColdDrink(Drink drink) {
try {
Threadsleep(ldDrinkDelay);
} catch (InterruptedException e) {
ThreadcurrentThread()interrupt();
}
Systemoutprintln(prepared cold drink # + coldDrinkCounterincrementAndGet() + : + drink);
}
}
這樣整個流程就執行完了最終我們的飲料產品就按照訂單生產出來了累了吧喝咖啡提神著呢!!!
初充下面是針對 Spring Integration adapter擴展的學習筆記
JMS Adapters
jms adapters 目前有兩種實現
JmsPollingSourceAdapter 和 JmsMessageDrivenSourceAdapter 前者是使用Srping的JmsTemplate模板類通過輪循的方式接收消息
後者是使用則通過代理Spring的DefaultMessageListenerContainer實例實現消息驅動的方式
xml配置如下
<bean class=orgspringframeworkintegrationadapterjmsJmsPollingSourceAdapter>
<constructorarg ref=jmsTemplate/>
<property name=channel ref=exampleChannel/>
<property name=period value=/> <! 輪循時間間隔 >
<property name=messageMapper ref=/> <! message轉換 >
</bean>
<! 備注消息的轉換方式如下
收到JMS Message消息後SourceAdapter會調用Spring的MessageConverter實現類把javaxjmsMessage對象轉換成普通Java對象再調用Spring Integration的MessageMapper把該對象轉成 orgspringframessageMessage對象 >
JmsMessageDrivenSourceAdapter
<bean class=orgspringframeworkintegrationadapterjmsJmsMessageDrivenSourceAdapter>
<property name=connectionFactory ref=connectionFactory/>
<property name=destinationName value=exampleQueue/>
<property name=channel ref=exampleChannel/>
<property name=messageConverter ref=/> <! jms消息對象轉換 >
<property name=messageMapper ref= /> <! 普通java對象轉換成 Spring Integration Message >
<property name=sessionAcknowledgeMode value= />
<! sesssion回復模式 AUTO_ACKNOWLEDGE= CLIENT_ACKNOWLEDGE= DUPS_OK_ACKNOWLEDGE= SESSION_TRASACTED=>
</bean>
另外還有一個比較有用的類JmsTargetAdapter 它實現了MessageHandler接口它提把Spring Integration Message對象轉換成JMS消息並發送到指定的消息隊列與JMS服務連接的實現可以通過設定 jmsTemplate屬性引用或是connectionFactory和destination或destinationName屬性
<bean class=orgspringframeworkintegrationadapterjmsJmsTargetAdapter>
<constructorarg ref=connectionFactory/>
<constructorarg value=examplequeue/>
<!或是以下配置
<property name=connectionFactory ref=connectionFactory/>
<property name=destinationName value=exampleQueue/>
或是
<constructorarg ref=jmsTemplate/> >
</bean>
From:http://tw.wingwit.com/Article/program/Java/ky/201311/28092.html