Skip to content

Commit 872c633

Browse files
authored
Fix renew/delete race in DT.AzureStorage (#1197)
1 parent 36e92c8 commit 872c633

3 files changed

Lines changed: 51 additions & 18 deletions

File tree

src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,14 @@ await this.storageQueue.UpdateMessageAsync(
332332
}
333333
catch (Exception e)
334334
{
335+
string details = $"Caller: {nameof(RenewMessageAsync)}";
336+
if (e is DurableTaskStorageException storageException && storageException.ErrorCode != null)
337+
{
338+
details += $", ErrorCode: {storageException.ErrorCode}";
339+
}
340+
335341
// Message may have been processed and deleted already.
336-
this.HandleMessagingExceptions(e, message, $"Caller: {nameof(RenewMessageAsync)}");
342+
this.HandleMessagingExceptions(e, message, details);
337343
}
338344
}
339345

@@ -342,46 +348,64 @@ public virtual async Task DeleteMessageAsync(MessageData message, SessionBase? s
342348
QueueMessage queueMessage = message.OriginalQueueMessage;
343349
TaskMessage taskMessage = message.TaskMessage;
344350

345-
this.settings.Logger.DeletingMessage(
346-
this.storageAccountName,
347-
this.settings.TaskHubName,
348-
taskMessage.Event.EventType.ToString(),
349-
Utils.GetTaskEventId(taskMessage.Event),
350-
queueMessage.MessageId,
351-
taskMessage.OrchestrationInstance.InstanceId,
352-
taskMessage.OrchestrationInstance.ExecutionId,
353-
this.storageQueue.Name,
354-
message.SequenceNumber,
355-
queueMessage.PopReceipt);
356-
357351
bool haveRetried = false;
358352
while (true)
359353
{
354+
this.settings.Logger.DeletingMessage(
355+
this.storageAccountName,
356+
this.settings.TaskHubName,
357+
taskMessage.Event.EventType.ToString(),
358+
Utils.GetTaskEventId(taskMessage.Event),
359+
queueMessage.MessageId,
360+
taskMessage.OrchestrationInstance.InstanceId,
361+
taskMessage.OrchestrationInstance.ExecutionId,
362+
this.storageQueue.Name,
363+
message.SequenceNumber,
364+
queueMessage.PopReceipt);
365+
360366
try
361367
{
362368
await this.storageQueue.DeleteMessageAsync(queueMessage, session?.TraceActivityId);
363369
}
364370
catch (Exception e)
365371
{
366-
if (!haveRetried && this.IsMessageGoneException(e))
372+
// Delete operations can transiently fail if a delete operation races with a
373+
// message update operation. In this case, we retry the delete operation.
374+
if (!haveRetried && (IsMessageGoneException(e) || IsPopReceiptMismatch(e)))
367375
{
368376
haveRetried = true;
369377
continue;
370378
}
371379

372-
this.HandleMessagingExceptions(e, message, $"Caller: {nameof(DeleteMessageAsync)}");
380+
string details = $"Caller: {nameof(DeleteMessageAsync)}";
381+
if (e is DurableTaskStorageException storageException && storageException.ErrorCode != null)
382+
{
383+
details += $", ErrorCode: {storageException.ErrorCode}";
384+
}
385+
386+
this.HandleMessagingExceptions(e, message, details);
373387
}
374388

375389
break;
376390
}
377391
}
378392

379-
private bool IsMessageGoneException(Exception e)
393+
static bool IsMessageGoneException(Exception e)
380394
{
381395
DurableTaskStorageException? storageException = e as DurableTaskStorageException;
382396
return storageException?.HttpStatusCode == 404;
383397
}
384398

399+
static bool IsPopReceiptMismatch(Exception e)
400+
{
401+
if (e is DurableTaskStorageException storageException)
402+
{
403+
return storageException.IsPopReceiptMismatch;
404+
}
405+
406+
return false;
407+
}
408+
385409
void HandleMessagingExceptions(Exception e, MessageData message, string details)
386410
{
387411
string messageId = message.OriginalQueueMessage.MessageId;
@@ -403,7 +427,7 @@ void HandleMessagingExceptions(
403427
string details,
404428
string popReceipt)
405429
{
406-
if (this.IsMessageGoneException(e))
430+
if (IsMessageGoneException(e))
407431
{
408432
// Message may have been processed and deleted already.
409433
this.settings.Logger.MessageGone(

src/DurableTask.AzureStorage/Storage/DurableTaskStorageException.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ namespace DurableTask.AzureStorage.Storage
1616
using System;
1717
using Azure;
1818
using Azure.Storage.Blobs.Models;
19+
using Azure.Storage.Queues.Models;
1920

2021
[Serializable]
2122
class DurableTaskStorageException : Exception
@@ -40,12 +41,18 @@ public DurableTaskStorageException(RequestFailedException? requestFailedExceptio
4041
if (requestFailedException != null)
4142
{
4243
this.HttpStatusCode = requestFailedException.Status;
43-
this.LeaseLost = requestFailedException.ErrorCode != null && requestFailedException.ErrorCode == BlobErrorCode.LeaseLost;
44+
this.ErrorCode = requestFailedException.ErrorCode;
45+
this.LeaseLost = requestFailedException?.ErrorCode == BlobErrorCode.LeaseLost;
46+
this.IsPopReceiptMismatch = requestFailedException?.ErrorCode == QueueErrorCode.PopReceiptMismatch;
4447
}
4548
}
4649

4750
public int HttpStatusCode { get; }
4851

52+
public string? ErrorCode { get; }
53+
4954
public bool LeaseLost { get; }
55+
56+
public bool IsPopReceiptMismatch { get; }
5057
}
5158
}

src/DurableTask.AzureStorage/Storage/Queue.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ await this.queueClient
8383
queueMessage.PopReceipt,
8484
cancellationToken)
8585
.DecorateFailure();
86+
87+
this.stats.MessagesUpdated.Increment();
8688
}
8789

8890
public async Task<QueueMessage?> GetMessageAsync(TimeSpan visibilityTimeout, CancellationToken cancellationToken = default)

0 commit comments

Comments
 (0)