Adding Incremental Data

Building a continuously updated identity graph with new, updated, and deleted records

Rerunning matching on entire datasets is wasteful, and we lose the lineage of matched records against a persistent identifier. Using the incremental flow feature in Zingg Enterprise, incremental loads can be run to match incoming records against existing pre-resolved entities. New and updated records are matched to existing clusters, and new persistent ZINGG_IDs are generated for records that do not find a match. If a record gets updated and Zingg Enterprise discovers that it is a more suitable match with another cluster, it is reassigned. Cluster assignment, merge, and unmerge happen automatically in the flow. Zingg Enterprise also takes care of human feedback on previously matched data, ensuring that approved records are not overridden.

The incremental phase is run as follows:

./scripts/zingg.sh --phase runIncremental --conf <location to incrementalConf.json>

Example incrementalConf.json:

{
    "config": "config.json",
    "incrementalData": [{
        "name": "customers_incr",
        "format": "csv",
        "props": {
            "location": "test-incr.csv",
            "delimiter": ",",
            "header": false
        },
        "schema": "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string"
    }]
}
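
Because each entry in incrementalData follows Zingg's standard pipe definition, other supported input formats should work the same way. A minimal sketch, assuming the incremental records arrive as Parquet at a hypothetical location:

{
    "config": "config.json",
    "incrementalData": [{
        "name": "customers_incr",
        "format": "parquet",
        "props": {
            "location": "test-incr.parquet"
        }
    }]
}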

runIncremental can also be triggered using Python by invoking:

./scripts/zingg.sh --run examples/FebrlExample.py

Python Code Example:

#import the packages  
  
from zingg.client import *  
from zingg.pipes import *  
from zinggEC.enterprise.common.ApproverArguments import *  
from zinggEC.enterprise.common.IncrementalArguments import *  
from zinggEC.enterprise.common.epipes import *  
from zinggEC.enterprise.common.EArguments import *  
from zinggEC.enterprise.common.EFieldDefinition import EFieldDefinition  
from zinggES.enterprise.spark.ESparkClient import EZingg  
import os  
  
#build the arguments for zingg  
args = EArguments()  
#set field definitions  
recId = EFieldDefinition("recId", "string", MatchType.DONT_USE)  
recId.setPrimaryKey(True)  
fname = EFieldDefinition("fname", "string", MatchType.FUZZY)  
lname = EFieldDefinition("lname", "string", MatchType.FUZZY)  
stNo = EFieldDefinition("stNo", "string", MatchType.FUZZY)  
add1 = EFieldDefinition("add1","string", MatchType.FUZZY)  
add2 = EFieldDefinition("add2", "string", MatchType.FUZZY)  
city = EFieldDefinition("city", "string", MatchType.FUZZY)  
areacode = EFieldDefinition("areacode", "string", MatchType.FUZZY)  
state = EFieldDefinition("state", "string", MatchType.FUZZY)  
dob = EFieldDefinition("dob", "string", MatchType.FUZZY)  
ssn = EFieldDefinition("ssn", "string", MatchType.FUZZY)  
  
fieldDefs = [recId, fname, lname, stNo, add1, add2, city, areacode, state, dob, ssn]  
args.setFieldDefinition(fieldDefs)  
#set the model id and the zingg directory where the model is saved
args.setModelId("100")  
args.setZinggDir("/tmp/models")  
args.setNumPartitions(4)  
args.setLabelDataSampleSize(0.5)  
  
#read the dataset into inputPipe and set it up in 'args'
schema = "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, areacode string, state string, dob string, ssn string"
inputPipe = ECsvPipe("testFebrl", "examples/febrl/test.csv", schema)  
args.setData(inputPipe)  
  
outputPipe = ECsvPipe("resultFebrl", "/tmp/febrlOutput")  
outputPipe.setHeader("true")  
  
args.setOutput(outputPipe)  
  
#Run findAndLabel  
options = ClientOptions([ClientOptions.PHASE,"findAndLabel"])  
zingg = EZingg(args, options)  
zingg.initAndExecute()  
  
#Run trainMatch after above completes  
options = ClientOptions([ClientOptions.PHASE,"trainMatch"])  
zingg = EZingg(args, options)  
zingg.initAndExecute()  
  
#Now run incremental on output generated above  
incrArgs = IncrementalArguments()  
incrArgs.setParentArgs(args)  
incrPipe = ECsvPipe("testFebrlIncr", "examples/febrl/test-incr.csv", schema)  
incrArgs.setIncrementalData(incrPipe)  
  
options = ClientOptions([ClientOptions.PHASE,"runIncremental"])  
zingg = EZingg(incrArgs, options)  
zingg.initAndExecute()  
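
Since runIncremental resolves new data against the model and clusters referenced through the parent arguments, subsequent loads only need a fresh pipe pointing at the new file. A minimal sketch continuing the script above, with hypothetical daily files as placeholders:

#hypothetical follow-up increments; the file paths are placeholders
daily_files = ["examples/febrl/test-incr-day2.csv", "examples/febrl/test-incr-day3.csv"]

for path in daily_files:
    #each run matches the new records against the clusters resolved so far
    incrArgs = IncrementalArguments()
    incrArgs.setParentArgs(args)
    incrArgs.setIncrementalData(ECsvPipe("testFebrlIncr", path, schema))
    options = ClientOptions([ClientOptions.PHASE, "runIncremental"])
    EZingg(incrArgs, options).initAndExecute()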