Evaluating IDV Vendors

Running a proof of concept for FrankieOne? The below instructions help you set up an instance to run FrankieOne against your data.

The POC API flow is:

  1. Generate an onboarding url: Provide an onboarding url for hosted OneSDK and capture the entity id (you need an entity before you can provide an image for OCR / it needs to be attached to one)
  2. Send OCR data to the API by following these instructions.
  3. If the response indicates that input is still required, see if it needs a back of a document
  4. If the output is COMPLETE_OCR, then your IDV

This POC mimics a production API call sequence.

The below is an example python script used to process image results for review. This file reads image files from a folder structure where each customer document is captured in its own folder.

import os
import json
import base64
import requests
import csv

# Main script variables
# main_directory = './temp-rerun/images'
main_directory = './temp-rerun/images'
# output_directory = './temp-rerun/results'
# main_directory = './system-test'
# output_directory = './system-test/results'
keys_file_path = 'keys/tempuat.keys.json'
results_file = 'temp-rerun/results/result_test.csv'
results_summary_file = 'temp-rerun/results/results_summary.csv'
file_type = 'jpeg' # alt is png
customer_ref_addition = 'rerun'

# Function to read file and convert to base64
def read_file_as_base64(file_path):
    try:
        with open(file_path, "rb") as file:
            return base64.b64encode(file.read()).decode("utf-8")
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        raise

def read_keys(file_path):
    try:
        with open(file_path, "r") as file:
            return json.load(file)
    except Exception as e:
        print(f"Error reading keys from {file_path}: {e}")
        raise

def get_onboarding_entity_id(private_keys, customer_reference):
    try:
        payload = {
            "consent": True,
            "flowId": "idv",
            "sendSMS": False,
            "customerRef": f"{customer_reference}_{customer_ref_addition}"
            "senderName": "Onboarding",
            "successRedirectURL": "https://www.frankieone.com/",
            "errorRedirectURL": "https://www.frankieone.com/",
            "entity": {
                "addresses": [
                    {
                        "country": "AUS"
                    }
                ],
                "extraData": [
                    {
                        "kvpKey": "application_reference",
                        "kvpType": "id.external",
                        "kvpValue": f'{customer_reference}_{customer_ref_addition}'
                    },
                    {
                        "kvpKey": "customer_reference",
                        "kvpType": "id.external",
                        "kvpValue": f'{customer_reference}_{customer_ref_addition}'
                    }
                ],
                "name": {
                    "displayName": f"OCR Tester - {customer_reference}",
                    "familyName": f"Tester {customer_reference}",
                    "givenName": "OCR",
                    "honourific": "",
                    "middleName": ""
                }
            }
        }

        response = requests.post(
            'https://api.kycaml.uat.frankiefinancial.io/idv/v2/idvalidate/onboarding-url',
            json=payload,
            headers={
                'X-Frankie-CustomerID': private_keys['X-Frankie-CustomerID'],
                # 'X-Frankie-CustomerChildId': private_keys['X-Frankie-CustomerChildID'],
                'api_key': private_keys['api_key'],
                'Content-Type': 'application/json'
            }
        )

        response.raise_for_status()
        # print('Onboarding Response:', response.json())
        return response.json()['entity']['entityId']
    except Exception as e:
        print('Error during onboarding:', e)
        raise

def send_ocr_api_request(file_path, entity_id, side, credentials, document_id, results_file, results_summary_file, customer_reference):
    try:
        base64_data = read_file_as_base64(file_path)
        payload = {
            "entity": {
                "entityId": entity_id
            },
            "fileData": {
                "ScanDelete": False,
                "scanFilename": os.path.basename(file_path),
                "scanMIME": "image/jpeg",
                "scanSide": side,
                "scanType": "PHOTO",
                "scanData": base64_data
            }
        }

        if document_id:
            payload["document_id"] = document_id

        # print(f"Sending request with payload {payload}")

        response = requests.post(
            'https://api.kycaml.uat.frankiefinancial.io/idv/v2/idvalidate/ocr',
            json=payload,
            headers={
                'accept': 'application/json',
                'content-type': 'application/json',
                'X-Frankie-CustomerID': credentials['X-Frankie-CustomerID'],
                # 'X-Frankie-CustomerChildId': credentials['X-Frankie-CustomerChildID'],
                'api_key': credentials['api_key']
            }
        )

        response.raise_for_status()

        print(f'file path: {file_path}')
        # if side == "B":
        #     side_number = 3
        # else:
        #     side_number = 2
        output_path = os.path.splitext(file_path)[0] + '.json'

        print(f'output path: {output_path}')

        json_response = response.json()

        with open(output_path, 'w') as f:
            json.dump(json_response, f, indent=4)

        if json_response.get('status') == "COMPLETE_OCR":
            identity_doc = json_response['entity']['identityDocs'][0]
            extra_data = {item['kvpKey']: item.get('kvpValue','') for item in identity_doc['extraData'] if 'kvpValue' in item}

            # Extract the important values
            csv_data = {
                "entityId": entity_id,
                "customerReference": customer_reference,
                "idExpiry": identity_doc.get('idExpiry',''),
                "idNumber": identity_doc.get('idNumber',''),
                "idType": identity_doc.get('idType',''),
                "region": identity_doc.get('region',''),
                "country": identity_doc.get('country',''),
                "ocr_scanned_full_name": extra_data.get('ocr_scanned_full_name',''),
                "ocr_scanned_address_long": extra_data.get('ocr_scanned_address_long',''),
                "ocr_scanned_first_name": extra_data.get('ocr_scanned_first_name',''),
                "ocr_scanned_middle_name": extra_data.get('ocr_scanned_middle_name',''),
                "ocr_scanned_paternal_last_name": extra_data.get('ocr_scanned_paternal_last_name',''),
                "ocr_scanned_given_name": extra_data.get('ocr_scanned_given_name',''),
                "ocr_scanned_last_name": extra_data.get('ocr_scanned_last_name',''),
                "ocr_scanned_reference_number": extra_data.get('ocr_scanned_reference_number',''),
                "ocr_scanned_id_number": extra_data.get('ocr_scanned_id_number',''),
                "ocr_scanned_document_number": extra_data.get('ocr_scanned_document_number',''),
            }

            # Write the data to a CSV file
            with open(results_file, 'a', newline='') as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=csv_data.keys())
                if csv_file.tell() == 0:
                    writer.writeheader()  # Write header only once
                writer.writerow(csv_data)

        # print(f"Response for {file_path}: {response.json()}")
        return response.json().get('status'), response.json().get('entity').get('identityDocs')[0].get('documentId')
    except requests.exceptions.RequestException as e:
        print(f'Error during OCR API request for {file_path}:', e)
        if e.response:
            print('Response status:', e.response.status_code)
            print('Response data:', e.response.json())
        raise

def process_files(id_front, id_back, entity_id, results_file, results_summary_file, customer_reference):
    try:
        credentials = read_keys(keys_file_path)

        status_front, document_id = send_ocr_api_request(id_front, entity_id, "F", credentials, None, results_file, results_summary_file, customer_reference)

        if status_front == 'AWAITING_DOCUMENT_UPLOAD_BACK':
            print("awaiting back")
            status_back, document_id  = send_ocr_api_request(id_back, entity_id, "B", credentials, document_id, results_file, results_summary_file, customer_reference)
            print(f"final status: {status_back}")
            with open(results_summary_file, 'a', newline='') as csv_file:
                csv_data = {
                    "customerReference": customer_reference,
                    "result": status_back
                }
                writer = csv.DictWriter(csv_file, fieldnames=csv_data.keys())
                if csv_file.tell() == 0:
                    writer.writeheader()  # Write header only once
                writer.writerow(csv_data)
        elif status_front == "COMPLETE_OCR":
            print("completed OCR")
            with open(results_summary_file, 'a', newline='') as csv_file:
                csv_data = {
                    "customerReference": customer_reference,
                    "result": "COMPLETE_OCR"
                }
                writer = csv.DictWriter(csv_file, fieldnames=csv_data.keys())
                if csv_file.tell() == 0:
                    writer.writeheader()  # Write header only once
                writer.writerow(csv_data)
        else:
            print('First file processing did not return AWAITING_DOCUMENT_UPLOAD_BACK or COMPLETE_OCR status.')
            print(f"status: {status_front}")
            with open(results_summary_file, 'a', newline='') as csv_file:
                csv_data = {
                    "customerReference": customer_reference,
                    "result": status_front
                }
                writer = csv.DictWriter(csv_file, fieldnames=csv_data.keys())
                if csv_file.tell() == 0:
                    writer.writeheader()  # Write header only once
                writer.writerow(csv_data)

    except Exception as e:
        print('Error during file processing:', e)

for folder in os.listdir(main_directory):
    folder_path = os.path.join(main_directory, folder)

    if os.path.isdir(folder_path):
        files = os.listdir(folder_path)

        front_id = next((file for file in files if file.endswith(f'_2.{file_type}')), None)
        back_id = next((file for file in files if file.endswith(f'_3.{file_type}')), None)

        customer_reference = front_id.split('_')[0]

        if front_id and back_id:
            front_id_path = os.path.join(folder_path, front_id)
            back_id_path = os.path.join(folder_path, back_id)
            print(f"front path: {front_id_path}")
            print(f"back path: {back_id_path}")

            try:
                credentials = read_keys(keys_file_path)
                entity_id = get_onboarding_entity_id(credentials, customer_reference)
                print(f"entity id: {entity_id}")
                process_files(front_id_path, back_id_path, entity_id, results_file , results_summary_file, customer_reference)
            except Exception as e:
                print(f"Error processing files in folder {folder}: {e}")
        elif front_id:
            front_id_path = os.path.join(folder_path, front_id)
            print(f"front path: {front_id_path}")

            try:
                credentials = read_keys(keys_file_path)
                entity_id = get_onboarding_entity_id(credentials, customer_reference)
                print(f"entity id: {entity_id}")
                process_files(front_id_path, None, entity_id, results_file , results_summary_file, customer_reference)
            except Exception as e:
                print(f"Error processing files in folder {folder}: {e}")
        else:
            print(f"Files not found in folder {folder}")