Usage Metering Event Pipelines

A usage metering pipeline is the path a single billable action takes from the moment it happens in your product to the moment it becomes a number on an invoice — and it operates within the broader Tenant Billing & Usage Metering framework that turns raw activity into revenue. Get this pipeline wrong and you bill the wrong tenant, double-count under retries, or lose events during a deploy; get it right and metering becomes a boring, auditable stream that survives partial failures without losing a cent of recognized usage.

The premise is that usage is an event, not a database row you increment. Every API call, every gigabyte stored, every seat activated is emitted as an immutable fact carrying a tenant identifier, a metric name, a quantity, a timestamp, and an idempotency key. Those facts flow through a durable log, get aggregated into per-tenant windows, and land in a store partitioned by tenant so that rating and invoicing read only the rows that belong to one customer. Incrementing a counter in place cannot survive a crash, cannot be replayed, and cannot be audited — an event log can do all three.

Prerequisites

Before wiring usage events into a stream, confirm the surrounding infrastructure and contracts are in place. A metering pipeline is only as trustworthy as the weakest stage in it.

Step-by-Step Implementation

The build proceeds in five stages: define the event, emit it from the product, ingest it through a stream, aggregate it into windows, and write it to a tenant-partitioned store. Each step below is independently runnable.

Step 1 — Define an immutable usage event

The event is the contract between every producer and every consumer, so pin it down first. A usage event is a fact about the past: it has already happened and never changes. Carry the tenant identity inside the event itself — downstream stages must never have to re-derive which tenant an event belongs to.

import { randomUUID } from 'node:crypto';

export interface UsageEvent {
  idempotencyKey: string;   // UUIDv4 or hash(tenantId + metric + sourceId)
  tenantId: string;         // stamped from server-side context, never the client body
  metric: string;           // 'api.request' | 'storage.gb_hour' | 'seat.active'
  quantity: number;         // additive units; always >= 0
  eventTime: string;        // ISO-8601 of when the action occurred, not when emitted
  source: string;           // service that produced the event, for lineage
  metadata?: Record<string, string | number>;
}

export function newUsageEvent(p: Omit<UsageEvent, 'idempotencyKey'> & { idempotencyKey?: string }): UsageEvent {
  // Reject NaN and Infinity as well as negatives: `NaN < 0` is false and would slip through.
  if (!Number.isFinite(p.quantity) || p.quantity < 0) {
    throw new Error('Usage quantity must be a finite, non-negative number');
  }
  // The random fallback is stable only if this event object is what gets retried;
  // pass a derived key when a retry may rebuild the event from scratch.
  return { ...p, idempotencyKey: p.idempotencyKey ?? randomUUID() };
}

The eventTime field is the moment the action occurred, not the moment it was published. Aggregating on publish time corrupts late-arriving events into the wrong billing window; the disciplines around late data and event-time windows belong to tenant-partitioned time-series for metering.
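
To make the event-time rule concrete, the sketch below derives an hourly bucket label from eventTime rather than from the publish time. It is a minimal illustration: the function name hourly_window and the rule that timestamps must carry an explicit UTC offset are assumptions made for the example, not part of the pipeline described here.

```python
from datetime import datetime, timezone


def hourly_window(event_time_iso: str) -> str:
    """Return the UTC hourly bucket label (e.g. '2026-06-21T14') for an ISO-8601 time."""
    # datetime.fromisoformat rejects a trailing 'Z' before Python 3.11, so normalize it.
    ts = datetime.fromisoformat(event_time_iso.replace("Z", "+00:00"))
    if ts.tzinfo is None:
        raise ValueError("eventTime must carry an explicit UTC offset")
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H")


# An action at 14:59:58 that is published at 15:00:03 still belongs to the 14:00 window.
occurred = "2026-06-21T14:59:58Z"
published = "2026-06-21T15:00:03Z"
window_by_event_time = hourly_window(occurred)
window_by_publish_time = hourly_window(published)
```

Bucketing on the occurred timestamp keeps the event in the 14:00 window; bucketing on the published timestamp would move it into the 15:00 window, which is the corruption the paragraph above warns about.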

Step 2 — Emit events with the tenant key as the partition key

Publish each event to the stream with the tenantId as the partition key. This keeps all of one tenant's events on the same partition, which preserves per-tenant ordering and lets a single consumer aggregate a tenant without cross-partition coordination.

import { Kafka } from 'kafkajs';
import { UsageEvent } from './usage-event';

// KAFKA_BROKERS is assumed to be a comma-separated list of host:port pairs.
const kafka = new Kafka({ clientId: 'metering-producer', brokers: process.env.KAFKA_BROKERS!.split(',') });
const producer = kafka.producer({ idempotent: true, maxInFlightRequests: 1 });
let connecting: Promise<void> | undefined;

export async function emitUsage(event: UsageEvent): Promise<void> {
  connecting ??= producer.connect();       // connect lazily, once; send() fails on an unconnected producer
  await connecting;
  await producer.send({
    topic: 'usage.events',
    messages: [{
      key: event.tenantId,                 // partition by tenant -> ordered, co-located
      value: JSON.stringify(event),
      headers: { idempotencyKey: event.idempotencyKey, metric: event.metric },
    }],
    acks: -1,                              // wait for all in-sync replicas
  });
}

The producer is configured idempotent: true with acks: -1, which makes Kafka deduplicate retried sends within a producer session and guarantees the write reached every in-sync replica. This handles producer-side retries; consumer-side duplicates from rebalances are a separate problem solved by idempotent usage event ingestion.
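
The co-location property of a tenant partition key can be illustrated without a broker. The sketch below uses CRC32 as a stand-in hash; that choice is an assumption for the example only, since Kafka clients apply their own hash (murmur2 in the Java client's default partitioner). The function partition_for and the partition count of 12 are hypothetical.

```python
import zlib


def partition_for(tenant_id: str, num_partitions: int) -> int:
    """Map a tenant key to a partition with a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(tenant_id.encode("utf-8")) % num_partitions


# (tenant, sequence number) pairs in the order they were emitted.
events = [("tenant-a", 1), ("tenant-b", 5), ("tenant-a", 2), ("tenant-a", 3)]

partitions = {}
for tenant_id, seq in events:
    partitions.setdefault(partition_for(tenant_id, 12), []).append((tenant_id, seq))

# Every tenant-a event lands on one partition, in emission order.
tenant_a_partition = partition_for("tenant-a", 12)
tenant_a_events = [s for t, s in partitions[tenant_a_partition] if t == "tenant-a"]
```

Because the mapping depends only on the key and the partition count, changing the number of partitions remaps tenants. Ordering is preserved within a partition, not across a repartition.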

Step 3 — Ingest from the stream into an aggregator

A consumer reads from the topic, validates the event, and folds its quantity into an in-flight window keyed by tenant and metric. Commit offsets only after the aggregate is durably accounted for — committing before persistence loses events on a crash.

import { Kafka } from 'kafkajs';
import { UsageEvent } from './usage-event';
import { foldIntoWindow } from './aggregator';

// KAFKA_BROKERS is assumed to be a comma-separated list of host:port pairs.
const consumer = new Kafka({ clientId: 'metering-consumer', brokers: process.env.KAFKA_BROKERS!.split(',') })
  .consumer({ groupId: 'usage-aggregator', maxBytesPerPartition: 1_048_576 });

export async function runConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'usage.events', fromBeginning: false });
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ message, partition, topic }) => {
      if (message.value !== null) {                     // a null value carries nothing to bill
        const event = JSON.parse(message.value.toString()) as UsageEvent;
        const dedupKey = message.headers?.idempotencyKey?.toString() ?? event.idempotencyKey;
        await foldIntoWindow(event, dedupKey);          // dedups + accumulates
      }
      // Commit the next offset to read; BigInt keeps offsets above 2^53 exact.
      await consumer.commitOffsets([{ topic, partition, offset: (BigInt(message.offset) + 1n).toString() }]);
    },
  });
}

For an SQS-based pipeline the shape is the same: receive a batch, fold each message, then delete the message only after the fold succeeds. With Kinesis you checkpoint the last processed sequence number after the batch persists. In all three, the rule is identical: acknowledge after durability, never before.
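
The acknowledge-after-durability rule can be sketched independently of any transport. The example below uses a toy in-memory queue in which a message stays pending until it is acknowledged; InMemoryQueue, consume, and flaky_fold are hypothetical names for illustration and do not correspond to any SQS, Kinesis, or Kafka API.

```python
class InMemoryQueue:
    """Toy at-least-once queue: a message stays pending until it is acknowledged."""

    def __init__(self, messages):
        self.pending = list(messages)

    def receive(self):
        return self.pending[0] if self.pending else None

    def ack(self, message):
        self.pending.remove(message)


def consume(queue, fold, max_messages=100):
    """Fold first, acknowledge second. An exception in fold leaves the message pending."""
    handled = 0
    while handled < max_messages:
        message = queue.receive()
        if message is None:
            break
        fold(message)        # may raise; nothing has been acknowledged yet
        queue.ack(message)   # reached only after the fold has succeeded
        handled += 1
    return handled


totals = {}
failures = {"remaining": 1}


def flaky_fold(message):
    # Simulate one crash before the quantity for e2 is persisted.
    if message["id"] == "e2" and failures["remaining"] > 0:
        failures["remaining"] -= 1
        raise RuntimeError("simulated crash before persistence")
    totals[message["tenant"]] = totals.get(message["tenant"], 0) + message["qty"]


queue = InMemoryQueue([
    {"id": "e1", "tenant": "t1", "qty": 2},
    {"id": "e2", "tenant": "t1", "qty": 3},
])

try:
    consume(queue, flaky_fold)
except RuntimeError:
    pass                      # the worker "crashed"; e2 was never acknowledged
consume(queue, flaky_fold)    # a restarted worker receives e2 again
```

The crash here happens before the quantity is added, so redelivery alone restores the total. A crash after the fold but before the acknowledgement would redeliver an already-counted event, which is why the dedup gate in the next step is still required.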

Step 4 — Aggregate into per-tenant time windows

Raw events are too granular and too numerous to invoice directly. Roll them into fixed windows — hourly or daily per tenant per metric — and dedup by idempotency key so a replayed event never inflates a total. The aggregate is the unit that rating reads.

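import redis
from dataclasses import dataclass

r = redis.Redis(decode_responses=True)

# Claim the dedup key and add the quantity in one atomic script. As two separate
# commands, a crash after the claim would mark the event as seen without counting
# it, and every later replay would be discarded. (On Redis Cluster both keys must
# hash to the same slot.)
FOLD_SCRIPT = r.register_script("""
if not redis.call('SET', KEYS[1], 1, 'NX', 'EX', ARGV[1]) then return 0 end
redis.call('HINCRBYFLOAT', KEYS[2], 'quantity', ARGV[2])
redis.call('HINCRBY', KEYS[2], 'events', 1)
redis.call('EXPIRE', KEYS[2], ARGV[3])
return 1
""")

@dataclass
class Fold:
    tenant_id: str
    metric: str
    window: str          # '2026-06-21T14' for an hourly bucket
    quantity: float
    dedup_key: str

def fold_into_window(f: Fold) -> bool:
    # Returns False when the key was already counted -> at-least-once becomes exactly-once
    bucket = f"agg:{f.tenant_id}:{f.metric}:{f.window}"
    return FOLD_SCRIPT(
        keys=[f"seen:{f.dedup_key}", bucket],
        args=[86_400 * 35, f.quantity, 86_400 * 40],   # seen: TTL, quantity, agg: TTL
    ) == 1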

The SET ... NX is the exactly-once hinge: the first time a dedup_key is seen it claims the slot and the quantity is added; every replay after that returns early and changes nothing. The TTL on seen: must exceed your maximum replay horizon — a 35-day window covers a full monthly billing cycle plus reprocessing slack.
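
The behavior of the dedup gate under redelivery can be checked without Redis. The sketch below replaces the seen: and agg: keys with an in-memory set and dictionary; WindowAggregator is a hypothetical stand-in written for this example, not the production aggregator, and it ignores TTLs entirely.

```python
class WindowAggregator:
    """In-memory stand-in for the seen:/agg: keys, to show the dedup gate's behavior."""

    def __init__(self):
        self.seen = set()
        self.buckets = {}

    def fold(self, tenant_id, metric, window, quantity, dedup_key):
        if dedup_key in self.seen:          # replay: change nothing
            return False
        self.seen.add(dedup_key)
        bucket = self.buckets.setdefault(
            (tenant_id, metric, window), {"quantity": 0.0, "events": 0}
        )
        bucket["quantity"] += quantity
        bucket["events"] += 1
        return True


agg = WindowAggregator()
deliveries = [
    ("t1", "api.request", "2026-06-21T14", 1, "k-1"),
    ("t1", "api.request", "2026-06-21T14", 1, "k-2"),
    ("t1", "api.request", "2026-06-21T14", 1, "k-1"),   # redelivered after a rebalance
]
results = [agg.fold(*d) for d in deliveries]
bucket = agg.buckets[("t1", "api.request", "2026-06-21T14")]
```

Three deliveries produce two counted events: the third call returns False and leaves the bucket untouched, which is the property the SET NX gate provides in the Redis version.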

Step 5 — Flush aggregates to the tenant-partitioned store

Periodically drain closed windows from the fast in-memory layer into the durable, queryable store. Partition the sink by tenant so that every rating and invoicing query is bounded to a single customer's rows.

-- TimescaleDB hypertable, partitioned by time and space-partitioned by tenant
CREATE TABLE usage_rollup (
  tenant_id   text        NOT NULL,
  metric      text        NOT NULL,
  window_start timestamptz NOT NULL,
  quantity    double precision NOT NULL,
  event_count integer     NOT NULL,
  PRIMARY KEY (tenant_id, metric, window_start)
);
SELECT create_hypertable('usage_rollup', 'window_start',
  partitioning_column => 'tenant_id', number_partitions => 16);

-- Idempotent flush: a re-flushed window overwrites, never duplicates
INSERT INTO usage_rollup (tenant_id, metric, window_start, quantity, event_count)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tenant_id, metric, window_start)
DO UPDATE SET quantity = EXCLUDED.quantity, event_count = EXCLUDED.event_count;

The composite primary key (tenant_id, metric, window_start) makes the flush itself idempotent: re-running a window's flush overwrites the row rather than appending a duplicate. Once usage is in this store, the rated totals feed the billing sync with Stripe layer that turns them into invoice line items, while real-time consumption is compared against plan ceilings by the subscription and plan enforcement layer.
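
The overwrite-on-conflict behavior of the flush can be exercised locally. The sketch below uses SQLite from the Python standard library as a stand-in for the TimescaleDB sink, which is an assumption made only so the example runs without a server; the upsert shape mirrors the statement above, and it requires SQLite 3.24 or later. The flush function is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE usage_rollup (
        tenant_id    TEXT NOT NULL,
        metric       TEXT NOT NULL,
        window_start TEXT NOT NULL,
        quantity     REAL NOT NULL,
        event_count  INTEGER NOT NULL,
        PRIMARY KEY (tenant_id, metric, window_start)
    )
""")

UPSERT = """
    INSERT INTO usage_rollup (tenant_id, metric, window_start, quantity, event_count)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (tenant_id, metric, window_start)
    DO UPDATE SET quantity = excluded.quantity, event_count = excluded.event_count
"""


def flush(rows):
    """Write closed-window totals; re-flushing the same window overwrites its row."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


window = ("t1", "api.request", "2026-06-21T14:00:00+00:00", 42.0, 40)
flush([window])
flush([window])   # retry after a suspected failure
stored = conn.execute(
    "SELECT tenant_id, quantity, event_count FROM usage_rollup"
).fetchall()
```

Flushing the same window twice leaves a single row with the same totals. This works only because each flush writes the window's full total; a flush that wrote increments would need a different conflict clause.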

Choosing a Transport

The stream you pick shapes ordering guarantees, replay cost, and operational burden. The table below maps the decision to the metering properties that matter.

| Transport | Ordering | Replay model | Dedup support | Best fit |
| --- | --- | --- | --- | --- |
| Apache Kafka | Per-partition (per-tenant key) | Offset rewind, full retention | Idempotent producer + key | High-volume, self-managed, long retention |
| Amazon Kinesis | Per-shard | Shard iterator by timestamp | Application-level | AWS-native, managed scaling |
| Amazon SQS (FIFO) | Per message-group | None (consume-once) | Built-in 5-min dedup | Ordered, lower throughput, simplest ops |
| Amazon SQS (standard) | None | None | Application-level only | High throughput, order-insensitive metrics |

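SQS standard delivers at-least-once with no ordering and no native replay, so it suits additive metrics where order is irrelevant but demands an application-level dedup store. Kafka and Kinesis retain the log, which is what makes a full billing-period reprocess possible after a rating-logic fix, a capability SQS cannot offer. That only holds if retention is configured to cover the billing period plus reprocessing slack: Kafka's default topic retention is 7 days and Kinesis's is 24 hours.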

How a usage event flows to the store

The diagram traces one event from emission to a tenant-partitioned rollup. The decisive stages are the partition key, which co-locates a tenant's events, and the dedup gate, which converts at-least-once delivery into exactly-once accounting before any quantity is added.

Dynamic Query Scoping & Connection Handling

A metering pipeline reads and writes under two very different access patterns, and both must stay tenant-scoped. Ingestion is a high-volume write path: the aggregator folds millions of events into a small number of per-tenant buckets, so the contention point is the dedup store, not the sink. Rating and invoicing are a read path that must touch exactly one tenant's rollups, which is why the sink is partitioned by tenant_id: a query for tenant A can then skip chunks that hold none of its rows. Hash partitioning still places several tenants in the same chunk, so partitioning bounds the scan but does not replace engine-level isolation such as row-level security.

Connection handling diverges by stage. Consumers hold long-lived connections to the broker and short, pooled connections to the dedup store; size the consumer pool to the partition count, because one consumer per partition is the natural concurrency unit and over-provisioning beyond it only adds idle connections. The flush job batches writes to the rollup store and should use a transaction-pooled connection so that a burst of window closures does not exhaust the sink's connection limit. When the same store backs other tenant data, the broader query-scoping discipline lives in tenant-aware data routing and query scoping, which governs how every read is bound to a tenant before it reaches the engine.

| Stage | Access pattern | Connection model | Scoping mechanism |
| --- | --- | --- | --- |
| Producer | Acknowledged append (acks from all in-sync replicas) | Pooled broker client | Tenant as partition key |
| Consumer | Sequential read per partition | One long-lived client per consumer instance | Partition assignment per tenant set |
| Dedup store | High-frequency key check | Pipelined Redis pool | Key namespaced by dedup key |
| Rollup sink | Batched upsert | Transaction-pooled writer | Composite key on tenant_id |

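A subtle interaction: the dedup store and the rollup sink can diverge, for example when the SET NX succeeds but the aggregate is lost before a flush lands. In that case replay from the stream, not from the dedup store, because the stream is the only durable source of truth. Clear that window's seen: keys (or rebuild the window under a fresh key prefix) before replaying, otherwise the dedup gate discards the replayed events as duplicates. Treating Redis as the system of record instead of a cache is a classic way to lose usage on an eviction.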

Security Enforcement & Access Control

Metering data is billing data, and billing data is a tampering target. The first and most important control is that tenant_id is stamped server-side from authenticated context, never read from a client-supplied event body — a tenant that can set its own tenant_id can charge usage to a competitor or zero out its own bill. Validate the originating identity against the auth and cross-tenant access control layer before an event is ever published.
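
A minimal sketch of server-side stamping follows. It assumes the auth layer has already produced a verified context; AuthContext, build_event, and the MAX_QUANTITY ceiling are hypothetical names and values chosen for the example, and the event time is taken as the moment the server handles the request.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

MAX_QUANTITY = 1_000_000  # hypothetical per-event ceiling


@dataclass(frozen=True)
class AuthContext:
    tenant_id: str   # resolved by the auth layer, e.g. from a verified token
    service: str     # name of the service handling the request


def build_event(ctx: AuthContext, body: dict) -> dict:
    """Build a usage event from a request body, taking tenant identity only from ctx."""
    quantity = body.get("quantity")
    if isinstance(quantity, bool) or not isinstance(quantity, (int, float)):
        raise ValueError("quantity must be a number")
    if not 0 <= quantity <= MAX_QUANTITY:   # also rejects NaN and infinity
        raise ValueError("quantity out of bounds")
    return {
        "tenantId": ctx.tenant_id,          # any tenantId in the body is ignored
        "metric": str(body["metric"]),
        "quantity": quantity,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "source": ctx.service,
    }


ctx = AuthContext(tenant_id="tenant-a", service="api-gateway")
spoofed_body = {"tenantId": "tenant-b", "metric": "api.request", "quantity": 3}
event = build_event(ctx, spoofed_body)
```

The request body names tenant-b, but the resulting event is attributed to tenant-a because the function never reads the body's tenantId.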

The second control is integrity in transit and at rest. The stream should enforce TLS and per-topic ACLs so a compromised service cannot subscribe to another team's usage topic. The rollup store should enforce tenant isolation at the engine, not just the application, so a query bug cannot read across tenants.

| Layer | Mechanism | Enforces | Failure mode if absent |
| --- | --- | --- | --- |
| Emission | Server-stamped tenant_id | Correct attribution | Usage billed to wrong tenant |
| Transport | TLS + topic ACLs | Confidentiality, access | Cross-team usage exposure |
| Ingestion | Idempotency dedup | Exactly-once accounting | Double-billing on replay |
| Rollup store | Row-level / partition isolation | Tenant read scope | Cross-tenant usage leak |

Order matters: dedup must run before the quantity is added, and tenant attribution must be fixed before the event enters the stream. An event that enters the log with the wrong tenant is permanently mis-attributed, because the log is immutable — there is no clean correction short of an explicit reversing event.

Operational Overhead & Scaling Metrics

A metering pipeline fails quietly — a stalled consumer or a growing lag does not throw an error, it just silently stops billing. Track these signals and act on the thresholds before a billing cycle closes on stale data.

| Metric | Healthy threshold | Mitigation when exceeded |
| --- | --- | --- |
| Consumer lag | < 10k messages / partition | Add consumers up to partition count; profile fold latency |
| Dedup-store hit rate (replays) | < 1% of events | Investigate producer retries or rebalance storms |
| End-to-end event latency | < 60 s p99 emit-to-rollup | Shorten window flush interval; scale aggregators |
| Dead-letter queue depth | ~ 0 / hour | Inspect schema drift; a spike means poison events |
| Flush failure rate | < 0.1% | Backpressure flush; ensure replay from stream, not cache |
| Rollup vs. raw reconciliation drift | 0 (must match) | Replay the window from stream retention |

The most expensive failure is silent under-counting: a consumer that commits its offset and then crashes before persisting loses every event covered by that commit, with no error logged. Always commit offsets after durability, and run a periodic reconciliation that re-derives a window's total from the raw stream and compares it to the rollup. A non-zero drift is the only reliable signal that the pipeline is dropping usage.
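
The reconciliation check described above can be sketched as a pure function over raw events and stored rollups. The names rederive_totals and drift, and the dictionary shapes, are assumptions for illustration; quantities are integers here so the comparison is exact.

```python
def rederive_totals(raw_events):
    """Recompute per-window totals from raw events, counting each idempotency key once."""
    seen, totals = set(), {}
    for e in raw_events:
        if e["idempotencyKey"] in seen:
            continue
        seen.add(e["idempotencyKey"])
        key = (e["tenantId"], e["metric"], e["window"])
        totals[key] = totals.get(key, 0) + e["quantity"]
    return totals


def drift(raw_events, rollup):
    """Return {window_key: rederived - stored} for every window whose totals disagree."""
    expected = rederive_totals(raw_events)
    keys = set(expected) | set(rollup)
    return {
        k: expected.get(k, 0) - rollup.get(k, 0)
        for k in keys
        if expected.get(k, 0) != rollup.get(k, 0)
    }


W = ("t1", "api.request", "2026-06-21T14")
base = {"tenantId": "t1", "metric": "api.request", "window": "2026-06-21T14"}
raw = [
    {**base, "idempotencyKey": "k1", "quantity": 2},
    {**base, "idempotencyKey": "k2", "quantity": 3},
    {**base, "idempotencyKey": "k2", "quantity": 3},   # redelivery, counted once
]

healthy = drift(raw, {W: 5})   # rollup matches the stream
lossy = drift(raw, {W: 2})     # rollup is missing the k2 event
```

A positive drift means the rollup under-counted relative to the stream; a negative one means it over-counted. With floating-point quantities the equality test would need a tolerance or a fixed-point representation.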

Pitfalls & Anti-Patterns

Trusting client-supplied tenant or quantity. An event whose tenant_id or quantity comes from the request body is a direct path to billing fraud. Stamp tenant from authenticated server context and validate quantity bounds before publishing; treat the client as adversarial.

Committing offsets before persistence. Acknowledging an event — committing a Kafka offset, deleting an SQS message, checkpointing a Kinesis shard — before its quantity is durably accounted for loses usage silently on any crash. Always acknowledge after durability, never before.

Aggregating on publish time instead of event time. Bucketing a late-arriving event by when it was published, rather than when the action occurred, drops it into the wrong billing window and corrupts both windows. Carry an explicit eventTime and window on that.

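No idempotency key. At-least-once transports can redeliver on any rebalance or retry. Without a dedup key and a store to check it against, every redelivery becomes an extra charge. The key must be assigned once at the source and stay stable across retries; deriving it deterministically from the action (for example, a hash of tenant, metric, and source ID) guarantees that, whereas a random key regenerated on each retry defeats the dedup.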

Treating the cache as the system of record. Holding in-flight aggregates in Redis is correct, but the durable source of truth is the stream's retained log. If a flush fails, replay from the stream — never reconstruct billing from an evictable cache that may have silently lost keys.
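
For the idempotency-key pitfall above, one way to get a key that is stable across retries is to hash the fields that identify the billable action. The sketch below is an illustration only: the function name idempotency_key, the choice of SHA-256, and the length-prefixed encoding are assumptions, and source_id stands for whatever identifier the producing service already has for the action, such as a request ID.

```python
import hashlib


def idempotency_key(tenant_id: str, metric: str, source_id: str) -> str:
    """Derive a stable key from the fields that identify one billable action."""
    # Length-prefix each part so ("ab", "c") and ("a", "bc") cannot collide.
    parts = [tenant_id, metric, source_id]
    material = "".join(f"{len(p)}:{p}" for p in parts)
    return hashlib.sha256(material.encode("utf-8")).hexdigest()


first_attempt = idempotency_key("tenant-a", "api.request", "req-123")
retry = idempotency_key("tenant-a", "api.request", "req-123")
other_action = idempotency_key("tenant-a", "api.request", "req-124")
```

A retry of the same action produces the same key and is absorbed by the dedup store, while a distinct action produces a different key and is counted.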

Frequently Asked Questions

Why emit usage as events instead of incrementing a counter? An incremented counter cannot be replayed, cannot be audited, and cannot survive a crash mid-update without ambiguity. An immutable event log can be reprocessed after a rating-logic fix, reconciled against rollups to detect drift, and audited line by line, which is exactly what billing disputes and compliance reviews demand.

Which transport should I choose for multi-tenant metering? Pick Kafka or Kinesis when you need to replay a full billing period after a fix, since both retain the log; pick SQS FIFO for ordered, lower-throughput metrics with the simplest operations. Use the tenant ID as the partition or message-group key in all cases so one tenant's events stay ordered and co-located.

How do I prevent double-billing under at-least-once delivery? Attach a deterministic idempotency key to every event and check it against a dedup store before adding the quantity. A SET NX in Redis or a unique constraint in the sink turns at-least-once delivery into exactly-once accounting; the dedup TTL must exceed your maximum replay horizon.

Where does the usage data go after aggregation? Closed per-tenant windows are flushed to a tenant-partitioned store such as TimescaleDB or ClickHouse, where rating reads only one customer's rows. From there, totals feed Stripe for invoicing and feed plan-limit checks for real-time enforcement.