
🚀 End-to-End Guide: Building a Fivetran Custom Connector (Keap → BigQuery)

April 28, 2026

1. Introduction

In today’s data-driven world, businesses rely heavily on automated data pipelines. While Fivetran provides many pre-built connectors, there are scenarios where your data source is simply not supported out-of-the-box. That’s where Fivetran Custom Connectors (Connector SDK) come into play.

 

This guide is based on a real-world implementation: Keap API → BigQuery. In this blog, we'll walk you through:

  • What is a Fivetran Custom Connector
  • How it works internally
  • Step-by-step implementation
  • Local testing
  • Deployment to Fivetran
  • Running and validating data in BigQuery
  • Common issues and fixes

2. What is a Fivetran Custom Connector?

A Fivetran Custom Connector allows you to:

  • Connect to any API or data source
  • Transform and send data to Fivetran
  • Load it into your destination (BigQuery, Snowflake, etc.)

It uses Python and a simple SDK to define:

Component | Description
update() | How to fetch data from the source API
schema() | How to structure and model the data
State / Checkpoint | How to maintain state for incremental loads

3. How It Works (Architecture)

Your Python code acts as the bridge between the source API and Fivetran’s cloud infrastructure. The SDK handles authentication with Fivetran, state management, and delivery to the destination.

API (Keap)
↓
connector.py (Your Logic)
↓
Fivetran SDK
↓
Fivetran Cloud
↓
Destination (BigQuery)
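To make the contract between update() and Fivetran more concrete, here is a small local simulation of a runner consuming update() as a generator. It does not use the Fivetran SDK. The ("upsert", ...) and ("checkpoint", ...) tuples and the run_sync() and toy_update() functions are hypothetical stand-ins for op.upsert(), op.checkpoint(), and Fivetran's own runtime. The sketch only illustrates two ideas: records are upserted by primary key, and the state saved for the next run is whatever the last checkpoint carried.

```python
from typing import Callable, Dict, Iterator, Tuple

# Hypothetical stand-ins for op.upsert()/op.checkpoint(): NOT the Fivetran SDK API.
Operation = Tuple[str, dict]


def run_sync(update: Callable[[dict, dict], Iterator[Operation]],
             configuration: dict, state: dict,
             table: Dict[object, dict]) -> dict:
    """Drain one update() run and return the state to persist for the next run."""
    new_state = dict(state)
    for kind, payload in update(configuration, state):
        if kind == "upsert":
            table[payload["id"]] = payload  # same id overwrites: no duplicates
        elif kind == "checkpoint":
            new_state = dict(payload)       # the last checkpoint seen becomes the saved state
    return new_state


def toy_update(configuration: dict, state: dict) -> Iterator[Operation]:
    # Pretend source: two days of records; day 2 re-sends id 2 with a new subject
    days = {"2024-01-01": [{"id": 1, "subject": "a"}, {"id": 2, "subject": "b"}],
            "2024-01-02": [{"id": 2, "subject": "b2"}, {"id": 3, "subject": "c"}]}
    for day in sorted(days):
        if day <= state.get("last_synced_date", ""):
            continue                        # already synced in an earlier run
        for record in days[day]:
            yield ("upsert", record)
        yield ("checkpoint", {"last_synced_date": day})


if __name__ == "__main__":
    table: Dict[object, dict] = {}
    state = run_sync(toy_update, {}, {}, table)
    print(state, len(table))  # {'last_synced_date': '2024-01-02'} 3
```

If you run run_sync() a second time with the returned state, it yields nothing new, which is the behavior an incremental connector aims for.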

4. Step 1: Prerequisites

Install Python

You need Python 3.9+ installed on your machine.

Install Fivetran SDK

pip install fivetran-connector-sdk

Verify Installation

fivetran --version

5. Step 2: Create a New Connector Project

Run the following command to initialize a new project:

fivetran init keap_connector

Your terminal will prompt:

create new connector project at keap_connector? (y/N):

Type y and press Enter. This will create your Fivetran Custom Connector project.

Next, the terminal asks which AI Agent to optimize for:

Which AI Agent shall we optimize the project to work with?
1. Claude
2. Cursor
3. VSCode with Copilot
4. Do nothing, I already have my AI context configured
Tip
This setting does NOT affect your connector logic. It only adds helper files for AI coding tools. To keep the project clean, type 4 and press Enter.

6. Step 3: Navigate Into the Project Folder

dir
cd keap_connector

Project Structure

keap_connector
|
|-- connector.py          ← Main Python code to call the Keap API
|-- requirements.txt      ← Python libraries needed
|-- configuration.json    ← Stores API token and settings
|-- README.md             ← Documentation
Important
Keep all files inside this same folder. Do NOT move them to other locations.

7. Step 4: Add Keap API Logic to connector.py

Open connector.py using any editor (Notepad, VS Code, Notepad++) and replace its contents with the connector logic below.

Key Components

1. schema(): Defines Table Structure

The schema function tells Fivetran how to model your data in the destination. For the Keap emails use case, it registers a single table called emails with id as the primary key for upsert operations.

def schema(configuration):
    return [
        {
            "table": "emails",
            "primary_key": ["id"]
        }
    ]
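If you want to pin the primary key's type instead of letting Fivetran infer it, schema() can also return a columns mapping. This sketch assumes two things: that the SDK accepts the optional "columns" key with the "LONG" type name, and that Keap email ids are integers. Columns not listed here would still be inferred from the data.

```python
def schema(configuration: dict) -> list:
    return [
        {
            "table": "emails",
            "primary_key": ["id"],
            # Assumption: "columns" accepts Fivetran type names such as "LONG".
            # Only id is declared; every other field is inferred from the data.
            "columns": {"id": "LONG"},
        }
    ]
```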

2. update(): Fetches and Sends Data (Incremental Logic)

The update function is the heart of every Fivetran connector. It is responsible for calling the API, fetching data, and streaming records to Fivetran.

Incremental Strategy

State stores last_synced_date (format: YYYY-MM-DD, UTC), the last fully completed day.

  • First run: Starts from historical_start_date in configuration (defaults to 2024-01-01 if not set).
  • Every subsequent run: Iterates day-by-day from (last_synced_date + 1 day) up to yesterday UTC, so it never fetches a partially completed current day.
  • Within each day: Paginates using offset/limit until all records for that day are fetched, then checkpoints and advances to the next date.
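You can check the window arithmetic in isolation. The helper below, days_to_sync(), is a hypothetical name and not part of the SDK. It applies the rules above to plain dates. "Today" is passed in as an argument so the logic is testable, and the sketch assumes the state string uses the YYYY-MM-DD format described above.

```python
from datetime import date, timedelta
from typing import List, Optional


def days_to_sync(last_synced_date: Optional[str], historical_start: str,
                 today: date) -> List[str]:
    """Return the YYYY-MM-DD days to fetch, oldest first, ending yesterday."""
    if last_synced_date:
        # Subsequent runs resume the day after the last completed day
        start = date.fromisoformat(last_synced_date) + timedelta(days=1)
    else:
        # First run starts at the configured historical date
        start = date.fromisoformat(historical_start)
    end = today - timedelta(days=1)  # never touch the in-progress day
    days = []
    current = start
    while current <= end:
        days.append(current.isoformat())
        current += timedelta(days=1)
    return days
```

In real use you would pass datetime.now(timezone.utc).date() as today. An empty list corresponds to the connector's "Already up to date" case.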

Final Working Code (connector.py)

from fivetran_connector_sdk import Connector, Operations as op
import requests
import time
from datetime import datetime, timedelta, timezone

BASE_URL = "https://api.infusionsoft.com/crm/rest/v1/emails"
LIMIT = 1000       # records per page
MAX_RETRIES = 5    # attempts per page before the sync fails


def format_date(dt):
    # ISO-8601 timestamp, e.g. 2024-01-01T00:00:00.000Z
    return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")


def today_utc():
    # Midnight at the start of the current UTC day
    return datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)


def schema(configuration):
    # One table, upserted on "id"
    return [{"table": "emails", "primary_key": ["id"]}]


def update(configuration, state):
    api_key = configuration["api_key"]
    history_start = configuration.get("historical_start_date", "2024-01-01")

    headers = {
        "X-Keap-API-Key": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    # Resume the day after the last checkpoint, or start at the historical date
    if "last_synced_date" in state:
        start_date = datetime.strptime(
            state["last_synced_date"], "%Y-%m-%d"
        ).replace(tzinfo=timezone.utc) + timedelta(days=1)
    else:
        start_date = datetime.strptime(
            history_start, "%Y-%m-%d"
        ).replace(tzinfo=timezone.utc)

    # Stop at yesterday (UTC) so an in-progress day is never synced
    end_date = today_utc() - timedelta(days=1)
    if start_date > end_date:
        print("Already up to date. Nothing to sync.")
        return

    current_day = start_date
    while current_day <= end_date:
        since = format_date(current_day)
        until = format_date(current_day.replace(hour=23, minute=59, second=59))
        offset = 0
        total_for_day = 0

        while True:
            url = (f"{BASE_URL}?since={since}"
                   f"&until={until}"
                   f"&limit={LIMIT}&offset={offset}")
            print(f"Calling API offset {offset}")

            # Retry failures, but give up after MAX_RETRIES so a wrong API key
            # or a persistent outage fails the sync instead of looping forever
            for attempt in range(1, MAX_RETRIES + 1):
                try:
                    res = requests.get(url, headers=headers, timeout=60)
                    res.raise_for_status()
                    break
                except requests.exceptions.HTTPError as e:
                    if attempt == MAX_RETRIES:
                        raise
                    print(f"HTTP error: {e}. Retrying in 30s...")
                    time.sleep(30)
                except requests.exceptions.RequestException as e:
                    if attempt == MAX_RETRIES:
                        raise
                    print(f"Request error: {e}. Retrying in 10s...")
                    time.sleep(10)

            emails = res.json().get("emails", [])
            print(f"Fetched {len(emails)} records")
            if not emails:
                break

            for email in emails:
                if "id" not in email:
                    print("Warning: skipping record without an id")
                    continue
                yield op.upsert(table="emails", data=email)

            total_for_day += len(emails)
            offset += LIMIT
            if len(emails) < LIMIT:
                break  # a short page means this was the last page
            time.sleep(0.5)  # small pause between pages

        print(f"{current_day:%Y-%m-%d}: {total_for_day} records")
        # Checkpoint only after the whole day has been fetched
        yield op.checkpoint(state={
            "last_synced_date": current_day.strftime("%Y-%m-%d")
        })
        current_day += timedelta(days=1)
        time.sleep(1)


connector = Connector(update=update, schema=schema)

if __name__ == "__main__":
    connector.run()

8. Step 5: Add the Configuration File

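Open configuration.json and add your Keap API token. You can also add the optional historical_start_date (YYYY-MM-DD), which sets where the first sync begins. If you omit it, the connector starts from 2024-01-01.

{
  "api_key": "YOUR_KEAP_API_TOKEN",
  "historical_start_date": "2024-01-01"
}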

Replace YOUR_KEAP_API_TOKEN with your actual Keap API key. Example format (not real):

{
  "api_key": "64636hfgh4e726bc99a02d0a0fdc51tdfr4"
}
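A malformed configuration.json only shows up once update() runs, so a quick pre-flight check can save a debug cycle. The sketch below uses hypothetical helpers, not an SDK feature. It checks the two keys this connector reads: api_key (required) and historical_start_date (optional, YYYY-MM-DD). It also flags non-string values, based on the assumption that Fivetran expects string-valued configuration.

```python
import json
from datetime import datetime
from typing import List

PLACEHOLDER = "YOUR_KEAP_API_TOKEN"


def validate_configuration(raw: str) -> List[str]:
    """Return the problems found in configuration.json text (empty list if none)."""
    try:
        config = json.loads(raw)
    except json.JSONDecodeError as e:
        return [f"not valid JSON: {e}"]
    if not isinstance(config, dict):
        return ["top level must be a JSON object"]

    problems = []
    # Assumption: Fivetran expects every configuration value to be a string.
    for key, value in config.items():
        if not isinstance(value, str):
            problems.append(f"{key}: value should be a string")

    api_key = config.get("api_key")
    if not isinstance(api_key, str) or not api_key.strip() or api_key == PLACEHOLDER:
        problems.append("api_key: missing, empty, or still the placeholder")

    start = config.get("historical_start_date")
    if isinstance(start, str):
        try:
            datetime.strptime(start, "%Y-%m-%d")  # same format update() parses
        except ValueError:
            problems.append("historical_start_date: expected YYYY-MM-DD")
    return problems


def check_file(path: str = "configuration.json") -> List[str]:
    # Convenience wrapper for checking the file on disk
    with open(path, encoding="utf-8") as f:
        return validate_configuration(f.read())
```

Call check_file() from the connector folder before running fivetran debug. An empty list means nothing obvious is wrong.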

9. Step 6: Add Python Dependencies

Open requirements.txt and add the following line:

requests

This is the only external library needed since the Fivetran SDK handles the rest.

10. Step 7: Test Locally

Navigate to Your Connector Folder

cd keap_connector
dir

You should see: configuration.json, connector.py, requirements.txt

Run the Debug Command

fivetran debug --configuration configuration.json

Understanding the Debug Output

Your terminal will display: sdk calling update()

Status | Meaning
✓ SDK started | Fivetran Connector SDK initialized successfully
✓ Python loaded | Your connector.py code was loaded without errors
✓ Config read | configuration.json was read successfully
✓ update() running | Fivetran is now executing update(configuration, state)

What Happens After sdk calling update()

Inside update(), your code performs an HTTP GET request to the Keap API. You should see paginated log output such as:

Calling API offset 0
Fetched 1000 records
Calling API offset 1000
Fetched 1000 records

If nothing happens after sdk calling update(), common causes are:

  • The API request is slow or timing out
  • Authentication failed (incorrect API key)
  • The code is stuck waiting for an API response
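To tell a slow API apart from a rejected key, it can help to call the same endpoint once outside the SDK with a short timeout. The sketch below uses only the standard library. build_request() and keap_smoke_test() are hypothetical helpers; the endpoint, the X-Keap-API-Key header, and the since/until/limit/offset parameters are the ones connector.py already uses. Requesting limit=1 keeps the response small.

```python
import json
import urllib.error
import urllib.parse
import urllib.request

BASE_URL = "https://api.infusionsoft.com/crm/rest/v1/emails"


def build_request(api_key: str, since: str, until: str,
                  limit: int = 1, offset: int = 0) -> urllib.request.Request:
    # Same query parameters as connector.py, URL-encoded
    query = urllib.parse.urlencode(
        {"since": since, "until": until, "limit": limit, "offset": offset})
    return urllib.request.Request(
        f"{BASE_URL}?{query}",
        headers={"X-Keap-API-Key": api_key, "Accept": "application/json"})


def keap_smoke_test(api_key: str, since: str, until: str,
                    timeout: float = 15) -> str:
    """Make one small request and describe the outcome in plain words."""
    try:
        with urllib.request.urlopen(build_request(api_key, since, until),
                                    timeout=timeout) as resp:
            body = json.load(resp)
            return f"OK: {len(body.get('emails', []))} record(s) on first page"
    except urllib.error.HTTPError as e:
        if e.code in (401, 403):
            return f"Auth problem (HTTP {e.code}): check api_key"
        return f"HTTP {e.code}: {e.reason}"
    except OSError as e:  # URLError, DNS failures, and socket timeouts
        return f"Network problem or timeout: {e}"
```

A typical call is keap_smoke_test(key, "2024-01-01T00:00:00.000Z", "2024-01-01T23:59:59.000Z"). If this call returns quickly with an auth error, the issue is the key, not the connector code.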

Internal Flow Diagram

fivetran debug
↓
Connector SDK starts
↓
Reads configuration.json
↓
Loads connector.py
↓
sdk calling update()
↓
update() function runs
↓
API request → Keap API
↓
Records returned → Fivetran

The Two Core Functions in Any Fivetran Connector

Function | Purpose
schema() | Defines table structure in the destination
update() | Pulls data from the API and streams it to Fivetran
Note
If you only define update(), the terminal will print 'Connector initialized with Update method only'. This is completely normal.

11. Step 8: Get Your Fivetran API Key

Before deploying your connector, you must generate a Fivetran API Key from the dashboard.

Where to Find It

  • Click your profile (bottom-left corner of Fivetran dashboard)
  • Click API Key
Credential | Example
API Key | abc123xyz
API Secret | 98dfh234dsf
Encoded Key (Base64 of API_KEY:API_SECRET) | YWJjMTIzeHl6Ojk4ZGZoMjM0ZHNm

This encoded key is the value you pass to --api-key in the deployment command in the next step.
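The encoded key is the Base64 encoding of the API key and secret joined by a colon (key:secret). If you prefer to generate it locally, here is a minimal sketch using Python's standard base64 module. encode_fivetran_key() is a hypothetical helper name, and the inputs are the made-up example credentials listed above.

```python
import base64


def encode_fivetran_key(api_key: str, api_secret: str) -> str:
    """Base64-encode "key:secret" for use with the deploy command's --api-key."""
    raw = f"{api_key}:{api_secret}".encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


print(encode_fivetran_key("abc123xyz", "98dfh234dsf"))  # YWJjMTIzeHl6Ojk4ZGZoMjM0ZHNm
```

Treat the encoded value like a password. Anyone can decode it back to the key and secret.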

12. Step 9: Deploy to Fivetran

From your terminal, go one level up from the connector folder, then run the deploy command:

cd ..
fivetran deploy keap_connector --api-key YOUR_BASE64_ENCODED_KEY

Deployment Prompts Explained

1. Destination Name

Provide the destination name (as displayed in your dashboard):

Type: BigQuery

Important
This must EXACTLY match what you see in Fivetran UI β†’ Destinations.

2. Connection Name

Provide the connection name:

Type: keap_emails

3. Configuration Requirement

sdk configuration.json found, but --configuration flag was not provided
does this run require configuration? (y/N):

Type: Y

4. Configuration File Path

Provide the configuration file path [Default: configuration.json]:

Just press Enter (or type configuration.json).

5. Existing Connection Update

connection already exists
continue with update? (y/N):

Type: y

13. Step 10: Run Sync from the UI

Navigate to your Fivetran Dashboard:

  • Go to Connections β†’ keap_emails
  • Click Sync

14. Step 11: Monitor Execution

Go to the Monitoring Tab in Fivetran. You should see:

Extract: Success
Load:      Success
Loaded rows: > 0

15. Step 12: Verify in BigQuery

Navigate to your BigQuery project. You will see a dataset and table created:

keap_emails
└── emails

Run a verification query to confirm data was loaded:

SELECT * FROM `keap_emails.emails` LIMIT 10;

If rows are returned, your pipeline is live and working!

16. How Incremental Sync Works (Deep Dive)

The connector uses a day-by-day incremental strategy to keep data fresh while avoiding partial loads:

  • State Persistence: After each completed day, the connector calls op.checkpoint(). The next run resumes from the day after the last checkpointed day. If a run fails partway through a day, that day is fetched again from the start, and the id-based upsert absorbs the overlap.
  • Safe Window: The sync always stops at yesterday UTC. This prevents fetching incomplete data for an in-progress day.
  • Pagination: Within each day, the connector pages through results using offset and limit=1000 until all records are fetched. If a page returns fewer than LIMIT records, it is treated as the last page.
  • Retry Logic: On HTTP errors, the connector sleeps 30 seconds and retries. On other exceptions, it sleeps 10 seconds. This makes the connector resilient to transient API failures.
  • Deduplication: Each email is upserted using op.upsert(table="emails", data=email) with id as the primary key. Re-fetched records overwrite existing rows instead of creating duplicates in the destination.
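You can exercise the pagination and deduplication rules without calling Keap. In this sketch, fetch_page is a hypothetical in-memory stand-in for the API call, and a dict keyed by id plays the role of the destination table. The loop stops on an empty or short page, following the rules described above.

```python
from typing import Callable, Dict, List

LIMIT = 1000


def sync_day(fetch_page: Callable[[int, int], List[dict]],
             table: Dict[object, dict], limit: int = LIMIT) -> int:
    """Page through one day with offset/limit, upsert by id, return rows seen."""
    offset = 0
    seen = 0
    while True:
        page = fetch_page(offset, limit)
        if not page:
            break                      # empty page: nothing more for this day
        for record in page:
            if "id" in record:         # records without an id are skipped
                table[record["id"]] = record
        seen += len(page)
        offset += limit
        if len(page) < limit:          # short page: this was the last one
            break
    return seen


if __name__ == "__main__":
    records = [{"id": i} for i in range(2500)]
    offsets = []

    def fetch_page(offset, limit):
        offsets.append(offset)
        return records[offset:offset + limit]

    table = {}
    sync_day(fetch_page, table)
    sync_day(fetch_page, table)        # re-sync the same day
    print(offsets, len(table))         # [0, 1000, 2000, 0, 1000, 2000] 2500
```

Re-running the same day leaves the table size unchanged, which is the point of upserting on a primary key.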

17. Error Handling Summary

Error Type | Behavior
HTTP error (4xx/5xx) | Sleep 30 seconds → Retry automatically
Network / connection error | Sleep 10 seconds → Retry automatically
Record missing id field | Skip record with a warning log entry
API rate limiting | Mitigated by a 0.5s delay between pages and a 1s delay between days
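Fixed sleeps work, but bounded retries with exponential backoff are a common alternative. This approach gives up after a set number of attempts instead of retrying indefinitely, which matters when an error is permanent, such as a 401 from a wrong API key. The sketch below swaps in that technique. with_retries() is a hypothetical helper, and its sleep argument is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(call: Callable[[], T], max_attempts: int = 5,
                 base_delay: float = 2.0, max_delay: float = 60.0,
                 retry_on: Tuple[Type[BaseException], ...] = (Exception,),
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `call` until it succeeds, sleeping base_delay * 2**n (capped at
    max_delay) between attempts; re-raise the last error after max_attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the sync fail visibly
            sleep(min(max_delay, base_delay * 2 ** attempt))
    raise AssertionError("unreachable")
```

Inside update(), you could wrap a small function that calls requests.get() and raise_for_status(). Passing retry_on=(requests.exceptions.RequestException,) means unrelated bugs surface immediately instead of being retried.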

18. Summary

In this guide, you built a production-ready Fivetran custom connector that:

  • Connects to the Keap API using an API key
  • Fetches email records day-by-day with full pagination
  • Maintains incremental state across syncs using checkpointing
  • Handles API errors gracefully with automatic retries
  • Loads data into BigQuery via Fivetran’s cloud infrastructure
  • Validates data in BigQuery with a simple SQL query

Fivetran’s Connector SDK makes it surprisingly straightforward to bring any data source into your warehouse. Once you understand the schema() + update() pattern and the incremental checkpoint model, you can build connectors for virtually any API.

Happy syncing!