Hardening our Azure Data Factory (ADF) pipelines

Data is pulled from various sources (some on-premises, some in other clouds such as ServiceNow) into our cloud data warehouse using Azure Data Factory (ADF) pipelines.

ADF pipelines run daily for all data sources. One pipeline runs every 45 minutes, every day. We plan to run some of the other pipelines in near real time, at intervals shorter than 60 minutes.

Many teams in our organization build and use Power BI reports that source their data from this cloud data warehouse.

Hardening the ADF pipelines is therefore crucial, and a basic requirement for the reports to function properly.

Various technical limitations and challenges can lead to pipeline failure. The most common ones are listed below, and the pipeline hardening process should mitigate them.

  1. Schema drift — one of the most annoying problems every Data Architect/Engineer experiences with any data source. Pipelines have to be updated manually whenever the schema changes at the source.
  2. Hard-coded parameters in the pipelines — hard-coding parameters saves build and test time but creates problems in the long run.
  3. Staging tables maintained manually — in our old data pull process, data was pulled from the source into parquet files and then loaded into staging tables. A MERGE statement then updated/inserted the delta changes from the staging tables into the main tables. The staging tables were physically created and had to be updated manually whenever the schema changed at the source.
  4. Stored procedures maintained manually — in our old data pull process, SQL stored procedures were physically created to run the MERGE statement between the staging and main tables. They had to be updated manually whenever the schema changed at the source.
  5. No easy way to report the data pull status for tables in the data warehouse.
  6. No monitoring or alerts set up for failed and long-running pipelines.

To mitigate most of the limitations listed above, I designed a new ADF pipeline. It is designed to suit any data source. The new design also makes the CI/CD process easy, as no manual changes to the pipeline are needed for deployment.

Two custom tables are created in the data warehouse; various activities in the new pipeline read and update them.

a. gen2_watermarktable — this table stores the watermark values used for the daily data pulls. These values are needed for the delta pulls.

This table also tracks the list of tables that receive data from the source.

b. gen2_tabcols — this table stores the details of the columns present in the data warehouse tables. A column added to a table at the data source is not immediately added to the data warehouse. Instead, its details are stored in gen2_tabcols for future reference, which helps in handling schema drift.

The new pipeline consists of the following activities.

→ LookupOldWatermark — this activity runs the following SQL statement to get the list of tables for which data has to be pulled from the source.

SELECT [TableName], [WatermarkCol], [MergeMapCol], [PartbyCol], [OrderbyCol], [WatermarkValue] FROM dbo.gen2_watermarktable WHERE DataSource=<data source> AND IsTablePulledIntoDW='Y' AND IsTablePulledByDailyDeltaPipeline='Y';

Note: this activity helps mitigate technical limitations #1 and #2.
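To make the lookup concrete, here is a minimal sketch of the control table and the query, using Python's built-in sqlite3 module as a stand-in for the data warehouse. The column names come from the SELECT statement above; the column types, the sample rows and the helper name tables_to_pull are assumptions made only for illustration.

```python
import sqlite3

# Simplified stand-in for gen2_watermarktable. Column names follow the lookup
# query; the TEXT types are an assumption, not the real table definition.
DDL = """
CREATE TABLE gen2_watermarktable (
    DataSource TEXT, TableName TEXT, WatermarkCol TEXT, MergeMapCol TEXT,
    PartbyCol TEXT, OrderbyCol TEXT, WatermarkValue TEXT,
    IsTablePulledIntoDW TEXT, IsTablePulledByDailyDeltaPipeline TEXT
)
"""

LOOKUP = """
SELECT TableName, WatermarkCol, MergeMapCol, PartbyCol, OrderbyCol, WatermarkValue
FROM gen2_watermarktable
WHERE DataSource = ?
  AND IsTablePulledIntoDW = 'Y'
  AND IsTablePulledByDailyDeltaPipeline = 'Y'
"""


def tables_to_pull(conn, data_source):
    """Return one dict per table that the daily delta pipeline should process."""
    conn.row_factory = sqlite3.Row
    return [dict(row) for row in conn.execute(LOOKUP, (data_source,))]


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
# Hypothetical sample rows: the second table is excluded from the daily delta pull.
conn.executemany(
    "INSERT INTO gen2_watermarktable VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
        ("servicenow", "incident", "sys_updated_on", "sys_id", "sys_id",
         "sys_updated_on", "2024-01-01 00:00:00", "Y", "Y"),
        ("servicenow", "problem", "sys_updated_on", "sys_id", "sys_id",
         "sys_updated_on", "2024-01-01 00:00:00", "Y", "N"),
    ],
)
rows = tables_to_pull(conn, "servicenow")
```

Because the table list and column names live in data rather than in the pipeline definition, onboarding a table becomes an INSERT into the control table instead of a pipeline change.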

→ ForEach1 — this loop executes a set of activities for each table returned by the LookupOldWatermark activity.

→ UpdatePipelineStartRun — this activity runs a SQL stored procedure to update the pipeline start time in gen2_watermarktable for each table.

Note: this activity helps mitigate technical limitation #5.

→ CopyToParquet — this activity copies the data from the source to parquet files in Azure Data Lake Storage containers. Each data source has its own container.
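The post does not show the source query that CopyToParquet uses. A common way to do a delta pull is to filter on the watermark column with the stored watermark value, and the sketch below assumes that approach. The function name build_delta_query and the sample table and column names are hypothetical.

```python
import re

# Accept only plain identifiers, since they are interpolated into the query text.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_delta_query(table_name, watermark_col, watermark_value):
    """Build a source query that returns only rows changed since the last pull."""
    for ident in (table_name, watermark_col):
        if not _IDENT.match(ident):
            raise ValueError(f"unexpected identifier: {ident!r}")
    # Escape single quotes so the watermark value stays a single string literal.
    literal = str(watermark_value).replace("'", "''")
    return f"SELECT * FROM {table_name} WHERE {watermark_col} > '{literal}'"


query = build_delta_query("incident", "sys_updated_on", "2024-01-01 00:00:00")
```

In ADF itself the same string would typically be assembled with a dynamic content expression from the current ForEach item, rather than in Python.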

→ ServiceCentralTicket_IfCopyFails — this activity creates an incident ticket in ServiceNow if the CopyToParquet activity fails.

→ CopyToStagingTable — this activity dynamically creates a staging table using the “Auto create table” option of the ADF “Copy data” activity and loads the data from the parquet file into it.

Note: this activity helps mitigate technical limitation #3.

→ MoveParquetFile — this activity copies the parquet file to another container, adding a timestamp to the file name. These files can be used when data needs to be backfilled, and they also help with lifecycle management of the parquet files.
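The exact naming pattern for the archived files is not given in the post. As an illustration only, the sketch below appends a UTC timestamp before the file extension; the function name archived_name and the timestamp format are assumptions.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def archived_name(file_path, now=None):
    """Return file_path with a UTC timestamp inserted before the extension."""
    now = now or datetime.now(timezone.utc)
    path = PurePosixPath(file_path)
    stamped = f"{path.stem}_{now:%Y%m%dT%H%M%SZ}{path.suffix}"
    return str(path.with_name(stamped))


example = archived_name(
    "servicenow/incident.parquet",
    datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
)
```

A sortable timestamp like this keeps the archived files in chronological order by name, which makes it easy to pick a range of files for a backfill or to expire old ones.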

→ RemoveDupsFromStagingTable — this activity runs a SQL stored procedure to remove duplicates from the data loaded into the staging table.
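The stored procedure itself is not shown. One plausible reading of the PartbyCol and OrderbyCol columns in gen2_watermarktable is that they feed a ROW_NUMBER() window that keeps the newest row per key. The sketch below generates such a T-SQL statement under that assumption; build_dedup_sql and the sample names are hypothetical.

```python
def build_dedup_sql(staging_table, partition_col, order_col):
    """Return T-SQL that keeps the newest row per key and deletes the rest."""
    return (
        "WITH ranked AS (\n"
        f"    SELECT ROW_NUMBER() OVER (PARTITION BY [{partition_col}] "
        f"ORDER BY [{order_col}] DESC) AS rn\n"
        f"    FROM {staging_table}\n"
        ")\n"
        "DELETE FROM ranked WHERE rn > 1;"
    )


dedup_sql = build_dedup_sql("dbo.stg_incident", "sys_id", "sys_updated_on")
```

Deleting through the common table expression removes the duplicate rows from the staging table directly, so the later MERGE sees at most one source row per key.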

→ UpdateMainTableWithDynamicMerge — this activity runs a SQL stored procedure that generates a MERGE statement dynamically.

Note: this activity helps mitigate technical limitation #4.
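The post does not include the stored procedure that generates the MERGE. The sketch below shows the general idea of building the statement from a column list and the key columns, so that nothing has to be rewritten when columns change. It is written in Python purely for illustration (the real pipeline does this inside a SQL stored procedure), and build_merge and the sample names are hypothetical.

```python
def build_merge(target, staging, columns, key_cols):
    """Generate a T-SQL MERGE that upserts staging rows into the target table."""
    non_keys = [c for c in columns if c not in key_cols]
    if not key_cols or not non_keys:
        raise ValueError("need at least one key column and one non-key column")
    on_clause = " AND ".join(f"t.[{c}] = s.[{c}]" for c in key_cols)
    set_clause = ", ".join(f"t.[{c}] = s.[{c}]" for c in non_keys)
    col_list = ", ".join(f"[{c}]" for c in columns)
    val_list = ", ".join(f"s.[{c}]" for c in columns)
    return (
        f"MERGE {target} AS t\n"
        f"USING {staging} AS s\n"
        f"    ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED BY TARGET THEN INSERT ({col_list}) VALUES ({val_list});"
    )


merge_sql = build_merge(
    "dbo.incident", "dbo.stg_incident",
    columns=["sys_id", "state", "sys_updated_on"],
    key_cols=["sys_id"],
)
```

In a stored procedure the column list would come from the database catalog instead of a Python list, and the join key would presumably come from the MergeMapCol value in gen2_watermarktable.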

→ DropStagingTable_IfMergeFails — this activity runs a SQL stored procedure to drop the staging table if the UpdateMainTableWithDynamicMerge activity fails.

→ ServiceCentralTicket_IfMergeFails — this activity creates an incident ticket in ServiceNow if the UpdateMainTableWithDynamicMerge activity fails.

→ LookupNewWatermarkValue — this activity runs a SQL statement to find the latest watermark value from the data warehouse table it’s processing.

→ UpdateWatermarkValue — this activity runs a SQL stored procedure to update the watermark value in gen2_watermarktable.

Note: this activity helps mitigate technical limitation #5.
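The two watermark steps can be sketched together. The example below uses sqlite3 as a stand-in for the data warehouse and assumes the new watermark is simply the maximum of the watermark column in the main table. The function name advance_watermark, the reduced table layout and the sample data are hypothetical.

```python
import sqlite3


def advance_watermark(conn, table_name, watermark_col):
    """Read the latest watermark from the main table and store it in the control table."""
    # Identifiers are assumed to come from the control table, not from user input.
    row = conn.execute(
        f'SELECT MAX("{watermark_col}") FROM "{table_name}"'
    ).fetchone()
    new_value = row[0]
    if new_value is not None:  # an empty table leaves the stored watermark unchanged
        conn.execute(
            "UPDATE gen2_watermarktable SET WatermarkValue = ? WHERE TableName = ?",
            (new_value, table_name),
        )
    return new_value


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE gen2_watermarktable (TableName TEXT, WatermarkValue TEXT)")
conn.execute("INSERT INTO gen2_watermarktable VALUES ('incident', '2024-01-01 00:00:00')")
conn.execute("CREATE TABLE incident (sys_id TEXT, sys_updated_on TEXT)")
conn.executemany(
    "INSERT INTO incident VALUES (?, ?)",
    [("a", "2024-01-02 08:00:00"), ("b", "2024-01-03 09:30:00")],
)
advance_watermark(conn, "incident", "sys_updated_on")
stored = conn.execute(
    "SELECT WatermarkValue FROM gen2_watermarktable WHERE TableName = 'incident'"
).fetchone()[0]
```

Taking the watermark from the main table after the merge, rather than from the clock, means a failed merge never advances the watermark past data that was not loaded.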

→ GetNewColsMissInTarget — this activity runs a SQL stored procedure to find columns that were created at the source but are not yet in the data warehouse table, and records their details in gen2_tabcols.

Note: this activity helps mitigate technical limitation #1.
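At its core, this check is a set difference between the columns of the auto-created staging table and the columns of the main table. The sketch below shows that comparison in Python, assuming a case-insensitive match on column names; in the warehouse the two lists could be read from a catalog view such as INFORMATION_SCHEMA.COLUMNS. The function name new_source_columns is hypothetical.

```python
def new_source_columns(staging_cols, target_cols):
    """Return staging columns that are missing from the target, in staging order."""
    known = {c.lower() for c in target_cols}
    return [c for c in staging_cols if c.lower() not in known]


missing = new_source_columns(
    staging_cols=["sys_id", "state", "sys_updated_on", "u_new_field"],
    target_cols=["SYS_ID", "State", "sys_updated_on"],
)
```

Recording the missing columns instead of failing lets the daily pull continue on the known columns while someone decides whether the new column belongs in the warehouse.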

→ DropStagingTable — this activity runs a SQL stored procedure to drop the staging table created by CopyToStagingTable activity.

→ UpdatePipelineSuccessRun — this activity runs a SQL stored procedure to update the pipeline end time in gen2_watermarktable for each table.

Note: this activity helps mitigate technical limitation #5.

The new ADF pipeline design opened up an opportunity to build a Power BI dashboard to track the progress of ADF pipelines. The data for this dashboard comes from gen2_watermarktable.

Monitoring and alerts for ADF pipelines:

Two alerts are set up to monitor the ADF pipelines.

a. ADF failed pipeline — this alert rule is triggered when an ADF pipeline fails. This alert rule is based on this logic.

b. LongRunningPipelineAlert — this alert rule is triggered when any pipeline has been running for more than 5 hours. It is driven by a query executed against the ADF logs.
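The log query behind LongRunningPipelineAlert is not shown in the post. The sketch below only illustrates the rule it describes, in Python, over a hypothetical list of run records: flag every run that is still in progress and started more than 5 hours ago. The record layout and the function name long_running are assumptions.

```python
from datetime import datetime, timedelta, timezone

THRESHOLD = timedelta(hours=5)  # limit stated for LongRunningPipelineAlert


def long_running(runs, now):
    """Return the names of pipelines still running after more than THRESHOLD."""
    return [
        run["pipeline"]
        for run in runs
        if run["status"] == "InProgress" and now - run["start"] > THRESHOLD
    ]


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
# Hypothetical run records; real data would come from the ADF logs.
runs = [
    {"pipeline": "daily_servicenow", "status": "InProgress",
     "start": datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc)},
    {"pipeline": "every_45_min", "status": "InProgress",
     "start": datetime(2024, 1, 2, 11, 30, tzinfo=timezone.utc)},
    {"pipeline": "daily_onprem", "status": "Succeeded",
     "start": datetime(2024, 1, 2, 1, 0, tzinfo=timezone.utc)},
]
flagged = long_running(runs, now)
```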

Disclaimer: The posts here represent my personal views and not those of my employer or any specific vendor. Any technical advice or instructions are based on my own personal knowledge and experience.
