
This RFC was originally created on Confluence (https://jurnal.atlassian.net/wiki/spaces/QON/pages/51115786492/DRAFT+RFC+Customer+Segmentation+based+on+Basic+Attributes+Backend) and follows the Qontak RFC Template format.

RFC: Customer Segmentation based on Basic Attributes (Backend)

Status: RFC
Owner: CDP - Contact Service
Submitted Date: 2026-05-19
Approver: TBD (tech lead + infosec approver)
Related Documents: PRD: Customer Segmentation based on Basic Attributes · Epic TF-2544

1. Overview

This RFC defines the backend technical design for Rule-Based Customer Segmentation inside the Customer Data Platform (CDP).

The feature allows CDP users to define dynamic customer groups (segments) using declarative filter rules on customer properties and basic events. Segment membership is re-evaluated daily so that customers automatically enter or leave a segment when their data changes.

Segment results are exposed as reusable assets that can be consumed by Broadcast, Chatbot automation, and downstream personalization services.

Success Criteria

  1. A user can create, edit, archive, and duplicate a named segment with up to 2 criteria groups, each containing up to 3 nested filter conditions.
  2. Each filter condition supports the full operator matrix defined in the PRD (text, date, number, dropdown, multi-select, boolean, file presence).
  3. A Preview Customers endpoint evaluates the current rule set against the datamart and returns total matched count + a sample list without persisting state.
  4. A daily cron job (08:00 WIB / 01:00 UTC) re-evaluates every active segment and writes the resulting member list and metadata to MongoDB. In addition, a scoped recalculation is triggered immediately (best-effort, asynchronous) when a segment is created or when its rule set is edited, so a new or just-edited segment becomes usable within seconds rather than waiting up to ~24h for the next cron run.
  5. Segment detail and list APIs expose: total matched, percentage reach, last evaluated timestamp, per-channel reachability (WhatsApp, Email), and a readiness indicator (is_ready + processing_status) that tells the caller whether the segment's member list has been computed yet (pending / ready / stale / failed).
  6. Four permission keys control access (customers_segment_view, customers_segment_add, customers_segment_manage, customers_segment_archived) with support for ALL_ACCESS, OWNED_ONLY, and DISABLED levels.
  7. An internal API exposes the segment list per customer for downstream service integration (Broadcast, Chatbot).
  8. All segmentation queries hit the datamart (separate read-optimised SQL database), not the primary CDP MongoDB.

Out of Scope

  • Frontend / UI implementation.
  • Behavioral event segmentation (e.g., "opened email N times") — not in MVP.
  • Member Loyalty segmentation group is NICE TO HAVE and may be deferred.
  • Real-time streaming evaluation (segment recalculation is a daily batch, plus a best-effort scoped recalculation when a segment is created or its rule set is edited).
  • Automatic routing to campaign creation (the Broadcast service is responsible for consuming the segment data).
  • Performance tab / analytics charts (Metrics #2–#6 in the PRD) — deferred post-MVP.

Assumptions

  1. A datamart (PostgreSQL or ClickHouse) exists or will be provisioned that mirrors customer data in a denormalised, queryable form per company_sso_id.
  2. The datamart is populated via CDC (Kafka events from contact-service writes) — provisioning the CDC pipeline is owned by the Data/Platform team and is a prerequisite for this feature.
  3. The datamart uses PostgreSQL with exactly six top-level columns: customer_id, created_at_date, created_by_user_id, default_fields, custom_fields, and company_sso_id. All other contact data — including updated_at, updated_by, and bsuid — is stored inside default_fields JSONB. Chat-related data (chatroom presence etc.) is stored inside the nested default_fields->'chat_data' JSONB object; a non-null chat_data value indicates the contact has an active chatroom. User-defined custom fields are stored in custom_fields JSONB. No company_id column is present in the datamart; tenant isolation uses company_sso_id exclusively.
  4. Daily recalculation at 08:00 WIB is acceptable latency for segment freshness (no sub-minute SLA required).
  5. Maximum segment size per company is bounded by the company's total customer count (up to ~1M rows).
  6. Authentication is done via the existing IAGMiddleware for frontend endpoints; role/permission data comes from Launchpad CRS via RequirePermissionMiddleware (existing pattern). Internal service-to-service endpoints use BasicAuth.
  7. Segment rules allow at most 2 groups (criteria) and 3 nested conditions per group — enforced at the service layer.

Dependencies

| Dependency | Owner | Notes |
| --- | --- | --- |
| Datamart provisioning (PostgreSQL/ClickHouse) | Data / Platform team | Must contain denormalised customer rows queryable by company_sso_id |
| CDC pipeline: MongoDB → Datamart | Data / Platform team | Keeps datamart fresh; SLA to be agreed |
| Launchpad CRS new permission keys | Identity / CRM team | Four new segment permission keys must be registered |
| Broadcast Service | Broadcast team | Consumes segment IDs/member lists for recipient list creation |

2. Technical Design

Architecture & Tech Stack

The segmentation feature is implemented entirely within the existing contact-service Go application. No new microservice is introduced for MVP.

graph TB
FE["Frontend / CDP UI"]
GW["API Gateway"]
CS["contact-service (Go)"]
MongoDB[("MongoDB\n(CDP Primary)")]
Datamart[("Datamart\nPostgreSQL / ClickHouse")]
Redis[("Redis Cache")]
Kafka[["Kafka\n(cdp.segment.recalculated)"]]
Broadcast["Broadcast Service"]
Chatbot["Chatbot / Officeless"]

FE --> GW --> CS
CS -->|"Segment CRUD\nMember storage"| MongoDB
CS -->|"Rule evaluation\n(preview & recalc)"| Datamart
CS -->|"Feature flags\nCache"| Redis
CS -->|"Emit on member change"| Kafka
Kafka -->|"Subscribe"| Broadcast
Kafka -->|"Subscribe"| Chatbot
Broadcast -->|"GET /api/v1/segments/:id/customers\n(on-demand pull)"| CS
Chatbot -->|"GET /api/v1/contacts/:contact_id/segments\n(on-demand pull)"| CS

subgraph "contact-service internals"
SH["Segment Handler\n(HTTP)"]
SS["Segment Service\n(business logic)"]
RE["Rule Engine\n(SQL builder)"]
SR["Segment Repository\n(MongoDB)"]
DMR["Datamart Repository\n(SQL)"]
EP["Event Publisher\n(Kafka)"]
CJ["Cron Job\nSegmentRecalculation\n(daily 08:00 WIB)"]
TH["Trigger Handler\n(S2S BasicAuth)"]

SH --> SS
SS --> RE
SS --> SR
SS --> DMR
CJ --> SS
TH --> SS
SS --> EP
end

Tech stack additions:

| Component | Technology | Rationale |
| --- | --- | --- |
| Segment metadata & member list | MongoDB (existing) | Consistent with CDP data store; document model fits variable rule schema |
| Segmentation query execution | Datamart SQL (new DB driver) | Separate read-optimised store; avoids pressure on primary MongoDB |
| Cron scheduling | gocraft/work (existing) | Same scheduler used by all existing cron jobs |
| Permission check | Launchpad CRS (existing api package) | Consistent with contact permission pattern |
| Rule Engine | Pure Go | Translates rule DSL → parameterised SQL WHERE clause |

Sequence Diagrams

2.1 Create Segment

sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant R as Segment Repository (MongoDB)
participant BG as Background Goroutine

FE->>H: POST /iag/v1/segments\n{name, description, rule_set}
H->>H: Validate request payload\n(name max 60, desc max 250,\nrule limits)
H->>S: CreateSegment(ctx, companySsoID, payload)
S->>S: Validate rule_set structure\n(≤2 groups, ≤3 conditions/group)
S->>R: InsertOne(customer_segments)\n(last_evaluated_at = null → status "pending")
R-->>S: segmentID
S->>BG: go recalculateSegment(WithoutCancel(ctx), companySsoID, segmentID)
Note over BG: Best-effort scoped recalc (see 2.11).\nDoes not block the response.\nFailures are logged only.
S-->>H: segmentID
H-->>FE: 201 Created {segment_id}

The segment is persisted with last_evaluated_at = null, so it reports processing_status: "pending" until the scoped recalc completes. The recalc runs in a detached goroutine (context.WithoutCancel) so the HTTP response returns immediately and is never affected by recalc outcome. If the recalc cannot run (feature flag off, per-segment lock held, required deps unwired, datamart unavailable), the next daily cron is the backstop.
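
The detached-goroutine behaviour described above can be sketched as follows. This is a minimal illustration, assuming Go 1.21+ (for context.WithoutCancel); fireScopedRecalc, recalcFn, and the WaitGroup used to make the demo deterministic are hypothetical names, not the service's actual code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// recalcFn stands in for the scoped recalculation routine (hypothetical name).
type recalcFn func(ctx context.Context, companySsoID, segmentID string) error

// fireScopedRecalc runs fn in a goroutine whose context keeps the request's
// values but ignores its cancellation. Failures are logged only.
// The WaitGroup exists only so the demo can wait for the goroutine.
func fireScopedRecalc(ctx context.Context, wg *sync.WaitGroup, fn recalcFn, companySsoID, segmentID string) {
	detached := context.WithoutCancel(ctx)
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := fn(detached, companySsoID, segmentID); err != nil {
			fmt.Printf("scoped recalc failed: segment=%s err=%v\n", segmentID, err)
		}
	}()
}

// detachedSurvivesCancel reports whether the recalc context was still live
// after the originating request context had been cancelled.
func detachedSurvivesCancel() bool {
	reqCtx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	alive := false
	release := make(chan struct{})
	fireScopedRecalc(reqCtx, &wg, func(ctx context.Context, _, _ string) error {
		<-release // block until the request context has been cancelled
		alive = ctx.Err() == nil
		return nil
	}, "company_abc", "seg_1")
	cancel() // the HTTP handler returns and its context is cancelled
	close(release)
	wg.Wait()
	return alive
}

func main() {
	fmt.Println(detachedSurvivesCancel())
}
```

Because the detached context never expires on its own, a real implementation would likely still want its own timeout around the datamart query.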

2.2 Preview Customers (ad-hoc, unsaved rule)

sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant RE as Rule Engine
participant DM as Datamart Repository

FE->>H: POST /iag/v1/segments/preview\n{rule_set: {...}}
H->>H: Validate rule_set\n(≤2 groups, ≤3 conditions/group)
H->>S: PreviewSegment(ctx, ruleSet, companySsoID, userSsoID, permission)
S->>RE: BuildQuery(ruleSet)
RE-->>S: baseQuery (WHERE clause + params)
S->>DM: Count(ctx, companySsoID, baseQuery)
DM-->>S: totalMatched=2500, totalBase=11700
alt view_customer = OWNED_ONLY
S->>RE: BuildOwnedQuery(baseQuery, userSsoID)
Note over RE: appends AND created_by_user_id = $N
RE-->>S: ownedQuery
S->>DM: Sample(ctx, companySsoID, ownedQuery, SEGMENT_PREVIEW_MAX_ROWS)
else ALL_ACCESS
S->>DM: Sample(ctx, companySsoID, baseQuery, SEGMENT_PREVIEW_MAX_ROWS)
end
DM-->>S: sample=[...], isTruncated
S-->>H: PreviewResult
H-->>FE: 200 OK {total_matched, total_customer_base, sample, is_truncated}
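
The OWNED_ONLY branch above appends one more predicate to the base query. A minimal sketch of that step, assuming the Rule Engine returns a WHERE fragment plus ordered arguments (the Query type below is a hypothetical shape, not the actual one):

```go
package main

import "fmt"

// Query is a hypothetical shape for the Rule Engine output:
// a WHERE fragment with $N placeholders plus the ordered arguments.
type Query struct {
	Where string
	Args  []any
}

// BuildOwnedQuery returns a copy of base narrowed to rows created by userSsoID.
// The new placeholder index is len(Args)+1, so existing placeholders stay valid,
// and base itself is left untouched.
func BuildOwnedQuery(base Query, userSsoID string) Query {
	args := make([]any, len(base.Args), len(base.Args)+1)
	copy(args, base.Args)
	args = append(args, userSsoID)
	return Query{
		Where: fmt.Sprintf("(%s) AND created_by_user_id = $%d", base.Where, len(args)),
		Args:  args,
	}
}

func main() {
	base := Query{
		Where: "company_sso_id = $1 AND default_fields->>'source' = $2",
		Args:  []any{"company_abc", "WhatsApp"},
	}
	owned := BuildOwnedQuery(base, "user_42")
	fmt.Println(owned.Where)
	fmt.Println(len(base.Args), len(owned.Args))
}
```

Wrapping the base fragment in parentheses keeps any top-level OR in the rule set from escaping the ownership filter.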

2.3 Daily Segment Recalculation (Cron)

sequenceDiagram
participant CJ as Cron Job (gocraft/work)
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant RE as Rule Engine
participant DM as Datamart Repository
participant MR as Member Repository (MongoDB)
participant Cache as Redis Cache

CJ->>S: RecalculateAllSegments(ctx)
S->>Cache: FeatureEnabled("cdp_segment_recalc_enabled")
alt feature disabled
S-->>CJ: return nil (recalc disabled)
end
S->>SR: FindActiveSegments(ctx, batchSize=100)
loop For each batch of active segments
S->>Cache: Get("segment_recalculation_stop_requested")
alt stop signal present
S->>Cache: Del("segment_recalculation_stop_requested")
Note over S: Circuit breaker triggered — exit loop cleanly
end
loop For each segment in batch
S->>RE: BuildQuery(segment.RuleSet)
RE-->>S: SQL query
S->>DM: FetchMatchingContactIDs(ctx, companySsoID, query)
DM-->>S: []contactID
S->>MR: ReplaceMembers(ctx, segmentID, contactIDs)
S->>SR: UpdateStats(ctx, segmentID, totalMatched, lastEvaluatedAt)
end
end
S-->>CJ: done (errors logged per segment, non-fatal)

2.4 Get Segment Details (with Reachability Metrics)

sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant MR as Member Repository (MongoDB)
participant CR as Contact Repository (MongoDB)

FE->>H: GET /iag/v1/segments/:id
H->>S: GetSegmentDetails(ctx, segmentID, userPermission)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment (metadata, rule_set)
S->>MR: CountMembers(ctx, segmentID)
MR-->>S: totalMatched
alt segment is active
S->>MR: CountMembersWithPhone(ctx, segmentID)
MR-->>S: whatsappReachable
S->>MR: CountMembersWithEmail(ctx, segmentID)
MR-->>S: emailReachable
S->>CR: CountAll(ctx, companySsoID)
CR-->>S: totalCustomers
end
S-->>H: SegmentDetailResponse
H-->>FE: 200 OK {segment info, stats, reachability}

2.5 Archive Segment

sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)

FE->>H: PATCH /iag/v1/segments/:id/archive
H->>S: ArchiveSegment(ctx, segmentID, userSSOID, permission)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment
S->>S: Check OWNED_ONLY permission\n(segment.CreatedBy == userSSOID?)
S->>SR: UpdateStatus(ctx, segmentID, "archived")
SR-->>S: ok
S-->>H: ok
H-->>FE: 200 OK
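
The ownership check in the diagram above could look roughly like this. It is a sketch only: the AccessLevel type, canActOn, and ErrForbidden are hypothetical names, and denying unknown levels is an assumption rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// AccessLevel mirrors the three levels named in this RFC.
type AccessLevel string

const (
	AllAccess AccessLevel = "ALL_ACCESS"
	OwnedOnly AccessLevel = "OWNED_ONLY"
	Disabled  AccessLevel = "DISABLED"
)

// ErrForbidden is a hypothetical sentinel error for denied actions.
var ErrForbidden = errors.New("forbidden")

// canActOn decides whether a user may act on a segment.
// OWNED_ONLY compares the segment creator with the caller's SSO ID.
func canActOn(level AccessLevel, segmentCreatedBy, userSsoID string) error {
	switch level {
	case AllAccess:
		return nil
	case OwnedOnly:
		if segmentCreatedBy == userSsoID {
			return nil
		}
		return ErrForbidden
	default: // DISABLED and unknown levels are denied (assumption)
		return ErrForbidden
	}
}

func main() {
	fmt.Println(canActOn(OwnedOnly, "user_1", "user_1"))
	fmt.Println(canActOn(OwnedOnly, "user_1", "user_2"))
	fmt.Println(canActOn(Disabled, "user_1", "user_1"))
}
```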

2.6 Segment Recalculated Event (Post-Recalculation)

Each event notifies subscribers that a segment's membership has changed during a recalculation: at most one Kafka message per segment per recalculation run (none when the run adds and removes no contacts), carrying aggregate counts of how many contacts were added and removed. Downstream services (Broadcast, Chatbot) that need individual contact details pull them on-demand via GET /api/v1/segments/:id/customers.

sequenceDiagram
participant CJ as Cron Job (gocraft/work)
participant S as Segment Service
participant MR as Member Repository (MongoDB)
participant EP as Event Publisher (Kafka)
participant BS as Broadcast Service
participant CB as Chatbot / Officeless

CJ->>S: RecalculateAllSegments(ctx)
loop For each active segment
S->>MR: ReplaceMembers(ctx, segmentID, contactIDs) [in transaction]
Note over S: compute diff: added = new − old, removed = old − new
alt len(added) > 0 OR len(removed) > 0
S->>EP: Publish("cdp.segment.recalculated", SegmentRecalculatedEvent)
EP-->>BS: Kafka delivery (one msg per segment, async)
EP-->>CB: Kafka delivery (one msg per segment, async)
end
end
S-->>CJ: done

Event payload (cdp.segment.recalculated topic):

At most one message per segment per recalculation run (runs with an empty diff emit nothing):

{
  "event_type": "segment.recalculated",
  "segment_id": "683ab...",
  "segment_name": "Loyal WhatsApp Customers",
  "company_sso_id": "company_abc",
  "recalc_run_id": "01J...",
  "rule_version": 3,
  "total_matched": 2500,
  "total_added": 150,
  "total_removed": 32,
  "occurred_at": "2026-06-10T01:00:00Z"
}

Go struct and publisher interface (internal/app/service/segment/event_publisher.go):

type SegmentRecalculatedEvent struct {
	EventType    string    `json:"event_type"` // always "segment.recalculated"
	SegmentID    string    `json:"segment_id"`
	SegmentName  string    `json:"segment_name"`
	CompanySsoID string    `json:"company_sso_id"`
	RecalcRunID  string    `json:"recalc_run_id"` // ULID; idempotency key for consumers
	RuleVersion  int64     `json:"rule_version"`  // segment.Version at time of recalculation; lets consumers correlate which rule definition produced the result
	TotalMatched int64     `json:"total_matched"`
	TotalAdded   int64     `json:"total_added"`
	TotalRemoved int64     `json:"total_removed"`
	OccurredAt   time.Time `json:"occurred_at"` // UTC timestamp of the recalc run
}

// ISegmentEventPublisher is injected into SegmentService; implemented by kafkaSegmentPublisher.
// Publish sends at most one Kafka message per segment per recalculation run.
type ISegmentEventPublisher interface {
	Publish(ctx context.Context, event SegmentRecalculatedEvent) error
}

Publishing sequence (within processSegment, per segment):

  1. The MongoDB session transaction commits successfully (ReplaceMembers + InsertEvents both persist).
  2. Guard: only if len(added) > 0 OR len(removed) > 0 — no-diff runs emit nothing.
  3. Build one SegmentRecalculatedEvent with TotalAdded = len(added), TotalRemoved = len(removed), TotalMatched = len(newIDs). OccurredAt = the run-start timestamp from RecalculateAllSegments (same for all segments in one cron invocation); RecalcRunID = the ULID generated at cron run start.
  4. Call eventPublisher.Publish(ctx, event).
    • Kafka message: topic = "cdp.segment.recalculated", key = []byte(segment_id) (all recalc events for one segment route to the same partition), value = JSON(event).
    • The implementation sends the single message and blocks until the broker acknowledges or returns an error.
  5. On publish error: slog.ErrorContext(ctx, "segment event publish failed", slog.String("segment_id", seg.ID), slog.Any("error", err)) + increment Datadog metric segment.event.publish_error{segment_id}, then continue to the next segment. The recalculation result is not rolled back; MongoDB member data is authoritative.
  6. On MongoDB transaction failure: Publish is never called — the topic never receives false-positive notifications for a failed write.

Consumer contract: each message signals that a segment's membership has changed, carrying aggregate counts (total_added, total_removed, total_matched). Consumers that need individual contact details pull them on-demand via GET /api/v1/segments/:id/customers.

Idempotency: consumers must deduplicate by (segment_id, recalc_run_id). A retried delivery with the same tuple must be treated as a no-op.
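
A consumer-side deduplication on (segment_id, recalc_run_id) might be sketched as below. The Deduper type is hypothetical and in-memory only; a real consumer would need a shared, expiring store so that restarts and multiple instances see the same history.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupeKey is the idempotency tuple named in the consumer contract.
type dedupeKey struct{ SegmentID, RecalcRunID string }

// Deduper remembers tuples that were already handled (in-memory sketch).
type Deduper struct {
	mu   sync.Mutex
	seen map[dedupeKey]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[dedupeKey]struct{})}
}

// FirstDelivery returns true exactly once per (segmentID, recalcRunID);
// every later delivery of the same tuple returns false and should be a no-op.
func (d *Deduper) FirstDelivery(segmentID, recalcRunID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	k := dedupeKey{segmentID, recalcRunID}
	if _, ok := d.seen[k]; ok {
		return false
	}
	d.seen[k] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	fmt.Println(d.FirstDelivery("seg_1", "run_A")) // first delivery: process
	fmt.Println(d.FirstDelivery("seg_1", "run_A")) // redelivery: skip
	fmt.Println(d.FirstDelivery("seg_1", "run_B")) // new run: process
}
```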

Topic configuration: cdp.segment.recalculated, partition key = segment_id (preserves per-segment event ordering), retention = 7 days.


2.7 Broadcast / Service Integration — Get Customer Segments

sequenceDiagram
participant BS as Broadcast Service
participant H as Segment Handler (s2s)
participant S as Segment Service
participant MR as Member Repository (MongoDB)

BS->>H: GET /api/v1/contacts/:contact_id/segments
H->>S: GetSegmentsByContact(ctx, contactID, companySsoID)
S->>MR: FindSegmentsByContactID(ctx, contactID, companySsoID)
MR-->>S: []SegmentMembership{segment_id, segment_name}
S-->>H: []SegmentInfo
H-->>BS: 200 OK {segments: [{segment_id, segment_name}]}

2.8 Get Customers in Saved Segment

sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant MR as Member Repository (MongoDB)

FE->>H: GET /iag/v1/segments/:id/customers?page=1&per_page=20
H->>S: GetSegmentCustomers(ctx, segmentID, companySsoID, userSsoID, permission, page, perPage)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment
S->>MR: FindMembers(ctx, segmentID, companySsoID, userSsoID, permission, page, perPage)
Note over MR: OWNED_ONLY: filter members by created_by_user_id\nvia contact lookup before pagination
MR-->>S: []member, total
S-->>H: CustomersResult
H-->>FE: 200 OK {customers: [...], pagination}

2.9 Get Detailed Customers in Saved Segment (Internal / S2S)

sequenceDiagram
participant SVC as Downstream Service (Broadcast / Chatbot)
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant MR as Member Repository (MongoDB)
participant CR as Contact Repository (MongoDB)

SVC->>H: GET /api/v1/segments/:id/customers?page=1&per_page=100
Note over H: BasicAuth — no Launchpad permission check,\nno OWNED_ONLY filtering
H->>S: GetPrivateSegmentCustomers(ctx, segmentID, companySsoID, page, perPage)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment (existence check only)
S->>MR: FindMembers(ctx, segmentID, companySsoID, page, perPage)
MR-->>S: []contactID, total
S->>CR: FindByIDs(ctx, contactIDs, companySsoID)
CR-->>S: []ContactDetail
S-->>H: PrivateCustomersResult
H-->>SVC: 200 OK {customers: [...], pagination}

2.10 Manual Recalculation Trigger (Internal S2S)

Allows an internal service or operator tooling to trigger a targeted recalculation outside the daily schedule. The endpoint acquires the same Redis lock as the daily cron, so both can never run concurrently. Returns 202 Accepted immediately; progress is tracked via Kafka cdp.segment.recalculated events carrying the returned recalc_run_id. TriggerRecalculation intentionally bypasses cdp_segment_recalc_enabled — see the Recalculation feature flag note in §3 — so an operator can still run a targeted recalc via this endpoint even while the flag has the daily cron disabled (e.g. during a datamart maintenance window where only the unattended bulk job needs to stay off).

sequenceDiagram
participant SVC as Internal Service / Ops
participant H as Trigger Handler (S2S)
participant S as Segment Service
participant Cache as Redis Cache
participant BG as Background Goroutine

SVC->>H: POST /api/v1/segments/recalculate\n{company_sso_ids: [...], segment_ids: [...]}
H->>H: Validate company_sso_ids non-empty
H->>S: TriggerRecalculation(ctx, filter)
S->>Cache: SetNX("segment_recalculation_lock", recalcRunID, TTL)
alt lock already held
S-->>H: ErrRecalcAlreadyRunning
H-->>SVC: 409 Conflict "recalculation already in progress"
end
S->>BG: go doRecalculation(WithoutCancel(ctx), recalcRunID, filter)
Note over BG: Processes only segments matching filter.\nDefers Del("segment_recalculation_lock").\nEmits cdp.segment.recalculated Kafka events per segment.
S-->>H: recalcRunID
H-->>SVC: 202 Accepted {recalc_run_id}

Filter behaviour:

| company_sso_ids | segment_ids | Result |
| --- | --- | --- |
| Required, non-empty | Omitted | All active segments for the given companies |
| Required, non-empty | Provided | Only the listed segments (verified to belong to the given companies) |

2.11 Scoped Recalculation on Create / Rule Edit (Internal, Asynchronous)

When a segment is created (2.1) or its rule_set is edited (PUT /iag/v1/segments/:id), the service fires a scoped, single-segment recalculation in a detached background goroutine so the new/edited segment becomes usable within seconds instead of waiting for the next daily cron. This path is best-effort: it never blocks or fails the originating create/update request, and the daily cron remains the backstop.

Unlike the daily cron and the manual trigger API (2.10) — which both contend on the global lock segment_recalculation_lock — the on-write recalc uses a per-segment lock segment_recalculation_lock:{segmentID}, so it can run concurrently with (and is never starved by) the daily cron. To guarantee a single writer per segment, the daily cron's processSegment also acquires this per-segment lock before writing; the global lock continues to guard whole-cron concurrency across pods.

recalculateSegment checks cdp_segment_recalc_enabled, same as the cron. Unlike the manual S2S trigger (§2.10), which intentionally bypasses the flag, the create/rule-edit path is a regular user-facing action and must respect the operator's decision to pause recalculation (e.g. during a datamart maintenance window) — see the Recalculation feature flag note in §3.

sequenceDiagram
participant S as Segment Service
participant BG as Background Goroutine
participant FF as FeatureFlagService
participant Cache as Redis Cache
participant RE as Rule Engine
participant DM as Datamart Repository
participant MR as Member Repository (MongoDB)
participant SR as Segment Repository (MongoDB)
participant EP as Event Publisher (Kafka)

Note over S: After successful Insert (create)\nor rule-changing Update
S->>BG: go recalculateSegment(WithoutCancel(ctx), companySsoID, segmentID)
BG->>FF: FeatureEnabled("cdp_segment_recalc_enabled", "")
alt feature disabled
Note over BG: no-op — next daily cron will pick it up (once re-enabled)
end
BG->>Cache: SetNX("segment_recalculation_lock:{segmentID}", runID, TTL)
alt per-segment lock already held
Note over BG: a recalc for this segment is already in flight → back off
end
BG->>SR: FindByID(ctx, segmentID)
SR-->>BG: segment
BG->>RE: BuildQuery(segment.RuleSet)
RE-->>BG: SQL query
BG->>DM: FetchContactIDs(ctx, companySsoID, query)
DM-->>BG: []contactID
Note over BG: processSegment: diff vs current members,\nReplaceMembers + InsertEvents in one transaction
BG->>MR: ReplaceMembers + InsertEvents [transaction]
BG->>SR: UpdateStats(total_matched, last_evaluated_at) + ClearRuleDirtyAt
alt len(added) > 0 OR len(removed) > 0
BG->>EP: Publish("cdp.segment.recalculated", event)
end
BG->>Cache: Del("segment_recalculation_lock:{segmentID}") [deferred]

Behaviour summary:

| Condition | Result |
| --- | --- |
| cdp_segment_recalc_enabled disabled | No-op; segment stays pending/stale until the next daily cron (once the flag is re-enabled) |
| Per-segment lock already held | Back off (a recalc for this segment is already running); no duplicate work |
| Datamart unavailable / build error / tx failure | Logged + recalc_error_count incremented (same poison-handling as the cron); response already returned |
| Success | last_evaluated_at set, rule_dirty_at cleared → processing_status becomes ready; Kafka event emitted on non-empty diff |

The recalc reuses the exact same processSegment routine as the cron (BuildQuery → FetchContactIDs → atomic ReplaceMembers + InsertEvents → UpdateStats → ClearRuleDirtyAt → conditional publish). Only the trigger and the lock key differ. An edit that does not change the rule set (name/description only) does not trigger a recalc, mirroring how rule_dirty_at is only set when rule_set changes.


Database Model

Collection: customer_segments

Stored in MongoDB (existing CDP database).

customer_segments
├── _id : ObjectID (primary key)
├── company_sso_id : string (tenant isolation; indexed)
├── name : string (max 60 chars; required)
├── description : string (max 250 chars; optional, nullable)
├── status : string (enum: "active" | "archived"; default: "active")
├── rule_dirty_at : time.Time (nullable; set to current timestamp when rule_set changes; cleared to null after recalculation completes)
├── rule_set : RuleSet (embedded document — see schema below)
├── total_matched : int64 (updated on each recalculation)
├── last_evaluated_at: time.Time (updated on each recalculation; nullable until first eval)
├── recalc_error_count : int (count of failed recalculation runs; see Poison-segment handling)
├── created_by : string (user SSO ID)
├── updated_by : string (user SSO ID)
├── version : int64 (monotonically increments on each user-initiated edit; starts at 1 on creation)
├── created_at : time.Time
├── updated_at : time.Time
└── is_deleted : bool (soft delete)

RuleSet embedded document:

{
  "operator": "AND",
  "groups": [
    {
      "operator": "AND",
      "conditions": [
        {
          "field": "source",
          "field_category": "default",
          "field_type": "dropdown",
          "operator": "is",
          "value": "WhatsApp",
          "value2": null
        },
        {
          "field": "address.city",
          "field_category": "default",
          "field_type": "dropdown",
          "operator": "is",
          "value": "Jakarta",
          "value2": null
        }
      ]
    },
    {
      "operator": "OR",
      "conditions": [
        {
          "field": "created_at_date",
          "field_category": "system",
          "field_type": "date",
          "operator": "before",
          "value": "2026-01-01T00:00:00Z",
          "value2": null
        }
      ]
    }
  ]
}
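
For illustration, the document above could map onto Go types along these lines, together with the service-layer limits (at most 2 groups, at most 3 conditions per group). The type names and the Validate method are assumptions for this sketch, not the actual contact-service definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Condition, Group and RuleSet mirror the JSON keys of the embedded document.
type Condition struct {
	Field         string `json:"field"`
	FieldCategory string `json:"field_category"`
	FieldType     string `json:"field_type"`
	Operator      string `json:"operator"`
	Value         any    `json:"value"`
	Value2        any    `json:"value2"`
}

type Group struct {
	Operator   string      `json:"operator"`
	Conditions []Condition `json:"conditions"`
}

type RuleSet struct {
	Operator string  `json:"operator"`
	Groups   []Group `json:"groups"`
}

const (
	maxGroups             = 2
	maxConditionsPerGroup = 3
)

// Validate enforces only the upper bounds stated in this RFC.
func (rs RuleSet) Validate() error {
	if len(rs.Groups) > maxGroups {
		return fmt.Errorf("rule_set has %d groups, max is %d", len(rs.Groups), maxGroups)
	}
	for i, g := range rs.Groups {
		if len(g.Conditions) > maxConditionsPerGroup {
			return fmt.Errorf("group %d has %d conditions, max is %d", i, len(g.Conditions), maxConditionsPerGroup)
		}
	}
	return nil
}

func main() {
	raw := `{"operator":"AND","groups":[{"operator":"AND","conditions":[
	  {"field":"source","field_category":"default","field_type":"dropdown","operator":"is","value":"WhatsApp","value2":null}]}]}`
	var rs RuleSet
	if err := json.Unmarshal([]byte(raw), &rs); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(rs.Validate(), rs.Groups[0].Conditions[0].Field)
}
```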

Indexes:

{ company_sso_id: 1, status: 1, is_deleted: 1 } // list active segments per company
{ company_sso_id: 1, name: 1 } // search by name
{ company_sso_id: 1, created_at: -1 } // sort by date
{ company_sso_id: 1, created_by: 1 } // owned-only filter

Readiness is derived, not stored. The is_ready / processing_status values exposed by the GET APIs are computed at response-mapping time from three existing persisted fields — last_evaluated_at, rule_dirty_at, and recalc_error_count — so no new column or migration is required. The mapping is:

| processing_status | Condition | is_ready |
| --- | --- | --- |
| pending | last_evaluated_at == null AND recalc_error_count < 3 | false |
| ready | last_evaluated_at != null AND rule_dirty_at == null | true |
| stale | last_evaluated_at != null AND rule_dirty_at != null | true |
| failed | last_evaluated_at == null AND recalc_error_count >= 3 | false |

is_ready means the segment has a queryable member snapshot (last_evaluated_at != null); a stale segment is still queryable but its members predate the latest rule edit (a refresh is pending or in flight). On create, last_evaluated_at starts null, so the status is pending; the scoped recalc (2.11) flips it to ready on success. A rule edit sets rule_dirty_at, so the status becomes stale until the scoped recalc clears it back to ready.


Collection: customer_segment_members

customer_segment_members
├── _id : ObjectID
├── segment_id : string (ref customer_segments._id; indexed)
├── company_sso_id : string (denormalised for scoped queries; indexed)
├── contact_id : string (MongoDB ObjectID as string)
├── added_at : time.Time (timestamp of the recalculation run that added this membership)
└── recalc_run_id : string (ULID of the recalculation run; idempotency key that prevents duplicate inserts on retried runs)

Indexes:

{ segment_id: 1, company_sso_id: 1 } // all members of a segment (used by recalc)
{ contact_id: 1, company_sso_id: 1 } // all segments of a contact (service integration)
{ segment_id: 1, contact_id: 1 } // unique compound (no duplicate memberships)

Recalculation strategy: Each daily run generates a recalc_run_id (ULID) before processing any segment. For each segment: (1) load current member IDs, (2) fetch new IDs from datamart, (3) compute added = new − old and removed = old − new, (4) open a MongoDB session transaction (replica-set multi-document transaction) that atomically executes ReplaceMembers and InsertEvents together — both writes carry the recalc_run_id as an idempotency key so a retried run with the same ULID is a no-op.
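
Step (3) of the strategy above is a plain set difference. A minimal sketch, assuming contact IDs are unique within each list (diffMembers is a hypothetical name):

```go
package main

import "fmt"

// diffMembers returns added = new - old and removed = old - new.
// IDs are assumed unique within each slice; output order follows the inputs.
func diffMembers(oldIDs, newIDs []string) (added, removed []string) {
	oldSet := make(map[string]struct{}, len(oldIDs))
	for _, id := range oldIDs {
		oldSet[id] = struct{}{}
	}
	newSet := make(map[string]struct{}, len(newIDs))
	for _, id := range newIDs {
		newSet[id] = struct{}{}
		if _, ok := oldSet[id]; !ok {
			added = append(added, id)
		}
	}
	for _, id := range oldIDs {
		if _, ok := newSet[id]; !ok {
			removed = append(removed, id)
		}
	}
	return added, removed
}

func main() {
	added, removed := diffMembers(
		[]string{"c1", "c2", "c3"},
		[]string{"c2", "c3", "c4", "c5"},
	)
	// The event is published only when the diff is non-empty.
	fmt.Println(added, removed, len(added) > 0 || len(removed) > 0)
}
```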

Lock safety: A Redis lock (segment_recalculation_lock) with initial TTL = SEGMENT_RECALC_LOCK_TTL_SECONDS (default 7200 s / 2 h) guards against concurrent pod runs of the whole-cron / manual-trigger path. A heartbeat goroutine extends the TTL by 30 minutes every SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS (default 600 s / 10 min) while the job is alive. If a pod crashes before releasing the lock, the TTL eventually expires and the next pod acquires a fresh lock with a new recalc_run_id; because ReplaceMembers is a full replacement inside a transaction, a partial prior run leaves no inconsistent state.

Per-segment lock (single-writer guarantee): The scoped on-write recalc (create / rule edit — see 2.11) does not take the global lock; it takes a per-segment lock segment_recalculation_lock:{segmentID} (TTL = SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS, default 300 s). To ensure a single writer per segment even when the daily cron and an on-write recalc overlap, the cron's processSegment also acquires this per-segment lock before writing and skips the segment if it is already held. This prevents two runs with different recalc_run_ids from both writing membership-transition events for the same segment (which would otherwise double-count added/removed for consumers). The global lock and the per-segment locks are independent key spaces, so the on-write path is never starved by an in-progress daily cron.

Poison-segment handling: Each failed run increments the segment's recalc_error_count. A segment that fails on three consecutive daily runs is skipped on subsequent cron runs until manually reactivated. A Datadog metric segment.recalc.error_count (tagged by segment_id) fires on each failure.


Collection: customer_segment_membership_events

customer_segment_membership_events
├── _id : ObjectID
├── segment_id : string (ref customer_segments._id; indexed)
├── company_sso_id : string (denormalised for scoped queries; indexed)
├── contact_id : string (MongoDB ObjectID as string)
├── event : string (enum: "added" | "removed")
├── occurred_at : time.Time (timestamp of the recalculation run)
└── recalc_run_id : string (ULID of the recalculation run; used with segment_id for idempotent upsert)

Indexes:

{ segment_id: 1, company_sso_id: 1, occurred_at: -1 } // membership history per segment
{ contact_id: 1, company_sso_id: 1, occurred_at: -1 } // segment history per contact
{ occurred_at: 1 } // TTL index (90-day retention)

Documents are append-only and never updated. A TTL index expires events older than 90 days to bound collection growth. The added_at field on customer_segment_members records current membership; this collection records the full history of transitions.


Datamart Schema (confirmed — owned by Data team)

Contact-service will query this via a PostgreSQL client. The schema is confirmed as follows:

| Column | Type | Notes |
| --- | --- | --- |
| company_sso_id | VARCHAR | Tenant isolation key; mandatory filter in all queries |
| customer_id | VARCHAR | CDP contact ID (maps to MongoDB _id) |
| created_at_date | TIMESTAMPTZ | When the contact was created; top-level column for efficient range queries |
| created_by_user_id | VARCHAR | User SSO ID who created the contact; top-level column |
| default_fields | JSONB | All default and Qontak-managed fields plus CDC-populated fields. Keys include: name, email, phone, source, status, owner, sex, dob, address, updated_at, updated_by, bsuid, chat_data (JSONB sub-object; a non-null value indicates the contact has an active chatroom), and all other default/Qontak fields |
| custom_fields | JSONB | All user-defined custom fields, keyed by field key |

These are the only six columns in the datamart. Any data not listed as a top-level column is stored in default_fields or custom_fields. Contact-service must not assume the existence of any other top-level column.

Field access by field_category:

| field_category | SQL access pattern | Example |
| --- | --- | --- |
| default | default_fields->>'key' | default_fields->>'source', default_fields->>'updated_at' |
| custom | custom_fields->>'key' | custom_fields->>'annual_revenue' |
| system | direct column reference | created_at_date, created_by_user_id |

updated_at and updated_by are stored inside default_fields — rule conditions targeting them must use field_category: "default" (not "system"). Only created_at_date and created_by_user_id are valid system fields.

Type casting for non-text operators:

| field_type | Cast | Example |
| --- | --- | --- |
| number | ::numeric | (custom_fields->>'annual_revenue')::numeric > $2 |
| date | ::date or ::timestamptz | (default_fields->>'date_of_birth')::date < $2 |
| boolean | ::boolean | (default_fields->>'marketing_opt_in')::boolean = $2 |
| single_line_text, multi_line_text, dropdown, url | none (text) | default_fields->>'source' = $2 |
| file, signature, gps | none (presence-only) | default_fields->>'profile_photo' IS [NOT] NULL |

Required indexes on the datamart (Data team to provision):

CREATE INDEX idx_dm_company ON customer_datamart (company_sso_id);
CREATE INDEX idx_dm_customer ON customer_datamart (customer_id);
CREATE INDEX idx_dm_created ON customer_datamart (created_at_date);
CREATE INDEX idx_dm_def_gin ON customer_datamart USING GIN (default_fields);
CREATE INDEX idx_dm_cust_gin ON customer_datamart USING GIN (custom_fields);

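The GIN indexes on the JSONB columns accelerate containment and key-existence operators (@>, ?, ?|, ?&). Predicates written as default_fields->>'key' = $N, ILIKE, or cast comparisons cannot use them; such queries are narrowed only by the company_sso_id index and then scan every row of that tenant. If per-tenant scans prove too slow, equality rules can be emitted as containment (default_fields @> '{"source": "WhatsApp"}'), or the Data team can add expression indexes on the most frequently filtered keys.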


APIs

All segment endpoints follow existing conventions: IAGMiddleware + RequirePermissionMiddleware (for permission checks) + ContextLogger middleware, Chi router, response via myhttp.HTTPHandler. Internal S2S endpoints (/api/v1/...) use BasicAuth instead.

Route → Public URL Mapping

| Auth | Internal Route | Public Base URL |
| --- | --- | --- |
| IAG | /iag/v1/segments/... | https://api.mekari.io/internal/qontak/customer/v1/segments/... |
| S2S (BasicAuth) | /api/v1/... | https://contact-service.qontak.net/api/v1/... |

Base path: /iag/v1/segments


Response Envelope

All responses (success and error) share the same outer envelope, driven by myhttp.SuccessResponse and myhttp.ErrorResponse in internal/pkg/http/.

Success envelope (HTTP 200 for every successful operation except POST /api/v1/segments/recalculate, which returns 202 Accepted):

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": {
"version": "v1.0.0",
"api_env": "production",
"trace_id": "0af7651916cd43dd8448eb211c80319c"
},
"data": [ ... ],
"pagination": { "page": 1, "per_page": 20, "total": 42, "total_pages": 3 }
}
  • data is an array whenever it is present. Single-object responses (create, get-by-id, archive, etc.) are wrapped: "data": [{...}]. Responses with no payload carry "data": null.
  • pagination is omitted for non-list responses.
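
The success envelope can be sketched as a Go struct to make the array-wrapping and omitted-pagination rules concrete. This is a hypothetical mirror only: the real types are myhttp.SuccessResponse and myhttp.ErrorResponse in internal/pkg/http/, and the names Envelope, wrapSingle, and encode are assumptions made for illustration.

```go
package main

import "encoding/json"

// RespDesc is the bilingual description carried in resp_desc.
type RespDesc struct {
	ID string `json:"id"`
	EN string `json:"en"`
}

// Meta is the per-response metadata block.
type Meta struct {
	Version string `json:"version"`
	APIEnv  string `json:"api_env"`
	TraceID string `json:"trace_id"`
}

// Pagination is emitted only for list responses.
type Pagination struct {
	Page       int `json:"page"`
	PerPage    int `json:"per_page"`
	Total      int `json:"total"`
	TotalPages int `json:"total_pages"`
}

// Envelope is a hypothetical mirror of the success envelope (not the real myhttp type).
type Envelope struct {
	RespCode   string      `json:"resp_code"`
	RespDesc   RespDesc    `json:"resp_desc"`
	Meta       Meta        `json:"meta"`
	Data       []any       `json:"data"`                 // a nil slice encodes as null
	Pagination *Pagination `json:"pagination,omitempty"` // a nil pointer is omitted
}

// wrapSingle wraps one object in the array form used by create, get-by-id, and archive.
func wrapSingle(meta Meta, item any) Envelope {
	return Envelope{
		RespCode: "200",
		RespDesc: RespDesc{ID: "OK", EN: "OK"},
		Meta:     meta,
		Data:     []any{item},
	}
}

// encode serialises the envelope to its JSON wire form.
func encode(e Envelope) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}
```

Leaving Data as a nil slice yields "data": null, and leaving Pagination nil drops the key, which matches the two rules above without any special-case code.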

Error envelope — HTTP status matches resp_code:

{
"resp_code": "404",
"resp_desc": {
"id": "Tidak ditemukan",
"en": "Not found"
},
"meta": {
"version": "v1.0.0",
"api_env": "production",
"trace_id": "0af7651916cd43dd8448eb211c80319c"
}
}

HTTP status → resp_desc mapping for segment endpoints:

| HTTP | resp_code | resp_desc.en | Trigger |
| --- | --- | --- | --- |
| 400 | "400" | "Invalid request" | Missing/malformed body or query param |
| 401 | "401" | "You are not authorized" | Missing auth context (IAGMiddleware) |
| 403 | "403" | "You are not authorized" | ErrPermissionDenied (OWNED_ONLY boundary or missing permission key) |
| 404 | "404" | "Not found" | ErrSegmentNotFound |
| 409 | "409" | "Segment name already exists" | ErrSegmentNameConflict |
| 409 | "409" | "Operation not valid for current segment status" | ErrConflictStatus |
| 422 | "422" | "Unprocessable entity" | ErrInvalidRule |
| 429 | "429" | "Preview limit of 10 requests per minute exceeded" | Rate limit exceeded |
| 500 | "500" | "Internal error" | Unexpected error |
| 503 | "503" | "Segment preview is temporarily unavailable" | ErrDatamartUnavailable |

Per-endpoint error HTTP statuses:

| Endpoint | Possible HTTP errors |
| --- | --- |
| POST /segments | 400, 401, 409 (name), 422 |
| GET /segments | 400, 401 |
| GET /segments/:id | 400, 401, 403, 404 |
| PUT /segments/:id | 400, 401, 403, 404, 409 (name), 422 |
| PATCH /segments/:id/archive | 400, 401, 403, 404, 409 (status) |
| POST /segments/:id/duplicate | 400, 401, 404 |
| POST /segments/preview | 400, 401, 422, 429, 503 |
| GET /segments/:id/customers | 400, 401, 403, 404 |
| GET /api/v1/contacts/:id/segments | 400 (missing company_sso_id), 401 (BasicAuth failure) |
| GET /api/v1/segments/:id/customers | 400, 401 (BasicAuth failure), 404 |
| POST /api/v1/segments/recalculate | 400 (missing company_sso_ids), 401 (BasicAuth failure), 409 (already running), 503 (disabled) |

Example error responses:

// 404 — segment not found
{
"resp_code": "404",
"resp_desc": { "id": "Tidak ditemukan", "en": "Not found" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}

// 409 — name conflict
{
"resp_code": "409",
"resp_desc": { "id": "Segment name already exists", "en": "Segment name already exists" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}

// 422 — invalid rule set
{
"resp_code": "422",
"resp_desc": { "id": "Data tidak dapat diproses", "en": "Unprocessable entity" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}

// 429 — rate limit
{
"resp_code": "429",
"resp_desc": { "id": "Terlalu banyak permintaan: Preview limit of 10 requests per minute exceeded", "en": "Preview limit of 10 requests per minute exceeded" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}

// 503 — datamart unavailable
{
"resp_code": "503",
"resp_desc": { "id": "Segment preview is temporarily unavailable", "en": "Segment preview is temporarily unavailable" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}

POST /iag/v1/segments — Create Segment
Permission: customers_segment_add (ALL_ACCESS)

Request body:

{
"name": "Loyal WhatsApp Customers",
"description": "Customers from WhatsApp in Jakarta",
"rule_set": {
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{ "field": "source", "field_category": "default", "field_type": "dropdown", "operator": "is", "value": "WhatsApp" }
]
}
]
}
}

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "id": "683ab..." }]
}

GET /iag/v1/segments — List Segments
Permission: customers_segment_view (ALL_ACCESS returns all; OWNED_ONLY returns own)

Query params:

| Param | Type | Description |
| --- | --- | --- |
| page | int | Default 1 |
| per_page | int | Default 20, max 100 |
| search | string | Partial case-insensitive name match (min 3 chars) |
| created_at_from | RFC3339 | Date range filter start (UTC) |
| created_at_to | RFC3339 | Date range filter end (UTC) |
| status | string | active or archived |
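
The page and per_page defaults, together with the total_pages value returned in pagination, can be sketched as follows. The function name paginate is hypothetical, and clamping out-of-range values (rather than rejecting them with 400) is an assumption; the document does not say which behaviour the handler uses.

```go
package main

const (
	defaultPage    = 1
	defaultPerPage = 20
	maxPerPage     = 100
)

// Pagination mirrors the pagination object in list responses.
type Pagination struct {
	Page       int
	PerPage    int
	Total      int
	TotalPages int
}

// paginate applies the documented defaults and cap, then derives total_pages
// as the ceiling of total / per_page using integer arithmetic.
func paginate(page, perPage, total int) Pagination {
	if page < 1 {
		page = defaultPage
	}
	if perPage < 1 {
		perPage = defaultPerPage
	}
	if perPage > maxPerPage {
		perPage = maxPerPage // assumption: clamp instead of returning 400
	}
	totalPages := (total + perPage - 1) / perPage
	return Pagination{Page: page, PerPage: perPage, Total: total, TotalPages: totalPages}
}
```

With total = 42 and per_page = 20 this gives total_pages = 3, and with total = 1050 it gives 53, matching the sample responses in this document.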

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"id": "683ab...",
"name": "Loyal WhatsApp Customers",
"total_matched": 1050,
"version": 1,
"created_by": "user_sso_id",
"created_at": "2026-05-15T08:00:00Z",
"status": "active",
"is_ready": true,
"processing_status": "ready"
}
],
"pagination": { "page": 1, "per_page": 20, "total": 42, "total_pages": 3 }
}

is_ready / processing_status are derived (see Database Model → "Readiness is derived"); a freshly created segment whose scoped recalc has not yet finished appears as "is_ready": false, "processing_status": "pending".


GET /iag/v1/segments/:id — Get Segment Detail
Permission: customers_segment_view

Response 200 OK (payload.SegmentDetailResponse):

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"id": "683ab...",
"name": "Loyal WhatsApp Customers",
"description": "Customers from WhatsApp in Jakarta",
"status": "active",
"rule_set": {
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{ "field": "source", "field_category": "default", "field_type": "dropdown", "operator": "is", "value": "WhatsApp", "value2": null }
]
}
]
},
"total_matched": 1050,
"version": 3,
"is_ready": true,
"processing_status": "ready",
"percentage_reach": 8.97,
"last_evaluated_at": "2026-05-19T01:00:00Z",
"reachability": {
"whatsapp": { "count": 820, "percentage": 78.1 },
"email": { "count": 940, "percentage": 89.5 }
},
"created_by": "user_sso_id",
"created_at": "2026-05-15T08:00:00Z",
"updated_at": "2026-05-18T09:00:00Z"
}
]
}

Response field rules:

| Field | Type | Condition |
| --- | --- | --- |
| version | int64 | Always present; starts at 1 on creation; increments on every user-initiated edit (PUT) or archive (PATCH /archive) |
| is_ready | bool | Always present; true once the segment has a computed member snapshot (last_evaluated_at != null). Clients should gate member-list reads (GET /:id/customers) on this to distinguish "0 members" from "not yet computed" |
| processing_status | string | Always present; one of pending, ready, stale, or failed (derived; see Database Model). pending right after create until the scoped recalc finishes; stale after a rule edit until refreshed; failed after 3 consecutive recalc failures with no prior successful eval |
| percentage_reach | float64 (omitempty) | total_matched / total_customers × 100; omitted when total_customers == 0 or contact counter unavailable |
| reachability | object (omitempty) | Present only for status = "active" segments; omitted entirely for archived segments |
| reachability.whatsapp | object or null | null when CDC backfill is incomplete (no member has chat_data populated); non-null once any member has chat_data |
| reachability.email | object or null | null when datamart unavailable |
| reachability.*.count | int64 | Absolute count of reachable contacts in the segment |
| reachability.*.percentage | float64 | count / total_matched × 100; 0.0 when total_matched == 0 |

PUT /iag/v1/segments/:id returns the same shape on success (same SegmentDetailResponse), including reachability fields.


PUT /iag/v1/segments/:id — Update Segment
Permission: customers_segment_manage (ALL_ACCESS or OWNED_ONLY)

Request body: same as create. Updating rule_set sets rule_dirty_at to the current timestamp and fires a best-effort scoped recalculation (see 2.11) so members are refreshed within seconds; the daily cron remains the backstop if the scoped recalc cannot run. status remains "active". While the refresh is pending the segment reports processing_status: "stale" (members exist but predate the edit); callers can also detect this via the non-null rule_dirty_at. An edit that changes only name/description (not rule_set) does not set rule_dirty_at and does not trigger a recalc.

Response 200 OK: same shape as GET /segments/:id (returns the updated SegmentDetailResponse, including is_ready / processing_status).


PATCH /iag/v1/segments/:id/archive — Archive Segment
Permission: customers_segment_archived (ALL_ACCESS or OWNED_ONLY)

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "id": "683ab..." }]
}

POST /iag/v1/segments/:id/duplicate — Duplicate Segment
Permission: customers_segment_add

Creates a new segment with prefix "Copy of " in the name. Returns new segment ID. User is not auto-redirected (frontend responsibility).

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "id": "684cd..." }]
}

POST /iag/v1/segments/preview — Preview Customers (ad-hoc, unsaved rule)
Permission: customers_segment_view

Evaluates the provided rule_set directly against the datamart without persisting any state. Intended for real-time feedback while building or editing a segment before saving.

total_matched is always the company-wide count of customers matching the rule (ownership-agnostic). For view_customer = OWNED_ONLY, the sample list is filtered to contacts owned by the requesting user (AND created_by_user_id = $N); for ALL_ACCESS users the sample is unfiltered.

Request body:

{
"rule_set": {
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{ "field": "source", "field_category": "default", "field_type": "dropdown", "operator": "is", "value": "WhatsApp" }
]
}
]
}
}

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"total_matched": 2500,
"total_customer_base": 11700,
"is_truncated": false,
"sample": [
{
"contact_id": "abc123",
"name": "Budi Santoso",
"source": "WhatsApp",
"last_activity": "2026-05-10T08:00:00Z",
"added_by": "Andi Kurniawan",
"added_at": "2026-01-15T00:00:00Z"
}
]
}
]
}

is_truncated: true when total_matched > SEGMENT_PREVIEW_MAX_ROWS (default 10,000); the sample array is capped at that limit. total_customer_base is the company's total active contact count (denominator for the "2,500 of 11,700 matched" display).

Sample field sources:

| Field | Source |
| --- | --- |
| contact_id | customer_id column in datamart |
| name | default_fields->>'name' in datamart |
| source | default_fields->>'source' in datamart |
| last_activity | Most recent activity timestamp from MongoDB activity_logs collection; null when no activity recorded |
| added_by | Display name of the user resolved from created_by_user_id (datamart top-level column) via the existing user lookup pattern; empty string when user no longer exists |
| added_at | created_at_date top-level column in datamart |

GET /iag/v1/segments/:id/customers — Get Customers in Saved Segment
Permission: customers_segment_view

Returns the paginated list of customers from the most recent recalculation (the daily cron or a scoped on-write run), sourced from the customer_segment_members MongoDB collection. For OWNED_ONLY users, only members created by the requesting user are returned.

Query params:

| Param | Type | Description |
| --- | --- | --- |
| page | int | Default 1 |
| per_page | int | Default 20, max 100 |

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"contact_id": "abc123",
"name": "Budi Santoso",
"source": "WhatsApp",
"added_at": "2026-06-10T01:00:00Z"
}
],
"pagination": { "page": 1, "per_page": 20, "total": 1050, "total_pages": 53 }
}

pagination.total is the count of segment members visible to the requesting user (all for ALL_ACCESS; owned-only for OWNED_ONLY). added_at is when the contact entered the segment in the most recent recalculation run. Contact fields (name, source) are resolved from the MongoDB contact collection.

Readiness caveat: while a segment's first recalculation is still pending (processing_status: "pending"), this endpoint returns an empty data array with total: 0, which is indistinguishable from a segment that genuinely matches zero customers. Clients that need to tell these apart should read the segment detail (GET /iag/v1/segments/:id) and check is_ready / processing_status before treating an empty result as authoritative. The same applies to the S2S GET /api/v1/segments/:id/customers endpoint.


GET /api/v1/contacts/:contact_id/segments — Internal: Get Segments for a Contact
Auth: BasicAuth (machine-to-machine, no user permission check)

Query params:

| Param | Type | Required | Description |
| --- | --- | --- | --- |
| company_sso_id | string | Yes | Tenant scope. The handler returns 400 if absent. The member-lookup query is scoped to this value, so a valid contact_id belonging to a different company returns an empty segments array rather than an error. |

Response 200 OK:

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"customer_id": "abc123",
"segments": [
{ "segment_id": "seg_001", "segment_name": "Loyal Customer" },
{ "segment_id": "seg_002", "segment_name": "WhatsApp Customer" }
]
}
]
}

POST /api/v1/segments/recalculate — Internal: Trigger Segment Recalculation
Auth: BasicAuth (machine-to-machine, no user permission check)

Triggers an async recalculation for specific companies and, optionally, specific segments. Acquires the same Redis lock as the daily cron — concurrent runs are rejected with 409. Returns immediately with a recalc_run_id; callers track progress via Kafka cdp.segment.recalculated events carrying that ID.

Request body:

{
"company_sso_ids": ["company_abc", "company_xyz"],
"segment_ids": ["683ab..."]
}

| Field | Required | Description |
| --- | --- | --- |
| company_sso_ids | Yes | Non-empty array of companies whose segments to recalculate. |
| segment_ids | No | When provided, only the listed segments are processed (verified to belong to one of the given companies). When omitted, all active segments for the given companies are processed. |

Response 202 Accepted:

{
"resp_code": "202",
"resp_desc": { "id": "Diterima", "en": "Accepted" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "recalc_run_id": "01J..." }]
}

Per-endpoint error HTTP statuses:

| HTTP | Trigger | Notes |
| --- | --- | --- |
| 400 | Missing or empty company_sso_ids | |
| 401 | BasicAuth failure | |
| 409 | ErrRecalcAlreadyRunning | Another recalculation is in progress (daily cron or a prior trigger) |

GET /api/v1/segments/:segment_id/customers — Internal: Get Detailed Customers in Saved Segment
Auth: BasicAuth (machine-to-machine, no user permission check)

Query params:

| Param | Type | Required | Description |
| --- | --- | --- | --- |
| company_sso_id | string | Yes | Tenant scope. The handler returns 400 if absent. After loading the segment, the service verifies segment.company_sso_id == company_sso_id; a mismatch returns 404 to avoid leaking resource existence across tenants. |
| page | int | No | Default 1 |
| per_page | int | No | Default 20, max 100 |

Returns a paginated list of detailed customer data for all members of a saved segment. Intended for downstream service integration (Broadcast, Chatbot). Unlike GET /iag/v1/segments/:id/customers, there is no OWNED_ONLY filtering — all segment members are always returned.

The response shape mirrors ContactSerializer with the following fields omitted: crm_data, chat_data, avatar, accounts, disabled_field, field_permission, permission, is_enable_usernames, and the scalar username field (only usernames array is included). An added_at field (from customer_segment_members) is appended to each item.

Response 200 OK (payload.PrivateSegmentCustomerItem):

{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"id": "abc123",
"company_sso_id": "company_xyz",
"company_billing_id": "billing_001",
"name": "Budi Santoso",
"email": "budi@example.com",
"phone": ["+6281234567890"],
"is_deleted": false,
"custom_fields": [{ "key": "annual_revenue", "value": "500000000" }],
"owner_id": "user_sso_owner",
"owner_name": "Andi Kurniawan",
"assignee_id": "user_sso_assignee",
"assignee_name": "Dewi Rahayu",
"usernames": [],
"source": "WhatsApp",
"source_id": "src_001",
"source_name": "WhatsApp",
"status": "active",
"status_id": "sts_001",
"status_name": "Active",
"job_title": "Manager",
"tags": ["vip", "jakarta"],
"flag": "",
"address": { "street": "Jl. Sudirman", "city": "Jakarta", "country": "ID" },
"date_of_birth": "1990-01-15",
"sex": "M",
"sex_id": "sex_001",
"sex_name": "Male",
"phone_marketing_opt_in": [],
"is_loyalty_member": false,
"added_at": "2026-06-10T01:00:00Z",
"created_at": "2026-01-15T00:00:00Z",
"updated_at": "2026-05-01T00:00:00Z",
"created_by": "user_sso_creator",
"updated_by": "user_sso_updater"
}
],
"pagination": { "page": 1, "per_page": 20, "total": 1050, "total_pages": 53 }
}

pagination.total is the total count of all segment members. added_at is when the contact entered the segment in the most recent recalculation run.


Rule Engine Design

The Rule Engine is a pure Go component (no external dependency) that translates a RuleSet struct into a parameterised SQL WHERE clause.

Operator → SQL mapping

| Operator | Field types | SQL |
| --- | --- | --- |
| is | text, dropdown | field = $N |
| is_not | text, dropdown | field != $N |
| contains | text | field ILIKE $N (value wrapped in %..%) |
| does_not_contain | text | field NOT ILIKE $N |
| starts_with | text | field ILIKE $N (value appended with %) |
| ends_with | text | field ILIKE $N (value prepended with %) |
| is_empty | all | (field IS NULL OR field = '') |
| is_not_empty | all | (field IS NOT NULL AND field != '') |
| equals | number, boolean | field = $N |
| not_equals | number | field != $N |
| greater_than | number | field > $N |
| less_than | number | field < $N |
| between | number, date | field BETWEEN $N AND $M |
| on | date | DATE(field) = $N |
| not_on | date | DATE(field) != $N |
| before | date | field < $N |
| after | date | field > $N |
| contains_any | multi-select | field && ARRAY[$N]::text[] (PostgreSQL) |
| contains_all | multi-select | field @> ARRAY[$N]::text[] (PostgreSQL) |
| is_empty | file, signature, gps | (field IS NULL) (presence check; no empty-string variant) |
| is_not_empty | file, signature, gps | (field IS NOT NULL) |

Field resolver

Before building the SQL expression for a condition, the Rule Engine resolves the field reference using field_category + field_type:

buildFieldExpr(field, field_category, field_type) → string

system → field (e.g. "created_at_date", "created_by_user_id")
default,
text / multi_line_text / dropdown / url → "default_fields->>'{field}'"
number → "(default_fields->>'{field}')::numeric"
date → "(default_fields->>'{field}')::date"
boolean → "(default_fields->>'{field}')::boolean"
file / signature / gps → "default_fields->>'{field}'" (presence-only: is_empty/is_not_empty only; no other operators permitted)
custom,
text / multi_line_text / dropdown / url → "custom_fields->>'{field}'"
number → "(custom_fields->>'{field}')::numeric"
date → "(custom_fields->>'{field}')::date"
file / signature / gps → "custom_fields->>'{field}'" (presence-only)

is_empty / is_not_empty for JSONB fields differs from top-level columns — a key is absent or null, never an empty string:

-- is_empty (system)
created_at_date IS NULL

-- is_empty (default / custom field — text, number, date, dropdown, etc.)
default_fields->>'source' IS NULL

-- is_not_empty (default / custom field)
default_fields->>'source' IS NOT NULL

-- is_empty (file / signature / gps — presence-only; same SQL, no cast)
default_fields->>'profile_photo' IS NULL

-- is_not_empty (file / signature / gps)
default_fields->>'profile_photo' IS NOT NULL

file, signature, and gps field types support only is_empty and is_not_empty. The Rule Engine must reject any other operator for these types and return ErrInvalidRule (surfaced as HTTP 422 INVALID_RULE). multi_line_text follows the same path as text — all text operators apply.

SQL generation example

Rule:

Group 1 (AND): source (default, dropdown) is 'WhatsApp'
AND annual_revenue (custom, number) greater_than 100000000
Group 2: created_at_date (system, date) before '2026-01-01'
Top-level: AND

Generated WHERE clause:

WHERE company_sso_id = $1
AND (
(
default_fields->>'source' = $2
AND (custom_fields->>'annual_revenue')::numeric > $3
)
AND
(created_at_date < $4)
)

Parameters: ['company_abc', 'WhatsApp', 100000000, '2026-01-01T00:00:00Z']

All SQL queries use positional placeholders ($N) to prevent SQL injection. The Rule Engine must never interpolate user-supplied values directly into the query string. Field keys from the rule payload are validated against the company's registered field properties before the expression is built — an unrecognised key returns an error, never a raw SQL fragment.


3. High-Availability & Security

High Availability

  • Contact-service is stateless and horizontally scalable behind the API Gateway load balancer.
  • MongoDB replica set provides read/write availability; segment CRUD operations use the primary.
  • Datamart queries are read-only; the connection pool should be sized separately from the MongoDB driver to avoid resource contention.
  • The cron recalculation job is protected by a Redis distributed lock (segment_recalculation_lock with TTL = 2 hours) to ensure only one pod runs recalculation at a time in a multi-replica deployment.

Recalculation feature flag (cdp_segment_recalc_enabled):
A separate global MongoDB feature flag — distinct from cdp_segmentation_enabled — gates the two regular, user/schedule-driven recalculation paths: the daily cron RecalculateAllSegments and the scoped on-write recalc recalculateSegment (§2.11, fired after create / rule-changing update). Both check the flag at the start of the method/goroutine, before any lock acquisition; when disabled, they log and return/no-op immediately without touching a lock. Use this flag during datamart maintenance windows to pause all automatic recalculation without affecting the CRUD API — a segment created or edited while the flag is off simply stays pending/stale until the flag is re-enabled and the next cron run (or another edit) picks it up.

It deliberately does not gate TriggerRecalculation (§2.10) — the S2S manual trigger bypasses the flag unconditionally, so an operator retains a way to force a targeted recalculation even during a maintenance window where automatic recalculation is paused.

Recalculation circuit breaker:
An operator can gracefully stop a running recalculation without killing the pod by writing any non-empty value to the Redis key segment_recalculation_stop_requested. The recalculation loop checks this key at the start of each batch (before fetching the next page of segments). When detected, the cron: (1) deletes the key (one-shot — tomorrow's scheduled run is unaffected), (2) logs INFO "segment recalc circuit breaker triggered", (3) releases the lock (deferred), and (4) returns nil. The key carries a configurable TTL (SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS, default 86400 s / 24 h) as a safety net in case the cron exits abnormally before detecting it. To permanently disable future runs, use the cdp_segment_recalc_enabled feature flag instead.
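
The one-shot stop check can be sketched independently of Redis. In the sketch below the StopSignal interface and the in-memory memorySignal are hypothetical stand-ins for the segment_recalculation_stop_requested key, and runBatches stands in for the recalculation loop; lock release and logging are left out.

```go
package main

// StopSignal abstracts the stop-request key (hypothetical interface).
type StopSignal interface {
	Requested() bool // true when the key holds a non-empty value
	Clear()          // deletes the key so the next scheduled run is unaffected
}

// memorySignal is an in-memory stand-in for the Redis key.
type memorySignal struct{ value string }

func (m *memorySignal) Requested() bool { return m.value != "" }
func (m *memorySignal) Clear()          { m.value = "" }

// runBatches processes batches in order and checks the stop signal at the
// start of each batch. It returns how many batches ran and whether it stopped early.
func runBatches(total int, sig StopSignal, process func(batch int)) (int, bool) {
	for batch := 0; batch < total; batch++ {
		if sig.Requested() {
			sig.Clear() // one-shot: consume the request
			return batch, true
		}
		process(batch)
	}
	return total, false
}
```

Because the check runs only between batches, a batch that has already started always finishes, so no segment is left half-written when the operator requests a stop.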

External Call Timeout Strategy

Every external call wraps its context with a dedicated context.WithTimeout so a single hanging call cannot block a goroutine indefinitely. Timeout values are configurable at deploy time via env vars (see Configuration Contract).

| Call site | Context applied to | Env var | Default |
| --- | --- | --- | --- |
| IDatamartRepository.Count / Sample in PreviewSegment | datamart SQL query | DATAMART_QUERY_TIMEOUT_SECONDS | 30 s |
| IDatamartRepository.FetchContactIDs in processSegment (cron) | per-segment SQL query | DATAMART_RECALC_QUERY_TIMEOUT_SECONDS | 60 s |
| MongoDB session transaction in processSegment (ReplaceMembers + InsertEvents) | entire transaction | MONGODB_OPERATION_TIMEOUT_SECONDS | 30 s |
| ISegmentEventPublisher.Publish per segment | Kafka produce + flush | KAFKA_PUBLISH_TIMEOUT_SECONDS | 10 s |
| Redis lock acquire (SetNX) and heartbeat extend (Expire) | Redis command | REDIS_OPERATION_TIMEOUT_SECONDS | 5 s |

Kafka publish retry: Publish retries up to SEGMENT_KAFKA_PUBLISH_MAX_RETRIES times (default 2) with a fixed SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS (default 500 ms) between attempts before returning a non-fatal error. Each attempt respects the per-attempt context timeout from KAFKA_PUBLISH_TIMEOUT_SECONDS. If all retries are exhausted, the failure is logged and the Datadog metric segment.event.publish_error{segment_id} is incremented; the recalculation result is not rolled back.

Performance Requirement

  • Preview endpoint (POST /segments/preview): p95 < 3s, p99 < 8s. Heavy queries hit the datamart only; no impact on MongoDB.
  • Segment list / detail APIs: p99 < 500ms (reads from MongoDB with proper indexes).
  • Daily recalculation cron: Must complete within the 2-hour lock window for the largest companies. Segments are processed in batches of 100 per goroutine pool (configurable).

Scalability measures:

  1. Horizontal pod autoscaler can be applied independently since the service is stateless.
  2. Datamart read replica(s) for query isolation.
  3. Segment member replacement uses MongoDB bulk write (BulkUpdate) to minimise round-trips.
  4. Redis cache for total_matched and percentage_reach values (TTL = 24 hours, invalidated on recalculation).

Monitoring & Alerting

| Signal | Tool | Alert Threshold |
| --- | --- | --- |
| Cron job duration | Datadog custom metric segment.recalc.duration_ms | > 90 min |
| Cron job failure | Datadog error log monitor | Any error in recalculation loop |
| Preview endpoint error rate | Datadog APM | > 1% 5xx |
| Preview endpoint latency | Datadog APM p99 | > 8s |
| Datamart query errors | Datadog log | Any SQL error |

Datadog APM traces will be added to:

  • All HTTP handlers (existing chitrace middleware covers this)
  • Cron job start/finish spans
  • Rule engine BuildQuery() call
  • Datamart Execute() call

Logging

Structured slog logs with the following fields:

| Log point | Level | Fields |
| --- | --- | --- |
| Segment created | INFO | segment_id, company_sso_id, created_by |
| Segment archived | INFO | segment_id, company_sso_id, user_sso_id |
| Recalculation started | INFO | batch_size, total_segments |
| Recalculation per segment | INFO | segment_id, company_sso_id, total_matched, duration_ms |
| Recalculation error (per segment) | ERROR | segment_id, error |
| Preview query executed | INFO | company_sso_id, duration_ms, total_matched |
| Rule engine build error | ERROR | rule_set_json, error |

PII (customer names, phone numbers, emails) must not appear in logs. contact_id (an opaque ObjectID) is acceptable.

Security Implications

  1. SQL Injection prevention: All datamart queries use parameterised statements. The Rule Engine never interpolates user values into SQL strings. Static code analysis via SonarQube will enforce this.
  2. Tenant isolation: Every datamart query and MongoDB query includes company_sso_id as a mandatory filter. The service reads this from the authenticated request context (not from the request body).
  3. Permission enforcement: All segment endpoints check Launchpad CRS permissions. Endpoints that write data (POST, PUT, PATCH) require explicit ALL_ACCESS or OWNED_ONLY grants.
  4. OWNED_ONLY boundary: When OWNED_ONLY is evaluated, the service compares segment.created_by == authenticated_user_sso_id. This check happens in the service layer, not in the handler.
  5. Internal endpoints (/api/v1/...) are protected by BasicAuth only; they are not accessible from the internet (API Gateway restricts these paths to internal VPC traffic).
  6. Rate limiting: The preview endpoint is computationally heavy; a rate limiter (existing rate_limiter service pattern) should be applied per company_sso_id — max 10 preview requests per minute.
  7. PII compliance (ISO 27001 / 27701): Segment names and rule values may contain PII (e.g., a rule email contains @gmail.com). Rule values are stored encrypted at rest in MongoDB (existing MongoDB encryption-at-rest configuration). Rule values are excluded from logs.

4. Backwards Compatibility and Rollout Plan

Compatibility

  • All new endpoints are additive; no existing endpoints are modified.
  • The customer_segments and customer_segment_members MongoDB collections are new; no schema migration is required for existing collections.
  • New permission keys (customers_segment_*) must be registered in Launchpad CRS before deploying the backend. If keys are missing, the middleware will return 403 by default (fail-secure).
  • The datamart database connection is configured via a new env var DATAMART_DSN. If unset, the preview and recalculation features will return a graceful error; segment CRUD operations are unaffected.

Configuration Contract

All new env vars, their types, and defaults. Every default must be overridable at deploy time without code changes.

| Env Var | Type | Default | Description |
| --- | --- | --- | --- |
| DATAMART_DSN | string | "" (disabled) | PostgreSQL DSN for the segmentation datamart; feature disabled when empty |
| SEGMENT_RECALC_WORKERS | int | 4 | Goroutine pool size for concurrent segment recalculation |
| SEGMENT_RECALC_BATCH_SIZE | int | 100 | Segments fetched per page from MongoDB during recalculation |
| SEGMENT_RECALC_LOCK_TTL_SECONDS | int | 7200 | Initial Redis lock TTL for the cron job (seconds) |
| SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS | int | 600 | Heartbeat goroutine renewal interval (seconds); must be < LOCK_TTL / 4 |
| SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS | int | 300 | TTL (seconds) for the per-segment lock segment_recalculation_lock:{segmentID} used by the scoped on-write recalc (create / rule edit) and acquired by the cron per segment to guarantee a single writer |
| SEGMENT_PREVIEW_RATE_LIMIT | int | 10 | Max preview requests per minute per company_sso_id |
| SEGMENT_PREVIEW_MAX_ROWS | int | 10000 | Row cap for datamart queries in preview; triggers is_truncated: true when exceeded |
| DATAMART_QUERY_TIMEOUT_SECONDS | int | 30 | context.WithTimeout deadline for preview datamart queries (seconds) |
| DATAMART_RECALC_QUERY_TIMEOUT_SECONDS | int | 60 | context.WithTimeout deadline for per-segment FetchContactIDs queries during recalculation (seconds) |
| MONGODB_OPERATION_TIMEOUT_SECONDS | int | 30 | context.WithTimeout deadline for MongoDB session transaction in processSegment — covers ReplaceMembers + InsertEvents together (seconds) |
| KAFKA_PUBLISH_TIMEOUT_SECONDS | int | 10 | context.WithTimeout deadline for Publish per segment (seconds) |
| REDIS_OPERATION_TIMEOUT_SECONDS | int | 5 | context.WithTimeout deadline for Redis lock acquire and heartbeat extend operations (seconds) |
| SEGMENT_KAFKA_PUBLISH_MAX_RETRIES | int | 2 | Max retry attempts for Publish before declaring failure and continuing to the next segment |
| SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS | int | 500 | Fixed backoff delay between Kafka publish retry attempts (milliseconds) |
| SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS | int | 86400 | TTL (seconds) applied to the segment_recalculation_stop_requested Redis key when the trigger API writes it; auto-expires the signal if the cron pod exits before detecting it |
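
As an illustration of the contract above, the following is a minimal sketch of how a few of these env vars could be read with their documented defaults, including the rule that the heartbeat renewal interval must stay below a quarter of the lock TTL. The `RecalcConfig` struct, `LoadRecalcConfig`, and `intOrDefault` are hypothetical names chosen for this example and are not taken from contact-service; only the env var names and defaults come from the table.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// RecalcConfig holds a subset of the Configuration Contract (hypothetical type).
type RecalcConfig struct {
	DatamartDSN       string // empty means the segmentation datamart is disabled
	Workers           int
	BatchSize         int
	LockTTL           time.Duration
	LockRenewInterval time.Duration
}

// lookupFunc has the same shape as os.LookupEnv so a fake environment can be injected.
type lookupFunc func(key string) (string, bool)

// intOrDefault parses a positive integer env var, returning def when it is unset or empty.
func intOrDefault(lookup lookupFunc, key string, def int) (int, error) {
	raw, ok := lookup(key)
	if !ok || raw == "" {
		return def, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", key, err)
	}
	if n <= 0 {
		return 0, fmt.Errorf("%s: must be positive, got %d", key, n)
	}
	return n, nil
}

// LoadRecalcConfig applies the documented defaults and validates the lock settings.
func LoadRecalcConfig(lookup lookupFunc) (RecalcConfig, error) {
	cfg := RecalcConfig{}
	cfg.DatamartDSN, _ = lookup("DATAMART_DSN")

	var err error
	if cfg.Workers, err = intOrDefault(lookup, "SEGMENT_RECALC_WORKERS", 4); err != nil {
		return cfg, err
	}
	if cfg.BatchSize, err = intOrDefault(lookup, "SEGMENT_RECALC_BATCH_SIZE", 100); err != nil {
		return cfg, err
	}
	ttl, err := intOrDefault(lookup, "SEGMENT_RECALC_LOCK_TTL_SECONDS", 7200)
	if err != nil {
		return cfg, err
	}
	renew, err := intOrDefault(lookup, "SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS", 600)
	if err != nil {
		return cfg, err
	}
	// Contract: renewal interval must be < lock TTL / 4 (compared without integer division).
	if renew*4 >= ttl {
		return cfg, fmt.Errorf("renew interval %ds must be < lock TTL %ds / 4", renew, ttl)
	}
	cfg.LockTTL = time.Duration(ttl) * time.Second
	cfg.LockRenewInterval = time.Duration(renew) * time.Second
	return cfg, nil
}

func main() {
	cfg, err := LoadRecalcConfig(os.LookupEnv)
	if err != nil {
		fmt.Println("invalid configuration:", err)
		os.Exit(1)
	}
	fmt.Printf("datamart enabled: %t, workers: %d, lock TTL: %s\n",
		cfg.DatamartDSN != "", cfg.Workers, cfg.LockTTL)
}
```

Failing fast at startup on a violated renewal constraint is one possible design choice; the RFC does not state where this check is enforced.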

Resource & Storage Estimates

| Resource | Estimate | Basis |
| --- | --- | --- |
| customer_segments growth | ~1–5 KB/doc × N segments/company | Small; dominated by rule_set BSON; negligible at scale |
| customer_segment_members growth | ~100 bytes/doc × members × segments | 1M contacts × 10 segments × 100 B = ~1 GB/company before recalc churn |
| customer_segment_membership_events growth | ~150 bytes/doc; 90-day TTL | At 10% daily churn: 1M × 10% × 10 segs × 150 B × 90 days ≈ 13.5 GB/company; 90-day TTL bounds this |
| Datamart query load delta | Read-only; bounded by SEGMENT_RECALC_WORKERS × BATCH_SIZE concurrent queries | Separate connection pool; no impact on primary MongoDB |
| Per-goroutine peak memory | 50–100 MB (500k contact ID set × 2 for diff) | Tune SEGMENT_RECALC_WORKERS conservatively until profiled in production |
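
The two storage figures above follow from simple multiplication, and the sketch below reproduces them so the inputs can be varied for a different tenant size. The function names are hypothetical, and decimal gigabytes (10^9 bytes) are assumed to match the table's rounding.

```go
package main

import "fmt"

// bytesPerGB uses decimal gigabytes (assumption, chosen to match the table's rounding).
const bytesPerGB = 1_000_000_000

// membersBytes estimates customer_segment_members size for one company:
// one document per (contact, segment) membership.
func membersBytes(contacts, segments, docBytes int64) int64 {
	return contacts * segments * docBytes
}

// eventsBytes estimates the steady-state size of customer_segment_membership_events:
// churnPercent of contacts change membership per segment per day, and each
// event document is retained for retentionDays before the TTL removes it.
func eventsBytes(contacts, churnPercent, segments, docBytes, retentionDays int64) int64 {
	eventsPerDay := contacts * churnPercent / 100 * segments
	return eventsPerDay * docBytes * retentionDays
}

func main() {
	m := membersBytes(1_000_000, 10, 100)
	e := eventsBytes(1_000_000, 10, 10, 150, 90)
	fmt.Printf("members: %.1f GB\n", float64(m)/bytesPerGB) // members: 1.0 GB
	fmt.Printf("events: %.1f GB\n", float64(e)/bytesPerGB)  // events: 13.5 GB
}
```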

Rollout Strategy

| Phase | Description | Gate |
| --- | --- | --- |
| Phase 0 — Infrastructure | Data team provisions datamart + CDC pipeline. Launchpad team registers 4 permission keys. | Datamart schema agreed and accessible from contact-service pods |
| Phase 1 — Segment CRUD | Deploy segment CRUD endpoints behind feature flag cdp_segmentation_enabled, evaluated via the existing FeatureFlagService.FeatureEnabled(ctx, "cdp_segmentation_enabled", companySsoID) (MongoDB feature_flag collection). Default state: disabled — the flag document is absent at deploy time; FeatureEnabled returns false for all companies. When the flag is disabled for the requesting company, all IAG segment endpoints return 404 Not Found. No recalculation yet. | QA sign-off on CRUD scenarios |
| Phase 2 — Preview | Deploy preview endpoint. Manual testing with real datamart data. Performance test preview latency. | p99 < 8s confirmed in staging |
| Phase 3 — Recalculation Cron | Enable cron job in staging (08:00 WIB daily). Monitor duration and correctness. | Cron completes in < 90 min for largest staging company |
| Phase 4 — Service Integration | Enable /api/v1/contacts/:id/segments endpoint. Coordinate with Broadcast team. | Broadcast service integration test passes |
| Phase 5 — Production Rollout | Add pilot company_sso_id values to the CompanySsoIDs array in the feature_flag document (code: "cdp_segmentation_enabled") → gradual rollout → all companies by setting is_general_flag: true and enabled: true on the same document. | No P0/P1 alerts in first 48h for pilot group |

Rollback procedure:

  1. Disable feature flag cdp_segmentation_enabled: set enabled: false on the feature_flag document with code: "cdp_segmentation_enabled" in MongoDB, or remove specific company_sso_id values from its CompanySsoIDs array for partial rollback. All IAG segment endpoints return 404 Not Found for companies where the flag evaluates to false.
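  2. The cron job can be disabled by removing the cron registration from worker.CronList and redeploying. Without a redeploy, disabling the global cdp_segment_recalc_enabled flag also stops the daily run, because that flag gates RecalculateAllSegments before lock acquisition.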
  3. MongoDB collections (customer_segments, customer_segment_members, customer_segment_membership_events) can be left in place — they have no impact on existing functionality.
  4. Datamart connection failure does not affect existing contact CRUD operations (separate connection pool).
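
The rollout and rollback steps both depend on how the cdp_segmentation_enabled document is evaluated. The sketch below shows one plausible reading of those steps, not the actual FeatureFlagService logic: an absent document or enabled: false disables the feature for everyone, is_general_flag: true enables it for all companies, and otherwise only companies listed in CompanySsoIDs are enabled. The `FeatureFlag` struct, `flagEnabled`, and `gateStatus` are hypothetical names used for illustration only.

```go
package main

import (
	"fmt"
	"net/http"
)

// FeatureFlag mirrors the feature_flag fields named in the rollout steps
// (hypothetical struct; the real document model may differ).
type FeatureFlag struct {
	Code          string
	Enabled       bool
	IsGeneralFlag bool
	CompanySsoIDs []string
}

// flagEnabled evaluates the flag for one company under the assumptions
// stated above. A nil flag models the document being absent at deploy time.
func flagEnabled(flag *FeatureFlag, companySsoID string) bool {
	if flag == nil || !flag.Enabled {
		return false
	}
	if flag.IsGeneralFlag {
		return true
	}
	for _, id := range flag.CompanySsoIDs {
		if id == companySsoID {
			return true
		}
	}
	return false
}

// gateStatus returns 404 when the flag evaluates to false, matching the
// documented behavior of the IAG segment endpoints, and 200 otherwise.
func gateStatus(flag *FeatureFlag, companySsoID string) int {
	if !flagEnabled(flag, companySsoID) {
		return http.StatusNotFound
	}
	return http.StatusOK
}

func main() {
	pilot := &FeatureFlag{
		Code:          "cdp_segmentation_enabled",
		Enabled:       true,
		CompanySsoIDs: []string{"company-a"},
	}
	fmt.Println(gateStatus(nil, "company-a"))   // 404: flag document absent
	fmt.Println(gateStatus(pilot, "company-a")) // 200: pilot company
	fmt.Println(gateStatus(pilot, "company-b")) // 404: not yet rolled out
}
```

Whether enabled: false should override the CompanySsoIDs list during the pilot phase is an assumption here and should be confirmed against the existing FeatureFlagService before relying on it for rollback.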

5. Concerns, Questions, and Known Limitations

Open Questions

| # | Question | Owner | Status |
| --- | --- | --- | --- |
| Q1 | What is the confirmed datamart technology (PostgreSQL vs ClickHouse)? This affects SQL dialect (e.g., ILIKE, && operator, JSONB). | Data team | Resolved — PostgreSQL confirmed. |
| Q2 | What is the agreed datamart schema for custom fields — JSONB column or EAV rows? JSONB is preferred for query simplicity. | Data / CDP BE | Resolved — Schema confirmed with default_fields JSONB (default + Qontak fields) and custom_fields JSONB (user fields). The originally confirmed 9-column layout, with basic event fields as top-level columns, was simplified to 6 top-level columns on 2026-06-12 (see Comment Logs). |
| Q3 | What is the CDC lag SLA? If CDC lag > 24h, segment results may be based on stale data. | Data team | Open |
| Q4 | Should the preview endpoint enforce a maximum result row count for datamart queries (e.g., LIMIT 10,000)? Without a cap, large companies could trigger very expensive queries. | BE + PM | Resolved — LIMIT set to SEGMENT_PREVIEW_MAX_ROWS (default 10,000); is_truncated: true returned when results are capped; total_customer_base added to response. |
| Q5 | Is chatroom data available in the datamart for WhatsApp reachability calculation? If not, reachability can fall back to phone IS NOT NULL. | Data team | Resolved — Chat-related data is stored inside default_fields->'chat_data' JSONB sub-object; a non-null chat_data value indicates an active chatroom. WhatsApp reach = default_fields->'chat_data' IS NOT NULL AND default_fields->>'phone' IS NOT NULL. If chat_data is absent for all members at Phase 5 cutover (CDC backfill not yet complete), contact-service must detect this and return null for WhatsApp reachability rather than silently presenting 0 as authoritative. |
| Q6 | How should the Broadcast service consume segment members — pull (API call) or push (webhook/event on recalculation)? | Broadcast + CDP BE | Resolved — CDP publishes one Kafka event per segment per recalculation run to topic cdp.segment.recalculated. Each event carries segment_id, segment_name, company_sso_id, aggregate counts (total_added, total_removed, total_matched), and a recalc_run_id for idempotency. Consumers react at the segment level and pull individual contact details on-demand via GET /api/v1/segments/:id/customers when needed. See sequence diagram 2.6 and event payload definition. |
| Q7 | Member Loyalty segmentation group — confirmed deferred post-MVP? | PM | Open |
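
The Q6 resolution relies on consumers treating (segment_id, recalc_run_id) as an idempotency key. A minimal sketch of that consumer-side check is shown below, assuming an in-memory store. The `Deduper` type is hypothetical, and the event struct is a trimmed copy that keeps only the fields listed in Q6; a real consumer would likely persist seen keys with an expiry rather than hold them in process memory.

```go
package main

import (
	"fmt"
	"sync"
)

// SegmentRecalculatedEvent is a trimmed copy of the cdp.segment.recalculated
// payload, limited to the fields listed in Q6 (serialization tags omitted).
type SegmentRecalculatedEvent struct {
	SegmentID    string
	SegmentName  string
	CompanySsoID string
	RecalcRunID  string
	TotalAdded   int64
	TotalRemoved int64
	TotalMatched int64
}

// dedupeKey is the consumer idempotency key: (segment_id, recalc_run_id).
type dedupeKey struct {
	segmentID   string
	recalcRunID string
}

// Deduper remembers processed keys in memory (hypothetical helper).
type Deduper struct {
	mu   sync.Mutex
	seen map[dedupeKey]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[dedupeKey]struct{})}
}

// FirstDelivery records the event's key and reports whether this is the
// first time the key has been seen. Redeliveries return false.
func (d *Deduper) FirstDelivery(ev SegmentRecalculatedEvent) bool {
	k := dedupeKey{segmentID: ev.SegmentID, recalcRunID: ev.RecalcRunID}
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[k]; dup {
		return false
	}
	d.seen[k] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	events := []SegmentRecalculatedEvent{
		{SegmentID: "seg-1", RecalcRunID: "run-A", TotalAdded: 3, TotalMatched: 10},
		{SegmentID: "seg-1", RecalcRunID: "run-A", TotalAdded: 3, TotalMatched: 10}, // redelivery
		{SegmentID: "seg-1", RecalcRunID: "run-B", TotalRemoved: 1, TotalMatched: 9},
	}
	for _, ev := range events {
		if !d.FirstDelivery(ev) {
			fmt.Printf("skip duplicate %s/%s\n", ev.SegmentID, ev.RecalcRunID)
			continue
		}
		fmt.Printf("apply %s/%s matched=%d\n", ev.SegmentID, ev.RecalcRunID, ev.TotalMatched)
	}
}
```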

Known Limitations

  1. Segment freshness: For existing, unchanged segments, members are updated once daily — customers whose data changes after 08:00 WIB will not move in/out of a segment until the next recalculation (by product design, MVP). The exception is the moment of create or rule edit, which triggers an immediate best-effort scoped recalc (2.11) so the segment reflects the current datamart within seconds. This is best-effort: if the recalc feature flag is off, the per-segment lock is held, or the datamart is unavailable, the segment stays pending/stale until the next daily cron. Clients should consult processing_status rather than assuming an empty member list means an empty segment.
  2. Rule complexity cap: Maximum 2 groups × 3 conditions = 6 total conditions per segment. This is enforced in the service layer and validated at API input time.
  3. Large company recalculation time: Companies with > 500k customers and many active segments may push the cron window close to the 2-hour lock TTL. Mitigation: configurable goroutine pool size and an early-exit circuit breaker.
  4. Custom field type inference: The Rule Engine must know the field_type for each condition to apply the correct operator and SQL cast. The frontend must always pass field_type in the rule condition payload; the backend validates it against the registered field properties.
  5. Membership diff memory cost: The recalculation cron holds the old and new contact ID sets in memory simultaneously to compute the diff. For a company with 500k contacts and 50 active segments this can reach ~50–100 MB per worker goroutine at peak. The goroutine pool size should be tuned conservatively for large tenants until this is profiled in production.
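
To make limitation 5 concrete, the following is a minimal sketch of a set-based membership diff. It is not the processSegment implementation: `diffMembers` and `toSet` are hypothetical names, and contact IDs are assumed to be strings. Both sets are alive at the same time while the diff runs, which is where the roughly doubled per-worker memory comes from.

```go
package main

import (
	"fmt"
	"sort"
)

// toSet builds a lookup set from a list of contact IDs, dropping duplicates.
func toSet(ids []string) map[string]struct{} {
	set := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{}
	}
	return set
}

// diffMembers compares the previous and freshly evaluated member lists and
// returns the contacts that entered and left the segment, sorted for
// deterministic output. Both sets are held in memory simultaneously.
func diffMembers(oldIDs, newIDs []string) (added, removed []string) {
	oldSet := toSet(oldIDs)
	newSet := toSet(newIDs)
	for id := range newSet {
		if _, ok := oldSet[id]; !ok {
			added = append(added, id)
		}
	}
	for id := range oldSet {
		if _, ok := newSet[id]; !ok {
			removed = append(removed, id)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	added, removed := diffMembers(
		[]string{"a", "b", "c"}, // members stored by the previous run
		[]string{"b", "c", "d"}, // members matched by the current run
	)
	fmt.Printf("added=%v removed=%v\n", added, removed) // added=[d] removed=[a]
}
```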

6. Comment Logs

| Date | Comment(s) From | Action Item(s) |
| --- | --- | --- |
| 2026-05-19 | Initial draft | N/A |
| 2026-05-19 | Data team confirmation | Datamart schema confirmed: PostgreSQL, 9 columns, default_fields JSONB + custom_fields JSONB + 4 top-level basic event columns. Updated assumption #3, datamart schema section, Rule Engine field resolver + SQL generation example, and resolved Q1/Q2. |
| 2026-05-19 | Design decision | Added Option B audit trail for membership transitions: new customer_segment_membership_events collection, recalculation strategy updated to diff + write events, T4.3 added, T4.4–T4.5 renumbered, Known Limitation #5 revised to reflect memory cost trade-off. |
| 2026-06-10 | RFC review — priority actions | (1) Added shared error contract + per-endpoint error taxonomy (error envelope, code registry, per-endpoint table, example payloads). (2) Specified recalc atomicity: MongoDB session transaction wrapping ReplaceMembers+InsertEvents; added recalc_run_id ULID idempotency key to customer_segment_members and customer_segment_membership_events; lock heartbeat goroutine; poison-segment skip at 3 consecutive failures. (3) Resolved Q4: preview capped at SEGMENT_PREVIEW_MAX_ROWS (default 10,000) with is_truncated flag and total_customer_base denominator. Resolved Q5: chat data stored in default_fields->'chat_data' JSONB sub-object (no separate has_chatroom key); WhatsApp reach = default_fields->'chat_data' IS NOT NULL AND phone IS NOT NULL; gates on chat_data availability rather than shipping phone IS NOT NULL proxy. (4) Added file/signature/gps (presence-only) and multi_line_text to Rule Engine operator table and field resolver; replaced pending_evaluation status value with rule_dirty_at nullable timestamp field. (5) Added Configuration Contract table (7 env vars with types/defaults); added Resource & Storage Estimates table; reconciled T3.1/T3.2 from database/sql+go-sqlmock to pgx/v5+pgxmock. |
| 2026-06-10 | Q6 design decision | Resolved Q6: CDP emits Kafka events (push) when recalculation produces a non-empty member diff. Architecture diagram updated to include Kafka broker and EventPublisher component. Sequence diagram 2.6 added. Pull API retained for on-demand queries. Publish failure is non-fatal (logged + segment.event.publish_error metric); MongoDB is authoritative. (Topic design subsequently revised — see 2026-06-12 entry.) |
| 2026-06-12 | Schema + event publishing revision | (1) Datamart schema simplified to exactly 6 top-level columns: company_sso_id, customer_id, created_at_date, created_by_user_id, default_fields, custom_fields. Fields updated_at, updated_by, bsuid moved into default_fields JSONB; chat presence stored in default_fields->'chat_data' JSONB sub-object (no standalone has_chatroom key); company_id column removed. Updated: assumption #3, datamart schema table, field-access table, indexes, Rule Engine field resolver and SQL example, Q5 resolution. (2) field_category values renamed: "basic_event" → "system", "custom_qontak" merged into "default", "custom_user" → "custom". Three categories now: system (top-level columns), default (default_fields JSONB), custom (custom_fields JSONB). Updated throughout: RuleSet example, field-access table, Rule Engine resolver, SQL example, and Go constants in segment.go. (3) Segment member event redesigned from aggregate to per-contact: topic renamed to cdp.segment.member_changed (singular), one message per customer per membership transition; SegmentMemberChangedEvent struct adds contact_id and event ("added"/"removed"), removes aggregate count fields; ISegmentEventPublisher.BatchPublish replaces PublishMembersChanged; message key changed from segment_id to contact_id. Task breakdown: new T4.5 (ISegmentEventPublisher), T4.4 extended with batch-publish step, T4.6 wiring updated. |
| 2026-06-15 | WhatsApp reachability schema correction | Replaced standalone has_chatroom key with default_fields->'chat_data' JSONB sub-object. A non-null chat_data value indicates an active chatroom. WhatsApp reach query updated to default_fields->'chat_data' IS NOT NULL AND default_fields->>'phone' IS NOT NULL. CDC backfill guard updated accordingly: check whether any member has a non-null chat_data object; if absent, return null reachability. Updated: assumption #3, datamart schema table (line 487), Q5 resolution, comment log 2026-06-10 and 2026-06-12 entries, and task breakdown T3.2. |
| 2026-06-17 | Internal segment customers endpoint added | Added GET /private/segments/:segment_id/customers — BasicAuth, no Launchpad permission gate — to expose detailed customer data for saved segment members to downstream machine-to-machine consumers (Broadcast, Chatbot). Response mirrors ContactSerializer minus crm_data, chat_data, avatar, accounts, disabled_field, field_permission, permission, is_enable_usernames, and scalar username; appends added_at from customer_segment_members. Added: sequence diagram 2.9, API spec with PrivateSegmentCustomerItem shape, per-endpoint error row. Task breakdown updated: T1.6 stub route added, T5.4 OpenAPI table updated, new task T5.6 added. Also backfilled missing task T5.2 (GET /iag/v1/segments/:id/customers IAG endpoint with OWNED_ONLY support). |
| 2026-06-18 | S2S route prefix change + API domain mapping | Renamed all S2S (BasicAuth) internal routes from /private/... to /api/v1/... to align with the service's standard API versioning convention. Added a Route → Public URL Mapping table in the APIs section documenting the two routing layers: IAG endpoints (/iag/v1/segments/...) are exposed via https://api.mekari.io/internal/qontak/customer/v1/segments/...; S2S endpoints (/api/v1/...) are exposed directly via https://contact-service.qontak.net/api/v1/.... Updated: sequence diagrams 2.7 and 2.9, per-endpoint error table, both S2S API spec headers, T1.6 route registration, T5.3 curl example, T5.4 OpenAPI table, and T5.6 router wiring in task breakdown. |
| 2026-06-23 | RFC review — blocking gaps resolved | (1) Timeouts: added "External Call Timeout Strategy" section; all five external call surfaces (datamart preview/recalc, MongoDB transaction, Kafka publish, Redis lock) now have context.WithTimeout deadlines driven by 7 new env vars in the Configuration Contract. (2) Feature flag: cdp_segmentation_enabled resolved to the existing FeatureFlagService / feature_flag MongoDB collection — absent at deploy (disabled for all), per-company targeting via CompanySsoIDs, full rollout via is_general_flag: true, S2S endpoints not gated. (3) Kafka retry: Publish retries up to SEGMENT_KAFKA_PUBLISH_MAX_RETRIES (default 2) with SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS (default 500 ms) backoff; failure after all retries remains non-fatal. Task breakdown: T1.8 added, T3.1/T3.3/T4.4/T4.5/T4.6 updated. |
| 2026-06-25 | S2S tenant isolation + segment versioning | (1) S2S company_sso_id enforcement: added company_sso_id as a required query parameter to both internal endpoints (GET /api/v1/contacts/:contact_id/segments and GET /api/v1/segments/:segment_id/customers). BasicAuth middleware sets no company context, so the parameter is the sole tenant-isolation mechanism. For the contacts endpoint, the member-lookup query is scoped to company_sso_id — a contact from a different company yields an empty segments array. For the segments endpoint, the handler cross-validates segment.company_sso_id == company_sso_id after loading and returns 404 on mismatch to avoid leaking resource existence. Per-endpoint error table updated: both S2S endpoints now list 400. New tasks: T6.1. (2) Segment version label: added version (int64, starts at 1) to customer_segments collection. Incremented atomically via $inc on every user-initiated edit (UpdateSegment) and archive (ArchiveSegment); never touched by cron writes. Exposed in list and detail API responses. rule_version added to SegmentRecalculatedEvent Kafka payload so consumers can correlate which rule definition produced a given recalculation result. DuplicateSegment resets version to 1 on the copy. New tasks: T6.2, T6.3, T6.4. |
| 2026-06-25 | Kafka event redesign — aggregate per-segment | Replaced per-customer Kafka events with a single aggregate event per segment per recalculation run. Rationale: emitting one message per contact membership change is cost-prohibitive at scale (e.g. 100k contacts × 50 active segments = 5M events/day). New design: topic renamed cdp.segment.member_changed → cdp.segment.recalculated; one message per segment carrying total_added, total_removed, total_matched as aggregate counts; partition key changed from contact_id to segment_id. SegmentMemberChangedEvent renamed to SegmentRecalculatedEvent with ContactID/Event removed and TotalAdded/TotalRemoved/TotalMatched added. ISegmentEventPublisher.BatchPublish([]events) simplified to Publish(event). Consumer idempotency key updated to (segment_id, recalc_run_id). Per-contact membership history remains available via GET /api/v1/segments/:id/customers and the customer_segment_membership_events MongoDB collection (unchanged). Updated: architecture diagram, section 2.6 title + intro + sequence diagram + payload + Go struct + interface + publishing sequence + consumer contract + topic config, Q6 resolution. Task breakdown: T4.4 processSegment step, T4.5 struct/interface/implementation/tests, T4.6 topic wiring. |
| 2026-06-26 | Operational controls | (1) Circuit breaker: added segment_recalculation_stop_requested Redis stop-signal with one-shot semantics — checked at each batch boundary in doRecalculation; cron clears key and exits cleanly when detected. New env var SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS (default 86400 s). (2) Recalc feature flag: cdp_segment_recalc_enabled global MongoDB flag gates RecalculateAllSegments and TriggerRecalculation before lock acquisition; distinct from the per-company CRUD flag cdp_segmentation_enabled. (3) Manual trigger API: POST /api/v1/segments/recalculate (BasicAuth); accepts company_sso_ids (required) + segment_ids (optional); acquires same Redis lock as daily cron; returns 202 Accepted with recalc_run_id immediately; runs in background goroutine; 409 if lock held, 503 if flag disabled. Updated: architecture diagram (new TriggerHandler node), sequence diagram 2.3 (feature flag + circuit breaker steps), new sequence diagram 2.10, HA section, APIs section, Config Contract, per-endpoint error table. |
| 2026-06-30 | Immediate recalc on write + readiness indicator | (1) Scoped recalc on create / rule edit: CreateSegment and a rule-changing UpdateSegment now fire a best-effort, asynchronous single-segment recalculation in a detached goroutine (context.WithoutCancel) so a new/edited segment is usable within seconds instead of waiting for the daily cron. Added sequence diagram 2.11; updated success criterion #4, create diagram 2.1, PUT description, Known Limitation #1. The path is gated by cdp_segment_recalc_enabled and reuses processSegment; failures never affect the create/update response (daily cron is the backstop). (2) Per-segment lock: introduced segment_recalculation_lock:{segmentID} (new env var SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS, default 300 s) so the on-write recalc never contends on the global cron lock; the cron's processSegment also acquires it to guarantee a single writer per segment (prevents double membership-transition events). Updated Lock-safety note. (3) Readiness indicator: GET detail and list responses now expose is_ready (bool) and processing_status (pending/ready/stale/failed), derived at mapping time from existing last_evaluated_at + rule_dirty_at + recalc_error_count (no new column/migration). Updated success criterion #5, list & detail API examples + field-rules table, added derivation table to Database Model, and a readiness caveat to the member-list endpoints (empty list vs not-yet-computed). |
| 2026-07-01 | Recalc feature flag scope decision: S2S manual trigger exempted | Confirmed intended scope of cdp_segment_recalc_enabled: it gates both the daily cron RecalculateAllSegments and the scoped on-write recalc recalculateSegment/§2.11 (fired on create / rule-changing update) — those two are the regular, automatic recalculation paths. It does not gate the S2S manual trigger TriggerRecalculation/§2.10 — an operator must retain the ability to force a targeted recalc even during a maintenance window where automatic recalculation is paused. Corrected §2.10's diagram + description + per-endpoint error table (removed the FeatureEnabled step, the FF participant, and the ErrRecalcDisabled/503 row, which were never implemented) and the §3 "Recalculation feature flag" note to state this scope explicitly. Known gap: the shipped contact-service code does not yet check the flag in recalculateNewSegment (the code's name for this RFC's recalculateSegment) — isRecalcEnabled has exactly one caller (RecalculateAllSegments) as of this writing. Tracked as a follow-up implementation task in the task breakdown (T8.2). |