7. Task Breakdown
The implementation is broken into 8 phases with individually completable tasks. Each task follows TDD (Red → Green → Refactor) per AGENTS.md. Mocks (make mocks) and unit tests are implicit deliverables of every task — they are not listed as separate items.
Phase 1: Segment CRUD (Foundation)
T1.1 — Define domain structs [Jira: —]
- Purpose: Establish the core Go types shared across all layers before any logic is written.
- Scope: internal/app/repository/segment/, containing the CustomerSegment, RuleSet, RuleGroup, and RuleCondition structs with bson/json tags, plus enum constants for status and operator.
- Expected Outcome: Types compile; JSON and BSON round-trip tests pass; go vet is clean.
- Implementation Plan:
  - Create internal/app/repository/segment/types.go. Define the following structs:

        type CustomerSegment struct {
            ID               primitive.ObjectID `bson:"_id,omitempty" json:"id"`
            CompanySsoID     string             `bson:"company_sso_id" json:"company_sso_id"`
            Name             string             `bson:"name" json:"name"`
            Description      *string            `bson:"description,omitempty" json:"description,omitempty"`
            Status           string             `bson:"status" json:"status"`
            RuleDirtyAt      *time.Time         `bson:"rule_dirty_at,omitempty" json:"rule_dirty_at,omitempty"`
            RuleSet          RuleSet            `bson:"rule_set" json:"rule_set"`
            TotalMatched     int64              `bson:"total_matched" json:"total_matched"`
            LastEvaluatedAt  *time.Time         `bson:"last_evaluated_at,omitempty" json:"last_evaluated_at,omitempty"`
            CreatedBy        string             `bson:"created_by" json:"created_by"`
            UpdatedBy        string             `bson:"updated_by" json:"updated_by"`
            CreatedAt        time.Time          `bson:"created_at" json:"created_at"`
            UpdatedAt        time.Time          `bson:"updated_at" json:"updated_at"`
            IsDeleted        bool               `bson:"is_deleted" json:"is_deleted"`
            RecalcErrorCount int                `bson:"recalc_error_count" json:"recalc_error_count"`
        }

        type RuleSet struct {
            Operator string      `bson:"operator" json:"operator"` // "AND" | "OR"
            Groups   []RuleGroup `bson:"groups" json:"groups"`
        }

        type RuleGroup struct {
            Operator   string          `bson:"operator" json:"operator"` // "AND" | "OR"
            Conditions []RuleCondition `bson:"conditions" json:"conditions"`
        }

        type RuleCondition struct {
            Field         string  `bson:"field" json:"field"`
            FieldCategory string  `bson:"field_category" json:"field_category"` // "system" | "default" | "custom"
            FieldType     string  `bson:"field_type" json:"field_type"`
            Operator      string  `bson:"operator" json:"operator"`
            Value         string  `bson:"value" json:"value"`
            Value2        *string `bson:"value2,omitempty" json:"value2,omitempty"`
        }

  - Add constants in the same file:

        const (
            StatusActive   = "active"
            StatusArchived = "archived"

            RuleOperatorAND = "AND"
            RuleOperatorOR  = "OR"

            FieldCategorySystem  = "system"  // top-level datamart columns: created_at_date, created_by_user_id
            FieldCategoryDefault = "default" // default_fields JSONB: default + Qontak-managed fields
            FieldCategoryCustom  = "custom"  // custom_fields JSONB: user-defined fields

            // Full set of condition operators
            ConditionOperatorIs             = "is"
            ConditionOperatorIsNot          = "is_not"
            ConditionOperatorContains       = "contains"
            ConditionOperatorDoesNotContain = "does_not_contain"
            ConditionOperatorStartsWith     = "starts_with"
            ConditionOperatorEndsWith       = "ends_with"
            ConditionOperatorIsEmpty        = "is_empty"
            ConditionOperatorIsNotEmpty     = "is_not_empty"
            ConditionOperatorEquals         = "equals"
            ConditionOperatorNotEquals      = "not_equals"
            ConditionOperatorGreaterThan    = "greater_than"
            ConditionOperatorLessThan       = "less_than"
            ConditionOperatorBetween        = "between"
            ConditionOperatorOn             = "on"
            ConditionOperatorNotOn          = "not_on"
            ConditionOperatorBefore         = "before"
            ConditionOperatorAfter          = "after"
            ConditionOperatorContainsAny    = "contains_any"
            ConditionOperatorContainsAll    = "contains_all"
        )

  - Create internal/app/repository/segment/types_test.go. Write table-driven tests that marshal/unmarshal a fully populated CustomerSegment to JSON and BSON, asserting that field names are snake_case, that nullable fields are omitted from the output when nil, and that enum string values match the constants exactly.
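The omit-when-nil assertion from the test plan above can be sketched with the standard library alone. This is a minimal illustration, not the project's test file: segmentSketch is a hypothetical, trimmed-down stand-in for CustomerSegment, and only the JSON half is shown because the BSON half needs the MongoDB driver.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// segmentSketch is a hypothetical, trimmed-down stand-in for CustomerSegment.
type segmentSketch struct {
	Name        string  `json:"name"`
	Description *string `json:"description,omitempty"`
	Status      string  `json:"status"`
}

// marshalSegment returns the JSON encoding of s as a string.
func marshalSegment(s segmentSketch) string {
	b, err := json.Marshal(s)
	if err != nil {
		panic(err)
	}
	return string(b)
}

// roundTrip marshals s and unmarshals the result into a fresh value.
func roundTrip(s segmentSketch) segmentSketch {
	var out segmentSketch
	if err := json.Unmarshal([]byte(marshalSegment(s)), &out); err != nil {
		panic(err)
	}
	return out
}

func main() {
	desc := "high-value leads"
	cases := []struct {
		name string
		in   segmentSketch
	}{
		{"nil description is omitted", segmentSketch{Name: "VIP", Status: "active"}},
		{"set description is kept", segmentSketch{Name: "VIP", Description: &desc, Status: "active"}},
	}
	for _, tc := range cases {
		back := roundTrip(tc.in)
		fmt.Printf("%s: %s (description present after round trip: %t)\n",
			tc.name, marshalSegment(tc.in), back.Description != nil)
	}
}
```

A pointer field with omitempty disappears from the payload when nil, which is what lets API clients distinguish "no description" from an empty string.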
T1.2 — MongoDB migration: customer_segments collection [Jira: —]
- Purpose: Provision the MongoDB collection and indexes required by the CRUD layer before any repository code is deployed.
- Scope: db/migrations/, with up/down JSON migration files creating the customer_segments collection and four compound indexes.
- Expected Outcome: Migration runs successfully against a local MongoDB instance; indexes verified via db.customer_segments.getIndexes().
- Implementation Plan:
  - Determine the next sequence number from db/migrations/ and create {seq}_create_customer_segments.up.json:

        [
          { "create": "customer_segments" },
          {
            "createIndexes": "customer_segments",
            "indexes": [
              { "key": { "company_sso_id": 1, "status": 1, "is_deleted": 1 }, "name": "idx_cs_company_status" },
              { "key": { "company_sso_id": 1, "name": 1 }, "name": "idx_cs_company_name" },
              { "key": { "company_sso_id": 1, "created_at": -1 }, "name": "idx_cs_company_created" },
              { "key": { "company_sso_id": 1, "created_by": 1 }, "name": "idx_cs_company_owner" }
            ]
          }
        ]

  - Create {seq}_create_customer_segments.down.json: [{"drop": "customer_segments"}].
  - Run the migration against a local MongoDB instance and verify all four indexes appear in db.customer_segments.getIndexes().
T1.3 — ISegmentRepository interface + MongoDB implementation [Jira: —]
- Purpose: Provide the persistence layer for segment documents so the service layer has no direct MongoDB dependency.
- Scope: internal/app/repository/segment/, with the interface definition and MongoDB implementation; mockery-generated mock included.
- Expected Outcome: All repository methods covered by table-driven unit tests; go test ./internal/app/repository/segment/... passes with ≥ 80% coverage.
- Implementation Plan:
  - Create internal/app/repository/segment/segment_repository.go. Define the filter and update structs plus the interface:

        type SegmentFilter struct {
            CompanySsoID string
            Status       string // "active" | "archived" | "" (all)
            Search       string // partial name match, min 3 chars when non-empty
            CreatedFrom  *time.Time
            CreatedTo    *time.Time
            CreatedBy    string // non-empty when OWNED_ONLY permission
            Page         int
            PageSize     int
        }

        type CustomerSegmentUpdate struct {
            Name        *string
            Description *string
            RuleSet     *RuleSet
            UpdatedBy   string
            RuleDirtyAt *time.Time // set by service when rule_set changes
        }

        type ISegmentRepository interface {
            FindByID(ctx context.Context, id string) (*CustomerSegment, error)
            FindAll(ctx context.Context, filter SegmentFilter) ([]CustomerSegment, int64, error)
            Insert(ctx context.Context, seg *CustomerSegment) (string, error)
            Update(ctx context.Context, id string, update *CustomerSegmentUpdate) error
            UpdateStatus(ctx context.Context, id, status string) error
            UpdateStats(ctx context.Context, id string, totalMatched int64, lastEvaluatedAt time.Time) error
            ClearRuleDirtyAt(ctx context.Context, id string) error
            IncrementRecalcErrorCount(ctx context.Context, id string) error
        }

  - Create internal/app/repository/segment/segment_repository_mongo.go. Implement a segmentRepository struct wrapping repository.IDbRepo. Constructor: NewSegmentRepository(db repository.IDbRepo) ISegmentRepository. Each method builds a BSON filter and delegates to IDbRepo primitives (FindBy, Create, UpdateOne, etc.).
  - Run make mocks to generate mocks/MockISegmentRepository.go.
  - Create internal/app/repository/segment/segment_repository_test.go. Table-driven tests for each method, covering: found, not-found, and DB error cases for reads; success and error for writes.
T1.4 — ISegmentService interface + SegmentService implementation [Jira: —]
- Purpose: Implement all segment business logic (CreateSegment, GetSegment, ListSegments, UpdateSegment, ArchiveSegment, DuplicateSegment), including rule structure validation and permission evaluation.
- Scope: internal/app/service/segment/, with the interface, service struct, and validation helpers; depends on ISegmentRepository (injected).
- Expected Outcome: Table-driven unit tests cover all happy paths and error cases; go test ./internal/app/service/segment/... passes with ≥ 80% coverage.
- Implementation Plan:
  - Create internal/app/service/segment/segment_service_interface.go. Define ISegmentService with: CreateSegment, GetSegment, ListSegments, UpdateSegment, ArchiveSegment, DuplicateSegment (plus PreviewSegment added in T3.3, RecalculateAllSegments in T4.4, GetSegmentDetails in T5.1, GetSegmentsByContact in T5.3).
  - Create internal/app/service/segment/segment_service.go. Define sentinel errors:

        var (
            ErrSegmentNotFound     = errors.New("segment not found")
            ErrSegmentNameConflict = errors.New("segment name already exists")
            ErrConflictStatus      = errors.New("operation not valid for current segment status")
            ErrInvalidRule         = errors.New("invalid rule")
            ErrPermissionDenied    = errors.New("permission denied")
        )

    Define a segmentService struct injecting ISegmentRepository. Constructor: NewSegmentService(repo segment.ISegmentRepository) ISegmentService.
  - Implement CreateSegment:
    - Rule structure validation: len(ruleSet.Groups) > 2 → ErrInvalidRule; any len(group.Conditions) > 3 → ErrInvalidRule.
    - Name uniqueness: call repo.FindAll with SegmentFilter{CompanySsoID: companySsoID, Search: name}; return ErrSegmentNameConflict on collision.
    - Call repo.Insert.
  - Implement GetSegment: repo.FindByID; return ErrSegmentNotFound if nil or is_deleted == true.
  - Implement ListSegments: build SegmentFilter (status, search ≥ 3 chars, date range, company_sso_id); set CreatedBy = userSsoID when OWNED_ONLY; delegate to repo.FindAll.
  - Implement UpdateSegment: same rule validation as create; if OWNED_ONLY, verify segment.CreatedBy == userSsoID; if rule_set changed, set RuleDirtyAt = &now; call repo.Update.
  - Implement ArchiveSegment: check the OWNED_ONLY boundary; verify segment.Status == StatusActive, else return ErrConflictStatus; call repo.UpdateStatus(StatusArchived).
  - Implement DuplicateSegment: load the source via FindByID; zero out ID, TotalMatched, LastEvaluatedAt, RuleDirtyAt, RecalcErrorCount, and timestamps; prefix Name = "Copy of " + source.Name; call repo.Insert.
  - Create internal/app/service/segment/segment_service_test.go. Table-driven tests covering all happy paths and every sentinel error: ErrSegmentNotFound, ErrSegmentNameConflict, ErrConflictStatus, ErrInvalidRule, permission-denied cases.
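The rule structure limits used by CreateSegment and UpdateSegment (at most 2 groups, at most 3 conditions per group) could be factored into one helper. The sketch below is an assumption about how that helper might look: validateRuleStructure, maxGroups, and maxConditionsPerGroup are hypothetical names, and the types are trimmed copies of the T1.1 structs.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidRule = errors.New("invalid rule")

// Limits taken from the CreateSegment validation step; the constant names are hypothetical.
const (
	maxGroups             = 2
	maxConditionsPerGroup = 3
)

// Trimmed copies of the T1.1 types, enough to exercise the validation.
type RuleCondition struct{ Field, Operator, Value string }

type RuleGroup struct {
	Operator   string
	Conditions []RuleCondition
}

type RuleSet struct {
	Operator string
	Groups   []RuleGroup
}

// validateRuleStructure rejects rule sets that exceed the group or condition limits.
// Errors wrap ErrInvalidRule so callers can match them with errors.Is.
func validateRuleStructure(rs RuleSet) error {
	if len(rs.Groups) > maxGroups {
		return fmt.Errorf("%w: %d groups exceeds the limit of %d", ErrInvalidRule, len(rs.Groups), maxGroups)
	}
	for i, g := range rs.Groups {
		if len(g.Conditions) > maxConditionsPerGroup {
			return fmt.Errorf("%w: group %d has %d conditions, limit is %d",
				ErrInvalidRule, i, len(g.Conditions), maxConditionsPerGroup)
		}
	}
	return nil
}

// isInvalidRule reports whether err wraps ErrInvalidRule.
func isInvalidRule(err error) bool { return errors.Is(err, ErrInvalidRule) }

func main() {
	ok := RuleSet{Operator: "AND", Groups: []RuleGroup{{Operator: "AND", Conditions: make([]RuleCondition, 3)}}}
	tooManyGroups := RuleSet{Operator: "AND", Groups: make([]RuleGroup, 3)}
	tooManyConds := RuleSet{Operator: "AND", Groups: []RuleGroup{{Operator: "OR", Conditions: make([]RuleCondition, 4)}}}

	fmt.Println(validateRuleStructure(ok))
	fmt.Println(validateRuleStructure(tooManyGroups))
	fmt.Println(validateRuleStructure(tooManyConds))
}
```

Sharing one helper keeps the create, update, and preview paths from drifting apart if the limits change.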
T1.5 — SegmentHandler HTTP handlers [Jira: —]
- Purpose: Expose segment CRUD over HTTP, translating requests into service calls and service errors into standard API responses.
- Scope: internal/app/handler/segment_handler.go, with the Create, List, GetByID, Update, Archive, and Duplicate handlers; responses go through myhttp.HTTPHandler.
- Expected Outcome: Unit tests cover input validation errors, permission-denied cases, and successful responses; go test ./internal/app/handler/... passes with ≥ 80% coverage.
- Implementation Plan:
  - Create internal/app/payload/segment_payload.go. Define DTOs:
    - CreateSegmentRequest: Name required max 60, Description optional max 250, RuleSet required.
    - UpdateSegmentRequest: same shape as create.
    - ListSegmentsQuery: Page (default 1), PerPage (default 20, max 100), Search string, CreatedAtFrom/CreatedAtTo RFC3339, Status string.
    - SegmentResponse: id, name, total_matched, created_by, created_at, status.
    - SegmentListResponse: data []SegmentResponse + meta {total, page, per_page}.
    - SegmentDetailResponse: id, name, description, status, rule_set, total_matched, percentage_reach, last_evaluated_at, reachability {whatsapp {count, percentage}, email {count, percentage}} (omit the reachability key for archived segments), created_by, created_at, updated_at.
  - Create internal/app/handler/segment_handler.go. Define SegmentHandler with segmentService service.ISegmentService. Constructor: NewSegmentHandler(svc service.ISegmentService) *SegmentHandler.
  - Implement each handler with signature func (h *SegmentHandler) Method(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error):
    - Create: bind → validate → extract companySsoID from auth context → svc.CreateSegment → 201 Created {data: {id}}.
    - List: parse ListSegmentsQuery from URL params → svc.ListSegments → 200 with data array + pagination meta.
    - GetByID: chi.URLParam(r, "segmentID") → svc.GetSegment → 200.
    - Update: bind → validate → svc.UpdateSegment → 200.
    - Archive: extract :segmentID → svc.ArchiveSegment → 200.
    - Duplicate: extract :segmentID → svc.DuplicateSegment → 201 {data: {id}}.
  - Map service sentinel errors to HTTP responses via mapServiceError. All errors use the myhttp.ErrorResponse envelope (resp_code, resp_desc.id, resp_desc.en, meta):

    | Service error | HTTP | myhttp factory |
    | --- | --- | --- |
    | ErrSegmentNotFound | 404 | ErrNotFound() |
    | ErrSegmentNameConflict | 409 | ErrConflictCustomDesc("Segment name already exists", "Segment name already exists") |
    | ErrInvalidRule | 422 | ErrUnprocessableEntity() |
    | ErrConflictStatus | 409 | ErrConflictCustomDesc("Operation not valid for current segment status", "Operation not valid for current segment status") |
    | ErrPermissionDenied | 403 | ErrForbidden() |
    | other | 500 | ErrInternal() |

  - Create internal/app/handler/segment_handler_test.go. Test all handlers: bad payload → 400, permission denied → 403, not found → 404, name conflict → 409, invalid rule → 422, success → 200 (201 for Create and Duplicate) with the correct resp_code/data shape.
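The error mapping in T1.5 can be sketched as a single switch over errors.Is checks. The version below is an illustration under stated assumptions: it returns plain status codes from net/http instead of the project's myhttp factories (whose signatures are not shown in this section), and statusFor and wrap are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Sentinel errors as declared in T1.4.
var (
	ErrSegmentNotFound     = errors.New("segment not found")
	ErrSegmentNameConflict = errors.New("segment name already exists")
	ErrConflictStatus      = errors.New("operation not valid for current segment status")
	ErrInvalidRule         = errors.New("invalid rule")
	ErrPermissionDenied    = errors.New("permission denied")
)

// statusFor maps a non-nil service error to an HTTP status code.
// errors.Is is used so wrapped sentinels still match.
func statusFor(err error) int {
	switch {
	case errors.Is(err, ErrSegmentNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrSegmentNameConflict), errors.Is(err, ErrConflictStatus):
		return http.StatusConflict
	case errors.Is(err, ErrInvalidRule):
		return http.StatusUnprocessableEntity
	case errors.Is(err, ErrPermissionDenied):
		return http.StatusForbidden
	default:
		return http.StatusInternalServerError
	}
}

// wrap adds context to err the way a service method might, keeping the sentinel matchable.
func wrap(msg string, err error) error { return fmt.Errorf("%s: %w", msg, err) }

func main() {
	fmt.Println(statusFor(wrap("archive segment", ErrConflictStatus)))
	fmt.Println(statusFor(errors.New("unexpected database failure")))
}
```

Matching with errors.Is rather than == means the service layer stays free to add context to an error without breaking the handler's mapping.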
T1.6 — Route registration [Jira: —]
- Purpose: Make segment endpoints reachable via the API gateway by wiring them into the Chi router.
- Scope: internal/server/rest_router.go, registering all routes under /iag/v1/segments and two private routes.
- Expected Outcome: go build ./... passes; routes verifiable via curl against a locally running server.
- Implementation Plan:
  - Open internal/server/rest_router.go. In the iag/v1 subrouter block, add:

        r.Route("/segments", func(r chi.Router) {
            r.Use(IAGMiddleware, ContextLoggerMiddleware)

            // POST /iag/v1/segments  permission key: customers_segment_add (ALL_ACCESS)
            r.With(RequirePermission("customers_segment_add", AllAccess)).Post("/", segmentHandler.Create)

            // GET /iag/v1/segments  permission key: customers_segment_view
            r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Get("/", segmentHandler.List)

            // POST /preview  permission key: customers_segment_view; handler attached in T3.4
            // (register placeholder route now so the router is aware; attach real handler in T3.4)

            // GET /iag/v1/segments/:segmentID  permission key: customers_segment_view
            r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Get("/{segmentID}", segmentHandler.GetByID)

            // PUT /iag/v1/segments/:segmentID  permission key: customers_segment_manage
            r.With(RequirePermission("customers_segment_manage", AllAccess|OwnedOnly)).Put("/{segmentID}", segmentHandler.Update)

            // PATCH /iag/v1/segments/:segmentID/archive  permission key: customers_segment_archived
            r.With(RequirePermission("customers_segment_archived", AllAccess|OwnedOnly)).Patch("/{segmentID}/archive", segmentHandler.Archive)

            // POST /iag/v1/segments/:segmentID/duplicate  permission key: customers_segment_add
            r.With(RequirePermission("customers_segment_add", AllAccess)).Post("/{segmentID}/duplicate", segmentHandler.Duplicate)
        })

  - Outside the IAG subrouter, under the BasicAuth middleware group:

        // GET /api/v1/contacts/:contactID/segments: machine-to-machine, handler attached in T5.3
        r.With(BasicAuth).Get("/api/v1/contacts/{contactID}/segments", segmentHandler.GetContactSegments)

        // GET /api/v1/segments/:segmentID/customers: machine-to-machine detailed customer list, handler attached in T5.6
        r.With(BasicAuth).Get("/api/v1/segments/{segmentID}/customers", segmentHandler.GetPrivateSegmentCustomers)

  - Run go build ./... and confirm the build is clean.
T1.7 — Dependency wiring [Jira: —]
- Purpose: Connect all Phase 1 components at startup so the service boots end-to-end.
- Scope: internal/serverenv/, injecting SegmentRepository, SegmentService, and SegmentHandler following existing wiring patterns.
- Expected Outcome: Server starts without panic; smoke test (create + get segment) passes against local MongoDB.
- Implementation Plan:
  - Open cmd/initializer.go. Following the existing repo → service → handler layering:

        segmentRepo := segment.NewSegmentRepository(dbRepo)
        segmentService := segmentservice.NewSegmentService(segmentRepo)
        segmentHandler := handler.NewSegmentHandler(segmentService)

  - Pass segmentHandler to the router/server setup (however existing handlers are threaded; likely a Handlers struct or a direct argument to NewRouter).
  - Start the server locally (./contact-service server). Run a smoke test: POST /iag/v1/segments with a valid body → expect 201 Created with an id; GET /iag/v1/segments/:id with that id → expect 200 OK with the segment data.
T1.8 — Feature flag cdp_segmentation_enabled [Jira: —]
- Purpose: Gate all IAG segment endpoints behind a per-company feature flag so the feature can be enabled for pilot companies without a code redeploy. Uses the existing FeatureFlagService / feature_flag MongoDB collection pattern.
- Scope: internal/pkg/consts/feature_flag.go (constant); internal/app/handler/segment_handler.go (flag check helper); internal/server/rest_router.go (inject FeatureFlagService into SegmentHandler and call the check at the top of each IAG handler).
- Expected Outcome: A company with the flag absent or disabled receives 404 Not Found from every IAG segment endpoint; a company with the flag enabled receives the normal response. Unit tests cover both paths for one representative handler (Create).
- Implementation Plan:
  - Open internal/pkg/consts/feature_flag.go. Add:

        FeatureFlagCDPSegmentation = "cdp_segmentation_enabled"

  - Inject FeatureFlagService into SegmentHandler. Extend the constructor:

        type SegmentHandler struct {
            segmentService service.ISegmentService
            featureFlags   service.FeatureFlagService // existing type from internal/app/service/feature_flag.go
        }

        func NewSegmentHandler(svc service.ISegmentService, ff service.FeatureFlagService) *SegmentHandler {
            return &SegmentHandler{segmentService: svc, featureFlags: ff}
        }

  - Add a private helper on SegmentHandler:

        // requireFeatureEnabled returns a non-nil error response when cdp_segmentation_enabled is
        // disabled for the company. Callers return immediately when this returns non-nil.
        func (h *SegmentHandler) requireFeatureEnabled(ctx context.Context, companySsoID string) *myhttp.ResponseBody {
            if !h.featureFlags.FeatureEnabled(ctx, consts.FeatureFlagCDPSegmentation, companySsoID) {
                resp := myhttp.ErrNotFound()
                return &resp
            }
            return nil
        }

  - At the top of every IAG segment handler method (Create, List, GetByID, Update, Archive, Duplicate, Preview, GetSegmentCustomers), add:

        if errResp := h.requireFeatureEnabled(r.Context(), companySsoID); errResp != nil {
            return *errResp, nil
        }

    S2S endpoints (GetContactSegments, GetPrivateSegmentCustomers) are not gated; they are always accessible to internal services.
  - In cmd/initializer.go, pass the existing featureFlagService to NewSegmentHandler.
  - Unit tests for the Create handler: flag disabled → 404 Not Found without calling the service; flag enabled → handler proceeds normally. Testing one handler is sufficient because every handler exercises the same helper.
Phase 2: Rule Engine
T2.1 — RuleEngine interface + package skeleton [Jira: —]
- Purpose: Define the public contract for query building so the service and future callers depend on an interface, not a concrete type.
- Scope: internal/pkg/segment_engine/, with the package directory, the IRuleEngine interface, and sentinel errors.
- Expected Outcome: Package compiles; interface is importable from the service layer.
- Implementation Plan:
  - Create the internal/pkg/segment_engine/ directory.
  - Create internal/pkg/segment_engine/engine.go:

        type IRuleEngine interface {
            BuildQuery(ruleSet RuleSet, companySsoID string) (sql string, args []any, err error)
        }

        var ErrInvalidRule = errors.New("invalid rule")

        func ErrInvalidRuleField(field string) error {
            return fmt.Errorf("%w: unrecognised field %q", ErrInvalidRule, field)
        }

        func ErrUnsupportedOperator(op, fieldType string) error {
            return fmt.Errorf("%w: operator %q not supported for field_type %q", ErrInvalidRule, op, fieldType)
        }

    Handlers check these with errors.Is(err, ErrInvalidRule) to return HTTP 422.
  - Run make mocks to generate mocks/MockIRuleEngine.go for service-layer tests (T1.4, T3.3, T4.4).
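A short runnable check of the error constructors from T2.1 shows why one errors.Is test is enough for the handler. The constructors are copied from the plan; isInvalidRule is a hypothetical helper added only for this demonstration.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrInvalidRule = errors.New("invalid rule")

// ErrInvalidRuleField wraps ErrInvalidRule with the offending field name.
func ErrInvalidRuleField(field string) error {
	return fmt.Errorf("%w: unrecognised field %q", ErrInvalidRule, field)
}

// ErrUnsupportedOperator wraps ErrInvalidRule with the rejected operator and field type.
func ErrUnsupportedOperator(op, fieldType string) error {
	return fmt.Errorf("%w: operator %q not supported for field_type %q", ErrInvalidRule, op, fieldType)
}

// isInvalidRule is the check a handler would run before answering with HTTP 422.
func isInvalidRule(err error) bool { return errors.Is(err, ErrInvalidRule) }

func main() {
	err := ErrUnsupportedOperator("contains", "gps")
	fmt.Println(err)
	fmt.Println(isInvalidRule(err))
	fmt.Println(isInvalidRule(errors.New("connection reset")))
}
```

Both constructors produce distinct messages for logs while remaining matchable as the same sentinel.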
T2.2 — BuildQuery full implementation [Jira: —]
- Purpose: Translate a RuleSet DSL into a safe, parameterised PostgreSQL WHERE clause covering all supported field types, operators, and JSONB access patterns.
- Scope: internal/pkg/segment_engine/, with the full BuildQuery implementation including field resolver and operator dispatch.
- Expected Outcome: Table-driven tests cover every operator × field-type combination; injection-defence test passes; go test ./internal/pkg/segment_engine/... reaches ≥ 90% coverage (higher bar due to security criticality).
- Implementation Plan:
  - Create internal/pkg/segment_engine/engine_impl.go. Define a ruleEngine struct (stateless) and NewRuleEngine() IRuleEngine.
  - Implement buildFieldExpr(field, category, fieldType string) (string, error) using this complete mapping:

    | field_category | field_type | SQL expression |
    | --- | --- | --- |
    | system | any | field (bare column, e.g. created_at_date) |
    | default | single_line_text, multi_line_text, dropdown, url | default_fields->>'field' |
    | default | number | (default_fields->>'field')::numeric |
    | default | date | (default_fields->>'field')::date |
    | default | boolean | (default_fields->>'field')::boolean |
    | default | file, signature, gps | default_fields->>'field' (presence-only) |
    | custom | single_line_text, multi_line_text, dropdown, url | custom_fields->>'field' |
    | custom | number | (custom_fields->>'field')::numeric |
    | custom | date | (custom_fields->>'field')::date |
    | custom | file, signature, gps | custom_fields->>'field' (presence-only) |

    Unknown field_category or a field key not in the registered allow-list → return ErrInvalidRuleField.
  - Implement buildConditionSQL(cond RuleCondition, argOffset int) (clause string, args []any, err error) using this complete operator → SQL mapping:

    | Operator | Applies to | SQL template | Args consumed |
    | --- | --- | --- | --- |
    | is | text, dropdown | {field} = $N | value |
    | is_not | text, dropdown | {field} != $N | value |
    | contains | text | {field} ILIKE $N (wrap: %value%) | value |
    | does_not_contain | text | {field} NOT ILIKE $N (wrap: %value%) | value |
    | starts_with | text | {field} ILIKE $N (append: value%) | value |
    | ends_with | text | {field} ILIKE $N (prepend: %value) | value |
    | is_empty | text, number, date, boolean (JSONB) | ({field} IS NULL OR {field} = '') | none |
    | is_not_empty | text, number, date, boolean (JSONB) | ({field} IS NOT NULL AND {field} != '') | none |
    | is_empty | file, signature, gps | ({field} IS NULL) | none |
    | is_not_empty | file, signature, gps | ({field} IS NOT NULL) | none |
    | is_empty | system (top-level column) | {field} IS NULL | none |
    | is_not_empty | system (top-level column) | {field} IS NOT NULL | none |
    | equals | number, boolean | {field} = $N | value |
    | not_equals | number | {field} != $N | value |
    | greater_than | number | {field} > $N | value |
    | less_than | number | {field} < $N | value |
    | between | number, date | {field} BETWEEN $N AND $M | value, value2 |
    | on | date | DATE({field}) = $N | value |
    | not_on | date | DATE({field}) != $N | value |
    | before | date | {field} < $N | value |
    | after | date | {field} > $N | value |
    | contains_any | multi-select | {field} && ARRAY[$N]::text[] | value |
    | contains_all | multi-select | {field} @> ARRAY[$N]::text[] | value |

    Reject file/signature/gps with any operator other than is_empty/is_not_empty → ErrUnsupportedOperator. Validate cond.Field against a static allow-list of registered field keys before building the expression (prevents SQL injection via crafted field names).
  - Implement buildGroupSQL(group RuleGroup, argOffset int) (string, []any, error): join condition clauses with the group's AND/OR operator, accumulating argOffset across conditions.
  - Implement BuildQuery: prepend WHERE company_sso_id = $1 (arg: companySsoID, arg index starts at 2 for the first condition); join groups with the top-level operator; return the final SQL and args slice.

    Example for rule (source is 'WhatsApp') AND (annual_revenue > 100000000) AND (created_at < '2026-01-01') expressed as two groups:

        WHERE company_sso_id = $1
          AND (
            (default_fields->>'source' = $2 AND (custom_fields->>'annual_revenue')::numeric > $3)
            AND
            (created_at < $4)
          )

    args: ['company_abc', 'WhatsApp', 100000000, '2026-01-01T00:00:00Z']
  - Create internal/pkg/segment_engine/engine_test.go. Table-driven matrix covering every operator × field_type cell. Add a dedicated injection test: pass "'; DROP TABLE customer_datamart; --" as field → must return ErrInvalidRuleField; returned SQL must be empty; the injected string must not appear anywhere in the returned SQL or args.
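The placeholder bookkeeping described for buildConditionSQL and buildGroupSQL can be sketched as follows. This is a reduced illustration, not the planned implementation: it covers only four operators, allowedFields is a hypothetical two-entry allow-list, the condition type is a trimmed stand-in for RuleCondition, and values are passed through as strings without the type conversion a real engine would need.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var ErrInvalidRule = errors.New("invalid rule")

// allowedFields is a hypothetical allow-list: field key -> trusted SQL expression.
// User input selects an entry but is never concatenated into the SQL text.
var allowedFields = map[string]string{
	"source":         "default_fields->>'source'",
	"annual_revenue": "(custom_fields->>'annual_revenue')::numeric",
}

// condition is a trimmed stand-in for RuleCondition.
type condition struct {
	Field, Operator, Value, Value2 string
}

// buildCondition renders one condition; n is the next free placeholder index.
func buildCondition(c condition, n int) (string, []any, error) {
	expr, ok := allowedFields[c.Field]
	if !ok {
		return "", nil, fmt.Errorf("%w: unrecognised field %q", ErrInvalidRule, c.Field)
	}
	switch c.Operator {
	case "is":
		return fmt.Sprintf("%s = $%d", expr, n), []any{c.Value}, nil
	case "contains":
		return fmt.Sprintf("%s ILIKE $%d", expr, n), []any{"%" + c.Value + "%"}, nil
	case "greater_than":
		return fmt.Sprintf("%s > $%d", expr, n), []any{c.Value}, nil
	case "between":
		return fmt.Sprintf("%s BETWEEN $%d AND $%d", expr, n, n+1), []any{c.Value, c.Value2}, nil
	default:
		return "", nil, fmt.Errorf("%w: operator %q not supported", ErrInvalidRule, c.Operator)
	}
}

// buildGroup joins condition clauses with AND/OR, advancing the placeholder
// index by the number of args each condition consumed.
func buildGroup(conds []condition, operator string, argOffset int) (string, []any, error) {
	if operator != "AND" && operator != "OR" {
		return "", nil, fmt.Errorf("%w: group operator %q", ErrInvalidRule, operator)
	}
	if len(conds) == 0 {
		return "", nil, fmt.Errorf("%w: empty group", ErrInvalidRule)
	}
	clauses := make([]string, 0, len(conds))
	var args []any
	for _, c := range conds {
		clause, a, err := buildCondition(c, argOffset+len(args))
		if err != nil {
			return "", nil, err
		}
		clauses = append(clauses, clause)
		args = append(args, a...)
	}
	return "(" + strings.Join(clauses, " "+operator+" ") + ")", args, nil
}

func main() {
	// argOffset is 2 because $1 is reserved for company_sso_id.
	sql, args, err := buildGroup([]condition{
		{Field: "source", Operator: "is", Value: "WhatsApp"},
		{Field: "annual_revenue", Operator: "between", Value: "1000", Value2: "5000"},
	}, "AND", 2)
	fmt.Println(sql, args, err)
}
```

Because the SQL text is assembled only from allow-listed expressions and generated placeholders, a crafted field name fails the lookup before any string is built.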
Phase 3: Datamart Integration & Preview
T3.1 — DATAMART_DSN configuration [Jira: —]
- Purpose: Allow the datamart PostgreSQL connection string to be supplied at deploy time without touching code.
- Scope: params/ config struct; serverenv pgx pool initialisation.
- Expected Outcome: Application starts without the env var (nil pool); starts correctly when set; config tests cover both states.
- Implementation Plan:
  - Open the params/ config struct file. Add these fields:

        DatamartDSN                           string `env:"DATAMART_DSN" envDefault:""`
        SegmentRecalcWorkers                  int    `env:"SEGMENT_RECALC_WORKERS" envDefault:"4"`
        SegmentRecalcBatchSize                int    `env:"SEGMENT_RECALC_BATCH_SIZE" envDefault:"100"`
        SegmentRecalcLockTTLSeconds           int    `env:"SEGMENT_RECALC_LOCK_TTL_SECONDS" envDefault:"7200"`
        SegmentRecalcLockRenewIntervalSeconds int    `env:"SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS" envDefault:"600"`
        SegmentPreviewRateLimit               int    `env:"SEGMENT_PREVIEW_RATE_LIMIT" envDefault:"10"`
        SegmentPreviewMaxRows                 int    `env:"SEGMENT_PREVIEW_MAX_ROWS" envDefault:"10000"`

        // Timeout configuration
        DatamartQueryTimeoutSeconds       int `env:"DATAMART_QUERY_TIMEOUT_SECONDS" envDefault:"30"`
        DatamartRecalcQueryTimeoutSeconds int `env:"DATAMART_RECALC_QUERY_TIMEOUT_SECONDS" envDefault:"60"`
        MongoOperationTimeoutSeconds      int `env:"MONGODB_OPERATION_TIMEOUT_SECONDS" envDefault:"30"`
        KafkaPublishTimeoutSeconds        int `env:"KAFKA_PUBLISH_TIMEOUT_SECONDS" envDefault:"10"`
        RedisOperationTimeoutSeconds      int `env:"REDIS_OPERATION_TIMEOUT_SECONDS" envDefault:"5"`

        // Kafka retry configuration
        SegmentKafkaPublishMaxRetries   int `env:"SEGMENT_KAFKA_PUBLISH_MAX_RETRIES" envDefault:"2"`
        SegmentKafkaPublishRetryDelayMs int `env:"SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS" envDefault:"500"`

  - In cmd/initializer.go, after existing DB initialisations:

        var datamartPool *pgxpool.Pool
        if cfg.DatamartDSN != "" {
            datamartPool, err = pgxpool.New(ctx, cfg.DatamartDSN)
            if err != nil {
                log.Fatal(err)
            }
        }

  - Define var ErrDatamartUnavailable = errors.New("datamart unavailable") in the datamart repository package. All repository methods guard with if pool == nil { return ..., ErrDatamartUnavailable }.
  - Config tests: env var absent → pool is nil; env var set to a valid DSN → pool is non-nil.
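The nil-pool guard from T3.1 can be exercised without a database. In this sketch fakePool is a hypothetical stand-in for *pgxpool.Pool, and newPool, countWithDSN, and unavailable are illustrative helpers that do not appear in the plan; the point is only that an empty DATAMART_DSN yields a nil pool and every repository call then fails fast with ErrDatamartUnavailable.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var ErrDatamartUnavailable = errors.New("datamart unavailable")

// fakePool is a hypothetical stand-in for *pgxpool.Pool so the sketch runs offline.
type fakePool struct{ rows int64 }

// countRows pretends to run a COUNT(*) query and honours context cancellation.
func (p *fakePool) countRows(ctx context.Context) (int64, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	return p.rows, nil
}

// newPool mirrors the initializer: an empty DSN leaves the pool nil.
func newPool(dsn string) *fakePool {
	if dsn == "" {
		return nil
	}
	return &fakePool{rows: 42}
}

type datamartRepository struct{ pool *fakePool }

// Count guards against a nil pool before touching the database.
func (r *datamartRepository) Count(ctx context.Context) (int64, error) {
	if r.pool == nil {
		return 0, ErrDatamartUnavailable
	}
	return r.pool.countRows(ctx)
}

// countWithDSN wires a repository for the given DSN and runs one Count call.
func countWithDSN(dsn string) (int64, error) {
	repo := &datamartRepository{pool: newPool(dsn)}
	return repo.Count(context.Background())
}

// unavailable reports whether err wraps ErrDatamartUnavailable.
func unavailable(err error) bool { return errors.Is(err, ErrDatamartUnavailable) }

func main() {
	_, err := countWithDSN("")
	fmt.Println("no DSN:", err)

	n, err := countWithDSN("postgres://localhost/datamart")
	fmt.Println("with DSN:", n, err)
}
```

The guard works here because the field is a concrete pointer type; storing the pool behind an interface would require a different nil check.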
T3.2 — IDatamartRepository interface + SQL implementation [Jira: —]
- Purpose: Encapsulate all datamart query execution behind an interface so the service layer stays free of SQL.
- Scope: internal/app/repository/segment_datamart/, with the interface, pgx/v5 implementation, mockery mock, and pgxmock unit tests (not go-sqlmock, which would introduce a parallel SQL driver).
- Expected Outcome: All methods tested; go test ./internal/app/repository/segment_datamart/... reaches ≥ 80% coverage.
- Implementation Plan:
  - Create internal/app/repository/segment_datamart/datamart_repository.go:

        type SampleContact struct {
            ContactID    string     `json:"contact_id"`
            Name         string     `json:"name"`
            Source       string     `json:"source"`
            LastActivity *time.Time `json:"last_activity,omitempty"` // from MongoDB activity_logs; null when no activity
            AddedBy      string     `json:"added_by"`                // display name resolved from created_by_user_id
            AddedAt      *time.Time `json:"added_at,omitempty"`      // created_at_date from datamart
        }

        type IDatamartRepository interface {
            // whereSQL is a parameterised WHERE fragment from the rule engine (no leading WHERE keyword).
            // args[0] is always company_sso_id; rule conditions start at args[1].
            //
            // Count returns total rows matching whereSQL and the company-wide total (totalBase).
            // Used for total_matched and total_customer_base; never applies OWNED_ONLY so the count is always company-wide.
            Count(ctx context.Context, companySsoID, whereSQL string, args []any) (total int64, totalBase int64, err error)

            // Sample fetches one page of matching rows using offset pagination. The whereSQL passed here
            // may carry an extra OWNED_ONLY condition (AND created_by_user_id = $N) appended by the service layer.
            // offset = (page-1) * perPage; LIMIT perPage OFFSET offset applied in the query.
            Sample(ctx context.Context, companySsoID, whereSQL string, args []any, page, perPage int) ([]SampleContact, error)

            FetchContactIDs(ctx context.Context, companySsoID, whereSQL string, args []any) ([]string, error)
            CountWithPhone(ctx context.Context, companySsoID string, contactIDs []string) (int64, error)
            CountWithEmail(ctx context.Context, companySsoID string, contactIDs []string) (int64, error)
        }

  - Create internal/app/repository/segment_datamart/datamart_repository_pgx.go. Implement datamartRepository wrapping *pgxpool.Pool. Constructor: NewDatamartRepository(pool *pgxpool.Pool) IDatamartRepository. Every method: if pool == nil { return ..., ErrDatamartUnavailable }.

    Timeout note: The repository methods accept a ctx context.Context that already carries a deadline set by the caller (service layer). Do not set a second deadline inside the repository; the caller is responsible for context.WithTimeout using the configured timeout value. This keeps timeout policy in one place (the service layer).
  - Count: two queries using pgx/v5 pool.QueryRow:

        -- total matched (company-wide, no OWNED_ONLY filter)
        SELECT COUNT(*) FROM customer_datamart WHERE company_sso_id=$1 AND ({whereSQL})

        -- total customer base (denominator for percentage_reach display)
        SELECT COUNT(*) FROM customer_datamart WHERE company_sso_id=$1

  - Sample: one query using offset pagination; the whereSQL argument may already include an OWNED_ONLY clause appended by the service:

        SELECT customer_id,
               default_fields->>'name',
               default_fields->>'source',
               created_by_user_id,
               created_at_date
        FROM customer_datamart
        WHERE company_sso_id=$1 AND ({whereSQL})
        ORDER BY customer_id
        LIMIT $N OFFSET $M

    LIMIT = perPage, OFFSET = (page-1) * perPage. After fetching raw rows, resolve added_by display names via the existing user lookup helper and enrich last_activity from MongoDB activity_logs. Both are best-effort: a failure yields an empty string or nil rather than failing the whole call.
  - FetchContactIDs:

        SELECT customer_id FROM customer_datamart WHERE company_sso_id=$1 AND ({whereSQL})

  - CountWithPhone:

        SELECT COUNT(*) FROM customer_datamart
        WHERE company_sso_id=$1 AND customer_id = ANY($2)
          AND default_fields->'chat_data' IS NOT NULL
          AND default_fields->>'phone' IS NOT NULL

    Before returning the count, check whether any member has a non-null chat_data sub-object in default_fields:

        SELECT EXISTS (
          SELECT 1 FROM customer_datamart
          WHERE company_sso_id=$1 AND customer_id = ANY($2)
            AND default_fields->'chat_data' IS NOT NULL
        )

    If no member has a chat_data sub-object populated (CDC backfill not yet complete), return 0, ErrDatamartUnavailable. The caller must display null reachability rather than present 0 as authoritative.
  - CountWithEmail:

        SELECT COUNT(*) FROM customer_datamart
        WHERE company_sso_id=$1 AND customer_id = ANY($2)
          AND default_fields->>'email' IS NOT NULL

  - Run make mocks. Write datamart_repository_test.go using pgxmock:
    - Count: success (returns total + totalBase), empty result (both 0), nil pool → ErrDatamartUnavailable, query error.
    - Sample: success page 1 (verify added_at and added_by populated from created_at_date / created_by_user_id, correct LIMIT/OFFSET applied), page 2 (OFFSET advances correctly), last_activity null when no activity log found, OWNED_ONLY clause in whereSQL (extra arg appended), empty result, nil pool → ErrDatamartUnavailable, general query error.
    - chat_data sub-object absent for all members → CountWithPhone returns ErrDatamartUnavailable.
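The LIMIT/OFFSET arithmetic for Sample is easy to get off by one, so a small sketch may help. It assumes the convention stated in the interface comment (args[0] is company_sso_id, rule args follow) and uses a hypothetical helper named buildSampleQuery with a shortened column list; the real method would select the full column set and scan rows through pgx.

```go
package main

import "fmt"

// buildSampleQuery appends LIMIT/OFFSET placeholders after the existing args.
// args[0] is company_sso_id and rule arguments follow, so the two new
// placeholders are numbered len(args)+1 and len(args)+2.
func buildSampleQuery(whereSQL string, args []any, page, perPage int) (string, []any) {
	if page < 1 {
		page = 1 // treat out-of-range pages as the first page in this sketch
	}
	offset := (page - 1) * perPage
	n := len(args)
	sql := fmt.Sprintf(
		"SELECT customer_id FROM customer_datamart WHERE company_sso_id=$1 AND (%s) ORDER BY customer_id LIMIT $%d OFFSET $%d",
		whereSQL, n+1, n+2,
	)
	// Copy before appending so the caller's slice is never mutated.
	out := append(append([]any{}, args...), perPage, offset)
	return sql, out
}

func main() {
	sql, args := buildSampleQuery(
		"default_fields->>'source' = $2",
		[]any{"company_abc", "WhatsApp"},
		3, 20,
	)
	fmt.Println(sql)
	fmt.Println(args)
}
```

Ordering by customer_id keeps pages stable between requests; without a deterministic ORDER BY, offset pagination can repeat or skip rows.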
T3.3 — PreviewSegment service method [Jira: —]
- Purpose: Give frontend users real-time feedback on how many contacts a rule set would match before saving. The rule is provided directly in the request — no existing segment is required.
- Scope: internal/app/service/segment/: PreviewSegment method; rule validation + rule engine + datamart delegation.
- Expected Outcome: Unit tests cover valid rule, engine error, datamart error, empty result; coverage remains ≥ 80%.
- Implementation Plan:
  - Add to ISegmentService:
        PreviewSegment(ctx context.Context, ruleSet repository.RuleSet, companySsoID, userSsoID string, ownedOnly bool) (*PreviewResult, error)
  - Inject IRuleEngine and IDatamartRepository into segmentService (extend the constructor; both may stay nil, handled nil-safely, until the wiring in T3.1). ISegmentRepository is not needed here because preview is stateless.
  - Define in internal/app/payload/segment_payload.go:
        type PreviewResult struct {
            TotalMatched      int64           `json:"total_matched"`
            TotalCustomerBase int64           `json:"total_customer_base"`
            IsTruncated       bool            `json:"is_truncated"` // true when total_matched > SEGMENT_PREVIEW_MAX_ROWS
            Sample            []SampleContact `json:"sample"`       // capped at SEGMENT_PREVIEW_MAX_ROWS
        }
  - Implement PreviewSegment:
    - Validate rule structure: len(ruleSet.Groups) > 2 or any len(group.Conditions) > 3 → ErrInvalidRule.
    - ruleEngine.BuildQuery(ruleSet, companySsoID) → baseSQL, baseArgs; propagate ErrInvalidRule.
    - Wrap the datamart calls with a timeout from config:
          dmCtx, dmCancel := context.WithTimeout(ctx, time.Duration(cfg.DatamartQueryTimeoutSeconds)*time.Second)
          defer dmCancel()
      Use dmCtx for both the Count and Sample calls below.
    - datamartRepo.Count(dmCtx, companySsoID, baseSQL, baseArgs) → total, totalBase; propagate ErrDatamartUnavailable and context.DeadlineExceeded (surface as ErrDatamartUnavailable → HTTP 503). The count is always company-wide regardless of ownedOnly.
    - Build sample query: if ownedOnly, append AND created_by_user_id = $N to baseSQL (N being the next free placeholder index) and append userSsoID to baseArgs → sampleSQL, sampleArgs; otherwise sampleSQL, sampleArgs = baseSQL, baseArgs.
    - datamartRepo.Sample(dmCtx, companySsoID, sampleSQL, sampleArgs, 1, cfg.SegmentPreviewMaxRows) → sample; propagate ErrDatamartUnavailable.
    - isTruncated = total > int64(cfg.SegmentPreviewMaxRows).
    - Return &PreviewResult{TotalMatched: total, TotalCustomerBase: totalBase, IsTruncated: isTruncated, Sample: sample}.
  - Unit tests (mock engine + mock datamart):
    - Valid rule, no OWNED_ONLY → Count called with baseArgs, Sample called with page=1, perPage=SEGMENT_PREVIEW_MAX_ROWS and the same baseArgs.
    - Valid rule, OWNED_ONLY → Count called with baseArgs (no userSsoID), Sample called with baseArgs + userSsoID appended.
    - total > SEGMENT_PREVIEW_MAX_ROWS → IsTruncated = true, TotalMatched reflects the real count.
    - Engine returns ErrInvalidRule → propagated (also covers rule struct validation failures).
    - Count returns ErrDatamartUnavailable → propagated.
    - Sample returns ErrDatamartUnavailable → propagated.
    - Zero total → empty Sample, IsTruncated = false.
    - Verify sample contacts contain last_activity, added_by, added_at.
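The structural validation and the OWNED_ONLY sample-query step in T3.3 can be sketched as below. This is a minimal illustration, not the service code: validateRuleSet and buildSampleQuery are hypothetical helper names, the rule types are trimmed copies, and it assumes baseSQL ends in a WHERE clause using positional placeholders $1..$len(baseArgs). The args slice is copied before appending so the slice passed to Count is never touched.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalidRule mirrors the sentinel error named in the plan.
var ErrInvalidRule = errors.New("invalid rule")

// Trimmed copies of the rule types; only the fields needed for validation.
type RuleCondition struct{ Field, Operator, Value string }

type RuleGroup struct {
	Operator   string
	Conditions []RuleCondition
}

type RuleSet struct {
	Operator string
	Groups   []RuleGroup
}

const (
	maxGroups             = 2 // limit stated in T3.3
	maxConditionsPerGroup = 3 // limit stated in T3.3
)

// validateRuleSet (hypothetical helper) enforces the structural limits.
func validateRuleSet(rs RuleSet) error {
	if len(rs.Groups) > maxGroups {
		return fmt.Errorf("%w: %d groups, max %d", ErrInvalidRule, len(rs.Groups), maxGroups)
	}
	for i, g := range rs.Groups {
		if len(g.Conditions) > maxConditionsPerGroup {
			return fmt.Errorf("%w: group %d has %d conditions, max %d",
				ErrInvalidRule, i, len(g.Conditions), maxConditionsPerGroup)
		}
	}
	return nil
}

// buildSampleQuery (hypothetical helper) appends the ownership filter when
// ownedOnly is set. Assumes placeholders are numbered $1..$len(baseArgs).
func buildSampleQuery(baseSQL string, baseArgs []any, userSsoID string, ownedOnly bool) (string, []any) {
	if !ownedOnly {
		return baseSQL, baseArgs
	}
	// Copy first so the caller's baseArgs backing array is never modified.
	args := make([]any, len(baseArgs), len(baseArgs)+1)
	copy(args, baseArgs)
	args = append(args, userSsoID)
	return fmt.Sprintf("%s AND created_by_user_id = $%d", baseSQL, len(args)), args
}
```

Because validateRuleSet wraps the sentinel with %w, callers and tests can still match it with errors.Is(err, ErrInvalidRule).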
T3.4 — POST /iag/v1/segments/preview handler [Jira: —]
- Purpose: Expose preview over HTTP with a per-company rate limit (10 req/min) to protect the datamart. The rule is provided in the request body — no saved segment ID is involved.
- Scope: internal/app/handler/segment_handler.go: Preview handler; rate limiter; route wired.
- Expected Outcome: Unit tests cover 429, 503, 422 (invalid rule), 200 (is_truncated: true), 200 happy-path (with and without OWNED_ONLY).
- Implementation Plan:
  - Define in internal/app/payload/segment_payload.go:
        type PreviewSegmentRequest struct {
            RuleSet repository.RuleSet `json:"rule_set" validate:"required"`
        }
  - Add Preview(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) to SegmentHandler:
    - Bind PreviewSegmentRequest from the request body; return 400 if missing or malformed.
    - Extract companySsoID, userSsoID, and ownedOnly (whether view_customer == OWNED_ONLY) from the auth context.
    - Check the Redis rate limiter (existing rate_limiter pattern): key preview_rate:{companySsoID}, window 60 s, limit from cfg.SegmentPreviewRateLimit (default 10). On breach, return myhttp.ErrTooManyRequests("Preview limit of 10 requests per minute exceeded") → HTTP 429.
    - Call svc.PreviewSegment(ctx, req.RuleSet, companySsoID, userSsoID, ownedOnly); map errors:
      - ErrInvalidRule → myhttp.ErrUnprocessableEntity() → HTTP 422
      - ErrDatamartUnavailable → myhttp.NewErrorResponse(503, "Segment preview is temporarily unavailable", "Segment preview is temporarily unavailable") → HTTP 503
    - Build response (no pagination wrapper):
          {
            "data": [
              {
                "total_matched": 2500,
                "total_customer_base": 11700,
                "is_truncated": false,
                "sample": [{ "contact_id", "name", "source", "last_activity", "added_by", "added_at" }]
              }
            ]
          }
  - In rest_router.go inside the /iag/v1/segments block (stubbed in T1.6), attach:
        r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Post("/preview", segmentHandler.Preview)
  - Inject cfg (for SegmentPreviewRateLimit) and the rate-limiter service into the SegmentHandler constructor.
  - Unit tests: 400 on missing/malformed body, 422 on ErrInvalidRule from rule engine, 429 on rate-limit exceeded, 503 on ErrDatamartUnavailable, is_truncated: true when total_matched > SEGMENT_PREVIEW_MAX_ROWS, OWNED_ONLY (verify service called with ownedOnly=true), 200 happy-path verifying all sample fields present, empty rule match (total=0, sample=[]).
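The per-company limit in T3.4 behaves like a fixed-window counter. The sketch below is an in-memory stand-in that may be useful as a handler test double; it is not the existing rate_limiter package, whose internals are not shown in this document. fixedWindowLimiter and previewRateKey are hypothetical names, and time is passed in as Unix seconds so tests need no real clock. A Redis-backed version would typically do the same thing with an increment plus a TTL set on the first hit in a window.

```go
package main

import "sync"

// window tracks the request count for one key until expiresAt (Unix seconds).
type window struct {
	count     int
	expiresAt int64
}

// fixedWindowLimiter is a hypothetical in-memory fixed-window limiter.
type fixedWindowLimiter struct {
	mu        sync.Mutex
	limit     int
	windowSec int64
	windows   map[string]*window
}

func newFixedWindowLimiter(limit int, windowSec int64) *fixedWindowLimiter {
	return &fixedWindowLimiter{limit: limit, windowSec: windowSec, windows: map[string]*window{}}
}

// Allow reports whether a request for key may proceed at time nowUnix.
// A new window starts on the first request after the previous one expired.
func (l *fixedWindowLimiter) Allow(key string, nowUnix int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	w, ok := l.windows[key]
	if !ok || nowUnix >= w.expiresAt {
		w = &window{expiresAt: nowUnix + l.windowSec}
		l.windows[key] = w
	}
	if w.count >= l.limit {
		return false // caller maps this to HTTP 429
	}
	w.count++
	return true
}

// previewRateKey builds the key format given in the plan.
func previewRateKey(companySsoID string) string {
	return "preview_rate:" + companySsoID
}
```

With limit 10 and a 60 s window, the eleventh call inside the window is rejected and the counter resets once the window expires; keys for different companies are independent.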
Phase 4: Daily Recalculation Cron
T4.1 — CustomerSegmentMember struct + customer_segment_members migration [Jira: —]
- Purpose: Provision the membership store that tracks which contacts belong to which segment after each recalculation run.
- Scope: internal/app/repository/segment_member/: struct with tags; db/migrations/: collection + indexes.
- Expected Outcome: Migration applies; indexes verified; struct round-trip tests pass.
- Implementation Plan:
  - Create internal/app/repository/segment_member/types.go:
        type CustomerSegmentMember struct {
            ID           primitive.ObjectID `bson:"_id,omitempty" json:"id"`
            SegmentID    string             `bson:"segment_id" json:"segment_id"`
            CompanySsoID string             `bson:"company_sso_id" json:"company_sso_id"`
            ContactID    string             `bson:"contact_id" json:"contact_id"`
            AddedAt      time.Time          `bson:"added_at" json:"added_at"`
            RecalcRunID  string             `bson:"recalc_run_id" json:"recalc_run_id"`
        }
  - Create db/migrations/{seq}_create_customer_segment_members.up.json:
        [
          { "create": "customer_segment_members" },
          {
            "createIndexes": "customer_segment_members",
            "indexes": [
              { "key": { "segment_id": 1, "company_sso_id": 1 }, "name": "idx_csm_segment_company" },
              { "key": { "contact_id": 1, "company_sso_id": 1 }, "name": "idx_csm_contact_company" },
              { "key": { "segment_id": 1, "contact_id": 1 }, "name": "idx_csm_segment_contact", "unique": true }
            ]
          }
        ]
  - Create .down.json: [{"drop": "customer_segment_members"}].
  - Write types_test.go: a round-trip test asserting that BSON field names are snake_case and that recalc_run_id is preserved.
T4.2 — ISegmentMemberRepository interface + MongoDB implementation [Jira: —]
- Purpose: Provide atomic member-list replacement and membership lookup operations for the cron and service layers.
- Scope: internal/app/repository/segment_member/: interface, MongoDB implementation using bulk write, mock, unit tests.
- Expected Outcome: ReplaceMembers atomicity tested; InsertEvents bulk tested; ≥ 80% coverage.
- Implementation Plan:
  - Create internal/app/repository/segment_member/segment_member_repository.go:
        type ISegmentMemberRepository interface {
            // sessCtx is a MongoDB session context (it embeds context.Context, so no separate ctx is passed);
            // the caller (service) owns the transaction boundary.
            ReplaceMembers(sessCtx mongo.SessionContext, segmentID, companyID, recalcRunID string, contactIDs []string) error
            InsertEvents(sessCtx mongo.SessionContext, events []SegmentMembershipEvent) error
            FindBySegmentID(ctx context.Context, segmentID, companyID string, page, pageSize int) ([]CustomerSegmentMember, int64, error)
            FindByContactID(ctx context.Context, contactID, companyID string) ([]CustomerSegmentMember, error)
            CountBySegmentID(ctx context.Context, segmentID, companyID string) (int64, error)
            // CountWithPhone and CountWithEmail delegate to the injected IDatamartRepository.
            CountWithPhone(ctx context.Context, segmentID, companyID string) (int64, error)
            CountWithEmail(ctx context.Context, segmentID, companyID string) (int64, error)
        }
  - Create segment_member_repository_mongo.go. Implement segmentMemberRepository wrapping IDbRepo and an injected IDatamartRepository.
    - ReplaceMembers executes within sessCtx: (1) DeleteMany all docs matching {segment_id, company_sso_id}, (2) InsertMany new docs, each carrying recalc_run_id and added_at = time.Now(). Skip step (2) when the new set is empty, since there is nothing to insert.
    - InsertEvents bulk-inserts via InsertMany within sessCtx.
  - Run make mocks.
  - Unit tests: ReplaceMembers with empty new set (deletes all, inserts nothing); non-empty set (deletes old, inserts new with recalc_run_id); InsertEvents bulk; all lookup/count methods, covering empty-result and error paths.
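Step (2) of ReplaceMembers needs one document per contact, and the unique idx_csm_segment_contact index from T4.1 means a duplicated contact ID in the input would make the bulk insert fail. A minimal sketch of building the insert payload with de-duplication follows. buildMemberDocs and fixedNow are hypothetical names, the struct omits the ObjectID field to stay dependency-free, and de-duplicating here (rather than trusting the datamart query) is an assumption, not something the plan specifies.

```go
package main

import "time"

// CustomerSegmentMember is a trimmed copy of the T4.1 struct (no _id field).
type CustomerSegmentMember struct {
	SegmentID    string
	CompanySsoID string
	ContactID    string
	AddedAt      time.Time
	RecalcRunID  string
}

// fixedNow is a deterministic timestamp for examples and tests.
var fixedNow = time.Unix(0, 0).UTC()

// buildMemberDocs (hypothetical helper) turns contact IDs into member
// documents, keeping the first occurrence of each ID and preserving order.
func buildMemberDocs(segmentID, companyID, recalcRunID string, contactIDs []string, now time.Time) []CustomerSegmentMember {
	seen := make(map[string]struct{}, len(contactIDs))
	docs := make([]CustomerSegmentMember, 0, len(contactIDs))
	for _, id := range contactIDs {
		if _, dup := seen[id]; dup {
			continue // would violate the unique (segment_id, contact_id) index
		}
		seen[id] = struct{}{}
		docs = append(docs, CustomerSegmentMember{
			SegmentID:    segmentID,
			CompanySsoID: companyID,
			ContactID:    id,
			AddedAt:      now,
			RecalcRunID:  recalcRunID,
		})
	}
	return docs
}
```

An empty result signals the caller to skip the insert step entirely, which matches the "deletes all, inserts nothing" test case.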
T4.3 — customer_segment_membership_events struct + migration [Jira: —]
- Purpose: Provision the append-only audit log recording every membership transition (who entered/left each segment).
- Scope: internal/app/repository/segment_member/: SegmentMembershipEvent struct; db/migrations/: collection, indexes, 90-day TTL index.
- Expected Outcome: Migration applies; TTL index confirmed; struct round-trip tests pass.
- Implementation Plan:
  - Add to internal/app/repository/segment_member/types.go:
        const (
            MembershipEventAdded   = "added"
            MembershipEventRemoved = "removed"
        )

        type SegmentMembershipEvent struct {
            ID           primitive.ObjectID `bson:"_id,omitempty" json:"id"`
            SegmentID    string             `bson:"segment_id" json:"segment_id"`
            CompanySsoID string             `bson:"company_sso_id" json:"company_sso_id"`
            ContactID    string             `bson:"contact_id" json:"contact_id"`
            Event        string             `bson:"event" json:"event"` // "added" | "removed"
            OccurredAt   time.Time          `bson:"occurred_at" json:"occurred_at"`
            RecalcRunID  string             `bson:"recalc_run_id" json:"recalc_run_id"`
        }
  - Create db/migrations/{seq}_create_customer_segment_membership_events.up.json:
        [
          { "create": "customer_segment_membership_events" },
          {
            "createIndexes": "customer_segment_membership_events",
            "indexes": [
              { "key": { "segment_id": 1, "company_sso_id": 1, "occurred_at": -1 }, "name": "idx_csme_segment_company_date" },
              { "key": { "contact_id": 1, "company_sso_id": 1, "occurred_at": -1 }, "name": "idx_csme_contact_company_date" },
              { "key": { "occurred_at": 1 }, "name": "ttl_90d", "expireAfterSeconds": 7776000 }
            ]
          }
        ]
    (7776000 = 90 × 24 × 3600 seconds)
  - Create .down.json: [{"drop": "customer_segment_membership_events"}].
  - Run migration; verify the TTL index via db.customer_segment_membership_events.getIndexes().
  - Round-trip test: both "added" and "removed" marshal correctly; BSON field names are snake_case.
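The round-trip test for SegmentMembershipEvent can follow the shape below. To keep the sketch runnable with the standard library only, it exercises the json tags and drops the ObjectID field; the BSON variant would be analogous using the driver's marshal and unmarshal functions. roundTrip and sampleEvent are hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"time"
)

const (
	MembershipEventAdded   = "added"
	MembershipEventRemoved = "removed"
)

// SegmentMembershipEvent is a trimmed copy of the T4.3 struct (json tags only).
type SegmentMembershipEvent struct {
	SegmentID    string    `json:"segment_id"`
	CompanySsoID string    `json:"company_sso_id"`
	ContactID    string    `json:"contact_id"`
	Event        string    `json:"event"`
	OccurredAt   time.Time `json:"occurred_at"`
	RecalcRunID  string    `json:"recalc_run_id"`
}

// sampleEvent returns a deterministic fixture for the given event kind.
func sampleEvent(kind string) SegmentMembershipEvent {
	return SegmentMembershipEvent{
		SegmentID:    "seg-1",
		CompanySsoID: "co-1",
		ContactID:    "contact-1",
		Event:        kind,
		OccurredAt:   time.Date(2025, time.January, 2, 3, 4, 5, 0, time.UTC),
		RecalcRunID:  "run-1",
	}
}

// roundTrip marshals ev, then decodes it twice: once into a generic map
// (to inspect the wire field names) and once back into the struct.
func roundTrip(ev SegmentMembershipEvent) (map[string]any, SegmentMembershipEvent, error) {
	var back SegmentMembershipEvent
	raw, err := json.Marshal(ev)
	if err != nil {
		return nil, back, err
	}
	var keys map[string]any
	if err := json.Unmarshal(raw, &keys); err != nil {
		return nil, back, err
	}
	if err := json.Unmarshal(raw, &back); err != nil {
		return nil, back, err
	}
	return keys, back, nil
}
```

Timestamps are compared with Equal rather than ==, since two time.Time values for the same instant are not guaranteed to be identical structs.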
T4.4 — RecalculateAllSegments service method with diff + audit [Jira: —]
- Purpose: Implement the daily recalculation: evaluate every active segment against the datamart, update member lists, write audit trail of who entered/left each segment.
- Scope: internal/app/service/segment/: RecalculateAllSegments with Redis distributed lock, heartbeat goroutine, worker pool, diff computation, MongoDB session transaction, poison-segment skip.
- Expected Outcome: All unit test cases listed pass; ≥ 80% coverage.
- Implementation Plan:
  - Add RecalculateAllSegments(ctx context.Context) error to ISegmentService. Extend the segmentService constructor to accept ISegmentMemberRepository, IDatamartRepository, IRuleEngine, ICacheRepo, ISegmentEventPublisher (defined in T4.5), and config.
  - Implement method body:
        recalcRunID := ulid.Make().String() // github.com/oklog/ulid/v2; generated once per invocation

        // Acquire Redis distributed lock (key: "segment_recalculation_lock").
        // TTL from cfg.SegmentRecalcLockTTLSeconds (default 7200 s).
        acquired := cacheRepo.SetNX("segment_recalculation_lock", recalcRunID, cfg.SegmentRecalcLockTTLSeconds)
        if !acquired {
            slog.InfoContext(ctx, "recalc skipped: lock held")
            return nil
        }
        defer cacheRepo.Del("segment_recalculation_lock")

        // Heartbeat: every cfg.SegmentRecalcLockRenewIntervalSeconds (default 600 s),
        // reset the lock TTL to 30 min from now.
        hbCtx, hbCancel := context.WithCancel(ctx)
        defer hbCancel()
        go func() {
            ticker := time.NewTicker(time.Duration(cfg.SegmentRecalcLockRenewIntervalSeconds) * time.Second)
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C:
                    cacheRepo.Expire("segment_recalculation_lock", 1800) // TTL becomes 1800 s (30 min) from now
                case <-hbCtx.Done():
                    return
                }
            }
        }()

        // Page through active segments in batches of cfg.SegmentRecalcBatchSize (default 100).
        // Worker pool: sem := make(chan struct{}, cfg.SegmentRecalcWorkers) (default 4)
        // For each segment: sem <- struct{}{}; wg.Add(1);
        //   go func(seg CustomerSegment) { defer wg.Done(); defer func() { <-sem }(); processSegment(...) }(seg)
        // wg.Wait() before returning
  - Implement processSegment(ctx context.Context, recalcRunID string, seg CustomerSegment):
    - Poison skip: if seg.RecalcErrorCount >= 3 { slog.WarnContext(...); return }.
    - sql, args, err := ruleEngine.BuildQuery(seg.RuleSet, seg.CompanySsoID); on error go to error handler.
    - Wrap the datamart call with a timeout from config:
          dmCtx, dmCancel := context.WithTimeout(ctx, time.Duration(cfg.DatamartRecalcQueryTimeoutSeconds)*time.Second)
          defer dmCancel()
          newIDs, err := datamartRepo.FetchContactIDs(dmCtx, seg.CompanySsoID, sql, args)
      On error (including context.DeadlineExceeded) go to error handler.
    - oldMembers, _, err := memberRepo.FindBySegmentID(ctx, seg.ID.Hex(), seg.CompanySsoID, 0, 0); on error go to error handler, otherwise extract oldIDs. (seg.ID is a primitive.ObjectID, while the member repository takes string IDs.)
    - Compute set difference: added = newIDs − oldIDs, removed = oldIDs − newIDs.
    - Open a MongoDB session transaction with a timeout:
          txCtx, txCancel := context.WithTimeout(ctx, time.Duration(cfg.MongoOperationTimeoutSeconds)*time.Second)
          defer txCancel()
          session, err := mongoClient.StartSession()
          if err != nil {
              // go to error handler
          }
          defer session.EndSession(ctx)
          _, err = session.WithTransaction(txCtx, func(sessCtx mongo.SessionContext) (interface{}, error) {
              if err := memberRepo.ReplaceMembers(sessCtx, seg.ID.Hex(), seg.CompanySsoID, recalcRunID, newIDs); err != nil {
                  return nil, err
              }
              if len(added) > 0 || len(removed) > 0 {
                  events := buildMembershipEvents(recalcRunID, seg, added, removed) // creates []SegmentMembershipEvent
                  return nil, memberRepo.InsertEvents(sessCtx, events)
              }
              return nil, nil
          })
      On a transaction error go to error handler.
    - On transaction success: segmentRepo.UpdateStats(ctx, seg.ID, int64(len(newIDs)), time.Now()) + segmentRepo.ClearRuleDirtyAt(ctx, seg.ID).
    - After transaction success, if len(added) > 0 || len(removed) > 0: build one SegmentRecalculatedEvent{TotalAdded: int64(len(added)), TotalRemoved: int64(len(removed)), TotalMatched: int64(len(newIDs))}; call eventPublisher.Publish(ctx, event) (the implementation handles per-attempt timeout + retries internally, see T4.5). If publish returns an error after all retries: slog.ErrorContext(ctx, "segment event publish failed", ...) + increment the segment.event.publish_error{segment_id} metric; do not roll back or fail the recalc.
    - In RecalculateAllSegments, also wrap the Redis lock acquire with a timeout:
          redisCtx, redisCancel := context.WithTimeout(ctx, time.Duration(cfg.RedisOperationTimeoutSeconds)*time.Second)
          defer redisCancel()
          acquired := cacheRepo.SetNX(redisCtx, "segment_recalculation_lock", recalcRunID, cfg.SegmentRecalcLockTTLSeconds)
      And each heartbeat extend:
          extCtx, extCancel := context.WithTimeout(context.Background(), time.Duration(cfg.RedisOperationTimeoutSeconds)*time.Second)
          cacheRepo.Expire(extCtx, "segment_recalculation_lock", 1800)
          extCancel()
    - On any error: segmentRepo.IncrementRecalcErrorCount(ctx, seg.ID) + slog.ErrorContext(ctx, "segment recalc failed", slog.String("segment_id", seg.ID.Hex()), slog.Any("error", err)).
  - Unit tests (mock all deps):
    - Lock held → no segments processed, returns nil.
    - Empty diff → InsertEvents not called, UpdateStats called, Publish not called.
    - Non-empty diff → Publish called once with EventType = "segment.recalculated", TotalAdded = len(added), TotalRemoved = len(removed), TotalMatched = len(newIDs).
    - InsertEvents error → transaction rolled back, ReplaceMembers not persisted, Publish not called.
    - Publish returns error → recalc not rolled back, UpdateStats already called, error logged + metric incremented.
    - RecalcErrorCount == 3 → segment skipped.
    - Successful run → UpdateStats + ClearRuleDirtyAt called.
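The diff step and the audit-event construction in processSegment can be sketched as follows. This is an illustration under stated assumptions: diffMembers is a hypothetical name, both ID lists are assumed to contain no duplicates, and buildMembershipEvents here takes explicit string and time parameters rather than the seg value used in the plan, so that it stays self-contained. The event struct omits the ObjectID field.

```go
package main

import "time"

// SegmentMembershipEvent is a trimmed copy of the T4.3 struct.
type SegmentMembershipEvent struct {
	SegmentID    string
	CompanySsoID string
	ContactID    string
	Event        string // "added" | "removed"
	OccurredAt   time.Time
	RecalcRunID  string
}

// fixedNow is a deterministic timestamp for examples and tests.
var fixedNow = time.Unix(0, 0).UTC()

// diffMembers returns added = newIDs − oldIDs and removed = oldIDs − newIDs,
// each in the order the IDs appear in their source slice.
func diffMembers(oldIDs, newIDs []string) (added, removed []string) {
	oldSet := make(map[string]struct{}, len(oldIDs))
	for _, id := range oldIDs {
		oldSet[id] = struct{}{}
	}
	newSet := make(map[string]struct{}, len(newIDs))
	for _, id := range newIDs {
		newSet[id] = struct{}{}
		if _, existed := oldSet[id]; !existed {
			added = append(added, id)
		}
	}
	for _, id := range oldIDs {
		if _, kept := newSet[id]; !kept {
			removed = append(removed, id)
		}
	}
	return added, removed
}

// buildMembershipEvents emits one audit event per transition: all "added"
// events first, then all "removed" events.
func buildMembershipEvents(recalcRunID, segmentID, companyID string, added, removed []string, now time.Time) []SegmentMembershipEvent {
	events := make([]SegmentMembershipEvent, 0, len(added)+len(removed))
	appendEvents := func(ids []string, kind string) {
		for _, id := range ids {
			events = append(events, SegmentMembershipEvent{
				SegmentID:    segmentID,
				CompanySsoID: companyID,
				ContactID:    id,
				Event:        kind,
				OccurredAt:   now,
				RecalcRunID:  recalcRunID,
			})
		}
	}
	appendEvents(added, "added")
	appendEvents(removed, "removed")
	return events
}
```

Both passes are linear in the number of IDs, which matters because a segment's member list may be large; an unchanged member list yields two nil slices, which is the "empty diff" case in the unit tests.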
T4.5 — ISegmentEventPublisher interface + Kafka implementation [Jira: —]
- Purpose: Decouple the recalculation service from Kafka so the publisher can be stubbed in tests and swapped independently. Each event notifies subscribers of the aggregate outcome of a segment's recalculation — total members added and removed in that run. Provides the Go struct and interface referenced in T4.4.
- Scope: internal/app/service/segment/event_publisher.go: struct + interface; internal/pkg/segment_kafka/publisher.go: Kafka implementation using the existing Confluent producer pattern (driver/kafka.go).
- Expected Outcome: Publisher tested with a mock Kafka producer; ISegmentEventPublisher importable from T4.4; make mocks generates MockISegmentEventPublisher; ≥ 80% coverage.
- Implementation Plan:
  - Create internal/app/service/segment/event_publisher.go:
        type SegmentRecalculatedEvent struct {
            EventType    string    `json:"event_type"` // always "segment.recalculated"
            SegmentID    string    `json:"segment_id"`
            SegmentName  string    `json:"segment_name"`
            CompanySsoID string    `json:"company_sso_id"`
            RecalcRunID  string    `json:"recalc_run_id"` // ULID; idempotency key for consumers
            TotalMatched int64     `json:"total_matched"`
            TotalAdded   int64     `json:"total_added"`
            TotalRemoved int64     `json:"total_removed"`
            OccurredAt   time.Time `json:"occurred_at"`
        }

        // ISegmentEventPublisher sends one Kafka message per segment per recalculation run.
        // Publish emits an aggregate event with total counts of added and removed members.
        type ISegmentEventPublisher interface {
            Publish(ctx context.Context, event SegmentRecalculatedEvent) error
        }
  - Create internal/pkg/segment_kafka/publisher.go. The codebase uses Confluent kafka-go (github.com/confluentinc/confluent-kafka-go/v2/kafka), not Sarama; follow the existing driver/kafka.go pattern. Define the kafkaSegmentPublisher struct:
        type kafkaSegmentPublisher struct {
            producer   *kafka.Producer
            topic      string
            maxRetries int
            retryDelay time.Duration
            pubTimeout time.Duration
        }

        func NewKafkaSegmentPublisher(producer *kafka.Producer, topic string, maxRetries, retryDelayMs, pubTimeoutSec int) ISegmentEventPublisher {
            return &kafkaSegmentPublisher{
                producer:   producer,
                topic:      topic,
                maxRetries: maxRetries,
                retryDelay: time.Duration(retryDelayMs) * time.Millisecond,
                pubTimeout: time.Duration(pubTimeoutSec) * time.Second,
            }
        }
  - Implement Publish with retry and per-attempt timeout:
        func (p *kafkaSegmentPublisher) Publish(ctx context.Context, event SegmentRecalculatedEvent) error {
            var lastErr error
            for attempt := 0; attempt <= p.maxRetries; attempt++ {
                if attempt > 0 {
                    // Wait between attempts, but stop early if the caller's context ends.
                    select {
                    case <-ctx.Done():
                        return ctx.Err()
                    case <-time.After(p.retryDelay):
                    }
                }
                attemptCtx, cancel := context.WithTimeout(ctx, p.pubTimeout)
                lastErr = p.publish(attemptCtx, event)
                cancel()
                if lastErr == nil {
                    return nil
                }
                slog.WarnContext(ctx, "segment kafka publish attempt failed",
                    slog.Int("attempt", attempt+1),
                    slog.Any("error", lastErr))
            }
            return lastErr
        }

        func (p *kafkaSegmentPublisher) publish(ctx context.Context, event SegmentRecalculatedEvent) error {
            payload, err := json.Marshal(event)
            if err != nil {
                return err
            }
            deliveryChan := make(chan kafka.Event, 1) // buffered so a late delivery report never blocks
            msg := &kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &p.topic, Partition: kafka.PartitionAny},
                Key:            []byte(event.SegmentID),
                Value:          payload,
            }
            if err := p.producer.Produce(msg, deliveryChan); err != nil {
                return err
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            case e := <-deliveryChan:
                m, ok := e.(*kafka.Message)
                if !ok {
                    // An unexpected event type is not a confirmed delivery; report it so Publish retries.
                    return fmt.Errorf("unexpected kafka delivery event: %v", e)
                }
                return m.TopicPartition.Error
            }
        }
  - Run make mocks to generate mocks/MockISegmentEventPublisher.go.
  - Unit tests: event with non-zero TotalAdded/TotalRemoved → correct JSON marshalled + Key = segment_id; producer error on first attempt → retried up to maxRetries, then error propagated; producer succeeds on second attempt → no error returned; ctx.Done() during retry wait → ctx.Err() returned immediately.
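The retry behaviour described for Publish can be exercised without a Kafka producer by factoring the loop over a plain function. The sketch below mirrors the loop shape from T4.5 and pairs it with small test doubles; publishWithRetry, flakyPublisher, runScenario and runCancelled are hypothetical names introduced for illustration, not part of the planned package.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// publishFunc stands in for a single publish attempt.
type publishFunc func(ctx context.Context) error

var errBroker = errors.New("broker unavailable")

// publishWithRetry makes up to maxRetries+1 attempts, waiting retryDelay
// between attempts and giving each attempt its own timeout.
func publishWithRetry(ctx context.Context, maxRetries int, retryDelay, attemptTimeout time.Duration, publish publishFunc) error {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return ctx.Err() // caller gave up while we were waiting
			case <-time.After(retryDelay):
			}
		}
		attemptCtx, cancel := context.WithTimeout(ctx, attemptTimeout)
		lastErr = publish(attemptCtx)
		cancel()
		if lastErr == nil {
			return nil
		}
	}
	return lastErr
}

// flakyPublisher fails the first `failures` calls, then succeeds.
func flakyPublisher(failures int, calls *int) publishFunc {
	return func(_ context.Context) error {
		*calls++
		if *calls <= failures {
			return errBroker
		}
		return nil
	}
}

// runScenario returns how many attempts were made and the final error.
func runScenario(failures, maxRetries int) (int, error) {
	calls := 0
	err := publishWithRetry(context.Background(), maxRetries, time.Millisecond, time.Second, flakyPublisher(failures, &calls))
	return calls, err
}

// runCancelled cancels the parent context during the first attempt, so the
// retry wait must return ctx.Err() instead of sleeping for retryDelay.
func runCancelled() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	failAndCancel := func(_ context.Context) error {
		calls++
		cancel()
		return errBroker
	}
	err := publishWithRetry(ctx, 3, time.Hour, time.Second, failAndCancel)
	return calls, err
}
```

runScenario(1, 3) covers "succeeds on second attempt", runScenario(5, 2) covers "retried up to maxRetries, then error propagated", and runCancelled covers the ctx.Done() case from the unit-test list.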
T4.6 — SegmentRecalculationCron job + registration [Jira: —]
- Purpose: Schedule RecalculateAllSegments to run at 08:00 WIB (01:00 UTC) daily using the existing gocraft/work cron infrastructure.
- Scope: internal/app/cron/segment_recalculation.go; cron registration in internal/worker/; dependency wiring in cmd/initializer.go.
- Expected Outcome: go build ./... passes; cron appears in worker-ui; manual trigger populates customer_segment_members and customer_segment_membership_events.
- Implementation Plan:
  - Create internal/app/cron/segment_recalculation.go:
        type SegmentRecalculationCron struct {
            segmentService service.ISegmentService
        }

        func NewSegmentRecalculationCron(svc service.ISegmentService) *SegmentRecalculationCron {
            return &SegmentRecalculationCron{segmentService: svc}
        }

        func (c *SegmentRecalculationCron) SegmentRecalculation(job *work.Job) error {
            return c.segmentService.RecalculateAllSegments(context.Background())
        }
  - Open the cron registration file in internal/worker/. Add to CronList:
        {"0 1 * * *", "segment_recalculation", segmentRecalcCron.SegmentRecalculation},
    (0 1 * * * = minute 0, hour 1 UTC = 08:00 WIB)
  - In cmd/initializer.go, extend the segmentService construction to include the Kafka publisher and re-wire:
        segmentEventPublisher := segmentkafka.NewKafkaSegmentPublisher(
            kafkaProducer,
            "cdp.segment.recalculated",
            cfg.SegmentKafkaPublishMaxRetries,
            cfg.SegmentKafkaPublishRetryDelayMs,
            cfg.KafkaPublishTimeoutSeconds,
        )
        segmentService := segmentservice.NewSegmentService(segmentRepo, memberRepo, datamartRepo, ruleEngine, cacheRepo, segmentEventPublisher, cfg)
        segmentRecalcCron := cron.NewSegmentRecalculationCron(segmentService)
    Pass segmentRecalcCron to the worker registration step.
  - Smoke test: ./contact-service worker; open worker-ui (./contact-service worker-ui); verify segment_recalculation in the schedule list; manually enqueue → confirm customer_segment_members populated, customer_segment_membership_events contains "added" events, and Kafka topic cdp.segment.recalculated receives one message per segment with total_added, total_removed, total_matched populated correctly.
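The UTC-to-WIB conversion behind the cron expression can be checked with a few lines of Go. The sketch uses a fixed UTC+7 zone instead of loading Asia/Jakarta, which avoids a dependency on the host's timezone database; wibClock is a hypothetical helper, and the fixed offset relies on WIB not observing daylight saving time.

```go
package main

import "time"

// wib is a fixed UTC+7 zone; no tzdata lookup is needed.
var wib = time.FixedZone("WIB", 7*60*60)

// wibClock returns the HH:MM wall-clock time in WIB for a given UTC time of day.
func wibClock(utcHour, utcMinute int) string {
	t := time.Date(2025, time.January, 1, utcHour, utcMinute, 0, 0, time.UTC)
	return t.In(wib).Format("15:04")
}
```

For the schedule above, wibClock(1, 0) yields "08:00". If the cron library in use interprets specs in the server's local zone rather than UTC, the expression would need adjusting; that behaviour is not stated in this document.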
Phase 5: Metrics, Service Integration & Delivery
T5.1 — Segment detail with reachability metrics [Jira: —]
- Purpose: Enrich GET /iag/v1/segments/:id (and the PUT /iag/v1/segments/:id response) with percentage_reach and per-channel reachability counts.
- Scope:
  - internal/app/repository/segment_datamart/: new package; errors.go only (ErrDatamartUnavailable).
  - internal/app/repository/segment_member/interface.go: new package; ISegmentMemberRepository interface (stub; implementation deferred to T4.2).
  - internal/app/repository/contact/base.go: add CountAllByCompany to ContactInterface and implement on ContactRepo.
  - internal/app/payload/segment_payload.go: add ChannelReachability, ReachabilityInfo; extend SegmentDetailResponse.
  - internal/app/service/segment/: add IContactCounter narrow interface, ChannelReachabilityResult, SegmentDetails result type, SegmentServiceOption options pattern, GetSegmentDetails method.
  - internal/app/handler/segment_handler.go: GetByID and Update call GetSegmentDetails; add private toSegmentDetailResponse(SegmentDetails) conversion.
  - cmd/initializer.go: wire WithContactCounter(contactRepo) into NewSegmentService.
- Expected Outcome: GetByID returns percentage_reach and reachability for active segments; archived segments omit reachability; whatsapp is null when the datamart is unavailable; go test ./... ≥ 80% coverage.
- Implementation Plan:

  Step 1: segment_datamart errors package

  Create internal/app/repository/segment_datamart/errors.go:
      package segmentdatamart

      import "errors"

      // ErrDatamartUnavailable is returned by segment_member methods when the datamart
      // connection is not configured or CDC backfill is incomplete.
      var ErrDatamartUnavailable = errors.New("datamart unavailable")

  Step 2: segment_member interface

  Create internal/app/repository/segment_member/interface.go:
      package segmentmember

      import "context"

      // ISegmentMemberRepository manages the membership of contacts within segments.
      // Implementation is provided in T4.2; this stub defines the contract consumed by T5.1.
      type ISegmentMemberRepository interface {
          CountBySegmentID(ctx context.Context, segmentID, companyID string) (int64, error)
          // CountWithPhone returns the number of members with chat_data IS NOT NULL AND phone IS NOT NULL.
          // Returns segmentdatamart.ErrDatamartUnavailable when no member has chat_data populated (CDC backfill
          // incomplete) or when the datamart pool is nil.
          CountWithPhone(ctx context.Context, segmentID, companyID string) (int64, error)
          // CountWithEmail returns the number of members with email IS NOT NULL.
          CountWithEmail(ctx context.Context, segmentID, companyID string) (int64, error)
      }
  Run make mocks after creating this file to generate internal/app/repository/segment_member/mocks/ISegmentMemberRepository.go.

  Step 3: CountAllByCompany on ContactInterface

  In internal/app/repository/contact/base.go:
  - Add to ContactInterface:
        CountAllByCompany(ctx context.Context, companySsoID string) (int64, error)
  - Implement on ContactRepo:
        func (r *ContactRepo) CountAllByCompany(ctx context.Context, companySsoID string) (int64, error) {
            return r.CountWithFilters(ctx, bson.M{
                "company_sso_id": companySsoID,
                "is_deleted":     bson.M{"$ne": true},
            })
        }
  Run make mocks to regenerate internal/app/repository/contact/mocks/ContactInterface.go.

  Step 4: Payload additions

  In internal/app/payload/segment_payload.go, add:
      type ChannelReachability struct {
          Count      int64   `json:"count"`
          Percentage float64 `json:"percentage"`
      }

      type ReachabilityInfo struct {
          WhatsApp *ChannelReachability `json:"whatsapp"` // null = datamart unavailable
          Email    *ChannelReachability `json:"email"`    // null = datamart unavailable
      }
  Extend SegmentDetailResponse:
      type SegmentDetailResponse struct {
          ID              string             `json:"id"`
          Name            string             `json:"name"`
          Description     *string            `json:"description"`
          Status          string             `json:"status"`
          RuleSet         reposegment.RuleSet `json:"rule_set"`
          TotalMatched    int64              `json:"total_matched"`
          PercentageReach *float64           `json:"percentage_reach,omitempty"`
          LastEvaluatedAt *time.Time         `json:"last_evaluated_at"`
          Reachability    *ReachabilityInfo  `json:"reachability,omitempty"`
          CreatedBy       string             `json:"created_by"`
          CreatedAt       time.Time          `json:"created_at"`
          UpdatedAt       time.Time          `json:"updated_at"`
      }
  Keep the existing ToSegmentDetailResponse(seg reposegment.CustomerSegment) mapper intact (it leaves the new fields nil); the handler uses the new toSegmentDetailResponse conversion (Step 7) for enriched responses.

  Step 5: Service interface additions

  In internal/app/service/segment/segment_service_interface.go, add:
      // IContactCounter is a narrow interface consumed by SegmentService to fetch the total
      // active contact count for a company. Satisfied by the existing ContactRepo, the
      // concrete type behind ContactInterface.
      type IContactCounter interface {
          CountAllByCompany(ctx context.Context, companySsoID string) (int64, error)
      }

      // ChannelReachabilityResult holds computed counts for a single channel.
      type ChannelReachabilityResult struct {
          Count      int64
          Percentage float64
      }

      // SegmentDetails is the service-level result returned by GetSegmentDetails.
      // The handler converts this into payload.SegmentDetailResponse.
      type SegmentDetails struct {
          Segment        reposegment.CustomerSegment
          TotalCustomers int64
          WhatsApp       *ChannelReachabilityResult // nil = datamart unavailable
          Email          *ChannelReachabilityResult // nil = datamart unavailable
      }
  Add to ISegmentService:
      // GetSegmentDetails returns the full segment view including reachability metrics.
      // Replaces GetSegment in the GetByID and Update handlers.
      GetSegmentDetails(ctx context.Context, companySsoID, segmentID, userSsoID string, ownedOnly bool) (SegmentDetails, error)
  Run make mocks to regenerate ISegmentService.go.

  Step 6: Service implementation

  In internal/app/service/segment/segment_service.go:

  a. Extend the segmentService struct:
      type segmentService struct {
          repo           reposegment.ISegmentRepository
          memberRepo     segmentmember.ISegmentMemberRepository // nil until T4.2
          contactCounter IContactCounter                        // nil until wired
      }

  b. Add options pattern (keeps NewSegmentService(repo) backward-compatible):
      type SegmentServiceOption func(*segmentService)

      func WithMemberRepo(r segmentmember.ISegmentMemberRepository) SegmentServiceOption {
          return func(s *segmentService) { s.memberRepo = r }
      }

      func WithContactCounter(c IContactCounter) SegmentServiceOption {
          return func(s *segmentService) { s.contactCounter = c }
      }

      func NewSegmentService(repo reposegment.ISegmentRepository, opts ...SegmentServiceOption) ISegmentService {
          svc := &segmentService{repo: repo}
          for _, o := range opts {
              o(svc)
          }
          return svc
      }

  c. Implement GetSegmentDetails:
      func (s *segmentService) GetSegmentDetails(ctx context.Context, companySsoID, segmentID, userSsoID string, ownedOnly bool) (SegmentDetails, error) {
          seg, err := s.GetSegment(ctx, companySsoID, segmentID, userSsoID, ownedOnly)
          if err != nil {
              return SegmentDetails{}, err
          }
          result := SegmentDetails{Segment: seg}

          // A failed contact count is logged but does not fail the request.
          if s.contactCounter != nil {
              total, cErr := s.contactCounter.CountAllByCompany(ctx, companySsoID)
              if cErr != nil {
                  slog.WarnContext(ctx, "segment_service get_segment_details count_contacts", slog.Any("error", cErr))
              } else {
                  result.TotalCustomers = total
              }
          }

          // Reachability is only computed for active segments.
          if seg.Status != reposegment.SegmentStatusActive || s.memberRepo == nil {
              return result, nil
          }

          // ErrDatamartUnavailable is expected during CDC backfill and leaves the channel nil without a warning.
          wCount, wErr := s.memberRepo.CountWithPhone(ctx, segmentID, companySsoID)
          if wErr != nil && !errors.Is(wErr, segmentdatamart.ErrDatamartUnavailable) {
              slog.WarnContext(ctx, "segment_service get_segment_details count_phone", slog.Any("error", wErr))
          } else if wErr == nil {
              pct := float64(0)
              if seg.TotalMatched > 0 {
                  pct = float64(wCount) / float64(seg.TotalMatched) * 100
              }
              result.WhatsApp = &ChannelReachabilityResult{Count: wCount, Percentage: pct}
          }

          eCount, eErr := s.memberRepo.CountWithEmail(ctx, segmentID, companySsoID)
          if eErr != nil && !errors.Is(eErr, segmentdatamart.ErrDatamartUnavailable) {
              slog.WarnContext(ctx, "segment_service get_segment_details count_email", slog.Any("error", eErr))
          } else if eErr == nil {
              pct := float64(0)
              if seg.TotalMatched > 0 {
                  pct = float64(eCount) / float64(seg.TotalMatched) * 100
              }
              result.Email = &ChannelReachabilityResult{Count: eCount, Percentage: pct}
          }
          return result, nil
      }

  Step 7: Handler update

  In internal/app/handler/segment_handler.go:

  a. Add a private conversion helper:
      func toSegmentDetailResponse(d segmentsvc.SegmentDetails) payload.SegmentDetailResponse {
          resp := payload.ToSegmentDetailResponse(d.Segment)
          if d.TotalCustomers > 0 {
              pct := float64(d.Segment.TotalMatched) / float64(d.TotalCustomers) * 100
              resp.PercentageReach = &pct
          }
          if d.Segment.Status == reposegment.SegmentStatusActive {
              reach := &payload.ReachabilityInfo{}
              if d.WhatsApp != nil {
                  reach.WhatsApp = &payload.ChannelReachability{Count: d.WhatsApp.Count, Percentage: d.WhatsApp.Percentage}
              }
              if d.Email != nil {
                  reach.Email = &payload.ChannelReachability{Count: d.Email.Count, Percentage: d.Email.Percentage}
              }
              resp.Reachability = reach
          }
          return resp
      }

  b. Update GetByID, replacing the GetSegment call:
      details, err := h.segmentService.GetSegmentDetails(ctx, companySsoID, segmentID, userSsoID, ownedOnly)
      if err != nil {
          return myhttp.ResponseBody{}, h.mapServiceError(ctx, "segment_handler get_by_id", err)
      }
      return myhttp.NewJSONResponse(toSegmentDetailResponse(details), nil), nil

  c. Update Update, replacing the post-update GetSegment call:
      details, err := h.segmentService.GetSegmentDetails(ctx, companySsoID, segmentID, userSsoID, ownedOnly)
      if err != nil {
          return myhttp.ResponseBody{}, h.mapServiceError(ctx, "segment_handler update get", err)
      }
      return myhttp.NewJSONResponse(toSegmentDetailResponse(details), nil), nil

  Step 8: Wiring

  In cmd/initializer.go, update the segment service construction:
      segmentService := segmentsvc.NewSegmentService(
          segmentRepo,
          segmentsvc.WithContactCounter(contactRepo), // contactRepo satisfies IContactCounter via CountAllByCompany
      )
  WithMemberRepo is not wired here; it is added in T4.6 once ISegmentMemberRepository has a concrete implementation.

  Step 9: Tests

  Handler tests (internal/app/handler/segment_handler_test.go):
  - Replace all svc.On("GetSegment", ...) inside TestSegmentHandler_GetByID and the post-update fetch inside TestSegmentHandler_Update with svc.On("GetSegmentDetails", ...) returning a segmentsvc.SegmentDetails{Segment: sampleSegment(segID)}.
  - Add cases: GetSegmentDetails returns ErrSegmentNotFound → 404; returns ErrPermissionDenied → 403.

  Service tests (internal/app/service/segment/segment_service_test.go), add TestSegmentService_GetSegmentDetails:

  | Case | memberRepo | contactCounter | expected |
  |---|---|---|---|
  | active, both deps nil | nil | nil | Segment returned, WhatsApp nil, Email nil, TotalCustomers 0 |
  | active, memberRepo wired, no CDC backfill | CountWithPhone → ErrDatamartUnavailable | wired | WhatsApp nil, Email populated |
  | active, both wired (segment TotalMatched = 1050) | CountWithPhone → 800, CountWithEmail → 940 | total=11700 | WhatsApp={800, 76.19}, Email={940, 89.52}, PercentageReach computable |
  | archived | memberRepo wired | wired | WhatsApp nil, Email nil (reachability skipped for archived) |
  | not found | n/a | n/a | ErrSegmentNotFound |
  | permission denied (ownedOnly) | n/a | n/a | ErrPermissionDenied |
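The expected percentages in the service test table follow from the channel formula count / TotalMatched × 100. The figures 76.19 and 89.52 are consistent with a segment whose TotalMatched is 1050; that value is inferred from the numbers, not stated in the plan. A small sketch of the arithmetic, with channelPercentage and round2 as hypothetical helper names:

```go
package main

import "math"

// channelPercentage applies the formula used in GetSegmentDetails,
// returning 0 when the segment has no matched members.
func channelPercentage(count, totalMatched int64) float64 {
	if totalMatched <= 0 {
		return 0
	}
	return float64(count) / float64(totalMatched) * 100
}

// round2 rounds to two decimal places, for display or test comparison only.
func round2(v float64) float64 {
	return math.Round(v*100) / 100
}
```

Since the service code does not round, a test asserting 76.19 would need either a helper like round2 or a tolerance-based comparison rather than exact float equality. With the same inputs, percentage_reach comes out at 1050 / 11700 × 100, roughly 8.97.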
T5.2 — GetSegmentCustomers + GET /iag/v1/segments/:id/customers handler [Jira: —]
- Purpose: Expose paginated customer membership for a saved segment to the CDP frontend with OWNED_ONLY permission support.
- Scope:
  - `internal/app/payload/segment_payload.go`: `SegmentCustomerItem`, `SegmentCustomersResult`
  - `internal/app/service/segment/segment_service_interface.go`: `GetSegmentCustomers`
  - `internal/app/service/segment/segment_service.go`: implement `GetSegmentCustomers`
  - `internal/app/handler/segment_handler.go`: `GetSegmentCustomers` handler
  - `internal/server/rest_router.go`: wire `GET /iag/v1/segments/:segmentID/customers`
- Depends on: T4.1 (`CustomerSegmentMember` struct), T4.2 (`ISegmentMemberRepository`).
- Expected Outcome: Unit tests cover ALL_ACCESS, OWNED_ONLY, segment-not-found, and invalid pagination; endpoint verified in staging with the CDP frontend.
- Implementation Plan:
  - In `internal/app/payload/segment_payload.go`, add:

    ```go
    type SegmentCustomerItem struct {
        ContactID string    `json:"contact_id"`
        Name      string    `json:"name"`
        Source    string    `json:"source"`
        AddedAt   time.Time `json:"added_at"`
    }
    ```
  - Add to `ISegmentService`:

    ```go
    GetSegmentCustomers(ctx context.Context, segmentID, companySsoID, userSsoID string, ownedOnly bool, page, perPage int) ([]payload.SegmentCustomerItem, int64, error)
    ```

    Run `make mocks` to regenerate the service mock.
  - Implement `GetSegmentCustomers` in `segment_service.go`:
    - Call `segmentRepo.FindByID`; return `ErrSegmentNotFound` if the segment does not exist.
    - OWNED_ONLY boundary: if `ownedOnly && seg.CreatedBy != userSsoID`, return `ErrPermissionDenied`.
    - Call `memberRepo.FindMembers(ctx, segmentID, companySsoID, userSsoID, ownedOnly, page, perPage)` → `[]CustomerSegmentMember`, total.
    - Resolve contact name and source from `contactRepo.FindByIDs`; populate the `SegmentCustomerItem` slice.
    - Return slice + total.
  - Add `GetSegmentCustomers(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error)` to `SegmentHandler`:
    - Parse `segmentID` from URL param; parse and validate `page`/`per_page` query params (defaults: 1/20; max per_page: 100 → 400 if exceeded).
    - Extract `companySsoID`, `userSsoID`, `ownedOnly` from IAG permission context.
    - Call `svc.GetSegmentCustomers`; map errors via `mapServiceError`.
    - Return `myhttp.NewJSONResponse(customers, pagination)`.
  - In `internal/server/rest_router.go`, inside the `/{segmentID}` sub-route block:

    ```go
    r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Get("/customers", segmentHandler.GetSegmentCustomers)
    ```
  - Unit tests:
    - Service: ALL_ACCESS → full list; OWNED_ONLY + owner → filtered; OWNED_ONLY + non-owner → `ErrPermissionDenied`; segment not found → `ErrSegmentNotFound`.
    - Handler: valid → 200; `ErrSegmentNotFound` → 404; `ErrPermissionDenied` → 403; `per_page` > 100 → 400.
T5.3 — GetSegmentsByContact + GET /api/v1/contacts/:id/segments endpoint [Jira: —]
- Purpose: Enable downstream services (Broadcast, Chatbot) to discover which segments a contact belongs to via a machine-to-machine endpoint.
- Scope: `internal/app/service/segment/` (`GetSegmentsByContact`); `internal/app/handler/segment_handler.go` (`GetContactSegments` handler under `BasicAuth`).
- Expected Outcome: Unit tests cover multiple segments, no segments, invalid contactID; staging curl verified with Broadcast team.
- Implementation Plan:
  - Add to `ISegmentService`:

    ```go
    GetSegmentsByContact(ctx context.Context, contactID, companySsoID string) ([]payload.SegmentInfo, error)
    ```
  - Define in `internal/app/payload/segment_payload.go`:

    ```go
    type SegmentInfo struct {
        SegmentID   string `json:"segment_id"`
        SegmentName string `json:"segment_name"`
    }
    ```
  - Implement `GetSegmentsByContact`: call `memberRepo.FindByContactID(ctx, contactID, companySsoID)`; for each membership, call `segmentRepo.FindByID` to get the segment name; return `[]payload.SegmentInfo`. Return an empty slice (not an error) when there are no memberships.
  - Add `GetContactSegments(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error)` to `SegmentHandler`:
    - `contactID := chi.URLParam(r, "contactID")`; return 400 if empty.
    - `companySsoID` extracted from `BasicAuth` context (machine-to-machine; no user permission check).
    - Call `svc.GetSegmentsByContact`.
    - Return `200 OK` with `{data: {customer_id: contactID, segments: [...]}}`.
  - Route registered in T1.6 under `BasicAuth` middleware. Verify with `curl -u user:pass /api/v1/contacts/{id}/segments`.
  - Unit tests: 2 segments → both returned; no memberships → empty array; missing `contactID` → 400.
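The "empty slice, not an error" rule in T5.3 also affects the response body: with `encoding/json`, a nil slice marshals to `null`, while an empty slice marshals to `[]`. The sketch below illustrates one way to guarantee the array form. `contactSegmentsData`, `buildContactSegments`, and `encode` are hypothetical names; only `SegmentInfo` and the `customer_id`/`segments` keys come from the plan.

```go
package main

import "encoding/json"

// SegmentInfo mirrors the payload struct defined in T5.3.
type SegmentInfo struct {
	SegmentID   string `json:"segment_id"`
	SegmentName string `json:"segment_name"`
}

// contactSegmentsData is a hypothetical shape for the response's data object.
type contactSegmentsData struct {
	CustomerID string        `json:"customer_id"`
	Segments   []SegmentInfo `json:"segments"`
}

// buildContactSegments normalises a nil result to an empty slice so the
// "segments" field is always a JSON array.
func buildContactSegments(contactID string, segments []SegmentInfo) contactSegmentsData {
	if segments == nil {
		segments = []SegmentInfo{}
	}
	return contactSegmentsData{CustomerID: contactID, Segments: segments}
}

// encode marshals the payload and returns it as a string.
func encode(d contactSegmentsData) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}
```

Downstream consumers such as Broadcast can then iterate over `segments` without a null check.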
T5.4 — OpenAPI spec update [Jira: —]
- Purpose: Keep contract documentation in sync with implementation for frontend, Broadcast, and QA consumers.
- Scope: `docs/swagger.yaml`, covering all new segment endpoints with schemas and per-endpoint error responses.
- Expected Outcome: `swagger-cli validate docs/swagger.yaml` passes; Swagger UI renders all new endpoints.
- Implementation Plan:
  - Open `docs/swagger.yaml`. Define reusable `$ref` component schemas: `RuleCondition`, `RuleGroup`, `RuleSet`, `SegmentResponse`, `SegmentDetailResponse` (with nested `ReachabilityInfo`), `SegmentListResponse`, `PreviewRequest`, `PreviewResponse`, `ErrorResponse` (envelope: `{error, message, details}`).
  - Add a path entry for each endpoint. Each entry must include `summary`, `operationId`, `security`, `parameters` (GET), `requestBody` (POST/PUT), and a `responses` section with `200`/`201` plus all error codes from the table below. Each error response uses `$ref: '#/components/schemas/ErrorResponse'` with an example showing the relevant `error` code string:

    | Endpoint | Error codes to document |
    |---|---|
    | `POST /iag/v1/segments` | `400 INVALID_PAYLOAD`, `403 PERMISSION_DENIED`, `409 CONFLICT_NAME`, `422 INVALID_RULE` |
    | `GET /iag/v1/segments` | `400 INVALID_PAYLOAD`, `403 PERMISSION_DENIED` |
    | `GET /iag/v1/segments/:id` | `403 PERMISSION_DENIED`, `404 NOT_FOUND` |
    | `PUT /iag/v1/segments/:id` | `400 INVALID_PAYLOAD`, `403 PERMISSION_DENIED`, `404 NOT_FOUND`, `409 CONFLICT_NAME`, `422 INVALID_RULE` |
    | `PATCH /iag/v1/segments/:id/archive` | `403 PERMISSION_DENIED`, `404 NOT_FOUND`, `409 CONFLICT_STATUS` |
    | `POST /iag/v1/segments/:id/duplicate` | `403 PERMISSION_DENIED`, `404 NOT_FOUND`, `409 CONFLICT_NAME` |
    | `POST /iag/v1/segments/preview` | `400 INVALID_PAYLOAD`, `422 INVALID_RULE`, `429 RATE_LIMIT_EXCEEDED`, `503 DATAMART_UNAVAILABLE` |
    | `GET /iag/v1/segments/:id/customers` | `400 INVALID_PAYLOAD`, `403 PERMISSION_DENIED`, `404 NOT_FOUND` |
    | `GET /api/v1/contacts/:id/segments` | `401` (BasicAuth failure) |
    | `GET /api/v1/segments/:id/customers` | `400 INVALID_PAYLOAD`, `401` (BasicAuth failure), `404 NOT_FOUND` |
  - Add the S2S paths `/api/v1/contacts/{contactID}/segments` and `/api/v1/segments/{segmentID}/customers` with the `basicAuth` security scheme and their `200` responses. Document `PrivateSegmentCustomerItem` as a reusable `$ref` component schema for the latter.
  - Run `make init` (calls `swag init`) if the project uses swag annotations; otherwise run `swagger-cli validate docs/swagger.yaml`.
  - Open Swagger UI; verify all new endpoints render with correct schemas and example error payloads.
T5.5 — Quality gate & final verification [Jira: —]
- Purpose: Confirm the complete implementation meets the project's mandatory quality standards before the rollout plan begins.
- Scope: CI pipeline (`go fmt ./...`, `go vet ./...`, `golangci-lint run`, `go build ./...`, `go test ./...`); all segment packages must reach ≥ 80% coverage (≥ 90% for `segment_engine`).
- Expected Outcome: All commands exit 0 with no warnings or failures; coverage thresholds met; service boots cleanly with a local `docker-compose up`.
- Implementation Plan:
  - Run `make prepare` (init → build → mocks → test → lint → sec). Fix every failure before advancing; do not suppress warnings.
  - Check per-package coverage thresholds: `go test -coverprofile=cover.out ./internal/... && go tool cover -func=cover.out | grep segment`. Confirm `segment_engine` ≥ 90% and all other segment packages ≥ 80%. Add tests to any package below threshold.
  - Run `staticcheck ./internal/...` and resolve all new findings in segment packages.
  - Run `gosec ./internal/...` and confirm no new `HIGH`/`CRITICAL` findings; the Rule Engine's parameterised-only SQL path should be clean.
  - Start the full stack: `docker-compose up` (MongoDB + Redis + datamart), then `./contact-service server`. Hit `POST /iag/v1/segments` and `POST /iag/v1/segments/preview` end-to-end to confirm the full wiring is live.
T5.6 — GetPrivateSegmentCustomers + GET /api/v1/segments/:id/customers endpoint [Jira: —]
- Purpose: Expose detailed customer data for all members of a saved segment to downstream machine-to-machine consumers (Broadcast, Chatbot) without a Launchpad permission gate.
- Scope:
  - `internal/app/payload/segment_payload.go`: `PrivateSegmentCustomerItem`
  - `internal/app/service/segment/segment_service_interface.go`: `GetPrivateSegmentCustomers`
  - `internal/app/service/segment/segment_service.go`: implement `GetPrivateSegmentCustomers`
  - `internal/app/handler/segment_handler.go`: `GetPrivateSegmentCustomers` handler under `BasicAuth`
  - `internal/server/rest_router.go`: wire `GET /api/v1/segments/:segmentID/customers` (stub added in T1.6)
- Depends on: T4.1 (`CustomerSegmentMember` struct), T4.2 (`ISegmentMemberRepository`), T5.2 (`FindMembers` on member repo already present).
- Expected Outcome: Unit tests cover pagination, segment-not-found, empty member list, partial contact-lookup failure; staging `curl` verified with the Broadcast team.
- Implementation Plan:
  - In `internal/app/payload/segment_payload.go`, add:

    ```go
    type PrivateSegmentCustomerItem struct {
        ID string `json:"id"`
        CompanySsoID string `json:"company_sso_id"`
        CompanyBillingID string `json:"company_billing_id"`
        Name string `json:"name"`
        Email string `json:"email"`
        Phone []string `json:"phone"`
        IsDeleted *bool `json:"is_deleted"`
        CustomFields *[]CustomField `json:"custom_fields"`
        OwnerID string `json:"owner_id"`
        OwnerName string `json:"owner_name"`
        AssigneeID string `json:"assignee_id"`
        AssigneeName string `json:"assignee_name"`
        Usernames []Username `json:"usernames"`
        Source string `json:"source"`
        SourceID string `json:"source_id"`
        SourceName string `json:"source_name"`
        Status string `json:"status"`
        StatusID string `json:"status_id"`
        StatusName string `json:"status_name"`
        JobTitle string `json:"job_title"`
        Tags []string `json:"tags"`
        Flag string `json:"flag"`
        Address *Address `json:"address"`
        DateOfBirth string `json:"date_of_birth"`
        Sex string `json:"sex"`
        SexID string `json:"sex_id"`
        SexName string `json:"sex_name"`
        PhoneMarketingOptIn []WhatsappMarketingOptedInSerializer `json:"phone_marketing_opt_in,omitempty"`
        IsLoyaltyMember bool `json:"is_loyalty_member"`
        AddedAt time.Time `json:"added_at"`
        CreatedAt time.Time `json:"created_at"`
        UpdatedAt time.Time `json:"updated_at"`
        CreatedBy string `json:"created_by"`
        UpdatedBy string `json:"updated_by"`
    }
    ```

    `CustomField`, `Username`, `Address`, and `WhatsappMarketingOptedInSerializer` are the existing types from `internal/app/repository/contact/create_serializer.go`. This struct is `ContactSerializer` minus `CrmData`, `ChatData`, `Avatar`, `Accounts`, `DisabledField`, `FieldPermissions`, `Permission`, `IsEnableUsernames`, and the scalar `Username` field.
  - Add to `ISegmentService`:

    ```go
    GetPrivateSegmentCustomers(ctx context.Context, segmentID, companySsoID string, page, perPage int) ([]payload.PrivateSegmentCustomerItem, int64, error)
    ```

    Run `make mocks` to regenerate the service mock.
  - Implement `GetPrivateSegmentCustomers` in `segment_service.go`:
    - Call `segmentRepo.FindByID`; return `ErrSegmentNotFound` if the segment does not exist.
    - Call `memberRepo.FindMembers(ctx, segmentID, companySsoID, "", false, page, perPage)`. No OWNED_ONLY filtering applies; all members are returned.
    - Collect `contactIDs` from member records; call `contactRepo.FindByIDs(ctx, contactIDs, companySsoID)`.
    - Map each `ContactSerializer` to `PrivateSegmentCustomerItem`, copying all included fields; set `AddedAt` from the corresponding member record.
    - Contacts missing from the lookup (e.g., soft-deleted) are skipped without failing the request; each skip is logged via `slog.WarnContext` with `contact_id`.
    - Return slice + total (total is the full member count, not the resolved contact count).
  - Add `GetPrivateSegmentCustomers(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error)` to `SegmentHandler`:
    - Parse `segmentID` from URL param; return 400 if empty.
    - Parse and validate `page`/`per_page` query params (defaults: 1/20; max per_page: 100 → 400 if exceeded).
    - Extract `companySsoID` from BasicAuth context; no user SSO ID or Launchpad permission is required.
    - Call `svc.GetPrivateSegmentCustomers`; map `ErrSegmentNotFound` → 404.
    - Return `myhttp.NewJSONResponse(customers, pagination)`.
  - In `internal/server/rest_router.go`, attach the handler to the stub route registered in T1.6:

    ```go
    r.With(BasicAuth).Get("/api/v1/segments/{segmentID}/customers", segmentHandler.GetPrivateSegmentCustomers)
    ```
  - Unit tests:
    - Service: segment found → full member list with all fields populated; segment not found → `ErrSegmentNotFound`; empty member list → empty slice (not error); partial contact lookup failure → available contacts returned, missing ones skipped with warning log.
    - Handler: valid request → 200 with `data` + `pagination`; `ErrSegmentNotFound` → 404; empty `segmentID` → 400; `per_page` > 100 → 400.
Phase 6: Tenant Isolation & Versioning Hardening
Phase 6 tasks are additive-only: they extend Phase 1–5 source files with new fields and checks rather than rewriting existing behaviour. Each follows the same TDD discipline as prior phases: tests written first, implementation to pass, then refactor.
T6.1 — S2S company_sso_id enforcement on both internal endpoints [Jira: —]
- Purpose: Close a tenant-isolation gap in the two S2S endpoints. The `BasicAuth` middleware only validates credentials and sets nothing in context, so without an explicit `company_sso_id` parameter any authorised internal service could query data belonging to any tenant. This task adds the parameter, threads it through, and validates ownership before returning data.
- Scope:
  - `internal/app/handler/segment_handler.go`: `GetContactSegments` and `GetPrivateSegmentCustomers` handlers.
  - `internal/app/service/segment/segment_service_interface.go` + `segment_service.go`: `GetPrivateSegmentCustomers` signature extension (segment ownership check).
  - `internal/server/rest_router.go`: no route change; only handler logic changes.
- Depends on: T5.3 (`GetContactSegments`), T5.6 (`GetPrivateSegmentCustomers`).
- Expected Outcome: Missing `company_sso_id` → 400 from both endpoints. Cross-tenant segment ID on the customers endpoint → 404. Cross-tenant contact ID on the segments endpoint → empty `segments` array (not an error). Unit tests cover all paths; `go test ./internal/app/handler/...` and `go test ./internal/app/service/segment/...` remain green.
- Implementation Plan:
  - `GetContactSegments` handler (`internal/app/handler/segment_handler.go`):
    - Read `companySsoID := r.URL.Query().Get("company_sso_id")`.
    - If empty, return `myhttp.ErrBadRequest("company_sso_id is required")` → HTTP 400 immediately (before any service call).
    - Pass `companySsoID` to `svc.GetSegmentsByContact(ctx, contactID, companySsoID)`.
    - No service-layer change needed: `memberRepo.FindByContactID` already filters by `company_sso_id`, so querying with a wrong value simply returns an empty slice.
  - `GetPrivateSegmentCustomers` handler (`internal/app/handler/segment_handler.go`):
    - Read `companySsoID := r.URL.Query().Get("company_sso_id")`.
    - If empty, return HTTP 400 immediately.
    - Pass `companySsoID` to `svc.GetPrivateSegmentCustomers(ctx, segmentID, companySsoID, page, perPage)`.
  - `GetPrivateSegmentCustomers` service (`internal/app/service/segment/segment_service.go`):
    - After `segmentRepo.FindByID(ctx, segmentID)`, add:

      ```go
      if seg.CompanySsoID != companySsoID {
          return nil, 0, ErrSegmentNotFound // intentionally 404, not 403: do not leak existence
      }
      ```
    - This is the only service-layer change; `FindMembers` already carries `companySsoID` as a filter argument.
  - Unit tests (`internal/app/handler/segment_handler_test.go`):
    - `GetContactSegments`: missing `company_sso_id` query param → 400 without calling service; param present → service called with correct value.
    - `GetPrivateSegmentCustomers`: missing `company_sso_id` → 400; param present but segment belongs to different company → service returns `ErrSegmentNotFound` → handler returns 404; happy-path → 200.
  - Service unit tests (`internal/app/service/segment/segment_service_test.go`):
    - `GetPrivateSegmentCustomers`: `seg.CompanySsoID != companySsoID` → `ErrSegmentNotFound`; `seg.CompanySsoID == companySsoID` → proceeds normally.
T6.2 — Segment version field: struct, repository, and service layer [Jira: —]
- Purpose: Track a monotonically increasing edit counter on each segment so that consumers, downstream services, and the Kafka event stream can detect when a segment's definition has changed. The counter is owned by the user-facing edit operations and is never touched by the cron.
- Scope:
  - `internal/app/repository/segment/types.go`: add `Version` field.
  - `internal/app/repository/segment/segment_repository_mongo.go`: `Insert` persists `Version = 1` as set by the caller; `Update` (called only from `UpdateSegment`) and `UpdateStatus` (called only from `ArchiveSegment`) both add `"$inc": bson.M{"version": 1}`.
  - `internal/app/service/segment/segment_service.go`: `CreateSegment` sets `Version: 1`; `DuplicateSegment` resets to `1`; no other service changes (version increment is a repository-layer concern).
- Depends on: T1.1, T1.3, T1.4.
- Expected Outcome: `Version` persists and increments correctly; `UpdateStats` / `ClearRuleDirtyAt` / `IncrementRecalcErrorCount` do not change `Version`; unit tests validate all paths; `go test ./internal/app/repository/segment/...` and `go test ./internal/app/service/segment/...` pass with ≥ 80% coverage.
- Implementation Plan:
  - Struct (`internal/app/repository/segment/types.go`): Add the field after `RecalcErrorCount`:

    ```go
    Version int64 `bson:"version" json:"version"`
    ```

    Add to the round-trip test in `types_test.go`: assert that `Version` marshals to the `"version"` key in both JSON and BSON, and that its value is preserved.
  - Repository, `Insert` (`segment_repository_mongo.go`): The caller (`CreateSegment`) sets `seg.Version = 1` before calling `Insert`. No repository change is strictly needed, but verify the field is included in the BSON document (it is, by default, since it carries a `bson:"version"` tag).
  - Repository, `Update` (`segment_repository_mongo.go`): In the update document, add a `$inc` operator as a sibling of `$set`:

    ```go
    update := bson.M{
        "$set": bson.M{ /* existing fields */ },
        "$inc": bson.M{"version": 1},
    }
    ```

    This method is only called from `UpdateSegment`, so every user-initiated edit bumps the counter atomically.
  - Repository, `UpdateStatus` (`segment_repository_mongo.go`): Same pattern: add `"$inc": bson.M{"version": 1}` alongside the `$set` for `status`. This method is only called from `ArchiveSegment`.
  - Repository, `UpdateStats`, `ClearRuleDirtyAt`, `IncrementRecalcErrorCount`: confirm none of these touch `version` (they use their own targeted `$set`/`$inc` documents). Add comments noting this is intentional.
  - Service, `CreateSegment` (`segment_service.go`): Before calling `repo.Insert`, set `seg.Version = 1` (alongside `seg.CreatedAt`, `seg.Status`, etc.).
  - Service, `DuplicateSegment` (`segment_service.go`): In the field-zeroing block, set `copy.Version = 1` (alongside `TotalMatched`, `LastEvaluatedAt`, etc.).
  - Repository unit tests (`segment_repository_test.go`):
    - `Insert` → fetched document has `version == 1`.
    - `Update` called once → `version == 2`; called twice → `version == 3`.
    - `UpdateStatus` → `version` increments.
    - `UpdateStats` → `version` unchanged.
    - `ClearRuleDirtyAt` → `version` unchanged.
    - `IncrementRecalcErrorCount` → `version` unchanged.
  - Service unit tests (`segment_service_test.go`):
    - `CreateSegment` → `repo.Insert` called with a segment where `Version == 1`.
    - `DuplicateSegment` → duplicated segment has `Version == 1` regardless of source version.
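The versioning rules in T6.2 (user-facing writes bump the counter, cron writes leave it alone) can be pinned down with an in-memory model before touching MongoDB. The sketch below is illustrative only: `fakeSegmentStore` and `segmentDoc` are hypothetical, and the real repository would express the increment with `$inc` as described above.

```go
package main

// segmentDoc holds only the fields relevant to the versioning rules.
type segmentDoc struct {
	Version      int64
	Status       string
	TotalMatched int64
}

// fakeSegmentStore is a hypothetical in-memory stand-in for the Mongo repository.
type fakeSegmentStore struct {
	docs map[string]*segmentDoc
}

func newFakeSegmentStore() *fakeSegmentStore {
	return &fakeSegmentStore{docs: map[string]*segmentDoc{}}
}

// Insert stores the document as given; the caller is responsible for Version = 1.
func (s *fakeSegmentStore) Insert(id string, doc segmentDoc) {
	d := doc
	s.docs[id] = &d
}

// Update models a user-initiated edit: the version is bumped by one.
func (s *fakeSegmentStore) Update(id string) {
	s.docs[id].Version++
}

// UpdateStatus models archive: status changes and the version is bumped by one.
func (s *fakeSegmentStore) UpdateStatus(id, status string) {
	s.docs[id].Status = status
	s.docs[id].Version++
}

// UpdateStats models the cron write: stats change, version is untouched.
func (s *fakeSegmentStore) UpdateStats(id string, total int64) {
	s.docs[id].TotalMatched = total
}
```

A model like this is only a specification aid; the atomicity guarantee itself comes from MongoDB applying `$set` and `$inc` in a single update.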
T6.3 — version field in API payload responses [Jira: —]
- Purpose: Surface the `version` counter in all segment-facing API responses so that the CDP frontend and downstream consumers can detect definition changes without comparing rule sets.
- Scope:
  - `internal/app/payload/segment_payload.go`: add `Version` to `SegmentResponse` and `SegmentDetailResponse`; update mapper functions.
  - `internal/app/handler/segment_handler_test.go`: assert `version` in all relevant response assertions.
  - `docs/swagger.yaml`: add `version` to affected schemas (can be done alongside T5.4 if not yet merged, or as an addendum).
- Depends on: T6.2, T1.5, T5.1.
- Expected Outcome: `GET /iag/v1/segments`, `GET /iag/v1/segments/:id`, and `PUT /iag/v1/segments/:id` all return `"version"` in their data objects. Handler tests verify the field is present and correctly populated from the domain struct. `go test ./internal/app/handler/...` and `go test ./internal/app/payload/...` pass.
- Implementation Plan:
  - `SegmentResponse` (list item) in `segment_payload.go`: Add ``Version int64 `json:"version"` `` as a field. Update the mapper `ToSegmentResponse(seg CustomerSegment) SegmentResponse` (or wherever the list item is built) to set `Version: seg.Version`.
  - `SegmentDetailResponse` in `segment_payload.go`: Add ``Version int64 `json:"version"` ``. Update `ToSegmentDetailResponse(seg CustomerSegment)` to set `Version: seg.Version`. The `toSegmentDetailResponse(SegmentDetails)` helper in the handler (T5.1 Step 7) calls this mapper, so it picks up the field automatically.
  - Handler tests:
    - `TestSegmentHandler_List`: assert `data[0].version == 1` (or whatever the mock returns).
    - `TestSegmentHandler_GetByID`: assert `data.version` is populated (the detail response is a single object, not a list).
    - `TestSegmentHandler_Update`: assert updated response includes `version`.
  - OpenAPI (`docs/swagger.yaml`): In the `SegmentResponse` and `SegmentDetailResponse` schema components, add:

    ```yaml
    version:
      type: integer
      format: int64
      description: Monotonically increasing counter; increments on every user-initiated edit or archive.
      example: 3
    ```
T6.4 — rule_version in Kafka SegmentRecalculatedEvent [Jira: —]
- Purpose: Let Kafka consumers (Broadcast, Chatbot) correlate each recalculation result with the specific version of the segment's rule that produced it. Without this, a consumer cannot tell whether a recalculation was triggered by the current rule or a since-updated one.
- Scope:
  - `internal/app/service/segment/event_publisher.go`: add `RuleVersion int64` to `SegmentRecalculatedEvent`.
  - `internal/app/service/segment/segment_service.go`: populate `RuleVersion` from `seg.Version` in `processSegment`.
  - `internal/pkg/segment_kafka/publisher.go`: no change; the field is marshalled automatically.
  - Unit tests for T4.5 publisher and T4.4 `processSegment`.
- Depends on: T6.2, T4.4, T4.5.
- Expected Outcome: Every `cdp.segment.recalculated` Kafka message includes `"rule_version"`. Publisher unit tests assert the field is serialised in the JSON payload. `go test ./internal/app/service/segment/...` and `go test ./internal/pkg/segment_kafka/...` pass.
- Implementation Plan:
  - Struct (`internal/app/service/segment/event_publisher.go`): Add the field between `RecalcRunID` and `TotalMatched`:

    ```go
    RuleVersion int64 `json:"rule_version"` // segment.Version at time of recalculation
    ```

    Run `make mocks` to regenerate `MockISegmentEventPublisher`.
  - `processSegment` (`internal/app/service/segment/segment_service.go`): When building the `SegmentRecalculatedEvent`, populate the new field:

    ```go
    event := SegmentRecalculatedEvent{
        EventType:    "segment.recalculated",
        SegmentID:    seg.ID,
        SegmentName:  seg.Name,
        CompanySsoID: seg.CompanySsoID,
        RecalcRunID:  recalcRunID,
        RuleVersion:  seg.Version, // version of the rule definition that was evaluated
        TotalMatched: int64(len(newIDs)),
        TotalAdded:   int64(len(added)),
        TotalRemoved: int64(len(removed)),
        OccurredAt:   runStartedAt,
    }
    ```
  - Publisher unit tests (`internal/pkg/segment_kafka/publisher_test.go`):
    - Assert that the JSON payload delivered to the Kafka producer contains `"rule_version": N` with the value set in the event struct.
    - Unmarshal the captured `msg.Value` and verify the `RuleVersion` field is present and non-zero.
  - `processSegment` unit tests (`internal/app/service/segment/segment_service_test.go`):
    - Extend the "non-empty diff → `Publish` called" test: assert that the captured `SegmentRecalculatedEvent` argument has `RuleVersion == seg.Version`.
    - Add a test where `seg.Version == 5` to confirm the value flows correctly from the domain object into the event.
  - Consumer contract addendum (comment in `event_publisher.go`): Add a short note to the `ISegmentEventPublisher` docstring:

    ```go
    // Consumers should treat a change in rule_version as a signal that the segment membership
    // may now reflect a different rule definition and re-fetch member details if needed.
    ```
Phase 7: Operational Controls
Phase 7 adds three runtime-control mechanisms: a circuit breaker to abort an in-progress recalculation, a dedicated feature flag to disable recalculation independently of the CRUD API, and a manual trigger endpoint for targeted on-demand recalculation. All three tasks must follow the same TDD discipline (Red → Green → Refactor) as prior phases.
T7.1 — Recalculation circuit breaker [Jira: —]
- Purpose: Give operators a way to gracefully stop a running recalculation cron without killing the pod. Writing any value to a Redis key signals the loop to exit cleanly after the current segment finishes.
- Scope: `internal/pkg/consts/` (new key constant), `params/config` (new env var), `internal/app/service/segment/segment_service.go` (`doRecalculation` loop, extracted from `RecalculateAllSegments` in T7.3).
- Depends on: T4.4 (`RecalculateAllSegments` exists), T7.3 (the refactor that extracts `doRecalculation`).
- Expected Outcome: Unit tests confirm the loop exits without processing further segments when the key is present; key is cleared on detection (one-shot); absent key has no effect.
- Implementation Plan:
  - Add to `internal/pkg/consts/segment.go` (create file if absent):

    ```go
    const SegmentRecalcCircuitBreakerKey = "segment_recalculation_stop_requested"
    ```
  - Add to the `params/config` struct:

    ```go
    SegmentRecalcCircuitBreakerKeyTTLSeconds int `env:"SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS" envDefault:"86400"`
    ```

    This TTL is used by the trigger API handler (T7.3) when it writes the key; it is NOT used by the cron loop itself (the loop only reads and deletes it).
  - In `doRecalculation` (the private method extracted in T7.3), at the top of each outer batch-fetch iteration, before calling `FindActiveSegments` for the next page:

    ```go
    if val, _ := s.cacheRepo.Get(ctx, consts.SegmentRecalcCircuitBreakerKey); val != "" {
        slog.InfoContext(ctx, "segment recalc circuit breaker triggered, stopping",
            slog.String("recalc_run_id", recalcRunID))
        s.cacheRepo.Del(ctx, consts.SegmentRecalcCircuitBreakerKey) //nolint:errcheck
        return nil
    }
    ```

    Semantics: one-shot. The cron clears the key upon detection so tomorrow's scheduled run is unaffected.
  - Unit tests (`internal/app/service/segment/segment_service_test.go`):
    - Circuit breaker key present before first batch fetch → `doRecalculation` returns nil; zero segments processed; `Del` called once.
    - Circuit breaker key set after the first batch completes → second batch loop iteration detects it; first-batch segments are processed normally; `Del` called once.
    - Circuit breaker key absent → loop runs to completion normally; `Del` never called for this key.
T7.2 — cdp_segment_recalc_enabled feature flag [Jira: —]
- Purpose: Allow the two regular, automatic recalculation paths to be disabled globally (e.g., during datamart maintenance) without affecting segment CRUD endpoints. The two paths are the daily cron `RecalculateAllSegments` and the scoped on-write recalc `recalculateSegment` (T8.1, fired after create / rule-changing update). The flag deliberately does not gate the S2S manual trigger `TriggerRecalculation` (T7.3): an operator must retain a way to force a targeted recalc even while automatic recalculation is paused. This flag is also separate from `cdp_segmentation_enabled` (CRUD API gate).
- Scope: `internal/pkg/consts/feature_flag.go` (new constant), `internal/app/service/segment/segment_service.go` (new `WithFeatureFlagService` option + early-exit check, called from `RecalculateAllSegments` and `recalculateSegment`, NOT from `TriggerRecalculation`), `cmd/initializer.go` (wiring).
- Depends on: T5.1 (options pattern established by `WithContactCounter`).
- Expected Outcome: `RecalculateAllSegments` and `recalculateSegment` both return/no-op without acquiring their respective lock when the flag is disabled; `TriggerRecalculation` is unaffected by the flag. Flag absent from MongoDB (nil feature flag service) defaults to enabled. Unit tests cover all paths.
- Implementation Plan:
  - In `internal/pkg/consts/feature_flag.go`, add:

    ```go
    FeatureFlagCDPSegmentRecalc = "cdp_segment_recalc_enabled"
    ```
  - Add a new option to `internal/app/service/segment/segment_service.go`:

    ```go
    func WithFeatureFlagService(ff service.IFeatureFlagService) SegmentServiceOption {
        return func(s *segmentService) { s.featureFlagService = ff }
    }
    ```

    Add the field to the `segmentService` struct:

    ```go
    featureFlagService service.IFeatureFlagService // nil = recalc always enabled
    ```
  - Add a private helper called at the start of `RecalculateAllSegments` and `recalculateSegment` (T8.1), before any lock acquisition. `TriggerRecalculation` (T7.3) must NOT call this helper:

    ```go
    func (s *segmentService) isRecalcEnabled(ctx context.Context) bool {
        if s.featureFlagService == nil {
            return true // safe default when not wired
        }
        // empty sso_id → global check: returns data.Enabled regardless of CompanySsoIDs
        return s.featureFlagService.FeatureEnabled(ctx, consts.FeatureFlagCDPSegmentRecalc, "")
    }
    ```

    At the start of `RecalculateAllSegments`:

    ```go
    if !s.isRecalcEnabled(ctx) {
        slog.InfoContext(ctx, "segment recalc disabled by feature flag")
        return nil
    }
    ```

    And at the start of `recalculateSegment` (T8.1), after the nil-dependency guard:

    ```go
    if !s.isRecalcEnabled(ctx) {
        return
    }
    ```
  - In `cmd/initializer.go`, extend the `NewSegmentService` call:

    ```go
    segmentService := segmentsvc.NewSegmentService(
        segmentRepo,
        segmentsvc.WithContactCounter(contactRepo),
        segmentsvc.WithFeatureFlagService(featureFlagService), // add this line
    )
    ```
  - Unit tests (`internal/app/service/segment/segment_service_test.go`):
    - Flag service nil → `RecalculateAllSegments` proceeds (no early exit).
    - Flag `cdp_segment_recalc_enabled` disabled → `RecalculateAllSegments` returns nil; lock `SetNX` never called.
    - Flag enabled → proceeds to lock acquisition normally.
    - `TriggerRecalculation` with flag disabled → still proceeds to lock acquisition (the S2S manual trigger is exempt; `isRecalcEnabled` is never called from this method). Full `recalculateSegment` flag-disabled coverage lives in T8.1's tests.
T7.3 — TriggerRecalculation service method + POST /api/v1/segments/recalculate endpoint [Jira: —]
-
Purpose: Expose an internal S2S endpoint that triggers on-demand recalculation for specific companies and optionally specific segments. Useful for immediately refreshing a segment after a rule update or CDC lag recovery without waiting for the next daily run. Returns 202 immediately; progress is tracked via Kafka events carrying the
recalc_run_id. -
Scope:
internal/app/repository/segment/segment_repository.go— extendSegmentFilterwithCompanySsoIDs []stringandSegmentIDs []string.internal/app/repository/segment/segment_repository_mongo.go— apply filters inFindAllquery.internal/app/service/segment/segment_service_interface.go—RecalcTriggerFilter,TriggerRecalculation,ErrRecalcAlreadyRunning.internal/app/service/segment/segment_service.go— refactorRecalculateAllSegments→ extractdoRecalculation(ctx, recalcRunID string, filter *RecalcTriggerFilter); implementTriggerRecalculation.internal/app/payload/segment_payload.go—TriggerRecalculationRequest,TriggerRecalculationResponse.internal/app/handler/segment_handler.go—TriggerRecalculationhandler.internal/server/rest_router.go— register under the/api/v1/BasicAuth block.
-
Depends on: T4.4 (
RecalculateAllSegments), T7.1 (circuit breaker check lives indoRecalculation), T7.2 (options pattern; noteTriggerRecalculationdeliberately does NOT callisRecalcEnabled— the S2S manual trigger is the one path exempt from the flag). -
Expected Outcome: Unit tests cover all happy paths and error cases;
go build ./...passes; stagingcurlverified. -
Implementation Plan:
Step 1 — Extend
SegmentFilterIn
internal/app/repository/segment/segment_repository.go, add two optional fields toSegmentFilter:CompanySsoIDs []string // when non-empty, constrains to segments belonging to these companiesSegmentIDs []string // when non-empty, constrains to only these segment IDsIn
segment_repository_mongo.go, in theFindAllBSON filter construction, append when non-empty:if len(f.CompanySsoIDs) > 0 {filter["company_sso_id"] = bson.M{"$in": f.CompanySsoIDs}}if len(f.SegmentIDs) > 0 {ids := make([]primitive.ObjectID, 0, len(f.SegmentIDs))for _, id := range f.SegmentIDs {oid, err := primitive.ObjectIDFromHex(id)if err == nil {ids = append(ids, oid)}}filter["_id"] = bson.M{"$in": ids}}Step 2 — Define domain types and sentinels
In internal/app/service/segment/segment_service_interface.go:

    // RecalcTriggerFilter scopes a manual recalculation to specific companies/segments.
    // SegmentIDs is optional; when nil/empty all active segments for CompanySsoIDs are processed.
    type RecalcTriggerFilter struct {
        CompanySsoIDs []string
        SegmentIDs    []string
    }

    var (
        ErrRecalcAlreadyRunning = errors.New("recalculation already in progress")
    )

Add to ISegmentService:

    // TriggerRecalculation starts an async recalculation for the given filter and returns its recalcRunID.
    // Bypasses the cdp_segment_recalc_enabled flag unconditionally: the S2S manual trigger is
    // the one recalc path exempt from it (see T7.2); the cron and the scoped on-write recalc
    // (T8.1) both check it.
    // Returns ErrRecalcAlreadyRunning when the distributed lock is already held.
    TriggerRecalculation(ctx context.Context, filter RecalcTriggerFilter) (string, error)

Run make mocks to regenerate MockISegmentService.

Step 3 — Refactor RecalculateAllSegments and implement doRecalculation
In internal/app/service/segment/segment_service.go:

a. Extract the main body of RecalculateAllSegments (after lock acquisition) into a private method:

    // doRecalculation executes the full recalculation loop. filter == nil means all companies/segments.
    func (s *segmentService) doRecalculation(ctx context.Context, recalcRunID string, filter *RecalcTriggerFilter) error {
        // existing RecalculateAllSegments body (batch loop + worker pool + processSegment)
        // with the circuit breaker check (T7.1) at the top of each outer batch iteration:
        //   if val, _ := s.cacheRepo.Get(ctx, consts.SegmentRecalcCircuitBreakerKey); val != "" { ... }
        // filter applied to FindActiveSegments call via SegmentFilter.CompanySsoIDs / .SegmentIDs
    }

b. RecalculateAllSegments becomes a thin wrapper:

    func (s *segmentService) RecalculateAllSegments(ctx context.Context) error {
        if !s.isRecalcEnabled(ctx) { // T7.2
            slog.InfoContext(ctx, "segment recalc disabled by feature flag")
            return nil
        }
        recalcRunID := ulid.Make().String()
        // acquire lock (existing logic) ...
        defer s.cacheRepo.Del(ctx, segmentRecalcLockKey) //nolint:errcheck
        return s.doRecalculation(ctx, recalcRunID, nil)
    }

c. Implement TriggerRecalculation. Note that it deliberately does NOT call isRecalcEnabled; the S2S manual trigger is exempt from the flag (T7.2). A cache error while acquiring the lock is returned wrapped, so the handler's default branch maps it to an internal error; only a lock that is already held yields ErrRecalcAlreadyRunning:

    func (s *segmentService) TriggerRecalculation(ctx context.Context, filter RecalcTriggerFilter) (string, error) {
        recalcRunID := ulid.Make().String()
        acquired, err := s.cacheRepo.SetNX(ctx, segmentRecalcLockKey, recalcRunID, s.cfg.SegmentRecalcLockTTLSeconds)
        if err != nil {
            // A cache failure is not "already running"; surface it instead of reporting a conflict.
            return "", fmt.Errorf("acquire recalc lock: %w", err)
        }
        if !acquired {
            return "", ErrRecalcAlreadyRunning
        }
        go func() {
            // Detach from the request context so the run outlives the HTTP response.
            bgCtx := context.WithoutCancel(ctx)
            defer s.cacheRepo.Del(bgCtx, segmentRecalcLockKey) //nolint:errcheck
            if err := s.doRecalculation(bgCtx, recalcRunID, &filter); err != nil {
                slog.ErrorContext(bgCtx, "trigger recalculation failed", slog.Any("error", err))
            }
        }()
        return recalcRunID, nil
    }

Step 4 — Payload DTOs
In internal/app/payload/segment_payload.go:

    type TriggerRecalculationRequest struct {
        CompanySsoIDs []string `json:"company_sso_ids" validate:"required,min=1"`
        SegmentIDs    []string `json:"segment_ids"`
    }

    type TriggerRecalculationResponse struct {
        RecalcRunID string `json:"recalc_run_id"`
    }

Step 5 — Handler
Add to internal/app/handler/segment_handler.go:

    func (h *SegmentHandler) TriggerRecalculation(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) {
        var req payload.TriggerRecalculationRequest
        if err := myhttp.BindJSON(r, &req); err != nil {
            return myhttp.ResponseBody{}, myhttp.ErrBadRequest("invalid request body")
        }
        if len(req.CompanySsoIDs) == 0 {
            return myhttp.ResponseBody{}, myhttp.ErrBadRequest("company_sso_ids is required")
        }
        filter := segmentsvc.RecalcTriggerFilter{
            CompanySsoIDs: req.CompanySsoIDs,
            SegmentIDs:    req.SegmentIDs,
        }
        runID, err := h.segmentService.TriggerRecalculation(r.Context(), filter)
        if err != nil {
            switch {
            case errors.Is(err, segmentsvc.ErrRecalcAlreadyRunning):
                return myhttp.ResponseBody{}, myhttp.ErrConflict("recalculation already in progress")
            default:
                return myhttp.ResponseBody{}, myhttp.ErrInternal()
            }
        }
        resp := payload.TriggerRecalculationResponse{RecalcRunID: runID}
        return myhttp.NewJSONResponseWithStatus(http.StatusAccepted, resp, nil), nil
    }

Step 6 — Route registration
In internal/server/rest_router.go, inside the /api/v1/ BasicAuth subrouter block, add:

    r1.With(mymiddleware.BasicAuth).Post("/segments/recalculate", segmentHandler.TriggerRecalculation)

(Add before the per-segment /segments/{segmentID}/customers route to avoid route conflicts.)

Step 7 — Unit tests
Repository (segment_repository_test.go):
- FindAll with CompanySsoIDs = ["co1", "co2"] → BSON filter includes $in clause; segments from other companies not returned.
- FindAll with SegmentIDs = ["id1"] (where id1 stands for a valid 24-character hex ObjectID) → filter includes _id: {$in: [ObjectID("id1")]}.
- Both filters combined → intersection applied correctly.
Service (segment_service_test.go):
- TriggerRecalculation: feature flag disabled → still acquires the lock and proceeds (S2S manual trigger is exempt; assert isRecalcEnabled / the feature flag mock is never invoked by this method).
- Lock held → ErrRecalcAlreadyRunning, no goroutine started.
- Valid filter with SegmentIDs → doRecalculation called with correct filter; recalcRunID returned matches ULID format.
- Valid filter without SegmentIDs → doRecalculation called with empty SegmentIDs.
- RecalculateAllSegments calls doRecalculation with nil filter (all segments).
Handler (segment_handler_test.go):
- Missing body → 400.
- Empty company_sso_ids array → 400.
- ErrRecalcAlreadyRunning → 409 with message.
- Valid request → 202 with data[0].recalc_run_id non-empty.
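The lock-then-detach behaviour that the service tests above exercise can be sketched in isolation. This is a minimal illustration using only the standard library (Go 1.21+ for context.WithoutCancel), not the service code: memLock, demoLockKey, trigger, demoResult and runDemo are hypothetical names, and the in-memory map only stands in for cacheRepo.SetNX / Del.

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// ErrRecalcAlreadyRunning mirrors the sentinel from Step 2.
var ErrRecalcAlreadyRunning = errors.New("recalculation already in progress")

// demoLockKey is a made-up key; the real segmentRecalcLockKey value is not shown in this document.
const demoLockKey = "demo_recalc_lock"

// memLock is a hypothetical in-memory stand-in for the cache-backed lock.
type memLock struct {
	mu   sync.Mutex
	held map[string]string
}

func newMemLock() *memLock { return &memLock{held: map[string]string{}} }

// SetNX stores value under key only if the key is free and reports whether it did.
func (l *memLock) SetNX(key, value string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.held[key]; ok {
		return false
	}
	l.held[key] = value
	return true
}

// Del releases the key.
func (l *memLock) Del(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.held, key)
}

// trigger takes the lock, runs work in a detached goroutine and returns at once.
// The returned channel is closed after the lock has been released.
func trigger(ctx context.Context, l *memLock, runID string, work func(context.Context)) (<-chan struct{}, error) {
	if !l.SetNX(demoLockKey, runID) {
		return nil, ErrRecalcAlreadyRunning
	}
	done := make(chan struct{})
	bgCtx := context.WithoutCancel(ctx) // keeps values, drops cancellation
	go func() {
		defer close(done)         // runs second: signal completion
		defer l.Del(demoLockKey) // runs first: release the lock
		work(bgCtx)
	}()
	return done, nil
}

// demoResult collects what the walkthrough in runDemo observed.
type demoResult struct {
	SecondErr     error // error from a trigger issued while run-1 holds the lock
	CtxErrInWork  error // ctx.Err() seen inside the work after the request ctx was cancelled
	ThirdAccepted bool  // whether a trigger succeeds once run-1 has finished
}

// runDemo walks through: start run-1, cancel the request, try run-2, finish run-1, start run-3.
func runDemo() demoResult {
	l := newMemLock()
	reqCtx, cancel := context.WithCancel(context.Background())
	release := make(chan struct{})
	var res demoResult

	done, _ := trigger(reqCtx, l, "run-1", func(ctx context.Context) {
		<-release
		res.CtxErrInWork = ctx.Err()
	})
	cancel() // the HTTP request finishes while run-1 is still working
	_, res.SecondErr = trigger(context.Background(), l, "run-2", func(context.Context) {})
	close(release)
	<-done

	done3, err := trigger(context.Background(), l, "run-3", func(context.Context) {})
	res.ThirdAccepted = err == nil
	if err == nil {
		<-done3
	}
	return res
}
```

Calling runDemo() shows the three properties the tests look for: a second trigger is rejected while the lock is held, the work is unaffected by cancellation of the request context, and the lock is free again once the run completes. A completion channel like the one returned by trigger is one possible synchronization point for tests that must wait on the goroutine.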
Phase 8: Immediate Recalculation on Write & Readiness Signal
Phase 8 is additive-only — it extends Phase 1–7 source files without rewriting them, following the same TDD discipline. It makes a newly created or rule-edited segment usable within seconds (instead of waiting for the daily cron) and gives API callers an explicit signal of whether a segment's member list has been computed yet.
T8.1 — Scoped recalc on create / rule edit + readiness indicator in GET responses [Jira: —]
- Purpose: Two coupled deliverables. (1) Fire a best-effort, asynchronous, single-segment recalculation immediately after CreateSegment and after a rule-changing UpdateSegment, so members are populated within seconds rather than at the next daily cron. (2) Expose a derived is_ready / processing_status pair on the segment detail and list responses so callers can distinguish "segment matches 0 customers" from "segment not computed yet". Both pieces ship together because the readiness signal is what makes the best-effort recalc safe to rely on.
- Scope:
  - internal/pkg/consts/segment.go: per-segment lock key prefix constant.
  - params/config: new env var SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS (default 300).
  - internal/app/service/segment/segment_service.go: recalculateSegment private method (scoped, per-segment lock, reuses processSegment); processSegmentLocked wrapper acquiring the per-segment lock; wire fire-and-forget calls into CreateSegment and rule-changing UpdateSegment.
  - internal/app/service/segment/recalculate.go (or wherever doRecalculation's worker pool lives, per T7.3): cron worker calls processSegmentLocked so the daily run and an on-write recalc never write the same segment concurrently.
  - internal/app/payload/segment_payload.go: IsReady + ProcessingStatus on SegmentResponse (list item) and SegmentDetailResponse; deriveReadiness helper; update mappers.
  - internal/app/handler/segment_handler.go: no new handler; GetByID / Update / List pick up the new fields via the mappers.
  - docs/swagger.yaml: add is_ready / processing_status to the affected schemas.
- Depends on: T7.2 (isRecalcEnabled feature-flag helper; recalculateSegment gates on it, same as the cron), T7.3 (processSegment / doRecalculation extraction, distributed-lock usage), T1.4 (CreateSegment / UpdateSegment), T5.1 (SegmentDetailResponse + mapper), T6.3 (Version in payloads; same mappers touched).
- Expected Outcome: Creating a segment, or editing its rule set, triggers a single-segment recalc that populates customer_segment_members within seconds without blocking the HTTP response; the recalc never fails the create/update request; the daily cron remains the backstop. GET detail and list responses include is_ready (bool) and processing_status (pending / ready / stale / failed), derived purely from existing fields (no migration). All listed unit tests pass; affected packages stay ≥ 80% coverage.
- Implementation Plan:
Step 1 — Per-segment lock key + config
In internal/pkg/consts/segment.go, add:

    // SegmentRecalcScopedLockKeyPrefix + segmentID forms the per-segment recalc lock key,
    // e.g. "segment_recalculation_lock:683ab...". Distinct from the global cron lock so the
    // on-write recalc is never starved by an in-progress daily run.
    const SegmentRecalcScopedLockKeyPrefix = "segment_recalculation_lock:"

In the params/config struct, add:

    SegmentRecalcScopedLockTTLSeconds int `env:"SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS" envDefault:"300"`

Thread it into the segmentService config the same way the other SegmentRecalc* values are wired in T3.1 / T4.4.

Step 2 — Single-writer wrapper + scoped recalc method
In internal/app/service/segment/segment_service.go (or recalculate.go):

a. Add a wrapper that guarantees a single writer per segment, used by BOTH the cron worker pool and the on-write path:

    // processSegmentLocked acquires the per-segment lock, runs processSegment, then releases it.
    // Returns without processing if the lock is already held (another run owns this segment).
    func (s *segmentService) processSegmentLocked(ctx context.Context, recalcRunID string, seg reposegment.CustomerSegment) {
        key := consts.SegmentRecalcScopedLockKeyPrefix + seg.ID.Hex()
        acquired, err := s.cacheRepo.SetNX(ctx, key, recalcRunID, s.cfg.SegmentRecalcScopedLockTTLSeconds)
        if err != nil || !acquired {
            slog.InfoContext(ctx, "segment recalc skipped: per-segment lock held",
                slog.String("segment_id", seg.ID.Hex()))
            return
        }
        defer s.cacheRepo.Del(ctx, key) //nolint:errcheck
        s.processSegment(ctx, recalcRunID, seg)
    }

Update the doRecalculation worker pool (T7.3) to call processSegmentLocked instead of processSegment directly, so the daily cron and any on-write recalc serialize per segment (prevents two runs with different recalc_run_ids double-writing membership-transition events).

b. Add the scoped, best-effort entry point invoked on write:

    // recalculateSegment runs a best-effort single-segment recalc. Intended to be called as a
    // detached goroutine after create / rule edit. Never returns an error to the caller: all
    // failures are logged and the daily cron is the backstop. Checks cdp_segment_recalc_enabled
    // (T7.2), same as the cron; the S2S manual trigger (T7.3) is the only path exempt from it.
    func (s *segmentService) recalculateSegment(ctx context.Context, companySsoID, segmentID string) {
        if s.memberRepo == nil || s.ruleEngine == nil || s.datamartRepo == nil ||
            s.cacheRepo == nil || s.txRunner == nil {
            return // recalc deps not wired (e.g. datamart disabled): no-op
        }
        if !s.isRecalcEnabled(ctx) { // T7.2: feature flag off
            return
        }
        id, err := primitive.ObjectIDFromHex(segmentID)
        if err != nil {
            return
        }
        seg, err := s.repo.FindByID(ctx, id, companySsoID)
        if err != nil {
            slog.WarnContext(ctx, "scoped recalc: segment lookup failed",
                slog.String("segment_id", segmentID), slog.Any("error", err))
            return
        }
        s.processSegmentLocked(ctx, ulid.Make().String(), seg)
    }

Step 3 — Wire into CreateSegment and UpdateSegment (fire-and-forget)
In CreateSegment, after a successful repo.Insert returns the new id:

    go s.recalculateSegment(context.WithoutCancel(ctx), companySsoID, id.Hex())
    return id, nil

In UpdateSegment, trigger only when the rule set actually changed; reuse the exact condition that already sets RuleDirtyAt:

    if ruleSetChanged(existing.RuleSet, ruleSet) {
        updated.RuleDirtyAt = &now
    }
    // ... after repo.Update succeeds:
    if ruleSetChanged(existing.RuleSet, ruleSet) {
        go s.recalculateSegment(context.WithoutCancel(ctx), companySsoID, segmentID)
    }

context.WithoutCancel detaches the goroutine from the request context so the recalc survives after the HTTP response returns; the response itself is never affected by the recalc outcome. A name/description-only edit does not set RuleDirtyAt and does not trigger a recalc.

Step 4 — Readiness derivation in payloads
In internal/app/payload/segment_payload.go, add a pure helper:

    // deriveReadiness computes the membership-readiness signal from persisted fields only
    // (no new storage). See RFC "Readiness is derived, not stored".
    func deriveReadiness(seg reposegment.CustomerSegment) (isReady bool, status string) {
        switch {
        case seg.LastEvaluatedAt == nil && seg.RecalcErrorCount >= 3:
            return false, "failed"
        case seg.LastEvaluatedAt == nil:
            return false, "pending"
        case seg.RuleDirtyAt != nil:
            return true, "stale"
        default:
            return true, "ready"
        }
    }

Add IsReady bool `json:"is_ready"` and ProcessingStatus string `json:"processing_status"` to both SegmentResponse (list item) and SegmentDetailResponse. In ToSegmentResponse and ToSegmentDetailResponse, set both from deriveReadiness(seg). The handler's toSegmentDetailResponse(SegmentDetails) (T5.1) and the list mapper pick the fields up automatically.

Step 5 — OpenAPI
In docs/swagger.yaml, add to the SegmentResponse and SegmentDetailResponse schema components:

    is_ready:
      type: boolean
      description: True once the segment has a computed member snapshot (last_evaluated_at != null).
      example: true
    processing_status:
      type: string
      enum: [pending, ready, stale, failed]
      description: Derived membership-readiness state. pending = first recalc not finished; stale = rule edited since last recalc; failed = 3+ consecutive recalc failures with no prior success.
      example: ready

Step 6 — Tests
Payload (segment_payload_test.go): table-driven deriveReadiness covering all four states at the boundaries:
- last_evaluated_at == nil & errors < 3 → pending / false
- last_evaluated_at == nil & errors ≥ 3 → failed / false
- evaluated & rule_dirty_at == nil → ready / true
- evaluated & rule_dirty_at != nil → stale / true
Assert the mappers emit is_ready / processing_status.
Service (segment_service_test.go):
- CreateSegment happy path → returns the new id AND cacheRepo.SetNX is called with key prefix segment_recalculation_lock: (verify the scoped recalc fired). Use a synchronization point (e.g. a channel-backed mock) since the recalc runs in a goroutine.
- UpdateSegment with a changed rule set → scoped recalc fired; with name/description-only change → SetNX NOT called.
- recalculateSegment: deps unwired → no-op; feature flag off → no-op (SetNX not called); per-segment lock already held (SetNX returns false) → processSegment not invoked; happy path → processSegment invoked and lock released (Del called).
- processSegmentLocked: lock acquired → processSegment runs, Del called on return; lock held → processSegment skipped.
- Confirm the cron path (doRecalculation) now routes through processSegmentLocked (existing T4.4/T7.x tests updated to expect the per-segment SetNX / Del calls).
Handler (segment_handler_test.go):
- GetByID / List responses include is_ready and processing_status; assert a pending segment (mock returns LastEvaluatedAt == nil) serializes "is_ready": false, "processing_status": "pending".
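The table-driven deriveReadiness check described in the Payload tests can be sketched as follows. This is an illustrative, self-contained version under assumptions: segment is a hypothetical trimmed-down stand-in for reposegment.CustomerSegment carrying only the three fields the helper reads, and readinessCase, readinessCases and checkReadiness are made-up names. A real test would use testing.T and the actual payload package.

```go
package main

import (
	"fmt"
	"time"
)

// segment is a hypothetical stand-in holding only the fields deriveReadiness reads.
type segment struct {
	LastEvaluatedAt  *time.Time
	RuleDirtyAt      *time.Time
	RecalcErrorCount int
}

// deriveReadiness repeats the switch from Step 4 against the trimmed-down struct.
func deriveReadiness(seg segment) (bool, string) {
	switch {
	case seg.LastEvaluatedAt == nil && seg.RecalcErrorCount >= 3:
		return false, "failed"
	case seg.LastEvaluatedAt == nil:
		return false, "pending"
	case seg.RuleDirtyAt != nil:
		return true, "stale"
	default:
		return true, "ready"
	}
}

// readinessCase is one row of the table.
type readinessCase struct {
	name       string
	seg        segment
	wantReady  bool
	wantStatus string
}

// readinessCases lists the four states plus the error-count boundary (2 vs 3)
// and one row showing that errors after a prior success do not produce "failed".
func readinessCases() []readinessCase {
	now := time.Now()
	return []readinessCase{
		{"never evaluated, 2 errors", segment{RecalcErrorCount: 2}, false, "pending"},
		{"never evaluated, 3 errors", segment{RecalcErrorCount: 3}, false, "failed"},
		{"evaluated, rules clean", segment{LastEvaluatedAt: &now}, true, "ready"},
		{"evaluated, rules dirty", segment{LastEvaluatedAt: &now, RuleDirtyAt: &now}, true, "stale"},
		{"evaluated, 3 errors", segment{LastEvaluatedAt: &now, RecalcErrorCount: 3}, true, "ready"},
	}
}

// checkReadiness runs every row and returns the first mismatch, or nil if all rows pass.
func checkReadiness() error {
	for _, c := range readinessCases() {
		ready, status := deriveReadiness(c.seg)
		if ready != c.wantReady || status != c.wantStatus {
			return fmt.Errorf("%s: got (%v, %q), want (%v, %q)",
				c.name, ready, status, c.wantReady, c.wantStatus)
		}
	}
	return nil
}
```

The last row is worth keeping in a real table: because the failed case requires LastEvaluatedAt == nil, a segment that has succeeded once stays ready (or stale) regardless of its error count, which matches the "no prior success" wording in the swagger description.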
T8.2 — Fix: gate recalculateNewSegment behind cdp_segment_recalc_enabled [Jira: —]
- Purpose: The shipped contact-service code implements T8.1's scoped on-write recalc as recalculateNewSegment (internal/app/service/segment/recalculate.go), invoked from CreateSegment and rule-changing UpdateSegment (segment_service.go). It never checks cdp_segment_recalc_enabled: isRecalcEnabled (T7.2) has exactly one caller today, RecalculateAllSegments. Per the confirmed design (RFC §2.11 / §3, task breakdown T7.2/T8.1), the scoped recalc is one of the two regular, automatic recalculation paths and must respect the flag exactly like the cron; only the S2S manual trigger TriggerRecalculation is exempt. Today, disabling the flag during a datamart maintenance window silently fails to pause recalculation for every segment created or rule-edited during that window. This task closes that gap so the flag's operational guarantee actually holds.
- Implementation Plan:
  - In internal/app/service/segment/recalculate.go, in recalculateNewSegment, add the flag check immediately after the existing nil-dependency guard and before the per-segment lock acquisition (mirrors RecalculateAllSegments in the same file):

        func (s *segmentService) recalculateNewSegment(ctx context.Context, companySsoID, segmentID string) {
            if s.memberRepo == nil || s.ruleEngine == nil || s.datamartRepo == nil ||
                s.distributedLock == nil || s.txRunner == nil {
                return
            }
            if !s.isRecalcEnabled(ctx) {
                slog.InfoContext(ctx, "segment_service recalculate_new_segment: recalc disabled by feature flag",
                    slog.String("segment_id", segmentID))
                return
            }
            lockKey := segmentRecalcPerSegmentLockPrefix + segmentID
            // ... unchanged from here

  - No other call site, wiring, or config changes are needed: isRecalcEnabled and featureFlagService already exist on segmentService (T7.2), and cmd/initializer.go already injects the same featureFlagService instance used by RecalculateAllSegments. recalculateNewSegment picks it up for free by calling the existing helper.
  - CreateSegment / UpdateSegment (segment_service.go) need no changes. They already fire-and-forget via go s.recalculateNewSegment(...), and this fix is entirely internal to that goroutine; the create/update response is unaffected either way.
  - Unit tests (internal/app/service/segment/recalculate_test.go, alongside the existing TF-3379 block):
    - New TestRecalculateNewSegment_RecalcDisabledByFeatureFlag: build a mockFeatureFlagService (already defined in this test file) returning false for consts.FeatureFlagCDPSegmentRecalc, inject via WithFeatureFlagService, call recalculateNewSegment directly, then assert the distributed lock's TryAcquire and the segment repo's FindByID are never called (AssertNotCalled), and ffSvc.AssertExpectations(t). Model this on TestRecalculateAllSegments_RecalcDisabledByFeatureFlag (same file, TF-3337 block).
    - Update TestRecalculateNewSegment_LockAcquired_ProcessesSegment (and any other happy-path test in the TF-3379 block that doesn't already wire a WithFeatureFlagService) to either inject a flag service returning true or leave it nil. A nil service already defaults isRecalcEnabled to true, so most existing happy-path tests need no change; only confirm none of them break now that the new check runs first.
  - Run go test ./internal/app/service/segment/... and confirm coverage stays ≥ 80% for the package (existing project convention per other T7.x/T8.x tasks).
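The guard ordering in T8.2, including the nil-defaults-to-enabled behaviour of isRecalcEnabled, can be sketched with the standard library alone. Everything below is a simplified assumption for illustration: featureFlagService, staticFlag, countingLock, flagRecalc and lockCallsFor are hypothetical, and the real interfaces in contact-service (for example the signature of TryAcquire) may differ.

```go
package main

import "context"

// featureFlagService is a hypothetical interface; the real one may have a different shape.
type featureFlagService interface {
	IsEnabled(ctx context.Context, flag string) bool
}

// staticFlag is a fake flag service that always answers with its own value.
type staticFlag bool

func (f staticFlag) IsEnabled(context.Context, string) bool { return bool(f) }

// countingLock is a fake distributed lock that records how often it was requested.
type countingLock struct{ calls int }

func (l *countingLock) TryAcquire(context.Context, string) bool {
	l.calls++
	return true
}

// flagRecalc is the flag name used throughout this task breakdown.
const flagRecalc = "cdp_segment_recalc_enabled"

// segmentService keeps only the two dependencies relevant to the guard order.
type segmentService struct {
	flags featureFlagService // nil means no flag service was injected
	lock  *countingLock
}

// isRecalcEnabled defaults to true when no flag service is wired, as T8.2 describes.
func (s *segmentService) isRecalcEnabled(ctx context.Context) bool {
	if s.flags == nil {
		return true
	}
	return s.flags.IsEnabled(ctx, flagRecalc)
}

// recalculateNewSegment applies the guards in the order T8.2 prescribes:
// nil-dependency check, then feature flag, then lock acquisition.
// It reports whether the lock was requested and granted.
func (s *segmentService) recalculateNewSegment(ctx context.Context, segmentID string) bool {
	if s.lock == nil {
		return false // dependencies not wired: no-op
	}
	if !s.isRecalcEnabled(ctx) {
		return false // flag off: return before touching the lock
	}
	return s.lock.TryAcquire(ctx, "lock:"+segmentID)
}

// lockCallsFor runs one scoped recalc with the given flag service and
// returns how many times the lock was requested.
func lockCallsFor(flags featureFlagService) int {
	lock := &countingLock{}
	s := &segmentService{flags: flags, lock: lock}
	s.recalculateNewSegment(context.Background(), "seg-1")
	return lock.calls
}
```

With this sketch, lockCallsFor(staticFlag(false)) leaves the lock untouched, while both lockCallsFor(staticFlag(true)) and lockCallsFor(nil) request it once. That is the same pair of expectations the new disabled-flag test and the unchanged happy-path tests encode.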