
Connect Your CDP to the Recommender API

Overview

Audience: Customer engineers and integration partners wiring a customer data platform (CDP), analytics stack, or homegrown event source into the Recommender API recommendation backend.

The Recommender API learns from the behavioral events your audience generates — page views, clicks, scroll depth — and turns them into personalized recommendations. To do that, the Recommender API needs a stream of those events in one canonical shape, the event envelope. This guide explains that contract and walks through connecting each kind of event source.

What the Recommender API needs from your CDP

The Recommender API’s recommendation models are trained on user–item interactions. Whatever your source emits, the translation you write needs to answer four questions for every event:

| Need | Meaning | Examples |
|---|---|---|
| Who | A stable identifier for the user (user_id) | logged-in ID, cookie ID, CDP profile ID |
| What | The content item interacted with (item_id) | your CMS’s internal record ID (e.g. WordPress post_id, Arc XP _id) |
| How | The kind of interaction (event_type) | page view, click, save |
| When | A timezone-aware timestamp (timestamp) | 2026-03-27T14:30:00+00:00 |

Event taxonomy

The Recommender API accepts a fixed set of event_type values; the Supported Event Types table in the API Developer Guide is the authoritative list. On the CDP side, your job is to map your source’s native event names onto those values and to drop events that don’t map (ad-stack telemetry, consent pings, etc.) rather than forcing them in.

Identity model

  • user_id must be stable per visitor over time — the same person should resolve to the same user_id across sessions. A per-page-load random value destroys the model’s ability to learn.
  • Anonymous IDs are fine (a cookie ID, a CDP-assigned profile ID) as long as they’re stable.
  • If a visitor logs in mid-session and your source switches from a cookie ID to an account ID, those look like two different users to the Recommender API. Decide an identity-resolution strategy on your side; see anonymous vs identified users.

item_id conventions

item_id is echoed back verbatim in recommendation responses, so it must be something your front end can resolve to a renderable item. Use your CMS’s internal record ID — the immutable primary key it assigns each content piece (e.g. WordPress post_id, Arc XP _id). Don’t use the canonical URL or its slug: URLs are content, not keys, and editorial rewrites (SEO changes, recategorization) orphan every interaction recorded against the old value. Whatever you choose, apply it consistently — the same article must always produce the same item_id. See the Item ID Guidance for the full rationale and stability rules.
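A cheap safeguard in your translation layer is to refuse values that look like URLs or URL paths before they reach the API. The check below is a heuristic sketch and is not part of the Recommender API. The hypothetical looks_like_url() helper only catches the obvious cases: an http(s) scheme or a path separator. A bare slug without slashes will still slip through, so adapt the check to the shape of your CMS’s IDs.

```python
from urllib.parse import urlsplit


def looks_like_url(item_id: str) -> bool:
    """Heuristic: flag values that look like a URL or path rather than a CMS record ID."""
    if urlsplit(item_id).scheme in ("http", "https"):
        return True
    # Paths usually contain '/'; CMS primary keys usually don't.
    return "/" in item_id


def checked_item_id(raw: str) -> str:
    """Return raw unchanged (item_id is echoed verbatim), or raise so the mapping gets fixed."""
    if not raw or looks_like_url(raw):
        raise ValueError(f"item_id {raw!r} looks like a URL/path; map the CMS record ID instead")
    return raw
```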

Batching guidance

Batch for throughput, but don’t sit on events: behavioral recency matters for recommendations. A few seconds to a few minutes of buffering is reasonable for a forwarder; multi-hour batch windows materially degrade recommendation freshness (see the analytics-only section for the worst case). Each batch request is capped at 100 events — see the Batch API for the request format and limits.
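A forwarder can honor both constraints with a buffer that flushes when it reaches 100 events or when its oldest event has waited too long. This is a minimal sketch. The EventBuffer name, the 30-second default, and the send callback are illustrative assumptions. The clock is injectable so the behavior is easy to test.

```python
import time
from collections.abc import Callable

MAX_BATCH = 100  # per-request cap on /collector/v1/events/batch


class EventBuffer:
    """Buffer envelopes; flush at MAX_BATCH events or once the oldest is max_age_s old."""

    def __init__(
        self,
        send: Callable[[list[dict]], None],  # e.g. a function that POSTs one batch
        max_age_s: float = 30.0,  # illustrative; keep it in the seconds-to-minutes range
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._send = send
        self._max_age_s = max_age_s
        self._clock = clock
        self._events: list[dict] = []
        self._oldest: float | None = None

    def add(self, envelope: dict) -> None:
        if not self._events:
            self._oldest = self._clock()  # start the age timer on the first buffered event
        self._events.append(envelope)
        if len(self._events) >= MAX_BATCH:
            self.flush()

    def tick(self) -> None:
        """Call periodically (e.g. from a timer) so quiet periods still flush."""
        if self._events and self._clock() - self._oldest >= self._max_age_s:
            self.flush()

    def flush(self) -> None:
        if self._events:
            # Hand the batch off and reset first; retries/dead-lettering belong in send.
            batch, self._events, self._oldest = self._events, [], None
            self._send(batch)
```

The size trigger keeps each request within the 100-event cap. The age trigger bounds how stale an event can get during low traffic.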

Timestamp conventions

timestamp must be timezone-aware ISO 8601 (2026-03-27T14:30:00Z); a naive timestamp with no offset is rejected. The forwarder-specific rule worth repeating: send the time the interaction actually occurred, not the time your forwarder processed it — stale timestamps skew the recency signal that drives recommendations.
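If your source gives you epoch values or offset-less strings, normalize them before building the envelope rather than letting the API reject them. The sketch below uses two hypothetical helpers. The first converts epoch milliseconds to UTC ISO 8601; epoch milliseconds are a common CDP format, but check what yours emits. The second refuses naive strings instead of guessing their zone.

```python
from datetime import datetime, timezone


def from_epoch_ms(epoch_ms: int) -> str:
    """Epoch milliseconds -> timezone-aware ISO 8601 in UTC."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).isoformat()


def require_aware(ts: str) -> str:
    """Return ts unchanged if it carries a UTC offset; raise rather than guess a zone."""
    # fromisoformat accepts a trailing "Z" only from Python 3.11, so normalize it first.
    parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        raise ValueError(f"naive timestamp {ts!r}: the source's time zone must be known")
    return ts
```

For example, from_epoch_ms(1774621800000) returns "2026-03-27T14:30:00+00:00". Apply these helpers to the source’s interaction time, not to the time your forwarder runs.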

The event envelope

Every event you send is one event-envelope JSON object. The full field reference — types, required vs. optional, and the 422 validation rules — lives in the API Developer Guide (single events) and the Batch API (batched events). This section covers only what you need to produce the shape from a CDP or analytics source.

A minimal valid envelope is the four required fields — user_id, item_id, event_type, and a timezone-aware timestamp:

{
  "user_id": "abc-123",
  "item_id": "ZSGXFR2KNFCMPN3VHPWQR3BGCE",
  "event_type": "page_view",
  "timestamp": "2026-03-27T14:30:00+00:00"
}

event_value (a 0.0–1.0 ratio, e.g. scroll depth) and session_id are the optional fields you’re most likely to add:

{
  "user_id": "abc-123",
  "item_id": "ZSGXFR2KNFCMPN3VHPWQR3BGCE",
  "event_type": "deepest_scroll",
  "timestamp": "2026-03-27T14:32:00+00:00",
  "event_value": 0.75,
  "session_id": "sess-a1b2c3"
}

Don’t invent fields outside the documented schema, and leave schema_version unset (any value other than 1 is rejected).
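Catching envelope mistakes before they leave your forwarder saves a round trip of 422s. The sketch below checks only the rules stated on this page: the four required fields, a timezone-aware timestamp, event_value between 0.0 and 1.0, and schema_version either unset or 1. It is not the API’s validator. It does not know the full event_type list or field types, for which the API Developer Guide remains authoritative.

```python
from datetime import datetime

REQUIRED = ("user_id", "item_id", "event_type", "timestamp")


def envelope_problems(env: dict) -> list[str]:
    """Return problems found by the checks stated in this guide; [] means none were found."""
    problems = [f"missing or empty {field}" for field in REQUIRED if not env.get(field)]
    ts = env.get("timestamp")
    if ts is not None and not isinstance(ts, str):
        problems.append("timestamp must be an ISO 8601 string")
    elif isinstance(ts, str) and ts:
        try:
            # fromisoformat accepts a trailing "Z" only from Python 3.11, so normalize it first.
            if datetime.fromisoformat(ts.replace("Z", "+00:00")).tzinfo is None:
                problems.append("timestamp has no UTC offset")
        except ValueError:
            problems.append("timestamp is not ISO 8601")
    value = env.get("event_value")
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if value is not None and not (is_number and 0.0 <= value <= 1.0):
        problems.append("event_value must be a ratio between 0.0 and 1.0")
    if env.get("schema_version", 1) != 1:
        problems.append("leave schema_version unset")
    return problems
```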

event_id and de-duplication

If you omit event_id, the Collector fills it deterministically from a hash of the event’s natural identity (user_id, item_id, event_type, timestamp, session_id, and any attribution ID). Under at-least-once delivery — a forwarder retry, a redelivered S3 object — two copies of the same logical event hash to the same event_id, so downstream de-dup treats them as one. We recommend omitting event_id and letting the Recommender API derive it, unless your source emits its own stable per-event ID you specifically want preserved end to end (then set event_id to that value).
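To see why retries collapse into one event, the sketch below illustrates the idea only. It hashes the natural-identity fields with SHA-256, which is an assumption made for demonstration. This is not the Collector’s actual algorithm, which also folds in any attribution ID. Don’t compute event_id this way expecting it to match the server’s value; simply omit the field.

```python
import hashlib
import json

NATURAL_IDENTITY = ("user_id", "item_id", "event_type", "timestamp", "session_id")


def illustrative_event_id(env: dict) -> str:
    """Deterministic ID from the natural-identity fields (illustration, not the Collector's hash)."""
    key = [env.get(field) for field in NATURAL_IDENTITY]
    # json.dumps gives a stable, unambiguous encoding of the values (a missing field becomes null).
    return hashlib.sha256(json.dumps(key).encode("utf-8")).hexdigest()
```

Two copies of the same envelope hash identically. Re-stamping a retry with a fresh "now" changes the hash, so the event is counted twice.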

Where to send it

| Endpoint | Use |
|---|---|
| POST /collector/v1/events | A single event-envelope object. |
| POST /collector/v1/events/batch | A JSON array of up to 100 envelopes. |

Authenticate every request with the X-Api-Key header (treat the key as a secret; never commit it). Your tenant is resolved automatically from your unique base URL (https://<your-arc-organization>-config-prod.api.arc-cdn.net) — your Arc contact provides the exact host and key during onboarding. See the API Developer Guide for the full authentication reference.

Connecting BlueConic

BlueConic emits events via a customer-templated HTTP webhook destination (Mustache). You configure the webhook to render each outbound event directly as an event envelope, so no separate forwarder is needed.

1. Use the Recommender API BlueConic Mustache template

Download the BlueConic template bundle — a ready-to-paste blueconic.mustache with field-mapping notes in its README.md.

2. Configure the BlueConic webhook destination

In BlueConic (Connections → add a Webhook/HTTP connection):

  1. Destination URL: https://<your-arc-organization>-config-prod.api.arc-cdn.net/collector/v1/events.
  2. Method: POST, Content-Type: application/json.
  3. Headers: add X-Api-Key: <your-api-key>.
  4. Payload: paste the Mustache template; map your BlueConic profile/event properties onto the envelope fields (the user identifier → user_id, the content’s CMS record ID (not its URL) → item_id, the BlueConic event type → an event_type).
  5. Timestamp: ensure the rendered timestamp is timezone-aware ISO 8601.
  6. Set the connection’s trigger/goal so it fires on the behavioral events you want the Recommender API to learn from.

BlueConic posts one event per webhook fire to the single-event endpoint; you don’t manage batching yourself. Then verify events are arriving.

Connecting Amplitude / Adobe RT-CDP / Tealium / other HTTP CDP

If your CDP can call an outbound HTTP destination but can’t render the event-envelope shape itself (unlike BlueConic’s Mustache), run a small forwarder: it receives or polls your CDP’s events, translates each to the event envelope, and POSTs batches to the Recommender API. The snippet below is a complete, self-contained forwarder core — fill in translate() with your vendor’s field mapping.

This mirrors the batching/retry conventions of the reference S3 forwarder template so the two stay consistent.

"""Minimal Recommender API HTTP forwarder. Translate your CDP records, then POST in batches of <=100."""
import json
import time
import urllib.error
import urllib.request

COLLECTOR_URL = "https://<your-arc-organization>-config-prod.api.arc-cdn.net"
BATCH_PATH = "/collector/v1/events/batch"
HEADERS = {
    "X-Api-Key": "<your-api-key>",  # load from a secret store; never hard-code
    "Content-Type": "application/json",
}
MAX_BATCH = 100
MAX_ATTEMPTS = 3


def translate(record: dict) -> dict | None:
    """Map ONE of your CDP's records to the event envelope. Return None to drop it.

    Replace the right-hand sides with your vendor's field paths. Drop events
    that don't map to the Recommender API taxonomy (ad telemetry, consent pings, etc.).
    """
    event_type = {"pageview": "page_view", "click": "click"}.get(record.get("type"))
    if event_type is None:
        return None
    return {
        "user_id": record["user_id"],  # stable per-visitor identifier
        "item_id": record["content_id"],  # your CMS's internal record ID (stable primary key)
        "event_type": event_type,
        "timestamp": record["time"],  # tz-aware ISO 8601, e.g. 2026-03-27T14:30:00Z
        # Optional: "session_id", "event_value" (0.0-1.0). Omit "event_id" to let the Recommender API derive it.
    }


def _post_batch(batch: list[dict]) -> None:
    body = json.dumps(batch).encode("utf-8")
    req = urllib.request.Request(COLLECTOR_URL + BATCH_PATH, data=body, headers=HEADERS, method="POST")
    for attempt in range(MAX_ATTEMPTS):
        try:
            with urllib.request.urlopen(req, timeout=10) as resp:  # noqa: S310
                resp.read()
                return
        except urllib.error.HTTPError as exc:
            if not (exc.code == 429 or 500 <= exc.code < 600):
                raise  # 4xx (e.g. 422 bad envelope) is permanent — fix the mapping
        except OSError:
            pass  # transient network/timeout error — retry
        time.sleep(2**attempt)  # backoff: 1s, 2s, 4s
    raise RuntimeError("batch failed after retries")


def forward(records: list[dict]) -> None:
    envelopes = [e for r in records if (e := translate(r)) is not None]
    for i in range(0, len(envelopes), MAX_BATCH):
        _post_batch(envelopes[i : i + MAX_BATCH])

Key points: batch to at most 100, retry only on 5xx / 429 / network errors (a 422 means a bad envelope — fix translate(), don’t retry), and set the X-Api-Key auth header on every request (load it from a secret store).

Per-vendor notes

  • Amplitude — Use the Event Streaming / forwarding destination to your forwarder, or pull via the Export API. Amplitude’s user_id / device_id split maps to user_id: prefer user_id when present, fall back to device_id. Map event_type names onto the Recommender API taxonomy in translate().
  • Adobe Real-Time CDP — Use a streaming destination or the Edge Network to reach your forwarder. Resolve an XDM identity into a stable user_id; map XDM web.webPageDetails/media.* events onto the taxonomy.
  • Tealium — Use an EventStream connector (or a webhook destination from Tealium iQ) to your forwarder. The data-layer variable you use for identity becomes user_id; the data-layer variable carrying the CMS record ID (not the page URL) becomes item_id.
  • Any other HTTP CDP — Same shape: get the events to your forwarder, write translate(), POST batches.

Connecting an S3-transport CDP (Permutive, ActionIQ S3)

Some CDPs don’t offer an HTTP webhook destination; instead, they stream activations to an S3 bucket as gzipped NDJSON (Permutive today, and potentially ActionIQ S3 activations). The Recommender API does not ingest from object storage. Instead, run a small forwarder Lambda in your own AWS account that reads your bucket and POSTs batches to the Recommender API.

The Recommender API ships a runnable, forkable reference template for exactly this — the S3 forwarder Lambda (s3-forwarder). Download the S3 forwarder template bundle to fork into your own account.

It carries all the S3-side plumbing — s3:ObjectCreated trigger, gunzip + NDJSON parse, batching to 100, Secrets-Manager-backed auth, exponential-backoff retry, and an SQS dead-letter queue. You fork it into your account and write a translate() adapter for your CDP’s record shape (the template includes Permutive-specific field mappings as a worked example). Its bundled README covers deploy steps.
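For orientation before you fork the template, the core of the S3-side parsing looks roughly like the sketch below. It is a standalone sketch using only the standard library, not code taken from s3-forwarder. It takes the raw bytes of one gzipped NDJSON object and yields one record per non-blank line, ready for translate(). In a Lambda you would typically fetch those bytes with boto3, e.g. s3.get_object(Bucket=..., Key=...)["Body"].read().

```python
import gzip
import io
import json
from collections.abc import Iterator


def iter_ndjson_gz(raw: bytes) -> Iterator[dict]:
    """Yield one JSON record per non-blank line of a gzipped NDJSON object."""
    with gzip.GzipFile(fileobj=io.BytesIO(raw)) as gz:
        # Decompress and decode incrementally instead of inflating the whole object at once.
        for line in io.TextIOWrapper(gz, encoding="utf-8"):
            if line.strip():
                yield json.loads(line)
```

From there the flow matches the HTTP forwarder. Build envelopes = [e for r in iter_ndjson_gz(raw) if (e := translate(r)) is not None], then POST them in slices of 100.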

Connecting an analytics-only stack (GA4, Sophi, other)

Some sites collect behavioral events in an analytics tool, not a CDP — most commonly Google Analytics 4 (GA4) or Sophi. These tools don’t sit between your site and a destination the way a CDP does, so there’s no webhook to point at the Recommender API. The integration path is a warehouse export: read the events your analytics tool lands in its data warehouse and forward them to the Recommender API from your own cloud.

GA4 from BigQuery (warehouse export)

Read GA4’s BigQuery export and forward to the Recommender API from your own GCP project. The pattern is a scheduled Cloud Function that queries the daily events_YYYYMMDD table. It’s small enough to show inline rather than as a separate template directory.

"""GA4 BigQuery -> Recommender API forwarder. Run as a scheduled Cloud Function in YOUR GCP project.
Reads the prior day's events_YYYYMMDD export table, maps GA4 rows to the event envelope,
and POSTs to /events/batch (<=100 per request). The daily table is next-day batch; for
fresher data enable streaming export and read events_intraday_YYYYMMDD instead.
"""
import datetime
import json
import time
import urllib.error
import urllib.request

from google.cloud import bigquery  # type: ignore[import-untyped]

COLLECTOR_URL = "https://<your-arc-organization>-config-prod.api.arc-cdn.net"
HEADERS = {
    "X-Api-Key": "<your-api-key>",  # load from Secret Manager; never hard-code
    "Content-Type": "application/json",
}
GA4_DATASET = "analytics_000000000"  # your GA4 BigQuery export dataset
MAX_BATCH = 100

# Map GA4 event_name -> Recommender API event_type. Drop everything not listed.
# Map your org-defined GA4 events (saves, scroll depth, engaged reads) onto
# article_save / deepest_scroll / engaged_read as appropriate.
_EVENT_TYPES = {
    "page_view": "page_view",
    "select_content": "click",
    "search": "search",
    "share": "share",
}


def _string_param(row: dict, key: str) -> str | None:
    """Pull a string event-param value out of GA4's repeated event_params array."""
    for p in row.get("event_params") or []:
        if p["key"] == key:
            return p["value"].get("string_value")
    return None


def _to_envelope(row: dict) -> dict | None:
    event_type = _EVENT_TYPES.get(row["event_name"])
    item_id = _string_param(row, "content_id")  # stable CMS record-ID param; do not fall back to the URL
    if event_type is None or not item_id or not row.get("user_pseudo_id"):
        return None
    # GA4 event_timestamp is epoch microseconds, UTC.
    ts = datetime.datetime.fromtimestamp(row["event_timestamp"] / 1_000_000, tz=datetime.timezone.utc)
    return {
        "user_id": row.get("user_id") or row["user_pseudo_id"],  # logged-in id, else pseudo id
        "item_id": item_id,
        "event_type": event_type,
        "timestamp": ts.isoformat(),  # tz-aware ISO 8601
    }


def _post_batch(batch: list[dict]) -> None:
    body = json.dumps(batch).encode("utf-8")
    req = urllib.request.Request(
        COLLECTOR_URL + "/collector/v1/events/batch", data=body, headers=HEADERS, method="POST"
    )
    for attempt in range(3):
        try:
            with urllib.request.urlopen(req, timeout=10) as resp:  # noqa: S310
                resp.read()
                return
        except urllib.error.HTTPError as exc:
            if not (exc.code == 429 or 500 <= exc.code < 600):
                raise  # permanent (e.g. 422 bad envelope)
        except OSError:
            pass  # transient network/timeout error — retry
        time.sleep(2**attempt)
    raise RuntimeError("batch failed after retries")


def run(yyyymmdd: str) -> None:
    client = bigquery.Client()
    table = f"{GA4_DATASET}.events_{yyyymmdd}"
    rows = client.query(f"SELECT * FROM `{table}`").result()  # noqa: S608 - dataset is your own config
    batch: list[dict] = []
    for row in rows:
        envelope = _to_envelope(dict(row))
        if envelope is None:
            continue
        batch.append(envelope)
        if len(batch) == MAX_BATCH:
            _post_batch(batch)
            batch = []
    if batch:
        _post_batch(batch)

Deploy as a Cloud Function triggered by Cloud Scheduler (e.g. daily after the export lands). You operate this in your own GCP project — the Recommender API doesn’t run it for you.

Sophi integration note

Sophi collects events through its own SDK and hosted pipeline. Unverified (no live vendor access at the time of writing): we have not confirmed whether Sophi exposes an outbound real-time webhook or a documented streaming or partner export API. If your Sophi contract includes a webhook or export, you can forward from it using the same HTTP forwarder snippet (or the S3 forwarder if the export lands in S3). Confirm Sophi’s actual export capabilities with your Sophi account team before building a server-side forwarder.

Connecting a homegrown event source

If you emit events from your own application code or a homegrown pipeline, you’re already in control of the shape — just produce the event envelope and POST it. Use the inline HTTP forwarder snippet as your starting point: it shows the batching, retry, and header conventions the Recommender API expects. The only part specific to you is translate() — and if your code can emit the envelope directly, you can skip translation and POST envelopes straight to /events/batch.

Verifying events are arriving

After wiring up any path above:

  1. Send a known test event. POST a single, hand-built envelope to /collector/v1/events with a recognizable user_id (e.g. qa-smoke-001) and a current timestamp.

    curl -i -X POST \
    "https://<your-arc-organization>-config-prod.api.arc-cdn.net/collector/v1/events" \
    -H "X-Api-Key: <your-api-key>" \
    -H "Content-Type: application/json" \
    -d '{"user_id":"qa-smoke-001","item_id":"test-item-1","event_type":"page_view","timestamp":"2026-03-27T14:30:00+00:00"}'
  2. Confirm the response. A 2xx (typically 202/200) means the envelope was accepted. A 422 is a validation failure — the response body names the offending field; fix it (most often a naive timestamp or an unknown event_type).

  3. Confirm auth. A 401/403 means a wrong X-Api-Key or the wrong host. Re-check your key and base URL.

  4. Confirm volume. Once live, your forwarder/webhook logs (or the S3 forwarder’s CloudWatch processed_object lines, or the GA4 function logs) should show steady accepted batches with few or no 422s.

  5. What success looks like: a steady stream of 2xx responses, validation-failure rate near zero, and recommendation quality improving as interaction history accumulates. (New tenants go through a cold-start period before history is rich enough — ask your Arc contact about expected ramp.)
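If you script the smoke test or alert on forwarder responses, the status handling described in the steps above and in the forwarders’ retry rules reduces to a small lookup. This is a sketch; the action labels are illustrative.

```python
def classify_status(status: int) -> str:
    """Map a Collector HTTP status to the action this guide describes for it."""
    if 200 <= status < 300:
        return "accepted"
    if status == 422:
        return "fix-envelope"  # the response body names the offending field
    if status in (401, 403):
        return "check-key-and-host"
    if status == 429 or 500 <= status < 600:
        return "retry-with-backoff"
    return "investigate"  # any other 4xx is permanent: do not retry
```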

Debugging quick reference:

| Symptom | Likely cause | Fix |
|---|---|---|
| 422 on every event | Bad envelope (naive timestamp, unknown event type) | Read the response body; fix translate(). |
| 401 / 403 | Wrong X-Api-Key, or wrong host | Re-verify your key and the base URL with your Arc contact. |
| 2xx but no recommendation lift | user_id not stable, or events all dropped | Confirm user_id stays the same across sessions and that translate() isn’t returning None for everything. |
| Recommendations feel stale | Batch window too long / warehouse export | Shorten buffering; enable streaming export instead of next-day batch. |

Common pitfalls

  • Anonymous vs identified users. When a visitor logs in, many sources switch from a cookie/device ID to an account ID — the Recommender API then sees two distinct users and the pre-login history doesn’t carry over. Decide an identity-stitching strategy on your side (e.g. keep emitting the cookie ID as user_id, or maintain your own mapping) and apply it consistently. A per-page-load random ID is the worst case — it makes every event look like a brand-new user.
  • Event de-dup behavior. The Recommender API de-dups on event_id. Omit it and the Recommender API derives a deterministic hash from the event’s natural identity, so retries/redeliveries of the same event collapse to one. But if you mutate any of (user_id, item_id, event_type, timestamp, session_id) between retries, the Recommender API sees a different event. Send the original interaction time, not a fresh “now,” on retries.
  • Timestamp timezones. Always send timezone-aware ISO 8601. Naive timestamps are rejected. Prefer UTC; a wrong offset shifts the event in time and skews recency-based recommendations.
  • Batch size limits. /events/batch accepts at most 100 events per request. Slice larger sets. A batch of 101 is rejected — not silently truncated.
  • event_value is a ratio. It’s 0.0–1.0 (e.g. scroll depth), not a raw count or duration.
  • Don’t force-fit events. Drop events that don’t map to the Recommender API taxonomy rather than inventing an event_type — and never invent envelope fields outside the contract.
  • schema_version. Leave it unset. Sending a value other than 1 is rejected by design (so old shapes can’t silently degrade analytics).

See also