Custom Connectors
AudienceGPT's connection architecture is designed to be extensible. Beyond the built-in LiveRamp, Trade Desk, and Index Exchange integrations, you can build custom connectors for any platform that exposes an API or accepts file-based uploads. This guide covers the connector architecture, the PlatformClient interface, configuration schemas, authentication types, and testing.
Architecture Overview
All external integrations in AudienceGPT are managed through the unified platform connections system. Each connection is a record in the platform_connections table with:
- A platform type identifier
- A direction (inbound, outbound, or bidirectional)
- An auth type and encrypted credentials
- A JSONB config column for platform-specific settings
Inbound vs. Outbound
| Direction | Purpose | Data Flow | Example |
|---|---|---|---|
| Inbound | Pull data from external sources | External API --> AudienceGPT | Sync topics from a DMP |
| Outbound | Push segments to DSP platforms | AudienceGPT --> External Platform | Push to LiveRamp, TTD |
| Bidirectional | Both directions | Both | Full platform sync |
Platform Types
The system defines four platform types. Custom connectors use the custom_api type:
type PlatformType = "custom_api" | "indexexchange" | "liveramp" | "tradedesk";
When building a new connector, register it as custom_api or add a new platform type to the PlatformType union.
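Connection records and API payloads carry the platform as a plain string, so a runtime guard can be handy before treating a value as a PlatformType. The following is a minimal sketch; `PLATFORM_TYPES` and `isPlatformType` are hypothetical helper names, not part of the documented codebase.

```typescript
// The four platform types listed in this guide.
const PLATFORM_TYPES = ["custom_api", "indexexchange", "liveramp", "tradedesk"] as const;
type PlatformType = (typeof PLATFORM_TYPES)[number];

// Hypothetical helper: narrows an arbitrary string to PlatformType.
function isPlatformType(value: string): value is PlatformType {
  return PLATFORM_TYPES.some((known) => known === value);
}
```

If you add a new member to the union, deriving the type from the array as above keeps the guard and the type in sync.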
The PlatformClient Interface
Every outbound connector must implement the PlatformClient interface. This is the contract that the activation pipeline uses to push segments to external platforms.
interface PlatformClient {
/** Verify that credentials and config are valid. */
testConnection(): Promise<{ success: boolean; error?: string }>;
/** Create a new segment on the platform. Returns the platform's segment ID. */
createSegment(params: CreateSegmentParams): Promise<{ platformSegmentId: string }>;
/** Update an existing segment (name, description, pricing). */
updateSegment(platformSegmentId: string, params: UpdateSegmentParams): Promise<void>;
/** Deactivate/disable a segment on the platform. */
deactivateSegment(platformSegmentId: string): Promise<void>;
/** Retrieve a segment by its platform ID. Returns null if not found. */
getSegment(platformSegmentId: string): Promise<PlatformSegment | null>;
}
Parameter Types
interface CreateSegmentParams {
name: string; // Platform-formatted segment name
description: string; // Platform-formatted description
cpm: number; // CPM price
currency: string; // Currency code (e.g. "USD")
dataSourceLocation: string; // Platform-specific data source reference
}
interface UpdateSegmentParams {
name: string; // Updated segment name
description: string; // Updated description
cpm?: number; // Optional updated CPM
}
interface PlatformSegment {
id: string; // Platform segment ID
name: string; // Segment name on platform
status: string; // Platform status (e.g. "active", "inactive")
}
Implementation Example
Here is a skeleton for a custom platform client:
import type {
PlatformClient,
CreateSegmentParams,
UpdateSegmentParams,
PlatformSegment,
} from "@/lib/connections/types";
// Exported so the platform factory can reference these types.
export interface MyPlatformCredentials {
api_key: string;
account_id: string;
}
export interface MyPlatformConfig {
api_base_url: string;
default_cpm: number;
currency: string;
}
export class MyPlatformClient implements PlatformClient {
constructor(
private credentials: MyPlatformCredentials,
private config: MyPlatformConfig,
) {}
async testConnection(): Promise<{ success: boolean; error?: string }> {
try {
const res = await fetch(`${this.config.api_base_url}/health`, {
headers: { Authorization: `Bearer ${this.credentials.api_key}` },
});
if (!res.ok) {
return { success: false, error: `API returned ${res.status}` };
}
return { success: true };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : "Unknown error",
};
}
}
async createSegment(
params: CreateSegmentParams,
): Promise<{ platformSegmentId: string }> {
const res = await fetch(`${this.config.api_base_url}/segments`, {
method: "POST",
headers: {
Authorization: `Bearer ${this.credentials.api_key}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
name: params.name,
description: params.description,
price: params.cpm,
}),
});
if (!res.ok) {
throw new Error(`Create segment failed: ${res.status}`);
}
const json = await res.json() as { id: string };
return { platformSegmentId: json.id };
}
async updateSegment(
platformSegmentId: string,
params: UpdateSegmentParams,
): Promise<void> {
const res = await fetch(
`${this.config.api_base_url}/segments/${platformSegmentId}`,
{
method: "PUT",
headers: {
Authorization: `Bearer ${this.credentials.api_key}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
name: params.name,
description: params.description,
}),
},
);
if (!res.ok) {
throw new Error(`Update segment failed: ${res.status}`);
}
}
async deactivateSegment(platformSegmentId: string): Promise<void> {
const res = await fetch(
`${this.config.api_base_url}/segments/${platformSegmentId}`,
{
method: "DELETE",
headers: {
Authorization: `Bearer ${this.credentials.api_key}`,
},
},
);
if (!res.ok) {
throw new Error(`Deactivate segment failed: ${res.status}`);
}
}
async getSegment(
platformSegmentId: string,
): Promise<PlatformSegment | null> {
const res = await fetch(
`${this.config.api_base_url}/segments/${platformSegmentId}`,
{
headers: {
Authorization: `Bearer ${this.credentials.api_key}`,
},
},
);
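// Only a 404 means "not found"; surface other failures instead of hiding them.
if (res.status === 404) return null;
if (!res.ok) throw new Error(`Get segment failed: ${res.status}`);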
const json = await res.json() as { id: string; name: string; status: string };
return { id: json.id, name: json.name, status: json.status };
}
}
Registering the Client
To make your custom client available to the activation pipeline, add it to the platform client factory in src/lib/connections/platforms/index.ts:
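import { MyPlatformClient } from "./myplatform-client";
// The factory casts to these types, so they must be exported from the client module.
import type { MyPlatformCredentials, MyPlatformConfig } from "./myplatform-client";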
export function createPlatformClient(
platform: string,
credentials: Record<string, string>,
config: Record<string, unknown>,
): PlatformClient {
switch (platform) {
case "liveramp":
return new LiveRampClient(/* ... */);
case "tradedesk":
return new TradeDeskClient(/* ... */);
case "indexexchange":
return new IndexExchangeClient(/* ... */);
case "myplatform":
return new MyPlatformClient(
credentials as unknown as MyPlatformCredentials,
config as unknown as MyPlatformConfig,
);
default:
throw new Error(`Unsupported platform: ${platform}`);
}
}
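The factory casts the loosely typed records with `as unknown as`, which performs no runtime check. One option is to validate required fields before constructing the client, so a misconfigured connection fails with a descriptive error. This is a sketch only; `parseMyPlatformCredentials` is a hypothetical helper, and the field names follow the MyPlatformCredentials skeleton from this guide.

```typescript
interface MyPlatformCredentials {
  api_key: string;
  account_id: string;
}

// Hypothetical helper: fail fast with a descriptive error when a required
// credential field is missing or empty, instead of casting blindly.
function parseMyPlatformCredentials(raw: Record<string, string>): MyPlatformCredentials {
  const apiKey = raw["api_key"];
  const accountId = raw["account_id"];
  const missing: string[] = [];
  if (!apiKey) missing.push("api_key");
  if (!accountId) missing.push("account_id");
  if (!apiKey || !accountId) {
    throw new Error(`Missing credential field(s): ${missing.join(", ")}`);
  }
  return { api_key: apiKey, account_id: accountId };
}
```

The factory could then call `new MyPlatformClient(parseMyPlatformCredentials(credentials), ...)` in place of the cast.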
Authentication Types
The connection system supports five authentication types:
| Auth Type | How It Works | Use Case |
|---|---|---|
| oauth2 | Client credentials or password grant flow | LiveRamp, complex OAuth APIs |
| bearer | Static API key sent as Bearer token | Trade Desk, simple API key auth |
| api_key | API key sent as a custom header | Platforms using X-API-Key headers |
| header | Custom header-based auth | SFTP credentials, custom headers |
| none | No authentication | Public APIs, internal services |
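To illustrate how the auth types in the table could translate into request headers, here is a rough sketch. The helper names and every credential field name (`access_token`, `api_key`, `header_name`, `header_value`) are assumptions made for illustration; the actual credential shapes used by AudienceGPT are not described in this guide.

```typescript
type AuthType = "oauth2" | "bearer" | "api_key" | "header" | "none";

// Hypothetical helper: read a required credential field or throw.
function need(credentials: Record<string, string>, key: string): string {
  const value = credentials[key];
  if (!value) throw new Error(`Missing credential field: ${key}`);
  return value;
}

// Hypothetical helper: map an auth type to HTTP request headers.
function buildAuthHeaders(
  authType: AuthType,
  credentials: Record<string, string>,
): Record<string, string> {
  switch (authType) {
    case "oauth2":
      // Assumes the token exchange already happened and produced an access token.
      return { Authorization: `Bearer ${need(credentials, "access_token")}` };
    case "bearer":
      return { Authorization: `Bearer ${need(credentials, "api_key")}` };
    case "api_key":
      return { "X-API-Key": need(credentials, "api_key") };
    case "header":
      return { [need(credentials, "header_name")]: need(credentials, "header_value") };
    case "none":
      return {};
  }
}
```

For example, `buildAuthHeaders("bearer", { api_key: "abc" })` yields `{ Authorization: "Bearer abc" }`.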
Credential Storage
All credentials are encrypted at rest using the pgp_sym_encrypt function from PostgreSQL's pgcrypto extension:
- Credentials are stored as encrypted bytea in auth_credentials_enc
- The CREDENTIALS_ENCRYPTION_KEY environment variable provides the symmetric key
- Credentials are decrypted server-side only when needed (push, sync, test)
- The plaintext auth_credentials JSONB column is always set to '{}' -- it exists only for backward compatibility
Never expose decrypted credentials to the client. The getConnectionCredentials() function is server-only and requires an authenticated org context.
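As a rough illustration of the storage scheme, the sketch below builds parameterized queries around pgcrypto's `pgp_sym_encrypt` and `pgp_sym_decrypt`. The table and column names come from this guide; the `id` column, the function names, and the `{ text, values }` query shape (as accepted by node-postgres style clients) are assumptions. This is not the actual implementation of getConnectionCredentials(), and a real query would also be scoped to the authenticated org.

```typescript
// Query shape accepted by node-postgres style clients: SQL text plus positional values.
interface ParameterizedQuery {
  text: string;
  values: string[];
}

// Encrypt on write. pgp_sym_encrypt(text, text) returns bytea.
// The plaintext column is reset to '{}' as described above.
function buildStoreCredentialsQuery(
  connectionId: string,
  credentials: Record<string, string>,
  encryptionKey: string,
): ParameterizedQuery {
  return {
    text:
      "UPDATE platform_connections " +
      "SET auth_credentials_enc = pgp_sym_encrypt($1, $2), auth_credentials = '{}' " +
      "WHERE id = $3", // `id` is an assumed primary key column
    values: [JSON.stringify(credentials), encryptionKey, connectionId],
  };
}

// Decrypt on read, server-side only. pgp_sym_decrypt(bytea, text) returns text.
function buildLoadCredentialsQuery(
  connectionId: string,
  encryptionKey: string,
): ParameterizedQuery {
  return {
    text:
      "SELECT pgp_sym_decrypt(auth_credentials_enc, $1) AS credentials " +
      "FROM platform_connections WHERE id = $2",
    values: [encryptionKey, connectionId],
  };
}
```

Passing the key as a bound parameter rather than interpolating it keeps it out of the SQL text.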
Configuration Schema
Connection Input
When creating or updating a connection via the API, use this shape:
interface ConnectionInput {
name: string; // Display name
platform: PlatformType; // "custom_api", "liveramp", etc.
direction: ConnectionDirection; // "inbound", "outbound", "bidirectional"
authType: AuthType; // "oauth2", "bearer", "api_key", "header", "none"
authCredentials?: Record<string, string>; // Encrypted at rest
config: Record<string, unknown>; // Platform-specific JSONB config
enabled?: boolean; // Default: true
}
Inbound Config Schema
For custom inbound (sync) connections:
interface InboundConfig {
base_url: string; // External API base URL
response_path: string; // JSON path to items array
count_endpoint: string; // Endpoint for total count
count_response_path: string; // JSON path to count value
pagination_type: "offset" | "cursor"; // Pagination strategy
pagination_params: SyncPaginationParams;
field_mappings: SyncFieldMapping[]; // Map external fields to AudienceGPT fields
filters: Record<string, string>; // Query filters
extra_headers: Record<string, string>; // Additional request headers
sync_mode: "full" | "incremental"; // Sync strategy
incremental_field: string; // Field for incremental sync
source_id_field: string; // Field for external ID tracking
}
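The response_path and count_response_path fields point into the external API's JSON response. The sketch below shows one way such a path could be resolved, assuming dot-separated keys such as "data.items"; the path syntax the sync engine actually accepts is not specified in this guide, and `getByPath` is a hypothetical helper.

```typescript
// Assumption: paths are dot-separated object keys, e.g. "data.items".
function getByPath(payload: unknown, path: string): unknown {
  let current: unknown = payload;
  for (const key of path.split(".").filter((part) => part.length > 0)) {
    // Stop early if the path runs past a primitive, null, or missing value.
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Example response from a hypothetical external API.
const sampleBody = { data: { items: [{ id: 1 }, { id: 2 }] }, meta: { total: 2 } };
const sampleItems = getByPath(sampleBody, "data.items"); // the two-element items array
const sampleTotal = getByPath(sampleBody, "meta.total"); // 2
```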
Outbound Config Schema
Outbound configs vary by platform. Define your own interface for custom platforms. At minimum, include:
interface CustomOutboundConfig {
api_base_url: string; // Platform API endpoint
default_cpm: number; // Default pricing
currency: string; // Currency code
// Add platform-specific fields as needed
}
Building an Inbound Connector
Inbound connectors pull data from external APIs into AudienceGPT using the sync pipeline.
Sync Workflow
Configure inbound connection --> "Sync Now"
--> POST /api/connections/{id}/sync/run (create run, fetch count)
--> Loop: POST .../run/{runId}/page (fetch page, classify, insert)
--> GET .../run/{runId}/status (poll/cancel)
Field Mappings
Map external API fields to AudienceGPT's topic fields:
{
"field_mappings": [
{ "source": "segment_name", "target": "topic_name" },
{ "source": "category", "target": "parent_category" },
{ "source": "external_id", "target": "external_id" }
]
}
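A mapping list like the one above can be applied to each external record with a small function. This is a sketch under assumptions: `applyFieldMappings` is a hypothetical name, the `SyncFieldMapping` shape is inferred from the JSON example, and skipping absent source fields is a choice made here rather than documented behavior.

```typescript
interface SyncFieldMapping {
  source: string; // field name in the external record
  target: string; // AudienceGPT topic field
}

// Hypothetical helper: copies each mapped source field onto its target name.
// Source fields absent from the record are skipped.
function applyFieldMappings(
  record: Record<string, unknown>,
  mappings: SyncFieldMapping[],
): Record<string, unknown> {
  const mapped: Record<string, unknown> = {};
  for (const { source, target } of mappings) {
    if (source in record) mapped[target] = record[source];
  }
  return mapped;
}

const exampleMappings: SyncFieldMapping[] = [
  { source: "segment_name", target: "topic_name" },
  { source: "category", target: "parent_category" },
  { source: "external_id", target: "external_id" },
];
const exampleTopic = applyFieldMappings(
  { segment_name: "SUV Buyers", category: "Auto", external_id: "42", unused: true },
  exampleMappings,
);
// exampleTopic: { topic_name: "SUV Buyers", parent_category: "Auto", external_id: "42" }
```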
Pagination
The sync engine supports two pagination strategies:
| Strategy | How It Works | Config |
|---|---|---|
| Offset | ?offset=0&limit=100 | { "offset_param": "offset", "limit_param": "limit", "page_size": 100 } |
| Cursor | ?cursor=abc123 | { "cursor_param": "cursor", "page_size": 100 } |
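The two strategies differ mainly in how the next request URL is built. The sketch below assumes the parameter shapes shown in the Config column; the function names are hypothetical, and whether the sync engine also sends a page size in cursor mode is not specified here, so this sketch omits it.

```typescript
// Shapes assumed from the Config column of the table above.
interface OffsetPagination {
  offset_param: string;
  limit_param: string;
  page_size: number;
}
interface CursorPagination {
  cursor_param: string;
  page_size: number;
}

// Offset strategy: page N starts at N * page_size (pageIndex is zero-based).
function buildOffsetPageUrl(baseUrl: string, p: OffsetPagination, pageIndex: number): string {
  const url = new URL(baseUrl);
  url.searchParams.set(p.offset_param, String(pageIndex * p.page_size));
  url.searchParams.set(p.limit_param, String(p.page_size));
  return url.toString();
}

// Cursor strategy: the first page has no cursor; later pages pass the
// cursor returned by the previous response.
function buildCursorPageUrl(baseUrl: string, p: CursorPagination, cursor: string | null): string {
  const url = new URL(baseUrl);
  if (cursor !== null) url.searchParams.set(p.cursor_param, cursor);
  return url.toString();
}
```

With the offset config from the table, page index 2 of `https://api.example.com/segments` becomes `https://api.example.com/segments?offset=200&limit=100`.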
Building an Outbound Connector
Outbound connectors push classified segments to external DSP platforms.
Step-by-Step
- Define credential and config interfaces for your platform
- Implement PlatformClient with all five methods
- Register the client in the platform factory (platforms/index.ts)
- Add the platform type to PlatformType if not using custom_api
- Create an output template in Admin > Output Templates for naming
- Test the connection via the UI or API
Handling Platform-Specific Behavior
Different platforms handle segments differently. Consider these patterns:
| Pattern | Example | Implementation |
|---|---|---|
| REST API | Trade Desk, LiveRamp | Standard HTTP client with retry |
| File upload | Index Exchange | Generate files, upload via SFTP/S3 |
| Batch API | Some DSPs accept bulk payloads | Batch multiple segments per request |
| Webhook | Push notifications | Register webhook, handle callbacks |
Retry Logic
All built-in clients implement exponential backoff retry. Follow this pattern:
const MAX_RETRIES = 3;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
const res = await fetch(url, options);
// Don't retry client errors (except rate limits)
if (res.ok || (res.status !== 429 && res.status < 500)) {
return res;
}
// Will retry on 429 and 5xx
} catch (err) {
// Will retry on network errors
}
if (attempt < MAX_RETRIES) {
await new Promise(r => setTimeout(r, 1000 * 2 ** (attempt - 1)));
}
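}
// Every attempt failed (429, 5xx, or network error), so surface the failure.
throw new Error(`Request failed after ${MAX_RETRIES} attempts`);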
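The loop above can be wrapped in a reusable helper. The sketch below follows the same policy (retry on 429, 5xx, and network errors, with delays of 1 s then 2 s) and throws once all attempts are exhausted. `withRetry`, `MinimalResponse`, and the injectable `sleep` parameter are hypothetical; they are not taken from the built-in clients.

```typescript
// Minimal response shape so the helper works with fetch's Response or a test double.
interface MinimalResponse {
  ok: boolean;
  status: number;
}

async function withRetry<T extends MinimalResponse>(
  attempt: () => Promise<T>,
  maxRetries: number = 3,
  // Injectable so tests can skip real delays.
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown = new Error("No attempts were made");
  for (let i = 1; i <= maxRetries; i++) {
    try {
      const res = await attempt();
      // Successes and non-retryable client errors go straight back to the caller.
      if (res.ok || (res.status !== 429 && res.status < 500)) return res;
      lastError = new Error(`Request failed with status ${res.status}`);
    } catch (err) {
      lastError = err; // network error: eligible for retry
    }
    // Exponential backoff: 1000 ms, 2000 ms, ...
    if (i < maxRetries) await sleep(1000 * 2 ** (i - 1));
  }
  throw lastError;
}
```

A client method would then call it as `const res = await withRetry(() => fetch(url, options));`.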
Request Timeouts
Use AbortController to enforce request timeouts:
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 30_000);
try {
const res = await fetch(url, { signal: controller.signal });
// handle response
} finally {
clearTimeout(timeout);
}
Output Templates
Each outbound connection can reference an output template that controls segment naming and description formatting. Templates are configured in Admin > Output Templates and use {{field}} placeholders.
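To make the {{field}} placeholder idea concrete, here is a minimal rendering sketch. `renderTemplate` is a hypothetical function, the field names in the example are invented, and rendering unknown fields as an empty string is an assumption; the behavior of the actual template engine is not described in this guide.

```typescript
// Replaces {{field}} placeholders with values from `fields`.
// Unknown fields render as an empty string in this sketch.
function renderTemplate(template: string, fields: Record<string, string>): string {
  return template.replace(
    /\{\{\s*(\w+)\s*\}\}/g,
    (_match: string, name: string) => fields[name] ?? "",
  );
}

const exampleName = renderTemplate("{{category}} > {{topic_name}}", {
  category: "Auto",
  topic_name: "SUV Buyers",
});
// exampleName === "Auto > SUV Buyers"
```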
Template Key Assignment
Each connection can have an output_template_key in its config that determines which template generates the segment name:
- If output_template_key matches a built-in platform (tradedesk, liveramp), the dedicated DB column is used
- Otherwise, names are stored in the platform_outputs JSONB column keyed by the template key
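The storage rule can be read as a simple branch on the template key. The sketch below is illustrative only: `resolveNameStorage` and the `NameStorage` type are hypothetical, and it treats tradedesk and liveramp as the complete set of built-in keys, which this guide does not confirm.

```typescript
// Built-in platforms this guide names as having a dedicated column.
const BUILT_IN_TEMPLATE_KEYS = ["tradedesk", "liveramp"];

type NameStorage =
  | { kind: "dedicated_column"; platform: string }
  | { kind: "platform_outputs"; key: string };

// Hypothetical helper: decide where a generated segment name is stored.
function resolveNameStorage(outputTemplateKey: string): NameStorage {
  return BUILT_IN_TEMPLATE_KEYS.indexOf(outputTemplateKey) !== -1
    ? { kind: "dedicated_column", platform: outputTemplateKey }
    : { kind: "platform_outputs", key: outputTemplateKey };
}
```

A custom connector with `output_template_key: "myplatform"` would therefore land in the platform_outputs branch.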
Creating a Custom Template
- Navigate to Admin > Output Templates
- Click Create Template
- Configure path fields, separators, and description template
- Set the template as active
- Reference the template key in your connection's config
Testing and Validation
Connection Test
Every PlatformClient must implement testConnection(). This method should:
- Verify credentials are valid (authenticate with the platform)
- Confirm the account has appropriate permissions
- Return { success: true } or { success: false, error: "description" }
- Be lightweight -- avoid modifying any data
Test via the API:
POST /api/connections/{id}/test
Unit Testing
Follow the pattern used by the built-in clients. Key testing considerations:
- Mock external API calls -- never hit real APIs in tests
- Test retry behavior -- simulate 429 and 5xx responses
- Test timeout handling -- simulate slow responses
- Test credential validation -- ensure meaningful errors for missing fields
- Isolate token caching -- reset cache between tests to prevent state leakage
Example test structure:
import { describe, test, expect, mock, beforeEach } from "bun:test";
import { MyPlatformClient } from "./myplatform-client";
describe("MyPlatformClient", () => {
const credentials = { api_key: "test-key", account_id: "test-acct" };
const config = { api_base_url: "https://api.example.com", default_cpm: 2.0, currency: "USD" };
beforeEach(() => {
// Reset any module-level state
});
test("testConnection returns success on 200", async () => {
global.fetch = mock(() =>
Promise.resolve(new Response("OK", { status: 200 }))
);
const client = new MyPlatformClient(credentials, config);
const result = await client.testConnection();
expect(result.success).toBe(true);
});
test("createSegment returns platform ID", async () => {
global.fetch = mock(() =>
Promise.resolve(new Response(JSON.stringify({ id: "seg_123" }), { status: 201 }))
);
const client = new MyPlatformClient(credentials, config);
const result = await client.createSegment({
name: "Test Segment",
description: "Test description",
cpm: 2.0,
currency: "USD",
dataSourceLocation: "test",
});
expect(result.platformSegmentId).toBe("seg_123");
});
});
Limits and Constraints
| Constraint | Value | Description |
|---|---|---|
| Max connections per org | 20 | Across all platforms and directions |
| Activation batch size | 25 | Segments pushed per API call |
| Max activation run errors | 500 | Error records stored per run |
| Push retry attempts | 3 | Per-segment retry limit |
| API request timeout | 30 seconds | Per-request timeout |
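The activation batch size in the table above implies that a large push is split into groups of 25 segments. A minimal sketch of that split follows; `toBatches` and `ACTIVATION_BATCH_SIZE` are hypothetical names, and only the value 25 comes from this guide.

```typescript
const ACTIVATION_BATCH_SIZE = 25; // value from the limits table

// Split a list into consecutive batches of at most `size` items.
function toBatches<T>(items: T[], size: number = ACTIVATION_BATCH_SIZE): T[][] {
  if (size < 1) throw new Error("Batch size must be at least 1");
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

For 60 segments this produces three batches of 25, 25, and 10.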
API Reference
| Endpoint | Method | Description |
|---|---|---|
| /api/connections | GET | List all connections |
| /api/connections | POST | Create a connection |
| /api/connections/{id} | GET | Get connection details |
| /api/connections/{id} | PUT | Update connection |
| /api/connections/{id} | DELETE | Delete connection |
| /api/connections/{id}/test | POST | Test connection |
| /api/connections/{id}/push/run | POST | Start push run |
| /api/connections/{id}/push/run/{runId}/push | POST | Push next batch |
| /api/connections/{id}/push/run/{runId}/status | GET | Poll status |
| /api/connections/{id}/sync/run | POST | Start sync run |
| /api/connections/{id}/sync/run/{runId}/page | POST | Sync next page |
| /api/connections/{id}/sync/run/{runId}/status | GET | Poll sync status |
For complete schemas, see the API Reference.