This code reads a sample CSV file stored in Google Cloud Storage and runs it through a pipeline that applies a user-defined preprocessing step, InsertValues, in parallel via ParDo.
Naturally, the GOOGLE_APPLICATION_CREDENTIALS environment variable must already be set and point to a key with the appropriate permissions!
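If the variable is not set in the shell, it can also be pointed at a service-account key from Python before the pipeline is built. A minimal sketch, where the key path is a hypothetical placeholder:

import os

# Hypothetical path; replace with a key that has Dataflow, GCS, and BigQuery access.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account-key.json'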
#-*- coding: utf-8 -*-
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
class InsertValues(beam.DoFn):
    """Splits each '^'-delimited input line into a dict keyed to match the BigQuery schema."""
    def process(self, element):
        splited = element.split('^')
        writestring = {'seq_no': splited[0],
                       'page_value': splited[1],
                       'gd_nm': splited[2],
                       'group_code': splited[3]}
        return [writestring]
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://source_file_url*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://output_file_url',
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # 1. Create pipeline & configure options
    pipeline_options = PipelineOptions(pipeline_args)
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'GCP_project_ID'
    google_cloud_options.staging_location = 'gs://staging_folder_url'
    google_cloud_options.temp_location = 'gs://temp_folder_url'
    # Dataflow job names may only contain lowercase letters, digits, and hyphens.
    google_cloud_options.job_name = 'dataflow-job-name'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    p = beam.Pipeline(options=pipeline_options)
    # 2. Read the text file[pattern] into a PCollection, transform each line
    #    in parallel with the DoFn above, and load the rows into BigQuery.
    insertlines = (p
                   | ReadFromText(known_args.input)
                   | beam.ParDo(InsertValues())
                   | beam.io.WriteToBigQuery(
                       # Table spec: 'project:dataset.table' (or 'dataset.table').
                       'Big_Query_Table_id',
                       schema='seq_no:STRING, page_value:STRING, gd_nm:STRING, group_code:STRING',
                       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
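Before submitting to Dataflow, the DoFn itself can be smoke-tested locally: when no runner is configured, Beam falls back to the DirectRunner. A minimal sketch, where the '^'-delimited sample row is a made-up value, not real data:

import apache_beam as beam

class InsertValues(beam.DoFn):
    # Same transform as above, repeated so this snippet runs on its own.
    def process(self, element):
        splited = element.split('^')
        return [{'seq_no': splited[0], 'page_value': splited[1],
                 'gd_nm': splited[2], 'group_code': splited[3]}]

with beam.Pipeline() as p:  # no runner option -> local DirectRunner
    (p
     | beam.Create(['1^main_page^sample_item^G01'])  # hypothetical sample line
     | beam.ParDo(InsertValues())
     | beam.Map(print))

Running this prints one dict per input line, e.g. {'seq_no': '1', 'page_value': 'main_page', 'gd_nm': 'sample_item', 'group_code': 'G01'}, which is exactly the row shape WriteToBigQuery expects.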