Skip to content
bobby_dreamer

A Day with Apache Beam and Dataflow

notes, python, time-wasted, GCP2 min read

I wanted to build a pipeline move data from Cloud Storage to BigQuery. There are couple of options to do it.

  1. Elite way is doing it via Dataflow and Apache Beam
  2. Professional way is doing it via Cloud Functions

At first, i thought of doing it via Dataflow, i understand there is going to be a bit of learning in Apache Beam but if its the recommended approach in long term view then why not go with it. Here are some of the learnings and attempts i didn't for couple of days,

  1. Apache Beam can be coded via Java and Python
  2. Same code with little tweakings can be used locally and in cloud.
  3. When used in Cloud, it will start a VM Worker instance automatically and stop it when work is done, so there will be a bit of cost attached to this.

NOTE : There were lots of errors while trying to install using Python 3.5.2, but it worked flawlessly when tried with Python 3.9.7

1python3 -m venv env
2.\env\Scripts\activate
3
4pip3 install google-cloud-storage
5pip3 install --upgrade google-cloud-storage
6
7pip3 install wheel
8pip3 install apache-beam[gcp]
1(env) D:\BigData\12. Python\1. Tryouts\Beam>pip3 list
2Package Version
3------------------------------- ---------
4apache-beam 2.32.0
5avro-python3 1.9.2.1
6cachetools 4.2.2
7certifi 2021.5.30
8charset-normalizer 2.0.6
9crcmod 1.7
10dill 0.3.1.1
11docopt 0.6.2
12fastavro 1.4.5
13fasteners 0.16.3
14future 0.18.2
15google-api-core 1.31.3
16google-apitools 0.5.31
17google-auth 1.35.0
18google-cloud-bigquery 2.26.0
19google-cloud-bigtable 1.7.0
20google-cloud-core 1.7.2
21google-cloud-datastore 1.15.3
22google-cloud-dlp 1.0.0
23google-cloud-language 1.3.0
24google-cloud-pubsub 1.7.0
25google-cloud-recommendations-ai 0.2.0
26google-cloud-spanner 1.19.1
27google-cloud-storage 1.42.2
28google-cloud-videointelligence 1.16.1
29google-cloud-vision 1.0.0
30google-crc32c 1.2.0
31google-resumable-media 2.0.3
32googleapis-common-protos 1.53.0
33grpc-google-iam-v1 0.12.3
34grpcio 1.40.0
35grpcio-gcp 0.2.2
36hdfs 2.6.0
37httplib2 0.19.1
38idna 3.2
39numpy 1.20.3
40oauth2client 4.1.3
41orjson 3.6.3
42packaging 21.0
43pip 21.2.3
44proto-plus 1.19.0
45protobuf 3.17.3
46pyarrow 4.0.1
47pyasn1 0.4.8
48pyasn1-modules 0.2.8
49pydot 1.4.2
50pymongo 3.12.0
51pyparsing 2.4.7
52python-dateutil 2.8.2
53pytz 2021.1
54requests 2.26.0
55rsa 4.7.2
56setuptools 57.4.0
57six 1.16.0
58typing-extensions 3.7.4.3
59urllib3 1.26.7
60wheel 0.37.0
61WARNING: You are using pip version 21.2.3; however, version 21.2.4 is available.
62You should consider upgrading via the 'D:\BigData\12. Python\1. Tryouts\Beam\env\Scripts\python.exe -m pip install --upgrade pip' command.

Enabling APIs and Creating a GCP Service Account

Enabled APIs : There was a link to enable all this at one-go and went on with it. Dataflow API, Compute Engine API, Cloud Logging API, Cloud Storage, Google Cloud Storage JSON API, BigQuery API, Cloud Pub/Sub API, Cloud Datastore API, Cloud Resource Manager API in Google Cloud Platform

Creating Service Account

  1. Created a new IAM Service Account dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com with Role (Quick Access 🡪 Basic 🡪 Owner)
  2. After creating the account, go in to the Keys section and click ADD KEY 🡪 Create new key and download the key as JSON.

This you will use while running the Apache Beam

Running the sample wordcount Apache Beam program locally

Below is the wordcount program which reads a text file from Cloud Storage and counts number of words

wordcount-local.py
1"""A word-counting workflow."""
2import argparse
3import logging
4import re
5
6import apache_beam as beam
7from apache_beam.io import ReadFromText
8from apache_beam.io import WriteToText
9from apache_beam.options.pipeline_options import PipelineOptions
10from apache_beam.options.pipeline_options import SetupOptions
11
12
13class WordExtractingDoFn(beam.DoFn):
14 """Parse each line of input text into words."""
15 def process(self, element):
16 """Returns an iterator over the words of this element.
17
18 The element is a line of text. If the line is blank, note that, too.
19
20 Args:
21 element: the element being processed
22
23 Returns:
24 The processed element.
25 """
26 return re.findall(r'[\w\']+', element, re.UNICODE)
27
28
29def run(argv=None, save_main_session=True):
30 """Main entry point; defines and runs the wordcount pipeline."""
31 parser = argparse.ArgumentParser()
32 parser.add_argument(
33 '--input',
34 dest='input',
35 default='gs://dataflow-samples/shakespeare/kinglear.txt',
36 help='Input file to process.')
37 parser.add_argument(
38 '--output',
39 dest='output',
40 required=True,
41 help='Output file to write results to.')
42 known_args, pipeline_args = parser.parse_known_args(argv)
43
44 # We use the save_main_session option because one or more DoFn's in this
45 # workflow rely on global context (e.g., a module imported at module level).
46 pipeline_options = PipelineOptions(pipeline_args)
47 pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
48
49 # The pipeline will be run on exiting the with block.
50 with beam.Pipeline(options=pipeline_options) as p:
51
52 # Read the text file[pattern] into a PCollection.
53 lines = p | 'Read' >> ReadFromText(known_args.input)
54
55 counts = (
56 lines
57 | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
58 | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
59 | 'GroupAndSum' >> beam.CombinePerKey(sum))
60
61 # Format the counts into a PCollection of strings.
62 def format_result(word, count):
63 return '%s: %d' % (word, count)
64
65 output = counts | 'Format' >> beam.MapTuple(format_result)
66
67 # Write the output using a "Write" transform that has side effects.
68 # pylint: disable=expression-not-assigned
69 output | 'Write' >> WriteToText(known_args.output)
70
71
72if __name__ == '__main__':
73 logging.getLogger().setLevel(logging.INFO)
74 run()

Before running the program set the PATH of the Service Account JSON file

1set GOOGLE_APPLICATION_CREDENTIALS=D:\BigData\12. Python\1. Tryouts\private\beam-test-abb6f7f2eb46-owner-role.json

Now run the program

1(env) D:\BigData\12. Python\1. Tryouts\Beam>python -m wordcount-local --output outputs
2D:\BigData\12. Python\1. Tryouts\Beam\env\lib\site-packages\apache_beam\__init__.py:79: UserWarning: This version of Apache Beam has not been sufficiently tested on Python 3.9. You may encounter bugs or missing features.
3 warnings.warn(
4INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
5INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
6INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
7INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
8INFO:oauth2client.client:Refreshing access_token
9WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
10INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.32.0
11INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x00000206033A4F70> ====================
12INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x00000206033A90D0> ====================
13INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x00000206033A9550> ====================
14INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x00000206033A95E0> ====================
15INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x00000206033A9790> ====================
16INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x00000206033A9820> ====================
17INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x00000206033A9940> ====================
18INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x00000206033A99D0> ====================
19INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x00000206033A9A60> ====================
20INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x00000206033A9AF0> ====================
21INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x00000206033A9D30> ====================
22INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x00000206033A9CA0> ====================
23INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x00000206033A9DC0> ====================
24INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
25INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x000002060367C6A0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
26INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((ref_AppliedPTransform_Read-Read-Impulse_4)+(ref_AppliedPTransform_Read-Read-Map-lambda-at-iobase-py-894-_5))+(Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_2_split/Write)
27INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_PCollection_PCollection_2_split/Read)+(Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/Process))+(ref_AppliedPTransform_Split_8))+(ref_AppliedPTransform_PairWIthOne_9))+(GroupAndSum/Precombine))+(GroupAndSum/Group/Write)
28INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_Write-Write-WriteImpl-DoOnce-Impulse_19)+(ref_AppliedPTransform_Write-Write-WriteImpl-DoOnce-FlatMap-lambda-at-core-py-2968-_20))+(ref_AppliedPTransform_Write-Write-WriteImpl-DoOnce-Map-decode-_22))+(ref_AppliedPTransform_Write-Write-WriteImpl-InitializeWrite_23))+(ref_PCollection_PCollection_11/Write))+(ref_PCollection_PCollection_12/Write)
29INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((((GroupAndSum/Group/Read)+(GroupAndSum/Merge))+(GroupAndSum/ExtractOutputs))+(ref_AppliedPTransform_Format_14))+(ref_AppliedPTransform_Write-Write-WriteImpl-WindowInto-WindowIntoFn-_24))+(ref_AppliedPTransform_Write-Write-WriteImpl-WriteBundles_25))+(ref_AppliedPTransform_Write-Write-WriteImpl-Pair_26))+(Write/Write/WriteImpl/GroupByKey/Write)
30INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((Write/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_Write-Write-WriteImpl-Extract_28))+(ref_PCollection_PCollection_17/Write)
31INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_PCollection_PCollection_11/Read)+(ref_AppliedPTransform_Write-Write-WriteImpl-PreFinalize_29))+(ref_PCollection_PCollection_18/Write)
32INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (ref_PCollection_PCollection_11/Read)+(ref_AppliedPTransform_Write-Write-WriteImpl-FinalizeWrite_30)
33INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
34INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.08 seconds.

Outputs

1(env) D:\BigData\12. Python\1. Tryouts\Beam>more outputs-00000-of-00001
2KING: 243
3LEAR: 236
4DRAMATIS: 1
5PERSONAE: 1
6king: 65
7of: 447
8Britain: 2
9OF: 15

Running the sample wordcount Apache Beam program on GCP

Error 1 : Permissions verification for controller service account failed. IAM role roles/dataflow.worker should be granted to controller service account

Below is the command

1python -m wordcount-local ^
2 --region us-central1 ^
3 --input gs://dataflow-samples/shakespeare/kinglear.txt ^
4 --output btd-in3-bse-nse-dailys/outputs ^
5 --runner DataflowRunner ^
6 --project <<project-name>> ^
7 --temp_location btd-in3-bse-nse-dailys/temp/ ^

It failed with below message

1INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2021-09-26_00_16_24-6802767166116217174 is in state JOB_STATE_PENDING
2INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:28.455Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2021-09-26_00_16_24-6802767166116217174. The number of workers will be between 1 and 1000.
3INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:28.594Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2021-09-26_00_16_24-6802767166116217174.
4INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:29.920Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: Permissions verification for controller service account failed. IAM role roles/dataflow.worker should be granted to controller service account 25491243517-compute@developer.gserviceaccount.com.
5INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:29.964Z: JOB_MESSAGE_DETAILED: Cleaning up.
6INFO:apache_beam.runners.dataflow.dataflow_runner:2021-09-26T07:16:30.041Z: JOB_MESSAGE_BASIC: Worker pool stopped.

Solution was to add `` to the command

1python -m wordcount-local ^
2 --region us-central1 ^
3 --input gs://dataflow-samples/shakespeare/kinglear.txt ^
4 --output btd-in3-bse-nse-dailys/outputs ^
5 --runner DataflowRunner ^
6 --project <<project-name>> ^
7 --temp_location btd-in3-bse-nse-dailys/temp/ ^
8 --service_account_email dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com
Error 2 : Dataflow runner currently supports Python versions ['3.6', '3.7', '3.8'], got 3.9.7
1Exception: Dataflow runner currently supports Python versions ['3.6', '3.7', '3.8'], got 3.9.7 (tags/v3.9.7:1016ef3, Aug 30 2021, 20:19:38) [MSC v.1929 64 bit (AMD64)].
2To ignore this requirement and start a job using an unsupported version of Python interpreter, pass --experiment use_unsupported_python_version pipeline option.

Tried running with

1python -m wordcount-local ^
2 --region us-central1 ^
3 --input gs://dataflow-samples/shakespeare/kinglear.txt ^
4 --output btd-in3-bse-nse-dailys/outputs ^
5 --runner DataflowRunner ^
6 --project btd-in2-20180718 ^
7 --temp_location btd-in3-bse-nse-dailys/temp/ ^
8 --experiment use_unsupported_python_version ^
9 --service_account_email dataflow-btd-in3@<<project-id>>.iam.gserviceaccount.com

This kickstarted the Dataflow and i could see Pipeline in Dataflow console and Worker-VM created but it just kept running. In my local system it took less than 10sec to finish and it was taking around 15mins in GCP and no-progress. Later when into StackDriver to check whats happening found there were 73retries

Error 3 : Showstopper

Dataflow doesn't work with Python 3.9 yet and this is where is stop.


Cleanups

  1. Deleting the IAM and Service Account
  2. Disabling APIs
  3. Delete staging buckets in Cloud Storage

Total days wasted = 12 Days

  • 10 Days thinking, reading and watching youtube videos basically preparing and basically deciding to go with Dataflow or Cloud Functions
  • 2 days of testing

Thank you for reading

References (if i ever come back)