— notes, python, time-wasted, GCP — 2 min read
I wanted to build a pipeline to move data from Cloud Storage to BigQuery. There are a couple of options to do it.
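The simplest is probably a plain BigQuery load job straight from Cloud Storage. A minimal sketch with the bq CLI; the bucket, file, dataset and table names here are made-up placeholders:

```
rem Load a CSV from Cloud Storage into BigQuery with schema auto-detection
rem (my_dataset.my_table and gs://my-bucket/my-file.csv are hypothetical)
bq load --autodetect --source_format=CSV my_dataset.my_table gs://my-bucket/my-file.csv
```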
At first, I thought of doing it via Dataflow. I understood there would be a bit of learning involved with Apache Beam, but if it's the recommended approach in the long term, then why not go with it? Here are some of the learnings and attempts from the days I spent on it.
NOTE: There were lots of errors while trying to install using Python 3.5.2, but it worked flawlessly when tried with Python 3.9.7.
```
python3 -m venv env
.\env\Scripts\activate

pip3 install google-cloud-storage
pip3 install --upgrade google-cloud-storage

pip3 install wheel
pip3 install apache-beam[gcp]
```
```
(env) D:\BigData\12. Python\1. Tryouts\Beam>pip3 list
Package                         Version
------------------------------- ---------
apache-beam                     2.32.0
avro-python3                    1.9.2.1
cachetools                      4.2.2
certifi                         2021.5.30
charset-normalizer              2.0.6
crcmod                          1.7
dill                            0.3.1.1
docopt                          0.6.2
fastavro                        1.4.5
fasteners                       0.16.3
future                          0.18.2
google-api-core                 1.31.3
google-apitools                 0.5.31
google-auth                     1.35.0
google-cloud-bigquery           2.26.0
google-cloud-bigtable           1.7.0
google-cloud-core               1.7.2
google-cloud-datastore          1.15.3
google-cloud-dlp                1.0.0
google-cloud-language           1.3.0
google-cloud-pubsub             1.7.0
google-cloud-recommendations-ai 0.2.0
google-cloud-spanner            1.19.1
google-cloud-storage            1.42.2
google-cloud-videointelligence  1.16.1
google-cloud-vision             1.0.0
google-crc32c                   1.2.0
google-resumable-media          2.0.3
googleapis-common-protos        1.53.0
grpc-google-iam-v1              0.12.3
grpcio                          1.40.0
grpcio-gcp                      0.2.2
hdfs                            2.6.0
httplib2                        0.19.1
idna                            3.2
numpy                           1.20.3
oauth2client                    4.1.3
orjson                          3.6.3
packaging                       21.0
pip                             21.2.3
proto-plus                      1.19.0
protobuf                        3.17.3
pyarrow                         4.0.1
pyasn1                          0.4.8
pyasn1-modules                  0.2.8
pydot                           1.4.2
pymongo                         3.12.0
pyparsing                       2.4.7
python-dateutil                 2.8.2
pytz                            2021.1
requests                        2.26.0
rsa                             4.7.2
setuptools                      57.4.0
six                             1.16.0
typing-extensions               3.7.4.3
urllib3                         1.26.7
wheel                           0.37.0
WARNING: You are using pip version 21.2.3; however, version 21.2.4 is available.
You should consider upgrading via the 'D:\BigData\12. Python\1. Tryouts\Beam\env\Scripts\python.exe -m pip install --upgrade pip' command.
```
Enabled APIs: There was a link in Google Cloud Platform to enable all of these in one go, so I went with it: Dataflow API, Compute Engine API, Cloud Logging API, Cloud Storage, Google Cloud Storage JSON API, BigQuery API, Cloud Pub/Sub API, Cloud Datastore API and Cloud Resource Manager API.
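A rough gcloud equivalent for enabling them from the CLI; the service IDs are my own mapping of the API names above, so treat them as assumptions:

```
rem Enable the APIs from the command line (service IDs assumed to match the list above)
gcloud services enable dataflow.googleapis.com compute.googleapis.com ^
  logging.googleapis.com storage-component.googleapis.com ^
  storage-api.googleapis.com bigquery.googleapis.com ^
  pubsub.googleapis.com datastore.googleapis.com ^
  cloudresourcemanager.googleapis.com
```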
Creating Service Account: dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com with Role (Quick Access 🡪 Basic 🡪 Owner). This is the account you will use while running the Apache Beam pipeline.
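For reference, a sketch of the same setup with gcloud; the placeholders mirror the ones above, and Owner is far broader than the job actually needs, it just matches what I did in the console:

```
rem Create the service account (name and <<project-id>> placeholder as above)
gcloud iam service-accounts create dataflow-btd-in3 --project <<project-id>>

rem Grant it the Owner role (overly broad; matches the console steps above)
gcloud projects add-iam-policy-binding <<project-id>> ^
  --member "serviceAccount:dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com" ^
  --role "roles/owner"
```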
Below is the wordcount program, which reads a text file from Cloud Storage and counts the number of words.
1"""A word-counting workflow."""2import argparse3import logging4import re5
6import apache_beam as beam7from apache_beam.io import ReadFromText8from apache_beam.io import WriteToText9from apache_beam.options.pipeline_options import PipelineOptions10from apache_beam.options.pipeline_options import SetupOptions11
12
13class WordExtractingDoFn(beam.DoFn):14 """Parse each line of input text into words."""15 def process(self, element):16 """Returns an iterator over the words of this element.17
18 The element is a line of text. If the line is blank, note that, too.19
20 Args:21 element: the element being processed22
23 Returns:24 The processed element.25 """26 return re.findall(r'[\w\']+', element, re.UNICODE)27
28
29def run(argv=None, save_main_session=True):30 """Main entry point; defines and runs the wordcount pipeline."""31 parser = argparse.ArgumentParser()32 parser.add_argument(33 '--input',34 dest='input',35 default='gs://dataflow-samples/shakespeare/kinglear.txt',36 help='Input file to process.')37 parser.add_argument(38 '--output',39 dest='output',40 required=True,41 help='Output file to write results to.')42 known_args, pipeline_args = parser.parse_known_args(argv)43
44 # We use the save_main_session option because one or more DoFn's in this45 # workflow rely on global context (e.g., a module imported at module level).46 pipeline_options = PipelineOptions(pipeline_args)47 pipeline_options.view_as(SetupOptions).save_main_session = save_main_session48
49 # The pipeline will be run on exiting the with block.50 with beam.Pipeline(options=pipeline_options) as p:51
52 # Read the text file[pattern] into a PCollection.53 lines = p | 'Read' >> ReadFromText(known_args.input)54
55 counts = (56 lines57 | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))58 | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))59 | 'GroupAndSum' >> beam.CombinePerKey(sum))60
61 # Format the counts into a PCollection of strings.62 def format_result(word, count):63 return '%s: %d' % (word, count)64
65 output = counts | 'Format' >> beam.MapTuple(format_result)66
67 # Write the output using a "Write" transform that has side effects.68 # pylint: disable=expression-not-assigned69 output | 'Write' >> WriteToText(known_args.output)70
71
72if __name__ == '__main__':73 logging.getLogger().setLevel(logging.INFO)74 run()
Before running the program, set GOOGLE_APPLICATION_CREDENTIALS to the path of the Service Account JSON key file:
```
set GOOGLE_APPLICATION_CREDENTIALS=D:\BigData\12. Python\1. Tryouts\private\beam-test-abb6f7f2eb46-owner-role.json
```
Now run the program:
```
(env) D:\BigData\12. Python\1. Tryouts\Beam>python -m wordcount-local --output outputs
D:\BigData\12. Python\1. Tryouts\Beam\env\lib\site-packages\apache_beam\__init__.py:79: UserWarning: This version of Apache Beam has not been sufficiently tested on Python 3.9. You may encounter bugs or missing features.
  warnings.warn(
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.32.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x00000206033A4F70> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x00000206033A90D0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x00000206033A9550> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x00000206033A95E0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x00000206033A9790> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x00000206033A9820> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x00000206033A9940> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x00000206033A99D0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x00000206033A9A60> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x00000206033A9AF0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x00000206033A9D30> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x00000206033A9CA0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x00000206033A9DC0> ====================
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x000002060367C6A0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((ref_AppliedPTransform_Read-Read-Impulse_4)+(ref_AppliedPTransform_Read-Read-Map-lambda-at-iobase-py-894-_5))+(Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_2_split/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_PCollection_PCollection_2_split/Read)+(Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/Process))+(ref_AppliedPTransform_Split_8))+(ref_AppliedPTransform_PairWIthOne_9))+(GroupAndSum/Precombine))+(GroupAndSum/Group/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_Write-Write-WriteImpl-DoOnce-Impulse_19)+(ref_AppliedPTransform_Write-Write-WriteImpl-DoOnce-FlatMap-lambda-at-core-py-2968-_20))+(ref_AppliedPTransform_Write-Write-WriteImpl-DoOnce-Map-decode-_22))+(ref_AppliedPTransform_Write-Write-WriteImpl-InitializeWrite_23))+(ref_PCollection_PCollection_11/Write))+(ref_PCollection_PCollection_12/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((((GroupAndSum/Group/Read)+(GroupAndSum/Merge))+(GroupAndSum/ExtractOutputs))+(ref_AppliedPTransform_Format_14))+(ref_AppliedPTransform_Write-Write-WriteImpl-WindowInto-WindowIntoFn-_24))+(ref_AppliedPTransform_Write-Write-WriteImpl-WriteBundles_25))+(ref_AppliedPTransform_Write-Write-WriteImpl-Pair_26))+(Write/Write/WriteImpl/GroupByKey/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((Write/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_Write-Write-WriteImpl-Extract_28))+(ref_PCollection_PCollection_17/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_PCollection_PCollection_11/Read)+(ref_AppliedPTransform_Write-Write-WriteImpl-PreFinalize_29))+(ref_PCollection_PCollection_18/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (ref_PCollection_PCollection_11/Read)+(ref_AppliedPTransform_Write-Write-WriteImpl-FinalizeWrite_30)
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.08 seconds.
```
Outputs
```
(env) D:\BigData\12. Python\1. Tryouts\Beam>more outputs-00000-of-00001
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
```
Below is the command to run the same pipeline on Dataflow.
```
python -m wordcount-local ^
  --region us-central1 ^
  --input gs://dataflow-samples/shakespeare/kinglear.txt ^
  --output gs://btd-in3-bse-nse-dailys/outputs ^
  --runner DataflowRunner ^
  --project <<project-name>> ^
  --temp_location gs://btd-in3-bse-nse-dailys/temp/
```
It failed with the below message:
```
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2021-09-26_00_16_24-6802767166116217174 is in state JOB_STATE_PENDING
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:28.455Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2021-09-26_00_16_24-6802767166116217174. The number of workers will be between 1 and 1000.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:28.594Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2021-09-26_00_16_24-6802767166116217174.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:29.920Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: Permissions verification for controller service account failed. IAM role roles/dataflow.worker should be granted to controller service account 25491243517-compute@developer.gserviceaccount.com.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:29.964Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:30.041Z: JOB_MESSAGE_BASIC: Worker pool stopped.
```
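One fix, as the error message itself suggests, is to grant roles/dataflow.worker to the default Compute Engine controller service account; a sketch with gcloud, assuming the same project placeholder:

```
rem Grant the Dataflow worker role to the default compute service account
rem (the fix suggested by the JOB_MESSAGE_ERROR above)
gcloud projects add-iam-policy-binding <<project-id>> ^
  --member "serviceAccount:25491243517-compute@developer.gserviceaccount.com" ^
  --role "roles/dataflow.worker"
```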
The solution I went with was to add `--service_account_email` to the command instead, so the job runs as the service account created earlier:
```
python -m wordcount-local ^
  --region us-central1 ^
  --input gs://dataflow-samples/shakespeare/kinglear.txt ^
  --output gs://btd-in3-bse-nse-dailys/outputs ^
  --runner DataflowRunner ^
  --project <<project-name>> ^
  --temp_location gs://btd-in3-bse-nse-dailys/temp/ ^
  --service_account_email dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com
```
That cleared the IAM error, only to hit a Python version check:

```
Exception: Dataflow runner currently supports Python versions ['3.6', '3.7', '3.8'], got 3.9.7 (tags/v3.9.7:1016ef3, Aug 30 2021, 20:19:38) [MSC v.1929 64 bit (AMD64)].
To ignore this requirement and start a job using an unsupported version of Python interpreter, pass --experiment use_unsupported_python_version pipeline option.
```
Tried running with the suggested experiment flag:
```
python -m wordcount-local ^
  --region us-central1 ^
  --input gs://dataflow-samples/shakespeare/kinglear.txt ^
  --output gs://btd-in3-bse-nse-dailys/outputs ^
  --runner DataflowRunner ^
  --project btd-in2-20180718 ^
  --temp_location gs://btd-in3-bse-nse-dailys/temp/ ^
  --experiment use_unsupported_python_version ^
  --service_account_email dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com
```
This kickstarted Dataflow: I could see the pipeline in the Dataflow console and a worker VM was created, but the job just kept running. On my local system it took less than 10 seconds to finish; in GCP it had been running for around 15 minutes with no progress. Later, when I went into Stackdriver to check what was happening, I found there had been 73 retries.
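If you want to pull those worker logs from the CLI instead of the Stackdriver UI, something like this should work; the exact filter is an assumption on my part:

```
rem Read recent Dataflow worker errors from Cloud Logging (filter is assumed)
gcloud logging read "resource.type=dataflow_step AND severity>=ERROR" ^
  --project <<project-id>> --limit 50
```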
Dataflow doesn't work with Python 3.9 yet, and this is where I stop. If I pick this up again, the obvious workaround would be a Python 3.8 virtual environment, as sketched below.
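A sketch of that workaround, assuming Python 3.8 is installed and visible to the Windows py launcher (I have not verified this end to end):

```
rem Create a fresh virtualenv on Python 3.8, which the Dataflow runner supports
py -3.8 -m venv env38
.\env38\Scripts\activate
pip3 install wheel
pip3 install apache-beam[gcp]
```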
Total days wasted = 12 Days
Thank you for reading.