Database Reference
In-Depth Information
self.job_runner = job_runner
self.partition_id = partition_id
self.gcs_reader = gcs_reader
self.gcs_object_glob = None
def resolve_shard_path(self, path, index):
    """Turn a glob path and a shard index into the expected object name.

    The ``*`` wildcard in ``path`` is replaced by ``index`` zero-padded to
    twelve digits, e.g. ``'gs://b/out*.json'`` with index 7 becomes
    ``'gs://b/out000000000007.json'``.

    Args:
      path: object path containing exactly one ``*`` wildcard.
      index: integer shard number.

    Returns:
      The resolved object path.

    Raises:
      TypeError: if ``path`` contains no ``*`` or more than one ``*``
        (the formatting step is given exactly one argument).
    """
    # Escape literal '%' first so a path such as 'gs://b/100%/x*' is not
    # misread as a printf conversion when the index is substituted below.
    path_fmt = path.replace('%', '%%').replace('*', '%012d')
    return path_fmt % (index,)
def read_shard(self, shard):
    """Read one shard of the current object glob.

    Resolves ``shard`` against ``self.gcs_object_glob`` with
    ``self.resolve_shard_path`` and returns whatever
    ``self.gcs_reader.read`` returns for that object name. The original
    docstring says this is the file contents when present and None
    otherwise; that behaviour belongs to the reader and is not shown here.

    Args:
      shard: integer shard index to resolve and read.
    """
    resolved_object = self.resolve_shard_path(self.gcs_object_glob, shard)
    return self.gcs_reader.read(resolved_object)
def start(self, gcs_object_glob):
    """Start the reader thread for the given GCS object pattern.

    Args:
      gcs_object_glob: object path containing a ``*`` wildcard for the
        shard number (see ``resolve_shard_path``).

    Raises:
      ValueError: if ``gcs_object_glob`` is empty or None. This is checked
        here, in the caller's thread, because an exception raised inside
        the worker thread's ``run`` is not propagated by ``join()``, so the
        caller would otherwise never see it.
    """
    if not gcs_object_glob:
        raise ValueError(
            'Must set the gcs_object_glob before running thread')
    self.gcs_object_glob = gcs_object_glob
    threading.Thread.start(self)
def wait_for_complete(self, timeout=None):
    """Block until the reader thread finishes.

    Args:
      timeout: optional number of seconds to wait. ``None`` (the default,
        matching the previous behaviour) waits indefinitely. As with
        ``threading.Thread.join``, call ``self.is_alive()`` afterwards to
        find out whether the wait timed out.
    """
    self.join(timeout)
def run(self):
'''Waits for files to be written and reads them
when they arrive.'''
if not self.gcs_object_glob:
raise Exception(
'Must set the gcs_object_glob before running
thread')
Search WWH ::




Custom Search