Skip to content

Commit d7a7112

Browse files
author
Vadim Belov
committed
Refactor Pipe buffer thresholds for better memory control
Introduce _pipePauseWriterThresholdBytes and _pipeResumeWriterThresholdBytes to manage Pipe writer buffering independently of chunk size and windowing. Thresholds are now set based on memoryLimitBytes, improving predictability and memory usage, especially when a memory cap is specified. This decouples Pipe buffering from internal windowing logic for more robust resource management.
1 parent 0774d94 commit d7a7112

1 file changed

Lines changed: 18 additions & 7 deletions

File tree

Sources/EasyExtensions.Crypto/AesGcmStreamCipher.cs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class AesGcmStreamCipher : IStreamCipher, IDisposable
6464
private readonly bool _strictLengthCheck;
6565
private readonly RandomNumberGenerator _rng;
6666
private readonly long? _memoryLimitBytes;
67+
private readonly long _pipePauseWriterThresholdBytes;
68+
private readonly long _pipeResumeWriterThresholdBytes;
6769
private static readonly ArrayPool<byte> BufferPool = ArrayPool<byte>.Shared;
6870
private readonly int ConcurrencyLevel = Math.Min(4, Environment.ProcessorCount);
6971

@@ -120,6 +122,18 @@ public AesGcmStreamCipher(ReadOnlyMemory<byte> masterKey, int keyId = 1, int? th
120122
_strictLengthCheck = strictLengthCheck;
121123
_rng = rng ?? RandomNumberGenerator.Create();
122124
_memoryLimitBytes = memoryLimitBytes;
125+
126+
const long defaultPipePause = 1L * 1024L * 1024L;
127+
long pipePause = defaultPipePause;
128+
if (memoryLimitBytes.HasValue)
129+
{
130+
// Keep outer Pipe buffering small and independent from internal windowing.
131+
// Optionally respect the provided memory cap, without letting it grow unbounded.
132+
pipePause = Math.Min(defaultPipePause, Math.Max(256L * 1024L, memoryLimitBytes.Value / 64));
133+
}
134+
_pipePauseWriterThresholdBytes = pipePause;
135+
_pipeResumeWriterThresholdBytes = Math.Max(1, pipePause / 2);
136+
123137
if (threads.HasValue)
124138
{
125139
if (threads.Value < 1 || threads.Value > _maxThreads)
@@ -289,9 +303,8 @@ public Task<Stream> EncryptAsync(Stream input, int chunkSize = DefaultChunkSize,
289303
throw new ArgumentOutOfRangeException(nameof(chunkSize), $"Chunk size must be between {MinChunkSize} and {MaxChunkSize} bytes.");
290304
}
291305

292-
int effectiveWindowCap = ComputeEffectiveWindowCap(chunkSize);
293-
long pauseThreshold = (long)Math.Clamp(chunkSize, MinChunkSize, MaxChunkSize) * Math.Clamp(effectiveWindowCap, 4, int.MaxValue);
294-
long resumeThreshold = pauseThreshold / 2;
306+
long pauseThreshold = _pipePauseWriterThresholdBytes;
307+
long resumeThreshold = _pipeResumeWriterThresholdBytes;
295308
var pipe = new Pipe(new PipeOptions(
296309
pool: MemoryPool<byte>.Shared,
297310
readerScheduler: null,
@@ -340,10 +353,8 @@ public Task<Stream> DecryptAsync(Stream input, bool leaveOpen = false, Cancellat
340353
throw new ArgumentException("Input stream must be readable.", nameof(input));
341354
}
342355

343-
long perChunkGuess = DefaultChunkSize;
344-
int effectiveWindowCap = ComputeEffectiveWindowCap((int)perChunkGuess);
345-
long pauseThreshold = perChunkGuess * Math.Clamp(effectiveWindowCap, 4, int.MaxValue);
346-
long resumeThreshold = pauseThreshold / 2;
356+
long pauseThreshold = _pipePauseWriterThresholdBytes;
357+
long resumeThreshold = _pipeResumeWriterThresholdBytes;
347358
var pipe = new Pipe(new PipeOptions(
348359
pool: MemoryPool<byte>.Shared,
349360
readerScheduler: null,

0 commit comments

Comments
 (0)