Skip to content

Commit 2c9c0ed

Browse files
committed
tar/asm: simplify done handling
Refactor `newInputTarStreamCommon` to accept an optional `done chan<- error` instead of a `withDone` bool. Clarify documentation to state that callers must fully read the returned reader. Signed-off-by: Jan Kaluza <jkaluza@redhat.com>
1 parent 915b1a6 commit 2c9c0ed

2 files changed

Lines changed: 20 additions & 21 deletions

File tree

tar/asm/disassemble.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ func newInputTarStreamCommon(
161161
r io.Reader,
162162
p storage.Packer,
163163
fp storage.FilePutter,
164-
withDone bool,
165-
) (pr *io.PipeReader, done <-chan error) {
164+
done chan<- error,
165+
) (pr *io.PipeReader) {
166166
// What to do here... folks will want their own access to the Reader that is
167167
// their tar archive stream, but we'll need that same stream to use our
168168
// forked 'archive/tar'.
@@ -181,22 +181,14 @@ func newInputTarStreamCommon(
181181
// `archive/tar` doesn't care.
182182
pr, pw := io.Pipe()
183183

184-
// We need a putter that will generate the crc64 sums of file payloads.
185184
if fp == nil {
186185
fp = storage.NewDiscardFilePutter()
187186
}
188187

189188
outputRdr := io.TeeReader(r, pw)
189+
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, done)
190190

191-
if withDone {
192-
ch := make(chan error, 1)
193-
done = ch
194-
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, ch)
195-
return pr, done
196-
}
197-
198-
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, nil)
199-
return pr, nil
191+
return pr
200192
}
201193

202194
// NewInputTarStream wraps the Reader stream of a tar archive and provides a
@@ -213,9 +205,9 @@ func newInputTarStreamCommon(
213205
// If callers need to be able to abort early and/or wait for goroutine termination,
214206
// prefer NewInputTarStreamWithDone.
215207
//
216-
// Deprecated: Use NewInputTarStreamWithDone instead.
208+
// Deprecated: This leaves a goroutine around if the consumer aborts without consuming.
217209
func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) {
218-
pr, _ := newInputTarStreamCommon(r, p, fp, false)
210+
pr := newInputTarStreamCommon(r, p, fp, nil)
219211
return pr, nil
220212
}
221213

@@ -232,7 +224,12 @@ func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io
232224
// The returned reader is an io.ReadCloser so callers can stop early; closing it
233225
// aborts the pipe so the internal goroutine can terminate promptly (rather than
234226
// hanging on a blocked pipe write).
227+
//
228+
// The caller is expected to consume the returned reader fully until EOF
229+
// (not just the tar EOF marker); closing the returned reader earlier will
230+
// cause the done channel to return a failure.
235231
func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) {
236-
pr, done := newInputTarStreamCommon(r, p, fp, true)
232+
done := make(chan error, 1)
233+
pr := newInputTarStreamCommon(r, p, fp, done)
237234
return pr, done, nil
238235
}

tar/asm/disassemble_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ func (p *recordingPacker) AddEntry(e storage.Entry) (int, error) {
9797
// Copy payload because callers may reuse buffers.
9898
cp := e
9999
if e.Payload != nil {
100-
cp.Payload = append([]byte(nil), e.Payload...)
100+
cp.Payload = bytes.Clone(e.Payload)
101101
}
102102
p.entries = append(p.entries, cp)
103-
return len(cp.Payload), nil
103+
return len(p.entries), nil
104104
}
105105

106106
func (p *recordingPacker) snapshot() []storage.Entry {
@@ -118,17 +118,18 @@ type recordingFilePutter struct {
118118
}
119119

120120
func (fp *recordingFilePutter) Put(name string, r io.Reader) (int64, []byte, error) {
121-
b, err := io.ReadAll(r)
121+
dataLen, err := io.Copy(io.Discard, r)
122122
if err != nil {
123123
return 0, nil, err
124124
}
125+
125126
fp.mu.Lock()
126127
fp.puts = append(fp.puts, name)
127128
fp.mu.Unlock()
128129

129130
// Return a deterministic "checksum" based on content length.
130-
csum := []byte(fmt.Sprintf("len=%d", len(b)))
131-
return int64(len(b)), csum, nil
131+
csum := []byte(fmt.Sprintf("len=%d", dataLen))
132+
return dataLen, csum, nil
132133
}
133134

134135
// Helper function to generate the tar with optional extra padding.
@@ -339,7 +340,8 @@ func TestNewInputTarStreamWithDonUnderlyingClosed(t *testing.T) {
339340
t.Fatalf("timeout waiting for underlying reader to enter blocked state")
340341
}
341342

342-
// Now "close" the underlying reader while the tar-split goroutine is still running.
343+
// Now trigger an error from the underlying closableBlockingReader while the tar-split
344+
// goroutine is still running.
343345
under.Close()
344346

345347
// The tar-split goroutine should treat this as a non-EOF error, call fail(err),

0 commit comments

Comments
 (0)