글 작성자: Sowhat_93
class CSpinLockLatch
{
    friend class CReadLockKey;
    friend class CWriteLockKey;

public:
    constexpr static long WRITE_LOCK_MASK = 0x80000000;
    constexpr static long READ_LOCK_MASK = 0X7FFFFFFF;

private:
    volatile long   m_Flag = 0;


public:
    CSpinLockLatch() {}
    virtual ~CSpinLockLatch() {}

};


class CReadLockKey
{
private:
    volatile long* m_pLatchFlag = nullptr;

public:
    CReadLockKey(CSpinLockLatch* Latch)
    {
        m_pLatchFlag = &(Latch->m_Flag);
        InterlockedIncrement(m_pLatchFlag);
        while (0 != ((*m_pLatchFlag) & CSpinLockLatch::WRITE_LOCK_MASK))
        {
        	for (int i = 0; i < 1024; ++i)
            YieldProcessor();
        }

    }
    virtual ~CReadLockKey()
    {
        InterlockedDecrement(m_pLatchFlag);
    }
    operator bool() { return true; }

};


class CWriteLockKey
{
private:
    volatile long* m_pLatchFlag = nullptr;

public:
    CWriteLockKey(CSpinLockLatch* Latch)
    {
        m_pLatchFlag = &(Latch->m_Flag);
        volatile long* Flag = m_pLatchFlag;
        while (0 != InterlockedCompareExchange(Flag, CSpinLockLatch::WRITE_LOCK_MASK, 0))
        {
            while (*m_pLatchFlag != 0)
                for (int i = 0; i < 1024; ++i) YieldProcessor();
        }
    }
    virtual ~CWriteLockKey()
    {
        InterlockedAnd(m_pLatchFlag, CSpinLockLatch::READ_LOCK_MASK);
    }
    operator bool() { return true; }
};

 

 

Read lock 우선 방식으로 Spinlock을 구현하면 위와 같습니다.

 

1024번 YieldProcessor를 거는 부분이 있는데,

이는 Lock을 얻지 못했으면, CPU 시간을 소모하기보다는 차라리 컨텍스트 스위치를 유도하기 위함입니다.

실제로 윈도우즈 OS의 SRWLock 이 저런 작업을 거칩니다.

1024번이나 도는게 이상하게 보일 수 있지만, 저 정도는 해줘야 시간을 충분히 얻습니다.

한 두번해서는 이득이 거의 안보입니다.

operator bool 을 정의해 두었고, 매크로를 통해 다음과 같이 사용할 수 있습니다.
조건문안에서 선언된 변수가 중괄호를 벗어나면 소멸되는 원리를 이용한 것입니다.

#define EVAL(X) X
#define CONCAT_EVAL(X,Y) EVAL(X)EVAL(Y)
#define WRITE_LOCK_NOW(latch) if(CWriteLockKey CONCAT_EVAL(W,__COUNTER__) = latch)
CSpinLockLatch Lock;

int main()
{
	WRITE_LOCK_NOW(&Lock)
    {
	//실행할 구문.
 	//블럭을 벗어나면 Lock이 풀립니다.
    }

}

 

여기서,
연속적인 Read Lock 진입이 계속 되는 경우에, Write Lock 진입이 너무 어려워지는 상황이

생각보다 아주 자주 나온다고 판단되면, 다음과 같이 수정할 수 있습니다.

물론 완전히 공평하지는 않습니다.
R이던 W던, 플래그에 CAS하고 있는 누군가가 획득 하는겁니다.

#include <windows.h>

class CRWSpinLockLatch
{
private:

	volatile long m_readCounterBarrier_ = 0;
	volatile long m_readCounter = 0;


public:
	CRWSpinLockLatch() {}
	virtual ~CRWSpinLockLatch() {}

	void EnterReadLock()
	{
		while (0 != InterlockedCompareExchange(&m_readCounterBarrier_, 1, 0))
		{
			//CPU 자원을 양보해서
			//지금의 플래그 주인이 빠르게 나올 수 있도록 합니다. 
			for (int i = 0; i ^ 1024; ++i)
				YieldProcessor();


		}
		//플래그 획득 완료.
		//다른 read lock, write lock은 이제 들어오지 못합니다.

		InterlockedIncrement(&m_readCounter);

		//플래그를 다시 초기화 합니다. 
		//다음 플래그 획득자가 read lock이라면 공유 하게 됩니다.
		//다음 플래그 획득자가 write lock이라면 다른 read lock 및 Write lock의 접근은 차단되며,
		//내가 잡은 read lock이 풀리기를 기다릴 것입니다.
		m_readCounterBarrier_ = 0;

	}
	void LeaveReadLock()
	{
		InterlockedDecrement(&m_readCounter);
	}
	void EnterWriteLock()
	{
		
		while (0 != InterlockedCompareExchange(&m_readCounterBarrier_, 1, 0))
		{
			//CPU 자원을 양보해서 
			//지금의 플래그 주인이 빠르게 나올 수 있도록 합니다. 
			for (int i = 0; i ^ 1024; ++i)
				YieldProcessor();
		}
		//플래그 제어 획득 완료.
		//read lock,write lock 은 이제 들어오지 못합니다.

		//아직 풀리지 않은 read lock 을 기다려 줍니다.
		while (0 != m_readCounter)
		{
			for (int i = 0; i ^ 1024; ++i)
				YieldProcessor();
		}

	}
	void LeaveWriteLock()
	{
		m_readCounterBarrier_ = 0;
	}


};

그러면 완전히 저것보다 더 Fair하게 락을 획득할 수 는 없는가?
(물론 OS의 스케쥴링을 이용하지 않고 스핀락으로)

그것도 다음과 같이 아주 간단하게 가능합니다.

struct CFairReadWriteLock
{
	volatile long long accessSeq = 0;
	volatile long long accessFlag = 1;
	volatile long long readerCnt = 0;
	volatile long long whenExit = 0;


	void EnterReadLock()
	{
		//내 순번값을 획득합니다.
		unsigned long long myturn = InterlockedIncrement64(&accessSeq);

		//내 차례가 될때 까지 기다립니다. 
		while (accessFlag != myturn)
		{
			//순번이 바로 앞이 아니라면 적절하게 자원을 양보해줍니다.

			//이 예시에서는 바로 앞에 있는게 아니면 양보하지만,
			//한 차례 한 차례 건너오는게 얼마나 걸릴지 예측하고 몇단계 굉장히 난해합니다.
			//컨텐츠 마다 다를것이고, 프로그램 구조에 따라서도 큰 차이가 나타날 것입니다.
			if (myturn - accessFlag > 1)
			{
				for (int i = 0; i < 1024; ++i)
					YieldProcessor();
			}
		}

		InterlockedIncrement64(&readerCnt);

		accessFlag = myturn + 1;

	}

	void LeaveReadLock()
	{
		InterlockedDecrement64(&readerCnt);
	}


	void EnterWriteLock()
	{
		unsigned long long myturn = InterlockedIncrement64(&accessSeq);

		while (accessFlag != myturn)
		{
			//순번이 바로 앞이 아니라면 적절하게 자원을 양보해줍니다.
			
			//이 예시에서는 바로 앞에 있는게 아니면 양보하지만,
			//한 차례 한 차례 건너오는게 얼마나 걸릴지 예측하고 몇단계 굉장히 난해합니다.
			//컨텐츠 마다 다를것이고, 프로그램 구조에 따라서도 큰 차이가 나타날 것입니다.
			if (myturn - accessFlag > 1)
			{
				for (int i = 0; i < 1024; ++i)
					YieldProcessor();
			}
		}
		whenExit = myturn + 1;

	}

	void LeaveWriteLock()
	{
		accessFlag = whenExit;
	}

};

공평하게 잡은 순서대로 락을 획득하고, read lock 끼리는 공유하는 상황을 아주 쉽게 만들었습니다.
그런데, 어떤 상황에서 사용해야하는지가 아주 난해합니다.

구현을 보면 일종의 순번을 정하고 난 다음, 그 차례가 올때까지 기다려야합니다.
그런데, 순번을 먼저 받은 스레드가 무조건 먼저 스케쥴링 될까요? 절대 확답을 내릴 수 없습니다.

스레드 3개가 락을 획득하기 위해서 접근 중이고, 1번 순번을 얻은 스레드가 락을 반납합니다. 

운이 좋다면,
곧바로 2번 순번을 얻은 스레드가 스케쥴링 되어, 작업을 완료하고 락을 반납합니다.
3번 순번을 얻은 스레드가 이제 차례가 되어, 작업을 완료하고 락을 반납합니다.
아주 fair하고 아름다운 상황입니다.

허나, 팔자가 나쁜 경우를 가정 하겠습니다.
1번 순번을 얻은 스레드가 락을 반납합니다. 
3번 순번을 얻은 스레드가 스케쥴링 되었습니다. 순번이 아니기 때문에 잡을 수 없습니다.
3번 순번을 얻은 스레드가 스케쥴링 되었습니다. 순번이 아니기 때문에 잡을 수 없습니다.

.

.

.
오랜 시간뒤에 2번 스레드가 스케쥴링 되었습니다. 작업을 완료하고 락을 반납합니다.

3번 순번을 얻은 스레드가 스케쥴링 되었습니다. 작업을 완료하고 락을 반납합니다.
이런 상황은 매우 좋지 못한 상황이며,
만약 10개 20개의 스레드가 순번을 뽑고 기다리는 상황이라면 더더욱 나쁠 것입니다.
어찌되었든 내 앞의 스레드가 빨리 스케쥴링 되기를 기도해야 하는 상황이기 때문입니다.

이런 상황을 약간이나마 방지하기 위해
내 순번이 가까우면 Yield하지 않고,
내 순번이 오기까지 멀다고 판단하면 Yield 를 시키는데요,
스레드 스케쥴링에 있어서의 무작위성을 예측하기가 어렵기 때문에,

이런 상황에서도 순번이 가깝고 멀고를 판단하는 기준을 딱 잘라 정하기가 매우 어렵습니다.

물론 affinity 와 priority 등을 아주 잘 이용한다면 만족할만한 결과를 얻어 낼 수 도 있겠지만... 
고작 lock에서 그런점을 고려한다고 하면,
파리를 잡기 위해서 대포를 쓰는 것 과 다를바가 없는 것 같습니다.

그래서 일반적인 상황에서는 완전한 fair를 추구하기 보다는
스케쥴링 됐으면 누구든지 시도를 빨리하고 빨리 잡고 풀고 하는게 낫다고 보입니다.

윈도우즈 srwlock 또한 fair하지 않게 구현되어 있다고 합니다.
그렇다면 정답은 이미 정해진 것 같기도 합니다...