Skip to content

Commit a586782

Browse files
authored
Merge pull request #51 from python36/asyncAddWork
Fix broken logic when appending in async
2 parents 93bc8e5 + 5e1d7a9 commit a586782

1 file changed

Lines changed: 11 additions & 11 deletions

File tree

nmqtt.nim

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type
3939
workQueue: OrderedTable[MsgId, Work]
4040
pubCallbacks: Table[string, PubCallback]
4141
inWork: bool
42+
hasNewWorks: bool
4243
keepAlive: uint16
4344
willFlag: bool
4445
willQoS: uint8
@@ -587,31 +588,32 @@ proc sendWork(ctx: MqttCtx, work: Work): Future[bool] =
587588
ctx.wrn("Error sending unknown package: " & $work.typ)
588589

589590
proc work(ctx: MqttCtx) {.async.} =
591+
ctx.hasNewWorks = true
590592
if ctx.inWork:
591593
return
592594
ctx.inWork = true
593-
if ctx.state == Connected:
594-
var delWork: seq[MsgId]
595-
let workQueue = ctx.workQueue # TODO: We need to copy the workQueue, otherwise
596-
# we can hit: `the length of the table changed
597-
# while iterating over it`
595+
596+
while ctx.state == Connected and ctx.hasNewWorks:
597+
ctx.hasNewWorks = false
598+
let workQueue = ctx.workQueue
599+
598600
for msgId, work in workQueue:
599601

600602
#when defined(broker):
601603
if work.typ in [ConnAck, SubAck, UnsubAck, PingResp]:
602604
if await ctx.sendWork(work):
603-
delWork.add msgId
605+
ctx.workQueue.del msgId
604606
continue
605607

606608
if work.wk == PubWork and work.state == WorkNew:
607609
if work.typ == Publish and work.qos == 0:
608-
if await ctx.sendWork(work): delWork.add msgId
610+
if await ctx.sendWork(work): ctx.workQueue.del msgId
609611

610612
elif work.typ == PubAck and work.qos == 1:
611-
if await ctx.sendWork(work): delWork.add msgId
613+
if await ctx.sendWork(work): ctx.workQueue.del msgId
612614

613615
elif work.typ == PubComp and work.qos == 2:
614-
if await ctx.sendWork(work): delWork.add msgId
616+
if await ctx.sendWork(work): ctx.workQueue.del msgId
615617

616618
else:
617619
if await ctx.sendWork(work): work.state = WorkSent
@@ -626,8 +628,6 @@ proc work(ctx: MqttCtx) {.async.} =
626628
work.state = WorkSent
627629
ctx.pubCallbacks.del work.topic
628630

629-
for msgId in delWork:
630-
ctx.workQueue.del msgId
631631
ctx.inWork = false
632632

633633
when defined(broker):

0 commit comments

Comments
 (0)