Skip to main content

rfc-customer-segmentation-basic-attributes-be.task-breakdown

7. Task Breakdown

The implementation is broken into 8 phases with individually completable tasks. Each task follows TDD (Red → Green → Refactor) per AGENTS.md. Mocks (make mocks) and unit tests are implicit deliverables of every task — they are not listed as separate items.


Phase 1: Segment CRUD (Foundation)


T1.1 — Define domain structs [Jira: —]

  • Purpose: Establish the core Go types shared across all layers before any logic is written.
  • Scope: internal/app/repository/segment/CustomerSegment, RuleSet, RuleGroup, RuleCondition structs with bson/json tags; enum constants for status and operator.
  • Expected Outcome: Types compile; JSON and BSON round-trip tests pass; go vet clean.
  • Implementation Plan:
    1. Create internal/app/repository/segment/types.go. Define the following structs:
      type CustomerSegment struct {
      ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
      CompanySsoID string `bson:"company_sso_id" json:"company_sso_id"`
      Name string `bson:"name" json:"name"`
      Description *string `bson:"description,omitempty" json:"description,omitempty"`
      Status string `bson:"status" json:"status"`
      RuleDirtyAt *time.Time `bson:"rule_dirty_at,omitempty" json:"rule_dirty_at,omitempty"`
      RuleSet RuleSet `bson:"rule_set" json:"rule_set"`
      TotalMatched int64 `bson:"total_matched" json:"total_matched"`
      LastEvaluatedAt *time.Time `bson:"last_evaluated_at,omitempty" json:"last_evaluated_at,omitempty"`
      CreatedBy string `bson:"created_by" json:"created_by"`
      UpdatedBy string `bson:"updated_by" json:"updated_by"`
      CreatedAt time.Time `bson:"created_at" json:"created_at"`
      UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
      IsDeleted bool `bson:"is_deleted" json:"is_deleted"`
      RecalcErrorCount int `bson:"recalc_error_count" json:"recalc_error_count"`
      }

      type RuleSet struct {
      Operator string `bson:"operator" json:"operator"` // "AND" | "OR"
      Groups []RuleGroup `bson:"groups" json:"groups"`
      }

      type RuleGroup struct {
      Operator string `bson:"operator" json:"operator"` // "AND" | "OR"
      Conditions []RuleCondition `bson:"conditions" json:"conditions"`
      }

      type RuleCondition struct {
      Field string `bson:"field" json:"field"`
      FieldCategory string `bson:"field_category" json:"field_category"` // "system" | "default" | "custom"
      FieldType string `bson:"field_type" json:"field_type"`
      Operator string `bson:"operator" json:"operator"`
      Value string `bson:"value" json:"value"`
      Value2 *string `bson:"value2,omitempty" json:"value2,omitempty"`
      }
    2. Add constants in the same file:
      const (
      StatusActive = "active"
      StatusArchived = "archived"

      RuleOperatorAND = "AND"
      RuleOperatorOR = "OR"

      FieldCategorySystem = "system" // top-level datamart columns: created_at_date, created_by_user_id
      FieldCategoryDefault = "default" // default_fields JSONB: default + Qontak-managed fields
      FieldCategoryCustom = "custom" // custom_fields JSONB: user-defined fields

      // Full set of condition operators
      ConditionOperatorIs = "is"
      ConditionOperatorIsNot = "is_not"
      ConditionOperatorContains = "contains"
      ConditionOperatorDoesNotContain = "does_not_contain"
      ConditionOperatorStartsWith = "starts_with"
      ConditionOperatorEndsWith = "ends_with"
      ConditionOperatorIsEmpty = "is_empty"
      ConditionOperatorIsNotEmpty = "is_not_empty"
      ConditionOperatorEquals = "equals"
      ConditionOperatorNotEquals = "not_equals"
      ConditionOperatorGreaterThan = "greater_than"
      ConditionOperatorLessThan = "less_than"
      ConditionOperatorBetween = "between"
      ConditionOperatorOn = "on"
      ConditionOperatorNotOn = "not_on"
      ConditionOperatorBefore = "before"
      ConditionOperatorAfter = "after"
      ConditionOperatorContainsAny = "contains_any"
      ConditionOperatorContainsAll = "contains_all"
      )
    3. Create internal/app/repository/segment/types_test.go. Write table-driven tests that marshal/unmarshal a fully-populated CustomerSegment to JSON and BSON, asserting: field names are snake_case, nullable fields omit from output when nil, enum string values match the constants exactly.

T1.2 — MongoDB migration: customer_segments collection [Jira: —]

  • Purpose: Provision the MongoDB collection and indexes required by the CRUD layer before any repository code is deployed.
  • Scope: db/migrations/ — up/down JSON migration files creating the customer_segments collection and four compound indexes.
  • Expected Outcome: Migration runs successfully against a local MongoDB instance; indexes verified via db.customer_segments.getIndexes().
  • Implementation Plan:
    1. Determine the next sequence number from db/migrations/ and create {seq}_create_customer_segments.up.json:
      [
      { "create": "customer_segments" },
      {
      "createIndexes": "customer_segments",
      "indexes": [
      { "key": { "company_sso_id": 1, "status": 1, "is_deleted": 1 }, "name": "idx_cs_company_status" },
      { "key": { "company_sso_id": 1, "name": 1 }, "name": "idx_cs_company_name" },
      { "key": { "company_sso_id": 1, "created_at": -1 }, "name": "idx_cs_company_created" },
      { "key": { "company_sso_id": 1, "created_by": 1 }, "name": "idx_cs_company_owner" }
      ]
      }
      ]
    2. Create {seq}_create_customer_segments.down.json: [{"drop": "customer_segments"}].
    3. Run the migration against a local MongoDB instance and verify all four indexes appear in db.customer_segments.getIndexes().

T1.3 — ISegmentRepository interface + MongoDB implementation [Jira: —]

  • Purpose: Provide the persistence layer for segment documents so the service layer has no direct MongoDB dependency.
  • Scope: internal/app/repository/segment/ — interface definition and MongoDB implementation; mockery-generated mock included.
  • Expected Outcome: All repository methods covered by table-driven unit tests; go test ./internal/app/repository/segment/... passes with ≥ 80% coverage.
  • Implementation Plan:
    1. Create internal/app/repository/segment/segment_repository.go. Define the filter and update structs plus the interface:
      type SegmentFilter struct {
      CompanySsoID string
      Status string // "active" | "archived" | "" (all)
      Search string // partial name match, min 3 chars when non-empty
      CreatedFrom *time.Time
      CreatedTo *time.Time
      CreatedBy string // non-empty when OWNED_ONLY permission
      Page int
      PageSize int
      }

      type CustomerSegmentUpdate struct {
      Name *string
      Description *string
      RuleSet *RuleSet
      UpdatedBy string
      RuleDirtyAt *time.Time // set by service when rule_set changes
      }

      type ISegmentRepository interface {
      FindByID(ctx context.Context, id string) (*CustomerSegment, error)
      FindAll(ctx context.Context, filter SegmentFilter) ([]CustomerSegment, int64, error)
      Insert(ctx context.Context, seg *CustomerSegment) (string, error)
      Update(ctx context.Context, id string, update *CustomerSegmentUpdate) error
      UpdateStatus(ctx context.Context, id, status string) error
      UpdateStats(ctx context.Context, id string, totalMatched int64, lastEvaluatedAt time.Time) error
      ClearRuleDirtyAt(ctx context.Context, id string) error
      IncrementRecalcErrorCount(ctx context.Context, id string) error
      }
    2. Create internal/app/repository/segment/segment_repository_mongo.go. Implement segmentRepository struct wrapping repository.IDbRepo. Constructor: NewSegmentRepository(db repository.IDbRepo) ISegmentRepository. Each method builds a BSON filter and delegates to IDbRepo primitives (FindBy, Create, UpdateOne, etc.).
    3. Run make mocks to generate mocks/MockISegmentRepository.go.
    4. Create internal/app/repository/segment/segment_repository_test.go. Table-driven tests for each method, covering: found, not-found, and DB error cases for reads; success and error for writes.

T1.4 — ISegmentService interface + SegmentService implementation [Jira: —]

  • Purpose: Implement all segment business logic (CreateSegment, GetSegment, ListSegments, UpdateSegment, ArchiveSegment, DuplicateSegment) including rule structure validation and permission evaluation.
  • Scope: internal/app/service/segment/ — interface, service struct, validation helpers; depends on ISegmentRepository (injected).
  • Expected Outcome: Table-driven unit tests cover all happy paths and error cases; go test ./internal/app/service/segment/... passes with ≥ 80% coverage.
  • Implementation Plan:
    1. Create internal/app/service/segment/segment_service_interface.go. Define ISegmentService with: CreateSegment, GetSegment, ListSegments, UpdateSegment, ArchiveSegment, DuplicateSegment (plus PreviewSegment added in T3.3, RecalculateAllSegments in T4.4, GetSegmentDetails in T5.1, GetSegmentsByContact in T5.3).
    2. Create internal/app/service/segment/segment_service.go. Define sentinel errors:
      var (
      ErrSegmentNotFound = errors.New("segment not found")
      ErrSegmentNameConflict = errors.New("segment name already exists")
      ErrConflictStatus = errors.New("operation not valid for current segment status")
      ErrInvalidRule = errors.New("invalid rule")
      ErrPermissionDenied = errors.New("permission denied")
      )
      Define segmentService struct injecting ISegmentRepository. Constructor: NewSegmentService(repo segment.ISegmentRepository) ISegmentService.
    3. Implement CreateSegment:
      • Rule structure validation: len(ruleSet.Groups) > 2ErrInvalidRule; any len(group.Conditions) > 3ErrInvalidRule.
      • Name uniqueness: call repo.FindAll with SegmentFilter{CompanySsoID: companySsoID, Search: name} — return ErrSegmentNameConflict on collision.
      • Call repo.Insert.
    4. Implement GetSegment: repo.FindByID; return ErrSegmentNotFound if nil or is_deleted == true.
    5. Implement ListSegments: build SegmentFilter (status, search ≥3 chars, date range, company_sso_id); set CreatedBy = userSsoID when OWNED_ONLY; delegate to repo.FindAll.
    6. Implement UpdateSegment: same rule validation as create; if OWNED_ONLY, verify segment.CreatedBy == userSsoID; if rule_set changed, set RuleDirtyAt = &now; call repo.Update.
    7. Implement ArchiveSegment: check OWNED_ONLY boundary; verify segment.Status == StatusActive else return ErrConflictStatus; call repo.UpdateStatus(StatusArchived).
    8. Implement DuplicateSegment: load source via FindByID; zero out ID, TotalMatched, LastEvaluatedAt, RuleDirtyAt, RecalcErrorCount, timestamps; prefix Name = "Copy of " + source.Name; call repo.Insert.
    9. Create internal/app/service/segment/segment_service_test.go. Table-driven tests covering all happy paths and every sentinel error: ErrSegmentNotFound, ErrSegmentNameConflict, ErrConflictStatus, ErrInvalidRule, permission-denied cases.

T1.5 — SegmentHandler HTTP handlers [Jira: —]

  • Purpose: Expose segment CRUD over HTTP, translating requests into service calls and service errors into standard API responses.
  • Scope: internal/app/handler/segment_handler.goCreate, List, GetByID, Update, Archive, Duplicate handlers; response via myhttp.HTTPHandler.
  • Expected Outcome: Unit tests cover input validation errors, permission-denied cases, and successful responses; go test ./internal/app/handler/... passes with ≥ 80% coverage.
  • Implementation Plan:
    1. Create internal/app/payload/segment_payload.go. Define DTOs:

      • CreateSegmentRequest: Name required max 60, Description optional max 250, RuleSet required.
      • UpdateSegmentRequest: same shape as create.
      • ListSegmentsQuery: Page (default 1), PerPage (default 20, max 100), Search string, CreatedAtFrom/CreatedAtTo RFC3339, Status string.
      • SegmentResponse: id, name, total_matched, created_by, created_at, status.
      • SegmentListResponse: data []SegmentResponse + meta {total, page, per_page}.
      • SegmentDetailResponse: id, name, description, status, rule_set, total_matched, percentage_reach, last_evaluated_at, reachability {whatsapp {count, percentage}, email {count, percentage}} (omit reachability key for archived segments), created_by, created_at, updated_at.
    2. Create internal/app/handler/segment_handler.go. Define SegmentHandler with segmentService service.ISegmentService. Constructor: NewSegmentHandler(svc service.ISegmentService) *SegmentHandler.

    3. Implement each handler with signature func (h SegmentHandler) Method(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error):

      • Create: bind → validate → extract companySsoID from auth context → svc.CreateSegment201 Created {data: {id}}.
      • List: parse ListSegmentsQuery from URL params → svc.ListSegments200 with data array + pagination meta.
      • GetByID: chi.URLParam(r, "segmentID")svc.GetSegment200.
      • Update: bind → validate → svc.UpdateSegment200.
      • Archive: extract :segmentIDsvc.ArchiveSegment200.
      • Duplicate: extract :segmentIDsvc.DuplicateSegment201 {data: {id}}.
    4. Map service sentinel errors to HTTP responses via mapServiceError. All errors use the myhttp.ErrorResponse envelope (resp_code, resp_desc.id, resp_desc.en, meta):

      Service errorHTTPmyhttp factory
      ErrSegmentNotFound404ErrNotFound()
      ErrSegmentNameConflict409ErrConflictCustomDesc("Segment name already exists", "Segment name already exists")
      ErrInvalidRule422ErrUnprocessableEntity()
      ErrConflictStatus409ErrConflictCustomDesc("Operation not valid for current segment status", "Operation not valid for current segment status")
      ErrPermissionDenied403ErrForbidden()
      other500ErrInternal()
    5. Create internal/app/handler/segment_handler_test.go. Test all handlers: bad payload → 400, permission denied → 403, not found → 404, name conflict → 409, invalid rule → 422, success → 200 with correct resp_code/data shape.


T1.6 — Route registration [Jira: —]

  • Purpose: Make segment endpoints reachable via the API gateway by wiring them into the Chi router.
  • Scope: internal/server/rest_router.go — register all routes under /iag/v1/segments and one private route.
  • Expected Outcome: go build ./... passes; routes verifiable via curl against a locally running server.
  • Implementation Plan:
    1. Open internal/server/rest_router.go. In the iag/v1 subrouter block, add:
      r.Route("/segments", func(r chi.Router) {
      r.Use(IAGMiddleware, ContextLoggerMiddleware)

      // POST /iag/v1/segments permission key: customers_segment_add (ALL_ACCESS)
      r.With(RequirePermission("customers_segment_add", AllAccess)).Post("/", segmentHandler.Create)

      // GET /iag/v1/segments permission key: customers_segment_view
      r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Get("/", segmentHandler.List)

      // POST /preview permission key: customers_segment_view — handler attached in T3.4
      // (register placeholder route now so the router is aware; attach real handler in T3.4)

      // GET /iag/v1/segments/:segmentID permission key: customers_segment_view
      r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Get("/{segmentID}", segmentHandler.GetByID)

      // PUT /iag/v1/segments/:segmentID permission key: customers_segment_manage
      r.With(RequirePermission("customers_segment_manage", AllAccess|OwnedOnly)).Put("/{segmentID}", segmentHandler.Update)

      // PATCH /iag/v1/segments/:segmentID/archive permission key: customers_segment_archived
      r.With(RequirePermission("customers_segment_archived", AllAccess|OwnedOnly)).Patch("/{segmentID}/archive", segmentHandler.Archive)

      // POST /iag/v1/segments/:segmentID/duplicate permission key: customers_segment_add
      r.With(RequirePermission("customers_segment_add", AllAccess)).Post("/{segmentID}/duplicate", segmentHandler.Duplicate)
      })
    2. Outside the IAG subrouter, under the BasicAuth middleware group:
      // GET /api/v1/contacts/:contactID/segments — machine-to-machine, handler attached in T5.3
      r.With(BasicAuth).Get("/api/v1/contacts/{contactID}/segments", segmentHandler.GetContactSegments)

      // GET /api/v1/segments/:segmentID/customers — machine-to-machine detailed customer list, handler attached in T5.6
      r.With(BasicAuth).Get("/api/v1/segments/{segmentID}/customers", segmentHandler.GetPrivateSegmentCustomers)
    3. Run go build ./... and confirm the build is clean.

T1.7 — Dependency wiring [Jira: —]

  • Purpose: Connect all Phase 1 components at startup so the service boots end-to-end.
  • Scope: internal/serverenv/ — inject SegmentRepository, SegmentService, SegmentHandler following existing wiring patterns.
  • Expected Outcome: Server starts without panic; smoke test (create + get segment) passes against local MongoDB.
  • Implementation Plan:
    1. Open cmd/initializer.go. Following the existing repo → service → handler layering:
      segmentRepo := segment.NewSegmentRepository(dbRepo)
      segmentService := segmentservice.NewSegmentService(segmentRepo)
      segmentHandler := handler.NewSegmentHandler(segmentService)
    2. Pass segmentHandler to the router/server setup (however existing handlers are threaded — likely a Handlers struct or direct argument to NewRouter).
    3. Start the server locally (./contact-service server). Run a smoke test: POST /iag/v1/segments with a valid body → expect 201 Created with an id; GET /iag/v1/segments/:id with that id → expect 200 OK with the segment data.

T1.8 — Feature flag cdp_segmentation_enabled [Jira: —]

  • Purpose: Gate all IAG segment endpoints behind a per-company feature flag so the feature can be enabled for pilot companies without a code redeploy. Uses the existing FeatureFlagService / feature_flag MongoDB collection pattern.
  • Scope: internal/pkg/consts/feature_flag.go — constant; internal/app/handler/segment_handler.go — flag check helper; internal/server/rest_router.go — inject FeatureFlagService into SegmentHandler and call the check at the top of each IAG handler.
  • Expected Outcome: A company with the flag absent/disabled receives 404 Not Found from every IAG segment endpoint; a company with the flag enabled receives the normal response. Unit tests cover both paths for one representative handler (Create).
  • Implementation Plan:
    1. Open internal/pkg/consts/feature_flag.go. Add:
      FeatureFlagCDPSegmentation = "cdp_segmentation_enabled"
    2. Inject FeatureFlagService into SegmentHandler. Extend the constructor:
      type SegmentHandler struct {
      segmentService service.ISegmentService
      featureFlags service.FeatureFlagService // existing type from internal/app/service/feature_flag.go
      }

      func NewSegmentHandler(svc service.ISegmentService, ff service.FeatureFlagService) *SegmentHandler {
      return &SegmentHandler{segmentService: svc, featureFlags: ff}
      }
    3. Add a private helper on SegmentHandler:
      // requireFeatureEnabled returns a non-nil error response when cdp_segmentation_enabled is
      // disabled for the company. Callers return immediately when this returns non-nil.
      func (h *SegmentHandler) requireFeatureEnabled(ctx context.Context, companySsoID string) *myhttp.ResponseBody {
      if !h.featureFlags.FeatureEnabled(ctx, consts.FeatureFlagCDPSegmentation, companySsoID) {
      resp := myhttp.ErrNotFound()
      return &resp
      }
      return nil
      }
    4. At the top of every IAG segment handler method (Create, List, GetByID, Update, Archive, Duplicate, Preview, GetSegmentCustomers), add:
      if errResp := h.requireFeatureEnabled(r.Context(), companySsoID); errResp != nil {
      return *errResp, nil
      }
      S2S endpoints (GetContactSegments, GetPrivateSegmentCustomers) are not gated — they are always accessible to internal services.
    5. In cmd/initializer.go, pass the existing featureFlagService to NewSegmentHandler.
    6. Unit tests for Create handler: flag disabled → 404 Not Found without calling the service; flag enabled → handler proceeds normally. Repeat pattern is sufficient — the single helper is exercised by all handlers.

Phase 2: Rule Engine


T2.1 — RuleEngine interface + package skeleton [Jira: —]

  • Purpose: Define the public contract for query building so the service and future callers depend on an interface, not a concrete type.
  • Scope: internal/pkg/segment_engine/ — package directory, IRuleEngine interface, sentinel errors.
  • Expected Outcome: Package compiles; interface is importable from the service layer.
  • Implementation Plan:
    1. Create internal/pkg/segment_engine/ directory.
    2. Create internal/pkg/segment_engine/engine.go:
      type IRuleEngine interface {
      BuildQuery(ruleSet RuleSet, companySsoID string) (sql string, args []any, err error)
      }

      var ErrInvalidRule = errors.New("invalid rule")

      func ErrInvalidRuleField(field string) error {
      return fmt.Errorf("%w: unrecognised field %q", ErrInvalidRule, field)
      }
      func ErrUnsupportedOperator(op, fieldType string) error {
      return fmt.Errorf("%w: operator %q not supported for field_type %q", ErrInvalidRule, op, fieldType)
      }
      Handlers check these with errors.Is(err, ErrInvalidRule) to return HTTP 422.
    3. Run make mocks to generate mocks/MockIRuleEngine.go for service-layer tests (T1.4, T3.3, T4.4).

T2.2 — BuildQuery full implementation [Jira: —]

  • Purpose: Translate a RuleSet DSL into a safe, parameterised PostgreSQL WHERE clause covering all supported field types, operators, and JSONB access patterns.
  • Scope: internal/pkg/segment_engine/ — full BuildQuery implementation with field resolver and operator dispatch.
  • Expected Outcome: Table-driven tests cover every operator × field-type combination; injection-defence test passes; go test ./internal/pkg/segment_engine/... ≥ 90% coverage (higher bar due to security criticality).
  • Implementation Plan:
    1. Create internal/pkg/segment_engine/engine_impl.go. Define ruleEngine struct (stateless) and NewRuleEngine() IRuleEngine.

    2. Implement buildFieldExpr(field, category, fieldType string) (string, error) using this complete mapping:

      field_categoryfield_typeSQL expression
      systemanyfield (bare column, e.g. created_at_date)
      defaultsingle_line_text, multi_line_text, dropdown, urldefault_fields->>'field'
      defaultnumber(default_fields->>'field')::numeric
      defaultdate(default_fields->>'field')::date
      defaultboolean(default_fields->>'field')::boolean
      defaultfile, signature, gpsdefault_fields->>'field' (presence-only)
      customsingle_line_text, multi_line_text, dropdown, urlcustom_fields->>'field'
      customnumber(custom_fields->>'field')::numeric
      customdate(custom_fields->>'field')::date
      customfile, signature, gpscustom_fields->>'field' (presence-only)

      Unknown field_category or a field key not in the registered allow-list → return ErrInvalidRuleField.

    3. Implement buildConditionSQL(cond RuleCondition, argOffset int) (clause string, args []any, err error) using this complete operator → SQL mapping:

      OperatorApplies toSQL templateArgs consumed
      istext, dropdown{field} = $Nvalue
      is_nottext, dropdown{field} != $Nvalue
      containstext{field} ILIKE $N (wrap: %value%)value
      does_not_containtext{field} NOT ILIKE $N (wrap: %value%)value
      starts_withtext{field} ILIKE $N (append: value%)value
      ends_withtext{field} ILIKE $N (prepend: %value)value
      is_emptytext, number, date, boolean (JSONB)({field} IS NULL OR {field} = '')none
      is_not_emptytext, number, date, boolean (JSONB)({field} IS NOT NULL AND {field} != '')none
      is_emptyfile, signature, gps({field} IS NULL)none
      is_not_emptyfile, signature, gps({field} IS NOT NULL)none
      is_emptysystem (top-level column){field} IS NULLnone
      is_not_emptysystem (top-level column){field} IS NOT NULLnone
      equalsnumber, boolean{field} = $Nvalue
      not_equalsnumber{field} != $Nvalue
      greater_thannumber{field} > $Nvalue
      less_thannumber{field} < $Nvalue
      betweennumber, date{field} BETWEEN $N AND $Mvalue, value2
      ondateDATE({field}) = $Nvalue
      not_ondateDATE({field}) != $Nvalue
      beforedate{field} < $Nvalue
      afterdate{field} > $Nvalue
      contains_anymulti-select{field} && ARRAY[$N]::text[]value
      contains_allmulti-select{field} @> ARRAY[$N]::text[]value

      Reject: file/signature/gps with any operator other than is_empty/is_not_emptyErrUnsupportedOperator. Validate cond.Field against a static allow-list of registered field keys before building the expression (prevents SQL injection via crafted field names).

    4. Implement buildGroupSQL(group RuleGroup, argOffset int) (string, []any, error): join condition clauses with the group's AND/OR operator, accumulating argOffset across conditions.

    5. Implement BuildQuery: prepend WHERE company_sso_id = $1 (arg: companySsoID, arg index starts at 2 for the first condition); join groups with the top-level operator; return the final SQL and args slice.

      Example for rule (source is 'WhatsApp') AND (annual_revenue > 100000000) AND (created_at < '2026-01-01') expressed as two groups:

      WHERE company_sso_id = $1
      AND (
      (default_fields->>'source' = $2 AND (custom_fields->>'annual_revenue')::numeric > $3)
      AND
      (created_at < $4)
      )

      args: ['company_abc', 'WhatsApp', 100000000, '2026-01-01T00:00:00Z']

    6. Create internal/pkg/segment_engine/engine_test.go. Table-driven matrix covering every operator × field_type cell. Add a dedicated injection test: pass "'; DROP TABLE customer_datamart; --" as field → must return ErrInvalidRuleField; returned SQL must be empty; the injected string must not appear anywhere in the returned SQL or args.


Phase 3: Datamart Integration & Preview


T3.1 — DATAMART_DSN configuration [Jira: —]

  • Purpose: Allow the datamart PostgreSQL connection string to be supplied at deploy time without touching code.
  • Scope: params/ config struct; serverenv pgx pool initialisation.
  • Expected Outcome: Application starts without the env var (nil pool); starts correctly when set; config tests cover both states.
  • Implementation Plan:
    1. Open the params/ config struct file. Add these fields:
      DatamartDSN string `env:"DATAMART_DSN" envDefault:""`
      SegmentRecalcWorkers int `env:"SEGMENT_RECALC_WORKERS" envDefault:"4"`
      SegmentRecalcBatchSize int `env:"SEGMENT_RECALC_BATCH_SIZE" envDefault:"100"`
      SegmentRecalcLockTTLSeconds int `env:"SEGMENT_RECALC_LOCK_TTL_SECONDS" envDefault:"7200"`
      SegmentRecalcLockRenewIntervalSeconds int `env:"SEGMENT_RECALC_LOCK_RENEW_INTERVAL_SECONDS" envDefault:"600"`
      SegmentPreviewRateLimit int `env:"SEGMENT_PREVIEW_RATE_LIMIT" envDefault:"10"`
      SegmentPreviewMaxRows int `env:"SEGMENT_PREVIEW_MAX_ROWS" envDefault:"10000"`
      // Timeout configuration
      DatamartQueryTimeoutSeconds int `env:"DATAMART_QUERY_TIMEOUT_SECONDS" envDefault:"30"`
      DatamartRecalcQueryTimeoutSeconds int `env:"DATAMART_RECALC_QUERY_TIMEOUT_SECONDS" envDefault:"60"`
      MongoOperationTimeoutSeconds int `env:"MONGODB_OPERATION_TIMEOUT_SECONDS" envDefault:"30"`
      KafkaPublishTimeoutSeconds int `env:"KAFKA_PUBLISH_TIMEOUT_SECONDS" envDefault:"10"`
      RedisOperationTimeoutSeconds int `env:"REDIS_OPERATION_TIMEOUT_SECONDS" envDefault:"5"`
      // Kafka retry configuration
      SegmentKafkaPublishMaxRetries int `env:"SEGMENT_KAFKA_PUBLISH_MAX_RETRIES" envDefault:"2"`
      SegmentKafkaPublishRetryDelayMs int `env:"SEGMENT_KAFKA_PUBLISH_RETRY_DELAY_MS" envDefault:"500"`
    2. In cmd/initializer.go, after existing DB initialisations:
      var datamartPool *pgxpool.Pool
      if cfg.DatamartDSN != "" {
      datamartPool, err = pgxpool.New(ctx, cfg.DatamartDSN)
      if err != nil { log.Fatal(err) }
      }
    3. Define var ErrDatamartUnavailable = errors.New("datamart unavailable") in the datamart repository package. All repository methods guard if pool == nil { return ..., ErrDatamartUnavailable }.
    4. Config tests: env var absent → pool is nil; env var set to a valid DSN → pool is non-nil.

T3.2 — IDatamartRepository interface + SQL implementation [Jira: —]

  • Purpose: Encapsulate all datamart query execution behind an interface so the service layer stays free of SQL.
  • Scope: internal/app/repository/segment_datamart/ — interface, pgx/v5 implementation, mockery mock, pgxmock unit tests (not go-sqlmock, which would introduce a parallel SQL driver).
  • Expected Outcome: All methods tested; go test ./internal/app/repository/segment_datamart/... ≥ 80% coverage.
  • Implementation Plan:
    1. Create internal/app/repository/segment_datamart/datamart_repository.go:

      type SampleContact struct {
      ContactID string `json:"contact_id"`
      Name string `json:"name"`
      Source string `json:"source"`
      LastActivity *time.Time `json:"last_activity,omitempty"` // from MongoDB activity_logs; null when no activity
      AddedBy string `json:"added_by"` // display name resolved from created_by_user_id
      AddedAt *time.Time `json:"added_at,omitempty"` // created_at_date from datamart
      }

      type IDatamartRepository interface {
      // whereSQL is a parameterised WHERE fragment from the rule engine (no leading WHERE keyword).
      // args[0] is always company_sso_id; rule conditions start at args[1].
      //
      // Count returns total rows matching whereSQL and the company-wide total (totalBase).
      // Used for total_matched and total_customer_base; never applies OWNED_ONLY so the count is always company-wide.
      Count(ctx context.Context, companySsoID, whereSQL string, args []any) (total int64, totalBase int64, err error)
      // Sample fetches one page of matching rows using offset pagination. The whereSQL passed here
      // may carry an extra OWNED_ONLY condition (AND created_by_user_id = $N) appended by the service layer.
      // offset = (page-1) * perPage; LIMIT perPage OFFSET offset applied in the query.
      Sample(ctx context.Context, companySsoID, whereSQL string, args []any, page, perPage int) ([]SampleContact, error)
      FetchContactIDs(ctx context.Context, companySsoID, whereSQL string, args []any) ([]string, error)
      CountWithPhone(ctx context.Context, companySsoID string, contactIDs []string) (int64, error)
      CountWithEmail(ctx context.Context, companySsoID string, contactIDs []string) (int64, error)
      }
    2. Create internal/app/repository/segment_datamart/datamart_repository_pgx.go. Implement datamartRepository wrapping *pgxpool.Pool. Constructor: NewDatamartRepository(pool *pgxpool.Pool) IDatamartRepository. Every method: if pool == nil { return ..., ErrDatamartUnavailable }.

      Timeout note: The repository methods accept a ctx context.Context that already carries a deadline set by the caller (service layer). Do not set a second deadline inside the repository — the caller is responsible for context.WithTimeout using the configured timeout value. This keeps timeout policy in one place (the service layer).

    3. Count — two queries using pgx/v5 pool.QueryRow:

      -- total matched (company-wide, no OWNED_ONLY filter)
      SELECT COUNT(*) FROM customer_datamart WHERE company_sso_id=$1 AND ({whereSQL})
      -- total customer base (denominator for percentage_reach display)
      SELECT COUNT(*) FROM customer_datamart WHERE company_sso_id=$1
    4. Sample — one query using offset pagination; the whereSQL argument may already include an OWNED_ONLY clause appended by the service:

      SELECT customer_id,
      default_fields->>'name',
      default_fields->>'source',
      created_by_user_id,
      created_at_date
      FROM customer_datamart
      WHERE company_sso_id=$1 AND ({whereSQL})
      ORDER BY customer_id
      LIMIT $N OFFSET $M

      LIMIT = perPage, OFFSET = (page-1) * perPage. After fetching raw rows, resolve added_by display names via the existing user lookup helper and enrich last_activity from MongoDB activity_logs. Both are best-effort — failure yields empty string / nil rather than failing the whole call.

    5. FetchContactIDs:

      SELECT customer_id FROM customer_datamart WHERE company_sso_id=$1 AND ({whereSQL})
    6. CountWithPhone:

      SELECT COUNT(*) FROM customer_datamart
      WHERE company_sso_id=$1 AND customer_id = ANY($2)
      AND default_fields->'chat_data' IS NOT NULL
      AND default_fields->>'phone' IS NOT NULL

      Before returning the count, check whether any member has a non-null chat_data sub-object in default_fields:

      SELECT EXISTS (
      SELECT 1 FROM customer_datamart
      WHERE company_sso_id=$1 AND customer_id = ANY($2)
      AND default_fields->'chat_data' IS NOT NULL
      )

      If no member has a chat_data sub-object populated (CDC backfill not yet complete), return 0, ErrDatamartUnavailable. The caller must display null reachability rather than present 0 as authoritative.

    7. CountWithEmail:

      SELECT COUNT(*) FROM customer_datamart
      WHERE company_sso_id=$1 AND customer_id = ANY($2)
      AND default_fields->>'email' IS NOT NULL
    8. Run make mocks. Write datamart_repository_test.go using pgxmock:

      • Count: success (returns total + totalBase), empty result (both 0), nil pool → ErrDatamartUnavailable, query error.
      • Sample: success page 1 (verify added_at and added_by populated from created_at_date/created_by_user_id, correct LIMIT/OFFSET applied), page 2 (OFFSET advances correctly), last_activity null when no activity log found, OWNED_ONLY clause in whereSQL (extra arg appended), empty result, nil pool → ErrDatamartUnavailable, general query error.
      • chat_data sub-object absent for all members → CountWithPhone returns ErrDatamartUnavailable.

T3.3 — PreviewSegment service method [Jira: —]

  • Purpose: Give frontend users real-time feedback on how many contacts a rule set would match before saving. The rule is provided directly in the request — no existing segment is required.
  • Scope: internal/app/service/segment/PreviewSegment method; rule validation + rule engine + datamart delegation.
  • Expected Outcome: Unit tests cover valid rule, engine error, datamart error, empty result; coverage remains ≥ 80%.
  • Implementation Plan:
    1. Add to ISegmentService:
      PreviewSegment(ctx context.Context, ruleSet repository.RuleSet, companySsoID, userSsoID string, ownedOnly bool) (*PreviewResult, error)
    2. Inject IRuleEngine and IDatamartRepository into segmentService (extend constructor; these can be nil-safe until T3 wiring in T3.1). ISegmentRepository is not needed here — preview is stateless.
    3. Define in internal/app/payload/segment_payload.go:
      type PreviewResult struct {
      TotalMatched int64 `json:"total_matched"`
      TotalCustomerBase int64 `json:"total_customer_base"`
      IsTruncated bool `json:"is_truncated"` // true when total_matched > SEGMENT_PREVIEW_MAX_ROWS
      Sample []SampleContact `json:"sample"` // capped at SEGMENT_PREVIEW_MAX_ROWS
      }
    4. Implement PreviewSegment:
      • Validate rule structure: len(ruleSet.Groups) > 2 or any len(group.Conditions) > 3ErrInvalidRule.
      • ruleEngine.BuildQuery(ruleSet, companySsoID)baseSQL, baseArgs — propagate ErrInvalidRule.
      • Wrap the datamart calls with a timeout from config:
        dmCtx, dmCancel := context.WithTimeout(ctx, time.Duration(cfg.DatamartQueryTimeoutSeconds)*time.Second)
        defer dmCancel()
        Use dmCtx for both Count and Sample calls below.
      • datamartRepo.Count(dmCtx, companySsoID, baseSQL, baseArgs)total, totalBase — propagate ErrDatamartUnavailable and context.DeadlineExceeded (surface as ErrDatamartUnavailable → HTTP 503). Always company-wide regardless of ownedOnly.
      • Build sample query: if ownedOnly, append AND created_by_user_id = $N to baseSQL and append userSsoID to baseArgssampleSQL, sampleArgs; otherwise sampleSQL, sampleArgs = baseSQL, baseArgs.
      • datamartRepo.Sample(dmCtx, companySsoID, sampleSQL, sampleArgs, 1, cfg.SegmentPreviewMaxRows)sample — propagate ErrDatamartUnavailable.
      • isTruncated = total > int64(cfg.SegmentPreviewMaxRows).
      • Return PreviewResult{TotalMatched: total, TotalCustomerBase: totalBase, IsTruncated: isTruncated, Sample: sample}.
    5. Unit tests (mock engine + mock datamart):
      • Valid rule, no OWNED_ONLY → Count called with baseArgs, Sample called with page=1, perPage=SEGMENT_PREVIEW_MAX_ROWS and same baseArgs.
      • Valid rule, OWNED_ONLY → Count called with baseArgs (no userSsoID), Sample called with baseArgs + userSsoID appended.
      • total > SEGMENT_PREVIEW_MAX_ROWSIsTruncated = true, TotalMatched reflects real count.
      • Engine returns ErrInvalidRule → propagated (also covers rule struct validation failures).
      • Count returns ErrDatamartUnavailable → propagated.
      • Sample returns ErrDatamartUnavailable → propagated.
      • Zero total → empty Sample, IsTruncated = false.
      • Verify sample contacts contain last_activity, added_by, added_at.

T3.4 — POST /iag/v1/segments/preview handler [Jira: —]

  • Purpose: Expose preview over HTTP with a per-company rate limit (10 req/min) to protect the datamart. The rule is provided in the request body — no saved segment ID is involved.
  • Scope: internal/app/handler/segment_handler.goPreview handler; rate limiter; route wired.
  • Expected Outcome: Unit tests cover 429, 503, 422 (invalid rule), 200 (is_truncated: true), 200 happy-path (with and without OWNED_ONLY).
  • Implementation Plan:
    1. Define in internal/app/payload/segment_payload.go:
      type PreviewSegmentRequest struct {
      RuleSet repository.RuleSet `json:"rule_set" validate:"required"`
      }
    2. Add Preview(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) to SegmentHandler:
      • Bind PreviewSegmentRequest from request body — return 400 if missing or malformed.
      • Extract companySsoID, userSsoID, and ownedOnly (whether view_customer == OWNED_ONLY) from auth context.
      • Check Redis rate limiter (existing rate_limiter pattern): key preview_rate:{companySsoID}, window 60 s, limit from cfg.SegmentPreviewRateLimit (default 10). On breach, return myhttp.ErrTooManyRequests("Preview limit of 10 requests per minute exceeded") → HTTP 429.
      • Call svc.PreviewSegment(ctx, req.RuleSet, companySsoID, userSsoID, ownedOnly); map errors:
        • ErrInvalidRulemyhttp.ErrUnprocessableEntity() → HTTP 422
        • ErrDatamartUnavailablemyhttp.NewErrorResponse(503, "Segment preview is temporarily unavailable", "Segment preview is temporarily unavailable") → HTTP 503
      • Build response (no pagination wrapper):
        {
        "data": [{
        "total_matched": 2500,
        "total_customer_base": 11700,
        "is_truncated": false,
        "sample": [{ "contact_id", "name", "source", "last_activity", "added_by", "added_at" }]
        }]
        }
    3. In rest_router.go inside the /iag/v1/segments block (stubbed in T1.6), attach: r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Post("/preview", segmentHandler.Preview).
    4. Inject cfg (for SegmentPreviewRateLimit) and the rate-limiter service into SegmentHandler constructor.
    5. Unit tests: 400 on missing/malformed body, 422 on ErrInvalidRule from rule engine, 429 on rate-limit exceeded, 503 on ErrDatamartUnavailable, is_truncated: true when total_matched > SEGMENT_PREVIEW_MAX_ROWS, OWNED_ONLY (verify service called with ownedOnly=true), 200 happy-path verifying all sample fields present, empty rule match (total=0, sample=[]).

Phase 4: Daily Recalculation Cron


T4.1 — CustomerSegmentMember struct + customer_segment_members migration [Jira: —]

  • Purpose: Provision the membership store that tracks which contacts belong to which segment after each recalculation run.
  • Scope: internal/app/repository/segment_member/ — struct with tags; db/migrations/ — collection + indexes.
  • Expected Outcome: Migration applies; indexes verified; struct round-trip tests pass.
  • Implementation Plan:
    1. Create internal/app/repository/segment_member/types.go:
      type CustomerSegmentMember struct {
      ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
      SegmentID string `bson:"segment_id" json:"segment_id"`
      CompanySsoID string `bson:"company_sso_id" json:"company_sso_id"`
      ContactID string `bson:"contact_id" json:"contact_id"`
      AddedAt time.Time `bson:"added_at" json:"added_at"`
      RecalcRunID string `bson:"recalc_run_id" json:"recalc_run_id"`
      }
    2. Create db/migrations/{seq}_create_customer_segment_members.up.json:
      [
      { "create": "customer_segment_members" },
      {
      "createIndexes": "customer_segment_members",
      "indexes": [
      { "key": { "segment_id": 1, "company_sso_id": 1 }, "name": "idx_csm_segment_company" },
      { "key": { "contact_id": 1, "company_sso_id": 1 }, "name": "idx_csm_contact_company" },
      { "key": { "segment_id": 1, "contact_id": 1 }, "name": "idx_csm_segment_contact", "unique": true }
      ]
      }
      ]
    3. Create .down.json: [{"drop": "customer_segment_members"}].
    4. Write types_test.go — round-trip asserting BSON snake_case and recalc_run_id is preserved.

T4.2 — ISegmentMemberRepository interface + MongoDB implementation [Jira: —]

  • Purpose: Provide atomic member-list replacement and membership lookup operations for the cron and service layers.
  • Scope: internal/app/repository/segment_member/ — interface, MongoDB implementation using bulk write, mock, unit tests.
  • Expected Outcome: ReplaceMembers atomicity tested; InsertEvents bulk tested; ≥ 80% coverage.
  • Implementation Plan:
    1. Create internal/app/repository/segment_member/segment_member_repository.go:
      type ISegmentMemberRepository interface {
      // sessCtx is a MongoDB session context — the caller (service) owns the transaction boundary.
      ReplaceMembers(ctx context.Context, sessCtx mongo.SessionContext, segmentID, companyID, recalcRunID string, contactIDs []string) error
      InsertEvents(ctx context.Context, sessCtx mongo.SessionContext, events []SegmentMembershipEvent) error
      FindBySegmentID(ctx context.Context, segmentID, companyID string, page, pageSize int) ([]CustomerSegmentMember, int64, error)
      FindByContactID(ctx context.Context, contactID, companyID string) ([]CustomerSegmentMember, error)
      CountBySegmentID(ctx context.Context, segmentID, companyID string) (int64, error)
      // CountWithPhone and CountWithEmail delegate to the injected IDatamartRepository
      CountWithPhone(ctx context.Context, segmentID, companyID string) (int64, error)
      CountWithEmail(ctx context.Context, segmentID, companyID string) (int64, error)
      }
    2. Create segment_member_repository_mongo.go. Implement segmentMemberRepository wrapping IDbRepo and an injected IDatamartRepository.
      • ReplaceMembers executes within sessCtx: (1) DeleteMany all docs matching {segment_id, company_sso_id}, (2) InsertMany new docs each carrying recalc_run_id and added_at = time.Now().
      • InsertEvents bulk-inserts via InsertMany within sessCtx.
    3. Run make mocks.
    4. Unit tests: ReplaceMembers with empty new set (deletes all, inserts nothing); non-empty set (deletes old, inserts new with recalc_run_id); InsertEvents bulk; all lookup/count methods — empty-result and error paths.

T4.3 — customer_segment_membership_events struct + migration [Jira: —]

  • Purpose: Provision the append-only audit log recording every membership transition (who entered/left each segment).
  • Scope: internal/app/repository/segment_member/SegmentMembershipEvent struct; db/migrations/ — collection, indexes, 90-day TTL index.
  • Expected Outcome: Migration applies; TTL index confirmed; struct round-trip tests pass.
  • Implementation Plan:
    1. Add to internal/app/repository/segment_member/types.go:
      const (
      MembershipEventAdded = "added"
      MembershipEventRemoved = "removed"
      )

      type SegmentMembershipEvent struct {
      ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
      SegmentID string `bson:"segment_id" json:"segment_id"`
      CompanySsoID string `bson:"company_sso_id" json:"company_sso_id"`
      ContactID string `bson:"contact_id" json:"contact_id"`
      Event string `bson:"event" json:"event"` // "added" | "removed"
      OccurredAt time.Time `bson:"occurred_at" json:"occurred_at"`
      RecalcRunID string `bson:"recalc_run_id" json:"recalc_run_id"`
      }
    2. Create db/migrations/{seq}_create_customer_segment_membership_events.up.json:
      [
      { "create": "customer_segment_membership_events" },
      {
      "createIndexes": "customer_segment_membership_events",
      "indexes": [
      { "key": { "segment_id": 1, "company_sso_id": 1, "occurred_at": -1 }, "name": "idx_csme_segment_company_date" },
      { "key": { "contact_id": 1, "company_sso_id": 1, "occurred_at": -1 }, "name": "idx_csme_contact_company_date" },
      { "key": { "occurred_at": 1 }, "name": "ttl_90d", "expireAfterSeconds": 7776000 }
      ]
      }
      ]
      (7776000 = 90 × 24 × 3600 seconds)
    3. Create .down.json: [{"drop": "customer_segment_membership_events"}].
    4. Run migration; verify TTL index via db.customer_segment_membership_events.getIndexes().
    5. Round-trip test: both "added" and "removed" marshal correctly; BSON field names are snake_case.

T4.4 — RecalculateAllSegments service method with diff + audit [Jira: —]

  • Purpose: Implement the daily recalculation: evaluate every active segment against the datamart, update member lists, write audit trail of who entered/left each segment.
  • Scope: internal/app/service/segment/RecalculateAllSegments with Redis distributed lock, heartbeat goroutine, worker pool, diff computation, MongoDB session transaction, poison-segment skip.
  • Expected Outcome: All unit test cases listed pass; ≥ 80% coverage.
  • Implementation Plan:
    1. Add RecalculateAllSegments(ctx context.Context) error to ISegmentService. Extend segmentService constructor to accept ISegmentMemberRepository, IDatamartRepository, IRuleEngine, ICacheRepo, ISegmentEventPublisher (defined in T4.5), and config.
    2. Implement method body:
      recalcRunID := ulid.Make().String() // github.com/oklog/ulid/v2 — generated once per invocation

      // Acquire Redis distributed lock (key: "segment_recalculation_lock")
      // TTL from cfg.SegmentRecalcLockTTLSeconds (default 7200 s)
      acquired := cacheRepo.SetNX("segment_recalculation_lock", recalcRunID, cfg.SegmentRecalcLockTTLSeconds)
      if !acquired {
      slog.InfoContext(ctx, "recalc skipped: lock held")
      return nil
      }
      defer cacheRepo.Del("segment_recalculation_lock")

      // Heartbeat: extend TTL by 30 min every cfg.SegmentRecalcLockRenewIntervalSeconds (default 600 s)
      hbCtx, hbCancel := context.WithCancel(ctx)
      defer hbCancel()
      go func() {
      ticker := time.NewTicker(time.Duration(cfg.SegmentRecalcLockRenewIntervalSeconds) * time.Second)
      defer ticker.Stop()
      for {
      select {
      case <-ticker.C:
      cacheRepo.Expire("segment_recalculation_lock", 1800) // extend +30 min
      case <-hbCtx.Done():
      return
      }
      }
      }()

      // Page through active segments in batches of cfg.SegmentRecalcBatchSize (default 100)
      // Worker pool: sem := make(chan struct{}, cfg.SegmentRecalcWorkers) (default 4)
      // For each segment: sem <- struct{}{}; wg.Add(1); go func(seg) { defer wg.Done(); defer <-sem; processSegment(...) }()
      // wg.Wait() before returning
    3. Implement processSegment(ctx context.Context, recalcRunID string, seg CustomerSegment):
      • Poison skip: if seg.RecalcErrorCount >= 3 { slog.WarnContext(...); return }.
      • sql, args, err := ruleEngine.BuildQuery(seg.RuleSet, seg.CompanySsoID) — on error go to error handler.
      • Wrap datamart call with a timeout from config:
        dmCtx, dmCancel := context.WithTimeout(ctx, time.Duration(cfg.DatamartRecalcQueryTimeoutSeconds)*time.Second)
        defer dmCancel()
        newIDs, err := datamartRepo.FetchContactIDs(dmCtx, seg.CompanySsoID, sql, args)
        On error (including context.DeadlineExceeded) go to error handler.
      • oldMembers, _, _ := memberRepo.FindBySegmentID(ctx, seg.ID, seg.CompanySsoID, 0, 0) — extract oldIDs.
      • Compute set difference: added = newIDs − oldIDs, removed = oldIDs − newIDs.
      • Open MongoDB session transaction with a timeout:
        txCtx, txCancel := context.WithTimeout(ctx, time.Duration(cfg.MongoOperationTimeoutSeconds)*time.Second)
        defer txCancel()
        session, _ := mongoClient.StartSession()
        defer session.EndSession(ctx)
        session.WithTransaction(txCtx, func(sessCtx mongo.SessionContext) (interface{}, error) {
        if err := memberRepo.ReplaceMembers(sessCtx, seg.ID, seg.CompanySsoID, recalcRunID, newIDs); err != nil {
        return nil, err
        }
        if len(added) > 0 || len(removed) > 0 {
        events := buildMembershipEvents(recalcRunID, seg, added, removed) // creates []SegmentMembershipEvent
        return nil, memberRepo.InsertEvents(sessCtx, events)
        }
        return nil, nil
        })
      • On transaction success: segmentRepo.UpdateStats(ctx, seg.ID, int64(len(newIDs)), time.Now()) + segmentRepo.ClearRuleDirtyAt(ctx, seg.ID).
      • After transaction success, if len(added) > 0 || len(removed) > 0: build one SegmentRecalculatedEvent{TotalAdded: len(added), TotalRemoved: len(removed), TotalMatched: len(newIDs)}; call eventPublisher.Publish(ctx, event) (implementation handles per-attempt timeout + retries internally — see T4.5). If publish returns an error after all retries: slog.ErrorContext(ctx, "segment event publish failed", ...) + increment segment.event.publish_error{segment_id} metric; do not roll back or fail the recalc.
      • Also wrap the Redis lock acquire with a timeout:
        redisCtx, redisCancel := context.WithTimeout(ctx, time.Duration(cfg.RedisOperationTimeoutSeconds)*time.Second)
        defer redisCancel()
        acquired := cacheRepo.SetNX(redisCtx, "segment_recalculation_lock", recalcRunID, cfg.SegmentRecalcLockTTLSeconds)
        And each heartbeat extend:
        extCtx, extCancel := context.WithTimeout(context.Background(), time.Duration(cfg.RedisOperationTimeoutSeconds)*time.Second)
        cacheRepo.Expire(extCtx, "segment_recalculation_lock", 1800)
        extCancel()
      • On any error: segmentRepo.IncrementRecalcErrorCount(ctx, seg.ID) + slog.ErrorContext(ctx, "segment recalc failed", slog.String("segment_id", seg.ID), slog.Any("error", err)).
    4. Unit tests (mock all deps):
      • Lock held → no segments processed, returns nil.
      • Empty diff → InsertEvents not called, UpdateStats called, Publish not called.
      • Non-empty diff → Publish called once with EventType = "segment.recalculated", TotalAdded = len(added), TotalRemoved = len(removed), TotalMatched = len(newIDs).
      • InsertEvents error → transaction rolled back, ReplaceMembers not persisted, Publish not called.
      • Publish returns error → recalc not rolled back, UpdateStats already called, error logged + metric incremented.
      • RecalcErrorCount == 3 → segment skipped.
      • Successful run → UpdateStats + ClearRuleDirtyAt called.

T4.5 — ISegmentEventPublisher interface + Kafka implementation [Jira: —]

  • Purpose: Decouple the recalculation service from Kafka so the publisher can be stubbed in tests and swapped independently. Each event notifies subscribers of the aggregate outcome of a segment's recalculation — total members added and removed in that run. Provides the Go struct and interface referenced in T4.4.
  • Scope: internal/app/service/segment/event_publisher.go — struct + interface; internal/pkg/segment_kafka/publisher.go — Kafka implementation using the existing sarama producer pattern.
  • Expected Outcome: Publisher tested with a mock Kafka producer; ISegmentEventPublisher importable from T4.4; make mocks generates MockISegmentEventPublisher; ≥ 80% coverage.
  • Implementation Plan:
    1. Create internal/app/service/segment/event_publisher.go:
      type SegmentRecalculatedEvent struct {
      EventType string `json:"event_type"` // always "segment.recalculated"
      SegmentID string `json:"segment_id"`
      SegmentName string `json:"segment_name"`
      CompanySsoID string `json:"company_sso_id"`
      RecalcRunID string `json:"recalc_run_id"` // ULID; idempotency key for consumers
      TotalMatched int64 `json:"total_matched"`
      TotalAdded int64 `json:"total_added"`
      TotalRemoved int64 `json:"total_removed"`
      OccurredAt time.Time `json:"occurred_at"`
      }

      // ISegmentEventPublisher sends one Kafka message per segment per recalculation run.
      // Publish emits an aggregate event with total counts of added and removed members.
      type ISegmentEventPublisher interface {
      Publish(ctx context.Context, event SegmentRecalculatedEvent) error
      }
    2. Create internal/pkg/segment_kafka/publisher.go. The codebase uses Confluent kafka-go (github.com/confluentinc/confluent-kafka-go/v2/kafka), not Sarama — follow the existing driver/kafka.go pattern. Define kafkaSegmentPublisher struct:
      type kafkaSegmentPublisher struct {
      producer *kafka.Producer
      topic string
      maxRetries int
      retryDelay time.Duration
      pubTimeout time.Duration
      }

      func NewKafkaSegmentPublisher(producer *kafka.Producer, topic string, maxRetries, retryDelayMs, pubTimeoutSec int) ISegmentEventPublisher {
      return &kafkaSegmentPublisher{
      producer: producer,
      topic: topic,
      maxRetries: maxRetries,
      retryDelay: time.Duration(retryDelayMs) * time.Millisecond,
      pubTimeout: time.Duration(pubTimeoutSec) * time.Second,
      }
      }
    3. Implement Publish with retry and per-attempt timeout:
      func (p *kafkaSegmentPublisher) Publish(ctx context.Context, event SegmentRecalculatedEvent) error {
      var lastErr error
      for attempt := 0; attempt <= p.maxRetries; attempt++ {
      if attempt > 0 {
      select {
      case <-ctx.Done():
      return ctx.Err()
      case <-time.After(p.retryDelay):
      }
      }
      attemptCtx, cancel := context.WithTimeout(ctx, p.pubTimeout)
      lastErr = p.publish(attemptCtx, event)
      cancel()
      if lastErr == nil {
      return nil
      }
      slog.WarnContext(ctx, "segment kafka publish attempt failed",
      slog.Int("attempt", attempt+1),
      slog.Any("error", lastErr))
      }
      return lastErr
      }

      func (p *kafkaSegmentPublisher) publish(ctx context.Context, event SegmentRecalculatedEvent) error {
      payload, err := json.Marshal(event)
      if err != nil {
      return err
      }
      deliveryChan := make(chan kafka.Event, 1)
      msg := &kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &p.topic, Partition: kafka.PartitionAny},
      Key: []byte(event.SegmentID),
      Value: payload,
      }
      if err := p.producer.Produce(msg, deliveryChan); err != nil {
      return err
      }
      select {
      case <-ctx.Done():
      return ctx.Err()
      case e := <-deliveryChan:
      m, ok := e.(*kafka.Message)
      if !ok {
      return nil
      }
      return m.TopicPartition.Error
      }
      }
    4. Run make mocks to generate mocks/MockISegmentEventPublisher.go.
    5. Unit tests: event with non-zero TotalAdded/TotalRemoved → correct JSON marshalled + Key = segment_id; producer error on first attempt → retried up to maxRetries, then error propagated; producer succeeds on second attempt → no error returned; ctx.Done() during retry wait → ctx.Err() returned immediately.

T4.6 — SegmentRecalculationCron job + registration [Jira: —]

  • Purpose: Schedule RecalculateAllSegments to run at 08:00 WIB (01:00 UTC) daily using the existing gocraft/work cron infrastructure.
  • Scope: internal/app/cron/segment_recalculation.go; cron registration in internal/worker/; dependency wiring in cmd/initializer.go.
  • Expected Outcome: go build ./... passes; cron appears in worker-ui; manual trigger populates customer_segment_members and customer_segment_membership_events.
  • Implementation Plan:
    1. Create internal/app/cron/segment_recalculation.go:
      type SegmentRecalculationCron struct {
      segmentService service.ISegmentService
      }
      func NewSegmentRecalculationCron(svc service.ISegmentService) *SegmentRecalculationCron {
      return &SegmentRecalculationCron{segmentService: svc}
      }
      func (c *SegmentRecalculationCron) SegmentRecalculation(job *work.Job) error {
      return c.segmentService.RecalculateAllSegments(context.Background())
      }
    2. Open the cron registration file in internal/worker/. Add to CronList:
      {"0 1 * * *", "segment_recalculation", segmentRecalcCron.SegmentRecalculation},
      (0 1 * * * = minute 0, hour 1 UTC = 08:00 WIB)
    3. In cmd/initializer.go, extend the segmentService construction to include the Kafka publisher and re-wire:
      segmentEventPublisher := segmentkafka.NewKafkaSegmentPublisher(
      kafkaProducer,
      "cdp.segment.recalculated",
      cfg.SegmentKafkaPublishMaxRetries,
      cfg.SegmentKafkaPublishRetryDelayMs,
      cfg.KafkaPublishTimeoutSeconds,
      )
      segmentService := segmentservice.NewSegmentService(segmentRepo, memberRepo, datamartRepo, ruleEngine, cacheRepo, segmentEventPublisher, cfg)
      segmentRecalcCron := cron.NewSegmentRecalculationCron(segmentService)
      Pass segmentRecalcCron to the worker registration step.
    4. Smoke test: ./contact-service worker; open worker-ui (./contact-service worker-ui); verify segment_recalculation in the schedule list; manually enqueue → confirm customer_segment_members populated, customer_segment_membership_events contains "added" events, and Kafka topic cdp.segment.recalculated receives one message per segment with total_added, total_removed, total_matched populated correctly.

Phase 5: Metrics, Service Integration & Delivery


T5.1 — Segment detail with reachability metrics [Jira: —]

  • Purpose: Enrich GET /iag/v1/segments/:id (and the PUT /iag/v1/segments/:id response) with percentage_reach and per-channel reachability counts.

  • Scope:

    • internal/app/repository/segment_datamart/ — new package: errors.go only (ErrDatamartUnavailable).
    • internal/app/repository/segment_member/interface.go — new package: ISegmentMemberRepository interface (stub; implementation deferred to T4.2).
    • internal/app/repository/contact/base.go — add CountAllByCompany to ContactInterface and implement on ContactRepo.
    • internal/app/payload/segment_payload.go — add ChannelReachability, ReachabilityInfo; extend SegmentDetailResponse.
    • internal/app/service/segment/ — add IContactCounter narrow interface, ChannelReachabilityResult, SegmentDetails result type, SegmentServiceOption options pattern, GetSegmentDetails method.
    • internal/app/handler/segment_handler.goGetByID and Update call GetSegmentDetails; add private toSegmentDetailResponse(SegmentDetails) conversion.
    • cmd/initializer.go — wire WithContactCounter(contactRepo) into NewSegmentService.
  • Expected Outcome: GetByID returns percentage_reach and reachability for active segments; archived segments omit reachability; whatsapp is null when datamart unavailable; go test ./... ≥ 80% coverage.

  • Implementation Plan:

    Step 1 — segment_datamart errors package

    Create internal/app/repository/segment_datamart/errors.go:

    package segmentdatamart

    import "errors"

    // ErrDatamartUnavailable is returned by segment_member methods when the datamart
    // connection is not configured or CDC backfill is incomplete.
    var ErrDatamartUnavailable = errors.New("datamart unavailable")

    Step 2 — segment_member interface

    Create internal/app/repository/segment_member/interface.go:

    package segmentmember

    import "context"

    // ISegmentMemberRepository manages the membership of contacts within segments.
    // Implementation is provided in T4.2; this stub defines the contract consumed by T5.1.
    type ISegmentMemberRepository interface {
    CountBySegmentID(ctx context.Context, segmentID, companyID string) (int64, error)
    // CountWithPhone returns the number of members with chat_data IS NOT NULL AND phone IS NOT NULL.
    // Returns segmentdatamart.ErrDatamartUnavailable when no member has chat_data populated (CDC backfill
    // incomplete) or when the datamart pool is nil.
    CountWithPhone(ctx context.Context, segmentID, companyID string) (int64, error)
    // CountWithEmail returns the number of members with email IS NOT NULL.
    CountWithEmail(ctx context.Context, segmentID, companyID string) (int64, error)
    }

    Run make mocks after creating this file to generate internal/app/repository/segment_member/mocks/ISegmentMemberRepository.go.

    Step 3 — CountAllByCompany on ContactInterface

    In internal/app/repository/contact/base.go:

    • Add to ContactInterface:
      CountAllByCompany(ctx context.Context, companySsoID string) (int64, error)
    • Implement on ContactRepo:
      func (r *ContactRepo) CountAllByCompany(ctx context.Context, companySsoID string) (int64, error) {
      return r.CountWithFilters(ctx, bson.M{
      "company_sso_id": companySsoID,
      "is_deleted": bson.M{"$ne": true},
      })
      }

    Run make mocks to regenerate internal/app/repository/contact/mocks/ContactInterface.go.

    Step 4 — Payload additions

    In internal/app/payload/segment_payload.go, add:

    type ChannelReachability struct {
    Count int64 `json:"count"`
    Percentage float64 `json:"percentage"`
    }

    type ReachabilityInfo struct {
    WhatsApp *ChannelReachability `json:"whatsapp"` // null = datamart unavailable
    Email *ChannelReachability `json:"email"` // null = datamart unavailable
    }

    Extend SegmentDetailResponse:

    type SegmentDetailResponse struct {
    ID string `json:"id"`
    Name string `json:"name"`
    Description *string `json:"description"`
    Status string `json:"status"`
    RuleSet reposegment.RuleSet `json:"rule_set"`
    TotalMatched int64 `json:"total_matched"`
    PercentageReach *float64 `json:"percentage_reach,omitempty"`
    LastEvaluatedAt *time.Time `json:"last_evaluated_at"`
    Reachability *ReachabilityInfo `json:"reachability,omitempty"`
    CreatedBy string `json:"created_by"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
    }

    Keep the existing ToSegmentDetailResponse(seg reposegment.CustomerSegment) mapper intact (it leaves the new fields nil); the handler uses the new toSegmentDetailResponse conversion (Step 6) for enriched responses.

    Step 5 — Service interface additions

    In internal/app/service/segment/segment_service_interface.go, add:

    // IContactCounter is a narrow interface consumed by SegmentService to fetch the total
    // active contact count for a company. Satisfied by the existing ContactInterface concrete type.
    type IContactCounter interface {
    CountAllByCompany(ctx context.Context, companySsoID string) (int64, error)
    }

    // ChannelReachabilityResult holds computed counts for a single channel.
    type ChannelReachabilityResult struct {
    Count int64
    Percentage float64
    }

    // SegmentDetails is the service-level result returned by GetSegmentDetails.
    // The handler converts this into payload.SegmentDetailResponse.
    type SegmentDetails struct {
    Segment reposegment.CustomerSegment
    TotalCustomers int64
    WhatsApp *ChannelReachabilityResult // nil = datamart unavailable
    Email *ChannelReachabilityResult // nil = datamart unavailable
    }

    Add to ISegmentService:

    // GetSegmentDetails returns the full segment view including reachability metrics.
    // Replaces GetSegment in the GetByID and Update handlers.
    GetSegmentDetails(ctx context.Context, companySsoID, segmentID, userSsoID string, ownedOnly bool) (SegmentDetails, error)

    Run make mocks to regenerate ISegmentService.go.

    Step 6 — Service implementation

    In internal/app/service/segment/segment_service.go:

    a. Extend segmentService struct:

    type segmentService struct {
    repo reposegment.ISegmentRepository
    memberRepo segmentmember.ISegmentMemberRepository // nil until T4.2
    contactCounter IContactCounter // nil until wired
    }

    b. Add options pattern (keeps NewSegmentService(repo) backward-compatible):

    type SegmentServiceOption func(*segmentService)

    func WithMemberRepo(r segmentmember.ISegmentMemberRepository) SegmentServiceOption {
    return func(s *segmentService) { s.memberRepo = r }
    }

    func WithContactCounter(c IContactCounter) SegmentServiceOption {
    return func(s *segmentService) { s.contactCounter = c }
    }

    func NewSegmentService(repo reposegment.ISegmentRepository, opts ...SegmentServiceOption) ISegmentService {
    svc := &segmentService{repo: repo}
    for _, o := range opts {
    o(svc)
    }
    return svc
    }

    c. Implement GetSegmentDetails:

    func (s *segmentService) GetSegmentDetails(ctx context.Context, companySsoID, segmentID, userSsoID string, ownedOnly bool) (SegmentDetails, error) {
    seg, err := s.GetSegment(ctx, companySsoID, segmentID, userSsoID, ownedOnly)
    if err != nil {
    return SegmentDetails{}, err
    }

    result := SegmentDetails{Segment: seg}

    if s.contactCounter != nil {
    total, cErr := s.contactCounter.CountAllByCompany(ctx, companySsoID)
    if cErr != nil {
    slog.WarnContext(ctx, "segment_service get_segment_details count_contacts", slog.Any("error", cErr))
    } else {
    result.TotalCustomers = total
    }
    }

    // Reachability is only computed for active segments.
    if seg.Status != reposegment.SegmentStatusActive || s.memberRepo == nil {
    return result, nil
    }

    wCount, wErr := s.memberRepo.CountWithPhone(ctx, segmentID, companySsoID)
    if wErr != nil && !errors.Is(wErr, segmentdatamart.ErrDatamartUnavailable) {
    slog.WarnContext(ctx, "segment_service get_segment_details count_phone", slog.Any("error", wErr))
    } else if wErr == nil {
    pct := float64(0)
    if seg.TotalMatched > 0 {
    pct = float64(wCount) / float64(seg.TotalMatched) * 100
    }
    result.WhatsApp = &ChannelReachabilityResult{Count: wCount, Percentage: pct}
    }

    eCount, eErr := s.memberRepo.CountWithEmail(ctx, segmentID, companySsoID)
    if eErr != nil && !errors.Is(eErr, segmentdatamart.ErrDatamartUnavailable) {
    slog.WarnContext(ctx, "segment_service get_segment_details count_email", slog.Any("error", eErr))
    } else if eErr == nil {
    pct := float64(0)
    if seg.TotalMatched > 0 {
    pct = float64(eCount) / float64(seg.TotalMatched) * 100
    }
    result.Email = &ChannelReachabilityResult{Count: eCount, Percentage: pct}
    }

    return result, nil
    }

    Step 7 — Handler update

    In internal/app/handler/segment_handler.go:

    a. Add a private conversion helper:

    func toSegmentDetailResponse(d segmentsvc.SegmentDetails) payload.SegmentDetailResponse {
    resp := payload.ToSegmentDetailResponse(d.Segment)

    if d.TotalCustomers > 0 {
    pct := float64(d.Segment.TotalMatched) / float64(d.TotalCustomers) * 100
    resp.PercentageReach = &pct
    }

    if d.Segment.Status == reposegment.SegmentStatusActive {
    reach := &payload.ReachabilityInfo{}
    if d.WhatsApp != nil {
    reach.WhatsApp = &payload.ChannelReachability{Count: d.WhatsApp.Count, Percentage: d.WhatsApp.Percentage}
    }
    if d.Email != nil {
    reach.Email = &payload.ChannelReachability{Count: d.Email.Count, Percentage: d.Email.Percentage}
    }
    resp.Reachability = reach
    }
    return resp
    }

    b. Update GetByID — replace GetSegment call:

    details, err := h.segmentService.GetSegmentDetails(ctx, companySsoID, segmentID, userSsoID, ownedOnly)
    if err != nil {
    return myhttp.ResponseBody{}, h.mapServiceError(ctx, "segment_handler get_by_id", err)
    }
    return myhttp.NewJSONResponse(toSegmentDetailResponse(details), nil), nil

    c. Update Update — replace the post-update GetSegment call:

    details, err := h.segmentService.GetSegmentDetails(ctx, companySsoID, segmentID, userSsoID, ownedOnly)
    if err != nil {
    return myhttp.ResponseBody{}, h.mapServiceError(ctx, "segment_handler update get", err)
    }
    return myhttp.NewJSONResponse(toSegmentDetailResponse(details), nil), nil

    Step 8 — Wiring

    In cmd/initializer.go, update the segment service construction:

    segmentService := segmentsvc.NewSegmentService(
    segmentRepo,
    segmentsvc.WithContactCounter(contactRepo), // contactRepo satisfies IContactCounter via CountAllByCompany
    )

    WithMemberRepo is not wired here — added in T4.6 once ISegmentMemberRepository has a concrete implementation.

    Step 9 — Tests

    Handler tests (internal/app/handler/segment_handler_test.go):

    • Replace all svc.On("GetSegment", ...) inside TestSegmentHandler_GetByID and the post-update fetch inside TestSegmentHandler_Update with svc.On("GetSegmentDetails", ...) returning a segmentsvc.SegmentDetails{Segment: sampleSegment(segID)}.
    • Add cases: GetSegmentDetails returns ErrSegmentNotFound → 404; returns ErrPermissionDenied → 403.

    Service tests (internal/app/service/segment/segment_service_test.go), add TestSegmentService_GetSegmentDetails:

    | Case | memberRepo | contactCounter | expected |
    |---|---|---|---|
    | active, both deps nil | nil | nil | Segment returned, WhatsApp nil, Email nil, TotalCustomers 0 |
    | active, memberRepo wired, no CDC backfill | CountWithPhone → ErrDatamartUnavailable | wired | WhatsApp nil, Email populated |
    | active, both wired | CountWithPhone → 800, CountWithEmail → 940 | total=11700 | WhatsApp={800, 76.19}, Email={940, 89.52}, PercentageReach computable |
    | archived | memberRepo wired | wired | WhatsApp nil, Email nil (reachability skipped for archived) |
    | not found | — | — | ErrSegmentNotFound |
    | permission denied (ownedOnly) | — | — | ErrPermissionDenied |

T5.2 — GetSegmentCustomers + GET /iag/v1/segments/:id/customers handler [Jira: —]

  • Purpose: Expose paginated customer membership for a saved segment to the CDP frontend with OWNED_ONLY permission support.
  • Scope:
    • internal/app/payload/segment_payload.goSegmentCustomerItem, SegmentCustomersResult
    • internal/app/service/segment/segment_service_interface.goGetSegmentCustomers
    • internal/app/service/segment/segment_service.go — implement GetSegmentCustomers
    • internal/app/handler/segment_handler.goGetSegmentCustomers handler
    • internal/server/rest_router.go — wire GET /iag/v1/segments/:segmentID/customers
  • Depends on: T4.1 (CustomerSegmentMember struct), T4.2 (ISegmentMemberRepository).
  • Expected Outcome: Unit tests cover ALL_ACCESS, OWNED_ONLY, segment-not-found, and invalid pagination; endpoint verified in staging with the CDP frontend.
  • Implementation Plan:
    1. In internal/app/payload/segment_payload.go, add:
      type SegmentCustomerItem struct {
      ContactID string `json:"contact_id"`
      Name string `json:"name"`
      Source string `json:"source"`
      AddedAt time.Time `json:"added_at"`
      }
    2. Add to ISegmentService:
      GetSegmentCustomers(ctx context.Context, segmentID, companySsoID, userSsoID string, ownedOnly bool, page, perPage int) ([]payload.SegmentCustomerItem, int64, error)
      Run make mocks to regenerate the service mock.
    3. Implement GetSegmentCustomers in segment_service.go:
      • Call segmentRepo.FindByID — return ErrSegmentNotFound if segment does not exist.
      • OWNED_ONLY boundary: if ownedOnly && seg.CreatedBy != userSsoID return ErrPermissionDenied.
      • Call memberRepo.FindMembers(ctx, segmentID, companySsoID, userSsoID, ownedOnly, page, perPage)[]CustomerSegmentMember, total.
      • Resolve contact name and source from contactRepo.FindByIDs; populate SegmentCustomerItem slice.
      • Return slice + total.
    4. Add GetSegmentCustomers(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) to SegmentHandler:
      • Parse segmentID from URL param; parse and validate page/per_page query params (defaults: 1/20; max per_page: 100 → 400 if exceeded).
      • Extract companySsoID, userSsoID, ownedOnly from IAG permission context.
      • Call svc.GetSegmentCustomers; map errors via mapServiceError.
      • Return myhttp.NewJSONResponse(customers, pagination).
    5. In internal/server/rest_router.go, inside the /{segmentID} sub-route block:
      r.With(RequirePermission("customers_segment_view", AllAccess|OwnedOnly)).Get("/customers", segmentHandler.GetSegmentCustomers)
    6. Unit tests:
      • Service: ALL_ACCESS → full list; OWNED_ONLY + owner → filtered; OWNED_ONLY + non-owner → ErrPermissionDenied; segment not found → ErrSegmentNotFound.
      • Handler: valid → 200; ErrSegmentNotFound → 404; ErrPermissionDenied → 403; per_page > 100 → 400.

T5.3 — GetSegmentsByContact + GET /api/v1/contacts/:id/segments endpoint [Jira: —]

  • Purpose: Enable downstream services (Broadcast, Chatbot) to discover which segments a contact belongs to via a machine-to-machine endpoint.
  • Scope: internal/app/service/segment/GetSegmentsByContact; internal/app/handler/segment_handler.goGetContactSegments handler under BasicAuth.
  • Expected Outcome: Unit tests cover multiple segments, no segments, invalid contactID; staging curl verified with Broadcast team.
  • Implementation Plan:
    1. Add to ISegmentService:
      GetSegmentsByContact(ctx context.Context, contactID, companySsoID string) ([]SegmentInfo, error)
    2. Define in internal/app/payload/segment_payload.go:
      type SegmentInfo struct {
      SegmentID string `json:"segment_id"`
      SegmentName string `json:"segment_name"`
      }
    3. Implement GetSegmentsByContact: memberRepo.FindByContactID(ctx, contactID, companySsoID); for each membership segmentRepo.FindByID to get the segment name; return []SegmentInfo. Return empty slice (not an error) when there are no memberships.
    4. Add GetContactSegments(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) to SegmentHandler:
      • contactID := chi.URLParam(r, "contactID") — return 400 if empty.
      • companySsoID extracted from BasicAuth context (machine-to-machine; no user permission check).
      • Call svc.GetSegmentsByContact.
      • Return 200 OK {data: {customer_id: contactID, segments: [...]}}.
    5. Route registered in T1.6 under BasicAuth middleware. Verify with curl -u user:pass /api/v1/contacts/{id}/segments.
    6. Unit tests: 2 segments → both returned; no memberships → empty array; missing contactID → 400.

T5.4 — OpenAPI spec update [Jira: —]

  • Purpose: Keep contract documentation in sync with implementation for frontend, Broadcast, and QA consumers.
  • Scope: docs/swagger.yaml — all new segment endpoints with schemas and per-endpoint error responses.
  • Expected Outcome: swagger-cli validate docs/swagger.yaml passes; Swagger UI renders all new endpoints.
  • Implementation Plan:
    1. Open docs/swagger.yaml. Define reusable $ref component schemas: RuleCondition, RuleGroup, RuleSet, SegmentResponse, SegmentDetailResponse (with nested ReachabilityInfo), SegmentListResponse, PreviewRequest, PreviewResponse, ErrorResponse (envelope: {error, message, details}).

    2. Add a path entry for each endpoint. Each entry must include summary, operationId, security, parameters (GET), requestBody (POST/PUT), and a responses section with 200/201 plus all error codes from the table below — each error response uses $ref: '#/components/schemas/ErrorResponse' with an example showing the relevant error code string:

      EndpointError codes to document
      POST /iag/v1/segments400 INVALID_PAYLOAD, 403 PERMISSION_DENIED, 409 CONFLICT_NAME, 422 INVALID_RULE
      GET /iag/v1/segments400 INVALID_PAYLOAD, 403 PERMISSION_DENIED
      GET /iag/v1/segments/:id403 PERMISSION_DENIED, 404 NOT_FOUND
      PUT /iag/v1/segments/:id400 INVALID_PAYLOAD, 403 PERMISSION_DENIED, 404 NOT_FOUND, 409 CONFLICT_NAME, 422 INVALID_RULE
      PATCH /iag/v1/segments/:id/archive403 PERMISSION_DENIED, 404 NOT_FOUND, 409 CONFLICT_STATUS
      POST /iag/v1/segments/:id/duplicate403 PERMISSION_DENIED, 404 NOT_FOUND, 409 CONFLICT_NAME
      POST /iag/v1/segments/preview400 INVALID_PAYLOAD, 422 INVALID_RULE, 429 RATE_LIMIT_EXCEEDED, 503 DATAMART_UNAVAILABLE
      GET /iag/v1/segments/:id/customers400 INVALID_PAYLOAD, 403 PERMISSION_DENIED, 404 NOT_FOUND
      GET /api/v1/contacts/:id/segments401 (BasicAuth failure)
      GET /api/v1/segments/:id/customers400 INVALID_PAYLOAD, 401 (BasicAuth failure), 404 NOT_FOUND
    3. Add the S2S paths /api/v1/contacts/{contactID}/segments and /api/v1/segments/{segmentID}/customers with basicAuth security scheme and their 200 responses. Document PrivateSegmentCustomerItem as a reusable $ref component schema for the latter.

    4. Run make init (calls swag init) if the project uses swag annotations; otherwise swagger-cli validate docs/swagger.yaml.

    5. Open Swagger UI; verify all new endpoints render with correct schemas and example error payloads.


T5.5 — Quality gate & final verification [Jira: —]

  • Purpose: Confirm the complete implementation meets the project's mandatory quality standards before the rollout plan begins.
  • Scope: CI pipeline — go fmt ./..., go vet ./..., golangci-lint run, go build ./..., go test ./...; all segment packages must reach ≥ 80% coverage (≥ 90% for segment_engine).
  • Expected Outcome: All commands exit 0 with no warnings or failures; coverage thresholds met; service boots cleanly with a local docker-compose up.
  • Implementation Plan:
    1. Run make prepare (init → build → mocks → test → lint → sec). Fix every failure before advancing; do not suppress warnings.
    2. Check per-package coverage thresholds: go test -coverprofile=cover.out ./internal/... && go tool cover -func=cover.out | grep segment. Confirm segment_engine ≥ 90% and all other segment packages ≥ 80%. Add tests to any package below threshold.
    3. Run staticcheck ./internal/... — resolve all new findings in segment packages.
    4. Run gosec ./internal/... — confirm no new HIGH/CRITICAL findings; the Rule Engine's parameterised-only SQL path should be clean.
    5. Start the full stack: docker-compose up (MongoDB + Redis + datamart), then ./contact-service server. Hit POST /iag/v1/segments and POST /iag/v1/segments/preview end-to-end to confirm the full wiring is live.

T5.6 — GetPrivateSegmentCustomers + GET /api/v1/segments/:id/customers endpoint [Jira: —]

  • Purpose: Expose detailed customer data for all members of a saved segment to downstream machine-to-machine consumers (Broadcast, Chatbot) without a Launchpad permission gate.
  • Scope:
    • internal/app/payload/segment_payload.goPrivateSegmentCustomerItem
    • internal/app/service/segment/segment_service_interface.goGetPrivateSegmentCustomers
    • internal/app/service/segment/segment_service.go — implement GetPrivateSegmentCustomers
    • internal/app/handler/segment_handler.goGetPrivateSegmentCustomers handler under BasicAuth
    • internal/server/rest_router.go — wire GET /api/v1/segments/:segmentID/customers (stub added in T1.6)
  • Depends on: T4.1 (CustomerSegmentMember struct), T4.2 (ISegmentMemberRepository), T5.2 (FindMembers on member repo already present).
  • Expected Outcome: Unit tests cover pagination, segment-not-found, empty member list, partial contact-lookup failure; staging curl verified with the Broadcast team.
  • Implementation Plan:
    1. In internal/app/payload/segment_payload.go, add:
      type PrivateSegmentCustomerItem struct {
      ID string `json:"id"`
      CompanySsoID string `json:"company_sso_id"`
      CompanyBillingID string `json:"company_billing_id"`
      Name string `json:"name"`
      Email string `json:"email"`
      Phone []string `json:"phone"`
      IsDeleted *bool `json:"is_deleted"`
      CustomFields *[]CustomField `json:"custom_fields"`
      OwnerID string `json:"owner_id"`
      OwnerName string `json:"owner_name"`
      AssigneeID string `json:"assignee_id"`
      AssigneeName string `json:"assignee_name"`
      Usernames []Username `json:"usernames"`
      Source string `json:"source"`
      SourceID string `json:"source_id"`
      SourceName string `json:"source_name"`
      Status string `json:"status"`
      StatusID string `json:"status_id"`
      StatusName string `json:"status_name"`
      JobTitle string `json:"job_title"`
      Tags []string `json:"tags"`
      Flag string `json:"flag"`
      Address *Address `json:"address"`
      DateOfBirth string `json:"date_of_birth"`
      Sex string `json:"sex"`
      SexID string `json:"sex_id"`
      SexName string `json:"sex_name"`

      PhoneMarketingOptIn []WhatsappMarketingOptedInSerializer `json:"phone_marketing_opt_in,omitempty"`
      IsLoyaltyMember bool `json:"is_loyalty_member"`

      AddedAt time.Time `json:"added_at"`
      CreatedAt time.Time `json:"created_at"`
      UpdatedAt time.Time `json:"updated_at"`
      CreatedBy string `json:"created_by"`
      UpdatedBy string `json:"updated_by"`
      }
      CustomField, Username, Address, and WhatsappMarketingOptedInSerializer are the existing types from internal/app/repository/contact/create_serializer.go. This struct is ContactSerializer minus CrmData, ChatData, Avatar, Accounts, DisabledField, FieldPermissions, Permission, IsEnableUsernames, and the scalar Username field.
    2. Add to ISegmentService:
      GetPrivateSegmentCustomers(ctx context.Context, segmentID, companySsoID string, page, perPage int) ([]payload.PrivateSegmentCustomerItem, int64, error)
      Run make mocks to regenerate the service mock.
    3. Implement GetPrivateSegmentCustomers in segment_service.go:
      • Call segmentRepo.FindByID — return ErrSegmentNotFound if segment does not exist.
      • Call memberRepo.FindMembers(ctx, segmentID, companySsoID, "", false, page, perPage) — no OWNED_ONLY filtering; all members returned.
      • Collect contactIDs from member records; call contactRepo.FindByIDs(ctx, contactIDs, companySsoID).
      • Map each ContactSerializer to PrivateSegmentCustomerItem, copying all included fields; set AddedAt from the corresponding member record.
      • Contacts missing from the lookup (e.g., soft-deleted) are silently skipped (slog.WarnContext with contact_id).
      • Return slice + total (total is the full member count, not the resolved contact count).
    4. Add GetPrivateSegmentCustomers(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) to SegmentHandler:
      • Parse segmentID from URL param — return 400 if empty.
      • Parse and validate page/per_page query params (defaults: 1/20; max per_page: 100 → 400 if exceeded).
      • Extract companySsoID from BasicAuth context — no user SSO ID or Launchpad permission required.
      • Call svc.GetPrivateSegmentCustomers; map ErrSegmentNotFound → 404.
      • Return myhttp.NewJSONResponse(customers, pagination).
    5. In internal/server/rest_router.go, attach the handler to the stub route registered in T1.6:
      r.With(BasicAuth).Get("/api/v1/segments/{segmentID}/customers", segmentHandler.GetPrivateSegmentCustomers)
    6. Unit tests:
      • Service: segment found → full member list with all fields populated; segment not found → ErrSegmentNotFound; empty member list → empty slice (not error); partial contact lookup failure → available contacts returned, missing ones skipped with warning log.
      • Handler: valid request → 200 with data + pagination; ErrSegmentNotFound → 404; empty segmentID → 400; per_page > 100 → 400.

Phase 6: Tenant Isolation & Versioning Hardening

Phase 6 tasks are additive-only — they do not modify Phase 1–5 source files, they extend them. Each follows the same TDD discipline as prior phases: tests written first, implementation to pass, then refactor.


T6.1 — S2S company_sso_id enforcement on both internal endpoints [Jira: —]

  • Purpose: Close a tenant-isolation gap in the two S2S endpoints. The BasicAuth middleware only validates credentials and sets nothing in context, so without an explicit company_sso_id parameter any authorised internal service could query data belonging to any tenant. This task adds the parameter, threads it through, and validates ownership before returning data.
  • Scope:
    • internal/app/handler/segment_handler.goGetContactSegments and GetPrivateSegmentCustomers handlers.
    • internal/app/service/segment/segment_service_interface.go + segment_service.goGetPrivateSegmentCustomers signature extension (segment ownership check).
    • internal/server/rest_router.go — no route change; only handler logic changes.
  • Depends on: T5.3 (GetContactSegments), T5.6 (GetPrivateSegmentCustomers).
  • Expected Outcome: Missing company_sso_id → 400 from both endpoints. Cross-tenant segment ID on the customers endpoint → 404. Cross-tenant contact ID on the segments endpoint → empty segments array (not an error). Unit tests cover all paths; go test ./internal/app/handler/... and go test ./internal/app/service/segment/... remain green.
  • Implementation Plan:
    1. GetContactSegments handler (internal/app/handler/segment_handler.go):
      • Read companySsoID := r.URL.Query().Get("company_sso_id").
      • If empty, return myhttp.ErrBadRequest("company_sso_id is required") → HTTP 400 immediately (before any service call).
      • Pass companySsoID to svc.GetSegmentsByContact(ctx, contactID, companySsoID).
      • No service-layer change needed: memberRepo.FindByContactID already filters by company_sso_id, so querying with a wrong value simply returns an empty slice.
    2. GetPrivateSegmentCustomers handler (internal/app/handler/segment_handler.go):
      • Read companySsoID := r.URL.Query().Get("company_sso_id").
      • If empty, return HTTP 400 immediately.
      • Pass companySsoID to svc.GetPrivateSegmentCustomers(ctx, segmentID, companySsoID, page, perPage).
    3. GetPrivateSegmentCustomers service (internal/app/service/segment/segment_service.go):
      • After segmentRepo.FindByID(ctx, segmentID), add:
        if seg.CompanySsoID != companySsoID {
        return nil, 0, ErrSegmentNotFound // intentionally 404, not 403 — do not leak existence
        }
      • This is the only service-layer change; FindMembers already carries companyID as a filter argument.
    4. Unit tests (internal/app/handler/segment_handler_test.go):
      • GetContactSegments: missing company_sso_id query param → 400 without calling service; param present → service called with correct value.
      • GetPrivateSegmentCustomers: missing company_sso_id → 400; param present but segment belongs to different company → service returns ErrSegmentNotFound → handler returns 404; happy-path → 200.
    5. Service unit tests (internal/app/service/segment/segment_service_test.go):
      • GetPrivateSegmentCustomers: seg.CompanySsoID != companySsoIDErrSegmentNotFound; seg.CompanySsoID == companySsoID → proceeds normally.

T6.2 — Segment version field: struct, repository, and service layer [Jira: —]

  • Purpose: Track a monotonically increasing edit counter on each segment so that consumers, downstream services, and the Kafka event stream can detect when a segment's definition has changed. The counter is owned by the user-facing edit operations and is never touched by the cron.
  • Scope:
    • internal/app/repository/segment/types.go — add Version field.
    • internal/app/repository/segment/segment_repository_mongo.goInsert sets Version = 1; Update (called only from UpdateSegment) and UpdateStatus (called only from ArchiveSegment) both add "$inc": bson.M{"version": 1}.
    • internal/app/service/segment/segment_service.goCreateSegment sets Version: 1; DuplicateSegment resets to 1; no other service changes (version increment is a repository-layer concern).
  • Depends on: T1.1, T1.3, T1.4.
  • Expected Outcome: Version persists and increments correctly; UpdateStats / ClearRuleDirtyAt / IncrementRecalcErrorCount do not change Version; unit tests validate all paths; go test ./internal/app/repository/segment/... and go test ./internal/app/service/segment/... pass with ≥ 80% coverage.
  • Implementation Plan:
    1. Struct (internal/app/repository/segment/types.go): Add the field after RecalcErrorCount:
      Version int64 `bson:"version" json:"version"`
      Add to the round-trip test in types_test.go: assert that Version marshals to the "version" key in both JSON and BSON, and that its value is preserved.
    2. Repository — Insert (segment_repository_mongo.go): The caller (CreateSegment) sets seg.Version = 1 before calling Insert. No repository change is strictly needed, but verify the field is included in the BSON document (it is, by default, since it carries a bson:"version" tag).
    3. Repository — Update (segment_repository_mongo.go): In the $set update document, add a sibling $inc operator:
      update := bson.M{
      "$set": bson.M{ /* existing fields */ },
      "$inc": bson.M{"version": 1},
      }
      This method is only called from UpdateSegment, so every user-initiated edit bumps the counter atomically.
    4. Repository — UpdateStatus (segment_repository_mongo.go): Same pattern — add "$inc": bson.M{"version": 1} alongside the $set for status. This method is only called from ArchiveSegment.
    5. Repository — UpdateStats, ClearRuleDirtyAt, IncrementRecalcErrorCount: confirm none of these touch version (they use their own targeted $set / $inc documents). Add comments noting this is intentional.
    6. Service — CreateSegment (segment_service.go): Before calling repo.Insert, set seg.Version = 1 (alongside seg.CreatedAt, seg.Status, etc.).
    7. Service — DuplicateSegment (segment_service.go): In the field-zeroing block, set copy.Version = 1 (alongside TotalMatched, LastEvaluatedAt, etc.).
    8. Repository unit tests (segment_repository_test.go):
      • Insert → fetched document has version == 1.
      • Update called once → version == 2; called twice → version == 3.
      • UpdateStatusversion increments.
      • UpdateStatsversion unchanged.
      • ClearRuleDirtyAtversion unchanged.
      • IncrementRecalcErrorCountversion unchanged.
    9. Service unit tests (segment_service_test.go):
      • CreateSegmentrepo.Insert called with a segment where Version == 1.
      • DuplicateSegment → duplicated segment has Version == 1 regardless of source version.

T6.3 — version field in API payload responses [Jira: —]

  • Purpose: Surface the version counter in all segment-facing API responses so that the CDP frontend and downstream consumers can detect definition changes without comparing rule sets.
  • Scope:
    • internal/app/payload/segment_payload.go — add Version to SegmentResponse and SegmentDetailResponse; update mapper functions.
    • internal/app/handler/segment_handler_test.go — assert version in all relevant response assertions.
    • docs/swagger.yaml — add version to affected schemas (can be done alongside T5.4 if not yet merged, or as an addendum).
  • Depends on: T6.2, T1.5, T5.1.
  • Expected Outcome: GET /iag/v1/segments, GET /iag/v1/segments/:id, and PUT /iag/v1/segments/:id all return "version" in their data objects. Handler tests verify the field is present and correctly populated from the domain struct. go test ./internal/app/handler/... and go test ./internal/app/payload/... pass.
  • Implementation Plan:
    1. SegmentResponse (list item) in segment_payload.go: Add Version int64 \json:"version"`as a field. Update the mapperToSegmentResponse(seg CustomerSegment) SegmentResponse(or wherever the list item is built) to setVersion: seg.Version`.
    2. SegmentDetailResponse in segment_payload.go: Add Version int64 \json:"version"`. Update ToSegmentDetailResponse(seg CustomerSegment)to setVersion: seg.Version. The toSegmentDetailResponse(SegmentDetails)` helper in the handler (T5.1 Step 7) calls this mapper, so it picks up the field automatically.
    3. Handler tests:
      • TestSegmentHandler_List: assert data[0].version == 1 (or whatever the mock returns).
      • TestSegmentHandler_GetByID: assert data[0].version is populated.
      • TestSegmentHandler_Update: assert updated response includes version.
    4. OpenAPI (docs/swagger.yaml): In the SegmentResponse and SegmentDetailResponse schema components, add:
      version:
      type: integer
      format: int64
      description: Monotonically increasing counter; increments on every user-initiated edit or archive.
      example: 3

T6.4 — rule_version in Kafka SegmentRecalculatedEvent [Jira: —]

  • Purpose: Let Kafka consumers (Broadcast, Chatbot) correlate each recalculation result with the specific version of the segment's rule that produced it. Without this, a consumer cannot tell whether a recalculation was triggered by the current rule or a since-updated one.
  • Scope:
    • internal/app/service/segment/event_publisher.go — add RuleVersion int64 to SegmentRecalculatedEvent.
    • internal/app/service/segment/segment_service.go — populate RuleVersion from seg.Version in processSegment.
    • internal/pkg/segment_kafka/publisher.go — no change; the field is marshalled automatically.
    • Unit tests for T4.5 publisher and T4.4 processSegment.
  • Depends on: T6.2, T4.4, T4.5.
  • Expected Outcome: Every cdp.segment.recalculated Kafka message includes "rule_version". Publisher unit tests assert the field is serialised in the JSON payload. go test ./internal/app/service/segment/... and go test ./internal/pkg/segment_kafka/... pass.
  • Implementation Plan:
    1. Struct (internal/app/service/segment/event_publisher.go): Add the field between RecalcRunID and TotalMatched:
      RuleVersion int64 `json:"rule_version"` // segment.Version at time of recalculation
      Run make mocks to regenerate MockISegmentEventPublisher.
    2. processSegment (internal/app/service/segment/segment_service.go): When building the SegmentRecalculatedEvent, populate the new field:
      event := SegmentRecalculatedEvent{
      EventType: "segment.recalculated",
      SegmentID: seg.ID,
      SegmentName: seg.Name,
      CompanySsoID: seg.CompanySsoID,
      RecalcRunID: recalcRunID,
      RuleVersion: seg.Version, // version of the rule definition that was evaluated
      TotalMatched: int64(len(newIDs)),
      TotalAdded: int64(len(added)),
      TotalRemoved: int64(len(removed)),
      OccurredAt: runStartedAt,
      }
    3. Publisher unit tests (internal/pkg/segment_kafka/publisher_test.go):
      • Assert that the JSON payload delivered to the Kafka producer contains "rule_version": N with the value set in the event struct.
      • Unmarshal the captured msg.Value and verify RuleVersion field is present and non-zero.
    4. processSegment unit tests (internal/app/service/segment/segment_service_test.go):
      • Extend the "non-empty diff → Publish called" test: assert that the captured SegmentRecalculatedEvent argument has RuleVersion == seg.Version.
      • Add a test where seg.Version == 5 to confirm the value flows correctly from the domain object into the event.
    5. Consumer contract addendum (comment in event_publisher.go): Add a one-line note to the ISegmentEventPublisher docstring:
      // Consumers should treat a change in rule_version as a signal that the segment membership
      // may now reflect a different rule definition and re-fetch member details if needed.

Phase 7: Operational Controls

Phase 7 adds three runtime-control mechanisms: a circuit breaker to abort an in-progress recalculation, a dedicated feature flag to disable recalculation independently of the CRUD API, and a manual trigger endpoint for targeted on-demand recalculation. All three tasks must follow the same TDD discipline (Red → Green → Refactor) as prior phases.


T7.1 — Recalculation circuit breaker [Jira: —]

  • Purpose: Give operators a way to gracefully stop a running recalculation cron without killing the pod. Writing any value to a Redis key signals the loop to exit cleanly after the current segment finishes.
  • Scope: internal/pkg/consts/ (new key constant), params/ config (new env var), internal/app/service/segment/segment_service.go (doRecalculation loop — extracted from RecalculateAllSegments in T7.3).
  • Depends on: T4.4 (RecalculateAllSegments exists), T7.3 (the refactor that extracts doRecalculation).
  • Expected Outcome: Unit tests confirm the loop exits without processing further segments when the key is present; key is cleared on detection (one-shot); absent key has no effect.
  • Implementation Plan:
    1. Add to internal/pkg/consts/segment.go (create file if absent):
      const SegmentRecalcCircuitBreakerKey = "segment_recalculation_stop_requested"
    2. Add to the params/ config struct:
      SegmentRecalcCircuitBreakerKeyTTLSeconds int `env:"SEGMENT_RECALC_CIRCUIT_BREAKER_KEY_TTL_SECONDS" envDefault:"86400"`
      This TTL is used by the trigger API handler (T7.3) when it writes the key; it is NOT used by the cron loop itself (the loop only reads and deletes it).
    3. In doRecalculation (the private method extracted in T7.3), at the top of each outer batch-fetch iteration — before calling FindActiveSegments for the next page:
      if val, _ := s.cacheRepo.Get(ctx, consts.SegmentRecalcCircuitBreakerKey); val != "" {
      slog.InfoContext(ctx, "segment recalc circuit breaker triggered, stopping",
      slog.String("recalc_run_id", recalcRunID))
      s.cacheRepo.Del(ctx, consts.SegmentRecalcCircuitBreakerKey) //nolint:errcheck
      return nil
      }
      Semantics: one-shot — the cron clears the key upon detection so tomorrow's scheduled run is unaffected.
    4. Unit tests (internal/app/service/segment/segment_service_test.go):
      • Circuit breaker key present before first batch fetch → doRecalculation returns nil; zero segments processed; Del called once.
      • Circuit breaker key set after the first batch completes → second batch loop iteration detects it; first-batch segments are processed normally; Del called once.
      • Circuit breaker key absent → loop runs to completion normally; Del never called for this key.

T7.2 — cdp_segment_recalc_enabled feature flag [Jira: —]

  • Purpose: Allow the two regular, automatic recalculation paths — the daily cron RecalculateAllSegments and the scoped on-write recalc recalculateSegment (T8.1, fired after create / rule-changing update) — to be disabled globally (e.g., during datamart maintenance) without affecting segment CRUD endpoints. It deliberately does not gate the S2S manual trigger TriggerRecalculation (T7.3): an operator must retain a way to force a targeted recalc even while automatic recalculation is paused. This flag is also separate from cdp_segmentation_enabled (CRUD API gate).
  • Scope: internal/pkg/consts/feature_flag.go (new constant), internal/app/service/segment/segment_service.go (new WithFeatureFlagService option + early-exit check, called from RecalculateAllSegments and recalculateSegment — NOT from TriggerRecalculation), cmd/initializer.go (wiring).
  • Depends on: T5.1 (options pattern established by WithContactCounter).
  • Expected Outcome: RecalculateAllSegments and recalculateSegment both return/no-op without acquiring their respective lock when the flag is disabled; TriggerRecalculation is unaffected by the flag. Flag absent from MongoDB (nil feature flag service) defaults to enabled. Unit tests cover all paths.
  • Implementation Plan:
    1. In internal/pkg/consts/feature_flag.go, add:
      FeatureFlagCDPSegmentRecalc = "cdp_segment_recalc_enabled"
    2. Add a new option to internal/app/service/segment/segment_service.go:
      func WithFeatureFlagService(ff service.IFeatureFlagService) SegmentServiceOption {
      return func(s *segmentService) { s.featureFlagService = ff }
      }
      Add the field to segmentService struct:
      featureFlagService service.IFeatureFlagService // nil = recalc always enabled
    3. Add a private helper called at the start of RecalculateAllSegments and recalculateSegment (T8.1), before any lock acquisition — TriggerRecalculation (T7.3) must NOT call this helper:
      func (s *segmentService) isRecalcEnabled(ctx context.Context) bool {
      if s.featureFlagService == nil {
      return true // safe default when not wired
      }
      return s.featureFlagService.FeatureEnabled(ctx, consts.FeatureFlagCDPSegmentRecalc, "")
      // empty sso_id → global check: returns data.Enabled regardless of CompanySsoIDs
      }
      At the start of RecalculateAllSegments:
      if !s.isRecalcEnabled(ctx) {
      slog.InfoContext(ctx, "segment recalc disabled by feature flag")
      return nil
      }
      And at the start of recalculateSegment (T8.1), after the nil-dependency guard:
      if !s.isRecalcEnabled(ctx) {
      return
      }
    4. In cmd/initializer.go, extend the NewSegmentService call:
      segmentService := segmentsvc.NewSegmentService(
      segmentRepo,
      segmentsvc.WithContactCounter(contactRepo),
      segmentsvc.WithFeatureFlagService(featureFlagService), // add this line
      )
    5. Unit tests (internal/app/service/segment/segment_service_test.go):
      • Flag service nil → RecalculateAllSegments proceeds (no early exit).
      • Flag cdp_segment_recalc_enabled disabled → RecalculateAllSegments returns nil; lock SetNX never called.
      • Flag enabled → proceeds to lock acquisition normally.
      • TriggerRecalculation with flag disabled → still proceeds to lock acquisition (the S2S manual trigger is exempt; isRecalcEnabled is never called from this method). Full recalculateSegment flag-disabled coverage lives in T8.1's tests.

T7.3 — TriggerRecalculation service method + POST /api/v1/segments/recalculate endpoint [Jira: —]

  • Purpose: Expose an internal S2S endpoint that triggers on-demand recalculation for specific companies and optionally specific segments. Useful for immediately refreshing a segment after a rule update or CDC lag recovery without waiting for the next daily run. Returns 202 immediately; progress is tracked via Kafka events carrying the recalc_run_id.

  • Scope:

    • internal/app/repository/segment/segment_repository.go — extend SegmentFilter with CompanySsoIDs []string and SegmentIDs []string.
    • internal/app/repository/segment/segment_repository_mongo.go — apply filters in FindAll query.
    • internal/app/service/segment/segment_service_interface.goRecalcTriggerFilter, TriggerRecalculation, ErrRecalcAlreadyRunning.
    • internal/app/service/segment/segment_service.go — refactor RecalculateAllSegments → extract doRecalculation(ctx, recalcRunID string, filter *RecalcTriggerFilter); implement TriggerRecalculation.
    • internal/app/payload/segment_payload.goTriggerRecalculationRequest, TriggerRecalculationResponse.
    • internal/app/handler/segment_handler.goTriggerRecalculation handler.
    • internal/server/rest_router.go — register under the /api/v1/ BasicAuth block.
  • Depends on: T4.4 (RecalculateAllSegments), T7.1 (circuit breaker check lives in doRecalculation), T7.2 (options pattern; note TriggerRecalculation deliberately does NOT call isRecalcEnabled — the S2S manual trigger is the one path exempt from the flag).

  • Expected Outcome: Unit tests cover all happy paths and error cases; go build ./... passes; staging curl verified.

  • Implementation Plan:

    Step 1 — Extend SegmentFilter

    In internal/app/repository/segment/segment_repository.go, add two optional fields to SegmentFilter:

    CompanySsoIDs []string // when non-empty, constrains to segments belonging to these companies
    SegmentIDs []string // when non-empty, constrains to only these segment IDs

    In segment_repository_mongo.go, in the FindAll BSON filter construction, append when non-empty:

    if len(f.CompanySsoIDs) > 0 {
    filter["company_sso_id"] = bson.M{"$in": f.CompanySsoIDs}
    }
    if len(f.SegmentIDs) > 0 {
    ids := make([]primitive.ObjectID, 0, len(f.SegmentIDs))
    for _, id := range f.SegmentIDs {
    oid, err := primitive.ObjectIDFromHex(id)
    if err == nil {
    ids = append(ids, oid)
    }
    }
    filter["_id"] = bson.M{"$in": ids}
    }

    Step 2 — Define domain types and sentinels

    In internal/app/service/segment/segment_service_interface.go:

    // RecalcTriggerFilter scopes a manual recalculation to specific companies/segments.
    // SegmentIDs is optional; when nil/empty all active segments for CompanySsoIDs are processed.
    type RecalcTriggerFilter struct {
    CompanySsoIDs []string
    SegmentIDs []string
    }

    var (
    ErrRecalcAlreadyRunning = errors.New("recalculation already in progress")
    )

    Add to ISegmentService:

    // TriggerRecalculation starts an async recalculation for the given filter and returns its recalcRunID.
    // Bypasses the cdp_segment_recalc_enabled flag unconditionally — the S2S manual trigger is
    // the one recalc path exempt from it (see T7.2); the cron and the scoped on-write recalc
    // (T8.1) both check it.
    // Returns ErrRecalcAlreadyRunning when the distributed lock is already held.
    TriggerRecalculation(ctx context.Context, filter RecalcTriggerFilter) (string, error)

    Run make mocks to regenerate MockISegmentService.

    Step 3 — Refactor RecalculateAllSegments and implement doRecalculation

    In internal/app/service/segment/segment_service.go:

    a. Extract the main body of RecalculateAllSegments (after lock acquisition) into a private method:

    // doRecalculation executes the full recalculation loop. filter == nil means all companies/segments.
    func (s *segmentService) doRecalculation(ctx context.Context, recalcRunID string, filter *RecalcTriggerFilter) error {
    // existing RecalculateAllSegments body (batch loop + worker pool + processSegment)
    // with the circuit breaker check (T7.1) at the top of each outer batch iteration:
    // if val, _ := s.cacheRepo.Get(ctx, consts.SegmentRecalcCircuitBreakerKey); val != "" { ... }
    // filter applied to FindActiveSegments call via SegmentFilter.CompanySsoIDs / .SegmentIDs
    }

    b. RecalculateAllSegments becomes a thin wrapper:

    func (s *segmentService) RecalculateAllSegments(ctx context.Context) error {
    if !s.isRecalcEnabled(ctx) { // T7.2
    slog.InfoContext(ctx, "segment recalc disabled by feature flag")
    return nil
    }
    recalcRunID := ulid.Make().String()
    // acquire lock (existing logic) ...
    defer s.cacheRepo.Del(ctx, segmentRecalcLockKey) //nolint:errcheck
    return s.doRecalculation(ctx, recalcRunID, nil)
    }

    c. Implement TriggerRecalculation — note it deliberately does NOT call isRecalcEnabled; the S2S manual trigger is exempt from the flag (T7.2):

    func (s *segmentService) TriggerRecalculation(ctx context.Context, filter RecalcTriggerFilter) (string, error) {
    recalcRunID := ulid.Make().String()
    acquired, err := s.cacheRepo.SetNX(ctx, segmentRecalcLockKey, recalcRunID, s.cfg.SegmentRecalcLockTTLSeconds)
    if err != nil || !acquired {
    return "", ErrRecalcAlreadyRunning
    }
    go func() {
    bgCtx := context.WithoutCancel(ctx)
    defer s.cacheRepo.Del(bgCtx, segmentRecalcLockKey) //nolint:errcheck
    if err := s.doRecalculation(bgCtx, recalcRunID, &filter); err != nil {
    slog.ErrorContext(bgCtx, "trigger recalculation failed", slog.Any("error", err))
    }
    }()
    return recalcRunID, nil
    }

    Step 4 — Payload DTOs

    In internal/app/payload/segment_payload.go:

    type TriggerRecalculationRequest struct {
    CompanySsoIDs []string `json:"company_sso_ids" validate:"required,min=1"`
    SegmentIDs []string `json:"segment_ids"`
    }

    type TriggerRecalculationResponse struct {
    RecalcRunID string `json:"recalc_run_id"`
    }

    Step 5 — Handler

    Add to internal/app/handler/segment_handler.go:

    func (h *SegmentHandler) TriggerRecalculation(_ http.ResponseWriter, r *http.Request) (myhttp.ResponseBody, error) {
    var req payload.TriggerRecalculationRequest
    if err := myhttp.BindJSON(r, &req); err != nil {
    return myhttp.ResponseBody{}, myhttp.ErrBadRequest("invalid request body")
    }
    if len(req.CompanySsoIDs) == 0 {
    return myhttp.ResponseBody{}, myhttp.ErrBadRequest("company_sso_ids is required")
    }
    filter := segmentsvc.RecalcTriggerFilter{
    CompanySsoIDs: req.CompanySsoIDs,
    SegmentIDs: req.SegmentIDs,
    }
    runID, err := h.segmentService.TriggerRecalculation(r.Context(), filter)
    if err != nil {
    switch {
    case errors.Is(err, segmentsvc.ErrRecalcAlreadyRunning):
    return myhttp.ResponseBody{}, myhttp.ErrConflict("recalculation already in progress")
    default:
    return myhttp.ResponseBody{}, myhttp.ErrInternal()
    }
    }
    resp := payload.TriggerRecalculationResponse{RecalcRunID: runID}
    return myhttp.NewJSONResponseWithStatus(http.StatusAccepted, resp, nil), nil
    }

    Step 6 — Route registration

    In internal/server/rest_router.go, inside the /api/v1/ BasicAuth subrouter block, add:

    r1.With(mymiddleware.BasicAuth).Post("/segments/recalculate", segmentHandler.TriggerRecalculation)

    (Add before the per-segment /segments/{segmentID}/customers route to avoid route conflicts.)

    Step 7 — Unit tests

    Repository (segment_repository_test.go):

    • FindAll with CompanySsoIDs = ["co1", "co2"] → BSON filter includes $in clause; segments from other companies not returned.
    • FindAll with SegmentIDs = ["id1"] → filter includes _id: {$in: [ObjectID("id1")]}.
    • Both filters combined → intersection applied correctly.

    Service (segment_service_test.go):

    • TriggerRecalculation: feature flag disabled → still acquires the lock and proceeds (S2S manual trigger is exempt; assert isRecalcEnabled/the feature flag mock is never invoked by this method).
    • Lock held → ErrRecalcAlreadyRunning, no goroutine started.
    • Valid filter with SegmentIDsdoRecalculation called with correct filter; recalcRunID returned matches ULID format.
    • Valid filter without SegmentIDsdoRecalculation called with empty SegmentIDs.
    • RecalculateAllSegments calls doRecalculation with nil filter (all segments).

    Handler (segment_handler_test.go):

    • Missing body → 400.
    • Empty company_sso_ids array → 400.
    • ErrRecalcAlreadyRunning → 409 with message.
    • Valid request → 202 with data[0].recalc_run_id non-empty.

Phase 8: Immediate Recalculation on Write & Readiness Signal

Phase 8 is additive-only — it extends Phase 1–7 source files without rewriting them, following the same TDD discipline. It makes a newly created or rule-edited segment usable within seconds (instead of waiting for the daily cron) and gives API callers an explicit signal of whether a segment's member list has been computed yet.


T8.1 — Scoped recalc on create / rule edit + readiness indicator in GET responses [Jira: —]

  • Purpose: Two coupled deliverables. (1) Fire a best-effort, asynchronous, single-segment recalculation immediately after CreateSegment and after a rule-changing UpdateSegment, so members are populated within seconds rather than at the next daily cron. (2) Expose a derived is_ready / processing_status pair on the segment detail and list responses so callers can distinguish "segment matches 0 customers" from "segment not computed yet". Both pieces ship together because the readiness signal is what makes the best-effort recalc safe to rely on.

  • Scope:

    • internal/pkg/consts/segment.go — per-segment lock key prefix constant.
    • params/ config — new env var SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS (default 300).
    • internal/app/service/segment/segment_service.gorecalculateSegment private method (scoped, per-segment lock, reuses processSegment); processSegmentLocked wrapper acquiring the per-segment lock; wire fire-and-forget calls into CreateSegment and rule-changing UpdateSegment.
    • internal/app/service/segment/recalculate.go (or wherever doRecalculation's worker pool lives, per T7.3) — cron worker calls processSegmentLocked so the daily run and an on-write recalc never write the same segment concurrently.
    • internal/app/payload/segment_payload.goIsReady + ProcessingStatus on SegmentResponse (list item) and SegmentDetailResponse; deriveReadiness helper; update mappers.
    • internal/app/handler/segment_handler.go — no new handler; GetByID/Update/List pick up the new fields via the mappers.
    • docs/swagger.yaml — add is_ready / processing_status to the affected schemas.
  • Depends on: T7.2 (isRecalcEnabled feature-flag helper — recalculateSegment gates on it, same as the cron), T7.3 (processSegment / doRecalculation extraction, distributed-lock usage), T1.4 (CreateSegment/UpdateSegment), T5.1 (SegmentDetailResponse + mapper), T6.3 (Version in payloads — same mappers touched).

  • Expected Outcome: Creating a segment, or editing its rule set, triggers a single-segment recalc that populates customer_segment_members within seconds without blocking the HTTP response; the recalc never fails the create/update request; the daily cron remains the backstop. GET detail and list responses include is_ready (bool) and processing_status (pending/ready/stale/failed), derived purely from existing fields (no migration). All listed unit tests pass; affected packages stay ≥ 80% coverage.

  • Implementation Plan:

    Step 1 — Per-segment lock key + config

    In internal/pkg/consts/segment.go, add:

    // SegmentRecalcScopedLockKeyPrefix + segmentID forms the per-segment recalc lock key,
    // e.g. "segment_recalculation_lock:683ab...". Distinct from the global cron lock so the
    // on-write recalc is never starved by an in-progress daily run.
    const SegmentRecalcScopedLockKeyPrefix = "segment_recalculation_lock:"

    In the params/ config struct, add:

    SegmentRecalcScopedLockTTLSeconds int `env:"SEGMENT_RECALC_SCOPED_LOCK_TTL_SECONDS" envDefault:"300"`

    Thread it into the segmentService config the same way the other SegmentRecalc* values are wired in T3.1 / T4.4.

    Step 2 — Single-writer wrapper + scoped recalc method

    In internal/app/service/segment/segment_service.go (or recalculate.go):

    a. Add a wrapper that guarantees a single writer per segment, used by BOTH the cron worker pool and the on-write path:

    // processSegmentLocked acquires the per-segment lock, runs processSegment, then releases it.
    // Returns without processing if the lock is already held (another run owns this segment).
    func (s *segmentService) processSegmentLocked(ctx context.Context, recalcRunID string, seg reposegment.CustomerSegment) {
    key := consts.SegmentRecalcScopedLockKeyPrefix + seg.ID.Hex()
    acquired, err := s.cacheRepo.SetNX(ctx, key, recalcRunID, s.cfg.SegmentRecalcScopedLockTTLSeconds)
    if err != nil || !acquired {
    slog.InfoContext(ctx, "segment recalc skipped: per-segment lock held",
    slog.String("segment_id", seg.ID.Hex()))
    return
    }
    defer s.cacheRepo.Del(ctx, key) //nolint:errcheck
    s.processSegment(ctx, recalcRunID, seg)
    }

    Update the doRecalculation worker pool (T7.3) to call processSegmentLocked instead of processSegment directly, so the daily cron and any on-write recalc serialize per segment (prevents two runs with different recalc_run_ids double-writing membership-transition events).

    b. Add the scoped, best-effort entry point invoked on write:

    // recalculateSegment runs a best-effort single-segment recalc. Intended to be called as a
    // detached goroutine after create / rule edit. Never returns an error to the caller — all
    // failures are logged and the daily cron is the backstop. Checks cdp_segment_recalc_enabled
    // (T7.2), same as the cron — the S2S manual trigger (T7.3) is the only path exempt from it.
    func (s *segmentService) recalculateSegment(ctx context.Context, companySsoID, segmentID string) {
    if s.memberRepo == nil || s.ruleEngine == nil || s.datamartRepo == nil ||
    s.cacheRepo == nil || s.txRunner == nil {
    return // recalc deps not wired (e.g. datamart disabled) — no-op
    }
    if !s.isRecalcEnabled(ctx) { // T7.2 — feature flag off
    return
    }
    id, err := primitive.ObjectIDFromHex(segmentID)
    if err != nil {
    return
    }
    seg, err := s.repo.FindByID(ctx, id, companySsoID)
    if err != nil {
    slog.WarnContext(ctx, "scoped recalc: segment lookup failed",
    slog.String("segment_id", segmentID), slog.Any("error", err))
    return
    }
    s.processSegmentLocked(ctx, ulid.Make().String(), seg)
    }

    Step 3 — Wire into CreateSegment and UpdateSegment (fire-and-forget)

    In CreateSegment, after a successful repo.Insert returns the new id:

    go s.recalculateSegment(context.WithoutCancel(ctx), companySsoID, id.Hex())
    return id, nil

    In UpdateSegment, trigger only when the rule set actually changed — reuse the exact condition that already sets RuleDirtyAt:

    if ruleSetChanged(existing.RuleSet, ruleSet) {
    updated.RuleDirtyAt = &now
    }
    // ... after repo.Update succeeds:
    if ruleSetChanged(existing.RuleSet, ruleSet) {
    go s.recalculateSegment(context.WithoutCancel(ctx), companySsoID, segmentID)
    }

    context.WithoutCancel detaches the goroutine from the request context so the recalc survives after the HTTP response returns; the response itself is never affected by recalc outcome. A name/description-only edit does not set RuleDirtyAt and does not trigger a recalc.

    Step 4 — Readiness derivation in payloads

    In internal/app/payload/segment_payload.go, add a pure helper:

    // deriveReadiness computes the membership-readiness signal from persisted fields only
    // (no new storage). See RFC "Readiness is derived, not stored".
    func deriveReadiness(seg reposegment.CustomerSegment) (isReady bool, status string) {
    switch {
    case seg.LastEvaluatedAt == nil && seg.RecalcErrorCount >= 3:
    return false, "failed"
    case seg.LastEvaluatedAt == nil:
    return false, "pending"
    case seg.RuleDirtyAt != nil:
    return true, "stale"
    default:
    return true, "ready"
    }
    }

    Add IsReady bool \json:"is_ready"`andProcessingStatus string `json:"processing_status"`to bothSegmentResponse(list item) andSegmentDetailResponse. In ToSegmentResponseandToSegmentDetailResponse, set both from deriveReadiness(seg). The handler's toSegmentDetailResponse(SegmentDetails)` (T5.1) and the list mapper pick the fields up automatically.

    Step 5 — OpenAPI

    In docs/swagger.yaml, add to the SegmentResponse and SegmentDetailResponse schema components:

    is_ready:
    type: boolean
    description: True once the segment has a computed member snapshot (last_evaluated_at != null).
    example: true
    processing_status:
    type: string
    enum: [pending, ready, stale, failed]
    description: Derived membership-readiness state. pending = first recalc not finished; stale = rule edited since last recalc; failed = 3+ consecutive recalc failures with no prior success.
    example: ready

    Step 6 — Tests

    Payload (segment_payload_test.go): table-driven deriveReadiness covering all four states at the boundaries — last_evaluated_at == nil & errors < 3 → pending/false; last_evaluated_at == nil & errors ≥ 3 → failed/false; evaluated & rule_dirty_at == nilready/true; evaluated & rule_dirty_at != nilstale/true. Assert the mappers emit is_ready / processing_status.

    Service (segment_service_test.go):

    • CreateSegment happy path → returns the new id AND cacheRepo.SetNX is called with key prefix segment_recalculation_lock: (verify the scoped recalc fired). Use a synchronization point (e.g. a channel-backed mock) since the recalc runs in a goroutine.
    • UpdateSegment with a changed rule set → scoped recalc fired; with name/description-only change → SetNX NOT called.
    • recalculateSegment: deps unwired → no-op; feature flag off → no-op (SetNX not called); per-segment lock already held (SetNX returns false) → processSegment not invoked; happy path → processSegment invoked and lock released (Del called).
    • processSegmentLocked: lock acquired → processSegment runs, Del called on return; lock held → processSegment skipped.
    • Confirm the cron path (doRecalculation) now routes through processSegmentLocked (existing T4.4/T7.x tests updated to expect the per-segment SetNX/Del calls).

    Handler (segment_handler_test.go): GetByID / List responses include is_ready and processing_status; assert a pending segment (mock returns LastEvaluatedAt == nil) serializes "is_ready": false, "processing_status": "pending".


T8.2 — Fix: gate recalculateNewSegment behind cdp_segment_recalc_enabled [Jira: —]

  • Purpose: The shipped contact-service code implements T8.1's scoped on-write recalc as recalculateNewSegment (internal/app/service/segment/recalculate.go), invoked from CreateSegment and rule-changing UpdateSegment (segment_service.go). It never checks cdp_segment_recalc_enabledisRecalcEnabled (T7.2) has exactly one caller today, RecalculateAllSegments. Per the confirmed design (RFC §2.11 / §3, task breakdown T7.2/T8.1), the scoped recalc is one of the two regular, automatic recalculation paths and must respect the flag exactly like the cron; only the S2S manual trigger TriggerRecalculation is exempt. Today, disabling the flag during a datamart maintenance window silently fails to pause recalculation for every segment created or rule-edited during that window — this closes that gap so the flag's operational guarantee actually holds.
  • Implementation Plan:
    1. In internal/app/service/segment/recalculate.go, in recalculateNewSegment, add the flag check immediately after the existing nil-dependency guard and before the per-segment lock acquisition (mirrors RecalculateAllSegments in the same file):
      func (s *segmentService) recalculateNewSegment(ctx context.Context, companySsoID, segmentID string) {
      if s.memberRepo == nil || s.ruleEngine == nil || s.datamartRepo == nil ||
      s.distributedLock == nil || s.txRunner == nil {
      return
      }

      if !s.isRecalcEnabled(ctx) {
      slog.InfoContext(ctx, "segment_service recalculate_new_segment: recalc disabled by feature flag",
      slog.String("segment_id", segmentID))
      return
      }

      lockKey := segmentRecalcPerSegmentLockPrefix + segmentID
      // ... unchanged from here
    2. No other call site, wiring, or config changes are needed: isRecalcEnabled and featureFlagService already exist on segmentService (T7.2), and cmd/initializer.go already injects the same featureFlagService instance used by RecalculateAllSegmentsrecalculateNewSegment picks it up for free by calling the existing helper.
    3. CreateSegment / UpdateSegment (segment_service.go) need no changes — they already fire-and-forget via go s.recalculateNewSegment(...), and this fix is entirely internal to that goroutine; the create/update response is unaffected either way.
    4. Unit tests (internal/app/service/segment/recalculate_test.go, alongside the existing TF-3379 block):
      • New TestRecalculateNewSegment_RecalcDisabledByFeatureFlag: build a mockFeatureFlagService (already defined in this test file) returning false for consts.FeatureFlagCDPSegmentRecalc, inject via WithFeatureFlagService, call recalculateNewSegment directly, then assert the distributed lock's TryAcquire and the segment repo's FindByID are never called (AssertNotCalled), and ffSvc.AssertExpectations(t). Model this on TestRecalculateAllSegments_RecalcDisabledByFeatureFlag (same file, TF-3337 block).
      • Update TestRecalculateNewSegment_LockAcquired_ProcessesSegment (and any other happy-path test in the TF-3379 block that doesn't already wire a WithFeatureFlagService) to either inject a flag service returning true or leave it nil — nil already defaults isRecalcEnabled to true, so most existing happy-path tests need no change; only confirm none of them break now that the new check runs first.
    5. Run go test ./internal/app/service/segment/... and confirm coverage stays ≥ 80% for the package (existing project convention per other T7.x/T8.x tasks).