Custom Connectors

AudienceGPT's connection architecture is designed to be extensible. Beyond the built-in LiveRamp, Trade Desk, and Index Exchange integrations, you can build custom connectors for any platform that exposes an API or accepts file-based uploads. This guide covers the connector architecture, the PlatformClient interface, configuration schemas, authentication types, and testing.


Architecture Overview

All external integrations in AudienceGPT are managed through the unified platform connections system. Each connection is a record in the platform_connections table with:

  • A platform type identifier
  • A direction (inbound, outbound, or bidirectional)
  • An auth type and encrypted credentials
  • A JSONB config column for platform-specific settings

Inbound vs. Outbound

| Direction | Purpose | Data Flow | Example |
| --- | --- | --- | --- |
| Inbound | Pull data from external sources | External API --> AudienceGPT | Sync topics from a DMP |
| Outbound | Push segments to DSP platforms | AudienceGPT --> External Platform | Push to LiveRamp, TTD |
| Bidirectional | Both directions | Both | Full platform sync |

Platform Types

The system defines four platform types. Custom connectors use the custom_api type:

type PlatformType = "custom_api" | "indexexchange" | "liveramp" | "tradedesk";

When building a new connector, register it as custom_api or add a new platform type to the PlatformType union.
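If platform values arrive as untyped strings (for example from a request body), one way to keep the union and a runtime check in sync is to derive both from a single array. This is a minimal sketch: `PLATFORM_TYPES` and `isPlatformType` are illustrative names, not part of the documented API.

```typescript
// Hypothetical helper: adding a platform type to this array updates both
// the PlatformType union and the runtime check below.
const PLATFORM_TYPES = ["custom_api", "indexexchange", "liveramp", "tradedesk"] as const;

type PlatformType = (typeof PLATFORM_TYPES)[number];

// Type guard that narrows an arbitrary string to PlatformType.
function isPlatformType(value: string): value is PlatformType {
  return (PLATFORM_TYPES as readonly string[]).indexOf(value) !== -1;
}
```

With this shape, adding a new platform type is a one-line change to the array.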


The PlatformClient Interface

Every outbound connector must implement the PlatformClient interface. This is the contract that the activation pipeline uses to push segments to external platforms.

interface PlatformClient {
  /** Verify that credentials and config are valid. */
  testConnection(): Promise<{ success: boolean; error?: string }>;

  /** Create a new segment on the platform. Returns the platform's segment ID. */
  createSegment(params: CreateSegmentParams): Promise<{ platformSegmentId: string }>;

  /** Update an existing segment (name, description, pricing). */
  updateSegment(platformSegmentId: string, params: UpdateSegmentParams): Promise<void>;

  /** Deactivate/disable a segment on the platform. */
  deactivateSegment(platformSegmentId: string): Promise<void>;

  /** Retrieve a segment by its platform ID. Returns null if not found. */
  getSegment(platformSegmentId: string): Promise<PlatformSegment | null>;
}

Parameter Types

interface CreateSegmentParams {
  name: string;               // Platform-formatted segment name
  description: string;        // Platform-formatted description
  cpm: number;                // CPM price
  currency: string;           // Currency code (e.g. "USD")
  dataSourceLocation: string; // Platform-specific data source reference
}

interface UpdateSegmentParams {
  name: string;        // Updated segment name
  description: string; // Updated description
  cpm?: number;        // Optional updated CPM
}

interface PlatformSegment {
  id: string;     // Platform segment ID
  name: string;   // Segment name on platform
  status: string; // Platform status (e.g. "active", "inactive")
}

Implementation Example

Here is a skeleton for a custom platform client:

import type {
  PlatformClient,
  CreateSegmentParams,
  UpdateSegmentParams,
  PlatformSegment,
} from "@/lib/connections/types";

interface MyPlatformCredentials {
  api_key: string;
  account_id: string;
}

interface MyPlatformConfig {
  api_base_url: string;
  default_cpm: number;
  currency: string;
}

export class MyPlatformClient implements PlatformClient {
  constructor(
    private credentials: MyPlatformCredentials,
    private config: MyPlatformConfig,
  ) {}

  async testConnection(): Promise<{ success: boolean; error?: string }> {
    try {
      const res = await fetch(`${this.config.api_base_url}/health`, {
        headers: { Authorization: `Bearer ${this.credentials.api_key}` },
      });
      if (!res.ok) {
        return { success: false, error: `API returned ${res.status}` };
      }
      return { success: true };
    } catch (err) {
      return {
        success: false,
        error: err instanceof Error ? err.message : "Unknown error",
      };
    }
  }

  async createSegment(
    params: CreateSegmentParams,
  ): Promise<{ platformSegmentId: string }> {
    const res = await fetch(`${this.config.api_base_url}/segments`, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${this.credentials.api_key}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        name: params.name,
        description: params.description,
        price: params.cpm,
      }),
    });

    if (!res.ok) {
      throw new Error(`Create segment failed: ${res.status}`);
    }

    const json = await res.json() as { id: string };
    return { platformSegmentId: json.id };
  }

  async updateSegment(
    platformSegmentId: string,
    params: UpdateSegmentParams,
  ): Promise<void> {
    const res = await fetch(
      `${this.config.api_base_url}/segments/${platformSegmentId}`,
      {
        method: "PUT",
        headers: {
          Authorization: `Bearer ${this.credentials.api_key}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          name: params.name,
          description: params.description,
          // cpm is optional; JSON.stringify omits the key when it is undefined.
          price: params.cpm,
        }),
      },
    );

    if (!res.ok) {
      throw new Error(`Update segment failed: ${res.status}`);
    }
  }

  async deactivateSegment(platformSegmentId: string): Promise<void> {
    const res = await fetch(
      `${this.config.api_base_url}/segments/${platformSegmentId}`,
      {
        method: "DELETE",
        headers: {
          Authorization: `Bearer ${this.credentials.api_key}`,
        },
      },
    );

    if (!res.ok) {
      throw new Error(`Deactivate segment failed: ${res.status}`);
    }
  }

  async getSegment(
    platformSegmentId: string,
  ): Promise<PlatformSegment | null> {
    const res = await fetch(
      `${this.config.api_base_url}/segments/${platformSegmentId}`,
      {
        headers: {
          Authorization: `Bearer ${this.credentials.api_key}`,
        },
      },
    );

    // Only a 404 means "not found"; other failures should not look like a missing segment.
    if (res.status === 404) return null;
    if (!res.ok) {
      throw new Error(`Get segment failed: ${res.status}`);
    }

    const json = await res.json() as { id: string; name: string; status: string };
    return { id: json.id, name: json.name, status: json.status };
  }
}

Registering the Client

To make your custom client available to the activation pipeline, add it to the platform client factory in src/lib/connections/platforms/index.ts:

import { MyPlatformClient } from "./myplatform-client";

export function createPlatformClient(
  platform: string,
  credentials: Record<string, string>,
  config: Record<string, unknown>,
): PlatformClient {
  switch (platform) {
    case "liveramp":
      return new LiveRampClient(/* ... */);
    case "tradedesk":
      return new TradeDeskClient(/* ... */);
    case "indexexchange":
      return new IndexExchangeClient(/* ... */);
    case "myplatform":
      return new MyPlatformClient(
        credentials as unknown as MyPlatformCredentials,
        config as unknown as MyPlatformConfig,
      );
    default:
      throw new Error(`Unsupported platform: ${platform}`);
  }
}

Authentication Types

The connection system supports five authentication types:

| Auth Type | How It Works | Use Case |
| --- | --- | --- |
| oauth2 | Client credentials or password grant flow | LiveRamp, complex OAuth APIs |
| bearer | Static API key sent as Bearer token | Trade Desk, simple API key auth |
| api_key | API key sent as a custom header | Platforms using X-API-Key headers |
| header | Custom header-based auth | SFTP credentials, custom headers |
| none | No authentication | Public APIs, internal services |
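To make the table concrete, here is a minimal sketch of how each auth type could translate into request headers. The credential field names (`access_token`, `api_key`, `header_name`, `header_value`) and both function names are assumptions made for illustration; the built-in clients may store and apply credentials differently. The `oauth2` branch assumes the token exchange has already produced an access token.

```typescript
type AuthType = "oauth2" | "bearer" | "api_key" | "header" | "none";

// Hypothetical helper: read a credential field or fail with a clear message.
function required(credentials: Record<string, string>, field: string): string {
  const value = credentials[field];
  if (!value) throw new Error(`Missing credential field: ${field}`);
  return value;
}

// Hypothetical helper: map an auth type to the headers it would add.
function buildAuthHeaders(
  authType: AuthType,
  credentials: Record<string, string>,
): Record<string, string> {
  switch (authType) {
    case "oauth2":
      // Assumes the grant flow already ran and stored access_token.
      return { Authorization: `Bearer ${required(credentials, "access_token")}` };
    case "bearer":
      return { Authorization: `Bearer ${required(credentials, "api_key")}` };
    case "api_key":
      return { "X-API-Key": required(credentials, "api_key") };
    case "header":
      return {
        [required(credentials, "header_name")]: required(credentials, "header_value"),
      };
    case "none":
      return {};
  }
}
```

Failing fast on a missing field gives `testConnection()` a meaningful error to report instead of an opaque 401 from the platform.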

Credential Storage

All credentials are encrypted at rest using PostgreSQL's pgp_sym_encrypt function:

  • Credentials are stored as encrypted bytea in auth_credentials_enc
  • The CREDENTIALS_ENCRYPTION_KEY environment variable provides the symmetric key
  • Credentials are decrypted server-side only when needed (push, sync, test)
  • The plaintext auth_credentials JSONB column is always set to '{}' -- it exists only for backward compatibility
Warning: Never expose decrypted credentials to the client. The getConnectionCredentials() function is server-only and requires an authenticated org context.
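The storage rules above can be sketched as two parameterized queries built around pgcrypto's `pgp_sym_encrypt` and `pgp_sym_decrypt`. This is an illustration, not the actual implementation: the `id` column, the `ParameterizedQuery` shape, and both function names are assumptions. The encryption key is passed in as an argument (server-side it would come from `CREDENTIALS_ENCRYPTION_KEY`) and is always bound as a parameter, never interpolated into the SQL text.

```typescript
// Query shape accepted by common PostgreSQL drivers (text plus bound values).
interface ParameterizedQuery {
  text: string;
  values: string[];
}

// Hypothetical: encrypt credentials and blank the legacy plaintext column.
function buildStoreCredentialsQuery(
  connectionId: string,
  credentials: Record<string, string>,
  encryptionKey: string,
): ParameterizedQuery {
  return {
    text:
      "UPDATE platform_connections " +
      "SET auth_credentials_enc = pgp_sym_encrypt($1::text, $2::text), " +
      "auth_credentials = '{}' " +
      "WHERE id = $3", // assumes an `id` primary key column
    values: [JSON.stringify(credentials), encryptionKey, connectionId],
  };
}

// Hypothetical: decrypt credentials for a server-side push, sync, or test.
function buildReadCredentialsQuery(
  connectionId: string,
  encryptionKey: string,
): ParameterizedQuery {
  return {
    text:
      "SELECT pgp_sym_decrypt(auth_credentials_enc, $1::text) AS credentials " +
      "FROM platform_connections WHERE id = $2",
    values: [encryptionKey, connectionId],
  };
}
```

The decrypted value comes back as a JSON string, so the caller would still need to `JSON.parse` it before use.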


Configuration Schema

Connection Input

When creating or updating a connection via the API, use this shape:

interface ConnectionInput {
  name: string;                             // Display name
  platform: PlatformType;                   // "custom_api", "liveramp", etc.
  direction: ConnectionDirection;           // "inbound", "outbound", "bidirectional"
  authType: AuthType;                       // "oauth2", "bearer", "api_key", "header", "none"
  authCredentials?: Record<string, string>; // Encrypted at rest
  config: Record<string, unknown>;          // Platform-specific JSONB config
  enabled?: boolean;                        // Default: true
}
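As an illustration, a payload for a custom outbound connection might look like the following. The union definitions are inferred from the comments in the interface above, and every value in `exampleConnection` (names, URL, key) is a placeholder rather than a real endpoint or credential.

```typescript
// Unions inferred from the field comments in ConnectionInput.
type PlatformType = "custom_api" | "indexexchange" | "liveramp" | "tradedesk";
type ConnectionDirection = "inbound" | "outbound" | "bidirectional";
type AuthType = "oauth2" | "bearer" | "api_key" | "header" | "none";

interface ConnectionInput {
  name: string;
  platform: PlatformType;
  direction: ConnectionDirection;
  authType: AuthType;
  authCredentials?: Record<string, string>;
  config: Record<string, unknown>;
  enabled?: boolean;
}

// Placeholder values only; the credential key name is an assumption.
const exampleConnection: ConnectionInput = {
  name: "My Platform (outbound)",
  platform: "custom_api",
  direction: "outbound",
  authType: "bearer",
  authCredentials: { api_key: "replace-me" },
  config: {
    api_base_url: "https://api.example.com",
    default_cpm: 2.0,
    currency: "USD",
  },
  // enabled is omitted, so the documented default (true) applies.
};
```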

Inbound Config Schema

For custom inbound (sync) connections:

interface InboundConfig {
  base_url: string;                        // External API base URL
  response_path: string;                   // JSON path to items array
  count_endpoint: string;                  // Endpoint for total count
  count_response_path: string;             // JSON path to count value
  pagination_type: "offset" | "cursor";    // Pagination strategy
  pagination_params: SyncPaginationParams;
  field_mappings: SyncFieldMapping[];      // Map external fields to AudienceGPT fields
  filters: Record<string, string>;         // Query filters
  extra_headers: Record<string, string>;   // Additional request headers
  sync_mode: "full" | "incremental";       // Sync strategy
  incremental_field: string;               // Field for incremental sync
  source_id_field: string;                 // Field for external ID tracking
}
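The `response_path` and `count_response_path` fields point into the JSON returned by the external API. The exact path syntax is not specified here, so the sketch below assumes simple dot notation such as `data.items`; `resolvePath` is a hypothetical helper, not the sync engine's own resolver.

```typescript
// Hypothetical helper: walk a dot-separated path through a parsed JSON payload.
// Returns undefined when any segment along the path is missing.
function resolvePath(payload: unknown, path: string): unknown {
  if (path === "") return payload; // empty path means the payload itself
  let current: unknown = payload;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Usage with a made-up response body:
const sampleResponse = { data: { items: [{ id: "a" }, { id: "b" }] }, meta: { total: 42 } };
const sampleItems = resolvePath(sampleResponse, "data.items"); // the items array
const sampleTotal = resolvePath(sampleResponse, "meta.total"); // the count value
```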

Outbound Config Schema

Outbound configs vary by platform. Define your own interface for custom platforms. At minimum, include:

interface CustomOutboundConfig {
  api_base_url: string; // Platform API endpoint
  default_cpm: number;  // Default pricing
  currency: string;     // Currency code
  // Add platform-specific fields as needed
}

Building an Inbound Connector

Inbound connectors pull data from external APIs into AudienceGPT using the sync pipeline.

Sync Workflow

Configure inbound connection --> "Sync Now"
--> POST /api/connections/{id}/sync/run (create run, fetch count)
--> Loop: POST .../run/{runId}/page (fetch page, classify, insert)
--> GET .../run/{runId}/status (poll/cancel)

Field Mappings

Map external API fields to AudienceGPT's topic fields:

{
  "field_mappings": [
    { "source": "segment_name", "target": "topic_name" },
    { "source": "category", "target": "parent_category" },
    { "source": "external_id", "target": "external_id" }
  ]
}
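Applying such mappings to one external record could look roughly like this. The `SyncFieldMapping` shape is inferred from the JSON above, and `applyFieldMappings` is a hypothetical name; the sketch assumes that unmapped source fields are dropped and that missing source fields are skipped rather than set to null.

```typescript
// Shape inferred from the field_mappings JSON example.
interface SyncFieldMapping {
  source: string;
  target: string;
}

// Hypothetical helper: copy each mapped source field onto its target name.
function applyFieldMappings(
  item: Record<string, unknown>,
  mappings: SyncFieldMapping[],
): Record<string, unknown> {
  const mapped: Record<string, unknown> = {};
  for (const mapping of mappings) {
    // Skip fields the external record does not carry.
    if (Object.prototype.hasOwnProperty.call(item, mapping.source)) {
      mapped[mapping.target] = item[mapping.source];
    }
  }
  return mapped;
}

// Usage with a made-up external record:
const mappedTopic = applyFieldMappings(
  { segment_name: "SUV Buyers", category: "Automotive", internal_flag: true },
  [
    { source: "segment_name", target: "topic_name" },
    { source: "category", target: "parent_category" },
    { source: "external_id", target: "external_id" },
  ],
);
```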

Pagination

The sync engine supports two pagination strategies:

| Strategy | How It Works | Config |
| --- | --- | --- |
| Offset | ?offset=0&limit=100 | { "offset_param": "offset", "limit_param": "limit", "page_size": 100 } |
| Cursor | ?cursor=abc123 | { "cursor_param": "cursor", "page_size": 100 } |
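The two strategies differ only in how the next page's query string is derived. The sketch below builds those URLs from the config shapes in the table; the interface and function names are hypothetical, and it assumes the endpoint has no existing query string. For cursor pagination it mirrors the table and sends only the cursor, so `page_size` goes unused here.

```typescript
// Config shapes taken from the "Config" column above.
interface OffsetPaginationParams {
  offset_param: string;
  limit_param: string;
  page_size: number;
}

interface CursorPaginationParams {
  cursor_param: string;
  page_size: number;
}

// Offset strategy: page N starts at N * page_size (pageIndex is zero-based).
function buildOffsetUrl(
  endpoint: string,
  params: OffsetPaginationParams,
  pageIndex: number,
): string {
  const offset = pageIndex * params.page_size;
  return (
    `${endpoint}?${encodeURIComponent(params.offset_param)}=${offset}` +
    `&${encodeURIComponent(params.limit_param)}=${params.page_size}`
  );
}

// Cursor strategy: the first request has no cursor; later requests echo the
// cursor returned with the previous page.
function buildCursorUrl(
  endpoint: string,
  params: CursorPaginationParams,
  cursor: string | null,
): string {
  if (cursor === null) return endpoint;
  return `${endpoint}?${encodeURIComponent(params.cursor_param)}=${encodeURIComponent(cursor)}`;
}
```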

Building an Outbound Connector

Outbound connectors push classified segments to external DSP platforms.

Step-by-Step

  1. Define credential and config interfaces for your platform
  2. Implement PlatformClient with all five methods
  3. Register the client in the platform factory (platforms/index.ts)
  4. Add the platform type to PlatformType if not using custom_api
  5. Create an output template in Admin > Output Templates for naming
  6. Test the connection via the UI or API

Handling Platform-Specific Behavior

Different platforms handle segments differently. Consider these patterns:

| Pattern | Example | Implementation |
| --- | --- | --- |
| REST API | Trade Desk, LiveRamp | Standard HTTP client with retry |
| File upload | Index Exchange | Generate files, upload via SFTP/S3 |
| Batch API | Some DSPs accept bulk payloads | Batch multiple segments per request |
| Webhook | Push notifications | Register webhook, handle callbacks |

Retry Logic

All built-in clients implement exponential backoff retry. Follow this pattern:

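const MAX_RETRIES = 3;

async function fetchWithRetry(url: string, options?: RequestInit): Promise<Response> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
    try {
      const res = await fetch(url, options);

      // Don't retry success or client errors (except rate limits)
      if (res.ok || (res.status !== 429 && res.status < 500)) {
        return res;
      }

      // 429 and 5xx fall through to the retry path
      lastError = new Error(`Request failed with status ${res.status}`);
    } catch (err) {
      // Network errors are retried too
      lastError = err;
    }

    if (attempt < MAX_RETRIES) {
      // Exponential backoff: 1s after the first attempt, 2s after the second
      await new Promise((r) => setTimeout(r, 1000 * 2 ** (attempt - 1)));
    }
  }

  // All attempts failed: surface the last error instead of returning undefined
  throw lastError;
}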

Request Timeouts

Use AbortController to enforce request timeouts:

const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 30_000);

try {
  const res = await fetch(url, { signal: controller.signal });
  // handle response
} finally {
  clearTimeout(timeout);
}

Output Templates

Each outbound connection can reference an output template that controls segment naming and description formatting. Templates are configured in Admin > Output Templates and use {{field}} placeholders.
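Placeholder substitution of this kind can be sketched in a few lines. This is not the template engine itself: `renderTemplate` is a hypothetical name, the field names in the usage example are made up, and leaving unknown placeholders untouched is an assumption about the desired behavior.

```typescript
// Hypothetical helper: replace each {{field}} with its value from `fields`.
// Placeholders with no matching field are left as written.
function renderTemplate(template: string, fields: Record<string, string>): string {
  return template.replace(/\{\{\s*(\w+)\s*\}\}/g, (match: string, name: string) => {
    if (!Object.prototype.hasOwnProperty.call(fields, name)) return match;
    return fields[name] as string;
  });
}

// Usage with made-up field names:
const renderedName = renderTemplate("{{category}} > {{topic_name}}", {
  category: "Automotive",
  topic_name: "SUV Buyers",
});
```

Leaving unresolved placeholders visible makes a misconfigured template easy to spot in the pushed segment name.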

Template Key Assignment

Each connection can have an output_template_key in its config that determines which template generates the segment name:

  • If output_template_key matches a built-in platform (tradedesk, liveramp), the dedicated DB column is used
  • Otherwise, names are stored in the platform_outputs JSONB column keyed by the template key

Creating a Custom Template

  1. Navigate to Admin > Output Templates
  2. Click Create Template
  3. Configure path fields, separators, and description template
  4. Set the template as active
  5. Reference the template key in your connection's config

Testing and Validation

Connection Test

Every PlatformClient must implement testConnection(). This method should:

  • Verify credentials are valid (authenticate with the platform)
  • Confirm the account has appropriate permissions
  • Return { success: true } or { success: false, error: "description" }
  • Be lightweight -- avoid modifying any data

Test via the API:

POST /api/connections/{id}/test

Unit Testing

Follow the pattern used by the built-in clients. Key testing considerations:

  • Mock external API calls -- never hit real APIs in tests
  • Test retry behavior -- simulate 429 and 5xx responses
  • Test timeout handling -- simulate slow responses
  • Test credential validation -- ensure meaningful errors for missing fields
  • Isolate token caching -- reset cache between tests to prevent state leakage

Example test structure:

import { describe, test, expect, mock, beforeEach } from "bun:test";
import { MyPlatformClient } from "./myplatform-client";

describe("MyPlatformClient", () => {
  const credentials = { api_key: "test-key", account_id: "test-acct" };
  const config = { api_base_url: "https://api.example.com", default_cpm: 2.0, currency: "USD" };

  beforeEach(() => {
    // Reset any module-level state
  });

  test("testConnection returns success on 200", async () => {
    global.fetch = mock(() =>
      Promise.resolve(new Response("OK", { status: 200 }))
    );

    const client = new MyPlatformClient(credentials, config);
    const result = await client.testConnection();
    expect(result.success).toBe(true);
  });

  test("createSegment returns platform ID", async () => {
    global.fetch = mock(() =>
      Promise.resolve(new Response(JSON.stringify({ id: "seg_123" }), { status: 201 }))
    );

    const client = new MyPlatformClient(credentials, config);
    const result = await client.createSegment({
      name: "Test Segment",
      description: "Test description",
      cpm: 2.0,
      currency: "USD",
      dataSourceLocation: "test",
    });
    expect(result.platformSegmentId).toBe("seg_123");
  });
});

Limits and Constraints

| Constraint | Value | Description |
| --- | --- | --- |
| Max connections per org | 20 | Across all platforms and directions |
| Activation batch size | 25 | Segments pushed per API call |
| Max activation run errors | 500 | Error records stored per run |
| Push retry attempts | 3 | Per-segment retry limit |
| API request timeout | 30 seconds | Per-request timeout |
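A connector that batches its own requests may want to respect the same batch size. The sketch below splits a list of segment IDs into groups of at most 25; `ACTIVATION_BATCH_SIZE` and `chunk` are illustrative names, and the pipeline may already do this batching before your client is called.

```typescript
// Mirrors the "Activation batch size" limit in the table above.
const ACTIVATION_BATCH_SIZE = 25;

// Hypothetical helper: split items into consecutive batches of at most `size`.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error(`Batch size must be a positive integer, got ${size}`);
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Usage: 60 made-up segment IDs become batches of 25, 25, and 10.
const segmentIds: string[] = [];
for (let i = 0; i < 60; i++) segmentIds.push(`seg_${i}`);
const segmentBatches = chunk(segmentIds, ACTIVATION_BATCH_SIZE);
```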

API Reference

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/connections | GET | List all connections |
| /api/connections | POST | Create a connection |
| /api/connections/{id} | GET | Get connection details |
| /api/connections/{id} | PUT | Update connection |
| /api/connections/{id} | DELETE | Delete connection |
| /api/connections/{id}/test | POST | Test connection |
| /api/connections/{id}/push/run | POST | Start push run |
| /api/connections/{id}/push/run/{runId}/push | POST | Push next batch |
| /api/connections/{id}/push/run/{runId}/status | GET | Poll status |
| /api/connections/{id}/sync/run | POST | Start sync run |
| /api/connections/{id}/sync/run/{runId}/page | POST | Sync next page |
| /api/connections/{id}/sync/run/{runId}/status | GET | Poll sync status |

For complete schemas, see the API Reference.