@@ -124,6 +124,7 @@ def __init__(self):
124124 self ._default_logging_level = self ._logger .level
125125 self ._record_writer = None
126126 self ._records = None
127+ self ._allow_empty_input = True
127128
128129 def __str__ (self ):
129130 text = ' ' .join (chain ((type (self ).name , str (self .options )), [] if self .fieldnames is None else self .fieldnames ))
@@ -413,7 +414,7 @@ def prepare(self):
413414 """
414415 pass
415416
416- def process (self , argv = sys .argv , ifile = sys .stdin , ofile = sys .stdout ):
417+ def process (self , argv = sys .argv , ifile = sys .stdin , ofile = sys .stdout , allow_empty_input = True ):
417418 """ Process data.
418419
419420 :param argv: Command line arguments.
@@ -425,10 +426,16 @@ def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
425426 :param ofile: Output data file.
426427 :type ofile: file
427428
429+ :param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read
430+ :type allow_empty_input: bool
431+
428432 :return: :const:`None`
429433 :rtype: NoneType
430434
431435 """
436+
437+ self ._allow_empty_input = allow_empty_input
438+
432439 if len (argv ) > 1 :
433440 self ._process_protocol_v1 (argv , ifile , ofile )
434441 else :
@@ -965,13 +972,14 @@ def _execute_v2(self, ifile, process):
965972 def _execute_chunk_v2 (self , process , chunk ):
966973 metadata , body = chunk
967974
968- if len (body ) <= 0 :
969- return
975+ if len (body ) <= 0 and not self ._allow_empty_input :
976+ raise ValueError (
977+ "No records found to process. Set allow_empty_input=True in dispatch function to move forward "
978+ "with empty records." )
970979
971980 records = self ._read_csv_records (StringIO (body ))
972981 self ._record_writer .write_records (process (records ))
973982
974-
975983 def _report_unexpected_error (self ):
976984
977985 error_type , error , tb = sys .exc_info ()
@@ -1063,8 +1071,7 @@ def iteritems(self):
10631071SearchMetric = namedtuple ('SearchMetric' , ('elapsed_seconds' , 'invocation_count' , 'input_count' , 'output_count' ))
10641072
10651073
1066-
1067- def dispatch (command_class , argv = sys .argv , input_file = sys .stdin , output_file = sys .stdout , module_name = None ):
1074+ def dispatch (command_class , argv = sys .argv , input_file = sys .stdin , output_file = sys .stdout , module_name = None , allow_empty_input = True ):
10681075 """ Instantiates and executes a search command class
10691076
10701077 This function implements a `conditional script stanza <https://docs.python.org/2/library/__main__.html>`_ based on the value of
@@ -1087,6 +1094,8 @@ def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys
10871094 :type output_file: :code:`file`
10881095 :param module_name: Name of the module calling :code:`dispatch` or :const:`None`.
10891096 :type module_name: :code:`basestring`
1097+ :param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read
1098+ :type allow_empty_input: bool
10901099 :returns: :const:`None`
10911100
10921101 **Example**
@@ -1124,4 +1133,4 @@ def stream(records):
11241133 assert issubclass (command_class , SearchCommand )
11251134
11261135 if module_name is None or module_name == '__main__' :
1127- command_class ().process (argv , input_file , output_file )
1136+ command_class ().process (argv , input_file , output_file , allow_empty_input )
0 commit comments