Database Reference
In-Depth Information
# Imports from files in this directory:
from table_reader import TableReader
from table_reader import TableReadThread
def parallel_indexed_read(partition_count,
project_id, dataset_id, table_id, output_dir):
'''Divides up a table and reads the pieces in
parallel by index.'''
table_reader = TableReader(project_id, dataset_id,
table_id)
_, row_count = table_reader.get_table_info()
snapshot_time = int(time.time() * 1000)
stride = row_count / partition_count
threads = []
for index in range(partition_count):
file_name = '%s.%d' % (os.path.join(output_dir,
table_id), index)
start_index = stride * index
thread_reader = TableReader(
project_id=project_id,
dataset_id=dataset_id,
table_id='%s@%d' % (table_id, snapshot_time),
start_index=start_index,
read_count=stride)
read_thread = TableReadThread(
thread_reader,
file_name,
thread_id='[%d-%d)' % (start_index,
start_index + stride))
threads.append(read_thread)
threads[index].start()
for index in range(partition_count):
First, this listing reads the number of rows in the table, so it can partition
the table into row ranges. Next, it iterates through the number of desired
partitions (that is, the number of parallel readers you want) and assigns
each one a section of the table to read by creating a TableReader that is
limited to that section. It then spins up a TableReadThread to run each
Search WWH ::




Custom Search