End-to-End Guide: Building a Fivetran Custom Connector (Keap → BigQuery)
Inkey Solutions, April 28, 2026
1. Introduction
In today’s data-driven world, businesses rely heavily on automated data pipelines. While Fivetran provides many pre-built connectors, there are scenarios where your data source is simply not supported out-of-the-box. That’s where Fivetran Custom Connectors (Connector SDK) come into play.
This guide is based on a real-world implementation: Keap API → BigQuery. In this blog, we’ll walk you through:
- What is a Fivetran Custom Connector
- How it works internally
- Step-by-step implementation
- Local testing
- Deployment to Fivetran
- Running and validating data in BigQuery
- Common issues and fixes
2. What is a Fivetran Custom Connector?
A Fivetran Custom Connector allows you to:
- Connect to any API or data source
- Transform and send data to Fivetran
- Load it into your destination (BigQuery, Snowflake, etc.)
It uses Python and a simple SDK to define:
| Component | Description |
|---|---|
| update() | How to fetch data from the source API |
| schema() | How to structure and model the data |
| State / Checkpoint | How to maintain state for incremental loads |
3. How It Works (Architecture)
Your Python code acts as the bridge between the source API and Fivetran’s cloud infrastructure. The SDK handles authentication with Fivetran, state management, and delivery to the destination.
4. Step 1: Prerequisites
Install Python
You need Python 3.9+ installed on your machine.
Install Fivetran SDK
pip install fivetran-connector-sdk
Verify Installation
5. Step 2: Create a New Connector Project
Run the following command to initialize a new project:
fivetran init keap_connector
Your terminal will prompt:
Type y and press Enter. This will create your Fivetran Custom Connector project.
Next, the terminal asks which AI Agent to optimize for:
2. Cursor
3. VSCode with Copilot
4. Do nothing, I already have my AI context configured
6. Step 3: Navigate Into the Project Folder
cd keap_connector
Project Structure
7. Step 4: Add Keap API Logic to connector.py
Open connector.py using any editor (Notepad, VS Code, Notepad++) and replace its contents with the connector logic below.
Key Components
1. schema(): Defines Table Structure
The schema function tells Fivetran how to model your data in the destination. For the Keap emails use case, it registers a single table called emails with id as the primary key for upsert operations.
def schema(configuration):
    return [
        {
            "table": "emails",
            "primary_key": ["id"]
        }
    ]
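The schema above declares only the primary key, so Fivetran infers column types from the records it receives. If you want to pin the types of specific columns, the SDK's table definitions also accept a columns mapping from column name to Fivetran data type; columns you leave out should still be inferred. This is an illustrative sketch only: subject and sent_date are assumed Keap field names, so check them against a real API response first.

```python
def schema(configuration):
    # Sketch only: explicit column types alongside the primary key.
    # "subject" and "sent_date" are ASSUMED Keap field names; any column
    # not listed here is left for Fivetran to infer from the data.
    return [
        {
            "table": "emails",
            "primary_key": ["id"],
            "columns": {
                "id": "INT",
                "subject": "STRING",
                "sent_date": "UTC_DATETIME",
            },
        }
    ]


if __name__ == "__main__":
    for table in schema({}):
        print(table["table"], table["primary_key"], table["columns"])
```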
2. update(): Fetches and Sends Data (Incremental Logic)
The update function is the heart of every Fivetran connector. It is responsible for calling the API, fetching data, and streaming records to Fivetran.
Incremental Strategy
The state stores last_synced_date (format YYYY-MM-DD, UTC), which is the last fully completed day.
- First run: Starts from historical_start_date in the configuration (defaults to 2024-01-01 if not set).
- Every subsequent run: Iterates day by day from last_synced_date + 1 day up to and including yesterday (UTC), so it never fetches a partial window for the current day.
- Within each day: Paginates using offset/limit until all records for that day are fetched, then checkpoints and advances to the next date.
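The date-window rules above can be written as a small pure function, which makes them easy to check without calling Keap. This is a sketch using a hypothetical days_to_sync helper that reads the same state and configuration keys as connector.py, but takes today as a parameter instead of reading the clock.

```python
from datetime import date, timedelta


def days_to_sync(state, configuration, today):
    """Return the UTC dates one run would process (hypothetical helper).

    Resumes the day after last_synced_date; on a first run it starts at
    historical_start_date (default 2024-01-01). The last day processed is
    yesterday relative to `today`, so the in-progress day is skipped.
    """
    if "last_synced_date" in state:
        start = date.fromisoformat(state["last_synced_date"]) + timedelta(days=1)
    else:
        start = date.fromisoformat(
            configuration.get("historical_start_date", "2024-01-01"))
    end = today - timedelta(days=1)  # never include the current day

    days = []
    current = start
    while current <= end:
        days.append(current)
        current += timedelta(days=1)
    return days


if __name__ == "__main__":
    print(days_to_sync({"last_synced_date": "2026-04-25"}, {}, date(2026, 4, 28)))
    # [datetime.date(2026, 4, 26), datetime.date(2026, 4, 27)]
```

Passing today explicitly is only for testability; in the connector the equivalent boundary comes from today_utc().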
Final Working Code (connector.py)
import json
import time
from datetime import datetime, timedelta, timezone
import requests
from fivetran_connector_sdk import Connector
from fivetran_connector_sdk import Operations as op

BASE_URL = "https://api.infusionsoft.com/crm/rest/v1/emails"
LIMIT = 1000  # page size for offset/limit pagination

def format_date(dt):
    return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")

def today_utc():
    # Midnight at the start of the current UTC day
    return datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)

def schema(configuration):
    return [{"table": "emails", "primary_key": ["id"]}]

def update(configuration, state):
    api_key = configuration["api_key"]
    history_start = configuration.get("historical_start_date", "2024-01-01")
    headers = {
        "X-Keap-API-Key": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    # Resume the day after the last checkpoint; on the first run,
    # start at historical_start_date itself
    if "last_synced_date" in state:
        start_date = datetime.strptime(
            state["last_synced_date"], "%Y-%m-%d"
        ).replace(tzinfo=timezone.utc) + timedelta(days=1)
    else:
        start_date = datetime.strptime(
            history_start, "%Y-%m-%d"
        ).replace(tzinfo=timezone.utc)
    # Stop at yesterday (UTC) so the in-progress day is never synced
    end_date = today_utc() - timedelta(days=1)
    if start_date > end_date:
        print("Already up to date. Nothing to sync.")
        return

    current_day = start_date
    while current_day <= end_date:
        since = format_date(current_day)
        until = format_date(current_day.replace(
            hour=23, minute=59, second=59))
        offset = 0
        total_for_day = 0
        while True:
            url = (f"{BASE_URL}?since={since}"
                   f"&until={until}"
                   f"&limit={LIMIT}&offset={offset}")
            print(f"Calling API offset {offset}")
            try:
                res = requests.get(url, headers=headers, timeout=60)
                res.raise_for_status()
            except requests.exceptions.HTTPError as e:
                print(f"HTTP error: {e}. Retrying in 30s...")
                time.sleep(30)
                continue
            except Exception as e:
                print(f"Request error: {e}. Retrying in 10s...")
                time.sleep(10)
                continue
            data = res.json()
            emails = data.get("emails", [])
            print(f"Fetched {len(emails)} records")
            if not emails:
                break
            for email in emails:
                if "id" not in email:
                    print("Warning: skipping email record without an id")
                    continue
                yield op.upsert(table="emails", data=email)
            total_for_day += len(emails)
            offset += LIMIT
            # A short page means this was the last page for the day
            if len(emails) < LIMIT:
                break
            time.sleep(0.5)  # light throttling between pages

        print(f"{current_day:%Y-%m-%d}: {total_for_day} records")
        # Day complete: save progress so the next run resumes after it
        yield op.checkpoint(state={
            "last_synced_date": current_day.strftime("%Y-%m-%d")
        })
        current_day += timedelta(days=1)
        time.sleep(1)

connector = Connector(update=update, schema=schema)

if __name__ == "__main__":
    # Local run outside the CLI, following the SDK template pattern
    with open("configuration.json", "r") as f:
        configuration = json.load(f)
    connector.debug(configuration=configuration)
8. Step 5: Add the Configuration File
Open configuration.json and add your Keap API token:
{
  "api_key": "YOUR_KEAP_API_TOKEN"
}
Replace YOUR_KEAP_API_TOKEN with your actual Keap API key. Example format (not real):
{
  "api_key": "64636hfgh4e726bc99a02d0a0fdc51tdfr4"
}
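Because update() reads api_key directly and parses historical_start_date with strptime, a mistake in configuration.json only shows up once the sync is running. A minimal pre-flight sketch follows; load_configuration and validate_configuration are hypothetical helpers (not part of the Fivetran SDK), and historical_start_date is the optional key the connector reads, falling back to 2024-01-01 when absent.

```python
import json
from datetime import datetime


def validate_configuration(configuration):
    """Raise ValueError if the configuration lacks what update() needs."""
    api_key = configuration.get("api_key", "")
    if not api_key or api_key == "YOUR_KEAP_API_TOKEN":
        raise ValueError("api_key is missing or still set to the placeholder")
    start = configuration.get("historical_start_date")
    if start is not None:
        # Same format update() parses; raises ValueError on anything else
        datetime.strptime(start, "%Y-%m-%d")


def load_configuration(path="configuration.json"):
    """Read configuration.json and validate it (hypothetical helper)."""
    with open(path, "r", encoding="utf-8") as f:
        configuration = json.load(f)
    validate_configuration(configuration)
    return configuration


if __name__ == "__main__":
    validate_configuration(
        {"api_key": "example-key", "historical_start_date": "2024-01-01"})
    print("configuration looks valid")
```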
9. Step 6: Add Python Dependencies
Open requirements.txt and add the following line:
This is the only external library needed since the Fivetran SDK handles the rest.
10. Step 7: Test Locally
Navigate to Your Connector Folder
dir
You should see: configuration.json, connector.py, requirements.txt
Run the Debug Command
fivetran debug --configuration configuration.json
Understanding the Debug Output
Your terminal will display: sdk calling update()
| Status | Meaning |
|---|---|
| ✓ SDK started | Fivetran Connector SDK initialized successfully |
| ✓ Python loaded | Your connector.py code was loaded without errors |
| ✓ Config read | configuration.json was read successfully |
| ✓ update() running | Fivetran is now executing update(configuration, state) |
What Happens After sdk calling update()
Inside update(), your code performs an HTTP GET request to the Keap API. You should see paginated log output such as:
Fetched 1000 records
Calling API offset 1000
Fetched 1000 records
…
If nothing happens after sdk calling update(), common causes are:
- The API request is slow or timing out
- Authentication failed (incorrect API key)
- The code is stuck waiting for an API response
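Since update() retries every failed request without limit, an invalid key and a slow endpoint can both look like silence after sdk calling update(). One way to tell the causes apart is a single request outside the SDK. The sketch below reuses the endpoint and X-Keap-API-Key header from connector.py; diagnose_keap_access is a hypothetical helper, and its get parameter (defaulting to requests.get) exists only so the logic can be exercised without network access.

```python
def diagnose_keap_access(api_key, get=None, timeout=15):
    """Make one small request to the Keap emails endpoint and classify it.

    Hypothetical troubleshooting helper, not part of the Fivetran SDK.
    `get` defaults to requests.get and can be replaced by a stub.
    """
    if get is None:
        import requests  # imported lazily so stubs work without requests
        get = requests.get
    url = "https://api.infusionsoft.com/crm/rest/v1/emails?limit=1"
    headers = {"X-Keap-API-Key": api_key, "Accept": "application/json"}
    try:
        res = get(url, headers=headers, timeout=timeout)
    except Exception as e:  # timeouts, DNS failures, refused connections
        return f"network problem: {e}"
    if res.status_code in (401, 403):
        return "authentication failed: check api_key"
    if res.status_code == 429:
        return "rate limited: wait before retrying"
    if res.status_code >= 400:
        return f"unexpected HTTP {res.status_code}"
    return "ok"
```

For example, call it with the key from configuration.json before running the debug command: an "authentication failed" result points at the key, while a "network problem" result points at connectivity or timeouts.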
Internal Flow Diagram
Connector SDK starts
↓
Reads configuration.json
↓
Loads connector.py
↓
sdk calling update()
↓
update() function runs
↓
API request → Keap API
↓
Records returned → Fivetran
The Two Core Functions in Any Fivetran Connector
| Function | Purpose |
|---|---|
| schema() | Defines table structure in the destination |
| update() | Pulls data from the API and streams it to Fivetran |
11. Step 8: Get Your Fivetran API Key
Before deploying your connector, you must generate a Fivetran API Key from the dashboard.
Where to Find It
- Click your profile (bottom-left corner of the Fivetran dashboard)
- Click API Key
| Credential | Example |
|---|---|
| API Key | abc123xyz |
| API Secret | 98dfh234dsf |
| Encoded Key (Base64 of key:secret) | YWJjMTIzOjk4ZGZoMjM0ZHNm |
This encoded credential is required for the deployment command in the next step.
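The encoded key is the standard Base64 encoding of the string api_key:api_secret, the same value used for HTTP Basic authentication with the Fivetran REST API. If you only have the key and secret, here is a minimal sketch to produce it; encode_fivetran_key is a hypothetical helper name, and the values are the non-real examples from the table.

```python
import base64


def encode_fivetran_key(api_key, api_secret):
    """Return Base64("api_key:api_secret") for the deploy --api-key option."""
    raw = f"{api_key}:{api_secret}".encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # Example values only (not real credentials)
    print(encode_fivetran_key("abc123", "98dfh234dsf"))
    # YWJjMTIzOjk4ZGZoMjM0ZHNm
```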
12. Step 9: Deploy to Fivetran
From your terminal, go one level up from the connector folder, then run the deploy command:
fivetran deploy keap_connector --api-key YOUR_BASE64_ENCODED_KEY
Deployment Prompts Explained
1. Destination Name
Type: BigQuery
2. Connection Name
Type: keap_emails
3. Configuration Requirement
does this run require configuration? (y/N):
Type: Y
4. Configuration File Path
Just press Enter (or type configuration.json).
5. Existing Connection Update (appears when a connection with this name already exists)
continue with update? (y/N):
Type: y
13. Step 10: Run Sync from the UI
Navigate to your Fivetran Dashboard:
- Go to Connections → keap_emails
- Click Sync
14. Step 11: Monitor Execution
Go to the Monitoring Tab in Fivetran. You should see:
Load: Success
Loaded rows: > 0
15. Step 12: Verify in BigQuery
Navigate to your BigQuery project. You will see a dataset and table created:
└── emails
Run a verification query to confirm data was loaded:
If rows are returned, your pipeline is live and working!
16. How Incremental Sync Works (Deep Dive)
The connector uses a day-by-day incremental strategy to keep data fresh while avoiding partial loads:
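- State Persistence: After each completed day, the connector calls op.checkpoint(). This saves progress so the next run resumes from the first day that has not been checkpointed. A day interrupted mid-pagination is fetched again from the start, which is safe because records are upserted by id.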
- Safe Window: The sync always stops at yesterday UTC. This prevents fetching incomplete data for an in-progress day.
- Pagination: Within each day, the connector pages through results using offset and limit=1000 until all records are fetched. If a page returns fewer than LIMIT records, it is treated as the last page.
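- Retry Logic: On HTTP errors, the connector sleeps 30 seconds and retries. On other exceptions, it sleeps 10 seconds and retries. This keeps the connector running through transient API failures, but because retries are unlimited, a persistent error such as an invalid API key makes it retry indefinitely instead of failing.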
- Deduplication: Each email is upserted using op.upsert(table=”emails”, data=email) with id as the primary key, ensuring no duplicates in the destination.
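The pagination and deduplication rules above can be checked without calling Keap. A minimal sketch, assuming an injected fetch_page(offset, limit) function in place of the HTTP request and a plain dict in place of the destination table; fetch_all_pages and apply_upserts are hypothetical helpers that only model the behavior and do not use the SDK.

```python
LIMIT = 1000


def fetch_all_pages(fetch_page, limit=LIMIT):
    """Collect every record for one day using offset/limit paging.

    fetch_page(offset, limit) stands in for the Keap request and returns a
    list of records. Paging stops on an empty page or a short page.
    """
    records = []
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        if not page:
            break
        records.extend(page)
        offset += limit
        if len(page) < limit:  # short page: nothing left for this day
            break
    return records


def apply_upserts(table, records, key="id"):
    """Model primary-key upserts: a repeated id overwrites the older row."""
    for record in records:
        if key in record:  # records without the key are skipped
            table[record[key]] = record
    return table


if __name__ == "__main__":
    source = [{"id": i} for i in range(2500)]
    rows = fetch_all_pages(lambda offset, limit: source[offset:offset + limit])
    print(len(rows))  # 2500
```

When a day's total is an exact multiple of LIMIT, every page is full, so one extra request returns an empty page; the `if not page` check is what ends the loop in that case.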
17. Error Handling Summary
| Error Type | Behavior |
|---|---|
| HTTP error (4xx/5xx) | Sleep 30 seconds → Retry automatically |
| Network / connection error | Sleep 10 seconds → Retry automatically |
| Record missing id field | Skip record with a warning log entry |
| API rate limiting | Mitigated by a 0.5s delay between pages and a 1s delay between days |
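The behavior in the table retries forever, so a permanent failure such as a revoked API key keeps the sync looping. If you would rather have such failures surface, one common alternative (swapped in here; it is not what connector.py does) is capped exponential backoff with a predicate that decides which errors are worth retrying. A minimal sketch; get_with_retries and its parameters are hypothetical, and sleep is injectable only so the delays can be tested.

```python
import time


def get_with_retries(do_request, should_retry=lambda exc: True,
                     max_attempts=5, base_delay=2.0, max_delay=60.0,
                     sleep=time.sleep):
    """Call do_request() with capped exponential backoff (sketch).

    Waits base_delay, 2*base_delay, 4*base_delay, ... (capped at max_delay)
    between attempts. Re-raises the error when should_retry(exc) is False
    or after max_attempts, so the sync fails visibly instead of looping.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return do_request()
        except Exception as exc:
            if attempt == max_attempts or not should_retry(exc):
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)
```

Inside update(), do_request could wrap the requests.get call plus raise_for_status(), with a predicate that retries only on 429 and 5xx responses and network errors; that wiring is an assumption left to the reader.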
18. Summary
In this guide, you built a production-ready Fivetran custom connector that:
- Connects to the Keap API using an API key
- Fetches email records day-by-day with full pagination
- Maintains incremental state across syncs using checkpointing
- Handles API errors gracefully with automatic retries
- Loads data into BigQuery via Fivetran’s cloud infrastructure
- Validates data in BigQuery with a simple SQL query
Fivetran’s Connector SDK makes it surprisingly straightforward to bring any data source into your warehouse. Once you understand the schema() + update() pattern and the incremental checkpoint model, you can build connectors for virtually any API.