Using External Functions
Unifying customer records for customer data platforms, personalization, AML, GDPR, analytics, and reporting.
Enterprises spend massive amounts building their warehouses, pumping in data from different sources to get a clear view of what is happening. However, data-driven insights are only as good as the data itself. To make important business decisions, we need to ensure that our data is clean, consistent, and unified. If the data is riddled with duplicates or is not harmonized correctly, the decisions based on it will be erroneous.
Customer and prospect records in the warehouse come from different devices and from offline and online channels. They also come from multiple SaaS and in-house applications like CRMs, ticketing, and procurement. Identifying the unique real-world customer across multiple channels and applications is critical to understanding the customer and delighting them with personalized messages and offers. Anti-money laundering, CCPA, and GDPR programs cannot work without a solid customer identity. Customer identity is also the backbone of AI models for fraud and risk. There is a growing set of users treating the warehouse as their Customer Data Platform, and identity resolution directly on the warehouse becomes a building block there.
Let us take a look at this table in Snowflake containing customer names, addresses, and other attributes.
We can see that the customer nicole carbone has multiple records with inconsistent values across the last name, street, address, and locality attributes. We could try to remove such duplicates using an attribute that should be unique to an entity, like the SSN in this case, but we cannot be sure such attributes will always be present and consistent.
What if we want to identify and unify such records without spending time figuring out how the attributes need to be compared? As the data has variations, defining match rules gets tricky. The lack of a unique identifier also means comparing every record with every other record, leading to scalability challenges.
In this article, we will use Zingg, an open-source and scalable ML-based identity resolution tool. We will run Zingg on an AWS EC2 instance, using Snowflake’s external function feature, and resolve customer identities in the table. To build machine learning models for identity resolution, Zingg needs samples of matching and non-matching records.
Zingg has four main phases to build and run the ML models for fuzzy matching.
The findTrainingData phase samples records to show to the end user for building training data.
An interactive label phase, where a human annotator marks the pairs from findTrainingData as matches or non-matches.
The train phase, where Zingg uses the labels supplied in the label phase to learn how to identify duplicates.
The match phase, where the trained models are applied and output with match scores is generated.
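For reference, each phase is invoked through Zingg's zingg.sh launcher script with the phase name and a JSON configuration file; the script path below assumes the layout of a standard Zingg installation.

```bash
# Run each Zingg phase in turn (label is interactive and runs in a terminal)
./scripts/zingg.sh --phase findTrainingData --conf config.json
./scripts/zingg.sh --phase label --conf config.json
./scripts/zingg.sh --phase train --conf config.json
./scripts/zingg.sh --phase match --conf config.json
```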
Let us now see how we can do identity resolution on Snowflake.
Step 1: Set up Zingg on AWS EC2 instance
You can use an existing instance or create a new one for this task. Once the instance is up and running, we will connect to it using the Visual Studio Code IDE itself, since it facilitates code development on remote machines. The steps to connect VS Code to EC2 are as follows:
Install VS Code Remote SSH extension
Define the config file for the Remote SSH extension. An example file with placeholder values is shown after these steps.
Click on the Open a remote window option in VS Code, select the host, which is the EC2 IP you specified in the config file, and connect.
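Here is a minimal example of the Remote SSH config file (typically ~/.ssh/config) with placeholder values; the Host alias and the ec2-user login are assumptions that depend on your setup and AMI.

```
Host zingg-ec2
    HostName <EC2_PUBLIC_IP>
    User ec2-user
    IdentityFile ~/.ssh/<your-key>.pem
```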
The quickest way to run Zingg is to use the Docker image inside this instance.
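For example, pulling and starting the image could look like the following; the zingg/zingg image name and the tag are assumptions, so check the Zingg docs for the currently recommended version.

```bash
# Pull the Zingg image and start an interactive container on the EC2 instance
docker pull zingg/zingg:0.3.4
docker run -it zingg/zingg:0.3.4 bash
```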
To connect Zingg with Snowflake, we need the Snowflake Spark connector and Snowflake JDBC driver as mentioned here. Follow the Zingg docs to copy over the jars and set the configuration to add them to the classpath.
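As an illustration, one way to put the two jars on the classpath is through Spark's spark.jars property in Zingg's Spark properties file; the file location, jar names, and versions below are placeholders, so follow the Zingg docs for the exact steps.

```properties
# Add the Snowflake JDBC driver and Spark connector jars to the classpath
spark.jars=/path/to/snowflake-jdbc-<version>.jar,/path/to/spark-snowflake_2.12-<version>.jar
```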
Step 2: Connect Zingg and Snowflake
We need to tell Zingg the location of the input records and where to write the matched records. We use the sample at the Zingg github repository and provide our connection properties to make our config.json.
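A partial sketch of the Snowflake connection block in config.json is shown below; the property names follow the Snowflake Spark connector options, the values are placeholders, and the full file also needs the output, fieldDefinition, and model settings from the sample in the Zingg repository.

```json
"data": [{
    "name": "customers",
    "format": "net.snowflake.spark.snowflake",
    "props": {
        "sfUrl": "<account>.snowflakecomputing.com",
        "sfUser": "<user>",
        "sfPassword": "<password>",
        "sfDatabase": "<database>",
        "sfSchema": "<schema>",
        "sfWarehouse": "<warehouse>",
        "dbtable": "CUSTOMERS"
    }
}]
```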
Step 3: Create an AWS Lambda function to trigger Zingg on EC2
AWS Lambda is a serverless compute service where we just need to write our code and the entire server side infrastructure is fully managed by AWS. This provides fault tolerance and robustness. The custom code in the Lambda function is executed whenever a request is received for that function via a REST API endpoint. Our Lambda function will receive the Zingg phase name as an input parameter, SSH into the EC2 instance we set up in step 1 and execute that Zingg phase inside the docker container.
Snowflake expects the following schema for the value returned by the Lambda function: an HTTP status code, and the data as a nested array. Each row of the data array is a return value, preceded by a row number.
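With Lambda proxy integration, the data payload is returned as a JSON string in the body field. A minimal example of the return value (the message text is only illustrative):

```json
{
  "statusCode": 200,
  "body": "{\"data\": [[0, \"Zingg phase findTrainingData has been started\"]]}"
}
```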
As we only need to pass the phase name to the function, the input to Lambda from Snowflake will look like this
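With proxy integration, the Lambda receives Snowflake's request as a JSON string in the event's body field; for a single-row call it looks roughly like this:

```json
{
  "body": "{\"data\": [[0, \"findTrainingData\"]]}"
}
```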
To parse the above input inside the Lambda function, we use the following python code.
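A minimal sketch, assuming Lambda proxy integration so that the payload arrives as a JSON string in event["body"]:

```python
import json

def lambda_handler(event, context):
    # The Snowflake payload is a JSON string: {"data": [[row_number, phase], ...]}
    payload = json.loads(event["body"])
    phase = payload["data"][0][1]
    # ... dispatch on the phase name (see the execution code further below)
```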
We will run findTrainingData, train and match phases through this procedure. We also add another input option called checklog that will let us see the Zingg logs directly inside Snowflake.
If the input parameter is a valid phase, the function will SSH into EC2, using the username, IP address of the EC2 instance, and the file path of the private key that was generated when we created the instance.
The following code will execute Zingg phases or read the log file it generates
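A minimal sketch of such a script follows, assuming the paramiko library for SSH; the constant names, the ec2-user login, the key.pem path, and the ./scripts/zingg.sh path inside the container are assumptions to adjust for your setup.

```python
import json
import paramiko

# Placeholder values; replace with your own details.
EC2_HOST = "<EC2_PUBLIC_IP>"
EC2_USER = "ec2-user"            # depends on the AMI (e.g. ubuntu)
KEY_PATH = "key.pem"             # private key packaged with the Lambda code
CONTAINER_ID = "fab997383957"    # Zingg Docker container ID on the instance
ZINGG_CONF = "config.json"
VALID_PHASES = {"findTrainingData", "train", "match"}


def run_remote(command):
    # SSH into the EC2 instance, run a single command, and return its output.
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname=EC2_HOST, username=EC2_USER, key_filename=KEY_PATH)
    _, stdout, _ = client.exec_command(command)
    output = stdout.read().decode()
    client.close()
    return output


def lambda_handler(event, context):
    phase = json.loads(event["body"])["data"][0][1]

    if phase in VALID_PHASES:
        # Open a bash shell inside the Zingg container and run the requested
        # phase, redirecting the output to logfile.txt on the EC2 instance.
        # The trailing & lets the Lambda call return without waiting.
        command = (
            f'docker exec {CONTAINER_ID} bash -c '
            f'"./scripts/zingg.sh --phase {phase} --conf {ZINGG_CONF}" '
            f"> logfile.txt 2>&1 &"
        )
        run_remote(command)
        message = f"Zingg phase {phase} has been started"
    elif phase == "checklog":
        # Read the tail of the Zingg log so it can be viewed inside Snowflake.
        message = run_remote("tail -n 20 logfile.txt")
    else:
        message = f"Unknown input: {phase}"

    # Return the response in the shape Snowflake expects from the external function.
    return {"statusCode": 200, "body": json.dumps({"data": [[0, message]]})}
```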
Let us try to understand the above code, starting with the docker command.
This will
a. Create a bash shell inside the Docker container identified by CONTAINER_ID (fab997383957).
b. bash -c will run the command string specified after the -c option inside the bash terminal.
c. The rest is the command to run. We redirect the output to logfile.txt.
On the EC2 instance, inside the Zingg container's shell prompt, the container ID is the part between the @ and the colon (:). The following image has the container ID highlighted.
We have three Zingg phases to run through Lambda: findTrainingData, train, and match. Since the label phase is interactive, it will be run directly on the EC2 instance.
Okay, so now let us put it all together!
Inside the EC2 instance, let us copy the complete code whose snippets we described above into a script called lambda_function.py. Please edit the script and fill in the placeholder values.
We now go to AWS Lambda, select Create Function, give a function name, and select the Python programming language version matching the version on the EC2 instance.
We select Create a new role with basic Lambda permissions and hit Create Function.
In EC2, we create a new virtual environment and activate it. Once activated, we install all packages needed by AWS Lambda to run.
Zip the site-packages folder
Add the script and key file at the root of the zipped folder
This is what the final zipped folder directory structure should look like
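A rough sketch of the packaging commands and the resulting top-level layout, assuming paramiko is the only extra dependency and using lambda_function.py and key.pem as the file names:

```bash
# Create and activate a virtual environment, then install the dependency
python3 -m venv venv
source venv/bin/activate
pip install paramiko

# Zip the installed packages, then add the script and key at the zip root
cd venv/lib/python3.*/site-packages
zip -r ~/lambda_deployment.zip .
cd ~
zip -g lambda_deployment.zip lambda_function.py key.pem

# Top level of lambda_deployment.zip:
#   lambda_function.py
#   key.pem
#   paramiko/, cryptography/, ... (installed site-packages)
```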
Go to the AWS Lambda function you created, select the Upload from option, and then select .zip file.
Let’s test the function and see the output. Create a new Test event, put the value in Event JSON as shown and click on Test.
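The Event JSON mimics what API Gateway's proxy integration forwards from Snowflake, for example:

```json
{
  "body": "{\"data\": [[0, \"findTrainingData\"]]}"
}
```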
We get an HTTP 200 status code, which means our function has executed successfully. We can see the function output, which says that the Zingg phase findTrainingData has been started.
If we want to check more, we can run tail -f logfile.txt in our EC2 instance and see the processing stages. The output from both AWS Lambda test and log file is shown below.
Step 4: Connect AWS Lambda with AWS API Gateway
Create a new IAM role in your AWS account. The trusted entity type should be another AWS account, and you specify your own account's AWS ID for that field. We will need this role when we connect AWS with Snowflake.
Next, we create an AWS API Gateway. Select the gateway type as REST API with a regional endpoint. Once the API is created, create a resource with a POST method. For this method, the integration type should be Lambda function, and Use Lambda proxy integration must be selected so that the response data schema is as expected. Mention the Lambda function name we created in Step 3 and save the changes. Deploy the API by selecting the API name and choosing Actions, Deploy API, and then test it. A sample test request with the input parameter checklog is provided below.
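In the API Gateway test console, the request body is the raw payload Snowflake would send, for example:

```json
{ "data": [[0, "checklog"]] }
```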
To secure the API Gateway, let us add AWS IAM authorization to the Method Request. We use the following configuration for the resource policy, updating the placeholders.
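The resource policy looks roughly like the following (Snowflake's documentation has the canonical template); the placeholders in angle brackets are your account ID, the IAM role from Step 4, the API ID, region, and resource path.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:sts::<aws_account_id>:assumed-role/<iam_role_name>/snowflake"
      },
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:<region>:<aws_account_id>:<api_id>/*/POST/<resource>"
    }
  ]
}
```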
Step 5: Connect Snowflake with AWS
In Snowflake, we run use role accountadmin and create the API integration object with the following code
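A sketch of the statement; the integration name zingg_api_integration is an assumption and the values are placeholders:

```sql
use role accountadmin;

create or replace api integration zingg_api_integration
  api_provider = aws_api_gateway
  api_aws_role_arn = '<ARN of the IAM role from Step 4>'
  api_allowed_prefixes = ('https://<api_id>.execute-api.<region>.amazonaws.com/<stage>/<resource>')
  enabled = true;
```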
We use the ARN of the IAM role from Step 4. api_allowed_prefixes is set to the resource invocation URL from the AWS API Gateway. The URL can also be found by going to Stages and clicking on stage name. Inside the stage name dropdown options, we see the defined resource. Clicking on the POST method beneath that resource shows the resource invocation URL.
Next, we need to find two values that are required to allow communication between Snowflake and AWS.
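They come from describing the integration we just created:

```sql
describe integration zingg_api_integration;
```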
From the output, we note API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID.
Let us now edit the trust relationship of the IAM role for the API Gateway and place the API_AWS_IAM_USER_ARN as the value for the key Statement.Principal.AWS.
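The edited trust policy looks roughly like this; per the Snowflake setup, the API_AWS_EXTERNAL_ID is also set as the sts:ExternalId condition.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "<API_AWS_IAM_USER_ARN>" },
      "Action": "sts:AssumeRole",
      "Condition": { "StringEquals": { "sts:ExternalId": "<API_AWS_EXTERNAL_ID>" } }
    }
  ]
}
```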
Step 6: Create Snowflake external function
External functions are user-defined functions that run outside Snowflake. The AWS Lambda function we wrote is stored and executed on AWS servers, so it is an external function with respect to Snowflake. External functions allow easy access to custom code, REST services for data manipulation, and API services for machine learning models.
The code for our external function is as follows
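A sketch, with run_zingg as an assumed function name; the URL is the resource invocation URL from Step 5:

```sql
create or replace external function run_zingg(phase varchar)
  returns variant
  api_integration = zingg_api_integration
  as 'https://<api_id>.execute-api.<region>.amazonaws.com/<stage>/<resource>';
```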
Run this to grant the proper usage roles
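For example (the role name here is a placeholder for whichever role will call the function):

```sql
grant usage on integration zingg_api_integration to role <role_name>;
grant usage on function run_zingg(varchar) to role <role_name>;
```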
Invoke the external function and get the output as shown below
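For example, assuming the run_zingg function above:

```sql
select run_zingg('findTrainingData');
-- view the Zingg log from inside Snowflake
select run_zingg('checklog');
```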
We will run findTrainingData as above. Then we will run label directly on the EC2 instance as it needs interaction. The train and match phases are executed through external functions. Once the Zingg phases are done, we will get a new table in Snowflake with the output.
The output table has three additional columns:
Z_CLUSTER: the unique cluster ID assigned by Zingg; all records in the same cluster are matches, i.e. duplicates of each other.
Z_MINSCORE: the minimum score with which the record matched any other record in its cluster.
Z_MAXSCORE: the maximum score with which the record matched any other record in its cluster.
Let’s see these values for “nicole carbone”
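A query along these lines shows them; the output table name comes from config.json, and the column names here follow Zingg's sample customer schema, so they are assumptions.

```sql
select z_cluster, z_minscore, z_maxscore, fname, lname, ssn
from <OUTPUT_TABLE>
where fname = 'nicole' and lname = 'carbone'
order by z_cluster;
```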
All the records with “nicole carbone” are assigned to the same cluster.
Congratulations, identity resolution has been done!
We can now decide a suitable threshold using the scores and use that to automate the post-processing of the identities. Records that cannot be resolved using scores could be passed to a downstream human annotation team for manual review.
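As an illustration, a simple thresholding rule could be expressed directly in SQL; the 0.8 cutoff and the table name are placeholders to be tuned for your data.

```sql
-- Treat clusters whose weakest link is above the threshold as auto-resolved;
-- route the rest to manual review.
select *,
       case when z_minscore >= 0.8 then 'AUTO_MERGE' else 'NEEDS_REVIEW' end as resolution
from <OUTPUT_TABLE>;
```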