Database Reference
In-Depth Information
# Abort if the mapper pipeline did not complete successfully; the raised
# error points the operator at the pipeline status page for diagnosis.
# Fix: the message literal had been wrapped across two lines by the page
# extraction, which broke the string — rejoined into one literal.
if pipeline.outputs.result_status.value != 'success':
    raise RuntimeError('Mapper job failed, see status link.')
def table_reference(table_id):
    """Build a BigQuery table-reference dict for *table_id* in the ch12 dataset."""
    # Keyword form of dict() yields the same projectId/datasetId/tableId keys.
    return dict(
        projectId=PROJECT_ID,
        datasetId='ch12',
        tableId=table_id,
    )
# BigQuery schema for the transform's output table: each record has a string
# id, float lat/lng coordinates, and a string zip code.
# NOTE(review): presumably 'zip' is the field the mapper adds — confirm
# against the add_zip mapper implementation.
OUTPUT_SCHEMA = {
    'fields': [
        {'name':'id', 'type':'STRING'},
        {'name':'lat', 'type':'FLOAT'},
        {'name':'lng', 'type':'FLOAT'},
        {'name':'zip', 'type':'STRING'},
    ]
}
def run_transform():
    """Extract the input table from BigQuery to GCS, then start the mapper."""
    # Per-run job-id prefix derived from the current Unix time, so repeated
    # runs do not collide on BigQuery job ids or GCS scratch paths.
    JOB_ID_PREFIX = 'ch12_%d' % int(time.time())
    # GCS scratch directory for this run's extracted input shards.
    TMP_PATH = 'tmp/mapreduce/%s' % JOB_ID_PREFIX
    # Extract from BigQuery to GCS.
    run_bigquery_job(JOB_ID_PREFIX, 'extract', {
        'sourceTable': table_reference('add_zip_input'),
        # Wildcard URI lets the extract job write multiple sharded files.
        'destinationUri': 'gs://%s/%s/input-*' %
        (GCS_BUCKET, TMP_PATH),
        'destinationFormat': 'NEWLINE_DELIMITED_JSON',
    })
    # Run the mapper job to annotate the records.
    # NOTE(review): this call is truncated in the excerpt — the remaining
    # MapperPipeline arguments (and the closing paren) are not visible here.
    mapper = MapperPipeline(
        'Add Zip',
        'add_zip.apply',
        'mapreduce.input_readers.FileInputReader',
Search WWH ::




Custom Search