← Back to all posts
7 min readCentrali Team

Atomic Upsert: Create-or-Update Records Without Race Conditions

The new upsertRecord operation finds a record by business key and updates it, or creates one if it doesn't exist. Uses PostgreSQL advisory locking for safe concurrent writes.

FeatureComputeTutorial

If you've ever built a system that processes events, syncs external data, or runs background jobs, you've probably hit this problem: two requests arrive at the same time for the same record. One creates it, the other creates a duplicate. Or worse, one silently overwrites the other.

The classic fix is a check-then-act pattern — look up the record, decide whether to create or update. But without locking, two concurrent callers can both see "not found" and both insert. That's a race condition, and it's surprisingly common in real systems.

Today we're shipping upsertRecord — an atomic create-or-update operation that handles this correctly, across the SDK, HTTP API, and compute functions.

How It Works

You provide two things:

  • match — key-value pairs that identify the record (your business key)
  • data — the fields to set

Centrali looks for an active record where the JSONB data fields match your criteria. If found, it updates. If not, it creates. The entire operation runs inside a PostgreSQL transaction with an advisory lock, so concurrent calls with the same match criteria are serialized — no duplicates, no races.

typescript
const result = await client.upsertRecord('HourlyRollup', { match: { metricKey: 'pageviews', bucketHour: '2026-02-12T14:00' }, data: { metricKey: 'pageviews', bucketHour: '2026-02-12T14:00', count: 42 }, }); // First call: result.operation === 'created', status 201 // Second call: result.operation === 'updated', status 200 // result.data is the full record in both cases

The response tells you which path was taken, so your code can react accordingly.

Use Case: Hourly Metric Rollups

Let's say you're building an analytics dashboard. Events stream in throughout the day, and you want to maintain hourly rollup records — one per metric per hour.

Without upsert, you'd need to:

  1. Query for the existing rollup record
  2. If found, update it
  3. If not found, create it
  4. Handle the race condition where two events arrive simultaneously for the same hour

With upsert, the entire flow collapses to a single call.

In a Compute Function

Compute functions run with at-least-once delivery — the same event might be processed more than once. Upsert makes this safe by design.

typescript
async function run() { const event = executionParams; // Roll up into hourly buckets const bucketHour = event.timestamp.slice(0, 13) + ':00'; // Fetch the current rollup (if any) to increment const existing = await api.queryRecords('HourlyRollup', { where: { metricKey: { $eq: event.metricKey }, bucketHour: { $eq: bucketHour }, }, limit: 1, }); const previousCount = existing.data?.[0]?.data?.count ?? 0; const result = await api.upsertRecord('HourlyRollup', { match: { metricKey: event.metricKey, bucketHour }, data: { metricKey: event.metricKey, bucketHour, count: previousCount + 1, lastEventId: event.id, updatedAt: new Date().toISOString(), }, }); api.log(`Rollup ${result.operation}: ${event.metricKey} @ ${bucketHour} = ${previousCount + 1}`); return { success: true, data: { operation: result.operation } }; }

Because the match fields (metricKey + bucketHour) uniquely identify the bucket, concurrent executions for the same hour are serialized by the advisory lock. One creates the record, subsequent ones update it.

Via the SDK

For server-side code that syncs data from external systems, the SDK method works the same way:

typescript
import { CentraliSDK } from '@centrali/sdk'; const client = new CentraliSDK({ apiKey: process.env.CENTRALI_API_KEY, workspaceId: 'my-workspace', }); // Sync product catalog from external API async function syncProduct(externalProduct) { const result = await client.upsertRecord('Products', { match: { externalId: externalProduct.id }, data: { externalId: externalProduct.id, name: externalProduct.title, price: externalProduct.price, category: externalProduct.category, lastSyncedAt: new Date().toISOString(), }, }); console.log( result.operation === 'created' ? `New product: ${externalProduct.title}` : `Updated product: ${externalProduct.title}` ); }

Run this on a schedule or in response to a webhook — the match on externalId ensures you never get duplicate products regardless of how many times the sync runs.

Match Field Behavior

A few details worth knowing:

Match values must be primitives — strings, numbers, booleans, or null. Objects and arrays are rejected with a 400 error.

Match fields are merged into data on create — if your data doesn't include the match fields, they're added automatically. And match values always win over data values, so the created record is guaranteed to satisfy the match predicate.

Match fields are immutable on update — when updating, match fields are stripped from the data payload. This prevents accidental changes to the business key that would break future upserts for the same criteria.

Type coercion in matching — JSONB stores values as strings, so { count: 42 } and { count: "42" } match the same record. The advisory lock key is canonicalized the same way.

Null matching — matching on null finds records where the key exists with a JSON null value. It won't match records where the key is missing entirely.

Authorization

Upsert requires both create and update permissions on the structure, since the caller doesn't know in advance which path will be taken. If either permission is missing, the request is rejected before any database work happens.

API Reference

HTTP:

POST /records/slug/{structureSlug}/upsert

Request body:

json
{ "match": { "externalId": "ext-123" }, "data": { "externalId": "ext-123", "name": "Widget", "price": 9.99 } }

Response (201 Created):

json
{ "data": { "id": "...", "data": { "externalId": "ext-123", "name": "Widget", "price": 9.99 }, "version": 1 }, "operation": "created" }

Response (200 OK):

json
{ "data": { "id": "...", "data": { "externalId": "ext-123", "name": "Widget", "price": 12.99 }, "version": 2 }, "operation": "updated" }

SDK: client.upsertRecord(structureSlug, { match, data })

Compute: api.upsertRecord(structureSlug, { match, data })

Supports idempotency keys via the Idempotency-Key header for safe HTTP retries.

When to Use Upsert

Upsert is the right tool when:

  • Syncing external data — webhook handlers, scheduled imports, API polling
  • Aggregating events — counters, rollups, running totals
  • Idempotent writes — compute functions with at-least-once delivery
  • Deduplication — ensuring exactly one record per business key

For simple creates where you know the record doesn't exist, createRecord is still the right choice. For targeted updates where you have the record ID, updateRecord is more direct.

Check the SDK reference and compute functions guide for more details.

Building something with Centrali and want to share feedback about this feature?

Email feedback@centrali.io