Data Quality Monitoring Tool for Big Data implemented using Spark
- Validate data using provided business rules
- Log results
- Send alerts
Include the dependency:
libraryDependencies += "com.github.piotr-kalanski" % "data-quality-monitoring_2.11" % "0.3.2"
or
<dependency>
  <groupId>com.github.piotr-kalanski</groupId>
  <artifactId>data-quality-monitoring_2.11</artifactId>
  <version>0.3.2</version>
</dependency>
The data quality monitoring process consists of the following steps (a condensed code sketch follows the list):
- Load configuration with business rules
- Run data validation
- Log validation results
- Send alerts
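The sketch below shows how these steps map to the library API; it is condensed from the complete example at the end of this document (Elasticsearch is used as the result store and Slack as the alert channel):

import com.datawizards.dqm.DataQualityMonitor
import com.datawizards.dqm.configuration.loader.FileConfigurationLoader
import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger
import com.datawizards.dqm.alert.SlackAlertSender

// 1. Load configuration with business rules from a file
val configurationLoader = new FileConfigurationLoader("configuration.conf")
// 3. Validation results will be logged to Elasticsearch indices
val logger = new ElasticsearchValidationResultLogger("http://localhost:9200",
  "invalid_records", "table_statistics", "column_statistics", "group_statistics", "invalid_groups")
// 4. Alerts will be sent to Slack
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")
// 2. Run validation for the given processing date; steps 3 and 4 are executed by the monitor
DataQualityMonitor.run(new java.util.Date(), configurationLoader, logger, alertSender)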
Configuration can be loaded from:
- file
- directory
- RDBMS
Additionally, there are plans to support:
- DynamoDB
Example configuration:
tablesConfiguration = [
  {
    location = {type = Hive, table = clients}, // location of first table that should be validated
    rules = { // validation rules
      rowRules = [ // validation rules working on single row level
        {
          field = client_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = min, value = 0} // minimum value for this field is 0
          ]
        },
        {
          field = client_name,
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  },
  {
    location = {type = Hive, table = companies}, // location of second table that should be validated
    rules = {
      rowRules = [
        {
          field = company_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = max, value = 100} // maximum value for this field is 100
          ]
        },
        {
          field = company_name, // name of field that should be validated
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  }
]
To load configuration from a file, use class FileSingleTableConfigurationLoader (one table per file) or FileMultipleTablesConfigurationLoader (multiple tables per file).
Example:
import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
configurationLoader.loadConfiguration()
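When one file describes a single table, FileSingleTableConfigurationLoader can be used instead; a minimal sketch, assuming its constructor also takes the path to the configuration file (the file name here is illustrative):

import com.datawizards.dqm.configuration.loader.FileSingleTableConfigurationLoader

// Assumption: the loader takes the path of a file describing one table (illustrative file name)
val configurationLoader = new FileSingleTableConfigurationLoader("clients_table.conf")
configurationLoader.loadConfiguration()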
To load configuration from a directory, use class DirectoryConfigurationLoader. Each file in the directory should contain the configuration for one table (TableConfiguration).
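A minimal sketch, assuming the constructor takes the directory path (the directory name is illustrative):

import com.datawizards.dqm.configuration.loader.DirectoryConfigurationLoader

// Assumption: the loader reads all configuration files from the given directory,
// one file per TableConfiguration (directory name is illustrative)
val configurationLoader = new DirectoryConfigurationLoader("configuration_directory")
configurationLoader.loadConfiguration()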
To load configuration from a relational database, use class DatabaseConfigurationLoader. Each table row should contain the configuration for one table (TableConfiguration).
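A minimal sketch; the constructor parameters shown here (JDBC connection details and the name of the table holding the configuration rows) are assumptions, not the library's documented signature:

import java.util.Properties
import com.datawizards.dqm.configuration.loader.DatabaseConfigurationLoader

// Assumed parameters: JDBC driver, connection string, connection properties
// and the name of the table with one configuration row per TableConfiguration
val configurationLoader = new DatabaseConfigurationLoader(
  "org.h2.Driver",        // JDBC driver class name (assumption)
  "jdbc:h2:mem:dqm",      // DB connection string (assumption)
  new Properties(),       // JDBC connection properties, especially user and password (assumption)
  "TABLES_CONFIGURATION"  // table containing configuration rows (assumption)
)
configurationLoader.loadConfiguration()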
Currently supported categories of data validation rules:
- field rules - validate the value of a single field, e.g. not null, min value, max value
- group rules - validate the result of a group by expression, e.g. expected groups (countries, types)
- table trend rules - validate trends over time, e.g. comparing the current day's row count with the previous day's
A combined example showing all three categories follows the rule descriptions below.
Field rules should be defined in section rules.rowRules:
tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [
        {
          field = Field name,
          rules = [...]
        }
      ]
    }
  }
]
Supported field validation rules:
- not null: {type = NotNull}
- dictionary: {type = dict, values = [1,2,3]}
- regex: {type = regex, value = """\s.*"""}
- min value: {type = min, value = 0}
- max value: {type = max, value = 100}
Group rules should be defined in section groups.rules:
tablesConfiguration = [
  {
    location = [...],
    rules = [...],
    groups = [
      {
        name = Group name,
        field = Group by field name,
        rules = [
          {
            type = NotEmptyGroups,
            expectedGroups = [c1,c2,c3,c4]
          }
        ]
      }
    ]
  }
]
Supported group validation rules:
- not empty groups: {type = NotEmptyGroups, expectedGroups = [c1,c2,c3,c4]}
Table trend rules should be defined in section rules.tableTrendRules:
tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [...],
      tableTrendRules = [
        {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
      ]
    }
  }
]
Supported table trend validation rules:
- current vs previous day row count: {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
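For reference, a single table entry can combine all three rule categories. A sketch based on the snippets above (the country_code field, dictionary values and regex are illustrative):

tablesConfiguration = [
  {
    location = {type = Hive, table = clients},
    rules = {
      rowRules = [
        {
          field = client_id,
          rules = [
            {type = NotNull},
            {type = min, value = 0},
            {type = max, value = 100}
          ]
        },
        {
          field = country_code, // illustrative field name
          rules = [
            {type = dict, values = [PL, DE, FR]},
            {type = regex, value = """[A-Z]{2}"""}
          ]
        }
      ],
      tableTrendRules = [
        {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
      ]
    },
    groups = [
      {
        name = countries,
        field = country_code,
        rules = [
          {type = NotEmptyGroups, expectedGroups = [PL, DE, FR]}
        ]
      }
    ]
  }
]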
Validation results can be logged into:
- Elasticsearch using class ElasticsearchValidationResultLogger:
val logger = new ElasticsearchValidationResultLogger(
  esUrl = "http://localhost:9200",                 // Elasticsearch URL
  invalidRecordsIndexName = "invalid_records",     // index name where to store invalid records
  tableStatisticsIndexName = "table_statistics",   // index name where to store table statistics
  columnStatisticsIndexName = "column_statistics", // index name where to store column statistics
  groupsStatisticsIndexName = "group_statistics",  // index name where to store group statistics
  invalidGroupsIndexName = "invalid_groups"        // index name where to store invalid groups
)
- RDBMS using class DatabaseValidationResultLogger:
val logger = new DatabaseValidationResultLogger(
  driverClassName = "org.h2.Driver",               // JDBC driver class name
  dbUrl = connectionString,                        // DB connection string
  connectionProperties = new Properties(),         // JDBC connection properties, especially user and password
  invalidRecordsTableName = "INVALID_RECORDS",     // name of table where to insert invalid records
  tableStatisticsTableName = "TABLE_STATISTICS",   // name of table where to insert table statistics records
  columnStatisticsTableName = "COLUMN_STATISTICS", // name of table where to insert column statistics records
  groupsStatisticsTableName = "GROUP_STATISTICS",  // name of table where to insert group by statistics records
  invalidGroupsTableName = "INVALID_GROUPS"        // name of table where to insert invalid groups
)
Alerts can be sent to:
- Slack using class SlackAlertSender (see the complete example below)
Additionally, there are plans to support more alert channels.
Complete example:
import com.datawizards.dqm.configuration.loader.FileConfigurationLoader
import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger
import com.datawizards.dqm.alert.SlackAlertSender
import com.datawizards.dqm.DataQualityMonitor
val configurationLoader = new FileConfigurationLoader("configuration.conf")
val esUrl = "http://localhost:9200"
val invalidRecordsIndexName = "invalid_records"
val tableStatisticsIndexName = "table_statistics"
val columnStatisticsIndexName = "column_statistics"
val groupsStatisticsIndexName = "group_statistics"
val invalidGroupsIndexName = "invalid_groups"
val logger = new ElasticsearchValidationResultLogger(esUrl, invalidRecordsIndexName, tableStatisticsIndexName, columnStatisticsIndexName, groupsStatisticsIndexName, invalidGroupsIndexName)
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")
val processingDate = new java.util.Date()
DataQualityMonitor.run(processingDate, configurationLoader, logger, alertSender)
configuration.conf:
tablesConfiguration = [
  {
    location = {type = Hive, table = clients},
    rules = {
      rowRules = [
        {
          field = client_id,
          rules = [
            {type = NotNull}
          ]
        }
      ]
    }
  }
]