Data Validation on Hadoop using Pig


I have been involved in the development and implementation of a fairly large data warehousing application for several years now. One of the key aspects of this application has been data validation, since the source data originates at a national level from a large number of organisations that do not conform to a single set of data standards. Until now the source data was first brought into a staging area within a relational database to be profiled and validated. This has turned out to be a very messy and cumbersome process of re-formatting the source data, the majority of which arrives on a regular basis as semi-structured text files, to fit into a relational database. The results of the validation feed into a feedback loop in which any issues with the data that cannot be cleansed locally are notified to the data provider. The turn-around time from receiving the data to notifying the provider of any issues has been one of the major bottlenecks in delivering timely information to the end-users.

As a pilot project, I have been re-developing some of the data validation procedures on a small Hadoop cluster using scaled-down versions of the test data sets. So I decided to document how I converted one of those procedures from SQL to Apache Pig on Hadoop.

The platform I have used for this pilot is Apache Hadoop 2.6.0, Flume 1.5.2 and Pig 0.14, running on a 4-node CentOS cluster.

The dataset

This dataset consists of patient diagnosis records from doctors' clinics within a region. The frequency of data supply varies from provider to provider, ranging from the more common once a month to a couple of times a week. Some supply all the data extracted for the period in one file, while others prefer to split it into several files, each containing data for a shorter period.

The structure of the source data file is as follows (the data is text-qualified by double quotes):

"HEADER","20110118","20101218","20110118","T00002"
"0000000000000000075412","20110107","2699","D","20110107","2315.","","","","","","C"
"0000000000000000075412","20110107","2699","D","20110107","246..","162","74","","","","B"
<.... more records ....>
The first row is a header with the following fields:
Field 1:- Header tag
Field 2:- End date of data range
Field 3:- Start date of data range
Field 4:- Extracted date
Field 5:- Data file provider code (which will usually be the clinic)

The rest of the rows contain the diagnosis data:
Field 1:- Person identifier
Field 2:- Activity date (also called the diagnosis date)
Fields 3 to 12:- Other details of the diagnosis (there could be several diagnoses on the same day)

Validation process 

Between two data files extracted for consecutive periods, there is very often an overlap of anything from a few days to a couple of weeks. This overlapping period is typically due to the need to re-extract any alterations that may have taken place to the most recent data entries. As a rule, no data should ever be removed; records should instead be flagged as deleted. This validation process therefore ensures that, within the overlapping period, the number of records remains the same. For example, if the previous file ends on 18 January and the current file starts on 7 January, the count of diagnosis records dated between 7 and 18 January must be identical in both files. Otherwise the latest data file is deemed to be truncated and should fail the validation. Truncated data files will not be passed to the subsequent data loading process.

Data files received are loaded into the Hadoop HDFS cluster using Flume. Once in HDFS, a shell script iterates through them and calls the validation Pig script for each file. The script can also be called from the command line like so:

pig -param datafile=data/T00002_diags_2.csv -param lastheader=validations/T00002_lastheader -param lastdatacounts=validations/T00002_lastdatacounts -param validationresultfile=validations/T00002_datacounts_validation
The following parameters are passed:

$datafile:- The fully qualified name of the data file
$lastheader:- The fully qualified name of the header file extracted from the previous valid data file from the same provider. This file is created as part of the validation and loading process
$lastdatacounts:- The fully qualified name of the daily counts file aggregated from the previous valid data file from the same provider. This file is created as part of the validation and loading process
$validationresultfile:- The fully qualified name of the resulting validation file

The Pig script that performs this validation is listed below in sections, with a general description of what the code is doing below each section:

-- Load the source data
source = LOAD '$datafile' USING PigStorage(',');

-- Split the header record and data records
SPLIT source INTO header IF $0 == '"HEADER"', data OTHERWISE;
First the data file is loaded. Then the header row and data rows are separated into two relation variables using the SPLIT command.

-- Format by removing text qualifiers and set required fields
currentheader = FOREACH header GENERATE 
 REPLACE($4,'\\"','') AS (Provider:chararray), 
 ToDate(REPLACE($2,'\\"',''),'yyyyMMdd') AS (StartDate:datetime), 
 ToDate(REPLACE($1,'\\"',''),'yyyyMMdd') AS (EndDate:datetime);

currentdata = FOREACH data GENERATE 
 REPLACE(header.$4,'\\"','') AS (Provider:chararray), 
 ToDate(REPLACE($1,'\\"',''),'yyyyMMdd') AS (ActivityDate:datetime);
Since the source data arrives as double-quote qualified text files, all the double quotes need to be removed; this is done with the REPLACE function. The dates in the file are in yyyyMMdd format, so they are converted to Pig datetime values with ToDate and an explicit format string. Note also that header.$4 projects the provider code from the single-row header relation onto every data row, so it can later be used as a join key.
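
As an aside, the quote stripping could also be pushed into the load step by using the CSVExcelStorage loader from the piggybank library instead of PigStorage. A minimal sketch, assuming piggybank.jar is available on the cluster (the jar path below is only an example):

-- Alternative load that removes the double-quote text qualifiers at load time
REGISTER /usr/lib/pig/piggybank.jar;
source = LOAD '$datafile' USING org.apache.pig.piggybank.storage.CSVExcelStorage(',');

With this loader the SPLIT condition would compare against 'HEADER' without the embedded quotes, and the REPLACE calls above would no longer be needed, although the ToDate conversions would still be required.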

-- Group activity by date and count number of activities for each day
datagrouped = GROUP currentdata BY (ActivityDate);
currentdatacounts = FOREACH datagrouped {
    Provider = DISTINCT currentdata.Provider; -- carry the provider code through as the join field to the header
    GENERATE 
  FLATTEN(Provider) AS Provider, 
  group AS ActivityDate, 
  COUNT(currentdata) AS (ActivityCount:long);
 };
Using the GROUP command, the data is grouped by activity date. The grouping is then used in a FOREACH command to perform a COUNT aggregation, giving the count of activity records for each day, which is stored in a new relation called currentdatacounts.
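
When developing a step like this it is worth sanity-checking the intermediate relation from the Grunt shell; a quick sketch using Pig's diagnostic operators (the sample size of 10 is arbitrary):

-- Inspect the schema and a small sample of the daily counts
DESCRIBE currentdatacounts;
counts_sample = LIMIT currentdatacounts 10;
DUMP counts_sample;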

Now that the daily counts for the current data file are established, it is time to retrieve the counts of the previous data file from the same provider. One important point to note here is that if this is the very first data file and no previous file exists, then there is no need to perform this validation. The controlling shell script checks for this before calling the Pig script.

-- Load the last event header and validation counts
lastheader = LOAD '$lastheader' AS (Provider:chararray,StartDate:datetime,EndDate:datetime);
lastdatacounts = LOAD '$lastdatacounts' AS (Provider:chararray,ActivityDate:datetime,ActivityCount:long);

-- Join the current header with the last header
combinedheaders = JOIN currentheader BY Provider, lastheader BY Provider;

-- Join the combined headers to the current data counts
combinedheaders_currentdatacounts_joined = JOIN combinedheaders BY currentheader::Provider, currentdatacounts BY Provider;

-- Join the combined headers to the last data counts
combinedheaders_lastdatacounts_joined = JOIN combinedheaders BY lastheader::Provider, lastdatacounts BY Provider;

-- Keep only the current data counts that fall in the overlapping period between current and last
overlaped_currentdatacounts = FILTER combinedheaders_currentdatacounts_joined BY currentdatacounts::ActivityDate >= currentheader::StartDate 
 AND currentdatacounts::ActivityDate <= lastheader::EndDate;

-- Keep only the last data counts that fall in the overlapping period between current and last
overlaped_lastdatacounts = FILTER combinedheaders_lastdatacounts_joined BY lastdatacounts::ActivityDate >= currentheader::StartDate 
 AND lastdatacounts::ActivityDate <= lastheader::EndDate;
The controlling shell script also works out the $lastheader and $lastdatacounts parameters to be passed to the Pig script, based on the provider code which forms the first six characters of the data file name. Those two files get created as part of the loading process, which occurs after this validation, and are stored in a validations folder within the HDFS cluster. So at this stage the files named by these parameters are loaded into two relations named lastheader and lastdatacounts respectively.

Then the header row of the current data file, held in currentheader, is joined to the header row of the previous data file, now held in lastheader. The JOIN is performed on the Provider field and the result is stored in a new relation called combinedheaders. The combined header relation gives access to the start date of the current data file and the end date of the previous data file, which together form the date range of the overlapping period.

Now that the overlapping period is established from the combined headers, this information is used with the FILTER command to keep only the daily counts that fall within that period, from both the currentdatacounts and lastdatacounts relations, storing them in two new relations named overlaped_currentdatacounts and overlaped_lastdatacounts.

-- Group the current overlapping data by header and sum up the data counts
overlaped_currentdatacounts_grouped = GROUP overlaped_currentdatacounts BY (currentheader::Provider,currentheader::StartDate,currentheader::EndDate);
totaloverlaped_currentdatacounts = FOREACH overlaped_currentdatacounts_grouped GENERATE 
 group, 
 SUM(overlaped_currentdatacounts.currentdatacounts::ActivityCount) AS (TotalOverlapActivityCounts:long);

-- Group the last overlapping data by header and sum up the data counts
overlaped_lastdatacounts_grouped = GROUP overlaped_lastdatacounts BY (lastheader::Provider,lastheader::StartDate,lastheader::EndDate);
totaloverlaped_lastdatacounts = FOREACH overlaped_lastdatacounts_grouped GENERATE 
 group, 
 SUM(overlaped_lastdatacounts.lastdatacounts::ActivityCount) AS (TotalLastOverlapActivityCounts:long);

-- Join the results together and create validation record
totaloverlapped_joined = JOIN totaloverlaped_currentdatacounts BY group.combinedheaders::currentheader::Provider, 
 totaloverlaped_lastdatacounts BY group.combinedheaders::lastheader::Provider;

validationresult = FOREACH totaloverlapped_joined GENERATE
 'OverlappedActivityCount' AS (Validation:chararray),
    totaloverlaped_currentdatacounts::group.combinedheaders::currentheader::Provider AS Provider,
    totaloverlaped_currentdatacounts::group.combinedheaders::currentheader::StartDate AS StartDate,
    totaloverlaped_currentdatacounts::group.combinedheaders::currentheader::EndDate AS EndDate,
    (totaloverlaped_currentdatacounts::TotalOverlapActivityCounts == totaloverlaped_lastdatacounts::TotalLastOverlapActivityCounts ? true : false)
  AS(ValidationResult:boolean);
Next, the filtered daily counts for the overlapping period from both the current data file and the previous data file are grouped by the respective header information. The grouping is then used to sum the activity record counts for the overlapping period using the SUM aggregation function. This information is stored in two new relations named totaloverlaped_currentdatacounts and totaloverlaped_lastdatacounts for the current data file and previous data file respectively.

These two relations are in turn joined together on the provider field, which facilitates the generation of the final validation result. The final validation result is a single record for each data file, with the following field structure, stored in a new relation named validationresult:
Validation:- A string constant identifying the specific validation. In this case it is "OverlappedActivityCount"
Provider:- Data file provider code
StartDate:- Start date of data range (as specified in the header of the current data file)
EndDate:- End date of data range (as specified in the header of the current data file)
ValidationResult:- A boolean field indicating the result of the validation, i.e. if the counts of activity records in the overlapping period of the current and the previous data files match it will be true, otherwise it will be false.

-- Store the final result of the validation
STORE validationresult INTO '$validationresultfile';
Finally the validationresult relation is stored in the HDFS cluster under the file name specified by the parameter $validationresultfile.
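
To eyeball the result before (or instead of) the STORE, the relation can also be dumped from the Grunt shell. The output tuple shown in the comment below is only a hypothetical illustration based on the sample header above; the exact datetime formatting depends on the cluster settings:

-- Example output tuple (hypothetical):
-- (OverlappedActivityCount,T00002,2010-12-18T00:00:00.000Z,2011-01-18T00:00:00.000Z,true)
DUMP validationresult;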

After the data files are validated, the resulting validation files are used by the subsequent process to move the valid data files to a loading area; invalid files are moved to an invalid area, put into the feedback loop and notified to the data provider.

Choosing Pig for data validation on a Hadoop platform

The following is an extract from the book Pig Design Patterns (Pradeep Pasupuleti (2014), Pig Design Patterns, Packt Publishing):

Implementing the validation and cleansing code in Pig within the Hadoop environment reduces the time-quality trade-off and the requirement to move data to external systems to perform cleansing. The high-level overview of implementation is depicted in the following diagram:

(Diagram: high-level overview of the validation and cleansing implementation)

The following are the advantages of performing data cleansing within the Hadoop environment using Pig:
  • Improved overall performance since validation and cleansing are done in the same environment. There is no need to transfer data to external systems for cleansing.
  • Pig is highly suitable to write code for validating and cleansing scripts since the built-in functions are geared towards processing messy data and for exploratory analysis.
  • Pig enables automating of the cleansing process by chaining complex workflows, which is very handy for datasets that are periodically updated.
