Skip to content

Commit 9c170a9

Browse files
authored
Improve handling of file upload (#324)
1 parent 5a30039 commit 9c170a9

1 file changed

Lines changed: 119 additions & 76 deletions

File tree

src/index.ts

Lines changed: 119 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -154,81 +154,121 @@ async function createUploadRequest(
154154
>,
155155
options: RequestOptions
156156
): Promise<IncomingMessage> {
157-
// Generate a unique boundary for multipart encoding.
158-
const boundary = `NodeMultipartBoundary${Date.now()}`
159-
const boundarySep = `--${boundary}\r\n`
160-
const finalBoundary = `--${boundary}--\r\n`
161-
const requestBody = [
162-
...requestBodyNoBoundaries.flatMap(part => [
163-
boundarySep,
164-
...(Array.isArray(part) ? part : [part])
165-
]),
166-
finalBoundary
167-
]
168-
const url = new URL(urlPath, baseUrl)
169-
const req: ClientRequest = getHttpModule(baseUrl).request(url, {
170-
method: 'POST',
171-
...options,
172-
headers: {
173-
...options?.headers,
174-
'Content-Type': `multipart/form-data; boundary=${boundary}`
175-
}
176-
})
177-
let aborted = false
178-
req.on('error', _err => {
179-
aborted = true
180-
})
181-
req.on('close', () => {
182-
aborted = true
183-
})
184-
try {
185-
// Send the request body (headers + files).
186-
for (const part of requestBody) {
187-
if (aborted) {
188-
break
157+
158+
// Note: this will create a regular http request and stream in the file content
159+
// implicitly. The outgoing buffer is (implicitly) flushed periodically
160+
// by node. When this happens first it will send the headers to the server
161+
// which may decide to reject the request, immediately send a response and
162+
// then cut the connection (EPIPE or ECONNRESET errors may follow while
163+
// writing the files).
164+
// We have to make sure to guard for sudden reject responses because if we
165+
// don't then the file streaming will fail with random errors and it gets
166+
// hard to debug what's going on why.
167+
// Example : `socket scan create --org badorg` should fail gracefully.
168+
169+
return new Promise(async (pass, fail) => {
170+
171+
// Generate a unique boundary for multipart encoding.
172+
const boundary = `NodeMultipartBoundary${Date.now()}`
173+
const boundarySep = `--${boundary}\r\n`
174+
const finalBoundary = `--${boundary}--\r\n`
175+
const requestBody = [
176+
...requestBodyNoBoundaries.flatMap(part => [
177+
boundarySep,
178+
...(Array.isArray(part) ? part : [part])
179+
]),
180+
finalBoundary
181+
]
182+
const url = new URL(urlPath, baseUrl)
183+
const req: ClientRequest = getHttpModule(baseUrl).request(url, {
184+
method: 'POST',
185+
...options,
186+
headers: {
187+
...options?.headers,
188+
'Content-Type': `multipart/form-data; boundary=${boundary}`
189189
}
190-
if (typeof part === 'string') {
191-
req.write(part)
192-
} else if (typeof part?.pipe === 'function') {
193-
part.pipe(req, { end: false })
194-
// Wait for file streaming to complete.
195-
// eslint-disable-next-line no-await-in-loop
196-
await new Promise<void>((resolve, reject) => {
197-
const cleanup = () => {
198-
part.off('end', onEnd)
199-
part.off('error', onError)
200-
}
201-
const onEnd = () => {
202-
cleanup()
203-
resolve()
204-
}
205-
const onError = (e: Error) => {
206-
cleanup()
207-
reject(e)
190+
})
191+
192+
// Send the headers now. If the server would reject this request, it should
193+
// do so asap. This prevents us from sending more data to it then necessary.
194+
// If it will reject we could just await the `req.on(response` now but if it
195+
// accepts the request then the response will not come until after the final
196+
// file. So we can't await the response at this time. Just proceed, carefully.
197+
req.flushHeaders()
198+
199+
200+
// Wait for the response. It may arrive at any point during the request or
201+
// afterwards. Node will flush the output buffer at some point, initiating
202+
// the request, and the server can decide to reject the request immediately
203+
// or at any point later (ike a timeout). We should handle those cases.
204+
getResponse(req).then(res => {
205+
// Note: this returns the response to the caller to createUploadRequest
206+
pass(res)
207+
}, async (err) => {
208+
// Note: this will throw an error for the caller to createUploadRequest
209+
if (err.response && !isResponseOk(err.response)) {
210+
fail(new ResponseError(err.response, `${err.method} request failed`))
211+
}
212+
fail(err)
213+
})
214+
215+
let aborted = false
216+
req.on('error', _err => {
217+
aborted = true
218+
})
219+
req.on('close', () => {
220+
aborted = true
221+
})
222+
try {
223+
// Send the request body (headers + files).
224+
for (const part of requestBody) {
225+
if (aborted) {
226+
break
227+
}
228+
if (typeof part === 'string') {
229+
req.write(part)
230+
} else if (typeof part?.pipe === 'function') {
231+
part.pipe(req, { end: false })
232+
// Wait for file streaming to complete.
233+
// eslint-disable-next-line no-await-in-loop
234+
await new Promise<void>((resolve, reject) => {
235+
const cleanup = () => {
236+
part.off('end', onEnd)
237+
part.off('error', onError)
238+
}
239+
const onEnd = () => {
240+
cleanup()
241+
resolve()
242+
}
243+
const onError = (e: Error) => {
244+
cleanup()
245+
reject(e)
246+
}
247+
part.on('end', onEnd)
248+
part.on('error', onError)
249+
})
250+
if (!aborted) {
251+
// Ensure a new line after file content.
252+
req.write('\r\n')
208253
}
209-
part.on('end', onEnd)
210-
part.on('error', onError)
211-
})
212-
if (!aborted) {
213-
// Ensure a new line after file content.
214-
req.write('\r\n')
254+
} else {
255+
throw new TypeError(
256+
'Socket API - Invalid multipart part, expected string or stream'
257+
)
215258
}
216-
} else {
217-
throw new TypeError(
218-
'Socket API - Invalid multipart part, expected string or stream'
219-
)
259+
}
260+
} catch (e) {
261+
req.destroy(e as Error)
262+
fail(e)
263+
} finally {
264+
if (!aborted) {
265+
// Close request after writing all data.
266+
req.end()
220267
}
221268
}
222-
} catch (e) {
223-
req.destroy(e as Error)
224-
throw e
225-
} finally {
226-
if (!aborted) {
227-
// Close request after writing all data.
228-
req.end()
229-
}
230-
}
231-
return await getResponse(req)
269+
270+
pass(getResponse(req))
271+
})
232272
}
233273

234274
async function getErrorResponseBody(
@@ -265,8 +305,9 @@ function getHttpModule(baseUrl: string): typeof http | typeof https {
265305
}
266306

267307
async function getResponse(req: ClientRequest): Promise<IncomingMessage> {
308+
let res;
268309
try {
269-
const res = await new Promise<IncomingMessage>((resolve, reject) => {
310+
res = await new Promise<IncomingMessage>((resolve, reject) => {
270311
const cleanup = () => {
271312
req.off('response', onResponse)
272313
req.off('error', onError)
@@ -289,14 +330,16 @@ async function getResponse(req: ClientRequest): Promise<IncomingMessage> {
289330
req.on('error', onError)
290331
abortSignal?.addEventListener('abort', onAbort)
291332
})
292-
if (!isResponseOk(res)) {
293-
throw new ResponseError(res, `${req.method} request failed`)
294-
}
295-
return res
296333
} catch (e) {
297-
req.destroy()
334+
// Note: Calling req.destroy here can lead to silent nodejs crashes.
335+
// req.destroy();
298336
throw e
299337
}
338+
339+
if (!isResponseOk(res)) {
340+
throw new ResponseError(res, `${req.method} request failed`)
341+
}
342+
return res
300343
}
301344

302345
async function getResponseJson(

0 commit comments

Comments
 (0)