Code that reads a sample CSV file stored in Google Cloud Storage and runs a pipeline that applies the preprocessing logic defined in the InsertValues DoFn in parallel via ParDo, writing the results to BigQuery.


Naturally, the GOOGLE_APPLICATION_CREDENTIALS environment variable must already be set and point to a service-account key with the appropriate permissions!
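A minimal sketch (the key path is a placeholder) of setting that variable from within Python, for the case where it is not already exported in the shell:

import os

# Placeholder path to a service-account key with access to GCS, Dataflow and BigQuery.
os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', '/path/to/service-account-key.json')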


# -*- coding: utf-8 -*-

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions


class InsertValues(beam.DoFn):
  """Splits one '^'-delimited input line into a row dict for BigQuery."""

  def process(self, element):
    fields = element.split('^')
    row = {'seq_no': fields[0], 'page_value': fields[1],
           'gd_nm': fields[2], 'group_code': fields[3]}
    return [row]
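
# For reference, a made-up sample line such as
#   '1^main_page^sample_goods^G001'
# is turned by InsertValues into
#   [{'seq_no': '1', 'page_value': 'main_page',
#     'gd_nm': 'sample_goods', 'group_code': 'G001'}]
# i.e. one row dict per input line, with keys matching the BigQuery schema below.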



def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://source_file_url*',
                      help='Input file to process.')
  # --output is parsed for completeness but not used below; results go to BigQuery.
  parser.add_argument('--output',
                      dest='output',
                      default='gs://output_file_url',
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
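  # Flags not declared above (e.g. Dataflow worker options such as --num_workers)
  # remain in pipeline_args and are handed to PipelineOptions below.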


  # 1. Create pipeline & configure options (the values below are placeholders).
  pipeline_options = PipelineOptions(pipeline_args)
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = 'GCP_project_ID'
  google_cloud_options.staging_location = 'gs://staging_folder_url'
  google_cloud_options.temp_location = 'gs://temp_folder_url'
  # Dataflow job names may only contain lowercase letters, digits and hyphens.
  google_cloud_options.job_name = 'dataflow-job-name'
  pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
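  # The same settings could instead be supplied on the command line and picked
  # up by PipelineOptions(pipeline_args) above, e.g.
  #   --runner=DataflowRunner --project=GCP_project_ID \
  #   --staging_location=gs://staging_folder_url --temp_location=gs://temp_folder_url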


  p = beam.Pipeline(options=pipeline_options)

  # 2. Read the text file[pattern] into a PCollection, apply InsertValues in
  #    parallel with ParDo, and write the resulting rows to BigQuery.
  #    'Big_Query_Table_id' is a placeholder for the target table
  #    (e.g. 'PROJECT:DATASET.TABLE').
  insertlines = (p
                | ReadFromText(known_args.input)
                | beam.ParDo(InsertValues())
                | beam.io.WriteToBigQuery(
                             'Big_Query_Table_id',
                             schema='seq_no:STRING,page_value:STRING,gd_nm:STRING,group_code:STRING',
                             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
                )

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
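
For a quick local test before submitting the job to Dataflow, only the runner needs to change; a sketch using Beam's local DirectRunner, with everything else left as above:

# Run the same pipeline locally instead of on Cloud Dataflow.
pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'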