This RFC was first created a while ago on Confluence (https://jurnal.atlassian.net/wiki/spaces/QON/pages/51115786492/DRAFT+RFC+Customer+Segmentation+based+on+Basic+Attributes+Backend) and follows the Qontak RFC Template format.
RFC: Customer Segmentation based on Basic Attributes (Backend)
| Status | RFC |
|---|---|
| Owner | CDP — Contact Service |
| Submitted Date | 2026-05-19 |
| Approver | TBD — tech lead + infosec approver |
| Related Documents | PRD — Customer Segmentation based on Basic Attributes · Epic TF-2544 |
1. Overview
This RFC defines the backend technical design for Rule-Based Customer Segmentation inside the Customer Data Platform (CDP).
The feature allows CDP users to define dynamic customer groups (segments) using declarative filter rules on customer properties and basic events. Segment membership is re-evaluated daily so that customers automatically enter or leave a segment when their data changes.
Segment results are exposed as reusable assets that can be consumed by Broadcast, Chatbot automation, and downstream personalization services.
Success Criteria
- A user can create, edit, archive, and duplicate a named segment with up to 2 criteria groups, each containing up to 3 nested filter conditions.
- Each filter condition supports the full operator matrix defined in the PRD (text, date, number, dropdown, multi-select, boolean, file presence).
- A Preview Customers endpoint evaluates the current rule set against the datamart and returns total matched count + a sample list without persisting state.
- A daily cron job (08:00 WIB / 01:00 UTC) re-evaluates every active segment and writes the resulting member list and metadata to MongoDB. In addition, a scoped recalculation is triggered immediately (best-effort, asynchronous) when a segment is created or when its rule set is edited, so a new or just-edited segment becomes usable within seconds rather than waiting up to ~24h for the next cron run.
- Segment detail and list APIs expose: total matched, percentage reach, last evaluated timestamp, per-channel reachability (WhatsApp, Email), and a readiness indicator (is_ready + processing_status) that tells the caller whether the segment's member list has been computed yet (pending / ready / stale / failed).
- Four permission keys control access (customers_segment_view, customers_segment_add, customers_segment_manage, customers_segment_archived) with support for ALL_ACCESS, OWNED_ONLY, and DISABLED levels.
- An internal API exposes the segment list per customer for downstream service integration (Broadcast, Chatbot).
- All segmentation queries hit the datamart (separate read-optimised SQL database), not the primary CDP MongoDB.
Out of Scope
- Frontend / UI implementation.
- Behavioral event segmentation (e.g., "opened email N times") — not in MVP.
- Member Loyalty segmentation group is NICE TO HAVE and may be deferred.
- Real-time streaming evaluation (segment recalculation is batch-daily, plus the best-effort scoped recalculation on create or rule edit).
- Automatic routing to campaign creation (the Broadcast service is responsible for consuming the segment data).
- Performance tab / analytics charts (Metrics #2–#6 in the PRD) — deferred post-MVP.
Related Documents
- PRD — Customer Segmentation based on Basic Attributes
- Figma Design
- Field Properties: existing default & custom field definitions (source: internal/app/repository/field_properties/base.go)
Assumptions
- A datamart (PostgreSQL or ClickHouse) exists or will be provisioned that mirrors customer data in a denormalised, queryable form per company_sso_id.
- The datamart is populated via CDC (Kafka events from contact-service writes). Provisioning the CDC pipeline is owned by the Data/Platform team and is a prerequisite for this feature.
- The datamart uses PostgreSQL with exactly six top-level columns: customer_id, created_at_date, created_by_user_id, default_fields, custom_fields, and company_sso_id. All other contact data, including updated_at, updated_by, and bsuid, is stored inside the default_fields JSONB column. Chat-related data (chatroom presence etc.) is stored inside the nested default_fields->'chat_data' JSONB object; a non-null chat_data value indicates the contact has an active chatroom. User-defined custom fields are stored in the custom_fields JSONB column. No company_id column is present in the datamart; tenant isolation uses company_sso_id exclusively.
- Daily recalculation at 08:00 WIB is acceptable latency for segment freshness (no sub-minute SLA required).
- Maximum segment size per company is bounded by the company's total customer count (up to ~1M rows).
- Authentication is done via the existing IAGMiddleware for frontend endpoints; role/permission data comes from Launchpad CRS via RequirePermissionMiddleware (existing pattern). Internal service-to-service endpoints use BasicAuth.
- Segment rules allow at most 2 groups (criteria) and 3 nested conditions per group, enforced at the service layer.
Dependencies
| Dependency | Owner | Notes |
|---|---|---|
| Datamart provisioning (PostgreSQL/ClickHouse) | Data / Platform team | Must contain denormalised customer rows queryable by company_sso_id |
| CDC pipeline: MongoDB → Datamart | Data / Platform team | Keeps datamart fresh; SLA to be agreed |
| Launchpad CRS new permission keys | Identity / CRM team | Four new segment permission keys must be registered |
| Broadcast Service | Broadcast team | Consumes segment IDs/member lists for recipient list creation |
2. Technical Design
Architecture & Tech Stack
The segmentation feature is implemented entirely within the existing contact-service Go application. No new microservice is introduced for MVP.
graph TB
FE["Frontend / CDP UI"]
GW["API Gateway"]
CS["contact-service (Go)"]
MongoDB[("MongoDB\n(CDP Primary)")]
Datamart[("Datamart\nPostgreSQL / ClickHouse")]
Redis[("Redis Cache")]
Kafka[["Kafka\n(cdp.segment.recalculated)"]]
Broadcast["Broadcast Service"]
Chatbot["Chatbot / Officeless"]
FE --> GW --> CS
CS -->|"Segment CRUD\nMember storage"| MongoDB
CS -->|"Rule evaluation\n(preview & recalc)"| Datamart
CS -->|"Feature flags\nCache"| Redis
CS -->|"Emit on member change"| Kafka
Kafka -->|"Subscribe"| Broadcast
Kafka -->|"Subscribe"| Chatbot
Broadcast -->|"GET /api/v1/segments/:id/customers\n(on-demand pull)"| CS
Chatbot -->|"GET /api/v1/contacts/:contact_id/segments\n(on-demand pull)"| CS
subgraph "contact-service internals"
SH["Segment Handler\n(HTTP)"]
SS["Segment Service\n(business logic)"]
RE["Rule Engine\n(SQL builder)"]
SR["Segment Repository\n(MongoDB)"]
DMR["Datamart Repository\n(SQL)"]
EP["Event Publisher\n(Kafka)"]
CJ["Cron Job\nSegmentRecalculation\n(daily 08:00 WIB)"]
TH["Trigger Handler\n(S2S BasicAuth)"]
SH --> SS
SS --> RE
SS --> SR
SS --> DMR
CJ --> SS
TH --> SS
SS --> EP
end
Tech stack additions:
| Component | Technology | Rationale |
|---|---|---|
| Segment metadata & member list | MongoDB (existing) | Consistent with CDP data store; document model fits variable rule schema |
| Segmentation query execution | Datamart SQL (new DB driver) | Separate read-optimised store; avoids pressure on primary MongoDB |
| Cron scheduling | gocraft/work (existing) | Same scheduler used by all existing cron jobs |
| Permission check | Launchpad CRS (existing api package) | Consistent with contact permission pattern |
| Rule Engine | Pure Go | Translates rule DSL → parameterised SQL WHERE clause |
Sequence Diagrams
2.1 Create Segment
sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant R as Segment Repository (MongoDB)
participant BG as Background Goroutine
FE->>H: POST /iag/v1/segments\n{name, description, rule_set}
H->>H: Validate request payload\n(name max 60, desc max 250,\nrule limits)
H->>S: CreateSegment(ctx, companySsoID, payload)
S->>S: Validate rule_set structure\n(≤2 groups, ≤3 conditions/group)
S->>R: InsertOne(customer_segments)\n(last_evaluated_at = null → status "pending")
R-->>S: segmentID
S->>BG: go recalculateSegment(WithoutCancel(ctx), companySsoID, segmentID)
Note over BG: Best-effort scoped recalc (see 2.11).\nDoes not block the response.\nFailures are logged only.
S-->>H: segmentID
H-->>FE: 201 Created {segment_id}
The segment is persisted with last_evaluated_at = null, so it reports processing_status: "pending" until the scoped recalc completes. The recalc runs in a detached goroutine (context.WithoutCancel) so the HTTP response returns immediately and is never affected by the recalc outcome. If the recalc cannot run (feature flag off, per-segment lock held, required deps unwired, datamart unavailable), the next daily cron is the backstop.
2.2 Preview Customers (ad-hoc, unsaved rule)
sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant RE as Rule Engine
participant DM as Datamart Repository
FE->>H: POST /iag/v1/segments/preview\n{rule_set: {...}}
H->>H: Validate rule_set\n(≤2 groups, ≤3 conditions/group)
H->>S: PreviewSegment(ctx, ruleSet, companySsoID, userSsoID, permission)
S->>RE: BuildQuery(ruleSet)
RE-->>S: baseQuery (WHERE clause + params)
S->>DM: Count(ctx, companySsoID, baseQuery)
DM-->>S: totalMatched=2500, totalBase=11700
alt view_customer = OWNED_ONLY
S->>RE: BuildOwnedQuery(baseQuery, userSsoID)
Note over RE: appends AND created_by_user_id = $N
RE-->>S: ownedQuery
S->>DM: Sample(ctx, companySsoID, ownedQuery, SEGMENT_PREVIEW_MAX_ROWS)
else ALL_ACCESS
S->>DM: Sample(ctx, companySsoID, baseQuery, SEGMENT_PREVIEW_MAX_ROWS)
end
DM-->>S: sample=[...], isTruncated
S-->>H: PreviewResult
H-->>FE: 200 OK {total_matched, total_customer_base, sample, is_truncated}
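The OWNED_ONLY branch above appends an ownership predicate to the base query. A minimal sketch of that step, assuming the rule engine's output can be modelled as a WHERE fragment plus positional arguments (the Query type here is hypothetical, not the engine's real return type):

```go
package main

import "fmt"

// Query is a hypothetical model of the rule engine's output:
// a WHERE-clause fragment plus its positional arguments ($1, $2, ...).
type Query struct {
	Where string
	Args  []any
}

// BuildOwnedQuery returns a copy of base narrowed to contacts created by
// userSsoID. The placeholder index is derived from the argument count, so the
// user ID travels as a bind parameter and is never interpolated into SQL text.
func BuildOwnedQuery(base Query, userSsoID string) Query {
	args := make([]any, 0, len(base.Args)+1)
	args = append(args, base.Args...) // copy so the base query is not mutated
	args = append(args, userSsoID)
	return Query{
		Where: fmt.Sprintf("(%s) AND created_by_user_id = $%d", base.Where, len(args)),
		Args:  args,
	}
}
```

The base fragment is wrapped in parentheses so that a top-level OR in the rule set cannot widen the ownership filter.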
2.3 Daily Segment Recalculation (Cron)
sequenceDiagram
participant CJ as Cron Job (gocraft/work)
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant RE as Rule Engine
participant DM as Datamart Repository
participant MR as Member Repository (MongoDB)
participant Cache as Redis Cache
CJ->>S: RecalculateAllSegments(ctx)
S->>Cache: FeatureEnabled("cdp_segment_recalc_enabled")
alt feature disabled
S-->>CJ: return nil (recalc disabled)
end
S->>SR: FindActiveSegments(ctx, batchSize=100)
loop For each batch of active segments
S->>Cache: Get("segment_recalculation_stop_requested")
alt stop signal present
S->>Cache: Del("segment_recalculation_stop_requested")
Note over S: Circuit breaker triggered — exit loop cleanly
end
loop For each segment in batch
S->>RE: BuildQuery(segment.RuleSet)
RE-->>S: SQL query
S->>DM: FetchMatchingContactIDs(ctx, companySsoID, query)
DM-->>S: []contactID
S->>MR: ReplaceMembers(ctx, segmentID, contactIDs)
S->>SR: UpdateStats(ctx, segmentID, totalMatched, lastEvaluatedAt)
end
end
S-->>CJ: done (errors logged per segment, non-fatal)
2.4 Get Segment Details (with Reachability Metrics)
sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant MR as Member Repository (MongoDB)
participant CR as Contact Repository (MongoDB)
FE->>H: GET /iag/v1/segments/:id
H->>S: GetSegmentDetails(ctx, segmentID, userPermission)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment (metadata, rule_set)
S->>MR: CountMembers(ctx, segmentID)
MR-->>S: totalMatched
alt segment is active
S->>MR: CountMembersWithPhone(ctx, segmentID)
MR-->>S: whatsappReachable
S->>MR: CountMembersWithEmail(ctx, segmentID)
MR-->>S: emailReachable
S->>CR: CountAll(ctx, companySsoID)
CR-->>S: totalCustomers
end
S-->>H: SegmentDetailResponse
H-->>FE: 200 OK {segment info, stats, reachability}
2.5 Archive Segment
sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
FE->>H: PATCH /iag/v1/segments/:id/archive
H->>S: ArchiveSegment(ctx, segmentID, userSSOID, permission)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment
S->>S: Check OWNED_ONLY permission\n(segment.CreatedBy == userSSOID?)
S->>SR: UpdateStatus(ctx, segmentID, "archived")
SR-->>S: ok
S-->>H: ok
H-->>FE: 200 OK
2.6 Segment Recalculated Event (Post-Recalculation)
Each event notifies subscribers that a segment's membership has been recalculated — one Kafka message per segment per recalculation run, carrying aggregate counts of how many contacts were added and removed. Downstream services (Broadcast, Chatbot) that need individual contact details pull them on-demand via GET /api/v1/segments/:id/customers.
sequenceDiagram
participant CJ as Cron Job (gocraft/work)
participant S as Segment Service
participant MR as Member Repository (MongoDB)
participant EP as Event Publisher (Kafka)
participant BS as Broadcast Service
participant CB as Chatbot / Officeless
CJ->>S: RecalculateAllSegments(ctx)
loop For each active segment
S->>MR: ReplaceMembers(ctx, segmentID, contactIDs) [in transaction]
Note over S: compute diff: added = new − old, removed = old − new
alt len(added) > 0 OR len(removed) > 0
S->>EP: Publish("cdp.segment.recalculated", SegmentRecalculatedEvent)
EP-->>BS: Kafka delivery (one msg per segment, async)
EP-->>CB: Kafka delivery (one msg per segment, async)
end
end
S-->>CJ: done
Event payload (cdp.segment.recalculated topic):
One message per segment per recalculation run:
{
"event_type": "segment.recalculated",
"segment_id": "683ab...",
"segment_name": "Loyal WhatsApp Customers",
"company_sso_id": "company_abc",
"recalc_run_id": "01J...",
"rule_version": 3,
"total_matched": 2500,
"total_added": 150,
"total_removed": 32,
"occurred_at": "2026-06-10T01:00:00Z"
}
Go struct and publisher interface (internal/app/service/segment/event_publisher.go):
type SegmentRecalculatedEvent struct {
EventType string `json:"event_type"` // always "segment.recalculated"
SegmentID string `json:"segment_id"`
SegmentName string `json:"segment_name"`
CompanySsoID string `json:"company_sso_id"`
RecalcRunID string `json:"recalc_run_id"` // ULID; idempotency key for consumers
RuleVersion int64 `json:"rule_version"` // segment.Version at time of recalculation; lets consumers correlate which rule definition produced the result
TotalMatched int64 `json:"total_matched"`
TotalAdded int64 `json:"total_added"`
TotalRemoved int64 `json:"total_removed"`
OccurredAt time.Time `json:"occurred_at"` // UTC timestamp of the recalc run
}
// ISegmentEventPublisher is injected into SegmentService; implemented by kafkaSegmentPublisher.
// Publish sends one Kafka message per segment per recalculation run.
type ISegmentEventPublisher interface {
Publish(ctx context.Context, event SegmentRecalculatedEvent) error
}
Publishing sequence (within processSegment, per segment):
- The MongoDB session transaction commits successfully (ReplaceMembers + InsertEvents both persist).
- Guard: only if len(added) > 0 OR len(removed) > 0. No-diff runs emit nothing.
- Build one SegmentRecalculatedEvent with TotalAdded = len(added), TotalRemoved = len(removed), TotalMatched = len(newIDs). OccurredAt = the run-start timestamp from RecalculateAllSegments (same for all segments in one cron invocation); RecalcRunID = the ULID generated at cron run start.
- Call eventPublisher.Publish(ctx, event).
  - Kafka message: topic = "cdp.segment.recalculated", key = []byte(segment_id) (all recalc events for one segment route to the same partition), value = JSON(event).
  - The implementation sends the single message and blocks until the broker acknowledges or returns an error.
- On publish error: slog.ErrorContext(ctx, "segment event publish failed", slog.String("segment_id", seg.ID), slog.Any("error", err)) + increment Datadog metric segment.event.publish_error{segment_id} → continue to the next segment. The recalculation result is not rolled back; MongoDB member data is authoritative.
- On MongoDB transaction failure: Publish is never called, so the topic never receives false-positive notifications for a failed write.
Consumer contract: each message signals that a segment's membership has changed, carrying aggregate counts (total_added, total_removed, total_matched). Consumers that need individual contact details pull them on-demand via GET /api/v1/segments/:id/customers.
Idempotency: consumers must deduplicate by (segment_id, recalc_run_id). A retried delivery with the same tuple must be treated as a no-op.
Topic configuration: cdp.segment.recalculated, partition key = segment_id (preserves per-segment event ordering), retention = 7 days.
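On the consumer side, the idempotency rule could look roughly like the sketch below. Deduper and FirstDelivery are hypothetical names, and the in-memory map is an assumption made for brevity; a real consumer would need a shared store whose entries expire no sooner than the 7-day topic retention.

```go
package main

import "sync"

// recalcKey is the idempotency tuple named in the consumer contract.
type recalcKey struct {
	SegmentID   string
	RecalcRunID string
}

// Deduper remembers which (segment_id, recalc_run_id) tuples were handled.
// In-memory only: state is lost on restart and not shared across instances.
type Deduper struct {
	mu   sync.Mutex
	seen map[recalcKey]struct{}
}

// NewDeduper returns an empty Deduper.
func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[recalcKey]struct{})}
}

// FirstDelivery reports true the first time a tuple is seen and false on any
// redelivery, which the caller should then treat as a no-op.
func (d *Deduper) FirstDelivery(segmentID, recalcRunID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	k := recalcKey{SegmentID: segmentID, RecalcRunID: recalcRunID}
	if _, dup := d.seen[k]; dup {
		return false
	}
	d.seen[k] = struct{}{}
	return true
}
```

A consumer would call FirstDelivery before acting on a message and skip processing when it returns false.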
2.7 Broadcast / Service Integration — Get Customer Segments
sequenceDiagram
participant BS as Broadcast Service
participant H as Segment Handler (s2s)
participant S as Segment Service
participant MR as Member Repository (MongoDB)
BS->>H: GET /api/v1/contacts/:contact_id/segments
H->>S: GetSegmentsByContact(ctx, contactID, companySsoID)
S->>MR: FindSegmentsByContactID(ctx, contactID, companySsoID)
MR-->>S: []SegmentMembership{segment_id, segment_name}
S-->>H: []SegmentInfo
H-->>BS: 200 OK {segments: [{segment_id, segment_name}]}
2.8 Get Customers in Saved Segment
sequenceDiagram
participant FE as Frontend
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant MR as Member Repository (MongoDB)
FE->>H: GET /iag/v1/segments/:id/customers?page=1&per_page=20
H->>S: GetSegmentCustomers(ctx, segmentID, companySsoID, userSsoID, permission, page, perPage)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment
S->>MR: FindMembers(ctx, segmentID, companySsoID, userSsoID, permission, page, perPage)
Note over MR: OWNED_ONLY: filter members by created_by_user_id\nvia contact lookup before pagination
MR-->>S: []member, total
S-->>H: CustomersResult
H-->>FE: 200 OK {customers: [...], pagination}
2.9 Get Detailed Customers in Saved Segment (Internal / S2S)
sequenceDiagram
participant SVC as Downstream Service (Broadcast / Chatbot)
participant H as Segment Handler
participant S as Segment Service
participant SR as Segment Repository (MongoDB)
participant MR as Member Repository (MongoDB)
participant CR as Contact Repository (MongoDB)
SVC->>H: GET /api/v1/segments/:id/customers?page=1&per_page=100
Note over H: BasicAuth — no Launchpad permission check,\nno OWNED_ONLY filtering
H->>S: GetPrivateSegmentCustomers(ctx, segmentID, companySsoID, page, perPage)
S->>SR: FindByID(ctx, segmentID)
SR-->>S: segment (existence check only)
S->>MR: FindMembers(ctx, segmentID, companySsoID, page, perPage)
MR-->>S: []contactID, total
S->>CR: FindByIDs(ctx, contactIDs, companySsoID)
CR-->>S: []ContactDetail
S-->>H: PrivateCustomersResult
H-->>SVC: 200 OK {customers: [...], pagination}
2.10 Manual Recalculation Trigger (Internal S2S)
Allows an internal service or operator tooling to trigger a targeted recalculation outside the daily schedule. The endpoint acquires the same Redis lock as the daily cron, so both can never run concurrently. Returns 202 Accepted immediately; progress is tracked via Kafka cdp.segment.recalculated events carrying the returned recalc_run_id. TriggerRecalculation intentionally bypasses cdp_segment_recalc_enabled — see the Recalculation feature flag note in §3 — so an operator can still run a targeted recalc via this endpoint even while the flag has the daily cron disabled (e.g. during a datamart maintenance window where only the unattended bulk job needs to stay off).
sequenceDiagram
participant SVC as Internal Service / Ops
participant H as Trigger Handler (S2S)
participant S as Segment Service
participant Cache as Redis Cache
participant BG as Background Goroutine
SVC->>H: POST /api/v1/segments/recalculate\n{company_sso_ids: [...], segment_ids: [...]}
H->>H: Validate company_sso_ids non-empty
H->>S: TriggerRecalculation(ctx, filter)
S->>Cache: SetNX("segment_recalculation_lock", recalcRunID, TTL)
alt lock already held
S-->>H: ErrRecalcAlreadyRunning
H-->>SVC: 409 Conflict "recalculation already in progress"
end
S->>BG: go doRecalculation(WithoutCancel(ctx), recalcRunID, filter)
Note over BG: Processes only segments matching filter.\nDefers Del("segment_recalculation_lock").\nEmits cdp.segment.recalculated Kafka events per segment.
S-->>H: recalcRunID
H-->>SVC: 202 Accepted {recalc_run_id}
Filter behaviour:
| company_sso_ids | segment_ids | Result |
|---|---|---|
| Required, non-empty | Omitted | All active segments for the given companies |
| Required, non-empty | Provided | Only the listed segments (verified to belong to the given companies) |
2.11 Scoped Recalculation on Create / Rule Edit (Internal, Asynchronous)
When a segment is created (2.1) or its rule_set is edited (PUT /iag/v1/segments/:id), the service fires a scoped, single-segment recalculation in a detached background goroutine so the new/edited segment becomes usable within seconds instead of waiting for the next daily cron. This path is best-effort: it never blocks or fails the originating create/update request, and the daily cron remains the backstop.
Unlike the daily cron and the manual trigger API (2.10) — which both contend on the global lock segment_recalculation_lock — the on-write recalc uses a per-segment lock segment_recalculation_lock:{segmentID}, so it can run concurrently with (and is never starved by) the daily cron. To guarantee a single writer per segment, the daily cron's processSegment also acquires this per-segment lock before writing; the global lock continues to guard whole-cron concurrency across pods.
recalculateSegment checks cdp_segment_recalc_enabled, same as the cron. Unlike the manual S2S trigger (§2.10), which intentionally bypasses the flag, the create/rule-edit path is a regular user-facing action and must respect the operator's decision to pause recalculation (e.g. during a datamart maintenance window) — see the Recalculation feature flag note in §3.
sequenceDiagram
participant S as Segment Service
participant BG as Background Goroutine
participant FF as FeatureFlagService
participant Cache as Redis Cache
participant RE as Rule Engine
participant DM as Datamart Repository
participant MR as Member Repository (MongoDB)
participant SR as Segment Repository (MongoDB)
participant EP as Event Publisher (Kafka)
Note over S: After successful Insert (create)\nor rule-changing Update
S->>BG: go recalculateSegment(WithoutCancel(ctx), companySsoID, segmentID)
BG->>FF: FeatureEnabled("cdp_segment_recalc_enabled", "")
alt feature disabled
Note over BG: no-op — next daily cron will pick it up (once re-enabled)
end
BG->>Cache: SetNX("segment_recalculation_lock:{segmentID}", runID, TTL)
alt per-segment lock already held
Note over BG: a recalc for this segment is already in flight → back off
end
BG->>SR: FindByID(ctx, segmentID)
SR-->>BG: segment
BG->>RE: BuildQuery(segment.RuleSet)
RE-->>BG: SQL query
BG->>DM: FetchContactIDs(ctx, companySsoID, query)
DM-->>BG: []contactID
Note over BG: processSegment: diff vs current members,\nReplaceMembers + InsertEvents in one transaction
BG->>MR: ReplaceMembers + InsertEvents [transaction]
BG->>SR: UpdateStats(total_matched, last_evaluated_at) + ClearRuleDirtyAt
alt len(added) > 0 OR len(removed) > 0
BG->>EP: Publish("cdp.segment.recalculated", event)
end
BG->>Cache: Del("segment_recalculation_lock:{segmentID}") [deferred]
Behaviour summary:
| Condition | Result |
|---|---|
| cdp_segment_recalc_enabled disabled | No-op; segment stays pending/stale until the next daily cron (once the flag is re-enabled) |
| Per-segment lock already held | Back off (a recalc for this segment is already running); no duplicate work |
| Datamart unavailable / build error / tx failure | Logged + recalc_error_count incremented (same poison-handling as the cron); response already returned |
| Success | last_evaluated_at set, rule_dirty_at cleared → processing_status becomes ready; Kafka event emitted on non-empty diff |
The recalc reuses the exact same processSegment routine as the cron (BuildQuery → FetchContactIDs → atomic ReplaceMembers + InsertEvents → UpdateStats → ClearRuleDirtyAt → conditional publish). Only the trigger and the lock key differ. An edit that does not change the rule set (name/description only) does not trigger a recalc, mirroring how rule_dirty_at is only set when rule_set changes.
Database Model
Collection: customer_segments
Stored in MongoDB (existing CDP database).
customer_segments
├── _id : ObjectID (primary key)
├── company_sso_id : string (tenant isolation; indexed)
├── name : string (max 60 chars; required)
├── description : string (max 250 chars; optional, nullable)
├── status : string (enum: "active" | "archived"; default: "active")
├── rule_dirty_at : time.Time (nullable; set to current timestamp when rule_set changes; cleared to null after recalculation completes)
├── rule_set : RuleSet (embedded document — see schema below)
├── total_matched : int64 (updated on each recalculation)
├── last_evaluated_at: time.Time (updated on each recalculation; nullable until first eval)
├── created_by : string (user SSO ID)
├── updated_by : string (user SSO ID)
├── version : int64 (monotonically increments on each user-initiated edit; starts at 1 on creation)
├── created_at : time.Time
├── updated_at : time.Time
└── is_deleted : bool (soft delete)
RuleSet embedded document:
{
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{
"field": "source",
"field_category": "default",
"field_type": "dropdown",
"operator": "is",
"value": "WhatsApp",
"value2": null
},
{
"field": "address.city",
"field_category": "default",
"field_type": "dropdown",
"operator": "is",
"value": "Jakarta",
"value2": null
}
]
},
{
"operator": "OR",
"conditions": [
{
"field": "created_at_date",
"field_category": "system",
"field_type": "date",
"operator": "before",
"value": "2026-01-01T00:00:00Z",
"value2": null
}
]
}
]
}
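A possible Go mapping of this document, together with the group and condition limits from the success criteria, is sketched below. The type names, the requirement of at least one group and one condition, and the restriction of logical operators to AND/OR are assumptions inferred from the example, not confirmed parts of the design.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	maxGroups             = 2 // up to 2 criteria groups
	maxConditionsPerGroup = 3 // up to 3 conditions per group
)

// Condition, Group and RuleSet mirror the JSON keys of the embedded document.
type Condition struct {
	Field         string `json:"field"`
	FieldCategory string `json:"field_category"`
	FieldType     string `json:"field_type"`
	Operator      string `json:"operator"`
	Value         any    `json:"value"`
	Value2        any    `json:"value2"`
}

type Group struct {
	Operator   string      `json:"operator"`
	Conditions []Condition `json:"conditions"`
}

type RuleSet struct {
	Operator string  `json:"operator"`
	Groups   []Group `json:"groups"`
}

func validLogical(op string) bool { return op == "AND" || op == "OR" }

// Validate enforces the structural limits only; per-field operator checks
// against the PRD matrix are out of scope for this sketch.
func (rs RuleSet) Validate() error {
	if !validLogical(rs.Operator) {
		return fmt.Errorf("rule_set.operator %q must be AND or OR", rs.Operator)
	}
	if n := len(rs.Groups); n < 1 || n > maxGroups {
		return fmt.Errorf("rule_set must have 1..%d groups, got %d", maxGroups, n)
	}
	for i, g := range rs.Groups {
		if !validLogical(g.Operator) {
			return fmt.Errorf("groups[%d].operator %q must be AND or OR", i, g.Operator)
		}
		if n := len(g.Conditions); n < 1 || n > maxConditionsPerGroup {
			return fmt.Errorf("groups[%d] must have 1..%d conditions, got %d", i, maxConditionsPerGroup, n)
		}
	}
	return nil
}

// ParseRuleSet decodes and validates a rule_set payload.
func ParseRuleSet(data []byte) (RuleSet, error) {
	var rs RuleSet
	if err := json.Unmarshal(data, &rs); err != nil {
		return RuleSet{}, fmt.Errorf("decode rule_set: %w", err)
	}
	if err := rs.Validate(); err != nil {
		return RuleSet{}, err
	}
	return rs, nil
}
```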
Indexes:
{ company_sso_id: 1, status: 1, is_deleted: 1 } // list active segments per company
{ company_sso_id: 1, name: 1 } // search by name
{ company_sso_id: 1, created_at: -1 } // sort by date
{ company_sso_id: 1, created_by: 1 } // owned-only filter
Readiness is derived, not stored. The is_ready / processing_status values exposed by the GET APIs are computed at response-mapping time from three existing persisted fields (last_evaluated_at, rule_dirty_at, and recalc_error_count), so no new column or migration is required. The mapping is:
| processing_status | Condition | is_ready |
|---|---|---|
| pending | last_evaluated_at == null AND recalc_error_count < 3 | false |
| ready | last_evaluated_at != null AND rule_dirty_at == null | true |
| stale | last_evaluated_at != null AND rule_dirty_at != null | true |
| failed | last_evaluated_at == null AND recalc_error_count >= 3 | false |
is_ready means the segment has a queryable member snapshot (last_evaluated_at != null); a stale segment is still queryable but its members predate the latest rule edit (a refresh is pending/in flight). On create, last_evaluated_at starts null → pending; the scoped recalc (2.11) flips it to ready on success. A rule edit sets rule_dirty_at → stale until the scoped recalc clears it back to ready.
Collection: customer_segment_members
customer_segment_members
├── _id : ObjectID
├── segment_id : string (ref customer_segments._id; indexed)
├── company_sso_id : string (denormalised for scoped queries; indexed)
├── contact_id : string (MongoDB ObjectID as string)
├── added_at : time.Time (timestamp of the recalculation run that added this membership)
└── recalc_run_id : string (ULID of the recalculation run; idempotency key that prevents duplicate inserts on retried runs)
Indexes:
{ segment_id: 1, company_sso_id: 1 } // all members of a segment (used by recalc)
{ contact_id: 1, company_sso_id: 1 } // all segments of a contact (service integration)
{ segment_id: 1, contact_id: 1 } // unique compound (no duplicate memberships)
Recalculation strategy: Each daily run generates a recalc_run_id (ULID) before processing any segment. For each segment: (1) load current member IDs, (2) fetch new IDs from the datamart, (3) compute added = new − old and removed = old − new, (4) open a MongoDB session transaction (replica-set multi-document transaction) that atomically executes ReplaceMembers and InsertEvents together. Both writes carry the recalc_run_id as an idempotency key, so a retried run with the same ULID is a no-op.
Lock safety: A Redis lock (segment_recalculation_lock) with initial TTL = SEGMENT_RECALC_LOCK_TTL_SECONDS (default 7200 s / 2 h) guards against concurrent pod runs of the whole-cron / manual-trigger path. A heartbeat goroutine extends the TTL by 30 minutes every SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS (default 600 s / 10 min) while the job is alive. If a pod crashes before releasing the lock, the TTL eventually expires and the next pod acquires a fresh lock with a new recalc_run_id; because ReplaceMembers is a full replacement inside a transaction, a partial prior run leaves no inconsistent state.
Per-segment lock (single-writer guarantee): The scoped on-write recalc (create / rule edit, see 2.11) does not take the global lock; it takes a per-segment lock segment_recalculation_lock:{segmentID} (TTL = SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS, default 300 s). To ensure a single writer per segment even when the daily cron and an on-write recalc overlap, the cron's processSegment also acquires this per-segment lock before writing and skips the segment if it is already held. This prevents two runs with different recalc_run_ids from both writing membership-transition events for the same segment (which would otherwise double-count added/removed for consumers). The global lock and the per-segment locks are independent key spaces, so the on-write path is never starved by an in-progress daily cron.
Poison-segment handling: Each failed run increments the segment's recalc_error_count. A segment that fails on three consecutive daily runs is skipped on subsequent cron runs until manually reactivated. A Datadog metric segment.recalc.error_count (tagged by segment_id) fires on each failure.
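Step (3) of the recalculation strategy is a plain set difference. A minimal sketch, assuming contact IDs are strings held in memory (diffMembers is a hypothetical helper name; the results are sorted only to make the output deterministic):

```go
package main

import "sort"

// diffMembers returns added = new − old and removed = old − new.
// Duplicate IDs in either input are collapsed before comparison.
func diffMembers(oldIDs, newIDs []string) (added, removed []string) {
	oldSet := make(map[string]struct{}, len(oldIDs))
	for _, id := range oldIDs {
		oldSet[id] = struct{}{}
	}
	newSet := make(map[string]struct{}, len(newIDs))
	for _, id := range newIDs {
		newSet[id] = struct{}{}
	}
	for id := range newSet {
		if _, ok := oldSet[id]; !ok {
			added = append(added, id)
		}
	}
	for id := range oldSet {
		if _, ok := newSet[id]; !ok {
			removed = append(removed, id)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}
```

With the stated upper bound of roughly 1M customers per company, both sets fit in memory; a streaming comparison over sorted IDs would be an alternative if that bound grows.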
Collection: customer_segment_membership_events
customer_segment_membership_events
├── _id : ObjectID
├── segment_id : string (ref customer_segments._id; indexed)
├── company_sso_id : string (denormalised for scoped queries; indexed)
├── contact_id : string (MongoDB ObjectID as string)
├── event : string (enum: "added" | "removed")
├── occurred_at : time.Time (timestamp of the recalculation run)
└── recalc_run_id : string (ULID of the recalculation run; used with segment_id for idempotent upsert)
Indexes:
{ segment_id: 1, company_sso_id: 1, occurred_at: -1 } // membership history per segment
{ contact_id: 1, company_sso_id: 1, occurred_at: -1 } // segment history per contact
{ occurred_at: 1 } // TTL index (90-day retention)
Documents are append-only and never updated. A TTL index expires events older than 90 days to bound collection growth. The added_at field on customer_segment_members records current membership; this collection records the full history of transitions.
Datamart Schema (confirmed — owned by Data team)
Contact-service will query this via a PostgreSQL client. The schema is confirmed as follows:
| Column | Type | Notes |
|---|---|---|
| company_sso_id | VARCHAR | Tenant isolation key; mandatory filter in all queries |
| customer_id | VARCHAR | CDP contact ID (maps to MongoDB _id) |
| created_at_date | TIMESTAMPTZ | When the contact was created; top-level column for efficient range queries |
| created_by_user_id | VARCHAR | User SSO ID who created the contact; top-level column |
| default_fields | JSONB | All default and Qontak-managed fields plus CDC-populated fields. Keys include: name, email, phone, source, status, owner, sex, dob, address, updated_at, updated_by, bsuid, chat_data (JSONB sub-object; a non-null value indicates the contact has an active chatroom), and all other default/Qontak fields |
| custom_fields | JSONB | All user-defined custom fields, keyed by field key |
These are the only six columns in the datamart. Any data not listed as a top-level column is stored in default_fields or custom_fields. Contact-service must not assume the existence of any other top-level column.
Field access by field_category:
| field_category | SQL access pattern | Example |
|---|---|---|
| default | default_fields->>'key' | default_fields->>'source', default_fields->>'updated_at' |
| custom | custom_fields->>'key' | custom_fields->>'annual_revenue' |
| system | direct column reference | created_at_date, created_by_user_id |
updated_at and updated_by are stored inside default_fields. Rule conditions targeting them must use field_category: "default" (not "system"). Only created_at_date and created_by_user_id are valid system fields.
Type casting for non-text operators:
| field_type | Cast | Example |
|---|---|---|
| number | ::numeric | (custom_fields->>'annual_revenue')::numeric > $2 |
| date | ::date or ::timestamptz | (default_fields->>'date_of_birth')::date < $2 |
| boolean | ::boolean | (default_fields->>'marketing_opt_in')::boolean = $2 |
| single_line_text, multi_line_text, dropdown, url | none (text) | default_fields->>'source' = $2 |
| file, signature, gps | none (presence-only) | default_fields->>'profile_photo' IS [NOT] NULL |
Required indexes on the datamart (Data team to provision):
CREATE INDEX idx_dm_company ON customer_datamart (company_sso_id);
CREATE INDEX idx_dm_customer ON customer_datamart (customer_id);
CREATE INDEX idx_dm_created ON customer_datamart (created_at_date);
CREATE INDEX idx_dm_def_gin ON customer_datamart USING GIN (default_fields);
CREATE INDEX idx_dm_cust_gin ON customer_datamart USING GIN (custom_fields);
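The GIN indexes on the JSONB columns serve containment and key-existence operators (@>, ?, ?|, ?&). Predicates built on default_fields->>'key' or custom_fields->>'key' (equality, ILIKE, casts to numeric or date) cannot use a plain GIN index; PostgreSQL evaluates them as a filter over the rows selected through idx_dm_company. If per-tenant scans prove too slow, frequently filtered keys will need expression indexes, for example on (default_fields->>'source').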
APIs
All segment endpoints follow existing conventions: IAGMiddleware + RequirePermissionMiddleware (for permission checks) + ContextLogger middleware, Chi router, response via myhttp.HTTPHandler. Internal S2S endpoints (/api/v1/...) use BasicAuth instead.
Route → Public URL Mapping
| Auth | Internal Route | Public Base URL |
|---|---|---|
| IAG | /iag/v1/segments/... | https://api.mekari.io/internal/qontak/customer/v1/segments/... |
| S2S (BasicAuth) | /api/v1/... | https://contact-service.qontak.net/api/v1/... |
Base path: /iag/v1/segments
Response Envelope
All responses (success and error) share the same outer envelope, driven by myhttp.SuccessResponse and myhttp.ErrorResponse in internal/pkg/http/.
Success envelope — HTTP 200 for all successful operations except POST /api/v1/segments/recalculate, which returns 202 Accepted:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": {
"version": "v1.0.0",
"api_env": "production",
"trace_id": "0af7651916cd43dd8448eb211c80319c"
},
"data": [ ... ],
"pagination": { "page": 1, "per_page": 20, "total": 42, "total_pages": 3 }
}
data is an array whenever it is non-null. Single-object responses (create, get-by-id, archive, etc.) are wrapped: "data": [{...}]. Empty responses use "data": null. pagination is omitted for non-list responses.
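The wrapping rules above can be sketched with plain encoding/json. The type names below are illustrative assumptions, not the actual definitions behind myhttp.SuccessResponse; the sketch only shows that a nil slice serialises to "data": null and that a nil pagination pointer is dropped from the output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// These types mirror the JSON shown above. They are illustrative only; the
// real definitions behind myhttp.SuccessResponse may differ.
type RespDesc struct {
	ID string `json:"id"`
	EN string `json:"en"`
}

type Meta struct {
	Version string `json:"version"`
	APIEnv  string `json:"api_env"`
	TraceID string `json:"trace_id"`
}

type Pagination struct {
	Page       int `json:"page"`
	PerPage    int `json:"per_page"`
	Total      int `json:"total"`
	TotalPages int `json:"total_pages"`
}

type SuccessEnvelope struct {
	RespCode   string      `json:"resp_code"`
	RespDesc   RespDesc    `json:"resp_desc"`
	Meta       Meta        `json:"meta"`
	Data       []any       `json:"data"`                 // a nil slice marshals to null
	Pagination *Pagination `json:"pagination,omitempty"` // a nil pointer is omitted
}

// wrapSingle puts one object into the array form used by create, get-by-id, and archive.
func wrapSingle(obj any, meta Meta) SuccessEnvelope {
	return SuccessEnvelope{
		RespCode: "200",
		RespDesc: RespDesc{ID: "OK", EN: "OK"},
		Meta:     meta,
		Data:     []any{obj},
	}
}

// render marshals an envelope to its JSON wire form.
func render(env SuccessEnvelope) string {
	out, err := json.Marshal(env)
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	meta := Meta{Version: "v1.0.0", APIEnv: "production", TraceID: "trace-123"}
	fmt.Println(render(wrapSingle(map[string]string{"id": "683ab"}, meta)))
	// An empty response: Data stays nil and is emitted as null.
	fmt.Println(render(SuccessEnvelope{RespCode: "200", RespDesc: RespDesc{ID: "OK", EN: "OK"}, Meta: meta}))
}
```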
Error envelope — HTTP status matches resp_code:
{
"resp_code": "404",
"resp_desc": {
"id": "Tidak ditemukan",
"en": "Not found"
},
"meta": {
"version": "v1.0.0",
"api_env": "production",
"trace_id": "0af7651916cd43dd8448eb211c80319c"
}
}
HTTP status → resp_desc mapping for segment endpoints:
| HTTP | resp_code | resp_desc.en | Trigger |
|---|---|---|---|
| 400 | "400" | "Invalid request" | Missing/malformed body or query param |
| 401 | "401" | "You are not authorized" | Missing auth context (IAGMiddleware) |
| 403 | "403" | "You are not authorized" | ErrPermissionDenied — OWNED_ONLY boundary or missing permission key |
| 404 | "404" | "Not found" | ErrSegmentNotFound |
| 409 | "409" | "Segment name already exists" | ErrSegmentNameConflict |
| 409 | "409" | "Operation not valid for current segment status" | ErrConflictStatus |
| 422 | "422" | "Unprocessable entity" | ErrInvalidRule |
| 429 | "429" | "Preview limit of 10 requests per minute exceeded" | Rate limit exceeded |
| 500 | "500" | "Internal error" | Unexpected error |
| 503 | "503" | "Segment preview is temporarily unavailable" | ErrDatamartUnavailable |
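One way to keep the table above in a single place is a function that maps service-layer sentinel errors to HTTP statuses. This is a sketch under assumptions: the error variables are declared locally with the names from the table, and statusFor is a hypothetical helper. The 400, 401, and 429 rows are not covered because they originate in the handler and middleware rather than in service errors.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Sentinel errors named in the table above; their real declarations are assumed.
var (
	ErrPermissionDenied    = errors.New("permission denied")
	ErrSegmentNotFound     = errors.New("segment not found")
	ErrSegmentNameConflict = errors.New("segment name already exists")
	ErrConflictStatus      = errors.New("operation not valid for current segment status")
	ErrInvalidRule         = errors.New("invalid rule")
	ErrDatamartUnavailable = errors.New("datamart unavailable")
)

// statusFor maps a service-layer error to the HTTP status from the table.
// errors.Is is used so that wrapped errors still match their sentinel.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrPermissionDenied):
		return http.StatusForbidden
	case errors.Is(err, ErrSegmentNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrSegmentNameConflict), errors.Is(err, ErrConflictStatus):
		return http.StatusConflict
	case errors.Is(err, ErrInvalidRule):
		return http.StatusUnprocessableEntity
	case errors.Is(err, ErrDatamartUnavailable):
		return http.StatusServiceUnavailable
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	wrapped := fmt.Errorf("load segment 683ab: %w", ErrSegmentNotFound)
	fmt.Println(statusFor(wrapped))
	fmt.Println(statusFor(errors.New("unexpected failure")))
}
```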
Per-endpoint error HTTP statuses:
| Endpoint | Possible HTTP errors |
|---|---|
| POST /segments | 400, 401, 409 (name), 422 |
| GET /segments | 400, 401 |
| GET /segments/:id | 400, 401, 403, 404 |
| PUT /segments/:id | 400, 401, 403, 404, 409 (name), 422 |
| PATCH /segments/:id/archive | 400, 401, 403, 404, 409 (status) |
| POST /segments/:id/duplicate | 400, 401, 404 |
| POST /segments/preview | 400, 401, 422, 429, 503 |
| GET /segments/:id/customers | 400, 401, 403, 404 |
| GET /api/v1/contacts/:id/segments | 400 (missing company_sso_id), 401 (BasicAuth failure) |
| GET /api/v1/segments/:id/customers | 400, 401 (BasicAuth failure), 404 |
| POST /api/v1/segments/recalculate | 400 (missing company_sso_ids), 401 (BasicAuth failure), 409 (already running), 503 (disabled) |
Example error responses:
// 404 — segment not found
{
"resp_code": "404",
"resp_desc": { "id": "Tidak ditemukan", "en": "Not found" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}
// 409 — name conflict
{
"resp_code": "409",
"resp_desc": { "id": "Segment name already exists", "en": "Segment name already exists" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}
// 422 — invalid rule
{
"resp_code": "422",
"resp_desc": { "id": "Data tidak dapat diproses", "en": "Unprocessable entity" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}
// 429 — rate limit
{
"resp_code": "429",
"resp_desc": { "id": "Terlalu banyak permintaan: Preview limit of 10 requests per minute exceeded", "en": "Preview limit of 10 requests per minute exceeded" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}
// 503 — datamart unavailable
{
"resp_code": "503",
"resp_desc": { "id": "Segment preview is temporarily unavailable", "en": "Segment preview is temporarily unavailable" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." }
}
POST /iag/v1/segments — Create Segment
Permission: customers_segment_add (ALL_ACCESS)
Request body:
{
"name": "Loyal WhatsApp Customers",
"description": "Customers from WhatsApp in Jakarta",
"rule_set": {
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{ "field": "source", "field_category": "default", "field_type": "dropdown", "operator": "is", "value": "WhatsApp" }
]
}
]
}
}
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "id": "683ab..." }]
}
GET /iag/v1/segments — List Segments
Permission: customers_segment_view (ALL_ACCESS returns all; OWNED_ONLY returns own)
Query params:
| Param | Type | Description |
|---|---|---|
| page | int | Default 1 |
| per_page | int | Default 20, max 100 |
| search | string | Partial case-insensitive name match (min 3 chars) |
| created_at_from | RFC3339 | Date range filter start (UTC) |
| created_at_to | RFC3339 | Date range filter end (UTC) |
| status | string | active or archived |
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"id": "683ab...",
"name": "Loyal WhatsApp Customers",
"total_matched": 1050,
"version": 1,
"created_by": "user_sso_id",
"created_at": "2026-05-15T08:00:00Z",
"status": "active",
"is_ready": true,
"processing_status": "ready"
}
],
"pagination": { "page": 1, "per_page": 20, "total": 42, "total_pages": 3 }
}
is_ready / processing_status are derived (see Database Model → "Readiness is derived"); a freshly created segment whose scoped recalc has not yet finished appears as "is_ready": false, "processing_status": "pending".
GET /iag/v1/segments/:id — Get Segment Detail
Permission: customers_segment_view
Response 200 OK (payload.SegmentDetailResponse):
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"id": "683ab...",
"name": "Loyal WhatsApp Customers",
"description": "Customers from WhatsApp in Jakarta",
"status": "active",
"rule_set": {
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{ "field": "source", "field_category": "default", "field_type": "dropdown", "operator": "is", "value": "WhatsApp", "value2": null }
]
}
]
},
"total_matched": 1050,
"version": 3,
"is_ready": true,
"processing_status": "ready",
"percentage_reach": 8.97,
"last_evaluated_at": "2026-05-19T01:00:00Z",
"reachability": {
"whatsapp": { "count": 820, "percentage": 78.1 },
"email": { "count": 940, "percentage": 89.5 }
},
"created_by": "user_sso_id",
"created_at": "2026-05-15T08:00:00Z",
"updated_at": "2026-05-18T09:00:00Z"
}
]
}
Response field rules:
| Field | Type | Condition |
|---|---|---|
| version | int64 | Always present; starts at 1 on creation; increments on every user-initiated edit (PUT) or archive (PATCH /archive) |
| is_ready | bool | Always present; true once the segment has a computed member snapshot (last_evaluated_at != null). Clients should gate member-list reads (GET /:id/customers) on this to distinguish "0 members" from "not yet computed" |
| processing_status | string | Always present; one of pending, ready, stale, failed (derived; see Database Model). pending right after create until the scoped recalc finishes; stale after a rule edit until refreshed; failed after 3 consecutive recalc failures with no prior successful eval |
| percentage_reach | float64 (omitempty) | total_matched / total_customers × 100; omitted when total_customers == 0 or contact counter unavailable |
| reachability | object (omitempty) | Present only for status = "active" segments; omitted entirely for archived segments |
| reachability.whatsapp | object or null | null when CDC backfill is incomplete (no member has chat_data populated); non-null once any member has chat_data |
| reachability.email | object or null | null when datamart unavailable |
| reachability.*.count | int64 | Absolute count of reachable contacts in the segment |
| reachability.*.percentage | float64 | count / total_matched × 100; 0.0 when total_matched == 0 |
PUT /iag/v1/segments/:id returns the same shape on success (the same SegmentDetailResponse), including reachability fields.
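The derivation of is_ready and processing_status described in the field rules can be sketched as a pure function over three inputs. This is a minimal illustration, not the actual model: ConsecutiveFailures is a hypothetical name for the failure counter, and the sketch assumes rule_dirty_at is cleared once a recalculation succeeds.

```go
package main

import (
	"fmt"
	"time"
)

// segmentState holds only the inputs needed to derive readiness.
// ConsecutiveFailures is a hypothetical field name for the failure counter.
type segmentState struct {
	LastEvaluatedAt     *time.Time
	RuleDirtyAt         *time.Time // assumed to be reset to nil after a successful recalc
	ConsecutiveFailures int
}

const maxRecalcFailures = 3

// isReady is true once a member snapshot has been computed at least once.
func isReady(s segmentState) bool { return s.LastEvaluatedAt != nil }

// processingStatus derives the status string from the stored fields.
func processingStatus(s segmentState) string {
	switch {
	case s.LastEvaluatedAt == nil && s.ConsecutiveFailures >= maxRecalcFailures:
		return "failed" // never evaluated successfully and retries are exhausted
	case s.LastEvaluatedAt == nil:
		return "pending" // created, first scoped recalc not finished yet
	case s.RuleDirtyAt != nil:
		return "stale" // members exist but predate the latest rule edit
	default:
		return "ready"
	}
}

// ts parses an RFC3339 timestamp and returns a pointer to it; it panics on bad input.
func ts(v string) *time.Time {
	t, err := time.Parse(time.RFC3339, v)
	if err != nil {
		panic(err)
	}
	return &t
}

func main() {
	fresh := segmentState{}
	fmt.Println(isReady(fresh), processingStatus(fresh))

	edited := segmentState{
		LastEvaluatedAt: ts("2026-05-19T01:00:00Z"),
		RuleDirtyAt:     ts("2026-05-19T02:30:00Z"),
	}
	fmt.Println(isReady(edited), processingStatus(edited))
}
```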
PUT /iag/v1/segments/:id — Update Segment
Permission: customers_segment_manage (ALL_ACCESS or OWNED_ONLY)
Request body: same as create. Updating rule_set sets rule_dirty_at to the current timestamp and fires a best-effort scoped recalculation (see 2.11) so members are refreshed within seconds; the daily cron remains the backstop if the scoped recalc cannot run. status remains "active". While the refresh is pending the segment reports processing_status: "stale" (members exist but predate the edit); callers can also detect this via the non-null rule_dirty_at. An edit that changes only name/description (not rule_set) does not set rule_dirty_at and does not trigger a recalc.
Response 200 OK: same shape as GET /segments/:id (returns the updated SegmentDetailResponse, including is_ready / processing_status).
PATCH /iag/v1/segments/:id/archive — Archive Segment
Permission: customers_segment_archived (ALL_ACCESS or OWNED_ONLY)
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "id": "683ab..." }]
}
POST /iag/v1/segments/:id/duplicate — Duplicate Segment
Permission: customers_segment_add
Creates a new segment with prefix "Copy of " in the name. Returns new segment ID. User is not auto-redirected (frontend responsibility).
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "id": "684cd..." }]
}
POST /iag/v1/segments/preview — Preview Customers (ad-hoc, unsaved rule)
Permission: customers_segment_view
Evaluates the provided rule_set directly against the datamart without persisting any state. Intended for real-time feedback while building or editing a segment before saving.
total_matched is always the company-wide count of customers matching the rule (ownership-agnostic). For view_customer = OWNED_ONLY, the sample list is filtered to contacts owned by the requesting user (AND created_by_user_id = $N); for ALL_ACCESS users the sample is unfiltered.
Request body:
{
"rule_set": {
"operator": "AND",
"groups": [
{
"operator": "AND",
"conditions": [
{ "field": "source", "field_category": "default", "field_type": "dropdown", "operator": "is", "value": "WhatsApp" }
]
}
]
}
}
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"total_matched": 2500,
"total_customer_base": 11700,
"is_truncated": false,
"sample": [
{
"contact_id": "abc123",
"name": "Budi Santoso",
"source": "WhatsApp",
"last_activity": "2026-05-10T08:00:00Z",
"added_by": "Andi Kurniawan",
"added_at": "2026-01-15T00:00:00Z"
}
]
}
]
}
is_truncated: true when total_matched > SEGMENT_PREVIEW_MAX_ROWS (default 10,000); the sample array is capped at that limit. total_customer_base is the company's total active contact count (denominator for the "2,500 of 11,700 matched" display).
Sample field sources:
| Field | Source |
|---|---|
| contact_id | customer_id column in datamart |
| name | default_fields->>'name' in datamart |
| source | default_fields->>'source' in datamart |
| last_activity | Most recent activity timestamp from MongoDB activity_logs collection; null when no activity recorded |
| added_by | Display name of the user resolved from created_by_user_id (datamart top-level column) via the existing user lookup pattern; empty string when user no longer exists |
| added_at | created_at_date top-level column in datamart |
GET /iag/v1/segments/:id/customers — Get Customers in Saved Segment
Permission: customers_segment_view
Returns the paginated list of customers from the most recent recalculation (the daily cron or a scoped on-write run), sourced from the customer_segment_members MongoDB collection. For OWNED_ONLY users, only members created by the requesting user are returned.
Query params:
| Param | Type | Description |
|---|---|---|
| page | int | Default 1 |
| per_page | int | Default 20, max 100 |
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"contact_id": "abc123",
"name": "Budi Santoso",
"source": "WhatsApp",
"added_at": "2026-06-10T01:00:00Z"
}
],
"pagination": { "page": 1, "per_page": 20, "total": 1050, "total_pages": 53 }
}
pagination.total is the count of segment members visible to the requesting user (all for ALL_ACCESS; owned-only for OWNED_ONLY). added_at is when the contact entered the segment in the most recent recalculation run. Contact fields (name, source) are resolved from the MongoDB contact collection.
Readiness caveat: while a segment's first recalculation is still pending (processing_status: "pending"), this endpoint returns an empty data array with total: 0, which is indistinguishable from a segment that genuinely matches zero customers. Clients that need to tell these apart should read the segment detail (GET /iag/v1/segments/:id) and check is_ready / processing_status before treating an empty result as authoritative. The same applies to the S2S GET /api/v1/segments/:id/customers endpoint.
GET /api/v1/contacts/:contact_id/segments — Internal: Get Segments for a Contact
Auth: BasicAuth (machine-to-machine, no user permission check)
Query params:
| Param | Type | Required | Description |
|---|---|---|---|
| company_sso_id | string | Yes | Tenant scope. The handler returns 400 if absent. The member-lookup query is scoped to this value, so a valid contact_id belonging to a different company returns an empty segments array rather than an error. |
Response 200 OK:
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"customer_id": "abc123",
"segments": [
{ "segment_id": "seg_001", "segment_name": "Loyal Customer" },
{ "segment_id": "seg_002", "segment_name": "WhatsApp Customer" }
]
}
]
}
POST /api/v1/segments/recalculate — Internal: Trigger Segment Recalculation
Auth: BasicAuth (machine-to-machine, no user permission check)
Triggers an async recalculation for specific companies and, optionally, specific segments. Acquires the same Redis lock as the daily cron — concurrent runs are rejected with 409. Returns immediately with a recalc_run_id; callers track progress via Kafka cdp.segment.recalculated events carrying that ID.
Request body:
{
"company_sso_ids": ["company_abc", "company_xyz"],
"segment_ids": ["683ab..."]
}
| Field | Required | Description |
|---|---|---|
| company_sso_ids | Yes | Non-empty array of companies whose segments to recalculate. |
| segment_ids | No | When provided, only the listed segments are processed (verified to belong to one of the given companies). When omitted, all active segments for the given companies are processed. |
Response 202 Accepted:
{
"resp_code": "202",
"resp_desc": { "id": "Diterima", "en": "Accepted" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [{ "recalc_run_id": "01J..." }]
}
Per-endpoint error HTTP statuses:
| HTTP | Trigger | Notes |
|---|---|---|
| 400 | Missing or empty company_sso_ids | |
| 401 | BasicAuth failure | |
| 409 | ErrRecalcAlreadyRunning | Another recalculation is in progress (daily cron or a prior trigger) |
GET /api/v1/segments/:segment_id/customers — Internal: Get Detailed Customers in Saved Segment
Auth: BasicAuth (machine-to-machine, no user permission check)
Query params:
| Param | Type | Required | Description |
|---|---|---|---|
| company_sso_id | string | Yes | Tenant scope. The handler returns 400 if absent. After loading the segment, the service verifies segment.company_sso_id == company_sso_id; a mismatch returns 404 to avoid leaking resource existence across tenants. |
| page | int | No | Default 1 |
| per_page | int | No | Default 20, max 100 |
Returns a paginated list of detailed customer data for all members of a saved segment. Intended for downstream service integration (Broadcast, Chatbot). Unlike GET /iag/v1/segments/:id/customers, there is no OWNED_ONLY filtering — all segment members are always returned.
The response shape mirrors ContactSerializer with the following fields omitted: crm_data, chat_data, avatar, accounts, disabled_field, field_permission, permission, is_enable_usernames, and the scalar username field (only usernames array is included). An added_at field (from customer_segment_members) is appended to each item.
Response 200 OK (payload.PrivateSegmentCustomerItem):
{
"resp_code": "200",
"resp_desc": { "id": "OK", "en": "OK" },
"meta": { "version": "v1.0.0", "api_env": "production", "trace_id": "..." },
"data": [
{
"id": "abc123",
"company_sso_id": "company_xyz",
"company_billing_id": "billing_001",
"name": "Budi Santoso",
"email": "budi@example.com",
"phone": ["+6281234567890"],
"is_deleted": false,
"custom_fields": [{ "key": "annual_revenue", "value": "500000000" }],
"owner_id": "user_sso_owner",
"owner_name": "Andi Kurniawan",
"assignee_id": "user_sso_assignee",
"assignee_name": "Dewi Rahayu",
"usernames": [],
"source": "WhatsApp",
"source_id": "src_001",
"source_name": "WhatsApp",
"status": "active",
"status_id": "sts_001",
"status_name": "Active",
"job_title": "Manager",
"tags": ["vip", "jakarta"],
"flag": "",
"address": { "street": "Jl. Sudirman", "city": "Jakarta", "country": "ID" },
"date_of_birth": "1990-01-15",
"sex": "M",
"sex_id": "sex_001",
"sex_name": "Male",
"phone_marketing_opt_in": [],
"is_loyalty_member": false,
"added_at": "2026-06-10T01:00:00Z",
"created_at": "2026-01-15T00:00:00Z",
"updated_at": "2026-05-01T00:00:00Z",
"created_by": "user_sso_creator",
"updated_by": "user_sso_updater"
}
],
"pagination": { "page": 1, "per_page": 20, "total": 1050, "total_pages": 53 }
}
pagination.total is the total count of all segment members. added_at is when the contact entered the segment in the most recent recalculation run.
Rule Engine Design
The Rule Engine is a pure Go component (no external dependency) that translates a RuleSet struct into a parameterised SQL WHERE clause.
Operator → SQL mapping
| Operator | Field types | SQL |
|---|---|---|
| is | text, dropdown | field = $N |
| is_not | text, dropdown | field != $N |
| contains | text | field ILIKE $N (value wrapped in %..%) |
| does_not_contain | text | field NOT ILIKE $N |
| starts_with | text | field ILIKE $N (value appended with %) |
| ends_with | text | field ILIKE $N (value prepended with %) |
| is_empty | all | (field IS NULL OR field = '') |
| is_not_empty | all | (field IS NOT NULL AND field != '') |
| equals | number, boolean | field = $N |
| not_equals | number | field != $N |
| greater_than | number | field > $N |
| less_than | number | field < $N |
| between | number, date | field BETWEEN $N AND $M |
| on | date | DATE(field) = $N |
| not_on | date | DATE(field) != $N |
| before | date | field < $N |
| after | date | field > $N |
| contains_any | multi-select | field && ARRAY[$N]::text[] (PostgreSQL) |
| contains_all | multi-select | field @> ARRAY[$N]::text[] (PostgreSQL) |
| is_empty | file, signature, gps | (field IS NULL) (presence check; no empty-string variant) |
| is_not_empty | file, signature, gps | (field IS NOT NULL) |
Field resolver
Before building the SQL expression for a condition, the Rule Engine resolves the field reference using field_category + field_type:
buildFieldExpr(field, field_category, field_type) → string
system → field (e.g. "created_at_date", "created_by_user_id")
default,
text / multi_line_text / dropdown / url → "default_fields->>'{field}'"
number → "(default_fields->>'{field}')::numeric"
date → "(default_fields->>'{field}')::date"
boolean → "(default_fields->>'{field}')::boolean"
file / signature / gps → "default_fields->>'{field}'" (presence-only: is_empty/is_not_empty only; no other operators permitted)
custom,
text / multi_line_text / dropdown / url → "custom_fields->>'{field}'"
number → "(custom_fields->>'{field}')::numeric"
date → "(custom_fields->>'{field}')::date"
file / signature / gps → "custom_fields->>'{field}'" (presence-only)
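The resolver pseudocode above could be expressed in Go roughly as follows. This is a sketch under stated assumptions: isSafeKey is an extra defensive guard invented for the example (the RFC relies on validating keys against the company's registered field properties), and the same cast table is applied to both JSONB columns, including boolean for custom fields, which the pseudocode does not list.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidRule = errors.New("invalid rule")

// isSafeKey is an extra guard assumed for this sketch: keys are limited to
// letters, digits, and underscores so they cannot break out of the quoted literal.
func isSafeKey(key string) bool {
	if key == "" {
		return false
	}
	for _, r := range key {
		ok := r == '_' || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9')
		if !ok {
			return false
		}
	}
	return true
}

// buildFieldExpr returns the SQL expression for a field reference. The key ends
// up in the SQL text, so it must never be taken unvalidated from the request.
func buildFieldExpr(field, category, fieldType string) (string, error) {
	if !isSafeKey(field) {
		return "", fmt.Errorf("%w: illegal field key %q", ErrInvalidRule, field)
	}
	var column string
	switch category {
	case "system":
		if field == "created_at_date" || field == "created_by_user_id" {
			return field, nil // direct column reference
		}
		return "", fmt.Errorf("%w: %q is not a system field", ErrInvalidRule, field)
	case "default":
		column = "default_fields"
	case "custom":
		column = "custom_fields"
	default:
		return "", fmt.Errorf("%w: unknown field_category %q", ErrInvalidRule, category)
	}
	text := column + "->>'" + field + "'"
	switch fieldType {
	case "text", "multi_line_text", "dropdown", "url", "file", "signature", "gps":
		return text, nil // compared as text, or presence-only
	case "number":
		return "(" + text + ")::numeric", nil
	case "date":
		return "(" + text + ")::date", nil
	case "boolean":
		return "(" + text + ")::boolean", nil
	}
	return "", fmt.Errorf("%w: unsupported field_type %q", ErrInvalidRule, fieldType)
}

func main() {
	for _, c := range [][3]string{
		{"source", "default", "dropdown"},
		{"annual_revenue", "custom", "number"},
		{"created_at_date", "system", "date"},
		{"updated_at", "system", "date"}, // rejected: stored inside default_fields
	} {
		expr, err := buildFieldExpr(c[0], c[1], c[2])
		fmt.Println(expr, err)
	}
}
```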
is_empty / is_not_empty behave differently for JSONB fields than for top-level columns: a JSONB key is either absent or null, never an empty string:
-- is_empty (system)
created_at_date IS NULL
-- is_empty (default / custom field — text, number, date, dropdown, etc.)
default_fields->>'source' IS NULL
-- is_not_empty (default / custom field)
default_fields->>'source' IS NOT NULL
-- is_empty (file / signature / gps — presence-only; same SQL, no cast)
default_fields->>'profile_photo' IS NULL
-- is_not_empty (file / signature / gps)
default_fields->>'profile_photo' IS NOT NULL
file, signature, and gps field types support only is_empty and is_not_empty. The Rule Engine must reject any other operator for these types and return ErrInvalidRule (surfaced as HTTP 422 INVALID_RULE). multi_line_text follows the same path as text; all text operators apply.
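The operator restrictions can be enforced with a lookup table built from the operator mapping. The sketch below is an assumption about how such a check might look: validateOperator and allowedOperators are hypothetical names, and url and multi-select are left out because their field_type spellings and operator sets are not fully pinned down in this section.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidRule = errors.New("invalid rule")

var (
	presenceOps = []string{"is_empty", "is_not_empty"}
	textOps     = append([]string{"is", "is_not", "contains", "does_not_contain", "starts_with", "ends_with"}, presenceOps...)
)

// allowedOperators lists, per field_type, the operators taken from the
// operator table. Types missing from the map are rejected.
var allowedOperators = map[string][]string{
	"text":            textOps,
	"multi_line_text": textOps, // same path as text
	"dropdown":        {"is", "is_not", "is_empty", "is_not_empty"},
	"number":          {"equals", "not_equals", "greater_than", "less_than", "between", "is_empty", "is_not_empty"},
	"boolean":         {"equals", "is_empty", "is_not_empty"},
	"date":            {"between", "on", "not_on", "before", "after", "is_empty", "is_not_empty"},
	"file":            presenceOps,
	"signature":       presenceOps,
	"gps":             presenceOps,
}

// validateOperator returns an error wrapping ErrInvalidRule when the operator
// is not permitted for the field type.
func validateOperator(fieldType, operator string) error {
	ops, ok := allowedOperators[fieldType]
	if !ok {
		return fmt.Errorf("%w: unknown field_type %q", ErrInvalidRule, fieldType)
	}
	for _, op := range ops {
		if op == operator {
			return nil
		}
	}
	return fmt.Errorf("%w: operator %q is not allowed for field_type %q", ErrInvalidRule, operator, fieldType)
}

func main() {
	fmt.Println(validateOperator("file", "is_not_empty"))
	fmt.Println(validateOperator("gps", "contains"))
	fmt.Println(validateOperator("multi_line_text", "starts_with"))
}
```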
SQL generation example
Rule:
Group 1 (AND): source (default, dropdown) is 'WhatsApp'
AND annual_revenue (custom, number) greater_than 100000000
Group 2: created_at_date (system, date) before '2026-01-01'
Top-level: AND
Generated WHERE clause:
WHERE company_sso_id = $1
AND (
(
default_fields->>'source' = $2
AND (custom_fields->>'annual_revenue')::numeric > $3
)
AND
(created_at_date < $4)
)
Parameters: ['company_abc', 'WhatsApp', 100000000, '2026-01-01T00:00:00Z']
All SQL queries use positional placeholders ($N) to prevent SQL injection. The Rule Engine must never interpolate user-supplied values directly into the query string. Field keys from the rule payload are validated against the company's registered field properties before the expression is built; an unrecognised key returns an error, never a raw SQL fragment.
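A compact sketch of the placeholder discipline follows. It reproduces the WHERE clause from the SQL generation example for a small subset of operators. All type and function names here (Condition, Group, RuleSet, BuildWhere) are illustrative assumptions rather than the actual Rule Engine API, and each condition is assumed to carry a field expression that has already been resolved and validated.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var ErrInvalidRule = errors.New("invalid rule")

// Condition carries an already-resolved, already-validated SQL field expression.
type Condition struct {
	Expr, Operator string
	Value          any
}

type Group struct {
	Operator   string // "AND" or "OR"
	Conditions []Condition
}

type RuleSet struct {
	Operator string
	Groups   []Group
}

// comparison covers a small subset of the operator table, for illustration.
var comparison = map[string]string{
	"is": "=", "equals": "=", "greater_than": ">", "less_than": "<", "before": "<", "after": ">",
}

// join validates a logical operator and joins SQL fragments with it.
func join(parts []string, op string) (string, error) {
	if len(parts) == 0 || (op != "AND" && op != "OR") {
		return "", fmt.Errorf("%w: empty group or bad operator %q", ErrInvalidRule, op)
	}
	return "(" + strings.Join(parts, " "+op+" ") + ")", nil
}

// BuildWhere returns the WHERE clause and its parameters. User-supplied values
// are only appended to args; the SQL text receives "$N" placeholders.
func BuildWhere(companySSOID string, rs RuleSet) (string, []any, error) {
	args := []any{companySSOID} // $1 is always the tenant key
	groups := make([]string, 0, len(rs.Groups))
	for _, g := range rs.Groups {
		conds := make([]string, 0, len(g.Conditions))
		for _, c := range g.Conditions {
			sqlOp, ok := comparison[c.Operator]
			if !ok {
				return "", nil, fmt.Errorf("%w: unsupported operator %q", ErrInvalidRule, c.Operator)
			}
			args = append(args, c.Value)
			conds = append(conds, fmt.Sprintf("%s %s $%d", c.Expr, sqlOp, len(args)))
		}
		sql, err := join(conds, g.Operator)
		if err != nil {
			return "", nil, err
		}
		groups = append(groups, sql)
	}
	all, err := join(groups, rs.Operator)
	if err != nil {
		return "", nil, err
	}
	return "WHERE company_sso_id = $1 AND " + all, args, nil
}

// exampleRuleSet is the rule from the SQL generation example above.
func exampleRuleSet() RuleSet {
	return RuleSet{Operator: "AND", Groups: []Group{
		{Operator: "AND", Conditions: []Condition{
			{Expr: "default_fields->>'source'", Operator: "is", Value: "WhatsApp"},
			{Expr: "(custom_fields->>'annual_revenue')::numeric", Operator: "greater_than", Value: 100000000},
		}},
		{Operator: "AND", Conditions: []Condition{
			{Expr: "created_at_date", Operator: "before", Value: "2026-01-01T00:00:00Z"},
		}},
	}}
}

func main() {
	where, args, err := BuildWhere("company_abc", exampleRuleSet())
	if err != nil {
		panic(err)
	}
	fmt.Println(where)
	fmt.Println(args...)
}
```

The resulting string and args slice would be passed together to the PostgreSQL client, so the driver binds the values and the query text never contains user input.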
3. High-Availability & Security
High Availability
- Contact-service is stateless and horizontally scalable behind the API Gateway load balancer.
- MongoDB replica set provides read/write availability; segment CRUD operations use the primary.
- Datamart queries are read-only; the connection pool should be sized separately from the MongoDB driver to avoid resource contention.
- The cron recalculation job is protected by a Redis distributed lock (segment_recalculation_lock with TTL = 2 hours) to ensure only one pod runs recalculation at a time in a multi-replica deployment.
Recalculation feature flag (cdp_segment_recalc_enabled):
A separate global MongoDB feature flag — distinct from cdp_segmentation_enabled — gates the two regular, user/schedule-driven recalculation paths: the daily cron RecalculateAllSegments and the scoped on-write recalc recalculateSegment (§2.11, fired after create / rule-changing update). Both check the flag at the start of the method/goroutine, before any lock acquisition; when disabled, they log and return/no-op immediately without touching a lock. Use this flag during datamart maintenance windows to pause all automatic recalculation without affecting the CRUD API — a segment created or edited while the flag is off simply stays pending/stale until the flag is re-enabled and the next cron run (or another edit) picks it up.
It deliberately does not gate TriggerRecalculation (§2.10) — the S2S manual trigger bypasses the flag unconditionally, so an operator retains a way to force a targeted recalculation even during a maintenance window where automatic recalculation is paused.
Recalculation circuit breaker:
An operator can gracefully stop a running recalculation without killing the pod by writing any non-empty value to the Redis key segment_recalculation_stop_requested. The recalculation loop checks this key at the start of each batch (before fetching the next page of segments). When detected, the cron: (1) deletes the key (one-shot — tomorrow's scheduled run is unaffected), (2) logs INFO "segment recalc circuit breaker triggered", (3) releases the lock (deferred), and (4) returns nil. The key carries a configurable TTL (SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS, default 86400 s / 24 h) as a safety net in case the cron exits abnormally before detecting it. To permanently disable future runs, use the cdp_segment_recalc_enabled feature flag instead.
External Call Timeout Strategy
Every external call wraps its context with a dedicated context.WithTimeout so a single hanging call cannot block a goroutine indefinitely. Timeout values are configurable at deploy time via env vars (see Configuration Contract).
| Call site | Context applied to | Env var | Default |
|---|---|---|---|
| IDatamartRepository.Count / Sample in PreviewSegment | datamart SQL query | DATAMART_QUERY_TIMEOUT_SECONDS | 30 s |
| IDatamartRepository.FetchContactIDs in processSegment (cron) | per-segment SQL query | DATAMART_RECALC_QUERY_TIMEOUT_SECONDS | 60 s |
| MongoDB session transaction in processSegment (ReplaceMembers + InsertEvents) | entire transaction | MONGODB_OPERATION_TIMEOUT_SECONDS | 30 s |
| ISegmentEventPublisher.Publish per segment | Kafka produce + flush | KAFKA_PUBLISH_TIMEOUT_SECONDS | 10 s |
| Redis lock acquire (SetNX) and heartbeat extend (Expire) | Redis command | REDIS_OPERATION_TIMEOUT_SECONDS | 5 s |
Kafka publish retry: Publish retries up to SEGMENT_KAFKA_PUBLISH_MAX_RETRIES times (default 2) with a fixed SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS (default 500 ms) between attempts before returning a non-fatal error. Each attempt respects the per-attempt context timeout from KAFKA_PUBLISH_TIMEOUT_SECONDS. If all retries are exhausted, the failure is logged and the Datadog metric segment.event.publish_error{segment_id} is incremented; the recalculation result is not rolled back.
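The retry behaviour described above can be sketched with the standard library alone. This is an illustration under assumptions: publishFunc stands in for ISegmentEventPublisher.Publish (its real signature is not shown in this RFC), and the sketch reads "retries up to N times" as one initial attempt followed by at most N retries.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishFunc stands in for ISegmentEventPublisher.Publish; its real signature is assumed.
type publishFunc func(ctx context.Context) error

// publishWithRetry makes one initial attempt plus up to maxRetries retries,
// waiting a fixed delay between attempts. Each attempt gets its own timeout.
func publishWithRetry(ctx context.Context, publish publishFunc, maxRetries int, delay, perAttempt time.Duration) error {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			select {
			case <-time.After(delay): // fixed backoff between attempts
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		attemptCtx, cancel := context.WithTimeout(ctx, perAttempt)
		lastErr = publish(attemptCtx)
		cancel()
		if lastErr == nil {
			return nil
		}
	}
	// Non-fatal for the caller: the recalculation result is kept either way.
	return fmt.Errorf("publish failed after %d attempts: %w", maxRetries+1, lastErr)
}

// demoPublish runs publishWithRetry against a fake publisher that fails its
// first `failures` calls, and reports how many attempts were made.
func demoPublish(failures int) (attempts int, err error) {
	fake := func(ctx context.Context) error {
		attempts++
		if attempts <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}
	err = publishWithRetry(context.Background(), fake, 2, time.Millisecond, time.Second)
	return attempts, err
}

func main() {
	attempts, err := demoPublish(2)
	fmt.Println(attempts, err)
	attempts, err = demoPublish(5)
	fmt.Println(attempts, err)
}
```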
Performance Requirement
- Preview endpoint (POST /segments/preview): p95 < 3s, p99 < 8s. Heavy queries hit the datamart only; no impact on MongoDB.
- Segment list / detail APIs: p99 < 500ms (reads from MongoDB with proper indexes).
- Daily recalculation cron: Must complete within the 2-hour lock window for the largest companies. Segments are processed in batches of 100 per goroutine pool (configurable).
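The batch-and-pool processing mentioned for the daily cron might look like the sketch below. It is an assumption about structure, not the actual processSegment loop: processBatches is a hypothetical helper that pages through segment IDs, fans each page out to a fixed number of workers, and records per-segment failures without aborting the run.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// processBatches walks segmentIDs in pages of batchSize and fans each page out
// to a fixed pool of workers. A per-segment error is collected, not fatal.
func processBatches(segmentIDs []string, batchSize, workers int, process func(id string) error) map[string]error {
	if batchSize < 1 {
		batchSize = 1
	}
	if workers < 1 {
		workers = 1
	}
	failed := make(map[string]error)
	var mu sync.Mutex
	for start := 0; start < len(segmentIDs); start += batchSize {
		end := start + batchSize
		if end > len(segmentIDs) {
			end = len(segmentIDs)
		}
		jobs := make(chan string)
		var wg sync.WaitGroup
		for w := 0; w < workers; w++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for id := range jobs {
					if err := process(id); err != nil {
						mu.Lock()
						failed[id] = err
						mu.Unlock()
					}
				}
			}()
		}
		for _, id := range segmentIDs[start:end] {
			jobs <- id
		}
		close(jobs)
		wg.Wait() // finish the page before moving to the next one
	}
	return failed
}

// demoRun processes n fake segments and fails exactly one of them ("seg-7").
func demoRun(n int) (processed int64, failed int) {
	ids := make([]string, n)
	for i := range ids {
		ids[i] = fmt.Sprintf("seg-%d", i)
	}
	errs := processBatches(ids, 100, 4, func(id string) error {
		atomic.AddInt64(&processed, 1)
		if id == "seg-7" {
			return errors.New("datamart timeout")
		}
		return nil
	})
	return atomic.LoadInt64(&processed), len(errs)
}

func main() {
	processed, failed := demoRun(250)
	fmt.Println(processed, failed)
}
```

Waiting for each page to finish keeps a natural checkpoint between batches, which is also where a stop signal or lock heartbeat check could be placed.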
Scalability measures:
- Horizontal pod autoscaler can be applied independently since the service is stateless.
- Datamart read replica(s) for query isolation.
- Segment member replacement uses MongoDB bulk write (BulkUpdate) to minimise round-trips.
- Redis cache for total_matched and percentage_reach values (TTL = 24 hours, invalidated on recalculation).
Monitoring & Alerting
| Signal | Tool | Alert Threshold |
|---|---|---|
| Cron job duration | Datadog custom metric segment.recalc.duration_ms | > 90 min |
| Cron job failure | Datadog error log monitor | Any error in recalculation loop |
| Preview endpoint error rate | Datadog APM | > 1% 5xx |
| Preview endpoint latency | Datadog APM p99 | > 8s |
| Datamart query errors | Datadog log | Any SQL error |
Datadog APM traces will be added to:
- All HTTP handlers (existing chitrace middleware covers this)
- Cron job start/finish spans
- Rule engine BuildQuery() call
- Datamart Execute() call
Logging
Structured slog logs with the following fields:
| Log point | Level | Fields |
|---|---|---|
| Segment created | INFO | segment_id, company_sso_id, created_by |
| Segment archived | INFO | segment_id, company_sso_id, user_sso_id |
| Recalculation started | INFO | batch_size, total_segments |
| Recalculation per segment | INFO | segment_id, company_sso_id, total_matched, duration_ms |
| Recalculation error (per segment) | ERROR | segment_id, error |
| Preview query executed | INFO | company_sso_id, duration_ms, total_matched |
| Rule engine build error | ERROR | rule_set_json (rule values redacted, since rule values are excluded from logs), error |
PII (customer names, phone numbers, emails) must not appear in logs. contact_id (an opaque ObjectID) is acceptable.
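As an illustration of the "Recalculation per segment" log point, the sketch below emits one structured line with log/slog. The helper names are hypothetical, and the timestamp is dropped only to make the sample output reproducible; a production logger would keep it. Only identifiers and counters are logged, no customer names, phone numbers, or emails.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// newJSONLogger writes JSON lines into buf and removes the time attribute so
// the sample output is stable.
func newJSONLogger(buf *bytes.Buffer) *slog.Logger {
	opts := &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey && len(groups) == 0 {
				return slog.Attr{}
			}
			return a
		},
	}
	return slog.New(slog.NewJSONHandler(buf, opts))
}

// logRecalcSegment emits the per-segment recalculation log point with the
// fields from the table above.
func logRecalcSegment(logger *slog.Logger, segmentID, companySSOID string, totalMatched, durationMS int64) {
	logger.Info("segment recalculated",
		slog.String("segment_id", segmentID),
		slog.String("company_sso_id", companySSOID),
		slog.Int64("total_matched", totalMatched),
		slog.Int64("duration_ms", durationMS),
	)
}

// demoLogLine returns the JSON line produced for a sample segment.
func demoLogLine() string {
	var buf bytes.Buffer
	logRecalcSegment(newJSONLogger(&buf), "683ab", "company_abc", 1050, 420)
	return buf.String()
}

func main() {
	fmt.Print(demoLogLine())
}
```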
Security Implications
- SQL Injection prevention: All datamart queries use parameterised statements. The Rule Engine never interpolates user values into SQL strings. Static code analysis via SonarQube will enforce this.
- Tenant isolation: Every datamart query and MongoDB query includes company_sso_id as a mandatory filter. For IAG endpoints the service reads this from the authenticated request context (not from the request body); S2S endpoints take it as a required request parameter.
- Permission enforcement: All IAG segment endpoints check Launchpad CRS permissions. Endpoints that write data (POST, PUT, PATCH) require explicit ALL_ACCESS or OWNED_ONLY grants.
- OWNED_ONLY boundary: When OWNED_ONLY is evaluated, the service compares segment.created_by == authenticated_user_sso_id. This check happens in the service layer, not in the handler.
- Internal endpoints (/api/v1/...) are protected by BasicAuth only; they are not accessible from the internet (API Gateway restricts these paths to internal VPC traffic).
- Rate limiting: The preview endpoint is computationally heavy; a rate limiter (existing rate_limiter service pattern) should be applied per company_sso_id, with a maximum of 10 preview requests per minute.
- PII compliance (ISO 27001 / 27701): Segment names and rule values may contain PII (e.g., a rule email contains @gmail.com). Rule values are stored encrypted at rest in MongoDB (existing MongoDB encryption-at-rest configuration). Rule values are excluded from logs.
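The per-company preview limit can be illustrated with a fixed-window counter. This sketch is not the existing rate_limiter service pattern, whose interface is not shown here; previewLimiter is a hypothetical in-memory stand-in that only demonstrates the keying by company_sso_id and the 10-per-minute window. A multi-replica deployment would need a shared store instead of process memory.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type window struct {
	start time.Time
	count int
}

// previewLimiter is a hypothetical in-memory fixed-window limiter keyed by company_sso_id.
type previewLimiter struct {
	mu      sync.Mutex
	limit   int
	period  time.Duration
	now     func() time.Time // injectable clock, useful for tests
	windows map[string]*window
}

func newPreviewLimiter(limit int, period time.Duration, now func() time.Time) *previewLimiter {
	return &previewLimiter{limit: limit, period: period, now: now, windows: make(map[string]*window)}
}

// Allow reports whether the company may run another preview in the current window.
func (l *previewLimiter) Allow(companySSOID string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	t := l.now()
	w, ok := l.windows[companySSOID]
	if !ok || t.Sub(w.start) >= l.period {
		l.windows[companySSOID] = &window{start: t, count: 1} // start a new window
		return true
	}
	if w.count >= l.limit {
		return false // the caller would answer with HTTP 429
	}
	w.count++
	return true
}

// demoAllowed sends `requests` previews for one company at a frozen clock and
// returns how many were admitted.
func demoAllowed(requests int) int {
	fixed := time.Date(2026, 5, 19, 1, 0, 0, 0, time.UTC)
	l := newPreviewLimiter(10, time.Minute, func() time.Time { return fixed })
	allowed := 0
	for i := 0; i < requests; i++ {
		if l.Allow("company_abc") {
			allowed++
		}
	}
	return allowed
}

func main() {
	fmt.Println(demoAllowed(12))
}
```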
4. Backwards Compatibility and Rollout Plan
Compatibility
- All new endpoints are additive; no existing endpoints are modified.
- The customer_segments and customer_segment_members MongoDB collections are new; no schema migration is required for existing collections.
- New permission keys (customers_segment_*) must be registered in Launchpad CRS before deploying the backend. If keys are missing, the middleware will return 403 by default (fail-secure).
- The datamart database connection is configured via a new env var DATAMART_DSN. If unset, the preview and recalculation features will return a graceful error; segment CRUD operations are unaffected.
Configuration Contract
All new env vars, their types, and defaults. Every default must be overridable at deploy time without code changes.
| Env Var | Type | Default | Description |
|---|---|---|---|
| DATAMART_DSN | string | "" (disabled) | PostgreSQL DSN for the segmentation datamart; feature disabled when empty |
| SEGMENT_RECALC_WORKERS | int | 4 | Goroutine pool size for concurrent segment recalculation |
| SEGMENT_RECALC_BATCH_SIZE | int | 100 | Segments fetched per page from MongoDB during recalculation |
| SEGMENT_RECALC_LOCK_TTL_SECONDS | int | 7200 | Initial Redis lock TTL for the cron job (seconds) |
| SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS | int | 600 | Heartbeat goroutine renewal interval (seconds); must be < LOCK_TTL / 4 |
| SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS | int | 300 | TTL (seconds) for the per-segment lock segment_recalculation_lock:{segmentID} used by the scoped on-write recalc (create / rule edit) and acquired by the cron per segment to guarantee a single writer |
| SEGMENT_PREVIEW_RATE_LIMIT | int | 10 | Max preview requests per minute per company_sso_id |
| SEGMENT_PREVIEW_MAX_ROWS | int | 10000 | Row cap for datamart queries in preview; triggers is_truncated: true when exceeded |
| DATAMART_QUERY_TIMEOUT_SECONDS | int | 30 | context.WithTimeout deadline for preview datamart queries (seconds) |
| DATAMART_RECALC_QUERY_TIMEOUT_SECONDS | int | 60 | context.WithTimeout deadline for per-segment FetchContactIDs queries during recalculation (seconds) |
| MONGODB_OPERATION_TIMEOUT_SECONDS | int | 30 | context.WithTimeout deadline for the MongoDB session transaction in processSegment, covering ReplaceMembers + InsertEvents together (seconds) |
| KAFKA_PUBLISH_TIMEOUT_SECONDS | int | 10 | context.WithTimeout deadline for Publish per segment (seconds) |
| REDIS_OPERATION_TIMEOUT_SECONDS | int | 5 | context.WithTimeout deadline for Redis lock acquire and heartbeat extend operations (seconds) |
| SEGMENT_KAFKA_PUBLISH_MAX_RETRIES | int | 2 | Max retry attempts for Publish before declaring failure and continuing to the next segment |
| SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS | int | 500 | Fixed backoff delay between Kafka publish retry attempts (milliseconds) |
| SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS | int | 86400 | TTL (seconds) applied to the segment_recalculation_stop_requested Redis key when the trigger API writes it; auto-expires the signal if the cron pod exits before detecting it |
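The contract above (typed values, defaults, and the renewal-interval constraint) could be loaded roughly as follows. This is a sketch under assumptions, not the service's actual config loader: `RecalcLockConfig`, `envInt`, and `LoadRecalcLockConfig` are hypothetical names, only two of the variables are shown, and treating an empty value as "use the default" is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// lookupFunc matches os.LookupEnv so a fake environment can be supplied.
type lookupFunc func(key string) (string, bool)

// RecalcLockConfig holds the two cron-lock settings from the table.
type RecalcLockConfig struct {
	LockTTLSeconds       int
	RenewIntervalSeconds int
}

// envInt returns the integer value of key, or def when the variable is
// unset or empty. A non-integer value is reported as an error.
func envInt(lookup lookupFunc, key string, def int) (int, error) {
	raw, ok := lookup(key)
	if !ok || raw == "" {
		return def, nil
	}
	v, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("%s: %q is not an integer", key, raw)
	}
	return v, nil
}

// LoadRecalcLockConfig reads both values and enforces the documented
// constraint that the renewal interval is below a quarter of the lock TTL.
func LoadRecalcLockConfig(lookup lookupFunc) (RecalcLockConfig, error) {
	ttl, err := envInt(lookup, "SEGMENT_RECALC_LOCK_TTL_SECONDS", 7200)
	if err != nil {
		return RecalcLockConfig{}, err
	}
	renew, err := envInt(lookup, "SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS", 600)
	if err != nil {
		return RecalcLockConfig{}, err
	}
	if ttl <= 0 || renew <= 0 {
		return RecalcLockConfig{}, fmt.Errorf("lock TTL and renew interval must be positive")
	}
	if renew*4 >= ttl {
		return RecalcLockConfig{}, fmt.Errorf("renew interval %ds must be < lock TTL %ds / 4", renew, ttl)
	}
	return RecalcLockConfig{LockTTLSeconds: ttl, RenewIntervalSeconds: renew}, nil
}

func main() {
	cfg, err := LoadRecalcLockConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid configuration:", err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Failing at startup on a violated constraint keeps a misconfigured heartbeat from surfacing later as an expired lock in the middle of a cron run.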
Resource & Storage Estimates
| Resource | Estimate | Basis |
|---|---|---|
| customer_segments growth | ~1–5 KB/doc × N segments/company | Small; dominated by rule_set BSON; negligible at scale |
| customer_segment_members growth | ~100 bytes/doc × members × segments | 1M contacts × 10 segments × 100 B = ~1 GB/company before recalc churn |
| customer_segment_membership_events growth | ~150 bytes/doc; 90-day TTL | At 10% daily churn: 1M × 10% × 10 segs × 150 B × 90 days ≈ 13.5 GB/company; 90-day TTL bounds this |
| Datamart query load delta | Read-only; bounded by SEGMENT_RECALC_WORKERS × BATCH_SIZE concurrent queries | Separate connection pool; no impact on primary MongoDB |
| Per-goroutine peak memory | 50–100 MB (500k contact ID set × 2 for diff) | Tune SEGMENT_RECALC_WORKERS conservatively until profiled in production |
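The two collection estimates can be reproduced with a few lines of arithmetic, which makes it easy to plug in other tenant sizes. The function names are illustrative; the inputs are the figures from the table, and decimal gigabytes (1 GB = 10^9 bytes) are assumed.

```go
package main

import "fmt"

// membersBytes estimates the customer_segment_members footprint:
// one document per (contact, segment) membership.
func membersBytes(contacts, segments, docBytes int64) int64 {
	return contacts * segments * docBytes
}

// eventsBytes estimates the steady-state customer_segment_membership_events
// footprint: daily transitions retained for ttlDays before the TTL removes them.
func eventsBytes(contacts, churnPercent, segments, docBytes, ttlDays int64) int64 {
	dailyTransitions := contacts * churnPercent / 100 * segments
	return dailyTransitions * docBytes * ttlDays
}

func main() {
	const gb = 1e9 // decimal gigabyte

	members := membersBytes(1_000_000, 10, 100)
	events := eventsBytes(1_000_000, 10, 10, 150, 90)

	fmt.Printf("members: %.1f GB\n", float64(members)/gb) // members: 1.0 GB
	fmt.Printf("events:  %.1f GB\n", float64(events)/gb)  // events:  13.5 GB
}
```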
Rollout Strategy
| Phase | Description | Gate |
|---|---|---|
| Phase 0 — Infrastructure | Data team provisions datamart + CDC pipeline. Launchpad team registers 4 permission keys. | Datamart schema agreed and accessible from contact-service pods |
| Phase 1 — Segment CRUD | Deploy segment CRUD endpoints behind feature flag cdp_segmentation_enabled, evaluated via the existing FeatureFlagService.FeatureEnabled(ctx, "cdp_segmentation_enabled", companySsoID) (MongoDB feature_flag collection). Default state: disabled — the flag document is absent at deploy time; FeatureEnabled returns false for all companies. When the flag is disabled for the requesting company, all IAG segment endpoints return 404 Not Found. No recalculation yet. | QA sign-off on CRUD scenarios |
| Phase 2 — Preview | Deploy preview endpoint. Manual testing with real datamart data. Performance test preview latency. | p99 < 8s confirmed in staging |
| Phase 3 — Recalculation Cron | Enable cron job in staging (08:00 WIB daily). Monitor duration and correctness. | Cron completes in < 90 min for largest staging company |
| Phase 4 — Service Integration | Enable /api/v1/contacts/:id/segments endpoint. Coordinate with Broadcast team. | Broadcast service integration test passes |
| Phase 5 — Production Rollout | Add pilot company_sso_id values to the CompanySsoIDs array in the feature_flag document (code: "cdp_segmentation_enabled") → gradual rollout → all companies by setting is_general_flag: true and enabled: true on the same document. | No P0/P1 alerts in first 48h for pilot group |
Rollback procedure:
- Disable feature flag `cdp_segmentation_enabled`: set `enabled: false` on the `feature_flag` document with `code: "cdp_segmentation_enabled"` in MongoDB, or remove specific `company_sso_id` values from its `CompanySsoIDs` array for partial rollback. All IAG segment endpoints return `404 Not Found` for companies where the flag evaluates to `false`.
- Cron job can be disabled by removing the cron registration from `worker.CronList` and redeploying.
- MongoDB collections (`customer_segments`, `customer_segment_members`) can be left in place; they have no impact on existing functionality.
- Datamart connection failure does not affect existing contact CRUD operations (separate connection pool).
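The flag semantics that the rollout and rollback steps rely on can be summarized in a few lines. This is a sketch of the behaviour described in this section, not the real `FeatureFlagService`: the `FeatureFlag` struct layout, `flagEnabled`, and `segmentEndpointStatus` are hypothetical, and the precedence of `enabled` over the other fields is an assumption drawn from the rollback step.

```go
package main

import (
	"fmt"
	"net/http"
)

// FeatureFlag mirrors the feature_flag document fields named in this RFC.
// The Go field names are assumptions.
type FeatureFlag struct {
	Code          string
	Enabled       bool
	IsGeneralFlag bool
	CompanySsoIDs []string
}

// flagEnabled evaluates a flag for one company. A nil flag models an
// absent document, which is the default state at deploy time.
func flagEnabled(flag *FeatureFlag, companySsoID string) bool {
	if flag == nil || !flag.Enabled {
		return false
	}
	if flag.IsGeneralFlag {
		return true // full rollout: every company
	}
	for _, id := range flag.CompanySsoIDs {
		if id == companySsoID {
			return true // pilot company
		}
	}
	return false
}

// segmentEndpointStatus returns 404 when the flag is off for the company.
// StatusOK stands in for "continue to the real handler".
func segmentEndpointStatus(flag *FeatureFlag, companySsoID string) int {
	if !flagEnabled(flag, companySsoID) {
		return http.StatusNotFound
	}
	return http.StatusOK
}

func main() {
	pilot := &FeatureFlag{
		Code:          "cdp_segmentation_enabled",
		Enabled:       true,
		CompanySsoIDs: []string{"company-a"},
	}
	fmt.Println(segmentEndpointStatus(nil, "company-a"))   // flag document absent
	fmt.Println(segmentEndpointStatus(pilot, "company-a")) // pilot company
	fmt.Println(segmentEndpointStatus(pilot, "company-b")) // not in the pilot
}
```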
5. Concerns, Questions, and Known Limitations
Open Questions
| # | Question | Owner | Status |
|---|---|---|---|
| Q1 | What is the confirmed datamart technology (PostgreSQL vs ClickHouse)? This affects SQL dialect (e.g., ILIKE, && operator, JSONB). | Data team | Resolved — PostgreSQL confirmed. |
| Q2 | What is the agreed datamart schema for custom fields — JSONB column or EAV rows? JSONB is preferred for query simplicity. | Data / CDP BE | Resolved — Schema confirmed: 9 columns with default_fields JSONB (default + Qontak fields) and custom_fields JSONB (user fields); basic event fields are top-level columns. |
| Q3 | What is the CDC lag SLA? If CDC lag > 24h, segment results may be based on stale data. | Data team | Open |
| Q4 | Should the preview endpoint enforce a maximum result row count for datamart queries (e.g., LIMIT 10,000)? Without a cap, large companies could trigger very expensive queries. | BE + PM | Resolved — LIMIT set to SEGMENT_PREVIEW_MAX_ROWS (default 10,000); is_truncated: true returned when results are capped; total_customer_base added to response. |
| Q5 | Is chatroom data available in the datamart for WhatsApp reachability calculation? If not, reachability can fall back to phone IS NOT NULL. | Data team | Resolved — Chat-related data is stored inside default_fields->'chat_data' JSONB sub-object; a non-null chat_data value indicates an active chatroom. WhatsApp reach = default_fields->'chat_data' IS NOT NULL AND default_fields->>'phone' IS NOT NULL. If chat_data is absent for all members at Phase 5 cutover (CDC backfill not yet complete), contact-service must detect this and return null for WhatsApp reachability rather than silently presenting 0 as authoritative. |
| Q6 | How should the Broadcast service consume segment members — pull (API call) or push (webhook/event on recalculation)? | Broadcast + CDP BE | Resolved — CDP publishes one Kafka event per segment per recalculation run to topic cdp.segment.recalculated. Each event carries segment_id, segment_name, company_sso_id, aggregate counts (total_added, total_removed, total_matched), and a recalc_run_id for idempotency. Consumers react at the segment level and pull individual contact details on-demand via GET /api/v1/segments/:id/customers when needed. See sequence diagram 2.6 and event payload definition. |
| Q7 | Member Loyalty segmentation group — confirmed deferred post-MVP? | PM | Open |
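For the Q6 resolution, a consumer of `cdp.segment.recalculated` needs to tolerate redelivery of the same event. The sketch below deduplicates on the `(segment_id, recalc_run_id)` pair. It assumes a JSON wire format with the field names listed in Q6; the struct tags, the `Deduper` type, and the in-memory set are illustrative, and a real consumer would persist the seen keys. The type is not safe for concurrent use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SegmentRecalculatedEvent carries the fields listed in the Q6 resolution.
// The JSON encoding and tags are assumptions for this sketch.
type SegmentRecalculatedEvent struct {
	SegmentID    string `json:"segment_id"`
	SegmentName  string `json:"segment_name"`
	CompanySsoID string `json:"company_sso_id"`
	TotalAdded   int64  `json:"total_added"`
	TotalRemoved int64  `json:"total_removed"`
	TotalMatched int64  `json:"total_matched"`
	RecalcRunID  string `json:"recalc_run_id"`
}

// eventKey is the idempotency key: one event per segment per run.
type eventKey struct {
	segmentID   string
	recalcRunID string
}

// Deduper remembers processed keys in memory.
type Deduper struct {
	seen map[eventKey]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[eventKey]struct{})}
}

// Handle decodes one message and reports whether it is new. A false result
// with a nil error means the event was already processed and can be skipped.
func (d *Deduper) Handle(payload []byte) (SegmentRecalculatedEvent, bool, error) {
	var ev SegmentRecalculatedEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return ev, false, fmt.Errorf("decode event: %w", err)
	}
	key := eventKey{segmentID: ev.SegmentID, recalcRunID: ev.RecalcRunID}
	if _, dup := d.seen[key]; dup {
		return ev, false, nil
	}
	d.seen[key] = struct{}{}
	return ev, true, nil
}

func main() {
	d := NewDeduper()
	msg := []byte(`{"segment_id":"seg-1","company_sso_id":"company-a","total_added":3,"total_removed":1,"total_matched":42,"recalc_run_id":"run-1"}`)

	for i := 0; i < 2; i++ {
		ev, fresh, err := d.Handle(msg)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("segment=%s run=%s fresh=%v\n", ev.SegmentID, ev.RecalcRunID, fresh)
	}
}
```

After accepting a fresh event, the consumer would pull member details on demand from the segment customers endpoint rather than expecting them in the message.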
Known Limitations
- Segment freshness: For existing, unchanged segments, members are updated once daily; customers whose data changes after 08:00 WIB will not move in/out of a segment until the next recalculation (by product design, MVP). The exception is the moment of create or rule edit, which triggers an immediate best-effort scoped recalc (2.11) so the segment reflects the current datamart within seconds. This is best-effort: if the recalc feature flag is off, the per-segment lock is held, or the datamart is unavailable, the segment stays `pending`/`stale` until the next daily cron. Clients should consult `processing_status` rather than assuming an empty member list means an empty segment.
- Rule complexity cap: Maximum 2 groups × 3 conditions = 6 total conditions per segment. This is enforced in the service layer and validated at API input time.
- Large company recalculation time: Companies with > 500k customers and many active segments may push the cron window close to the 2-hour lock TTL. Mitigation: configurable goroutine pool size and an early-exit circuit breaker.
- Custom field type inference: The Rule Engine must know the `field_type` for each condition to apply the correct operator and SQL cast. The frontend must always pass `field_type` in the rule condition payload; the backend validates it against the registered field properties.
- Membership diff memory cost: The recalculation cron holds the old and new contact ID sets in memory simultaneously to compute the diff. For a company with 500k contacts and 50 active segments this can reach ~50–100 MB per worker goroutine at peak. The goroutine pool size should be tuned conservatively for large tenants until this is profiled in production.
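The memory cost in the last limitation comes from holding both ID sets at once. A minimal sketch of such a set-based diff is shown below; `diffMembers` is a hypothetical helper, not the service's actual `processSegment` code, and it assumes contact IDs are unique within each input list.

```go
package main

import "fmt"

// diffMembers compares the previous and current member lists of one segment
// and returns the IDs that entered and left. Both lists are loaded into sets,
// which is why peak memory is roughly twice the size of one member list.
// Assumes IDs are unique within each list.
func diffMembers(oldIDs, newIDs []string) (added, removed []string) {
	oldSet := make(map[string]struct{}, len(oldIDs))
	for _, id := range oldIDs {
		oldSet[id] = struct{}{}
	}

	newSet := make(map[string]struct{}, len(newIDs))
	for _, id := range newIDs {
		newSet[id] = struct{}{}
		if _, wasMember := oldSet[id]; !wasMember {
			added = append(added, id)
		}
	}

	for _, id := range oldIDs {
		if _, stillMember := newSet[id]; !stillMember {
			removed = append(removed, id)
		}
	}
	return added, removed
}

func main() {
	previous := []string{"c1", "c2", "c3"}
	current := []string{"c2", "c3", "c4"}

	added, removed := diffMembers(previous, current)
	fmt.Println("added:", added)     // added: [c4]
	fmt.Println("removed:", removed) // removed: [c1]
}
```

If both lists were fetched in sorted order, a merge-style pass could compute the same diff without building either set, trading memory for an ordering requirement on the queries.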
6. Comment Logs
| Date | Comment(s) From | Action Item(s) |
|---|---|---|
| 2026-05-19 | Initial draft | N/A |
| 2026-05-19 | Data team confirmation | Datamart schema confirmed: PostgreSQL, 9 columns, default_fields JSONB + custom_fields JSONB + 4 top-level basic event columns. Updated assumption #3, datamart schema section, Rule Engine field resolver + SQL generation example, and resolved Q1/Q2. |
| 2026-05-19 | Design decision | Added Option B audit trail for membership transitions: new customer_segment_membership_events collection, recalculation strategy updated to diff + write events, T4.3 added, T4.4–T4.5 renumbered, Known Limitation #5 revised to reflect memory cost trade-off. |
| 2026-06-10 | RFC review — priority actions | (1) Added shared error contract + per-endpoint error taxonomy (error envelope, code registry, per-endpoint table, example payloads). (2) Specified recalc atomicity: MongoDB session transaction wrapping ReplaceMembers+InsertEvents; added recalc_run_id ULID idempotency key to customer_segment_members and customer_segment_membership_events; lock heartbeat goroutine; poison-segment skip at 3 consecutive failures. (3) Resolved Q4: preview capped at SEGMENT_PREVIEW_MAX_ROWS (default 10,000) with is_truncated flag and total_customer_base denominator. Resolved Q5: chat data stored in default_fields->'chat_data' JSONB sub-object (no separate has_chatroom key); WhatsApp reach = default_fields->'chat_data' IS NOT NULL AND phone IS NOT NULL; gates on chat_data availability rather than shipping phone IS NOT NULL proxy. (4) Added file/signature/gps (presence-only) and multi_line_text to Rule Engine operator table and field resolver; replaced pending_evaluation status value with rule_dirty_at nullable timestamp field. (5) Added Configuration Contract table (7 env vars with types/defaults); added Resource & Storage Estimates table; reconciled T3.1/T3.2 from database/sql+go-sqlmock to pgx/v5+pgxmock. |
| 2026-06-10 | Q6 design decision | Resolved Q6: CDP emits Kafka events (push) when recalculation produces a non-empty member diff. Architecture diagram updated to include Kafka broker and EventPublisher component. Sequence diagram 2.6 added. Pull API retained for on-demand queries. Publish failure is non-fatal (logged + segment.event.publish_error metric); MongoDB is authoritative. (Topic design subsequently revised — see 2026-06-12 entry.) |
| 2026-06-12 | Schema + event publishing revision | (1) Datamart schema simplified to exactly 6 top-level columns: company_sso_id, customer_id, created_at_date, created_by_user_id, default_fields, custom_fields. Fields updated_at, updated_by, bsuid moved into default_fields JSONB; chat presence stored in default_fields->'chat_data' JSONB sub-object (no standalone has_chatroom key); company_id column removed. Updated: assumption #3, datamart schema table, field-access table, indexes, Rule Engine field resolver and SQL example, Q5 resolution. (2) field_category values renamed: "basic_event" → "system", "custom_qontak" merged into "default", "custom_user" → "custom". Three categories now: system (top-level columns), default (default_fields JSONB), custom (custom_fields JSONB). Updated throughout: RuleSet example, field-access table, Rule Engine resolver, SQL example, and Go constants in segment.go. (3) Segment member event redesigned from aggregate to per-contact: topic renamed to cdp.segment.member_changed (singular), one message per customer per membership transition; SegmentMemberChangedEvent struct adds contact_id and event ("added"/"removed"), removes aggregate count fields; ISegmentEventPublisher.BatchPublish replaces PublishMembersChanged; message key changed from segment_id to contact_id. Task breakdown: new T4.5 (ISegmentEventPublisher), T4.4 extended with batch-publish step, T4.6 wiring updated. |
| 2026-06-15 | WhatsApp reachability schema correction | Replaced standalone has_chatroom key with default_fields->'chat_data' JSONB sub-object. A non-null chat_data value indicates an active chatroom. WhatsApp reach query updated to default_fields->'chat_data' IS NOT NULL AND default_fields->>'phone' IS NOT NULL. CDC backfill guard updated accordingly: check whether any member has a non-null chat_data object; if absent, return null reachability. Updated: assumption #3, datamart schema table (line 487), Q5 resolution, comment log 2026-06-10 and 2026-06-12 entries, and task breakdown T3.2. |
| 2026-06-17 | Internal segment customers endpoint added | Added GET /private/segments/:segment_id/customers — BasicAuth, no Launchpad permission gate — to expose detailed customer data for saved segment members to downstream machine-to-machine consumers (Broadcast, Chatbot). Response mirrors ContactSerializer minus crm_data, chat_data, avatar, accounts, disabled_field, field_permission, permission, is_enable_usernames, and scalar username; appends added_at from customer_segment_members. Added: sequence diagram 2.9, API spec with PrivateSegmentCustomerItem shape, per-endpoint error row. Task breakdown updated: T1.6 stub route added, T5.4 OpenAPI table updated, new task T5.6 added. Also backfilled missing task T5.2 (GET /iag/v1/segments/:id/customers IAG endpoint with OWNED_ONLY support). |
| 2026-06-18 | S2S route prefix change + API domain mapping | Renamed all S2S (BasicAuth) internal routes from /private/... to /api/v1/... to align with the service's standard API versioning convention. Added a Route → Public URL Mapping table in the APIs section documenting the two routing layers: IAG endpoints (/iag/v1/segments/...) are exposed via https://api.mekari.io/internal/qontak/customer/v1/segments/...; S2S endpoints (/api/v1/...) are exposed directly via https://contact-service.qontak.net/api/v1/.... Updated: sequence diagrams 2.7 and 2.9, per-endpoint error table, both S2S API spec headers, T1.6 route registration, T5.3 curl example, T5.4 OpenAPI table, and T5.6 router wiring in task breakdown. |
| 2026-06-23 | RFC review — blocking gaps resolved | (1) Timeouts: added "External Call Timeout Strategy" section; all five external call surfaces (datamart preview/recalc, MongoDB transaction, Kafka publish, Redis lock) now have context.WithTimeout deadlines driven by 7 new env vars in the Configuration Contract. (2) Feature flag: cdp_segmentation_enabled resolved to the existing FeatureFlagService / feature_flag MongoDB collection — absent at deploy (disabled for all), per-company targeting via CompanySsoIDs, full rollout via is_general_flag: true, S2S endpoints not gated. (3) Kafka retry: Publish retries up to SEGMENT_KAFKA_PUBLISH_MAX_RETRIES (default 2) with SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS (default 500 ms) backoff; failure after all retries remains non-fatal. Task breakdown: T1.8 added, T3.1/T3.3/T4.4/T4.5/T4.6 updated. |
| 2026-06-25 | S2S tenant isolation + segment versioning | (1) S2S company_sso_id enforcement: added company_sso_id as a required query parameter to both internal endpoints (GET /api/v1/contacts/:contact_id/segments and GET /api/v1/segments/:segment_id/customers). BasicAuth middleware sets no company context, so the parameter is the sole tenant-isolation mechanism. For the contacts endpoint, the member-lookup query is scoped to company_sso_id — a contact from a different company yields an empty segments array. For the segments endpoint, the handler cross-validates segment.company_sso_id == company_sso_id after loading and returns 404 on mismatch to avoid leaking resource existence. Per-endpoint error table updated: both S2S endpoints now list 400. New tasks: T6.1. (2) Segment version label: added version (int64, starts at 1) to customer_segments collection. Incremented atomically via $inc on every user-initiated edit (UpdateSegment) and archive (ArchiveSegment); never touched by cron writes. Exposed in list and detail API responses. rule_version added to SegmentRecalculatedEvent Kafka payload so consumers can correlate which rule definition produced a given recalculation result. DuplicateSegment resets version to 1 on the copy. New tasks: T6.2, T6.3, T6.4. |
| 2026-06-25 | Kafka event redesign — aggregate per-segment | Replaced per-customer Kafka events with a single aggregate event per segment per recalculation run. Rationale: emitting one message per contact membership change is cost-prohibitive at scale (e.g. 100k contacts × 50 active segments = 5M events/day). New design: topic renamed cdp.segment.member_changed → cdp.segment.recalculated; one message per segment carrying total_added, total_removed, total_matched as aggregate counts; partition key changed from contact_id to segment_id. SegmentMemberChangedEvent renamed to SegmentRecalculatedEvent with ContactID/Event removed and TotalAdded/TotalRemoved/TotalMatched added. ISegmentEventPublisher.BatchPublish([]events) simplified to Publish(event). Consumer idempotency key updated to (segment_id, recalc_run_id). Per-contact membership history remains available via GET /api/v1/segments/:id/customers and the customer_segment_membership_events MongoDB collection (unchanged). Updated: architecture diagram, section 2.6 title + intro + sequence diagram + payload + Go struct + interface + publishing sequence + consumer contract + topic config, Q6 resolution. Task breakdown: T4.4 processSegment step, T4.5 struct/interface/implementation/tests, T4.6 topic wiring. |
| 2026-06-26 | Operational controls | (1) Circuit breaker: added segment_recalculation_stop_requested Redis stop-signal with one-shot semantics — checked at each batch boundary in doRecalculation; cron clears key and exits cleanly when detected. New env var SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS (default 86400 s). (2) Recalc feature flag: cdp_segment_recalc_enabled global MongoDB flag gates RecalculateAllSegments and TriggerRecalculation before lock acquisition; distinct from the per-company CRUD flag cdp_segmentation_enabled. (3) Manual trigger API: POST /api/v1/segments/recalculate (BasicAuth); accepts company_sso_ids (required) + segment_ids (optional); acquires same Redis lock as daily cron; returns 202 Accepted with recalc_run_id immediately; runs in background goroutine; 409 if lock held, 503 if flag disabled. Updated: architecture diagram (new TriggerHandler node), sequence diagram 2.3 (feature flag + circuit breaker steps), new sequence diagram 2.10, HA section, APIs section, Config Contract, per-endpoint error table. |
| 2026-06-30 | Immediate recalc on write + readiness indicator | (1) Scoped recalc on create / rule edit: CreateSegment and a rule-changing UpdateSegment now fire a best-effort, asynchronous single-segment recalculation in a detached goroutine (context.WithoutCancel) so a new/edited segment is usable within seconds instead of waiting for the daily cron. Added sequence diagram 2.11; updated success criterion #4, create diagram 2.1, PUT description, Known Limitation #1. The path is gated by cdp_segment_recalc_enabled and reuses processSegment; failures never affect the create/update response (daily cron is the backstop). (2) Per-segment lock: introduced segment_recalculation_lock:{segmentID} (new env var SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS, default 300 s) so the on-write recalc never contends on the global cron lock; the cron's processSegment also acquires it to guarantee a single writer per segment (prevents double membership-transition events). Updated Lock-safety note. (3) Readiness indicator: GET detail and list responses now expose is_ready (bool) and processing_status (pending/ready/stale/failed), derived at mapping time from existing last_evaluated_at + rule_dirty_at + recalc_error_count (no new column/migration). Updated success criterion #5, list & detail API examples + field-rules table, added derivation table to Database Model, and a readiness caveat to the member-list endpoints (empty list vs not-yet-computed). |
| 2026-07-01 | Recalc feature flag scope decision: S2S manual trigger exempted | Confirmed intended scope of cdp_segment_recalc_enabled: it gates both the daily cron RecalculateAllSegments and the scoped on-write recalc recalculateSegment/§2.11 (fired on create / rule-changing update) — those two are the regular, automatic recalculation paths. It does not gate the S2S manual trigger TriggerRecalculation/§2.10 — an operator must retain the ability to force a targeted recalc even during a maintenance window where automatic recalculation is paused. Corrected §2.10's diagram + description + per-endpoint error table (removed the FeatureEnabled step, the FF participant, and the ErrRecalcDisabled/503 row, which were never implemented) and the §3 "Recalculation feature flag" note to state this scope explicitly. Known gap: the shipped contact-service code does not yet check the flag in recalculateNewSegment (the code's name for this RFC's recalculateSegment) — isRecalcEnabled has exactly one caller (RecalculateAllSegments) as of this writing. Tracked as a follow-up implementation task in the task breakdown (T8.2). |