четверг, 8 июля 2010 г.

Upgradeable read для QReadWriteLock

Объект мультипотоковой синхронизации QReadWriteLock позволяет разделять эксклюзивный и неэксклюзивный режим взаимодействия с ресурсом. Это очень удобно, поскольку позволяет отделить синхронизацию операций чтения от синхронизации операций записи, разрешая одновременно читать сразу нескольким потокам, а записывать только одному.

Но этому объекту не хватает одного интересного механизма. Частенько нужно атомарно с точки зрения ресурса сначала захватить его на чтение, затем, если выполнились какие-то условия, не освобождая, повысить уровень эксклюзивности до возможности записи и вернуться обратно в режим чтения. К сожалению так нельзя: мы или читаем, или пишем. А вход-выход из режимов лишает операцию атомарности.

Следует доработать QReadWriteLock таким образом, чтобы кроме двух базовых состояний (read и write) у него появилось «третье» состояние upgradeable read. Когда поток находится в состоянии upgradeable read, другие потоки могут блокировать объект синхронизации только на чтение. Если в это же время другой поток попытается войти в состояние upgradeable read, то он будет остановлен до тех пор, пока текущий не выведет объект из этого состояния. Повышение уровня эксклюзивности из upgradeable read до write выглядит так же, как обычный вход в режим write.


Скрытая имплементация:

#include <QReadWriteLock>
#include <QMutex>
#include <QThread>
#include <QWaitCondition>

struct QUpgradeableReadWriteLockPrivate
{
QUpgradeableReadWriteLockPrivate(QReadWriteLock::RecursionMode recursionMode)
: accessCount(0), waitingReaders(0), waitingWriters(0),upgradeableReadAccessCount(0),
recursive(recursionMode == QReadWriteLock::Recursive), currentWriter(0),
waitingUpgradeableReaders(0), currentUpgradeableReader(0)
{ }

QMutex mutex;
QWaitCondition readerWait;
QWaitCondition writerWait;
QWaitCondition upgradeableReaderWait;

int accessCount;
int upgradeableReadAccessCount;

int waitingReaders;
int waitingUpgradeableReaders;
int waitingWriters;

bool recursive;
Qt::HANDLE currentUpgradeableReader;
Qt::HANDLE currentWriter;

QHash<Qt::HANDLE, int> currentReaders;
};


И, собственно, сам класс:

class QUpgradeableReadWriteLock
{
public:
QUpgradeableReadWriteLock()
:d(new QUpgradeableReadWriteLockPrivate(QReadWriteLock::NonRecursive))
{ }

QUpgradeableReadWriteLock(QReadWriteLock::RecursionMode recursionMode)
: d(new QUpgradeableReadWriteLockPrivate(recursionMode))
{ }

~QUpgradeableReadWriteLock()
{
delete d;
}

void lockForRead()
{
tryLockForRead(ULONG_MAX);
}

bool tryLockForRead()
{
return tryLockForRead(0);
}

void lockForUpgradeableRead()
{
tryLockForUpgradeableRead(ULONG_MAX);
}

bool tryLockForUpgradeableRead()
{
return tryLockForUpgradeableRead(0);
}

bool tryLockForUpgradeableRead(int timeout)
{
QMutexLocker lock(&d->mutex);
return upgradeableReadLockRequest(timeout);
}

bool tryLockForRead(int timeout)
{
QMutexLocker lock(&d->mutex);
return readLockRequest(timeout);
}

void lockForWrite()
{
tryLockForWrite(ULONG_MAX);
}

bool tryLockForWrite()
{
return tryLockForWrite(0);
}

bool tryLockForWrite(int timeout)
{
QMutexLocker lock(&d->mutex);
return writeLockRequest(timeout);
}

void unlock()
{
QMutexLocker lock(&d->mutex);

Qt::HANDLE self = QThread::currentThreadId();

bool unlocked = false;

if (d->accessCount < 0 && d->currentWriter == self)
{
unlocked = unlockFromWrite();
}
else
{
if (d->upgradeableReadAccessCount > 0 && d->currentUpgradeableReader == self)
{
unlocked = unlockFromUpgradeableRead();
}
else
{
if (d->accessCount > 0)
unlocked = unlockFromRead();
else
qFatal("Try to unlock object that was not locked");
}
}

if (unlocked)
wakeUp();
}

protected:
inline bool unlockFromWrite()
{
if (d->accessCount < 0 && ++d->accessCount == 0)
{
d->currentWriter = 0;
return true;
}
return false;
}

inline bool unlockFromUpgradeableRead()
{
if (d->upgradeableReadAccessCount > 0 && (--d->upgradeableReadAccessCount == 0))
{
d->currentUpgradeableReader = 0;
return true;
}
return false;
}

inline bool unlockFromRead()
{
Qt::HANDLE self = QThread::currentThreadId();

if (d->recursive)
{
QHash<Qt::HANDLE, int>::iterator it = d->currentReaders.find(self);

if (it != d->currentReaders.end())
{
if (--it.value() <= 0)
d->currentReaders.erase(it);
}
}

return --d->accessCount == 0;
}

inline bool writeLockRequest(int timeout)
{
Qt::HANDLE self = QThread::currentThreadId();

QHash<Qt::HANDLE, int>::iterator it = d->currentReaders.find(self);
Q_ASSERT_X(it == d->currentReaders.end() || !it.value(),"QUpgradeableReadWriteLock::writeLockRequest",
"This thread already locked for read");

if (d->currentWriter == self)
{
Q_ASSERT_X(d->recursive, "QUpgradeableReadWriteLock::writeLockRequest","Recursion is not supported");

--d->accessCount;
Q_ASSERT_X(d->accessCount < 0, "QUpgradeableReadWriteLock::writeLockRequest","Overflow in lock counter");
return true;
}

if (!timeout)
{
if (d->accessCount != 0)
return false;
}
else
{
while ((d->accessCount != 0) || (d->upgradeableReadAccessCount && (self != d->currentUpgradeableReader)))
{
++d->waitingWriters;
bool success = d->writerWait.wait(&d->mutex, timeout < 0 ? ULONG_MAX : timeout);
--d->waitingWriters;

if (!success)
return false;
}
}

d->currentWriter = self;

--d->accessCount;
Q_ASSERT_X(d->accessCount < 0, "QUpgradeableReadWriteLock::writeLockRequest",
"Overflow in lock counter");

return true;
}

inline bool readLockRequest(int timeout)
{
Qt::HANDLE self = QThread::currentThreadId();

if (d->upgradeableReadAccessCount>0 && (self == d->currentUpgradeableReader))
{
// already in upgradeable read lock
Q_ASSERT_X(d->recursive, "QUpgradeableReadWriteLock::readLockRequest","Recursion is not supported");
return upgradeableReadLockRequest(timeout);
}

if (d->recursive)
{
QHash<Qt::HANDLE, int>::iterator it = d->currentReaders.find(self);
if (it != d->currentReaders.end())
{
++it.value();
++d->accessCount;
Q_ASSERT_X(d->accessCount > 0, "QUpgradeableReadWriteLock::readLockRequest",
"Overflow in lock counter");
return true;
}
}

if (!timeout)
{
if (d->accessCount < 0)
return false;
}
else
{
while (d->accessCount < 0 || d->waitingWriters)
{
++d->waitingReaders;
bool success = d->readerWait.wait(&d->mutex, timeout < 0 ? ULONG_MAX : timeout);
--d->waitingReaders;
if (!success)
return false;
}
}

if (d->recursive)
d->currentReaders.insert(self, 1);

++d->accessCount;
Q_ASSERT_X(d->accessCount > 0, "QUpgradeableReadWriteLock::readLockRequest", "Overflow in lock counter");

return true;
}

inline bool upgradeableReadLockRequest(int timeout)
{
Qt::HANDLE self = QThread::currentThreadId();

QHash<Qt::HANDLE, int>::iterator it = d->currentReaders.find(self);
Q_ASSERT_X(it == d->currentReaders.end() || !it.value(),
"QUpgradeableReadWriteLock::upgradeableReadLockRequest",
"This thread already in read lock");

if (self == d->currentUpgradeableReader)
{
Q_ASSERT_X(d->recursive, "QUpgradeableReadWriteLock::writeLockRequest","Recursion is not supported");

++d->upgradeableReadAccessCount;
Q_ASSERT_X(d->upgradeableReadAccessCount > 0, "QUpgradeableReadWriteLock::upgradeableReadLockRequest",
"Overflow in lock counter");
return true;
}

if (!timeout)
{
if (d->upgradeableReadAccessCount != 0)
return false;
}
else
{
while (d->accessCount < 0 || d->waitingWriters || d->upgradeableReadAccessCount)
{
++d->waitingUpgradeableReaders;
bool success = d->upgradeableReaderWait.wait(&d->mutex, timeout < 0 ? ULONG_MAX : timeout);
--d->waitingUpgradeableReaders;
if (!success)
return false;
}
}

d->currentUpgradeableReader = self;

++d->upgradeableReadAccessCount;
Q_ASSERT_X(d->upgradeableReadAccessCount > 0, "QUpgradeableReadWriteLock::upgradeableReadLockRequest", "Overflow in lock counter");

return true;
}

void wakeUp()
{
if (d->waitingWriters)
d->writerWait.wakeAll();
else
if (d->waitingUpgradeableReaders)
d->upgradeableReaderWait.wakeOne();
else
if (d->waitingReaders)
d->readerWait.wakeAll();
}

private:
QUpgradeableReadWriteLockPrivate *d;
};


Устроим стресс-тест (винда)

#include <windows.h>

QUpgradeableReadWriteLock locker(QReadWriteLock::Recursive);

class TestThread : public QThread
{
public:
int num;

protected:
virtual void run()
{
while(1)
{
if (rand() % 10 > 4)
{
locker.lockForRead();
cout << "Concrete Reading "<< num << " ~~~~~~~" << endl;
Sleep(rand()%5);
cout << "End concrete Reading " << num << " ~~~~~~~~~~" << endl;
locker.unlock();
}

if (rand() % 10 > 3)
{
locker.lockForWrite();
cout << "Concrete Writing " << num << "~~~~~~~~" << endl;
Sleep(rand()%10);
cout << "End concrete Writing " << num << "~~~~~~" << endl;
locker.unlock();
}

locker.lockForUpgradeableRead();
locker.lockForUpgradeableRead();
Sleep(rand()%3);

cout << "UPG Reading " << num << " --------------" << endl;
Sleep(rand()%3);

if (rand() % 10 >= 5)
{
locker.lockForWrite();

cout << "Writing " << num << " <<<<<<<<<<<<" << endl;
Sleep(rand()%10);
cout << "Writing " << num << " Free >>>>>>>>>>" << endl;

locker.unlock();
}

cout << "UPG Reading " << num << " Free ++++++++++++" << endl;

locker.unlock();
Sleep(rand()%2);

locker.unlock();
Sleep(rand()%2);
}
}
};


int main()
{
const int threads = 100;
srand(GetTickCount());

TestThread t[threads];

for (int i=0; i<threads; i++)
{
t[i].num = i+1;
t[i].start();
}

while(true)
{
Sleep(1000);
}
}

Примеры использования блокировок:

#include <QReadWriteLock>
#include "QUpgradeableReadWriteLock.h"

// Объявление класса без рекурсии
QUpgradeableReadWriteLock locker();

// чтение
locker.lockForRead(); // вход в режим блокировки для чтения
target.read();
locker.unlock(); // выход из блокировки

// запись
locker.lockForWrite(); // вход в режим блокировки
target.read();
locker.unlock(); // выход из блокировки

// чтение с возможностью повышения до записи
locker.lockForUpgradeableRead(); // блокировка для чтения с возможностью повышения до записи
if (target.read() == true)
{
locker.lockForWrite(); // блокировка для записи
target.write();
locker.unlock(); // освобождение записи
}
locker.unlock(); // освобождение чтения


Особенность: в режиме upgradeable read можно входить в режим read. Но не наоборот. Да, и в отличие от обычного QReadWriteLock, при попытке рекурсивной блокировки в нерекурсивном режиме объект кидает исключение, а не замирает в дедлоке.

P.S. Надеюсь, что подобный механизм появится нативно в следующих релизах Qt

Комментариев нет:

Отправить комментарий