In the previous tutorial, Google Storage Export Task, we learned another way we can store data by exporting it to Google Cloud Storage. But how about managing data within the workflow? One way to manage data can be found in the Loop Task.
We have been working with Bike Sharing data. The main data points have been the starting station name and number of bike trips for each station. Another data point we can add is subscriber type. bigquery-public-data.san_francisco.bikeshare_trips contains subscriber_type identifying a subscriber as either a customer or subscriber. What if we wanted to break out the data by this subscriber type and send different reports via email for each type? One way to do this is with Loop Task.
By the end of the tutorial you will know
- What is Iteration Data and how to prepare it
- How to use iteration parameter
- How to use Loop Task
- How to add tasks to loop
- How to add tasks in positions of workflow
- How to move tasks within workflow
- What magic parameters are
Use the workflow created in the previous tutorial which contained a BigQuery Task that saves query results to a destination table, exports the data to a GCS bucket using GS Export Task, and then emails the file using E-Mail Task.
Modify the BigQuery Task so that subscriber_type is added to it. We want to use subscriber_type to break out the data.
SELECT start_station_name, subscriber_type, COUNT(1) AS trips FROM `<var_source_table>` GROUP BY start_station_name, subscriber_type
Add a second BigQuery Task and name it subscriber_types. The task is added at the bottom of the workflow. On the right side of the task header there are arrows pointing up and down. Clicking them moves the task up or down within the workflow. Click on the up arrow to move BigQuery Task subscriber_types directly under the first BigQuery task. The position of each task identifies the sequence in which they will be executed.
A BigQuery Task added at the bottom of workflow
The same BigQuery Task moved to the second position
A task can also be added to a position. In the Add Task section identify Add to position and click on the drop down. Select the first option in the list which should have 1 followed by the first BigQuery task name. Click a task and notice that it is added in the selected position and not at the bottom of the workflow. Remove any tasks that were created testing positioning.
Go back to BigQuery Task subscriber_types. Our goal is to iterate through subscriber types and send a separate email for each type. Therefore, we want to obtain the unique values for subscriber type and iterate through each of these types. Add the following for the query
SELECT subscriber_type AS subscriber_type FROM `<var_bq_table>` GROUP BY subscriber_type
Where parameter var_bq_table is set to the value where the prior BigQuery Task results were saved.
We are ready to start the loop. Add a Loop Task in the position right after BigQuery Task subscriber_types. This Loop Task should be position 3 of the workflow. Name the Loop Task loop. Notice that Loop Task has a green border. Other tasks can be added to Loop Task. Each task within the loop will be surrounded by a green border indicating they will be executed as part of the loop.
The Loop Task iterates through data (in our case subscriber types) and sets the value of the iteration to a parameter. In the Loop Task leave Loop Type as For Each (currently the only available option). In Iteration Data leave option as BigQuery Table and set input field to <var_subscriber_types_output>. <var_subscriber_types_output> is a so called magic parameter. Magic parameters are pre-defined custom parameters. In the case of <var_subscriber_types_output> the parameter value is the query result of BigQuery Task subscriber_types. Magic parameter output can be used by setting var_[taskid]_output where taskid is the name of the task.
Next set Iteration Parameter by first creating a customer parameter called var_subscriber_types and set data type to record. Iteration parameters must be of data type record. In the Loop Task click on the Iteration Parameter drop down list and select <var_subscriber_types>. For each loop of data iterated through the value will be set in var_subscriber_types which can then be used in other tasks within the Loop Task.
At the bottom of the Loop Task is an Add button followed by a drop down list of tasks that can be added to the loop. Set the task to BigQuery and click Add. Notice that the task has a green border indicating that it is part of the loop.
Name the task subscriber_type_data. We want to add a query that will filter station names and trips by subscriber type. Subscriber type is the data being iterated so add the Iteration Parameter to the WHERE clause of the query to filter the type. When using a record parameter specify the index of the record. For example  indicated the first column of the record. For our example we only have one column which is subscriber type.
SELECT start_station_name, trips FROM `<var_bq_table>` WHERE subscriber_type = '<var_subscriber_types>' ORDER BY start_station_name
In the previous workflow, tutorial_cloudstorageexporttask, we exported the file to cloud storage and then emailed the attachment. Let us do the same here but update the tasks to export and email a file for each subscriber type.
Currently our tasks gsexporttask and emailtask are not part of the Loop Task. Tasks not within the loop do not contain a green border. Tasks outside the loop can be added to the loop by clicking on an arrow in the task header that will move their position into the loop. Click on the up arrow for gsexporttask. This will move the task into the Loop task. Repeat this step for emaitask.
GS Export Task gsexporttask is not inside the loop. It is not surrounded by a green border
Moving GS Export Task gsexporttask up by clicking the up arrow moves the task inside the loop which is identified by the green border
Do same for E-Mail task. Both GS Export Task and E-Mail task are now within the loop. We need to update their settings to use Iteration Parameter. This will allow a break out of the data by subscriber types. For GS Export Task change Source BQ Table to the magic parameter output of BigQuery Task subscriber_type_data query results <var_subscriber_type_data_output>. We need to update the filename to indicate each subscriber type. Update the name by adding the Iteration parameter to the file name of the Destination GS URI path gs://<var_gs_bucket>/<var_subscriber_types>_<var_gs_file_name>
For E-Mail Task we should add the Iteration Parameter to the subject and body to indicate which subscriber type we are receiving. Set Subject as <var_email_subject> - <var_subscriber_types> and for Body set Attached is your data for <var_subscriber_types> types. Click on the Attachment link and set the GCS path to the updated file name using the Iteration Paramter gs://<var_gs_bucket>/<var_subscriber_types>_<var_gs_file_name>.
It is good practice to reset any dynamically set parameters. Outside the Loop Task add a Script Task and name it clear_subscriber_types. In the script body type return null and set the Return Parameter to var_subscriber_types.
Save the workflow and run.
The workflow will iterate through the loop and export a file for each subscriber type. In our example there are two types Customer and Subscriber. Check the destination cloud path for these files. Finally, you should receive emails with data for each type of subscriber. This workflow is an example of how to use a Loop Task. You can use it as a starting point to build your own loops when you need to iterate through your data.
View completed workflow
A completed version of the workflow in this tutorial can be found at https://magnus.potens.io/?l=dHV0b3JpYWxfbG9vcHRhc2s=
Next, we will add logic to our Loop Task by using Go-To Task.