Database Reference
In-Depth Information
'mapreduce.output_writers._GoogleCloudStorageOutputWriter',
params={
'files': ['/gs/%s/%s/input-*' % (GCS_BUCKET,
TMP_PATH)],
'format': 'lines',
'output_writer': {
'bucket_name': GCS_BUCKET,
'naming_format': TMP_PATH + '/output-$num',
}
})
mapper.start()
wait_for_pipeline(mapper.pipeline_id)
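# Load the newline-delimited JSON output files from GCS into the
# BigQuery table 'add_zip_output', replacing any existing table contents.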
run_bigquery_job(JOB_ID_PREFIX, 'load', {
'destinationTable':
table_reference('add_zip_output'),
'sourceUris': ['gs://%s/%s/output-*' %
(GCS_BUCKET, TMP_PATH)],
'sourceFormat': 'NEWLINE_DELIMITED_JSON',
'schema': OUTPUT_SCHEMA,
'writeDisposition': 'WRITE_TRUNCATE',
})
def run_attempt():
global g_state
try:
with g_state_lock:
if g_state['status'] == 'RUNNING':
return
g_state = ZERO_STATE.copy()
g_state['status'] = 'RUNNING'
run_transform()
except Exception, err:
with g_state_lock:
g_state['error'] = pre(err)
finally:
Search WWH ::




Custom Search