BigQuery Data Pipeline Without Any Orchestrator Just CloudFunction And PubSub

When a BigQuery job completes successfully, a Pub/Sub sink on the Stackdriver logs triggers a Cloud Function, which in turn starts another BigQuery job (a scheduled query).

I was discussing a data pipeline for BQ with my team. It's a very simple pipeline: we upload some CSV files from relational databases to GCS. Once a file arrives in GCS, we load it into a staging table and then merge it into the main table. But I was wondering how we could orchestrate this without an orchestrator like Airflow or Matillion. I did a small PoC and am writing this blog post about it.

Orchestrate the pipeline:

#1 Create the tables:

For this PoC, I'm going to use 2 tables: one for staging data and one as the main table.

Table Structure:


I have added a single row to the target table.

insert into `` values ('aaa',2);

#2 GCS Bucket and directory structure:

I have a bucket named poc-bucket, and inside it the directory structure looks like this.


All the code I used here relies on a strict naming convention.
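Since the load function depends on this convention, it helps to see how an object path maps to a table name. Here is a minimal sketch, assuming a hypothetical layout of `<top-level-folder>/<table_name>/<file>`; the folder name `data`, the file name, and the helper `parse_object_path` are illustrative and not part of the PoC.

```python
def parse_object_path(bucket, object_name):
    """Split a GCS object name into (table_name, file_name, uri).

    Assumes a hypothetical layout: <top-level-folder>/<table_name>/<file>.
    """
    parts = object_name.split("/")
    if len(parts) != 3 or not parts[2]:
        # Folder placeholder objects or an unexpected depth: nothing to load.
        raise ValueError(f"unexpected object path: {object_name!r}")
    _, table_name, file_name = parts
    uri = f"gs://{bucket}/{object_name}"
    return table_name, file_name, uri


# Hypothetical object uploaded to the bucket used in this post.
print(parse_object_path("poc-bucket", "data/tbla/sample.csv"))
```

Rejecting paths that do not match the expected depth keeps folder placeholder objects from being treated as CSV files to load.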

#3 Create Service accounts:

We need 2 service accounts: one for the Cloud Function and one for the BigQuery scheduled queries.

#4 CloudFunction to load data into staging table:

Region - Where your GCS bucket and BQ tables are located
Trigger - Cloud Storage
Event Type - Finalize/Create
Bucket - poc-bucket
Advanced - Select the service account
Runtime - Python 3.8

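Add these lines to the requirements.txt file (the packages that provide the two imports used by the function):

google-cloud-storage
google-cloud-bigquery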


Function code:

In dataset = client.dataset('my_db'), replace my_db with your dataset name.

from google.cloud import storage
from google.cloud import bigquery

def hello_gcs_generic(data, context):
    sourcebucket = data['bucket']
    source_file = data['name']
    # this split is based on my directory structure on GCS
    table_name = source_file.split('/')[1]
    input_file = source_file.split('/')[2]
    # build the URI from the bucket that fired the event
    uri = 'gs://' + sourcebucket + '/' + source_file
    # BQ details
    client = bigquery.Client()
    dataset = client.dataset('my_db')
    table = dataset.table(table_name)
    # Job config
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True
    job_config.allow_jagged_rows = True
    job_config.allow_quoted_newlines = True
    job_config.field_delimiter = ','
    job_config.write_disposition = 'WRITE_TRUNCATE'

    # API request
    load_job = client.load_table_from_uri(
        uri, dataset.table(table_name), job_config=job_config)
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(dataset.table(table_name))
    print("Loaded {} rows.".format(destination_table.num_rows))

#5 StackDriver PubSub Sink:

Go to Logging and create a new filter using the following lines, replacing the values with your own.

protoPayload.authenticationInfo.principalEmail="[email protected]"

Now click Create sink and choose Pub/Sub as the sink service.

#6 Scheduled Query in BQ:

To merge the data from the staging table into the main table, we could run a SQL query directly from the Cloud Function. But I want to do it a different way, so we need to create a scheduled query that runs the merge SQL command.

  `my_db.tbla` b
  (name, id) ;
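For readers who want a starting point for the merge step, here is a minimal sketch of one common upsert shape, generated from table names. It is not the exact statement from this PoC: the target table name `main_tbla`, the choice of `name` as the join key, and the helper `build_merge_sql` are all assumptions for illustration.

```python
def build_merge_sql(dataset, target_table, staging_table,
                    key="name", columns=("name", "id")):
    """Build a BigQuery MERGE statement that upserts staging rows into the target.

    The key and column names are assumptions based on the (name, id) tables
    used in this post.
    """
    updates = ", ".join(f"{c} = s.{c}" for c in columns if c != key)
    col_list = ", ".join(columns)
    val_list = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE `{dataset}.{target_table}` t\n"
        f"USING `{dataset}.{staging_table}` s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {updates}\n"
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({val_list})"
    )


# Hypothetical table names; paste the output into the scheduled query editor.
print(build_merge_sql("my_db", "main_tbla", "tbla"))
```

Generating the statement from the table name fits the naming convention used throughout the post: one scheduled query per table, each differing only in the table names.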

Once it's created, go to Scheduled queries –> Query Name –> Config. There you can see the resource name, which will be used to trigger the scheduled query from the Cloud Function.
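The resource name is a slash-separated path, and the second Cloud Function picks the project and config IDs out of it by position. A small sketch of that parsing, assuming the `projects/<project>/locations/<location>/transferConfigs/<id>` form; the helper name and the sample values are hypothetical.

```python
def parse_transfer_config_name(resource_name):
    """Split a scheduled query resource name into its components.

    Assumes the form:
    projects/<project>/locations/<location>/transferConfigs/<config_id>
    """
    parts = resource_name.split("/")
    expected = ("projects", "locations", "transferConfigs")
    if len(parts) != 6 or (parts[0], parts[2], parts[4]) != expected:
        raise ValueError(f"unexpected resource name: {resource_name!r}")
    return {"project": parts[1], "location": parts[3], "config_id": parts[5]}


# Hypothetical resource name, for illustration only.
print(parse_transfer_config_name(
    "projects/123456789/locations/us/transferConfigs/abc-123"))
```

Validating the shape up front gives a clear error if the name is in a different form, instead of an IndexError from a bare split.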

#7 CloudFunction to trigger scheduled query:

Configuration - Same as the previous Cloud Function.

The logic behind this function: once the load job finishes successfully, it creates a log entry containing the table name. My scheduled queries follow the naming convention merge_tablename. The function lists all the scheduled queries and picks the resource name of the one whose name matches merge_tablename.
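The first half of that logic can be sketched without any Google libraries. This example decodes a Pub/Sub event and derives the scheduled query name from it. The field path to the table ID is an assumption (it follows the legacy BigQuery audit log layout), and the sample log entry is hand-built and heavily trimmed, not a real log.

```python
import base64
import json


def scheduled_query_name(event):
    """Return the scheduled query display name for a completed load job."""
    log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    # Assumed field path (legacy BigQuery audit log format).
    job = log_entry["protoPayload"]["serviceData"]["jobCompletedEvent"]["job"]
    table_id = job["jobConfiguration"]["load"]["destinationTable"]["tableId"]
    # Naming convention from this post: merge_<tablename>.
    return "merge_" + table_id


# Hand-built, heavily trimmed log entry for illustration only.
sample_entry = {
    "protoPayload": {
        "serviceData": {
            "jobCompletedEvent": {
                "job": {
                    "jobConfiguration": {
                        "load": {
                            "destinationTable": {
                                "datasetId": "my_db",
                                "tableId": "tbla",
                            }
                        }
                    }
                }
            }
        }
    }
}
# Pub/Sub delivers the log entry base64-encoded in event["data"].
payload = base64.b64encode(json.dumps(sample_entry).encode("utf-8"))
event = {"data": payload.decode("ascii")}
print(scheduled_query_name(event))
```

Building a fake event like this is also a convenient way to exercise the function locally before deploying it.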

REQUIREMENTS.TXT - google-cloud-bigquery-datatransfer

Main function Code:

Replace parent=f"projects/poc-project" with your project name.

import time
import base64
import json

from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp

def hello_pubsub(event, context):
    pubsub_message = json.loads(base64.b64decode(event['data']).decode('utf-8'))
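    # get the table name (assumed path: legacy BigQuery audit log entry of a completed load job)
    tbl = pubsub_message['protoPayload']['serviceData']['jobCompletedEvent']['job']['jobConfiguration']['load']['destinationTable']['tableId']
    client = bigquery_datatransfer_v1.DataTransferServiceClient()
    # list all Scheduled queries as {display name: resource name}
    list_squery = {}
    for data_source in client.list_transfer_configs(parent=f"projects/poc-project",data_source_ids=["scheduled_query"]):
        list_squery[data_source.display_name] = data_source.name
    # Get the correct query id for the required table.
    value = list_squery['merge_'+tbl]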
    # BQ Job configs (the calls in this function follow the 1.x client library; 2.x and later take request objects)
    projectid = value.split("/")[1]
    transferid = value.split("/")[5]
    parent = client.project_transfer_config_path(projectid, transferid)
    # Trigger after 10 seconds
    start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
    response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)

#8 It's demo time:

Go to the GCS bucket and upload the sample CSV file.


And then see the data on both staging and target tables.

select * from ``;

name: aaa
id: 1


This pipeline can be built in many ways, but my goal was to run it without any orchestrator and to trigger the scheduled queries whenever a particular job succeeds.



