Skip to content

Commit dabb819

Browse files
MalavanEQAlpha authored and msotheeswaran-sc committed
Merge pull request #386 from EQ-Alpha/fix_rdb_hang
add readwrite lock for forking
1 parent 35dc4df commit dabb819

5 files changed

Lines changed: 232 additions & 124 deletions

File tree

src/module.cpp

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter {
365365
static list *moduleCommandFilters;
366366

367367
/* Module GIL Variables */
368-
static int s_cAcquisitionsServer = 0;
369-
static int s_cAcquisitionsModule = 0;
370-
static std::mutex s_mutex;
371-
static std::condition_variable s_cv;
372-
static std::recursive_mutex s_mutexModule;
368+
static readWriteLock s_moduleGIL;
373369
thread_local bool g_fModuleThread = false;
374370

375371
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
@@ -5969,95 +5965,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
59695965
// as the server thread acquisition is sufficient. If we did try to lock we would deadlock
59705966
// Returns true when the caller is a module callback running on a thread that
// already owns the AE (event-loop) lock while the server thread holds the
// module GIL as a reader.  In that situation no further acquisition is needed —
// the server thread's read hold already covers us — and, per the comment above,
// attempting to lock here would deadlock.
// NOTE(review): s_moduleGIL.hasReader() is read without the lock's internal
// mutex held — racy under the C++ memory model; confirm stale reads are benign.
static bool FModuleCallBackLock(bool fServerThread)
{
    return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader();
}
59745970
/* Acquire the module GIL, blocking until it is available.
 *
 * fServerThread: nonzero when the caller is a server (event-loop) thread; the
 *                GIL is then taken as a *reader*, so multiple server threads
 *                may hold it concurrently.
 * fExclusive:    forwarded to acquireWrite(); when nonzero a module thread
 *                additionally takes the underlying exclusive write mutex.
 *                Callers must pass the same value to moduleReleaseGIL().
 *
 * Re-entrant module-callback acquisitions (see FModuleCallBackLock) return
 * immediately without acquiring, mirroring the early-return in
 * moduleReleaseGIL so acquire/release stay balanced. */
void moduleAcquireGIL(int fServerThread, int fExclusive) {
    if (FModuleCallBackLock(fServerThread)) {
        return;
    }

    if (fServerThread)
    {
        // Server threads share the GIL as readers.
        s_moduleGIL.acquireRead();
    }
    else
    {
        // Module threads take it as a (possibly exclusive) writer.
        s_moduleGIL.acquireWrite(fExclusive);
    }
}
60065984

60075985
/* Non-blocking variant of moduleAcquireGIL.
 *
 * Returns 0 on success (GIL acquired, or no acquisition needed for a
 * re-entrant module callback) and 1 on failure — note the inverted,
 * errno-style convention relative to a boolean "try" API.
 *
 * fServerThread: try to take the GIL as a reader (server thread) when true,
 *                otherwise as a writer (module thread).
 * fExclusive:    forwarded to tryAcquireWrite(); must match the value later
 *                given to moduleReleaseGIL(). */
int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
    if (FModuleCallBackLock(fServerThread)) {
        return 0;
    }

    if (fServerThread)
    {
        if (!s_moduleGIL.tryAcquireRead())
            return 1;
    }
    else
    {
        if (!s_moduleGIL.tryAcquireWrite(fExclusive))
            return 1;
    }
    return 0;
}
60366002

60376003
/* Release the module GIL previously taken by moduleAcquireGIL /
 * moduleTryAcquireGIL.
 *
 * fServerThread and fExclusive MUST match the values used at acquisition time:
 * the reader/writer side and the exclusive flag select which underlying lock
 * state is unwound (releaseWrite only unlocks the exclusive mutex when
 * fExclusive is set).
 *
 * The FModuleCallBackLock early-return mirrors the one in moduleAcquireGIL, so
 * a callback that skipped acquisition also skips release. */
void moduleReleaseGIL(int fServerThread, int fExclusive) {
    if (FModuleCallBackLock(fServerThread)) {
        return;
    }

    if (fServerThread)
    {
        s_moduleGIL.releaseRead();
    }
    else
    {
        s_moduleGIL.releaseWrite(fExclusive);
    }
}
60586017

60596018
/* Returns nonzero when a module thread currently holds the GIL as a writer.
 * NOTE(review): hasWriter() reads the writer count without synchronization —
 * the value may be stale; confirm callers only need a hint, not a guarantee. */
int moduleGILAcquiredByModule(void) {
    return s_moduleGIL.hasWriter();
}
60626021

60636022

src/networking.cpp

Lines changed: 24 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -3863,82 +3863,37 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
38633863
*
38643864
* The function returns the total number of events processed. */
38653865
void processEventsWhileBlocked(int iel) {
3866-
serverAssert(GlobalLocksAcquired());
3867-
int iterations = 4; /* See the function top-comment. */
3868-
3869-
std::vector<client*> vecclients;
3870-
listIter li;
3871-
listNode *ln;
3872-
listRewind(g_pserver->clients, &li);
38733866

3874-
// All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
3875-
// so unlock here, and save them for reacquisition later
3876-
while ((ln = listNext(&li)) != nullptr)
3877-
{
3878-
client *c = (client*)listNodeValue(ln);
3879-
if (c->lock.fOwnLock()) {
3880-
serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop
3881-
c->lock.unlock();
3882-
vecclients.push_back(c);
3867+
int eventsCount = 0;
3868+
executeWithoutGlobalLock([&](){
3869+
int iterations = 4; /* See the function top-comment. */
3870+
try
3871+
{
3872+
ProcessingEventsWhileBlocked = 1;
3873+
while (iterations--) {
3874+
long long startval = g_pserver->events_processed_while_blocked;
3875+
long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
3876+
AE_FILE_EVENTS|AE_DONT_WAIT|
3877+
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
3878+
/* Note that g_pserver->events_processed_while_blocked will also get
3879+
* incremented by callbacks called by the event loop handlers. */
3880+
eventsCount += ae_events;
3881+
long long events = eventsCount - startval;
3882+
if (!events) break;
3883+
}
3884+
ProcessingEventsWhileBlocked = 0;
38833885
}
3884-
}
3885-
3886-
/* Since we're about to release our lock we need to flush the repl backlog queue */
3887-
bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
3888-
if (fReplBacklog) {
3889-
flushReplBacklogToClients();
3890-
g_pserver->repl_batch_idxStart = -1;
3891-
g_pserver->repl_batch_offStart = -1;
3892-
}
3893-
3894-
long long eventsCount = 0;
3895-
aeReleaseLock();
3896-
serverAssert(!GlobalLocksAcquired());
3897-
try
3898-
{
3899-
ProcessingEventsWhileBlocked = 1;
3900-
while (iterations--) {
3901-
long long startval = g_pserver->events_processed_while_blocked;
3902-
long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
3903-
AE_FILE_EVENTS|AE_DONT_WAIT|
3904-
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
3905-
/* Note that g_pserver->events_processed_while_blocked will also get
3906-
* incremented by callbacks called by the event loop handlers. */
3907-
eventsCount += ae_events;
3908-
long long events = eventsCount - startval;
3909-
if (!events) break;
3910-
}
3911-
ProcessingEventsWhileBlocked = 0;
3912-
}
3913-
catch (...)
3914-
{
3915-
// Caller expects us to be locked so fix and rethrow
3916-
ProcessingEventsWhileBlocked = 0;
3917-
AeLocker locker;
3918-
locker.arm(nullptr);
3919-
locker.release();
3920-
for (client *c : vecclients)
3921-
c->lock.lock();
3922-
throw;
3923-
}
3924-
3925-
AeLocker locker;
3926-
locker.arm(nullptr);
3927-
locker.release();
3886+
catch (...)
3887+
{
3888+
ProcessingEventsWhileBlocked = 0;
3889+
throw;
3890+
}
3891+
});
39283892

39293893
g_pserver->events_processed_while_blocked += eventsCount;
39303894

39313895
whileBlockedCron();
39323896

3933-
// Restore it so the calling code is not confused
3934-
if (fReplBacklog) {
3935-
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
3936-
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
3937-
}
3938-
3939-
for (client *c : vecclients)
3940-
c->lock.lock();
3941-
39423897
// If a different thread processed the shutdown we need to abort the lua command or we will hang
39433898
if (serverTL->el->stop)
39443899
throw ShutdownException();

src/readwritelock.h

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#pragma once
2+
#include <condition_variable>
3+
4+
/* A writer-preference read/write lock used to guard the module GIL.
 *
 * Semantics (deliberately NOT a textbook rwlock):
 *  - Readers share the lock; new readers block while a writer is active OR
 *    merely waiting (m_writeWaiting), so writers cannot be starved.
 *  - Writers are *counted*; the underlying recursive write mutex is only
 *    taken when `exclusive` is true, so non-exclusive writers may coexist.
 *    The `exclusive` flag passed at acquire time must match the one passed
 *    at release/downgrade time, or m_writeLock is left unbalanced.
 *  - hasReader()/hasWriter()/writeWaiting() read the counters without holding
 *    m_readLock.  NOTE(review): that is a data race under the C++ memory
 *    model; callers appear to tolerate stale values, but consider atomics. */
class readWriteLock {
    std::mutex m_readLock;              // guards all counters/flags and backs m_cv
    std::recursive_mutex m_writeLock;   // held only for exclusive write acquisitions
    std::condition_variable m_cv;       // notified on every release / state change
    int m_readCount = 0;                // number of active readers
    int m_writeCount = 0;               // number of active writers (exclusive or not)
    bool m_writeWaiting = false;        // a writer is queued: stall new readers
public:
    // Block until no writer is active or waiting, then register as a reader.
    void acquireRead() {
        std::unique_lock<std::mutex> rm(m_readLock);
        while (m_writeCount > 0 || m_writeWaiting)
            m_cv.wait(rm);
        m_readCount++;
    }

    // Non-blocking acquireRead(): fails if the state mutex is contended or a
    // writer is active/waiting.
    bool tryAcquireRead() {
        std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
        if (!rm.try_lock())
            return false;
        if (m_writeCount > 0 || m_writeWaiting)
            return false;
        m_readCount++;
        return true;
    }

    // Block until all readers have drained, then register as a writer.  When
    // `exclusive`, also take the write mutex (recursively re-lockable by the
    // same thread).
    void acquireWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        m_writeWaiting = true;          // stop new readers from entering
        while (m_readCount > 0)
            m_cv.wait(rm);
        if (exclusive) {
            /* Another thread might have the write lock while we have the read lock
               but won't be able to release it until they can acquire the read lock
               so release the read lock and try again instead of waiting to avoid deadlock */
            while (!m_writeLock.try_lock())
                m_cv.wait(rm);
        }
        m_writeCount++;
        m_writeWaiting = false;
        // NOTE(review): with two queued writers, the first to succeed clears
        // m_writeWaiting while the second still waits, letting readers slip
        // in ahead of it — confirm this weakening of writer preference is OK.
    }

    // Convert a read hold into a write hold: waits until this thread's read
    // hold is the only one left (m_readCount > 1), then becomes a writer and
    // drops the read count.
    void upgradeWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        m_writeWaiting = true;
        while (m_readCount > 1)
            m_cv.wait(rm);
        if (exclusive) {
            /* Another thread might have the write lock while we have the read lock
               but won't be able to release it until they can acquire the read lock
               so release the read lock and try again instead of waiting to avoid deadlock */
            while (!m_writeLock.try_lock())
                m_cv.wait(rm);
        }
        m_writeCount++;
        m_readCount--;
        m_writeWaiting = false;
    }

    // Non-blocking acquireWrite(): fails on any contention (state mutex busy,
    // readers present, or — when exclusive — write mutex unavailable).
    bool tryAcquireWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
        if (!rm.try_lock())
            return false;
        if (m_readCount > 0)
            return false;
        if (exclusive)
            if (!m_writeLock.try_lock())
                return false;
        m_writeCount++;
        return true;
    }

    // Drop a read hold and wake anyone waiting (typically a queued writer).
    void releaseRead() {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_readCount > 0);
        m_readCount--;
        m_cv.notify_all();
    }

    // Drop a write hold.  `exclusive` must match the value used at acquire
    // time so the write mutex unlock stays balanced.
    void releaseWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_writeCount > 0);
        if (exclusive)
            m_writeLock.unlock();
        m_writeCount--;
        m_cv.notify_all();
    }

    // Convert a write hold into a read hold: release the write side, then wait
    // out any remaining/queued writers before registering as a reader.
    void downgradeWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_writeCount > 0);
        if (exclusive)
            m_writeLock.unlock();
        m_writeCount--;
        while (m_writeCount > 0 || m_writeWaiting)
            m_cv.wait(rm);
        m_readCount++;
    }

    // Unsynchronized snapshot: any reader currently registered?
    // NOTE(review): data race — see class comment.
    bool hasReader() {
        return m_readCount > 0;
    }

    // Unsynchronized snapshot: any writer currently registered?
    bool hasWriter() {
        return m_writeCount > 0;
    }

    // Unsynchronized snapshot: is a writer queued waiting for readers?
    bool writeWaiting() {
        return m_writeWaiting;
    }
};

0 commit comments

Comments
 (0)