摘要
在企業中許多計算機由於在其上執行工作的性質而未得到充分利用或者因為過了上班時間而干脆得不到使用在許多情況下應用服務器耗光了寶貴的CPU(尤其是在執行CPU密集型的數學運算時)而網絡上的其他計算機則閒置一旁本文提出了一種框架用於把Java消息服務(Java Messaging ServiceJMS)客戶端放置在這些未充分利用的計算機上以分擔一些通常應由服務器執行的工作該客戶端可以監聽某個要執行的工作單元的請求隊列然後在應答隊列中做出響應此外本文還給出了一個BEA WebLogic Integration 架構的例子它通過把一個工作流以及相關的Java控件用作替代框架把工作分發到遠程客戶端上從而把工作單元可靠地分發給JMS請求隊列
簡介
本文提出了一種JEE框架用於解決把工作分配給未充分利用的計算機資源這個難題具體來說可以把JMS客戶端放置在這些未充分利用的計算機上從而分擔一些通常應由服務器執行的工作該客戶端可以監聽某個要執行的工作單元的請求隊列然後在應答隊列上做出響應可以使用一組消息驅動bean獲取應答隊列上的響應消息以便進行進一步的處理此外還可以使用一種servlet實現來管理性地啟動用於創建(要發送給JMS客戶端的)工作單元的整個子流程並使用它來終止這個子流程
我使用常見的BEA WebLogic Server作為把離散的工作單元分配給分布式JMS客戶端的例子在另一個更為復雜的例子中BEA WebLogic Integration (WLI)工作流也執行類似的分發任務但是通過對請求隊列進行監控它在靈活性Java控件的可重用性和可伸縮性等方面要更好一些
使用的例子
業內有相當多的例子可以演示如何使用JMS框架來充分利用計算機進行並行處理例如
一個銀行應用程序可以實現抵押貸款並以不同的比率和年份執行幾類利息計算從而為信貸官員提供與每個申請者相關的可能影響借貸類型的數據所有不同種類的計算可以按照申請者分配給可用的計算機來執行然後把結果返回給應用服務器儲存起來
一個記帳系統可以從數據庫讀取記錄然後重新計算記錄中的數字以求做到更加精確對於每條記錄它可能需要連接到業務用戶的本地系統中以獲得輔助數據這可能需要幾秒鐘如果順序執行當涉及到的記錄上千時這種方法不僅很慢而且可能進一步延長服務器線程等待從各地返回響應的時間通過把這些工作分發給JMS客戶端不僅可以並行完成處理而且還可以節省服務器線程
一個天氣預報系統或線性優化系統可能需要操縱或執行矩陣乘法隨著矩陣的大小和數量逐步增加服務器CPU的負擔也隨之加重如果這種情況經常發生那麼通過把矩陣操作和乘法分發給其他計算機上的JMS客戶端服務器的CPU就可以節省下來用於其他工作
使用常規的WebLogic Server來分發工作單元
借鑒最後一個例子我將構建一個簡單的例子用於執行矩陣乘法同時說明如何使用JMS框架把計算工作分發給企業中的計算機資源JMS客戶端將收到一個工作單元實例之後它將會調用其doWork()方法在這個簡單的例子中doWork()方法將把個×的矩陣相乘然後把結果保存到一個結果矩陣中
接著JMS客戶端使用工作單元實例的一個副本(工作是在這上面執行的)對應答隊列做出響應一個消息驅動bean將接受已完成的工作圖說明了我將要討論的各個組件
圖 該WebLogic Server實現中的各個組件
這種方法以常規方式使用了JMS系統在下面的內容中我將引入一些代碼並考慮幾個擴展問題
工作單元類
JMS請求隊列上的每個類都將實現一個UnitOfWork接口該接口有一個特別有趣的方法叫做doWork()
public interface UnitOfWork extends javaioSerializable {
// This method executes itself on the client machine
public void doWork();
// This method prints the current contents of performed work
public void print();
// This method stores the instance into a backing store
public void store();
}
在我們這個簡潔而直觀的例子中我使用一個SimpleMatri類實現了UnitOfWork接口
public class SimpleMatrix implements UnitOfWork {
private Integer m[][];
private Integer m[][];
private Integer result[][];
private Integer rows = new Integer();
private Integer cols = new Integer();
// May initialize m and m by locating records from a database
public SimpleMatrix() {
}
// This method actually multiplies m x m and stores in result
public void doWork() {
}
// This method stores result into a backing store
public void store() {
}
// This method prints the current contents of result
public void print() {
}
}
方法的實現相當簡單限於文章的篇幅這裡就不再進行說明請參見所附的示例代碼其中給出了完整實現這裡的要點在於這個SimpleMatrix實例被傳遞給一個JMS客戶端該客戶端只要調用doWork()即可利用其CPU來執行工作對於這個例子我不會實際從數據庫中檢索矩陣或者把矩陣保存到數據庫中但是在實際應用中這是必須完成的工作
Servlet工作創建程序
可以使用一個servlet來創建這些UnitOfWork實例盡管WebLogic Server啟動類可以執行同樣的功能但出於管理的目的從安全的Web浏覽器發送消息給servlet要更加容易(另一種可選的實現是使用Web服務)如果在servlet上安置了安全性通過身份驗證的用戶可以在查詢字符串中傳遞命令以便開始交付工作單元給JMS請求隊列或停止交付我將給出一個servlet的主干例子以說明其中的一些有用方法
public class WorkServlet extends HttpServlet {
private QueueSender qsender;
private ObjectMessage msg;
private int numMessages = ;
// This places the unit of work on the request queue
public synchronized boolean sendMessages(int numberOfMessages
PrintWriter o) {
for(int i=; i
JMS客戶端類的任務僅僅是接受請求隊列上的消息調用對象上的doWork()方法在這台計算機上執行工作然後把結果返回給應答隊列消息驅動bean從應答隊列中獲取結果以便進行進一步的處理和保存可以檢查它是否是文本消息然後告訴客戶端停止處理從而允許發送控件消息給客戶端當然在實際應用中消息可能包含客戶端的名稱這樣就不會造成所有的客戶端都停止處理
使用UnitOfWork接口的優點在於JMS客戶端只要編寫一次就可以用於以後實現該接口的任何類這使得JMS客戶端具有很大的通用性可以不加修改地應用到許多不同的場景中只需把編譯後的UnitOfWork接口以及它所有的實現類都包含在客戶端的類路徑中
在這個簡化模型中客戶端需要等待消息到達以啟動處理在實際情況中客戶端的方法等待的條件可以是一天中的某個時刻比如下午點或者是計算機的CPU負載低於某個阈值需要把此類邏輯添加到客戶端使之與計算機的使用安排更加一致
消息驅動bean接收程序
消息驅動bean實例將監聽應答隊列看看有沒有已完成的工作對象單元下面給出一個例子的主干部分public class MessageWorkBean implements MessageDrivenBean
MessageListener {
// This method will receive a unit of work object to store
public void onMessage(Message msg) {
ObjectMessage om = (ObjectMessage) msg;
try {
UnitOfWork unit = (UnitOfWork)omgetObject();
unitprint();
unitstore();
}
catch(JMSException ex) {
log(Message Driven Bean: Could not retrieve Unit of Work);
exprintStackTrace();
}
}
}
這裡有一個有趣的方法叫做onMessage()
這個方法的用途僅僅是從應答隊列接收已完成的對象
接著
它將調用其print()和store()方法
我的目標是讓服務器把它對這個工作單元的處理工作分發給其他計算機
我已經通過JMS客戶端實現了這一點
並使用消息驅動bean把結果返回給服務器
可擴展性方面的考慮
在這個框架的實際實現中
我們應該要解決幾個問題
從而讓例子變得可以擴展
考慮使用一個大小可以調整的消息驅動bean池來處理響應
如果請求隊列沒有外部使用者應該創建一些消息驅動bean來使用服務器上的請求隊列這與本文的主旨不相符但是可以防止隊列溢出或者在沒有使用者的情況下請求隊列利用不充分
如果存在多種類型的工作單元那麼每種類型都應該有自己的請求和響應隊列
對於WebLogic Server考慮使用JMS頁面調度技術以便防止當隊列中存在過多沒有及時使用的消息時出現內存不足問題
對於WebLogic Server如果生產者(servlet)生產出過多沒有使用的工作考慮使用WebLogic JMS的調節功能
對於WebLogic Server考慮對隊列使用分布式目的地因為這可以把隊列分布到多台服務器上在這種情況下應該集群化servlet本身並對其進行協調以避免創建重復的工作請求單元
還應該考慮本文結尾處的參考資料此外對其他服務器也適用的考慮事項是把客戶端部分交付給各台計算機的方式一種方式是自願即每台計算機的所有者都下載一個可以在客戶端計算機上配置和運行的安裝程序另一種方式是使用商業軟件分發包它可以自動下載客戶端的最新版本並把它安裝在客戶端計算機上
使用WebLogic Integration工作流來分發工作
前面給出了一種把工作單元分發給客戶端的直觀方法即使用servlet和消息驅動bean盡管該方法實現起來相當容易但是它不能解決的問題還很多比如如何以自支持的方式啟動過程定時把請求交付給請求隊列當然我們不希望讓管理員編寫一個shell腳本來不停地調用該servlet此外還應該以一種應用程序可以預先控制的方式限制所使用的請求數量考慮到這一點下面給出一個更加復雜的例子用於把工作單元分發給遠程JMS客戶端並對其做出響應從而利用未充分使用的計算機
該方法使用了兩個在BEA WebLogic Workshop中開發的WebLogic Integration (WLI)工作流
即Java流程定義(Java Process Definition
JPD)文件
它是BPEL/J (Business Process Engineering Language for Java)的前身
BPEL/J是在JSR
中定義的
第一個工作流響應某些Web服務請求而啟動
並執行初始化以通過一個JMS控件訂閱JMS請求隊列
該工作流使用一個Timer控件不停地進行循環
並定時喚醒一個while循環
從而在請求隊列上放置更多的工作單元
該工作流還將使用一個定制Java控件(在本文相關代碼中給出)來浏覽請求隊列
以便決定是否需要在隊列上放置更多請求來防止隊列出現過載
最後
工作流還將等待來自Web服務的停止消息
然後停止處理
第二個工作流執行的任務與前面例子中的消息驅動bean相同
因為它將對響應隊列中的消息做出響應
以便從出隊的響應隊列調用print()和store()方法
這是一個生存期很短的工作流
而WebLogic Integration將按照要求產生足夠的實例
浏覽JMS隊列
WebLogic Integration被用作一種為遠程流程構造和匯編服務的機制
有現成的組件程序集
即Java控件
它使得開發人員可以輕松地構建復合應用程序而不需要進行大量的開發
盡管WebLogic Integration提供了開箱即用的JMS控件
用於在使用JMS時抽象化內部細節
在某些情況下
由於要細粒度地訪問底層方法
最好還是創建一個可重用的定制控件
在這個示例框架中
我需要浏覽工作請求隊列
以統計在隊列中等待的工作項的數量
然後決定能否在隊列中放入更多工作項
而不會引起隊列過載
為此
我們編寫了一個定制Java控件
JMSBrowse
它有一個這樣的方法
public interface JMSBrowse extends Control {
int numberOfElementsInQueue(String qFactory
String qName);
}
這個控件的實現使用了JMS QueueBrowser類來查看一個帶有給定的JMS連接工廠的給定JMS隊列它返回隊列中等待處理的實例個數本文所附的代碼中提供了完整實現
啟動和停止工作流的Web服務
為了啟動和停止負責把工作單元分發給請求隊列的WebLogic Integration流程我們創建了一個Java Web Service (JWS)它服從JSR 帶有兩個方法public classControlWebService implements
combeajwsWebService {
/**
* @common:control
*/
private ControlsJMSStopControlMessage JMSStopControl;
/**
* @common:control
*/
private ControlsJMSControlMessage JMSControl;
static final long serialVersionUID = L;
/**
* @common:operation
*/
public void startFlow() {
JMSControlsubscribe();
JMSControlsendTextMessage(start);
JMSControlunsubscribe();
}
/**
* @common:operation
*/
public void stopFlow() {
JMSStopControlsubscribe();
JMSStopControlsendTextMessage(stop);
JMSStopControlunsubscribe();
}
}
該Web服務不是直接調用工作流而是把一條消息放在JMS隊列中然後調用WorkerMessage把消息發送給分發JPD這解除了Web服務實現與工作流之間的耦合以保持其模塊性在WebLogic Integration中有一個概念叫做事件生成器可以使用WebLogic Integration Administration Console對它進行配置您可以把事件生成器配置為從JMS WorkerMessage中取出消息然後將其交付給一個Message Broker通道(邏輯概念)分發工作流監聽/UnitOfWork/StartWorkflow通道該通道被綁定在與JMS WorkerMessage隊列相關聯的JMS事件生成器上只要有一個String start消息交付到此通道上工作流就會開始工作類似地開始之後分發工作流就會在它的一個Event Choice節點中監聽Message Broker通道(/UnitOfWork/StopWorkflow)以便從WorkerStopMessage JMS隊列接收stop消息然後事件生成器再次把WorkerStopMessage隊列上的JMS消息關聯到/UnitOfWork/StopWorkflow通道以便交付消息
這實際上創建了一種與啟動和停止分發工作流的實現解耦合的面向服務方法通過Web服務客戶端或者使用所提供的WebLogic Integration Workshop Test Browser可以輕松對Web服務進行測試
分發工作流
圖說明了負責分發工作單元的DistributeFlowjpd我們的簡單矩陣對象以及請求隊列的相關部分
圖 用於分發工作單元的工作流
while循環不斷地循環直到一條stop消息改變布爾變量的值才跳出循環並結束工作流Event Choice等待兩個Control Receive回調的其中一個第一個回調是通過剛剛描述的Web服務從一個Message Broker通道接收一條Stop消息第二個回調對一個Timer控件做出響應我們已經通過該控件的屬性面板對它進行了設置每秒鐘發生一次這將使處理繼續而下一個行為將調用定制Java控件來浏覽WorkerRequest隊列以獲得等待處理的請求的個數接下來決策節點檢查請求的個數是否已經超出請求的最大個數(此處的最大個數被設置為其值保存在一個變量中)如果尚未超出就會調用一個執行節點然後使用JMS控件在請求隊列中放入個矩陣對象如下所示
public void perform() throws Exception {
for(int i = ; i < maxInQueue; i++) {
matrix = new SimpleMatrix();
jmsControlsendObjectMessage(matrix);
}
}
響應工作流的JMS客戶端
響應工作流的JMS客戶端與前面在WebLogic Server部分中描述的JMS客戶端幾乎一模一樣惟一的區別在於現在客戶端使用一條字節消息(而不是對象消息)對響應隊列做出響應客戶端把SimpleMatrix對象轉換為一個字節數組並將其傳遞給響應隊列這樣做的理由是與綁定到響應隊列的事件生成器相關聯的Message Broker通道只能夠監聽數據流即StringXML Bean或字節數組相關代碼被設計用於對WebLogic Integration請求消息和普通的WebLogic Server請求消息做出響應
接收一個已完成的工作單元的工作流如圖所示
圖接收程序的工作流
這裡的重要行為是perform節點它用於把字節數組轉換為一個對象並調用print()和store()方法
public void perform() throws Exception {
ByteArrayInputStream arrayInputStream = new
ByteArrayInputStream(rawDatabyteValue());
ObjectInputStream objectInputStream = new
ObjectInputStream(arrayInputStream);
UnitOfWork unit = (UnitOfWork) objectInputStreamreadObject();
unitprint();
unitstore();
objectInputStreamclose();
}
使用WebLogic Integration工作流
您已經了解到使用工作流Java控件和Message Broker通道可以提供一種更加完善的方式把工作分發給未完全利用的計算機只要在流程流中添加更多的行為節點就可以讓處理過程變得像您所期望的那樣面面俱到例如該工作流可以有一個審計控件用於在把所有發送到內部日志文件的請求放到隊列中之前對其進行審計該工作流可以把請求重定向到其他JMS隊列只要修改JMS控件的屬性值即可為了實現可擴展性甚至可以讓遠程Web服務啟動多個工作流實例最後基於業務安排Timer控件的時間間隔可以更小
使用Message Broker通道和事件生成器的另一個好處在於WebLogic Integration Administration Console可以監控事件生成器的響應消息的數量以便實現進一步的控制通過控制台您可以掛起和恢復事件生成器及通道從而對生產事件做出響應
這種靈活性使得WebLogic Integration工作流成為一種極具吸引力的方法
結束語
使用遠程JMS客戶端來分發工作的優點在於它有效地利用了網絡計算機來進行某種類型的批處理工作同時減輕了原來服務器的負擔這種方法的一個著名例子是Search for Extraterrestrial Intelligence (SETI@home)系統它利用全世界的PC來執行工作單元本文使用了一種JMS客戶端的框架還討論了如何部署這類解決方案以實現可擴展性力求讓這種方法通用化本文還討論了用於把工作分發給遠程客戶端的多種方法並提供了一種面向服務的方法作為首選
From:http://tw.wingwit.com/Article/program/Java/hx/201311/27189.html