熱點推薦:
您现在的位置: 電腦知識網 >> 編程 >> .NET編程 >> 正文

基於消息與.Net Remoting的分布式處理架構

2022-06-13   來源: .NET編程 

  一消息的定義

  要實現進程間的通信則通信內容的載體——消息就必須在服務兩端具有統一的消息標准定義從通信的角度來看消息可以分為兩類Request Messge和Reply Message為簡便起見這兩類消息可以采用同樣的結構

  消息的主體包括IDName和Body我們可以定義如下的接口方法來獲得消息主體的相關屬性

  public interface IMessage:ICloneable
    {

  IMessageItemSequence GetMessageBody();
         string GetMessageID();
         string GetMessageName();

  void SetMessageBody(IMessageItemSequence aMessageBody);
         void SetMessageID(string aID);
         void SetMessageName(string aName);
    }

  消息主體類Message實現了IMessage接口在該類中消息體Body為IMessageItemSequence類型這個類型用於Get和Set消息的內容Value和Item

  public interface IMessageItemSequence:ICloneable
    {       

  IMessageItem GetItem(string aName);
         void SetItem(string aNameIMessageItem aMessageItem);        

  string GetValue(string aName);   
         void SetValue(string aNamestring aValue);
    }

  Value為string類型並利用HashTable來存儲Key和Value的鍵值對而Item則為IMessageItem類型同樣的在IMessageItemSequence的實現類中利用HashTable存儲了Key和Item的鍵值對

  IMessageItem支持了消息體的嵌套它包含了兩部分SubValue和SubItem實現的方式和IMessageItemSequence相似定義這樣的嵌套結構使得消息的擴展成為可能一般的結構如下

  IMessage——Name
                     ——ID
                     ——Body(IMessageItemSequence)
                            ——Value
                            ——Item(IMessageItem)
                                   ——SubValue
                                   ——SubItem(IMessageItem)
                                          ——……

  各個消息對象之間的關系如下

  Distribute1.gif

  在實現服務進程通信之前我們必須定義好各個服務或各個業務的消息格式通過消息體的方法在服務的一端設置消息的值然後發送並在服務的另一端獲得這些值例如發送消息端定義如下的消息體

  IMessageFactory factory = new MessageFactory();
           IMessageItemSequence body = factoryCreateMessageItemSequence();
           bodySetValue(namevalue);
           bodySetValue(namevalue);

  IMessageItem item = factoryCreateMessageItem();
           itemSetSubValue(subnamesubvalue);
           itemSetSubValue(subnamesubvalue);

  IMessageItem subItem = factoryCreateMessageItem();
           subItemSetSubValue(subsubnamesubsubvalue);
           subItemSetSubValue(subsubnamesubsubvalue);
           IMessageItem subItem = factoryCreateMessageItem();
           subItemSetSubValue(subsubnamesubsubvalue);
           subItemSetSubValue(subsubnamesubsubvalue);

  itemSetSubItem(subitemsubItem);
           itemSetSubItem(subitemsubItem);

  bodySetItem(itemitem);

  //Send Request Message
           MyServiceClient service = new MyServiceClient(Client);
           IMessageItemSequence reply = serviceSendRequest(TestServiceTestbody);

  在接收消息端就可以通過獲得body的消息體內容進行相關業務的處理 

  二Net Remoting服務

  在Net中要實現進程間的通信主要是應用Remoting技術根據前面對消息的定義可知實際上服務的實現可以認為是對消息的處理因此我們可以對服務進行抽象定義接口IService:

  public interface IService
    {
         IMessage Execute(IMessage aMessage);
    }

  Execute()方法接受一條Request Message對其進行處理後返回一條Reply Message在整個分布式處理架構中可以認為所有的服務均實現該接口但受到Remoting技術的限制如果要實現服務則該服務類必須繼承自MarshalByRefObject同時必須在服務端被Marshal隨著服務類的增多必然要在服務兩端都要對這些服務的信息進行管理這加大了系統實現的難度與管理的開銷如果我們從另外一個角度來分析服務的性質基於消息處理而言所有服務均是對Request Message的處理我們完全可以定義一個Request服務負責此消息的處理

  然而Request服務處理消息的方式雖然一致但畢竟服務實現的業務即對消息處理的具體實現卻是不相同的對我們要實現的服務可以分為兩大類業務服務與Request服務實現的過程為首先具體的業務服務向Request服務發出Request請求Request服務偵聽到該請求然後交由其偵聽的服務來具體處理

  業務服務均具有發出Request請求的能力且這些服務均被Request服務所偵聽因此我們可以為業務服務抽象出接口IListenService

  public interface IListenService
    {
         IMessage OnRequest(IMessage aMessage);  
    }

  Request服務實現了IService接口並包含IListenService類型對象的委派以執行OnRequest()方法

  public class RequestListener:MarshalByRefObjectIService
    {
         public RequestListener(IListenService listenService)
         {
             m_ListenService = listenService;
         }

  private IListenService m_ListenService;

  #region IService Members

  public IMessage Execute(IMessage aMessage)
         {
             return thism_ListenServiceOnRequest(aMessage);
         }       

  #endregion
         public override object InitializeLifetimeService()
         {
             return null;
         }
    }

  在RequestListener服務中繼承了MarshalByRefObject類同時實現了IService接口通過該類的構造函數接收IListService對象

  由於Request消息均由Request服務即RequestListener處理因此業務服務的類均應包含一個RequestListener的委派唯一的區別是其服務名不相同業務服務類實現IListenService接口但不需要繼承MarshalByRefObject因為被Marshal的是該業務服務內部的RequestListener對象而非業務服務本身

  public abstract class Service:IListenService
    {
         public Service(string serviceName)
         {
             m_ServiceName = serviceName;  
             m_RequestListener = new RequestListener(this); 
         }       

  #region IListenService Members
         public IMessage OnRequest(IMessage aMessage)
         {
             //……
         }  

  #endregion

  private string m_ServiceName;
         private RequestListener m_RequestListener;     
    }

  Service類是一個抽象類所有的業務服務均繼承自該類最後的服務架構如下

  Distribute2.gif

  我們還需要在Service類中定義發送Request消息的行為通過它才能使業務服務被RequestListener所偵聽

  public IMessageItemSequence SendRequest(string aServiceNamestring                                        aMessageNameIMessageItemSequence aMessageBody)
         {

  IMessage message = m_FactoryCreateMessage();
             messageSetMessageName(aMessageName);
             messageSetMessageID();
             messageSetMessageBody(aMessageBody);

  IService service = FindService(aServiceName);
             IMessageItemSequence replyBody = m_FactoryCreateMessageItemSequence();
             if (service != null)
             {
                  IMessage replyMessage = serviceExecute(message);
                  replyBody = replyMessageGetMessageBody();          
             }
             else
             {          
                  replyBodySetValue(resultFailure);          
             }
             return replyBody;
         }

  注意SendRequest()方法的定義其參數包括服務名消息名和被發送的消息主體而在實現中最關鍵的一點是FindService()方法我們要查找的服務正是與之對應的RequestListener服務不過在此之前我們還需要先將服務Marshal
         public void Initialize()
         {                                          
             RemotingServicesMarshal(thism_RequestListenerthism_ServiceName +  RequestListener);
         }

  我們Marshal的對象是業務服務中的Request服務對象m_RequestListener這個對象在Service的構造函數中被實例化

  m_RequestListener = new RequestListener(this); 

  注意在實例化的時候是將this作為IListenService對象傳遞給RequestListener因此此時被Marshal的服務對象保留了業務服務本身即Service的指引可以看出在Service和RequestListener之間采用了雙重委派的機制

  通過調用Initialize()方法初始化了一個服務對象其類型為RequestListener(或IService)其服務名為Service的服務名 + RequestListener而該服務正是我們在SendRequest()方法中要查找的Service

  IService service = FindService(aServiceName);

  下面我們來看看FindService()方法的實現

  protected IService FindService(string aServiceName)
         {
             lock (thism_Services)
             {
                  IService service = (IService)m_Services[aServiceName];
                  if (service != null)
                  {
                      return service;
                  }
                  else
                  {
                      IService tmpService = GetService(aServiceName);
                      AddService(aServiceNametmpService);
                      return tmpService;
                  }
             }
         }

  可以看到這個服務是被添加到m_Service對象中該對象為SortedList類型服務名為KeyIService對象為Value如果沒有找到則通過私有方法GetService()來獲得

  private IService GetService(string aServiceName)
         {
             IService service = (IService)ActivatorGetObject(typeof(RequestListener)
                  tcp://localhost:/ + aServiceName + RequestListener);
             return service;
         }

  在這裡ChannelIPPort應該從配置文件中獲取為簡便起見這裡直接賦為常量

  再分析SendRequest方法在找到對應的服務後執行了IService的Execute()方法此時的IService為RequestListener而從前面對RequestListener的定義可知Execute()方法執行的其實是其偵聽的業務服務的OnRequest()方法

  我們可以定義一個具體的業務服務類來分析整個消息傳遞的過程該類繼承於Service抽象類

  public class MyService:Service
    {
         public MyService(string aServiceName):base(aServiceName)
         {}          
    }

  假設把進程分為服務端和客戶端那麼對消息處理的步驟如下

   在客戶端調用MyService的SendRequest()方法發送Request消息
  查找被Marshal的服務即RequestListener對象此時該對象應包含對應的業務服務對象MyService
  在服務端調用RequestListener的Execute()方法該方法則調用業務服務MyService的OnRequest()方法

  在這些步驟中除了第一步在客戶端執行外其他的步驟均是在服務端進行
 

  三業務服務對於消息的處理

  前面實現的服務架構已經較為完整地實現了分布式的服務處理但目前的實現並未體現對消息的處理我認為對消息的處理等價與具體的業務處理這些業務邏輯必然是在服務端完成每個服務可能會處理單個業務也可能會處理多個業務並且服務與服務之間仍然存在通信某個服務在處理業務時可能需要另一個服務的業務行為也就是說每一種類的消息處理的方式均有所不同而這些消息的唯一標識則是在SendRequest()方法已經有所體現的aMessageName

  雖然處理的消息不同所需要的服務不同但是根據我們對消息的定義我們仍然可以將這些消息處理機制抽象為一個統一的格式Net中體現這種機制的莫過於委托delegate我們可以定義這樣的一個委托

  public delegate void RequestHandler(string aMessageNameIMessageItemSequence aMessageBodyref IMessageItemSequence aReplyMessageBody);

  在RequestHandler委托中它代表了這樣一族方法接收三個入參aMessageNameaMessageBodyaReplyMessageBody返回值為void其中aMessageName代表了消息名它是消息的唯一標識aMessageBody是待處理消息的主體業務所需要的所有數據都存儲在aMessageBody對象中aReplyMessageBody是一個引用對象它存儲了消息處理後的返回結果通常情況下我們可以用<resultSuccess>或<result Failure>來代表處理的結果是成功還是失敗

  這些委托均在服務初始化時被添加到服務類的SortedList對象中鍵值為aMessageName所以我們可以在抽象類中定義如下方法     

  protected abstract void AddRequestHandlers();
         protected void AddRequestHandler(string aMessageNameRequestHandler handler)
         {
             lock (thism_EventHandlers)
             {
                  if (!thism_EventHandlersContains(aMessageName))
                  {
                      thism_EventHandlersAdd(aMessageNamehandler);
                  }
             }
         }

  protected RequestHandler FindRequestHandler(string aMessageName)
         {
             lock (thism_EventHandlers)
             {
                  RequestHandler handler = (RequestHandler)m_EventHandlers[aMessageName];
                  return handler;
             }
         }       

  AddRequestHandler()用於添加委托對象與aMessageName的鍵值對而FindRequestHandler()方法用於查找該委托對象而抽象方法AddRequestHandlers()則留給Service的子類實現簡單的實現如MyService的AddRequestHandlers()方法

  public class MyService:Service
    {
         public MyService(string aServiceName):base(aServiceName)
         {}

  protected override void AddRequestHandlers()
         {
             thisAddRequestHandler(Testnew RequestHandler(Test));
             thisAddRequestHandler(Testnew RequestHandler(Test));
         }

  private void Test(string aMessageNameIMessageItemSequence aMessageBodyref  IMessageItemSequence aReplyMessageBody)
         {
             ConsoleWriteLine(MessageName:{}\naMessageName);
             ConsoleWriteLine(MessageBody:{}\naMessageBody);
             aReplyMessageBodySetValue(resultSuccess);
         }

  private void Test(string aMessageNameIMessageItemSequence aMessageBodyref   IMessageItemSequence aReplyMessageBody)
         {
             ConsoleWriteLine(Test + aMessageBodyToString());
         }
    }

  Test和Test方法均為匹配RequestHandler委托簽名的方法然後在AddRequestHandlers()方法中通過調用AddRequestHandler()方法將這些方法與MessageName對應起來添加到m_EventHandlers中

  需要注意的是本文為了簡要的說明這種處理方式所以簡化了Test和Test方法的實現而在實際開發中它們才是實現具體業務的重要方法而利用這種方式則解除了服務之間依賴的耦合度我們隨時可以為服務添加新的業務邏輯也可以方便的增加服務

  通過這樣的設計Service的OnRequest()方法的最終實現如下所示

  public IMessage OnRequest(IMessage aMessage)
         {
             string messageName = aMessageGetMessageName();
             string messageID = aMessageGetMessageID();
             IMessage message = m_FactoryCreateMessage();

  IMessageItemSequence replyMessage = m_FactoryCreateMessageItemSequence();
             RequestHandler handler = FindRequestHandler(messageName);
             handler(messageNameaMessageGetMessageBody()ref replyMessage);

  messageSetMessageName(messageName);
             messageSetMessageID(messageID);
             messageSetMessageBody(replyMessage);

  return message;
         }

  利用這種方式我們可以非常方便的實現服務間通信以及客戶端與服務端間的通信例如我們分別在服務端定義MyService(如前所示)和TestService

  public class TestService:Service
    {
         public TestService(string aServiceName):base(aServiceName)
         {}

  protected override void AddRequestHandlers()
         {
             thisAddRequestHandler(Testnew RequestHandler(Test));         
         }

  private void Test(string aMessageNameIMessageItemSequence aMessageBodyref  IMessageItemSequence aReplyMessageBody)
         {           
             aReplyMessageBody = SendRequest(MyServiceaMessageNameaMessageBody);
             aReplyMessageBodySetValue(resultSuccess);
         }

  }

  注意在TestService中的Test方法它並未直接處理消息aMessageBody而是通過調用SendRequest()方法將其傳遞到MyService中

  對於客戶端而言情況比較特殊根據前面的分析我們知道除了發送消息的操作是在客戶端完成外其他的具體執行都在服務端實現所以諸如MyService和TestService等服務類只需要部署在服務端即可而客戶端則只需要定義一個實現Service的空類即可

  public class MyServiceClient:Service
    {
        public MyServiceClient(string aServiceName):base(aServiceName)
         {}

  protected override void AddRequestHandlers()
         {}
    }

  MyServiceClient類即為客戶端定義的服務類在AddRequestHandlers()方法中並不需要實現任何代碼如果我們在Service抽象類中將AddRequestHandlers()方法定義為virtual而非abstract方法則這段代碼在客戶端服務中也可以省去另外客戶端服務類中的aServiceName可以任意賦值它與服務端的服務名並無實際聯系至於客戶端具體會調用哪個服務則由SendRequest()方法中的aServiceName決定

  IMessageFactory factory = new MessageFactory();
           IMessageItemSequence body = factoryCreateMessageItemSequence();
           //……
           MyServiceClient service = new MyServiceClient(Client);
           IMessageItemSequence reply = serviceSendRequest(TestServiceTestbody);

  對於serviceSendRequest()的執行而言會先調用TestService的Test方法然後再通過該方法向MyService發送最終調用MyService的Test方法

  我們還需要另外定義一個類負責添加服務並初始化這些服務

  public class Server
    {
         public Server()
         {
             m_Services = new ArrayList();
         }
         private ArrayList m_Services;    
         public void AddService(IListenService service)
         {
             thism_ServicesAdd(service);
         }

  public void Initialize()
         {  

  IDictionary tcpProp = new Hashtable();
             tcpProp[name] = tcp;
             tcpProp[port] = ;

  TcpChannel channel = new TcpChannel(tcpPropnew                                             BinaryClientFormatterSinkProvider()new BinaryServerFormatterSinkProvider());           
             ChannelServicesRegisterChannel(channel);
             foreach (Service service in m_Services)
             {
                  serviceInitialize();
             }           
         }
    }

  同理這裡的ChannelIP和Port均應通過配置文件讀取最終的類圖如下所示

  Distribute3.gif

  在服務端可以調用Server類來初始化這些服務
      static void Main(string[] args)
         {  
             MyService service = new MyService(MyService);
             TestService service = new TestService(TestService);
 

  Server server = new Server();
             serverAddService(service);
             serverAddService(service);

  serverInitialize();
             ConsoleReadLine();
         }

  四結論

  利用這個基於消息與Net Remoting技術的分布式架構可以將企業的業務邏輯轉換為對消息的定義和處理要增加和修改業務就體現在對消息的修改上服務間的通信機制則完全交給整個架構來處理如果我們將每一個委托所實現的業務(或者消息)理解為Contract則該結構已經具備了SOA的雛形當然該架構僅僅處理了消息的傳遞而忽略了對底層事件的處理(類似於Corba的Event Service)這個功能我想留待後面實現

  唯一遺憾的是我缺乏驗證這個架構穩定性和效率的環境應該說這個架構是我們在企業項目解決方案中的一個實踐但是解決方案則是利用了CORBA中間件在Unix環境下實現並運行本架構僅僅是借鑒了核心的實現思想和設計理念從而完成的在Net平台下的移植由於Unix與Windows Server的區別其實際的優勢還有待驗證


From:http://tw.wingwit.com/Article/program/net/201311/13559.html
    推薦文章
    Copyright © 2005-2022 電腦知識網 Computer Knowledge   All rights reserved.