Skip to content

Commit 7428b98

Browse files
authored
Merge pull request #960 from menzels/improve-multi-thread
Improve multi threading
2 parents a208a11 + e4d4f63 commit 7428b98

5 files changed

Lines changed: 280 additions & 127 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Makefile
1010
.settings
1111
.idea
1212
.vscode
13+
.cache
1314
*.user
1415
*.user.*
1516
*.o

Jamulus.pro

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ HEADERS += src/buffer.h \
403403
src/global.h \
404404
src/protocol.h \
405405
src/recorder/jamcontroller.h \
406+
src/threadpool.h \
406407
src/server.h \
407408
src/serverlist.h \
408409
src/serverlogging.h \

src/server.cpp

Lines changed: 81 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -421,10 +421,24 @@ CServer::CServer ( const int iNewMaxNumChan,
421421
vecChannels[i].SetEnable ( true );
422422
}
423423

424-
// introduced by kraney (#653): "increased the size of the thread pool"
425-
if ( bUseMultithreading )
426-
{
427-
QThreadPool::globalInstance()->setMaxThreadCount ( QThread::idealThreadCount() * 4 );
424+
int iAvailableCores = QThread::idealThreadCount();
425+
426+
// setup QThreadPool if multithreading is active and possible
427+
if ( bUseMultithreading ) {
428+
if ( iAvailableCores == 1 )
429+
{
430+
qDebug() << "found only one core, disabling multithreading";
431+
bUseMultithreading = false;
432+
}
433+
else
434+
{
435+
// set maximum thread count to available cores; other threads will share at random
436+
iMaxNumThreads = iAvailableCores;
437+
qDebug() << "multithreading enabled, setting thread count to" << iMaxNumThreads;
438+
439+
pThreadPool = std::unique_ptr<CThreadPool>( new CThreadPool{static_cast<size_t>(iMaxNumThreads)} );
440+
Futures.reserve( iMaxNumThreads );
441+
}
428442
}
429443

430444

@@ -640,8 +654,8 @@ void CServer::OnNewConnection ( int iChID,
640654
vecChannels[iChID].CreateReqChanInfoMes();
641655

642656
// send welcome message (if enabled)
643-
MutexWelcomeMessage.lock();
644657
{
658+
QMutexLocker locker ( &MutexWelcomeMessage );
645659
if ( !strWelcomeMessage.isEmpty() )
646660
{
647661
// create formatted server welcome message and send it just to
@@ -652,7 +666,6 @@ void CServer::OnNewConnection ( int iChID,
652666
vecChannels[iChID].CreateChatTextMes ( strWelcomeMessageFormated );
653667
}
654668
}
655-
MutexWelcomeMessage.unlock();
656669

657670
// send licence request message (if enabled)
658671
if ( eLicenceType != LT_NO_LICENCE )
@@ -707,17 +720,14 @@ void CServer::OnAboutToQuit()
707720
// if enabled, disconnect all clients on quit
708721
if ( bDisconnectAllClientsOnQuit )
709722
{
710-
Mutex.lock();
723+
QMutexLocker locker ( &Mutex );
724+
for ( int i = 0; i < iMaxNumChannels; i++ )
711725
{
712-
for ( int i = 0; i < iMaxNumChannels; i++ )
726+
if ( vecChannels[i].IsConnected() )
713727
{
714-
if ( vecChannels[i].IsConnected() )
715-
{
716-
ConnLessProtocol.CreateCLDisconnection ( vecChannels[i].GetAddress() );
717-
}
728+
ConnLessProtocol.CreateCLDisconnection ( vecChannels[i].GetAddress() );
718729
}
719730
}
720-
Mutex.unlock(); // release mutex
721731
}
722732

723733
Stop();
@@ -805,13 +815,16 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
805815
*/
806816
// Get data from all connected clients -------------------------------------
807817
// some inits
808-
int iNumClients = 0; // init connected client counter
818+
int iNumClients = 0; // init connected client counter
819+
bool bUseMT = false;
820+
int iNumBlocks = 0; // init number of blocks for multithreading
821+
int iMTBlockSize = 0; // init block size for multithreading
809822
bChannelIsNowDisconnected = false; // note that the flag must be a member variable since QtConcurrent::run can only take 5 params
810823

811-
// Make put and get calls thread safe. Do not forget to unlock mutex
812-
// afterwards!
813-
Mutex.lock();
814824
{
825+
// Make put and get calls thread safe.
826+
QMutexLocker locker ( &Mutex );
827+
815828
// first, get number and IDs of connected channels
816829
for ( int i = 0; i < iMaxNumChannels; i++ )
817830
{
@@ -826,21 +839,23 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
826839
}
827840
}
828841

842+
// use multithreading for any non-zero number of clients
843+
// (overhead is low and it is worth doing for all numbers)
844+
bUseMT = bUseMultithreading && iNumClients > 0;
845+
829846
// prepare and decode connected channels
830-
if ( !bUseMultithreading )
847+
if ( !bUseMT )
831848
{
832-
for ( int iChanCnt = 0; iChanCnt < iNumClients; iChanCnt++ )
833-
{
834-
DecodeReceiveData ( iChanCnt, iNumClients );
835-
}
849+
// run the OPUS decoder for all data blocks
850+
DecodeReceiveDataBlocks ( this, 0, iNumClients - 1, iNumClients );
836851
}
837852
else
838853
{
839-
// processing with multithreading
840-
// TODO optimization of the MTBlockSize value
841-
const int iMTBlockSize = 10; // every 10 users a new thread is created
842-
const int iNumBlocks = ( iNumClients - 1 ) / iMTBlockSize + 1;
854+
// spread work equally among available threads
855+
iNumBlocks = std::min ( iNumClients, iMaxNumThreads );
856+
iMTBlockSize = ( iNumClients - 1 ) / iNumBlocks + 1;
843857

858+
// processing with multithreading
844859
for ( int iBlockCnt = 0; iBlockCnt < iNumBlocks; iBlockCnt++ )
845860
{
846861
// The work for OPUS decoding is distributed over all available processor cores.
@@ -849,16 +864,18 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
849864
const int iStartChanCnt = iBlockCnt * iMTBlockSize;
850865
const int iStopChanCnt = std::min ( ( iBlockCnt + 1 ) * iMTBlockSize - 1, iNumClients - 1 );
851866

852-
FutureSynchronizer.addFuture ( QtConcurrent::run ( this,
853-
&CServer::DecodeReceiveDataBlocks,
854-
iStartChanCnt,
855-
iStopChanCnt,
856-
iNumClients ) );
867+
Futures.push_back ( pThreadPool->enqueue ( CServer::DecodeReceiveDataBlocks,
868+
this,
869+
iStartChanCnt,
870+
iStopChanCnt,
871+
iNumClients ) );
857872
}
858873

859874
// make sure all concurrent run threads have finished when we leave this function
860-
FutureSynchronizer.waitForFinished();
861-
FutureSynchronizer.clearFutures();
875+
for ( auto& future : Futures ) {
876+
future.wait();
877+
}
878+
Futures.clear();
862879
}
863880

864881
// a channel is now disconnected, take action on it
@@ -868,7 +885,6 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
868885
CreateAndSendChanListForAllConChannels();
869886
}
870887
}
871-
Mutex.unlock(); // release mutex
872888

873889

874890
// Process data ------------------------------------------------------------
@@ -909,7 +925,7 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
909925
}
910926

911927
// processing without multithreading
912-
if ( !bUseMultithreading )
928+
if ( !bUseMT )
913929
{
914930
// generate a separate mix for each channel, OPUS encode the
915931
// audio data and transmit the network packet
@@ -918,14 +934,8 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
918934
}
919935

920936
// processing with multithreading
921-
if ( bUseMultithreading )
937+
if ( bUseMT )
922938
{
923-
// introduced by kraney (#653): each thread must complete within the 1 or 2ms time budget for the timer
924-
// TODO determine at startup by running a small benchmark
925-
const int iMaximumMixOpsInTimeBudget = 500; // approximate limit as observed on GCP e2-standard instance
926-
const int iMTBlockSize = iMaximumMixOpsInTimeBudget / iNumClients; // number of ops = block size * total number of clients
927-
const int iNumBlocks = ( iNumClients - 1 ) / iMTBlockSize + 1;
928-
929939
for ( int iBlockCnt = 0; iBlockCnt < iNumBlocks; iBlockCnt++ )
930940
{
931941
// Generate a separate mix for each channel, OPUS encode the
@@ -936,16 +946,18 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
936946
const int iStartChanCnt = iBlockCnt * iMTBlockSize;
937947
const int iStopChanCnt = std::min ( ( iBlockCnt + 1 ) * iMTBlockSize - 1, iNumClients - 1 );
938948

939-
FutureSynchronizer.addFuture ( QtConcurrent::run ( this,
940-
&CServer::MixEncodeTransmitDataBlocks,
941-
iStartChanCnt,
942-
iStopChanCnt,
943-
iNumClients ) );
949+
Futures.push_back ( pThreadPool->enqueue ( CServer::MixEncodeTransmitDataBlocks,
950+
this,
951+
iStartChanCnt,
952+
iStopChanCnt,
953+
iNumClients ) );
944954
}
945955

946956
// make sure all concurrent run threads have finished when we leave this function
947-
FutureSynchronizer.waitForFinished();
948-
FutureSynchronizer.clearFutures();
957+
for ( auto& fFuture : Futures ) {
958+
fFuture.wait();
959+
}
960+
Futures.clear();
949961
}
950962
if ( bDelayPan )
951963
{
@@ -966,25 +978,31 @@ static CTimingMeas JitterMeas ( 1000, "test2.dat" ); JitterMeas.Measure(); // TE
966978
}
967979
}
968980

969-
void CServer::DecodeReceiveDataBlocks ( const int iStartChanCnt,
981+
// This is a static method used as a callback, and does not inherit a "this" pointer,
982+
// so it is necessary for the server instance to be passed as a parameter.
983+
void CServer::DecodeReceiveDataBlocks ( CServer* pServer,
984+
const int iStartChanCnt,
970985
const int iStopChanCnt,
971986
const int iNumClients )
972987
{
973988
// loop over all channels in the current block, needed for multithreading support
974989
for ( int iChanCnt = iStartChanCnt; iChanCnt <= iStopChanCnt; iChanCnt++ )
975990
{
976-
DecodeReceiveData ( iChanCnt, iNumClients );
991+
pServer->DecodeReceiveData ( iChanCnt, iNumClients );
977992
}
978993
}
979994

980-
void CServer::MixEncodeTransmitDataBlocks ( const int iStartChanCnt,
995+
// This is a static method used as a callback, and does not inherit a "this" pointer,
996+
// so it is necessary for the server instance to be passed as a parameter.
997+
void CServer::MixEncodeTransmitDataBlocks ( CServer* pServer,
998+
const int iStartChanCnt,
981999
const int iStopChanCnt,
9821000
const int iNumClients )
9831001
{
9841002
// loop over all channels in the current block, needed for multithreading support
9851003
for ( int iChanCnt = iStartChanCnt; iChanCnt <= iStopChanCnt; iChanCnt++ )
9861004
{
987-
MixEncodeTransmitData ( iChanCnt, iNumClients );
1005+
pServer->MixEncodeTransmitData ( iChanCnt, iNumClients );
9881006
}
9891007
}
9901008

@@ -1088,7 +1106,7 @@ void CServer::DecodeReceiveData ( const int iChanCnt,
10881106
// get current number of OPUS coded bytes
10891107
const int iCeltNumCodedBytes = vecChannels[iCurChanID].GetCeltNumCodedBytes();
10901108

1091-
for ( int iB = 0; iB < vecNumFrameSizeConvBlocks[iChanCnt]; iB++ )
1109+
for ( size_t iB = 0; iB < (size_t)vecNumFrameSizeConvBlocks[iChanCnt]; iB++ )
10921110
{
10931111
// get data
10941112
const EGetDataStat eGetStat = vecChannels[iCurChanID].GetData ( vecvecbyCodedData[iChanCnt], iCeltNumCodedBytes );
@@ -1390,26 +1408,26 @@ void CServer::MixEncodeTransmitData ( const int iChanCnt,
13901408
DoubleFrameSizeConvBufOut[iCurChanID].GetAll ( vecsSendData, DOUBLE_SYSTEM_FRAME_SIZE_SAMPLES * vecNumAudioChannels[iChanCnt] );
13911409
}
13921410

1393-
for ( int iB = 0; iB < vecNumFrameSizeConvBlocks[iChanCnt]; iB++ )
1411+
// OPUS encoding
1412+
if ( pCurOpusEncoder != nullptr )
13941413
{
1395-
// OPUS encoding
1396-
if ( pCurOpusEncoder != nullptr )
1397-
{
13981414
// TODO find a better place than this: the setting does not change all the time so for speed
13991415
// optimization it would be better to set it only if the network frame size is changed
14001416
opus_custom_encoder_ctl ( pCurOpusEncoder, OPUS_SET_BITRATE ( CalcBitRateBitsPerSecFromCodedBytes ( iCeltNumCodedBytes, iClientFrameSizeSamples ) ) );
14011417

1418+
for ( size_t iB = 0; iB < (size_t)vecNumFrameSizeConvBlocks[iChanCnt]; iB++ )
1419+
{
14021420
iUnused = opus_custom_encode ( pCurOpusEncoder,
14031421
&vecsSendData[iB * SYSTEM_FRAME_SIZE_SAMPLES * vecNumAudioChannels[iChanCnt]],
14041422
iClientFrameSizeSamples,
14051423
&vecvecbyCodedData[iChanCnt][0],
14061424
iCeltNumCodedBytes );
1407-
}
14081425

1409-
// send separate mix to current clients
1410-
vecChannels[iCurChanID].PrepAndSendPacket ( &Socket,
1411-
vecvecbyCodedData[iChanCnt],
1412-
iCeltNumCodedBytes );
1426+
// send separate mix to current clients
1427+
vecChannels[iCurChanID].PrepAndSendPacket ( &Socket,
1428+
vecvecbyCodedData[iChanCnt],
1429+
iCeltNumCodedBytes );
1430+
}
14131431
}
14141432
}
14151433

0 commit comments

Comments
 (0)