Skip to content

Commit d274e92

Browse files
authored
Merge pull request #13 from Aircoookie/response_queue
Add request queue with limits
2 parents 5d1f89a + 490cb87 commit d274e92

8 files changed

Lines changed: 581 additions & 127 deletions

File tree

library.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
"platforms": "espressif8266"
3232
},
3333
{
34-
"owner": "me-no-dev",
34+
"owner": "willmmiles",
3535
"name": "AsyncTCP",
36-
"version": "^1.1.1",
36+
"version": "^1.3.0",
3737
"platforms": "espressif32"
3838
},
3939
{

src/DynamicBuffer.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ DynamicBuffer::DynamicBuffer(SharedBuffer&& b) : _data(nullptr), _len(0) {
4343
if (b) *this = std::move(*b._buf);
4444
}
4545

46+
size_t DynamicBuffer::resize(size_t s) {
47+
if (_len != s) {
48+
auto next = realloc(_data, s);
49+
if (next) {
50+
_data = reinterpret_cast<char*>(next);
51+
_len = s;
52+
}
53+
}
54+
55+
return _len;
56+
}
57+
4658
String toString(DynamicBuffer buf) {
4759
auto dbstr = DynamicBufferString(std::move(buf));
4860
return std::move(*static_cast<String*>(&dbstr)); // Move-construct the result string from dbstr

src/DynamicBuffer.h

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,33 @@ class DynamicBuffer {
2424
DynamicBuffer() : _data(nullptr), _len(0) {};
2525
explicit DynamicBuffer(size_t len) : _data(len ? reinterpret_cast<char*>(malloc(len)): nullptr), _len(_data ? len : 0) {};
2626
DynamicBuffer(const char* buf, size_t len) : DynamicBuffer(len) { if (_data) memcpy(_data, buf, len); };
27-
explicit DynamicBuffer(const String& s) : DynamicBuffer(s.begin(), s.length()) {};
28-
explicit DynamicBuffer(String&&); // Move string contents in to buffer if possible
29-
DynamicBuffer(const SharedBuffer&);
30-
DynamicBuffer(SharedBuffer&&);
27+
3128
~DynamicBuffer() { clear(); };
3229

3330
// Move
3431
DynamicBuffer(DynamicBuffer&& d) : _data(d._data), _len(d._len) { d._data = nullptr; d._len = 0; };
3532
DynamicBuffer& operator=(DynamicBuffer&& d) { std::swap(_data, d._data); std::swap(_len, d._len); return *this; };
33+
DynamicBuffer(SharedBuffer&&); // Move data, leaving shared buffer empty
34+
explicit DynamicBuffer(String&&); // Move string contents in to buffer if possible
3635

3736
// Copy
3837
DynamicBuffer(const DynamicBuffer& d) : DynamicBuffer(d._data, d._len) {}; // copy
3938
DynamicBuffer& operator=(const DynamicBuffer& d) { *this = DynamicBuffer(d); return *this; }; // use move to copy
40-
39+
DynamicBuffer(const SharedBuffer&); // Copy data
40+
explicit DynamicBuffer(const String& s) : DynamicBuffer(s.begin(), s.length()) {};
41+
4142
// Accessors
4243
char* data() const { return _data; };
4344
size_t size() const { return _len; };
45+
char& operator[](ptrdiff_t p) const { return *(_data + p); };
4446

4547
explicit operator bool() const { return (_data != nullptr) && (_len > 0); }
4648

4749
// Release the buffer without freeing it
4850
char* release() { char* temp = _data; _data = nullptr; _len = 0; return temp; }
4951

50-
// TODO, if it ever matters - resizing
52+
// Resize the buffer. Returns new size on success, current size on failure.
53+
size_t resize(size_t);
5154
};
5255

5356
// Same interface as DynamicBuffer, but with shared_ptr semantics: buffer is held until last copy releases it.
@@ -67,6 +70,7 @@ class SharedBuffer {
6770

6871
char* data() const { return _buf ? _buf->data() : nullptr; };
6972
size_t size() const { return _buf ? _buf->size() : 0U; };
73+
char& operator[](ptrdiff_t p) const { return *(data() + p); };
7074
void clear() { _buf.reset(); };
7175

7276
explicit operator bool() const { return _buf && *_buf; };
@@ -150,3 +154,86 @@ class BufferListPrint : public Print {
150154
typedef BufferListPrint<DynamicBufferList> DynamicBufferListPrint;
151155
typedef BufferListPrint<SharedBufferList> SharedBufferListPrint;
152156

157+
158+
// Walkable buffer
159+
// A buffer class that permits "consuming" data from either end, adjusting data() and size() to match
160+
template<typename buffer_type>
161+
class Walkable
162+
{
163+
buffer_type _buf;
164+
size_t _left, _right;
165+
166+
public:
167+
Walkable() : _left(0), _right(0) {};
168+
explicit Walkable(size_t len) : _buf(len), _left(0), _right(0) {};
169+
Walkable(const char* buf, size_t len) : _buf(buf, len), _left(0), _right(0) {};
170+
Walkable(buffer_type&& buf) : _buf(std::move(buf)), _left(0), _right(0) {};
171+
explicit Walkable(const String& s) : _buf(s), _left(0), _right(0) {};
172+
explicit Walkable(String&& s) : _buf(std::move(s)), _left(0), _right(0) {};
173+
174+
// Accessors
175+
// Buffer interface
176+
char* data() const { return _buf.data() + _left; };
177+
size_t size() const { return _buf.size() - (_left + _right); };
178+
size_t capacity() const { return _buf.size(); }; // for similarity with STL types
179+
explicit operator bool() const { return (buffer_type::data() != nullptr) && (size() > 0); }
180+
char& operator[](ptrdiff_t p) const { return *(data() + p); };
181+
void clear() { _buf.clear(); reset(); };
182+
183+
// Raw interface
184+
const buffer_type& buffer() const { return _buf; };
185+
size_t offset() const { return _left; }
186+
size_t roffset() const { return _right; }
187+
188+
// Modifiers
189+
void reset() { _left = _right = 0; }; // Reset the walking counters
190+
void advance(ptrdiff_t count) { // Consume some data from the left hand side
191+
if (count > 0) {
192+
_left = std::min(_left+count, _buf.size() - _right);
193+
} else {
194+
if (abs(count) <= _left) {
195+
_left += count;
196+
} else {
197+
_left = 0; // do not wrap
198+
}
199+
}
200+
}
201+
void radvance(ptrdiff_t count) { // Consume some data from the right hand side
202+
if (count > 0) {
203+
_right = std::min(_right+count, _buf.size() - _left);
204+
} else {
205+
if (abs(count) <= _right) {
206+
_right += count;
207+
} else {
208+
_right = 0; // do not wrap
209+
}
210+
}
211+
}
212+
213+
// Contract buffer to specified size
214+
size_t resize(size_t s) {
215+
auto bs = _buf.size() - _left;
216+
_right = s <= bs ? (bs - s) : 0U;
217+
return size();
218+
}
219+
220+
// Resize the underlying buffer storage, preserving contents
221+
// Returns new size on success, current size on failure.
222+
size_t reallocate(size_t s) {
223+
if (s <= size()) {
224+
auto new_buf = buffer_type(data(), s);
225+
if (new_buf) {
226+
_buf = std::move(new_buf);
227+
reset();
228+
}
229+
} else {
230+
auto new_buf = buffer_type(s);
231+
if (new_buf) {
232+
memcpy(new_buf.data(), data(), size());
233+
_buf = std::move(new_buf);
234+
reset();
235+
}
236+
}
237+
return _buf.size();
238+
}
239+
};

src/ESPAsyncWebServer.h

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
#include "StringArray.h"
3030

3131
#ifdef ESP32
32+
#include <mutex>
3233
#include <WiFi.h>
3334
#include <AsyncTCP.h>
35+
#define ASYNCWEBSERVER_NEEDS_MUTEX
3436
#elif defined(ESP8266)
3537
#include <ESP8266WiFi.h>
3638
#include <ESPAsyncTCP.h>
@@ -195,6 +197,9 @@ class AsyncWebServerRequest {
195197
void _parsePlainPostChar(uint8_t data);
196198
void _parseMultipartPostByte(uint8_t data, bool last);
197199
void _addGetParams(const String& params);
200+
201+
void _requestReady();
202+
void _handleRequest(); // called when the queue permits this request to run
198203

199204
void _handleUploadStart();
200205
void _handleUploadByte(uint8_t data, bool last);
@@ -253,6 +258,8 @@ class AsyncWebServerRequest {
253258
AsyncWebServerResponse *beginResponse_P(int code, const String& contentType, const uint8_t * content, size_t len, AwsTemplateProcessor callback=nullptr);
254259
AsyncWebServerResponse *beginResponse_P(int code, const String& contentType, PGM_P content, AwsTemplateProcessor callback=nullptr);
255260

261+
void deferResponse(); // Move to the back of the queue
262+
256263
size_t headers() const; // get header count
257264
bool hasHeader(const String& name) const; // check if header exists
258265
bool hasHeader(const __FlashStringHelper * data) const; // check if header exists
@@ -369,6 +376,7 @@ class AsyncWebServerResponse {
369376
size_t _writtenLength; // size of data written to client
370377
WebResponseState _state;
371378
static const __FlashStringHelper* _responseCodeToString(int code);
379+
friend class AsyncWebServer;
372380

373381
public:
374382
AsyncWebServerResponse();
@@ -386,6 +394,20 @@ class AsyncWebServerResponse {
386394
virtual size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time);
387395
};
388396

397+
/*
398+
* Queue limit structure for Server
399+
*
400+
* Any value set to 0 indicates no limit.
401+
* */
402+
struct AsyncWebServerQueueLimits {
403+
// Count limits
404+
size_t nParallel; // Permit up to this number of active parallel requests.
405+
size_t nMax; // Permit up to this number of active + queued requests - send 503 otherwise.
406+
// Heap limits
407+
size_t queueHeapRequired; // Require at least this much free heap before queuing a new request, otherwise send a 503.
408+
size_t requestHeapRequired; // Require at least this much free heap before handling a new request, except if no requests are active.
409+
};
410+
389411
/*
390412
* SERVER :: One instance
391413
* */
@@ -396,14 +418,22 @@ typedef std::function<void(AsyncWebServerRequest *request, uint8_t *data, size_t
396418

397419
class AsyncWebServer {
398420
protected:
421+
AsyncWebServerQueueLimits _queueLimits;
399422
AsyncServer _server;
400423
LinkedList<AsyncWebRewrite*> _rewrites;
401-
LinkedList<AsyncWebHandler*> _handlers;
424+
LinkedList<AsyncWebHandler*> _handlers;
402425
AsyncCallbackWebHandler* _catchAllHandler;
403-
426+
#ifdef ASYNCWEBSERVER_NEEDS_MUTEX
427+
std::mutex _mutex;
428+
#endif
429+
LinkedList<AsyncWebServerRequest*> _requestQueue;
430+
bool _queueActive;
431+
404432
public:
405433
AsyncWebServer(IPAddress addr, uint16_t port);
406434
AsyncWebServer(uint16_t port);
435+
AsyncWebServer(IPAddress addr, uint16_t port, const AsyncWebServerQueueLimits& limits);
436+
AsyncWebServer(uint16_t port, const AsyncWebServerQueueLimits& limits);
407437
~AsyncWebServer();
408438

409439
void begin();
@@ -433,10 +463,21 @@ class AsyncWebServer {
433463
void onRequestBody(ArBodyHandlerFunction fn); //handle posts with plain body content (JSON often transmitted this way as a request)
434464

435465
void reset(); //remove all writers and handlers, with onNotFound/onFileUpload/onRequestBody
466+
467+
// Queue interface
468+
size_t numClients(); // Number of active clients, active and pending
469+
size_t queueLength(); // Number of queued clients
470+
const AsyncWebServerQueueLimits& getQueueLimits() { return _queueLimits; };
471+
void setQueueLimits(const AsyncWebServerQueueLimits& limits);
472+
void printStatus(Print&); // Write queue status in human-readable format
473+
void processQueue(); // Consider the current queue state against the limits; may retry deferred handlers.
436474

437475
void _handleDisconnect(AsyncWebServerRequest *request);
438476
void _attachHandler(AsyncWebServerRequest *request);
439477
void _rewriteRequest(AsyncWebServerRequest *request);
478+
479+
void _dequeue(AsyncWebServerRequest *request);
480+
void _defer(AsyncWebServerRequest *request);
440481
};
441482

442483
class DefaultHeaders {

0 commit comments

Comments
 (0)