問題的提出
所謂單個寫入程序/多個閱讀程序的線程同步問題是指任意數量的線程訪問共享資源時寫入程序(線程)需要修改共享資源而閱讀程序(線程)需要讀取數據在這個同步問題中很容易得到下面二個要求
)當一個線程正在寫入數據時其他線程不能寫也不能讀
)當一個線程正在讀入數據時其他線程不能寫但能夠讀
在數據庫應用程序環境中經常遇到這樣的問題比如說有n個最終用戶他們都要同時訪問同一個數據庫其中有m個用戶要將數據存入數據庫nm個用戶要讀取數據庫中的記錄
很顯然在這個環境中我們不能讓兩個或兩個以上的用戶同時更新同一條記錄如果兩個或兩個以上的用戶都試圖同時修改同一記錄那麼該記錄中的信息就會被破壞
我們也不讓一個用戶更新數據庫記錄的同時讓另一用戶讀取記錄的內容因為讀取的記錄很有可能同時包含了更新和沒有更新的信息也就是說這條記錄是無效的記錄
實現分析
規定任一線程要對資源進行寫或讀操作前必須申請鎖根據操作的不同分為閱讀鎖和寫入鎖操作完成之後應釋放相應的鎖將單個寫入程序/多個閱讀程序的要求改變一下可以得到如下的形式
一個線程申請閱讀鎖的成功條件是當前沒有活動的寫入線程
一個線程申請寫入鎖的成功條件是當前沒有任何活動(對鎖而言)的線程
因此為了標志是否有活動的線程以及是寫入還是閱讀線程引入一個變量m_nActive如果m_nActive > 則表示當前活動閱讀線程的數目如果m_nActive=則表示沒有任何活動線程m_nActive <表示當前有寫入線程在活動注意m_nActive<時只能取的值因為只允許有一個寫入線程活動
為了判斷當前活動線程擁有的鎖的類型我們采用了線程局部存儲技術(請參閱其它參考書籍)將線程與特殊標志位關聯起來
申請閱讀鎖的函數原型為public void AcquireReaderLock( int millisecondsTimeout )其中的參數為線程等待調度的時間函數定義如下
public void AcquireReaderLock( int millisecondsTimeout )
{
// m_mutext很快可以得到以便進入臨界區
m_mutexWaitOne( );
// 是否有寫入線程存在
bool bExistingWriter = ( m_nActive < );
if( bExistingWriter )
{ //等待閱讀線程數目加當有鎖釋放時根據此數目來調度線程
m_nWaitingReaders++;
} else
{ //當前活動線程加
m_nActive++;
}
m_mutexReleaseMutex();
//存儲鎖標志為Reader
SystemLocalDataStoreSlot slot = ThreadGetNamedDataSlot(m_strThreadSlotName);
object obj = ThreadGetData( slot );
LockFlags flag = LockFlagsNone;
if( obj != null )
flag = (LockFlags)obj ;
if( flag == LockFlagsNone )
{
ThreadSetData( slot LockFlagsReader );
}
else
{
ThreadSetData( slot (LockFlags)((int)flag | (int)LockFlagsReader ) );
}
if( bExistingWriter )
{ //等待指定的時間
thism_aeReadersWaitOne( millisecondsTimeout true );
} }
它首先進入臨界區(用以在多線程環境下保證活動線程數目的操作的正確性)判斷當前活動線程的數目如果有寫線程(m_nActive<)存在則等待指定的時間並且等待的閱讀線程數目加如果當前活動線程是讀線程(m_nActive>=)則可以讓讀線程繼續運行
申請寫入鎖的函數原型為public void AcquireWriterLock( int millisecondsTimeout )其中的參數為等待調度的時間函數定義如下
public void AcquireWriterLock( int millisecondsTimeout )
{
// m_mutext很快可以得到以便進入臨界區
m_mutexWaitOne( );
// 是否有活動線程存在
bool bNoActive = m_nActive == ;
if( !bNoActive )
{ m_nWaitingWriters++;
} else
{
m_nActive;
}
m_mutexReleaseMutex();
//存儲線程鎖標志
SystemLocalDataStoreSlot slot = ThreadGetNamedDataSlot( myReaderWriterLockDataSlot );
object obj = ThreadGetData( slot );
LockFlags flag = LockFlagsNone;
if( obj != null )
flag = (LockFlags)ThreadGetData( slot );
if( flag == LockFlagsNone )
{ ThreadSetData( slot LockFlagsWriter );
} else
{
ThreadSetData( slot (LockFlags)((int)flag | (int)LockFlagsWriter ) );
}
//如果有活動線程等待指定的時間
if( !bNoActive )
thism_aeWritersWaitOne( millisecondsTimeout true );
}
它首先進入臨界區判斷當前活動線程的數目如果當前有活動線程存在不管是寫線程還是讀線程(m_nActive)線程將等待指定的時間並且等待的寫入線程數目加否則線程擁有寫的權限
釋放閱讀鎖的函數原型為public void ReleaseReaderLock()函數定義如下
public void ReleaseReaderLock()
{
SystemLocalDataStoreSlot slot = ThreadGetNamedDataSlot(m_strThreadSlotName );
LockFlags flag = (LockFlags)ThreadGetData( slot );
if( flag == LockFlagsNone )
{ return;
}
bool bReader = true; switch( flag )
{
case LockFlagsNone:
break;
case LockFlagsWriter:
bReader = false;
break;
}
if( !bReader )
return;
ThreadSetData( slot LockFlagsNone );
m_mutexWaitOne();
AutoResetEvent autoresetevent = null;
thism_nActive ;
if( thism_nActive == )
{ if( thism_nWaitingReaders > )
{
m_nActive ++ ;
m_nWaitingReaders ;
autoresetevent = thism_aeReaders;
}
else if( thism_nWaitingWriters > )
{
m_nWaitingWriters;
m_nActive ;
autoresetevent = thism_aeWriters ;
} }
m_mutexReleaseMutex();
if( autoresetevent != null )
autoreseteventSet();
}
釋放閱讀鎖時首先判斷當前線程是否擁有閱讀鎖(通過線程局部存儲的標志)然後判斷是否有等待的閱讀線程如果有先將當前活動線程加等待閱讀線程數目減然後置事件為有信號如果沒有等待的閱讀線程判斷是否有等待的寫入線程如果有則活動線程數目減等待的寫入線程數目減釋放寫入鎖與釋放閱讀鎖的過程基本一致可以參看源代碼
注意在程序中釋放鎖時只會喚醒一個閱讀程序這是因為使用AutoResetEvent的原歷讀者可自行將其改成ManualResetEvent同時喚醒多個閱讀程序此時應令m_nActive等於整個等待的閱讀線程數目
測試
測試程序取自Net FrameSDK中的一個例子只是稍做修改測試程序如下
using System;
using SystemThreading;
using MyThreading;
class Resource {
myReaderWriterLock rwl = new myReaderWriterLock();
public void Read(Int threadNum) {
rwlAcquireReaderLock(TimeoutInfinite);
try { ConsoleWriteLine(Start Resource reading (Thread={}) threadNum);
ThreadSleep();
ConsoleWriteLine(Stop Resource reading (Thread={}) threadNum);
}
finally { rwlReleaseReaderLock();
} }
public void Write(Int threadNum) {
rwlAcquireWriterLock(TimeoutInfinite);
try {
ConsoleWriteLine(Start Resource writing (Thread={}) threadNum);
ThreadSleep();
ConsoleWriteLine(Stop Resource writing (Thread={}) threadNum);
}
finally { rwlReleaseWriterLock();
} }
}
class App {
static Int numAsyncOps = ;
static AutoResetEvent asyncOpsAreDone = new AutoResetEvent(false);
static Resource res = new Resource();
public static void Main() {
for (Int threadNum = ; threadNum < ; threadNum++) {
ThreadPoolQueueUserWorkItem(new WaitCallback(UpdateResource) threadNum);
}
asyncOpsAreDoneWaitOne();
ConsoleWriteLine(All operations have completed);
ConsoleReadLine();
}
// The callback methods signature MUST match that of a SystemThreadingTimerCallback
// delegate (it takes an Object parameter and returns void)
static void UpdateResource(Object state) {
Int threadNum = (Int) state;
if ((threadNum % ) != ) resRead(threadNum);
else resWrite(threadNum);
if (InterlockedDecrement(ref numAsyncOps) == )
asyncOpsAreDoneSet();
}
}
From:http://tw.wingwit.com/Article/program/net/201311/12614.html