Adding Incremental Data

Building a continuously updated identity graph with new, updated, and deleted records

Rerunning matching on the entire dataset is wasteful, and it loses the lineage of matched records against a persistent identifier. With the incremental flow feature in Zingg Enterprise, incremental loads can be matched against existing, pre-resolved entities. New and updated records are matched to existing clusters, and new persistent ZINGG_IDs are generated for records that do not find a match. If a record is updated and Zingg Enterprise discovers that it is a better match for another cluster, the record is reassigned. Cluster assignment, merge, and unmerge happen automatically in the flow. Zingg Enterprise also honors human feedback on previously matched data, ensuring that approved records are not overridden.
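Conceptually, the assignment step works like the following sketch. This is plain, illustrative Python only, not the Zingg API: the `assign_incremental` function, the string-similarity stand-in, and the sample clusters are all hypothetical.

```python
# Illustrative sketch of incremental cluster assignment -- NOT the Zingg API.
# Existing clusters carry a persistent id; an incoming record either joins
# the best-matching cluster or receives a newly generated persistent id.
from difflib import SequenceMatcher
from itertools import count

# hypothetical pre-resolved clusters: persistent id -> cluster representative
clusters = {"z1": "john smith", "z2": "mary jones"}
_next_id = count(3)

def similarity(a, b):
    # stand-in for Zingg's learned match model
    return SequenceMatcher(None, a, b).ratio()

def assign_incremental(record, threshold=0.8):
    """Return the persistent id for an incoming record."""
    best_id, best_score = None, 0.0
    for zid, rep in clusters.items():
        score = similarity(record, rep)
        if score > best_score:
            best_id, best_score = zid, score
    if best_score >= threshold:
        return best_id                     # joins an existing cluster
    new_id = f"z{next(_next_id)}"          # no match: new persistent id
    clusters[new_id] = record
    return new_id

print(assign_incremental("jon smith"))     # close to an existing cluster
print(assign_incremental("alice wong"))    # brand new entity
```

In the real flow, the match model, merge/unmerge handling, and approved-record protection replace the naive similarity and threshold used here.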

The incremental phase is run as follows:

./scripts/zingg.sh --phase runIncremental --conf <location to incrementalConf.json>

Example incrementalConf.json:

{
    "config": "config.json",
    "incrementalData": [{
            "name": "customers_incr",
            "format": "csv",
            "props": {
                "location": "test-incr.csv",
                "delimiter": ",",
                "header": false
            },
            "schema": "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string"
        }
    ]
}
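The `schema` value is a DDL-style string of `<column> <type>` pairs describing the header-less incremental CSV. As a quick sanity check, the column names can be pulled out of that string and zipped against a CSV row, as in this plain-Python sketch (the sample row is made up for illustration):

```python
# Parse the DDL-style schema string from incrementalConf.json into column
# names, then read a header-less CSV row into a dict keyed by those names.
import csv
import io

schema = ("recId string, fname string, lname string, stNo string, "
          "add1 string, add2 string, city string, state string, "
          "areacode string, dob string, ssn string")

# each comma-separated entry is "<name> <type>"; keep only the name
columns = [field.split()[0] for field in schema.split(",")]

# stand-in for a test-incr.csv line (header=false, delimiter=",")
sample = io.StringIO("rec-1,john,smith,12,high st,,sydney,nsw,2000,19700101,123456789\n")
row = dict(zip(columns, next(csv.reader(sample))))
print(row["fname"], row["city"])
```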

runIncremental can also be triggered using Python by invoking:

./scripts/zingg.sh --run examples/FebrlExample.py

Python Code Example:

#import the packages  
  
from zingg.client import *  
from zingg.pipes import *  
from zinggEC.enterprise.common.ApproverArguments import *  
from zinggEC.enterprise.common.IncrementalArguments import *  
from zinggEC.enterprise.common.epipes import *  
from zinggEC.enterprise.common.EArguments import *  
from zinggEC.enterprise.common.EFieldDefinition import EFieldDefinition  
from zinggES.enterprise.spark.ESparkClient import EZingg  
import os  
  
#build the arguments for zingg  
args = EArguments()  
#set field definitions  
recId = EFieldDefinition("recId", "string", MatchType.DONT_USE)  
recId.setPrimaryKey(True)  
fname = EFieldDefinition("fname", "string", MatchType.FUZZY)  
lname = EFieldDefinition("lname", "string", MatchType.FUZZY)  
stNo = EFieldDefinition("stNo", "string", MatchType.FUZZY)  
add1 = EFieldDefinition("add1","string", MatchType.FUZZY)  
add2 = EFieldDefinition("add2", "string", MatchType.FUZZY)  
city = EFieldDefinition("city", "string", MatchType.FUZZY)  
areacode = EFieldDefinition("areacode", "string", MatchType.FUZZY)  
state = EFieldDefinition("state", "string", MatchType.FUZZY)  
dob = EFieldDefinition("dob", "string", MatchType.FUZZY)  
ssn = EFieldDefinition("ssn", "string", MatchType.FUZZY)  
  
fieldDefs = [recId, fname, lname, stNo, add1, add2, city, areacode, state, dob, ssn]  
args.setFieldDefinition(fieldDefs)  
#set the modelid and the zingg dir  
args.setModelId("100")  
args.setZinggDir("/tmp/models")  
args.setNumPartitions(4)  
args.setLabelDataSampleSize(0.5)  
  
#reading dataset into inputPipe and setting it up in 'args'  
schema = "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, areacode string, state string, dob string, ssn string"  
inputPipe = ECsvPipe("testFebrl", "examples/febrl/test.csv", schema)  
args.setData(inputPipe)  
  
outputPipe = ECsvPipe("resultFebrl", "/tmp/febrlOutput")  
outputPipe.setHeader("true")  
  
args.setOutput(outputPipe)  
  
#Run findAndLabel  
options = ClientOptions([ClientOptions.PHASE,"findAndLabel"])  
zingg = EZingg(args, options)  
zingg.initAndExecute()  
  
#Run trainMatch after above completes  
options = ClientOptions([ClientOptions.PHASE,"trainMatch"])  
zingg = EZingg(args, options)  
zingg.initAndExecute()  
  
#Now run incremental on output generated above  
incrArgs = IncrementalArguments()  
incrArgs.setParentArgs(args)  
incrPipe = ECsvPipe("testFebrlIncr", "examples/febrl/test-incr.csv", schema)  
incrArgs.setIncrementalData(incrPipe)  
  
options = ClientOptions([ClientOptions.PHASE,"runIncremental"])  
zingg = EZingg(incrArgs, options)  
zingg.initAndExecute()  
