Real time Train Operator Performance Analysis with Azure Event Hubs and Databricks Structured Streaming

Network Rail is the owner and infrastructure manager for most of the rail network in the UK. It has been operating a real-time data feed for a long time, but the feed was mostly limited to the train operating companies (TOCs). In recent years, however, it has also made a public feed of this data available, in line with the UK government's Open Data initiatives.

The public feed is available over several protocols, including a modern HTTP-based REST service as well as the much older STOMP-based messaging service, which is still the one most widely used by the TOCs themselves.

I have been playing around with data streaming for a while now, but it has all been with the usual suspects, such as social media feeds from Twitter, Facebook, Instagram, etc. I was looking for something more interesting and remembered the Network Rail real-time feeds. After looking more closely at the options available, I decided to use the STOMP-based feed, as I was a bit more familiar with these datasets, having worked with them previously. Anyone can gain access to this data by first registering on the Network Rail data feed website. There is, however, a limit on the number of users they register, which is revised frequently based on demand and capacity. I have listed all the relevant links to their sites within this article, so you can also check it out if you are interested.

My main goal here is to document and demonstrate the work involved in gaining access to the live feed and establishing a data streaming service utilising Azure Event Hubs and Spark Structured Streaming on Databricks. In order to crystallise this I have depicted the basic tech stack architecture in the diagram below:

As seen in the above diagram, the Network Rail real-time data feed is sourced from an internal enterprise system named TRUST, which is used for monitoring the progress of trains and tracking delays on the rail network. A number of datasets are available from TRUST, and you select the ones you want when you register for the data feeds. For this piece of work I have chosen the Train Movements dataset. This dataset consists of a number of topics exposed via an ActiveMQ messaging service, which registered users can subscribe to from a client app over the Stomp protocol. Once subscribed, you receive JSON messages that can be sent to an Azure Event Hub in small batches, from where they can be consumed by Spark Structured Streaming dataframes running on Azure Databricks. The train movement data can be put to good use in a large number of monitoring, alerting and analytical applications. In this instance I have decided to use the data to produce a simple streaming dataset that shows, for a specific train operating company, the number of trains that were on time, late or early within a 10 minute rolling time window. Of course, you could use the data for far more complex and useful applications, but the objective here is to understand and highlight the basic technicalities involved in such a project.

Getting the Source Data

There is a good deal of information about the Network Rail data online if you are interested in the subject of rail operations in the UK. Access to the real-time data feed is gained by first registering on their site. Once registered, you can select the datasets you are interested in.

Information about the registration process can be found here:

Information about the datasets and topics can be found in their Wiki pages here: 

The Client App

The client app itself can be a simple background job running on an on-premises or cloud server, and can be written in any language that supports the Stomp protocol, either natively or via third-party libraries. For this purpose I have written a small console app in Python using the open source Python Stomp library (stomp.py) available from the Python Package Index. The app also uses the Python Azure Event Hubs library (azure-eventhub) to send the data received from the real-time feed to the Azure Event Hub.

Below is the Python code for this client app in its most simplistic form:

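import stomp
from azure.eventhub import EventHubProducerClient, EventData

NETWORK_RAIL_AUTH = ("<network rail login name>", "<password>")

class Listener(stomp.ConnectionListener):

    def __init__(self, mq, pr, eb):
        self._mq = mq  # Stomp connection
        self._pr = pr  # Event Hub producer client
        self._eb = eb  # current Event Hub data batch
    # (headers, message) is the listener signature used by older stomp.py releases
    def on_message(self, headers, message):
        # Echo each message to the console for demonstration purposes
        print(message)
        try:
            # Collect the JSON message in the current batch
            self._eb.add(EventData(message))
        # add() raises ValueError when the message would exceed the batch size limit
        except ValueError:
            self._pr.send_batch(self._eb)
            self._eb = self._pr.create_batch()
            self._eb.add(EventData(message))  # keep the message that did not fit
        self._mq.ack(id=headers['message-id'], subscription=headers['subscription'])

def main():
    producer = EventHubProducerClient.from_connection_string(conn_str="<event hub namespace connection string>", eventhub_name="<event hub name>")
    event_data_batch = producer.create_batch()

    mq = stomp.Connection(host_and_ports=[('<network rail feed host>', 61618)],
                          heartbeats=(100000, 50000))
    # Create the listener and attach it to the Stomp connection
    listener = Listener(mq, producer, event_data_batch)
    mq.set_listener('', listener)
    mq.connect(username=NETWORK_RAIL_AUTH[0], passcode=NETWORK_RAIL_AUTH[1], wait=True)

    # Subscribe to the 'Train Movements - All TOCs' topic; 'tmat-1' is the
    # subscription id and every message must be acknowledged individually
    mq.subscribe('/topic/TRAIN_MVT_ALL_TOC', 'tmat-1', ack='client-individual')
    input('press Enter to stop\n')

    mq.unsubscribe('tmat-1')
    mq.disconnect()
    # Send any messages left in the current batch, then close the producer
    if len(listener._eb) > 0:
        producer.send_batch(listener._eb)
    producer.close()
    print('connection closed')

if __name__ == "__main__":
    main()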

The credentials in line 4 should be replaced with the login name and password that were used to register for the live feeds.

The <event hub namespace connection string> in line 27 should be replaced with the correct connection string for the Azure Event Hub namespace. Also, the <event hub name> should be replaced with the name of the Event Hub instance. These details are available on the Azure portal once the event hub has been provisioned on the relevant Azure subscription. Event hubs are explained further in the following sections of this article.

In the above code, lines 1 and 2 bring in the Stomp library and the Azure Event Hub library, which are the only external Python libraries required for this basic app.

In line 6 we set up a Listener class whose job is to listen for the messages arriving from the subscribed live feed. Its constructor takes as parameters a Stomp connection object, an Azure Event Hub producer object and an Azure Event Hub data batch object. At the heart of this class is the 'on_message' method in line 13, which the Stomp connection calls for each message it receives. The method adds each message to the event hub data batch object in line 18. This continues until adding a message would take the batch past its size limit, at which point the batch raises a ValueError that is caught by the exception handler in line 20. The handler sends the data batch, containing the messages collected so far, to the Azure Event Hub in line 21, and line 22 replaces it with a new batch object so that collection can resume.
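The fill-until-full-then-send behaviour of on_message can be tried out without an Azure subscription. The sketch below is a plain-Python model of it, not the Azure SDK: SizeLimitedBatch is a hypothetical stand-in that, like the Event Hub data batch, raises ValueError when a message would push it past its size limit, and the 100-byte limit and the send callback are assumptions chosen for illustration. The message that triggers the exception is added to the fresh batch so that it is not lost.

```python
class SizeLimitedBatch:
    """Hypothetical stand-in for an Event Hub data batch with a byte limit."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.messages = []
        self.size = 0

    def add(self, message):
        encoded = message.encode("utf-8")
        if self.size + len(encoded) > self.max_bytes:
            # Like the Event Hub batch, refuse a message that would not fit
            raise ValueError("batch is full")
        self.messages.append(message)
        self.size += len(encoded)


class Forwarder:
    """Collects messages into batches and hands each full batch to send()."""

    def __init__(self, max_bytes, send):
        self.max_bytes = max_bytes
        self.send = send
        self.batch = SizeLimitedBatch(max_bytes)

    def on_message(self, message):
        try:
            self.batch.add(message)
        except ValueError:
            # Batch is full: send it, start a new one, keep the current message
            self.send(self.batch)
            self.batch = SizeLimitedBatch(self.max_bytes)
            self.batch.add(message)


sent = []
forwarder = Forwarder(100, sent.append)
for i in range(5):
    # Each hypothetical message is 41 bytes, so two fit in a 100-byte batch
    forwarder.on_message('{"msg": %d, "pad": "%s"}' % (i, "x" * 20))

print([len(b.messages) for b in sent], len(forwarder.batch.messages))  # [2, 2] 1
```

As in the client app, a single message larger than the whole batch limit would still raise on the second add, so a production version would need to handle that case separately.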

In the main function, lines 27 and 28 set up the connection to the Azure Event Hub using the information obtained from the provisioned resources. Line 30 creates the Stomp connection to the message queue. The parameters passed here are the host name of the ActiveMQ server at Network Rail and the port enabled for the connection; both can be obtained from the Wiki pages given earlier. The heartbeats parameter is optional but recommended: it asks both sides to exchange periodic heart-beat frames so that a dropped connection can be detected quickly. More information about this, along with the documentation for the Python Stomp library, is available at this link:

An instance of the Listener class defined earlier gets assigned to the Stomp connection in line 34 before the actual connection is made in line 35 using the credentials provided at the start in line 4.
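The heartbeats=(100000, 50000) argument passed to stomp.Connection follows the STOMP 1.1 heart-beat header: the first value is the interval (in milliseconds) at which the client can send heart-beats, the second the interval at which it would like to receive them. The actual intervals are then negotiated with the values returned by the server. The sketch below applies the negotiation rule from the STOMP 1.1 specification; the server values in the example are hypothetical, since the real ones come from Network Rail's broker.

```python
def negotiate_heartbeats(client, server):
    """Apply the STOMP 1.1 heart-beat negotiation.

    client: (cx, cy) sent in CONNECT; server: (sx, sy) returned in CONNECTED.
    Returns (client_to_server_ms, server_to_client_ms); 0 means disabled.
    """
    cx, cy = client
    sx, sy = server
    # Client -> server heart-beats only if the client can send and the server wants them
    client_to_server = 0 if cx == 0 or sy == 0 else max(cx, sy)
    # Server -> client heart-beats only if the server can send and the client wants them
    server_to_client = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return client_to_server, server_to_client


# Client values from the app; the server values are hypothetical
print(negotiate_heartbeats((100000, 50000), (15000, 15000)))  # (100000, 50000)
print(negotiate_heartbeats((100000, 50000), (0, 0)))          # (0, 0)
```

In other words, the larger of the two relevant values wins in each direction, and either side can switch a direction off by sending 0.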

Messages start flowing to the listener in line 39, when a subscription is made on the Stomp connection. Notice that the first string parameter is the path to the topic, which in this case is TRAIN_MVT_ALL_TOC. This is the codified topic name for the 'Train Movements - All TOCs' dataset available on the Network Rail data feeds. If another dataset is required, further subscriptions can be made after finding their codified topic names on the Network Rail Open Data Wiki pages given earlier. Since it is possible to have multiple subscriptions on the same Stomp connection, the second string parameter takes a unique identifier that can be used to reference the subscription in subsequent actions such as unsubscribing, as can be seen in line 42. The ack='client-individual' argument means that every message must be acknowledged by the client, which the listener does at the end of its on_message method.
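To make the subscription and acknowledgement steps concrete, the sketch below serialises the raw STOMP 1.1 frames for a SUBSCRIBE to this topic with client-individual acknowledgement, and for the ACK of one received message. stomp.py builds and sends these frames for you; the helper functions here are hypothetical and only illustrate the wire format (header escaping follows the 1.1 rules, and optional headers such as content-length are omitted). The message id msg-001 is an invented example value.

```python
def escape_header(value):
    """Escape a header key or value as STOMP 1.1 requires (backslash first)."""
    return value.replace("\\", "\\\\").replace("\n", "\\n").replace(":", "\\c")


def build_frame(command, headers, body=""):
    """Serialise a STOMP frame: command line, header lines, blank line, body, NUL."""
    lines = [command]
    lines += ["%s:%s" % (escape_header(k), escape_header(v)) for k, v in headers.items()]
    return "\n".join(lines) + "\n\n" + body + "\x00"


subscribe = build_frame("SUBSCRIBE", {
    "destination": "/topic/TRAIN_MVT_ALL_TOC",
    "id": "tmat-1",
    "ack": "client-individual",
})
# In STOMP 1.1 an ACK names the message and the subscription it arrived on;
# "msg-001" is a hypothetical message id
ack = build_frame("ACK", {"message-id": "msg-001", "subscription": "tmat-1"})
print(repr(subscribe))
```

The id header on SUBSCRIBE is the same 'tmat-1' identifier that the ACK refers back to and that is later passed to unsubscribe.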

This simple client app runs as a console application and, once started, will continue to run, printing each message as it is received, until the user stops it by pressing Enter. Note that this app is written this way for simplicity and ease of explanation and is not production ready. Typically it would be run as a background service with additional logging, exception handling, fault tolerance and performance characteristics built in.

For demonstration purposes I am simply printing the received messages to the console in line 15 before sending them to the event hub. The clip below shows how this app is receiving data from the Network Rail real time feed in a VS Code terminal.

Azure Event Hub

Azure Event Hubs are based on the same underlying platform as Azure Service Bus. However, event hubs are geared more towards ingesting and handling big data streams, which makes them a good fit for ingesting the real-time feeds from Network Rail. All that is needed is to provision one on an Azure subscription. This is a fairly straightforward process, which could be done via a fully automated DevOps process or, for demonstration and learning purposes, by simply creating one on the Azure portal.

For details on how to provision an Azure Event Hub, see the link below:

Once the event hub and instance have been created, the Primary Key Connection Strings of both the event hub namespace and the event hub instance should be noted along with the name of the instance. These pieces of information can be easily obtained from the Azure portal under the provisioned event hub. They are needed when writing and/or configuring the client app that sends data to the event hub and the Databricks notebook that consumes the data streams ingested from the event hub.

Azure Databricks

Azure Databricks is essentially Spark provisioned on the Azure platform. Spark offers Structured Streaming based on dataframes, which is ideal for the performance analytics that we need to build here. As with Azure Event Hubs, provisioning an Azure Databricks cluster can be done in an automated manner using DevOps, or by simply using the Azure portal.

Details on Azure Databricks and how to provision clusters can be found at the link below:

Databricks notebook for real time TOC performance analysis

For the purpose of conducting this simple performance analysis, I set up a Databricks notebook on the portal under an existing workspace. 

See the link below for more details on how to set up workspaces and notebooks:

Databricks notebooks support Python, Scala, SQL and R. I have used Python for this notebook, along with the built-in PySpark library, which lets us consume the data being ingested into the Azure Event Hub using a Structured Streaming dataframe.

Before we can consume data from the event hub, we need to install the Apache Spark Event Hubs Connector library package on the cluster provisioned for the job. To do this, launch the Databricks workspace from the portal and go to the Compute tab, where you can see the clusters. Make sure the cluster is up and running, open its Libraries tab, then click on 'Install New' as seen below:

Choose Maven as the Library Source and provide the Coordinates of the package as seen below:

Note that it is important to match the package version to the version of the Databricks cluster (its Databricks Runtime and the Spark and Scala versions it ships with). To help with this, the creators of this package maintain a handy table, which can be found at the link below:

Below, I have highlighted the version I selected from that table, based on the version of the cluster that I provisioned:

You can get the exact coordinates for the package by clicking on the package version above, which takes you to the Maven Central Repository. Once the package is installed, it will appear under the Libraries tab on the cluster page.

With the required library package installed, it's time to take a look at the notebook in more detail. Before diving in, though, it's good to get to know the train movement data a bit better. These are essentially real-time event records that report on the progress of trains along their journey. They include every arrival and departure at a station stop, along with the corresponding timestamps and a variation status that indicates whether the event occurred on time, late or early. In addition, many other pieces of data are embedded in those records, providing information on the train operator, location, service, scheduled times, etc. However, for the purpose of this work, only the train operator ID, actual timestamp and variation status are needed. From the train operator ID, it is possible to obtain the train operator name by referencing a static list in the form of a simple delimited text file, which can be uploaded to Azure storage beforehand.

The table below lists the data that is used for this simple performance analysis:

Train Movement Feed
Field              Description
division_code      Operating company ID as per TOC Codes
actual_timestamp   The date and time that this event happened at the location
variation_status   One of "ON TIME", "EARLY", "LATE" or "OFF ROUTE"

Static Reference for TOC names
Field              Description
Sectorcode         Operating company ID as per TOC Codes
Description        Name of operating company

For the purpose of this article, and for simplicity, the analysis has been limited to a single TOC. Hence only the single record pertaining to that TOC has been inserted into the static reference file; because the train movement records are joined with this file, movements for TOCs that are not listed in it are filtered out. If other TOCs need to be included in the analysis, it is simply a matter of inserting their records into the reference file.

In the first cell of the notebook, we configure Spark with the access key for the Azure Storage Account so that it can retrieve the TOC codes and names from the static reference file. We also set up a variable to hold the connection string to the Azure Event Hub instance. Note that this connection string is not the same as the one used earlier in the client app to send data to the event hub: that was the connection string to the event hub namespace, whereas this is the connection string to the event hub instance. The code for this cell is shown below:

from pyspark.sql.types import *
import pyspark.sql.functions as F

# Set access key credentials to the Azure Data Lake Gen 2 at the Spark Context
spark.conf.set(
  "fs.azure.account.key.<azure storage account name>.dfs.core.windows.net",
  "<azure storage account access key>")

# Connection string to the Event Hub Instance
conn_str = "<event hub instance connection string>"
# Initialize event hub config dictionary with connectionString
ehConf = {}
ehConf['eventhubs.connectionString'] = conn_str
# Add consumer group to the ehConf dictionary
ehConf['eventhubs.consumerGroup'] = "$Default"
# Encrypt ehConf connectionString property
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str)

First, the required libraries are imported into the notebook in lines 1 and 2.

The string values in lines 6 and 7 should be replaced with the account name and access key of the Azure Storage Account where the static reference file is stored. The access key can be obtained from the Azure portal under the provisioned storage account.

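The string value in line 10 should be replaced with the connection string to the Azure Event Hub instance. As mentioned earlier, this information can be obtained from the Azure portal under the provisioned event hub instance. The remaining lines build the configuration dictionary for the Event Hubs connector: the connection string, the consumer group to read from ($Default here) and, finally, the encrypted form of the connection string, which recent versions of the connector expect.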

In the next cell we can load both the streaming data and static reference data into dataframes:

dfstream = spark.readStream.format("eventhubs").options(**ehConf).load()
dfref = spark.read.option("delimiter", "\t").csv("abfss://refdata@<storage account name>.dfs.core.windows.net/sectorcodes.csv", header="true")

Line 1 loads the real-time train movements data feed from the event hub into a streaming dataframe, using the configuration set up in the dictionary variable in cell 1. Line 2 loads the static reference file containing the TOC codes and names into a static dataframe. Note that this is a tab-delimited text file named 'sectorcodes.csv', stored in a container named 'refdata' in an Azure Storage Account. The string <storage account name> should be replaced with the name of the provisioned storage account.

Next, we define the schema that maps to the JSON structure of the messages received from the Network Rail data feed. This is crucial to get right; without it the data would not be usable. Fortunately, the Network Rail data feed messages are not hugely complex and can be handled with the schema shown in the following cell:

sch = ArrayType(

As can be seen in the code above, the schema is an array of structs, each element comprising two nested structs, one called 'header' and another called 'body'. As discussed earlier, although we map the entire message structure, only a small number of fields from the body are used for our analysis. With this schema in place, we can convert the JSON messages into usable dataframes and run the required analysis in the next cell, as shown below:

dfsj = dfstream.select(F.from_json(F.col("body").cast("string"), sch).alias("payload"))
dfsjrows = dfsj.select(F.explode(F.col("payload")))
dfevtoc = dfsjrows.join(dfref, dfsjrows.col.body.toc_id == dfref.Sectorcode)\
  .withColumn("event_time", F.to_timestamp(F.from_unixtime((dfsjrows.col.body.actual_timestamp.cast("double"))/1000)))
dftocperf = dfevtoc.withWatermark("event_time", "30 minutes")\
  .groupBy(F.window(F.col("event_time"), "10 minutes", "5 minutes"), "Description", dfevtoc.col.body.variation_status).count()\
  .orderBy("Description", "window")

Note that the messages arriving from the event hub are in binary form and need to be converted to a string to reveal the JSON they contain. Only then can the schema defined earlier be applied to the JSON to convert it into a dataframe. These steps can all be done in a single statement, as seen in line 1.

Since each JSON message is an array of nested elements, it needs to be exploded into individual train movement records. This step happens in line 2.
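For readers new to from_json and explode, the plain-Python sketch below mirrors what lines 1 and 2 do to one event: decode the binary body to a string, parse the JSON array, and emit one record per element. It is an analogy rather than Spark code, and the sample message is hypothetical and trimmed to the fields the analysis uses (the feed sends these values as strings).

```python
import json

# Hypothetical Event Hub message body: a JSON array of movement events,
# trimmed to the fields used in the analysis
raw = json.dumps([
    {"header": {"msg_type": "0003"},
     "body": {"toc_id": "88", "actual_timestamp": "1700000000000",
              "variation_status": "LATE"}},
    {"header": {"msg_type": "0003"},
     "body": {"toc_id": "88", "actual_timestamp": "1700000060000",
              "variation_status": "ON TIME"}},
]).encode("utf-8")


def explode_message(body_bytes):
    """Decode bytes -> JSON text -> list, then yield one record per element,
    keeping only the body fields the analysis needs."""
    for element in json.loads(body_bytes.decode("utf-8")):
        body = element["body"]
        yield {
            "toc_id": body["toc_id"],
            "actual_timestamp": body["actual_timestamp"],
            "variation_status": body["variation_status"],
        }


records = list(explode_message(raw))
print(len(records), records[0]["variation_status"])  # 2 LATE
```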

In line 3, we join the exploded train movement records with the TOC code reference data to bring in the TOC names. The statement in line 3 is chained with line 4 to derive a new column, event_time, which converts the actual timestamp from Unix time to a regular timestamp. This is required because all time fields in the Network Rail data feed arrive as Unix time in milliseconds, hence the division by 1000 before calling from_unixtime.
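The join and the timestamp conversion can be sketched in plain Python as well. The reference file contents, the TOC code 88 and the name 'Example Trains' are hypothetical; the conversion treats actual_timestamp as milliseconds since the Unix epoch and, as an assumption for this sketch, renders it in UTC (Spark's from_unixtime uses the session time zone).

```python
import csv
import io
from datetime import datetime, timezone

# Hypothetical contents of the tab-delimited reference file (with header row)
REF_TSV = "Sectorcode\tDescription\n88\tExample Trains\n"


def load_toc_names(text):
    """Read the tab-delimited reference data into a {code: name} lookup."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t")
    return {row["Sectorcode"]: row["Description"] for row in reader}


def to_event_time(ms_string):
    """Convert a millisecond Unix timestamp string to a UTC datetime."""
    return datetime.fromtimestamp(int(ms_string) / 1000, tz=timezone.utc)


def join_with_names(records, names):
    """Inner join on the TOC code: records without a reference row are dropped."""
    for record in records:
        if record["toc_id"] in names:
            yield {**record,
                   "Description": names[record["toc_id"]],
                   "event_time": to_event_time(record["actual_timestamp"])}


names = load_toc_names(REF_TSV)
movements = [
    {"toc_id": "88", "actual_timestamp": "1700000000000", "variation_status": "LATE"},
    {"toc_id": "99", "actual_timestamp": "1700000060000", "variation_status": "EARLY"},
]
rows = list(join_with_names(movements, names))
print(len(rows), rows[0]["event_time"].isoformat())  # 1 2023-11-14T22:13:20+00:00
```

The second movement disappears because its TOC code is not in the reference data, which is how listing a single TOC in the file restricts the analysis to that TOC.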

Finally, in line 5 we create the analysis dataframe by counting the train movement events grouped by time window, TOC name and variation status. The count is based on a 10 minute time window that slides every 5 minutes, so each event is counted in two overlapping windows. A watermark of 30 minutes is also applied; it tells Spark how late an event may arrive and allows it to discard the state of old windows, which keeps memory usage bounded on the limited resources this demonstration notebook runs on. The resulting dataframe is then ordered by TOC name and time window. Note that sorting a streaming aggregation is only supported in complete output mode, in which Spark keeps all aggregation state, so in that mode the watermark does not actually drop old state. The grouping by time window adds little to the analysis itself; it is included here mainly to show the frequency at which the real-time data arrives in the streaming dataframe.
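Because each window is 10 minutes long and a new one starts every 5 minutes, every event is counted in two overlapping windows. The plain-Python sketch below reproduces that window assignment and counting for illustration. It assumes windows aligned to the Unix epoch (what window() does when no start time is given), leaves the watermark out, and uses hypothetical events; it is not how Spark implements windowed aggregation internally.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=10)  # window length
SLIDE = timedelta(minutes=5)    # how often a new window starts


def windows_for(ts, window=WINDOW, slide=SLIDE):
    """Return every [start, end) window that contains ts, aligned to the epoch."""
    latest_start = ts - (ts - EPOCH) % slide  # most recent slide boundary
    windows = []
    start = latest_start
    while start + window > ts:  # walk back while the window still covers ts
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


def count_by_window(events):
    """Count (window, TOC, status) combinations across all windows holding each event."""
    counts = Counter()
    for ts, toc, status in events:
        for start, end in windows_for(ts):
            counts[(start, end, toc, status)] += 1
    return counts


t = datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc)
print([(s.strftime("%H:%M"), e.strftime("%H:%M")) for s, e in windows_for(t)])
# [('22:05', '22:15'), ('22:10', '22:20')]
counts = count_by_window([(t, "Example Trains", "LATE"),
                          (t + timedelta(minutes=3), "Example Trains", "LATE")])
```

The two hypothetical events share the 22:10 to 22:20 window, so that window counts 2 LATE movements while the neighbouring windows count 1 each.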

To learn more about aggregations on streaming dataframes, along with time windows and watermarks, refer to the Databricks documentation at the following link:

As long as the client app is sending data from the real-time data feed to the event hub, and the event hub is streaming to Databricks, the final dataframe will continue to aggregate by the specified fields and time windows. The dataframe can be used in many different ways depending on the requirement: for example, it could be written to a database for further processing, or written back to another event hub for onward streaming and consumed directly by a Power BI dashboard or any other thin or rich client app.

In this notebook, for demonstration purposes, we can simply display the final dataframe using the Databricks display statement, as shown below:

As mentioned earlier, the window column has been included in the aggregation here to demonstrate the frequency of the rolling time window within which the train movement events are being counted. The Description column refers to the TOC name, which has been looked up from the static reference file containing TOC codes and names. The variation status column is displayed by its fully qualified dataframe path.

The final analysis could of course be further refined, although my objective here was to document and demonstrate the work involved in building a basic data streaming service over this tech stack.

Docker Containers for SQL Server shared database development

Whilst most application development environments use local databases running on individual developer machines, some projects, such as ETL and data warehousing, may require working with shared databases that run on a shared server and are used by many developers working on multiple projects.

In these circumstances developers have to work very carefully, in close co-ordination, to ensure that they don't inadvertently undo each other's work. If a developer has to work in isolation, they often make a temporary copy of the database on the shared server and work on that separately. This can lead to many unwanted branch databases lying around the server, badly out of sync with the trunk version.

Now that Microsoft has made the docker images available for SQL Server 2016 and 2017 developer edition, there is a much better solution for managing shared database development.

I am documenting here the process I went through to set up such an environment with a set of sample databases. For simplicity's sake, I have only created a very basic database structure and not used any complex models.

The Docker engine for Windows can run Windows containers natively on Windows Server 2016, and on Windows 10 Pro or Enterprise editions with Hyper-V enabled.

For the purpose of this documentation work, I am running Docker on my Windows 10 Pro machine with Hyper-V enabled, and my development environment is based on SQL Server 2016 Developer Edition.

So first I downloaded the Docker Image for SQL Server 2016 Developer from Docker Hub as follows:

The database development environment consists of a database that represents the Trunk branch (the production version) of the sample database application, which I call APP1DB_Trunk.

In order to create a Docker image for this database I took copies of the .MDF and .LDF files of the existing APP1DB database. These files I stored locally in a folder named D:\PalProjects\Docker\sqldb1. Then in the same folder I created a text file named Dockerfile where the docker image instructions are placed as follows:

The FROM command instructs Docker to use the SQL Server 2016 image, which is built on Windows Server Core. It then runs a PowerShell command to create a new folder named SQLServer on the container's C: drive, and copies the .MDF and .LDF database files from the local folder into it. Finally, it sets two environment variables: the SA password and the end user licence acceptance indicator. At present I only intend to use SQL Authentication with the SA user to connect to the SQL Server container.

Next, I changed directory (cd) to the folder containing the Dockerfile and the database files, and ran the following command to create the Docker image of the database Trunk branch:

The -t parameter specifies the tag name given to the image. I have used the following naming convention:

<client id>/<database name>_<branch>:<database version>-<sql server version>

As such my client ID is 1, database name is APP1DB, branch is Trunk, database version is 1.1 and SQL Server version is 2016. This is the naming convention I have used in this sample, but you may follow your own convention. Also note that docker image names must be all in lower case.
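If images are built by a script, the naming convention can be applied consistently with a small helper. The function below is a hypothetical sketch: following the example above, it takes the client part as the literal prefix clt1 and lower-cases the whole reference to satisfy Docker's lower-case rule for repository names.

```python
def image_tag(client, db_name, branch, db_version, sql_version):
    """Build <client id>/<database name>_<branch>:<database version>-<sql server version>.

    Docker repository names must be lower case, so the result is lower-cased.
    """
    return ("%s/%s_%s:%s-%s" % (client, db_name, branch, db_version, sql_version)).lower()


print(image_tag("clt1", "APP1DB", "Trunk", "1.1", "2016"))      # clt1/app1db_trunk:1.1-2016
print(image_tag("clt1", "APP1DB", "dev_test2", "1.2", "2016"))  # clt1/app1db_dev_test2:1.2-2016
```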

When the build completes, I check the Docker image list with the following command:

This lists all the images present in my repository as depicted below:

Note that the Dockerfile instructions are processed when the image is built, so the image already contains the C:\SQLServer directory (created by the PowerShell mkdir command) with the database files copied from the local folder. When a container is launched from this image, an instance of SQL Server 2016 starts running inside it. However, the Dockerfile does not have any instructions to actually attach the .mdf and .ldf files as a database. This is given as a parameter in the following command, which launches the container from the image:

This command launches the container from the clt1/app1db_trunk:1.1-2016 image given in the last parameter above. The --name parameter assigns the name APP1DB_dev to the container, and -d launches the container in detached mode. -p maps a port number on the host (left side of the colon) to a port in the container (right side of the colon). As mentioned earlier, the Dockerfile did not have any instructions for attaching the .mdf and .ldf database files to create a database within the container. This is achieved with an -e parameter, which sets the environment variable attach_dbs. Its value is a JSON string containing key/value pairs for the database name (dbName) and the database file and log file names (dbFiles) in a two-element array.
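Because attach_dbs is a JSON string, it is easy to get the quoting wrong by hand. The sketch below builds the value with Python's json module; the file names APP1DB.mdf and APP1DB_log.ldf are hypothetical placeholders for the files copied into C:\SQLServer, and how the resulting string must be quoted on the docker run command line depends on the shell being used.

```python
import json


def attach_dbs_value(db_name, data_file, log_file):
    """Return the attach_dbs value: a JSON array with one object per database,
    holding its name (dbName) and a two-element array of file paths (dbFiles)."""
    return json.dumps([{"dbName": db_name, "dbFiles": [data_file, log_file]}])


value = attach_dbs_value("APP1DB",
                         r"C:\SQLServer\APP1DB.mdf",       # hypothetical file name
                         r"C:\SQLServer\APP1DB_log.ldf")   # hypothetical file name
print(value)
```

json.dumps also takes care of escaping the backslashes in the Windows paths.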

Any number of containers can now be launched from the single image. In addition to the container named APP1DB_dev, which I launched for development, I also launched a second container for testing using the same command, with only the --name parameter changed to APP1DB_uat.

To check which containers are currently running, issue the following command:

The currently running containers are listed as depicted below:

A container's configuration can be examined using the docker inspect command:

This will show the entire configuration as a JSON object. However for the purpose of being able to connect to the SQL Server instance running inside the container, I only want to know the IP Address that docker has assigned to the container. The following parameter can be used to filter the specific JSON item within the configuration:

The above parameter will show the IP Address docker has assigned to the container.
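When the address is needed in a script rather than at the prompt, the JSON printed by docker inspect can also be parsed directly. This is a sketch under assumptions: the sample output is hypothetical and trimmed to the relevant keys, and the network name nat (the usual default network for Windows containers) and the IP address are illustrative values.

```python
import json


def container_ips(inspect_output):
    """Map each network name to the container's IP address, given the JSON
    text printed by `docker inspect <container>` for a single container."""
    containers = json.loads(inspect_output)  # docker inspect prints a JSON array
    networks = containers[0]["NetworkSettings"]["Networks"]
    return {name: settings["IPAddress"] for name, settings in networks.items()}


# Hypothetical, heavily trimmed docker inspect output for APP1DB_dev
sample = """[{"Name": "/APP1DB_dev",
  "NetworkSettings": {"Networks": {"nat": {"IPAddress": "172.20.0.2"}}}}]"""
print(container_ips(sample))  # {'nat': '172.20.0.2'}
```

In practice the text would come from running docker inspect, for example via subprocess.run(["docker", "inspect", "APP1DB_dev"], capture_output=True, text=True).stdout.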

Note: the IP address displayed above is only visible within the host environment, which means it cannot be accessed from elsewhere on the LAN. However, it is possible to configure the container network so that Docker assigns the container an address from the LAN's IP address range.

For this documentation, I have not gone into this network configuration and have limited the activity to the host environment only. Visit the Docker reference documentation website for detailed information on this topic. I shall update this document at a later date with how to configure the container network to use the LAN's IP address range.

So, to connect to the SQL Server instance within the above containers, launch SQL Server Management Studio and connect as follows:

After successfully connecting, the attached database can be seen in the Object Explorer:

Having connected to the database, I then added some data to TABLE2 like so:

I have kept the best part of Docker containers until last. As I said at the beginning, when developing and testing in a shared database environment, there is often a need to have the database in a specific state to conduct a specific test, perhaps to run a lengthy ETL flow that could take several hours or days. This can be very tricky to co-ordinate with other developers and testers. You can always launch a new container from the trunk image, but that defeats the point, because the new container would not have the database in the state required for the test (mostly because the test data would be lost). Instead, the current state of an existing container can be committed to a new image with docker commit, and a new container can then be launched from that image.

On Windows, a running container must be stopped before it can be committed. This appears to be a limitation of Windows only and does not apply to Unix platforms; hopefully it will be addressed in the future. To stop the running container, issue the following command:

With the container stopped, its current state can be committed to a new image with the following command:

This creates a new image named clt1/app1db_dev_test2:1.2-2016. As per my naming convention, this refers to a development branch of the APP1DB database used specifically for test2. Since I could also have added new objects such as tables, views or stored procedures, I have increased the version to 1.2. The -2016 suffix still refers to the SQL Server 2016 version.

The new image can be seen in the repository with the 'docker image ls' command used previously. This image now contains all the changes made so far in the development branch under the APP1DB_dev container. A new container can now be launched from the newly committed image with the following command:

This launches the new container with the name APP1DB_dev_test2. I then examine its IP address in order to connect to the SQL Server instance, as follows:

I can now connect to this IP Address with SQL Server Management Studio and run the required tests in isolation. The following shows the new rows that I added to the development branch container before committing its image.

Those rows can now be seen in the new container, and I can make any changes to this container, APP1DB_dev_test2, without affecting the other development branch container, APP1DB_dev.

SSIS unit testing with SSISUnit framework in Visual Studio

In this article, I will present a simple ETL application scenario that will be used to demonstrate the SSISUnit unit testing framework in action.

SSISUnit is an open source unit testing framework for SQL Server Integration Services, developed and maintained by Pragmatic Works. You can find more details about it at CodePlex.

There are several ways to use SSISUnit:
a) via the GUI Windows Application
b) via the Command Line Application
c) via the .Net API

Options a) and b) use XML files to store the test definitions, whilst option c) offers the most flexibility, using the API directly or in combination with XML definition files. In addition, option c) is the method that allows us to integrate SSISUnit seamlessly with the Visual Studio unit testing framework.

The Visual Studio unit testing framework offers many features for developer focused unit testing all within one single Integrated Development Environment. For more information about the Visual Studio unit testing framework see this MSDN site.

I am going to show you how I have integrated SSISUnit with the Visual Studio unit testing framework via option c), along with XML test definitions.

Development environment

First of all let me give you the details of the software I use and my development environment.

a) SQL Server 2012 Developer with Service Pack 3 running on a Windows 10 desktop (on a LAN)
b) Visual Studio 2012 Ultimate with update 5 running on a Windows 7 laptop (on a LAN)
c) SQL Server Data Tools 2012 (version 11.1) for Visual Studio
d) SQL Server Data Tools with Business Intelligence (version 11..0) for Visual Studio 2012

I have only developed this test application on the above specification, so I can't guarantee that it will work with other specifications. However, SSISUnit is also available from CodePlex for the following environments:

* Visual Studio 2010 targeting SQL Server 2008
* Visual Studio 2013 targeting SQL Server 2014

Setup SSISUnit

Download the source code from CodePlex. The download is a zip file containing several solutions for the following versions of Visual Studio:

SSISUnit_SQL2008.sln :- Visual Studio 2010 targeting SQL Server 2008
SSISUnit_SQL2012.sln :- Visual Studio 2012 targeting SQL Server 2012
SSISUnit_SQL2014.sln :- Visual Studio 2013 targeting SQL Server 2014

The solution I used for this work is SSISUnit_SQL2012.sln. It contains several projects, including ones for the GUI application, the console application and the API libraries. The project containing the API is SSISUnit_SQL2012.csproj. Note that the target framework for this project is .Net Framework 4, so ensure that this framework is installed in the development environment first. Building this solution successfully will produce the .dll files required to use the SSISUnit API. Please note the following .dll files and their locations, which will be referenced by the test application later.

<extract folder>\MAIN\SSISUnit\bin\Release\SSISUnit2012.dll
<extract folder>\MAIN\SSISUnit.Base\bin\Release\SSISUnitBase.dll

<extract folder> is the folder where you extracted the downloaded zip file.

Mixed mode assemblies

The SSISUnit framework uses several COM components which were developed with older versions of the .Net Framework (.Net 2.0 and earlier). Prior to .Net Framework 4, these components would not work as mixed mode assemblies and needed to be recompiled with the same version of .Net as the hosting application. However, starting with .Net Framework 4, they can be left as they are and run in mixed mode, provided a setting is added to the Visual Studio testing framework configuration file as follows.

Edit the file named:

This file is usually located in the following folder:
<local drive>\Program Files (x86)\Microsoft Visual Studio 11.0\Common7\IDE\CommonExtensions\Microsoft\TestWindow\

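Add the following setting inside the <configuration> element (if the file already contains a <startup> element, add the useLegacyV2RuntimeActivationPolicy attribute to that element instead):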
<startup useLegacyV2RuntimeActivationPolicy="true">  </startup>

SSISUnit XML schema definition

With SSISUnit, we can create test definitions purely in XML. Although not strictly required, a schema definition file (.xsd) helps a great deal, improving productivity and avoiding errors through IntelliSense and validation. To enable this in Visual Studio, copy the SSISUnit.xsd file located in the <extract folder>\MAIN\SSISUnit\ folder to the Visual Studio XML schema definitions folder, located in <local drive>\Program Files (x86)\Microsoft Visual Studio 11.0\xml\Schemas\. Once the file is copied there, Visual Studio should automatically detect the schema.

That's all the setup required. Now we will walk through building a simple SSIS project and then add a unit test project using the Visual Studio testing framework.

Application scenario

Before proceeding further, a brief explanation of the sample application scenario may be worthwhile. This is a simple ETL process which may be called by a master package; however, for the sake of simplicity we will work with a single package. The scenario is the loading of contacts data from a regular tab-delimited text file with the following column format.

Number :- A unique numeric identifier for each contact
Name :- The full name of the contact
Telephone :- A comma separated list of telephone numbers
Address :- The address of the contact
AddressDate :- The date when the contact was registered at the above address in yyyy-mm-dd format
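As a concrete illustration of this layout, the following minimal C# sketch parses one line of contacts.txt. The Contact and ContactParser types are hypothetical (they are not part of SSIS or SSISUnit). The sketch assumes exactly five tab-separated fields and an AddressDate in yyyy-mm-dd format. It also assumes that a blank Telephone should become null.

```csharp
using System;
using System.Globalization;

// Hypothetical in-memory representation of one row of contacts.txt.
public sealed class Contact
{
    public int Number { get; }
    public string Name { get; }
    public string Telephone { get; }      // comma separated list, or null when blank
    public string Address { get; }
    public DateTime AddressDate { get; }

    public Contact(int number, string name, string telephone, string address, DateTime addressDate)
    {
        Number = number;
        Name = name;
        Telephone = telephone;
        Address = address;
        AddressDate = addressDate;
    }
}

public static class ContactParser
{
    // Parses one tab-delimited line: Number, Name, Telephone, Address, AddressDate (yyyy-mm-dd).
    public static Contact Parse(string line)
    {
        string[] fields = line.Split('\t');
        if (fields.Length != 5)
            throw new FormatException($"Expected 5 tab-separated fields but found {fields.Length}.");

        // Digits only: no sign, no whitespace, no thousands separators.
        int number = int.Parse(fields[0], NumberStyles.None, CultureInfo.InvariantCulture);

        // Assumption: a blank Telephone is mapped to null so it can go into a nullable column.
        string telephone = string.IsNullOrWhiteSpace(fields[2]) ? null : fields[2];

        DateTime addressDate = DateTime.ParseExact(fields[4], "yyyy-MM-dd", CultureInfo.InvariantCulture);

        return new Contact(number, fields[1], telephone, fields[3], addressDate);
    }
}
```

Because the fields are tab-separated, the commas inside Address and Telephone need no special handling.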

The file will be named contacts.txt and delivered (perhaps by a master package) to a specific folder prior to executing the load package. Once successfully loaded this file could be removed, moved or renamed. For all intents and purposes, we are only interested in testing the SSIS package for loading the contacts from the source file to a destination database and not the file delivery and moving mechanism.

The file may contain duplicate records for the same contact, in which case the load process should de-duplicate them, keeping the record with the latest value in the AddressDate column. Where there are duplicates with identical values in all columns, only a single record should be kept. The processed records should then be checked against the database using the contact number business key held in the Number column. Contact numbers that do not exist in the database are inserted as new records. Where they already exist, those records are updated as type 1 changes (the existing values are overwritten). The Telephone column may contain blank values, which the database needs to accommodate.
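The de-duplication rule can be expressed compactly in C# with LINQ. This is a sketch of the rule only, using a hypothetical ContactRow type. In the package itself, the same effect is achieved inside the data flow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical staged row holding only the fields the rule needs.
public sealed class ContactRow
{
    public int Number { get; set; }
    public string Name { get; set; }
    public string Telephone { get; set; }
    public string Address { get; set; }
    public DateTime AddressDate { get; set; }
}

public static class Dedup
{
    // Keeps one row per contact Number: the one with the latest AddressDate.
    // Exact duplicates collapse too, because only one row per Number survives.
    public static List<ContactRow> LatestPerContact(IEnumerable<ContactRow> rows)
    {
        return rows
            .GroupBy(r => r.Number)
            .Select(g => g.OrderByDescending(r => r.AddressDate).First())
            .OrderBy(r => r.Number)
            .ToList();
    }
}
```

On a date tie, the sketch keeps whichever row appears first, because LINQ's OrderByDescending is a stable sort. The business rules above do not say which row should win in that case.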

We will limit our ETL process to these few simple business rules, even though we know that in reality they can become much more complex. These simple rules will leave us more room to focus on understanding the SSISUnit framework and how to integrate it with the Visual Studio unit testing framework.

SQL database

Now it's time to create the SQL Server database and the tables within it. You can either create a new database or use an existing one.

Run the following set of scripts using SQL Server Management Studio or Visual Studio SQL Server Object Explorer.

Once the database and the set of tables are created, we can proceed to develop the SSIS package.

Visual Studio solution

In Visual Studio, create a new blank solution. I always prefer to start with a blank solution and add the required projects; this is only my preference, not a strict requirement. I have named my solution SampleSSISUnit.

To the solution add a new SSIS project and name the project SSISPackages or any other appropriate name. Rename the default package to LoadContacts.dtsx.

To the same solution add a new project, expand the Visual C# folder and select Test. Then from the listed templates select Unit Test Project. Name the project SSISUnitTests. This will add a new Visual Studio unit test project.

To the SSISUnitTests project add a new C# class file named UnitTest_LoadContacts.cs. This is the file where we will write the code to drive the unit tests. We also need to add a new .xml file to this project, in which we define the unit tests. Name this file UnitTest_LoadContacts.ssisUnit.

At this point, in order to use the SSISUnit API, we need to reference the .dll files we built earlier. Inside the SSISUnitTests project, right-click on References and select Add Reference. Then browse to the location where the .dll files were built and add the following two files:

<extract folder>\MAIN\SSISUnit\bin\Release\SSISUnit2012.dll
<extract folder>\MAIN\SSISUnit.Base\bin\Release\SSISUnitBase.dll

In addition to the above .dll files, we also need to add a reference to the SQL Server 2012 version of the Microsoft.SQLServer.ManagedDTS library. This library can be found under Assemblies > Extensions. This is the library that allows the SSISUnit framework to execute SSIS packages from within the SSISUnitTests project.

The following is a snapshot of the Solution Explorer with the required projects and files added.

SSIS package

Now we can build a minimal SSIS package that first stages the contacts.txt file and then loads it into the SQL database.

Open the LoadContacts.dtsx package in Visual Studio and add the following two package-level variables:

pkDBConnectionString :- This string type variable will hold the OLEDB connection string to the destination SQL database.
pkSourceFilePath :- This string type variable will hold the file system path to the location where the contacts.txt file will be delivered.

Although not strictly required, holding this information in user variables allows a greater degree of flexibility. As you will see later, it lets us inject different sets of source data from different locations, and point the package at a destination database on a different server, via the test definition .xml file without changing any code.

Now we need to add the following two connection managers:

cnFlatFile_Contacts :- This is a Flat File connection to the source data file. It uses an expression that concatenates the pkSourceFilePath variable with the string "contacts.txt" to form the fully qualified file name as follows. @[User::pkSourceFilePath] + "contacts.txt"
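Because the expression is a plain string concatenation, the value held in pkSourceFilePath must end with a backslash. Otherwise the folder name and the file name run together. The C# sketch below mirrors the concatenation and adds a defensive variant. The SourcePath class and its methods are hypothetical.

```csharp
using System;

public static class SourcePath
{
    // Mirrors the SSIS expression @[User::pkSourceFilePath] + "contacts.txt":
    // a plain concatenation with no separator inserted.
    public static string FromExpression(string pkSourceFilePath)
    {
        return pkSourceFilePath + "contacts.txt";
    }

    // Defensive variant: appends a backslash first if the folder value lacks one.
    public static string WithTrailingSeparator(string pkSourceFilePath)
    {
        string folder = pkSourceFilePath.EndsWith("\\", StringComparison.Ordinal)
            ? pkSourceFilePath
            : pkSourceFilePath + "\\";
        return folder + "contacts.txt";
    }
}
```

The simplest safeguard is to make sure every value assigned to pkSourceFilePath ends with a backslash. That includes the values set from the test definition later on.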

cnOLEDB_ContactsDB :- This is an OLEDB connection to the destination SQL database. It uses the pkDBConnectionString variable through an expression to construct the connection string.

We first clear the staging.Contacts table prior to staging the contacts source data. Add an Execute SQL Task to the control flow. Set its connection to the cnOLEDB_ContactsDB connection manager and code the following SQL directly into it.

Then add a Data Flow task consisting of a Flat File source component and an OLEDB destination component. Set the Flat File source component to the cnFlatFile_Contacts connection manager and the OLEDB destination component to the cnOLEDB_ContactsDB connection manager, and select the staging.Contacts table from the drop-down. Make sure the source-to-destination field mappings are correct.

Now add another Data Flow task to load the data from staging to the final table. Construct this data flow task as follows:

We start with an OLEDB Source component that is set to the cnOLEDB_ContactsDB connection manager. Code the following SQL command into this component:

The above code fetches the staged data from the staging.Contacts table with an additional field named rno that indicates a row number based on the most recent AddressDate column for each contact number represented by the Number column. This field is used later on down the pipeline for the purposes of de-duplication.

Next we add a Conditional Split component which uses the expression rno == 1 to keep only the de-duplicated records. All records with rno > 1 are considered duplicates and are dropped from the pipeline.

Add a Lookup component which will be used to check for the existence of contacts in the destination database. Set the component to use the cnOLEDB_ContactsDB connection manager and code the following SQL command to fetch the existing contact records from the database into the lookup.

Map the stg_Number from the source data to the id_Number field in the database table and select all other fields to be returned to the pipeline when a record is found.

Add another Conditional Split component and connect the 'Match' branch from the Lookup component to this split component. Here we will do a further check using an expression to see if any of the fields have changed between the staged source and destination records. Insert the following expression into the split condition:
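The comparison this expression performs can be sketched in C#. This hypothetical ChangeDetector compares the four non-key columns and treats two null Telephone values as equal. That is one reasonable way to handle the blank telephone numbers the business rules allow. The sketch uses case-sensitive ordinal comparison; whether that matches the package depends on how its expression is written.

```csharp
using System;

// Hypothetical holder for the non-key values of a staged or existing contact.
public sealed class ContactValues
{
    public string Name { get; set; }
    public string Telephone { get; set; }   // may be null for a blank telephone
    public string Address { get; set; }
    public DateTime AddressDate { get; set; }
}

public static class ChangeDetector
{
    // True when any tracked column differs between the staged and existing row.
    // The static string.Equals is null-safe: null vs. null counts as unchanged,
    // while null vs. a value counts as changed.
    public static bool HasChanged(ContactValues staged, ContactValues existing)
    {
        return !string.Equals(staged.Name, existing.Name, StringComparison.Ordinal)
            || !string.Equals(staged.Telephone, existing.Telephone, StringComparison.Ordinal)
            || !string.Equals(staged.Address, existing.Address, StringComparison.Ordinal)
            || staged.AddressDate != existing.AddressDate;
    }
}
```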

Below the second split component, add an OLEDB Command component and connect the split component to it. Set its connection manager to cnOLEDB_ContactsDB and add the following SQL command.

The above command will update the matched record where a change has taken place. As can be seen, the command takes the value of each field as a parameter. These parameters get the default names param_0, param_1, param_2, param_3 and param_4, in the order they appear in the command. Map each of these parameters to the staged source data fields as follows:

Please note that this technique is not the most efficient way of updating records in a production environment. The OLEDB Command component executes its statement once for every row, which becomes slow when the data set is large. However, for the sake of simplicity we will use this method in this sample application.

For the 'No Match' branch of the Lookup component, we add an OLEDB Destination component and connect that branch to it. All new contact records with no matching number travel via this branch to be inserted into the destination table. Set the destination component's connection manager to cnOLEDB_ContactsDB and select the destination table dim_Contacts from the drop-down. Map all the fields from the staged source data set to the destination table as follows, ignoring the key_Contact field since it is an auto-incrementing identity column.
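With the Lookup and the second Conditional Split in place, each de-duplicated row ends up on one of three paths. The following C# sketch shows that routing logic in isolation. RowAction, LookupRouter and the hasChanged delegate are hypothetical names, and the change test is passed in rather than hard-coded.

```csharp
using System;
using System.Collections.Generic;

public enum RowAction { Insert, Update, Ignore }

public static class LookupRouter
{
    // Mirrors the package's branches for one staged row, keyed by contact Number:
    //   no match            -> Insert (OLEDB Destination)
    //   match and changed   -> Update (OLEDB Command, type 1 overwrite)
    //   match and unchanged -> Ignore (not sent to the update)
    public static RowAction Route<TRow>(
        int number,
        TRow staged,
        IDictionary<int, TRow> existingByNumber,
        Func<TRow, TRow, bool> hasChanged)
    {
        TRow existing;
        if (!existingByNumber.TryGetValue(number, out existing))
            return RowAction.Insert;

        return hasChanged(staged, existing) ? RowAction.Update : RowAction.Ignore;
    }
}
```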

Unit test project

Now that the SSIS package has been built, we will see how to develop some basic unit tests that will cover the most important business rules of this ETL process. 

As mentioned earlier, we will be using the SSISUnit .Net API with C# via Visual Studio's own built-in unit testing framework. The main benefit of this method over the others is that you can perform the unit tests as part of the solution build process. This can also be easily incorporated into any continuous integration process you run on a regular basis. All the code relating to the application and the tests is located together within the same solution, which saves you the trouble of maintaining additional testing scripts in other locations.

SSISUnit offers the ability to perform unit tests at the individual task level or at the entire package level. Which level you test at depends entirely on the complexity of the application. For our sample project, we will conduct one test at the task level and the others at the package level, so you get to experience both approaches.

All tests will follow the traditional xUnit structure of Setup, Test and Tear-down.

Test 1 :- Staging contacts data

In this test we will test the individual data flow task within the package that loads the data from the source file to the staging.Contacts table. For this we will use the following data set in a tab-delimited text file. We will save this file as contacts_batch1.txt inside a sub folder named SourceFiles within the unit test project folder. As we shall see later, the test setup will copy this file to contacts.txt, which is the file used by the package.

1	James	787,387,423,568,938,000,000,000,000,000,000	24 Harper Ave, Middlesex, MH2 4BU	22/04/2003
1	James	787,387,423,568,938,000,000,000,000,000,000	154 Arvin Drive, Middlesex, MH5 7GD	14/12/2005
2	Charls	7,656,928,671,776,520,000,000	1A Park Street, Bedford, BD7 9OP	16/01/1987
3	Carol		30 Church Road, Brighton, Essex, BR8 7UT	02/09/2000
4	Sarah	9,897,876,521,276,760,000,000	38 Kings Lane, Wimbledon, London, SW7 5TR	07/12/2006
4	Sarah	9,897,876,521,276,760,000,000	38 Kings Lane, Wimbledon, London, SW7 5TR	07/12/2006

Now we need to create a test definition for this in xml. To do this open the UnitTest_LoadContacts.ssisUnit file that we added to the unit test project earlier and insert the following xml code.

If you take a closer look at the above xml, you will see some basic elements that are organized as follows.

<TestSuite>
    <!-- A Test Suite is the main container for the entire test definition. A Test Suite contains one or
           more individual unit tests. -->

    <ConnectionList>
        <!-- A Connection List contains one or more connection strings to specific databases. These
               connections are then referenced from elsewhere in the definition to issue SQL commands to
               those databases. -->
    </ConnectionList>

    <PackageList>
        <!-- A Package List contains one or more fully qualified package paths pointing to individual SSIS
               packages. These are then referenced from elsewhere in the definition. -->
    </PackageList>

    <TestSuiteSetup>
        <!-- Any setup that is required and common to all tests within the test suite is defined within this
               element. It can contain multiple commands, which can perform different types of
               operations. The supported command types are SQLCommand, ProcessCommand and
               VariableCommand. Note that this setup only runs once for the entire test suite. -->
    </TestSuiteSetup>

    <Setup>
        <!-- Similar to the previous element, this element may also contain multiple commands. However,
               unlike the previous setup, this setup runs once for each individual test defined within the test
               suite. Hence these commands are intended to perform any common operations that are
               needed prior to running individual tests. -->
    </Setup>

    <Tests>
        <!-- This element acts as the container that holds one or more individual tests defined for
               the test suite. -->

        <Test>
            <!-- This is the container element for an individual test. Each test follows the Setup, Test,
                   Tear-down structure. -->

            <TestSetup>
                <!-- Within this element, any setup needed for this specific test is defined. As with
                       the other setups, the command types used here can be SQLCommand,
                       ProcessCommand or VariableCommand. -->
            </TestSetup>

            <Assert>
                <!-- This is the container element for the assertion definition of this individual test. Here the
                       expected result and the type of this assertion are established. Inside this element there can
                       be one or more commands that check the actual results of the test. These
                       commands are of type SQLCommand, ProcessCommand and VariableCommand. -->
            </Assert>

            <TestTearDown>
                <!-- Within this element there can be multiple commands that perform any operations needed
                       to clean up or reinstate data after the test has completed. -->
            </TestTearDown>
        </Test>
    </Tests>

    <TearDown>
        <!-- This element may contain one or more commands that perform any common operations
               needed to clean up or reinstate data after each individual test within the test suite has
               completed. -->
    </TearDown>

    <TestSuiteTearDown>
        <!-- This element may contain one or more commands that perform any operations needed to clean
               up or reinstate data after the entire test suite has completed. Unlike the previous tear-down
               element, the commands contained within this element only run once for the entire test
               suite. -->
    </TestSuiteTearDown>
</TestSuite>
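To make the run order of these sections easier to see, here is a hypothetical C# sketch that only records the sequence in which they would execute for a suite of tests. It does not call SSISUnit. The relative order of Setup and TestSetup, and of TestTearDown and TearDown, is an assumption based on the descriptions above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mini-runner: records the order in which the definition's
// sections would run. It does not execute anything or call SSISUnit.
public static class LifecycleSketch
{
    public static List<string> Run(IEnumerable<string> testNames)
    {
        var log = new List<string>();
        log.Add("TestSuiteSetup");                 // once for the whole suite
        foreach (string name in testNames)
        {
            log.Add("Setup");                      // once per test, common to all tests
            log.Add(name + ":TestSetup");          // this test's own setup
            log.Add(name + ":Execute");            // run the package or task
            log.Add(name + ":Assert");             // testBefore="false": assert after execution
            log.Add(name + ":TestTearDown");       // this test's own clean-up
            log.Add("TearDown");                   // once per test, common clean-up
        }
        log.Add("TestSuiteTearDown");              // once for the whole suite
        return log;
    }
}
```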


Given the above information, let's take some time to examine the xml code for the test definition. We start by looking at the <Test> element, which declares the unit test. We set its name attribute to "TestBatch1Staging". The package attribute carries a reference to the SSIS package listed in the <PackageList> element at the top of the definition file. The task attribute specifies which task within the package to execute to conduct the test. In this case we are testing the staging Data Flow named cfDFT_StageContacts (this is a naming convention that I follow). If we were testing the entire package, the task attribute would carry the name of the package itself. The taskResult attribute indicates the expected result of the task or package, not the expected result of the unit test itself.

Next we look at the <TestSetup> element. This is where the commands that perform the test setup are specified. For this test, we have used three types of commands.

First we have a VariableCommand, which can perform Set and Get operations on SSIS package variables. This is a very useful feature which allows you to inject different values into the package for different test conditions or environments. Here we use it to set the file path to the contacts.txt source data file in the pkSourceFilePath variable. We also set the connection string for the destination database in the pkDBConnectionString variable.

Then we use a FileCommand, which can perform basic file system operations. We use it here to copy our Batch1 data set, stored in the file contacts_batch1.txt, to the contacts.txt file. This gives us the flexibility to inject different data sets for different tests.
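The effect of this copy step can be reproduced in plain C# with File.Copy. This is only an equivalent operation for illustration, not SSISUnit's FileCommand implementation. The SourceFileStager class is hypothetical. Overwriting is enabled so that each test replaces any contacts.txt left behind by a previous one.

```csharp
using System;
using System.IO;

public static class SourceFileStager
{
    // Copies a batch file (e.g. contacts_batch1.txt) over the file the package reads.
    // The third argument (overwrite: true) replaces an existing contacts.txt.
    public static string Stage(string batchFilePath, string sourceFolder)
    {
        string target = Path.Combine(sourceFolder, "contacts.txt");
        File.Copy(batchFilePath, target, true);
        return target;
    }
}
```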

Finally, a SQLCommand is used to clear the staging table before running the test, by issuing the TRUNCATE TABLE staging.Contacts command on the database referenced by the connectionRef attribute.

Please note that all the folder paths, database names and connection strings specified in the code are referring to my own development environment. So you will need to replace them with the appropriate values for your environment.

Now look at the <Assert> element. Here we set its name to "AssertStagingContacts". The expectedResult attribute is "6", which is the number of records in the contacts_batch1.txt file and the number we expect to find in the staging table after the staging data flow has run. The testBefore attribute defines whether the assertion should run before the task runs; in this case we want it to run after the task, so we set it to false. The expression attribute is a boolean indicating whether the assertion is an expression; this one is not, so we set it to false.

Now we set the assertion command. A number of different command types can be used here. For this test we use a SQLCommand named "CmdCountStagingContacts". Its connectionRef is set to "ContactsDB", which refers to the connection listed under the <ConnectionList> element at the very top of the test suite definition. The returnsValue attribute is set to "true" to indicate that the command returns a scalar value. Here the command is a simple SELECT COUNT(*) FROM staging.Contacts, and the scalar it returns is compared with the value in the expectedResult attribute to determine whether the test passes or fails. So if the SQL command returns a record count of 6, the test passes; otherwise it fails.
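Note that expectedResult is text in the XML, while the command returns a typed scalar (an int for COUNT(*)). The C# sketch below shows one way such a comparison could work: it converts the scalar to an invariant-culture string and compares the two. It is an assumption for illustration, not SSISUnit's actual comparison code, and ScalarAssert is a hypothetical name.

```csharp
using System;
using System.Globalization;

public static class ScalarAssert
{
    // Hypothetical comparison of a scalar returned by a SQLCommand with the
    // expectedResult attribute, which is a string in the test definition.
    public static bool Passes(object actualScalar, string expectedResult)
    {
        // Invariant culture avoids locale-specific formatting of numbers and dates.
        string actual = Convert.ToString(actualScalar, CultureInfo.InvariantCulture);
        return string.Equals(actual, expectedResult, StringComparison.Ordinal);
    }
}
```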

That concludes the definition of the test suite and the individual test within it.

Next, we need to write the unit test code in C# using the SSISUnit .Net API to execute the test. This is done in the UnitTest_LoadContacts.cs file we added to the project earlier. So let's examine the code that executes the above test.

As can be seen above, the SSISUnit .Net API is referenced through the SsisUnit and SsisUnitBase namespaces. In addition, a reference to the System.Xml namespace is added to allow SSISUnit to serialize and de-serialize xml.

When you create a Unit Test Project in Visual Studio, it adds a class with the [TestClass] attribute and a public Initialize method with the [TestInitialize] attribute. The Initialize method is intended for setting configurations and allocating resources needed by all the tests in the class. With SSISUnit, this initialization work is carried out by the API based on the definitions in the Setup elements, so we do not need to code anything directly in the Initialize method.

Within the test class, we will create the following private member variables which will be used by all the tests.

private SsisTestSuite testSuite; /* This will hold a reference to the test suite object used by all tests  */
private TestResult testResult; /* This will hold a reference to the test result object used by all tests */
private Test test; /* This will hold a reference to an individual test object */
private Context context; /* This holds a reference to a context object that is used by the test */
private bool isTestPassed;  /* This is a boolean that indicates if the test has passed */

SSISUnit exposes several events that are fired at various stages of the test execution cycle. The one we are interested in for this project is the AssertCompleted event, which is fired when SSISUnit has completed an Assert operation of a test. We will subscribe the following public method to this event as the handler for the assertions of all the tests.

public void TestSuiteAssertCompleted(object sender, AssertCompletedEventArgs e)
{
    if (e.AssertName != null)
    {
        testResult = e.TestExecResult;
        isTestPassed = e.TestExecResult.TestPassed;
    }
}

The e parameter is an object that contains all the information pertaining to the assertion. In the simplest use of this method, we read the e.TestExecResult.TestPassed property, which indicates whether the test passed, and assign its boolean value to the isTestPassed member variable.

We now need to add a method to conduct the test. This should be a public method with the [TestMethod] attribute added at the top. It's best to give this method the same name that was used for the test in the xml definition, which in this case is TestBatch1Staging, as shown below.

[TestMethod]
public void TestBatch1Staging()
{
    testSuite = new SsisTestSuite(@"<path to the xml file>\UnitTest_LoadContacts.ssisUnit");
    test = testSuite.Tests["TestBatch1Staging"];
    testSuite.AssertCompleted += TestSuiteAssertCompleted;
    try
    {
        isTestPassed = false;
        context = testSuite.CreateContext();
        bool rs = test.Execute(context);
        Assert.AreEqual<bool>(true, isTestPassed);
    }
    finally
    {
        testSuite.AssertCompleted -= TestSuiteAssertCompleted;
    }
}

The first line creates a new SsisTestSuite object by passing in the test definition xml file as a parameter. The resulting object will contain all the information about the test suite.

Next we access the specific test from the Tests collection property by specifying the test name as an index.

Then we subscribe our TestSuiteAssertCompleted method to the AssertCompleted event. The isTestPassed member variable is set to false before running the test. This is so that the default result of the test will always be false until the test result is assigned to it.

A new SSISUnit test context object is created which contains any references to system file paths and database connections.

Then we execute the test by calling test.Execute method passing to it the context as a parameter. This is the method that actually runs the SSIS package or task that was specified in the test definition. The method returns a boolean that indicates whether the package or task ran successfully. Note that this is not an indication about the pass or failure of the unit test itself. That happens next with the assertion.

Now we do the assertion with the Assert.AreEqual method of the Visual Studio unit test framework. This method takes 2 boolean values as parameters: the first is the expected value and the second is the actual value, which in this case is the value held in the isTestPassed member variable. If the two values match, the test passes; otherwise it fails.

Finally we unsubscribe our assertion event handler so that another test can subscribe to it.
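The reason for resetting isTestPassed to false is that a run which never reaches its assertion must not pass by accident. The following self-contained C# sketch demonstrates this pattern with hypothetical stand-in types (FakeSuite, AssertEventArgs and HarnessSketch) instead of the real SSISUnit classes. It also unsubscribes in a finally block, so the handler is removed even if the run throws.

```csharp
using System;

// Hypothetical stand-ins for the suite and its assertion event; not SSISUnit types.
public sealed class AssertEventArgs : EventArgs
{
    public AssertEventArgs(string assertName, bool passed)
    {
        AssertName = assertName;
        Passed = passed;
    }

    public string AssertName { get; }
    public bool Passed { get; }
}

public sealed class FakeSuite
{
    public event EventHandler<AssertEventArgs> AssertCompleted;

    // Simulates running a test; raiseAssert = false mimics a run that never reaches its assertion.
    public void Execute(bool raiseAssert, bool passed)
    {
        if (raiseAssert)
            AssertCompleted?.Invoke(this, new AssertEventArgs("AssertStagingContacts", passed));
    }
}

public sealed class HarnessSketch
{
    private bool isTestPassed;

    private void OnAssertCompleted(object sender, AssertEventArgs e)
    {
        if (e.AssertName != null)
            isTestPassed = e.Passed;
    }

    // Resets the flag, subscribes, runs, and always unsubscribes; returns the captured result.
    public bool RunOnce(FakeSuite suite, bool raiseAssert, bool passed)
    {
        isTestPassed = false;   // default to failure until an assertion reports otherwise
        suite.AssertCompleted += OnAssertCompleted;
        try
        {
            suite.Execute(raiseAssert, passed);
            return isTestPassed;
        }
        finally
        {
            suite.AssertCompleted -= OnAssertCompleted;
        }
    }
}
```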

Test 2 :- Updating contacts with de-duplication

In this unit test we want to test the entire package execution, to see whether it correctly updates existing data after first de-duplicating any duplicate contact records in the source data file. To achieve this we will use a second batch of data in the file contacts_batch2.txt, as given below:

5	Paul	676,256,354,316,735,000,000,000,000,000,000	201A Jasper Street, Croydon, London, CR4 9KT	13/06/2003
2	Charls	765,692,867,177,652,000,000,000,000,000,000	57 Tinpan Alley, Bedford, BD4 3GH	10/10/2000
2	Charls	765,692,867,177,652,000,000,000,000,000,000	78 Station Road, Bedford, BD3 6TU	15/04/2001

As you can see in the above data set contact number 2 has duplicate records. As stated earlier in the Application Scenario, when inserting new records and updating old records, the ETL process should only pick the record with the most recent date which in this case should be the third record with the AddressDate of 15/04/2001. We will define this unit test to assert both the updating of an existing record and the correct de-duplication by issuing a SQL statement that checks for this specific record with these specific values.

Now let's define this test by coding the following xml into the UnitTest_LoadContacts.ssisUnit file.

Note that the above code should be inserted between the <Tests> and </Tests> elements. The test has been given the name "TestBatch2OldDedupContact". This is the naming convention I follow; however, you can choose any meaningful name and also include spaces to make it more readable. You will notice that, unlike Test 1, here the package and task attributes carry the same value, which is the name of the package. This is because for this test the unit we are testing is the whole package. The <TestSetup> element is similar to Test 1, except that the sourcePath attribute of the FileCommand element now refers to the contacts_batch2.txt file, which will be copied over the existing contacts.txt file if it already exists. Also notice that the final SQLCommand, which truncates the staging table, has been removed. The reason for this will become more apparent when we examine the C# code that drives this test.

Now look at the <Assert> element. We have set its name attribute to "AssertOldDedupContact". Again, you may choose a more meaningful name if you wish, as this is just the naming convention I have been using. The expectedResult is set to "1"; the reason for this value will become apparent when we see the assertion SQL command. The other attributes of this element remain the same as in the previous test.

For the assertion command we again use a SQLCommand, with the name attribute set to "CmdGetContactValues" and the rest of the attributes set exactly as in the previous test. The SQL command issued to assert the test is as follows:

SELECT
    CASE WHEN
        Name = 'Charls' AND
        Telephone = '76569286717,76520989012,87293752635' AND
        Address = '78 Station Road, Bedford, BD3 6TU' AND
        AddressDate = '2001-04-15'
    THEN 1 ELSE 0 END
FROM dim_Contacts
WHERE id_Number = 2

This command simply filters contact number 2 and checks for the specific field values that we know are correct. If they all match, the CASE expression returns 1, otherwise 0. This return value is then compared with the value in the expectedResult attribute: if both are 1, the test passes, otherwise it fails.

Now let's look at the C# code in the unit test project and see how this test is conducted via the SSISUnit .Net API. To do this, we add a new method to the UnitTest_LoadContacts.cs class file as follows:

As in the previous test, we give the method the same name as the unit test definition. As the code for this test is very similar to the previous one, you can refer to the earlier explanation for an understanding of the API. Where this test differs is that here we execute two tests. The first one, "TestBatch1NewContacts", is executed first, but no assertion is done on it. This is a separate test definition created to check that new records from the contacts_batch1.txt file are correctly inserted. Here we are hijacking that test in order to initialize the destination dim_Contacts table with data that the second test can then update, before performing the required assertion.

So as you can see in the above code only one Assert.AreEqual method is called to assert the second test.

Other tests

Apart from the two tests described above, I have created a few more tests that cover several other application scenarios, which are too many to document in this article. However, you can download the entire sample with all the tests from my GitHub repository.

Running the test in Visual Studio Test Explorer

Before you can run the tests, the unit test project needs to be built. Once it is built, all the methods with the [TestMethod] attribute will appear in the Test Explorer, and you will then have a number of options for running them. You can also set all the tests to run automatically whenever the solution is built, which may be useful for Continuous Integration builds. For more information about using the Visual Studio Test Explorer, please refer to this MSDN link.

To view the Test Explorer in Visual Studio, simply select the Test menu, then Windows, then Test Explorer. You should see a window similar to the one below:

To run an individual test, simply right-click on it and select 'Run Selected Test'. The test will execute, and if it passes it will be marked with a green tick. If it fails, it will be marked with a red exclamation mark.