3939from io import StringIO
4040import logging
4141import os
42+ try :
43+ from queue import Queue , Empty
44+ except ImportError :
45+ from Queue import Queue , Empty
4246import re
4347import socket
4448from subprocess import Popen , PIPE
@@ -137,7 +141,7 @@ def no_quote(s):
137141 return s
138142
139143
140- def _copy_data (instream , outstream , buffer_size ):
144+ def _copy_data (instream , outstream , buffer_size , error_queue ):
141145 # Copy one stream to another
142146 assert buffer_size > 0
143147 sent = 0
@@ -150,8 +154,10 @@ def _copy_data(instream, outstream, buffer_size):
150154 # for what is actually a binary file
151155 try :
152156 data = instream .read (buffer_size )
153- except Exception : # pragma: no cover
157+ except Exception as e : # pragma: no cover
154158 logger .warning ('Exception occurred while reading' , exc_info = 1 )
159+ error_queue .put_nowait (e )
160+ logger .debug ('queued exception: %s' , e )
155161 break
156162 if not data :
157163 break
@@ -161,10 +167,12 @@ def _copy_data(instream, outstream, buffer_size):
161167 outstream .write (data )
162168 except UnicodeError : # pragma: no cover
163169 outstream .write (data .encode (enc ))
164- except Exception : # pragma: no cover
170+ except Exception as e : # pragma: no cover
165171 # Can sometimes get 'broken pipe' errors even when the data has all
166172 # been sent
167173 logger .exception ('Error sending data' )
174+ error_queue .put_nowait (e )
175+ logger .debug ('queued exception: %s' , e )
168176 break
169177 try :
170178 outstream .close ()
@@ -173,9 +181,9 @@ def _copy_data(instream, outstream, buffer_size):
173181 logger .debug ('closed output, %d bytes sent' , sent )
174182
175183
176- def _threaded_copy_data (instream , outstream , buffer_size ):
184+ def _threaded_copy_data (instream , outstream , buffer_size , error_queue ):
177185 assert buffer_size > 0
178- wr = threading .Thread (target = _copy_data , args = (instream , outstream , buffer_size ))
186+ wr = threading .Thread (target = _copy_data , args = (instream , outstream , buffer_size , error_queue ))
179187 wr .daemon = True
180188 logger .debug ('data copier: %r, %r, %r' , wr , instream , outstream )
181189 wr .start ()
@@ -1358,8 +1366,15 @@ def _handle_io(self, args, fileobj_or_path, result, passphrase=None, binary=Fals
13581366 stdin = p .stdin
13591367 if passphrase :
13601368 _write_passphrase (stdin , passphrase , self .encoding )
1361- writer = _threaded_copy_data (fileobj , stdin , self .buffer_size )
1369+ error_queue = Queue ()
1370+ writer = _threaded_copy_data (fileobj , stdin , self .buffer_size , error_queue )
13621371 self ._collect_output (p , result , writer , stdin )
1372+ try :
1373+ exc = error_queue .get_nowait ()
1374+ # if we get here, that means an error occurred in the copying thread
1375+ raise exc
1376+ except Empty :
1377+ pass
13631378 return result
13641379 finally :
13651380 if writer :
@@ -1481,14 +1496,21 @@ def sign_file(self,
14811496 # passphrase is bad, gpg bails and you can't write the message.
14821497 fileobj = self ._get_fileobj (fileobj_or_path )
14831498 p = self ._open_subprocess (args , passphrase is not None )
1499+ writer = None
14841500 try :
14851501 stdin = p .stdin
14861502 if passphrase :
14871503 _write_passphrase (stdin , passphrase , self .encoding )
1488- writer = _threaded_copy_data (fileobj , stdin , self .buffer_size )
1504+ error_queue = Queue ()
1505+ writer = _threaded_copy_data (fileobj , stdin , self .buffer_size , error_queue )
1506+ try :
1507+ exc = error_queue .get_nowait ()
1508+ # if we get here, that means an error occurred in the copying thread
1509+ raise exc
1510+ except Empty :
1511+ pass
14891512 except IOError : # pragma: no cover
14901513 logging .exception ('error writing message' )
1491- writer = None
14921514 finally :
14931515 if writer :
14941516 writer .join (0.01 )
0 commit comments