Task Breakdown — Legacy Migration: CRM Contact Activity Logs → CDP
Source RFC: rfc-legacy-migration-crm-activity-logs.md
Mode: vertical (one task per execution chunk, end-to-end) · Scope: all 9 chunks (BE + FE) · Actionable-only. Paths verified against the live worktrees contact-service, qontak-customer-fe, qontak.com on 2026-06-30.
Effort Summary
| Task | Story IDs | FE days | BE days | QA days | Total |
|---|---|---|---|---|---|
| 1 — Schema + source const + Mongo indexes | S02, S04, S06-NEG | — | 1 | 0.5 | 1.5 |
| 2 — Contact finder FindOneByCrmDataID | S01, S03 | — | 1 | 0.5 | 1.5 |
| 3 — Schema transformer package | S03 | — | 3 | 1 | 4 |
| 4 — MigrateBatch service + /migrate endpoint | S01, S02 | — | 2 | 0.5 | 2.5 |
| 5 — Trigger + status + IAG proxy + rollback + durable job store + Phase-A gate | S01, S02, S05, S06-NEG, S09 | — | 3 | 1 | 4 |
| 6 — Consumer + worker registration + CRM extractor | S01, S02 | — | 3 | 1 | 4 |
| 7 — FE migration-status indicator + composable | S05 | 1 | — | 0.5 | 1.5 |
| 8 — SyncWatermarkResolver (pre-sync cutoff) | S07 | — | 1 | 0.5 | 1.5 |
| 9 — AssociationLogEnricher (in-place metadata patch) | S08 | — | 3 | 1 | 4 |
| Grand total | | 1 | 17 | 6.5 | 24.5 |
Confidence: medium. The RFC is exceptionally well-grounded (every path:line verified against the live worktrees), so the build is unambiguous. The estimate risk sits in the data-correctness BE tasks: Task 3 (transformer) and Task 6 (consumer) depend on OQ-12 (join-model class names + the won/resolved/done id-sets) and OQ-2 (the CRM extraction query plan); Task 9 (enrichment) is the highest-risk new task — locating live-feed association logs that carry no external_id depends on the OQ-13 match-key, whose collision-safety must be confirmed (the default skips ambiguous matches rather than guessing). Task 8 (watermark) is low-risk — it reads contact.created_at already fetched by the resolver. All have safe fallbacks, so they don't block building, but they could move the numbers. All 9 chunks are buildable + unit-testable now (Task 9 against fixtures); only their execution runs / Stage-1 validation (Gate G) wait on the Stage-0 gates.
Task 1: [BE] Schema additions + source const + Mongo indexes (CALM-S02, CALM-S04, CALM-S06-NEG)
A migrated audit can be written with a CRM-traceable id and is provably de-duplicated by the database, while existing native logs are untouched.
Status: ✅ Actionable
What to build
Add three omitempty fields (external_id, source_tag, metadata) to the ActivityLog struct, one source constant, and a golang-migrate JSON migration creating the partial-unique idempotency index plus the supporting contact and job-collection indexes.
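As a rough illustration of the struct change described above, the sketch below shows how omitempty keeps native documents unchanged. The trimmed ActivityLog type, the Category placeholder field, the encode helper, and the sample id "audit-42" are assumptions made for this example; only the three field names, their tags, and the constant value come from the plan.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ActivityLogSourceCrmMigration is the source constant named in the plan.
const ActivityLogSourceCrmMigration = "crm_migration"

// ActivityLog is a trimmed, hypothetical stand-in for the real struct.
// Category is a placeholder; only the three new fields matter here.
type ActivityLog struct {
	Category   string                 `json:"category" bson:"category"`
	ExternalID string                 `json:"external_id,omitempty" bson:"external_id,omitempty"`
	SourceTag  string                 `json:"source_tag,omitempty" bson:"source_tag,omitempty"`
	Metadata   map[string]interface{} `json:"metadata,omitempty" bson:"metadata,omitempty"`
}

// encode marshals a log to JSON so the effect of omitempty is visible.
func encode(l ActivityLog) string {
	b, err := json.Marshal(l)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	native := ActivityLog{Category: "contact"}
	migrated := ActivityLog{
		Category:   "contact",
		ExternalID: "audit-42", // hypothetical id format
		SourceTag:  ActivityLogSourceCrmMigration,
	}
	fmt.Println(encode(native))   // new fields are absent
	fmt.Println(encode(migrated)) // new fields are present
}
```

Because a native log serializes without external_id, it falls outside a partial index filtered on that field, which is why existing documents should be unaffected by the unique constraint.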
Implementation Plan
| Action | File | What changes |
|---|---|---|
| extend | internal/app/repository/activity_log/base.go | add ExternalID, SourceTag (string) + Metadata (map[string]interface{}) after Actor (line 28), all omitempty json+bson |
| extend | internal/pkg/consts/const.go | add ActivityLogSourceCrmMigration = "crm_migration" beside line 105-106 |
| create | db/migrations/035_crm_migration_indexes.up.json | createIndexes array: uniq_company_external_id (unique, partial on external_id $exists); idx_company_crm_data_id on contacts; idx_company_sso_id + partial-unique idx_company_status (status=in_progress) on crm_activity_log_migration_jobs |
| create | db/migrations/035_crm_migration_indexes.down.json | drop the four indexes |
Migration number 035 verified as next available (highest is 034_create_customer_segment_membership_events). Format = JSON array of {createIndexes, indexes:[…]} objects, per 008_contact_accounts_index.up.json.
Implementation steps
- Explore — Open internal/app/repository/activity_log/base.go and read the struct (fields end with Attributes; Actor is at line 28, tag style json:"actor,omitempty" bson:"actor,omitempty"). Open db/migrations/016_activity_log_category_index.up.json to copy the createIndexes JSON shape (note the down-file's categoty typo — don't replicate it).
- Scaffold migration (red) — Write 035_crm_migration_indexes.{up,down}.json. Run make migrate-up && make migrate-down against a local Mongo; confirm both directions are clean.
- Add struct fields + const — Append the three fields; add the const.
- Verify — make migrate-up then in a Mongo shell confirm uniq_company_external_id is unique:true with partialFilterExpression, and the idx_company_status partial-unique enforces one in_progress job per account.
- Quality gate — make build && make lint && make migrate-down.
Acceptance criteria
- Struct compiles with the three new omitempty fields; existing native docs (no external_id) are unaffected.
- uniq_company_external_id exists as a partial unique index on (company_sso_id, external_id).
- idx_company_crm_data_id on contacts and the two job-collection indexes exist.
- make migrate-down reverses cleanly (drops all four).
Test strategy
No Go unit test (schema/migration only). Proof is make migrate-up/migrate-down idempotency plus a Mongo-shell index assertion that the unique + partial filters are present.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 1 |
| QA | 0.5 |
| Total | 1.5 |
Assumptions: reuses the existing golang-migrate JSON tooling and omitempty tag convention; no data backfill in this task.
Run to verify
make migrate-up && make migrate-down && make build && make lint
Depends on
- None (foundational; everything else builds on these fields/indexes).
Task 2: [BE] Contact finder FindOneByCrmDataID (CALM-S01/AC-4, CALM-S03)
The migrator can resolve a CRM person id to the already-migrated CDP contact so each audit lands on the right customer timeline.
Status: ✅ Actionable
What to build
A repository method on the contact repo that finds one contact by (company_sso_id, crm_data.id), returning a not-found sentinel on miss. Backed by the idx_company_crm_data_id index from Task 1.
Implementation Plan
| Action | File | What changes |
|---|---|---|
| extend | internal/app/repository/contact/base.go | add FindOneByCrmDataID(ctx, companySsoID, crmDataID) (*Contact, error); reuse the crm_data.id bson path (appNameColumnMapper, line 533-541); CrmData.ID is at 342-344 |
| create | internal/app/repository/contact/base_test.go (or extend existing) | test: known crm_data.id resolves; miss → not-found |
| update | internal/app/repository/contact/mocks/ | regenerate via make mocks |
Implementation steps
- Explore — Open internal/app/repository/contact/base.go; read FindByIDs (line 547) for the find-one query idiom and the CrmData struct (342-344). Note there is no existing crm_data.id finder (confirmed by grep).
- Write failing test (red) — Add a table-driven test for hit + miss; run make test, confirm it fails.
- Implement — Add FindOneByCrmDataID, scoping the filter by company_sso_id AND crm_data.id, mirroring the existing find-one decode.
- Mocks + green — make mocks && make test until green.
- Quality gate — make build && make lint.
Acceptance criteria
- Query is scoped by both company_sso_id and crm_data.id (no cross-tenant read).
- Returns the contact on hit; a clear not-found error on miss (consumed later as contact_not_found skip).
- Mocks regenerated; unit test green.
Test strategy
Table-driven unit test against a mocked/embedded Mongo: assert the filter document equals {company_sso_id, "crm_data.id"} and that a miss returns the not-found sentinel.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 1 |
| QA | 0.5 |
| Total | 1.5 |
Assumptions: reuses existing contact-repo query patterns and the index from Task 1; no new collection.
Run to verify
make mocks && make test && make lint
Depends on
- Task 1 (the idx_company_crm_data_id index).
Task 3: [BE] Schema transformer package (CALM-S03)
Each CRM audit row is deterministically converted into a correct CDP ActivityLog — right category/action, structured field deltas, and a resolved actor — without dropping legitimate history.
Status: ⚠️ Partially blocked — fully buildable now with safe fallbacks; CategoryMapper precision for resolved/completed/link actions awaits OQ-12 (join-model class names + won/resolved/done id-sets). Until OQ-12 lands, those update rows fall through to generic …/update (no record dropped). ActorResolver's USMAN deactivated-user mapping awaits OQ-9 (empty result vs Status flag) — default to case 2 for now.
Design reference: n/a — BE-only.
What to build
A new crm_migration service sub-package porting qontak.com's audit.rb mapping to Go: CategoryMapper, ChangesExtractor (two branches), ActorResolver (four cases via the CRM-users→USMAN two-hop), and ContactResolver, with golden-file tests sampled from real audits shapes.
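The two ChangesExtractor branches can be sketched as follows. This is a minimal illustration, not the port of audit.rb itself: the Change type and the extractChanges name are hypothetical, the input is assumed to be audited_changes already parsed from YAML into a map, and the handling of a malformed update pair (skipped here) is a simplification that the real implementation may want to count or report instead.

```go
package main

import (
	"fmt"
	"sort"
)

// Change is a hypothetical shape for one structured field delta.
type Change struct {
	Field string
	From  interface{}
	To    interface{}
}

// extractChanges converts already-parsed audited_changes into deltas.
// Assumption: on update each value is a two-element [old, new] slice;
// on destroy each value is the last stored value.
func extractChanges(action string, audited map[string]interface{}) []Change {
	fields := make([]string, 0, len(audited))
	for f := range audited {
		fields = append(fields, f)
	}
	sort.Strings(fields) // deterministic order helps golden-file comparisons

	var out []Change
	for _, f := range fields {
		v := audited[f]
		switch action {
		case "update":
			pair, ok := v.([]interface{})
			if !ok || len(pair) != 2 {
				continue // malformed pair: skipped in this sketch only
			}
			out = append(out, Change{Field: f, From: pair[0], To: pair[1]})
		case "destroy":
			out = append(out, Change{Field: f, From: v, To: nil})
		}
	}
	return out
}

func main() {
	upd := extractChanges("update", map[string]interface{}{
		"name": []interface{}{"Old", "New"},
	})
	del := extractChanges("destroy", map[string]interface{}{"name": "Old"})
	fmt.Printf("%+v\n%+v\n", upd, del)
}
```

Sorting the field names is a design choice for this sketch: Go map iteration order is random, so a stable order keeps golden-file output reproducible.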
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | internal/app/service/crm_migration/category_mapper.go | CDP-defined (category, action) from auditable_type + action + audited_changes keys (table in §2.F); link/unlink from join-model create/destroy; unmapped_action_type → skip |
| create | internal/app/service/crm_migration/changes_extractor.go | two branches: update→{field,from:old,to:new}; destroy→{field,from:value,to:null}; column→label map (OQ-6, migration-local) |
| create | internal/app/service/crm_migration/actor_resolver.go | 4 cases; hop 1 = CRM users SELECT email,full_name,deleted_at WHERE id=:user_id; hop 2 = QontakLaunchpadClient.GetUsersByEmails(ctx, companySsoId, [email]) → SsoID (qontak_launchpad.go:264); per-job user_id→{email,sso_uuid} cache |
| create | internal/app/service/crm_migration/contact_resolver.go | comment JSON crm_person_id first → audited_changes YAML → FindOneByCrmDataID; else contact_not_found |
| create | internal/app/service/crm_migration/transformer.go | orchestrates the four into a CDP ActivityLog + UTC-normalized timestamp |
| create | internal/app/service/crm_migration/*_test.go | golden-file tests (co-located) |
| update | internal/app/service/crm_migration/mocks/ | make mocks for the new interfaces (USMAN client, CRM users reader) |
Implementation steps
- Explore — Read the source-of-truth Ruby: qontak.com/app/models/audit.rb — YAML parse (213-215), update/destroy branches (472-533), mapping_who 4 outcomes (1824-1842), mapping_action "changed" (1052-1062). Read contact-service/internal/app/api/qontak_launchpad.go:264 (GetUsersByEmails, SsoID json:"sso_id" at 548) and activity_log/base.go:28 (Actor, no Description).
- Write golden-file tests (red) — Seed fixtures for: update→[from,to]; destroy→to:null; active→SSO UUID; soft-deleted→[Deleted CRM User]; system→Qontak System; bare-fallback→[Deleted CRM User]; comment-JSON crm_person_id first; UTC timestamp; unmapped→skip. make test — confirm red.
- ChangesExtractor — port the two YAML branches; add the column→label map.
- CategoryMapper — implement the §2.F predicate table; for OQ-12-pending rows, default to …/update and emit a counter so the gap is visible.
- ActorResolver — implement the two-hop with the per-job cache; on USMAN timeout/empty → case 2, never skip (Decision 6).
- ContactResolver — three-tier lookup ending in FindOneByCrmDataID (Task 2).
- Go green — make mocks && make test until all golden cases pass.
- Quality gate — make build && make lint.
Acceptance criteria
- update→{from,to}; destroy→{from:value,to:null}.
- Four actor cases resolve exactly as specified; a USMAN failure degrades to [Deleted CRM User] and does not drop the record.
- comment JSON crm_person_id is tried before audited_changes.
- timestamp is UTC-normalized.
- Unmapped auditable_type → skip with unmapped_action_type; OQ-12-pending rows fall through to …/update (no drop).
Test strategy
Golden-file table tests: fixture audits rows (YAML + comment JSON) → asserted CDP ActivityLog. Key mocks: the USMAN GetUsersByEmails client and the CRM users reader; key assertions are the four actor outputs and the two change-branch shapes. Pin fixtures to audited gem v5.8.0 semantics.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 3 |
| QA | 1 |
| Total | 4 |
Assumptions: the deterministic Rails mapping is ported (not called live, Decision 5); OQ-12 id-sets arrive before Stage 2 (interim fallthrough is safe); USMAN client already exists (verified qontak_launchpad.go:264).
Run to verify
make mocks && make test && make lint
Depends on
- Task 2 (FindOneByCrmDataID for ContactResolver).
- [External: OQ-12 join-class names + won/resolved/done id-sets — precision only, safe fallthrough until then] · [External: OQ-9 USMAN deactivated-user representation — default already specified]
Task 4: [BE] MigrateBatch service + /migrate endpoint (CALM-S01, CALM-S02, CALM-S03/AC-7)
The migrator can write a batch of up to 20 transformed records idempotently, silently skipping any it has already inserted.
Status: ✅ Actionable
What to build
The shared MigrateBatch() write path (used by both the consumer and the standalone endpoint) plus the S2S POST /private/activity_logs/migrate route — batch ≤20, skip-on-conflict via the (company_sso_id, external_id) partial-unique index, returning inserted/skipped/failed counts.
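The skip-on-conflict behaviour described above might look roughly like this. The Store interface, the Record type, and the in-memory memStore (which imitates the partial-unique index) are hypothetical stand-ins for this sketch; the real service would translate the Mongo driver's duplicate-key error rather than the placeholder ErrDuplicateKey used here.

```go
package main

import (
	"errors"
	"fmt"
)

const maxBatch = 20 // mirrors the documented maxBulkCreateLimit

var (
	ErrBatchTooLarge = errors.New("BATCH_TOO_LARGE")
	// ErrDuplicateKey is a stand-in for the driver's duplicate-key error.
	ErrDuplicateKey = errors.New("duplicate key")
)

// Record and Store are hypothetical; the real repository may differ.
type Record struct{ CompanySsoID, ExternalID string }

type Store interface{ Insert(r Record) error }

// memStore imitates a unique index on (company_sso_id, external_id).
type memStore struct{ seen map[Record]bool }

func (m *memStore) Insert(r Record) error {
	if m.seen[r] {
		return ErrDuplicateKey
	}
	m.seen[r] = true
	return nil
}

type Result struct{ Inserted, Skipped, Failed int }

// MigrateBatch writes each record and lets the store reject duplicates,
// instead of querying before insert.
func MigrateBatch(s Store, records []Record) (Result, error) {
	if len(records) > maxBatch {
		return Result{}, ErrBatchTooLarge
	}
	var res Result
	for _, r := range records {
		switch err := s.Insert(r); {
		case err == nil:
			res.Inserted++
		case errors.Is(err, ErrDuplicateKey):
			res.Skipped++ // reason: duplicate_external_id
		default:
			res.Failed++
		}
	}
	return res, nil
}

func main() {
	s := &memStore{seen: map[Record]bool{}}
	batch := []Record{{"acme", "audit-1"}, {"acme", "audit-2"}}
	first, _ := MigrateBatch(s, batch)
	second, _ := MigrateBatch(s, batch) // identical re-run
	fmt.Println(first, second)
}
```

Running the same batch twice is the idempotency check from the acceptance criteria in miniature: the second call inserts nothing and skips every record.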
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | internal/app/service/crm_activity_log_migration_service.go | MigrateBatch(ctx, records) — enforce ≤20 (mirror maxBulkCreateLimit=20, activity_log.go:26); per-record upsert; on duplicate-key → skip + duplicate_external_id |
| create | internal/app/handler/crm_activity_log_migration_handler.go | Migrate handler |
| create | internal/app/payload/crm_activity_log_migration.go | request {records:[…] ≤20}; response {inserted,skipped,failed,results[]} |
| extend | internal/server/rest_router.go | mount POST /private/activity_logs/migrate under the BasicAuth /private/activity_logs group (69-76) |
| create | …_service_test.go, …_handler_test.go | idempotency + batch-cap tests |
Implementation steps
- Explore — Read internal/app/service/activity_log.go:26,128-156 (maxBulkCreateLimit, BulkCreateActivityLog) for the batch idiom, and rest_router.go:69-76 for how the existing /private/activity_logs routes mount under BasicAuth. Read pkg/http/response.go:12-49 for the BaseResponse envelope.
- Write failing tests (red) — re-run inserts 0/skips N; >20 → BATCH_TOO_LARGE 400; duplicate external_id → skip + reason. make test.
- Implement MigrateBatch — per-record upsert relying on the partial-unique index to reject dupes; translate duplicate-key errors to skips.
- Handler + payload + route — wire the S2S endpoint with the standard error envelope.
- Go green — make mocks && make test.
- Quality gate — make build && make lint.
Acceptance criteria
- Batch >20 → 400 BATCH_TOO_LARGE.
- Re-running an identical batch inserts 0 and skips N; the per-record status reflects duplicate_external_id.
- Conflict handling relies on the DB index (Task 1), not query-before-insert.
- Route is BasicAuth-only under /private.
Test strategy
Mongo-backed service test (closest existing target is make diff-test; add a focused test) asserting insert-then-reinsert yields {inserted:0, skipped:N}; handler test asserts the 20-cap and the BaseResponse error shape.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 2 |
| QA | 0.5 |
| Total | 2.5 |
Assumptions: idempotency is DB-enforced by Task 1's index; reuses the existing /private BasicAuth middleware and error envelope.
Run to verify
make mocks && make test && make build && make lint
Depends on
- Task 1 (partial-unique index), Task 3 (the transformed-record shape MigrateBatch writes).
Task 5: [BE] Trigger + status + IAG proxy + rollback + durable job store + Phase-A coverage gate (CALM-S01, CALM-S02/AC-3, CALM-S05 BE-half, CALM-S06-NEG, CALM-S09)
An engineer can start, monitor, and reverse a per-account migration over an S2S API, and the web app can read tenant-scoped progress — with exactly one active job per account, enforced by the database. The trigger also enforces the Phase-A coverage gate: if too many contacts are unsynced it returns 422 and creates no job, leaving the upstream backfill to remediate.
Status: ✅ Actionable
What to build
The durable Mongo job-state store + repository, the trigger handler (flag-gate, prereq check, enqueue), the S2S status + rollback endpoints, and the session-authed IAG status proxy the FE reads.
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | internal/app/repository/crm_migration_job/base.go | durable crm_activity_log_migration_jobs repo: insert/get/update by (company_sso_id); resume cursor last_audit_id (replaces the Redis TTL store, Decision 8) |
| extend | internal/app/service/crm_activity_log_migration_service.go | Trigger (flag check → 403 FLAG_DISABLED; prereq → 422 contacts_not_migrated; active job → 409 JOB_ALREADY_RUNNING; completed → 409 ALREADY_COMPLETED); GetStatus; Rollback (DELETE WHERE source_tag='crm_migration' AND company_sso_id) |
| extend | internal/app/handler/crm_activity_log_migration_handler.go | trigger/status/rollback handlers + the IAG status handler (tenant from consts.CompanySSOKey, not a query param) |
| extend | internal/app/payload/crm_activity_log_migration.go | status payload (mirror ActivityLogUserIDMigrationStatus, payload/activity_log_migration.go:18-25) + IAG subset (omit accuracy_pct) |
| extend | internal/server/rest_router.go | 3 routes under /private/activity_logs (69-76) + 1 GET /iag/v1/crm_migration/status under the IAG group (line 116/215) wrapped in RequirePermissionMiddleware(CustomersCustomersViewKey) |
| create | …_test.go + crm_migration_job/mocks/ | guard tests + make mocks |
Implementation steps
- Explore — Read service/activity_log_migration_service.go:22,25,30-31 (the Redis-TTL store you're replacing with Mongo) and handler/activity_log_migration_handler.go:91 (GetMigrationStatus shape). Read rest_router.go:69-76 (/private group), :116/:215 (IAG group, RequirePermissionMiddleware), and how consts.CompanySSOKey is read on existing IAG handlers.
- Job repo (red) — write the durable repo + tests; make test.
- Trigger guards — implement flag/prereq/concurrency checks; the one-active-job invariant relies on the idx_company_status partial-unique (Task 1) → 409 on insert conflict.
- Status + rollback — S2S handlers; rollback delete scoped by source_tag + company_sso_id.
- IAG proxy — session-authed read deriving tenant server-side; reject cross-tenant → 403; omit accuracy_pct.
- Go green + gate — make mocks && make test && make build && make lint.
Acceptance criteria
- Flag OFF → 403 FLAG_DISABLED; Phase-A gate (CALM-S09): contact-coverage below threshold → 422 contacts_not_migrated with no job row (remediated by the upstream BackfillCustomerIdWorker — this task never provisions contacts or writes CRM); second concurrent trigger → 409 JOB_ALREADY_RUNNING; re-trigger completed → 409 ALREADY_COMPLETED.
- Status returns {status, progress_pct, records_*}; no job → 200 {status:not_started}.
- IAG proxy derives tenant from session; a cross-tenant request → 403; accuracy_pct not exposed.
- Rollback deletes only source_tag='crm_migration' rows for the account and is idempotent (re-delete = 0).
- Job state is durable in Mongo (survives a worker restart; resume cursor persisted).
Test strategy
Service tests assert each guard's status code/resp_code; a repo test asserts the partial-unique blocks a second in_progress job; an IAG handler test asserts tenant is taken from consts.CompanySSOKey and cross-tenant is rejected.
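One way to picture the guard order those service tests exercise is a pure decision function, sketched below. TriggerState and checkTrigger are hypothetical names, the success status 202 is an assumption (the plan does not state it), and the up-front in_progress check is for illustration only: the plan enforces the one-active-job invariant through the partial-unique index at insert time.

```go
package main

import "fmt"

// TriggerState is a hypothetical snapshot of what the trigger checks.
type TriggerState struct {
	FlagEnabled    bool
	CoverageOK     bool   // Phase-A contact-coverage gate passed
	ExistingStatus string // "", "in_progress", or "completed"
}

// checkTrigger returns the HTTP status and response code for a trigger
// attempt. (202, "") means the job may be created and enqueued; the 202
// value is an assumption of this sketch.
func checkTrigger(s TriggerState) (int, string) {
	switch {
	case !s.FlagEnabled:
		return 403, "FLAG_DISABLED"
	case !s.CoverageOK:
		return 422, "contacts_not_migrated" // no job row is created
	case s.ExistingStatus == "in_progress":
		return 409, "JOB_ALREADY_RUNNING"
	case s.ExistingStatus == "completed":
		return 409, "ALREADY_COMPLETED"
	}
	return 202, ""
}

func main() {
	cases := []TriggerState{
		{},
		{FlagEnabled: true},
		{FlagEnabled: true, CoverageOK: true, ExistingStatus: "in_progress"},
		{FlagEnabled: true, CoverageOK: true, ExistingStatus: "completed"},
		{FlagEnabled: true, CoverageOK: true},
	}
	for _, c := range cases {
		code, reason := checkTrigger(c)
		fmt.Println(code, reason)
	}
}
```

Keeping the decision separate from I/O makes each guard testable as a table row, with the database conflict path covered by the repo test.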
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 3 |
| QA | 1 |
| Total | 4 |
Assumptions: reuses the TF-2519 status-handler pattern (not its Redis store), the /private BasicAuth + IAG RequirePermissionMiddleware; the one-active-job guard leans on Task 1's partial-unique index.
Run to verify
make mocks && make test && make build && make lint
Depends on
- Task 1 (job-collection indexes), Task 4 (MigrateBatch / source_tag for rollback).
Task 6: [BE] Consumer + worker registration + CRM extractor (CALM-S01, CALM-S02 resume)
Once triggered, the job runs end-to-end on the worker — cursor-reading CRM audits, transforming each row, writing in idempotent batches, advancing progress, and resuming from where it stopped after any restart.
Status: ⚠️ Partially blocked — fully buildable + unit-testable now; the live extraction run waits on OQ-1 (CRM read-only DSN) and OQ-2 (EXPLAIN ANALYZE of the cursor query; a CRM-side index may be required). Build against a stubbed CRMExtractor until the DSN lands.
Design reference: n/a — BE-only.
What to build
The CrmActivityLogMigrationConsumer (gocraft/work) that orchestrates extract → transform → MigrateBatch → job-state update, the worker registration + enqueue wiring, and the CRMExtractor cursor query (WHERE id > :cursor AND auditable_type IN(scope) AND created_at >= :cutoff ORDER BY id ASC LIMIT 500).
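The cursor loop, resume, and cumulative halt can be sketched without any queue or database. Everything here is illustrative: JobState, pager (a stub extractor standing in for the id > :cursor query), and run are hypothetical names, and advancing the cursor before the halt check is an assumption about what "cursor preserved" means.

```go
package main

import "fmt"

const haltThreshold = 0.10 // cumulative failure rate that halts the job

// JobState mirrors the fields the plan says are persisted per batch.
type JobState struct {
	LastAuditID int64
	Processed   int
	Failed      int
	Status      string
}

// pager is a stub extractor: ids greater than the cursor, in ascending
// order, at most size per page.
func pager(ids []int64, size int) func(int64) []int64 {
	return func(cursor int64) []int64 {
		var page []int64
		for _, id := range ids {
			if id > cursor && len(page) < size {
				page = append(page, id)
			}
		}
		return page
	}
}

// run consumes pages until the extractor returns an empty page.
// write reports whether a single audit was migrated successfully.
func run(job *JobState, next func(int64) []int64, write func(int64) bool) {
	job.Status = "in_progress"
	for {
		page := next(job.LastAuditID)
		if len(page) == 0 {
			job.Status = "completed"
			return
		}
		for _, id := range page {
			job.Processed++
			if !write(id) {
				job.Failed++
			}
		}
		job.LastAuditID = page[len(page)-1] // advance once per page
		if float64(job.Failed)/float64(job.Processed) > haltThreshold {
			job.Status = "halted" // cursor stays in the job state
			return
		}
	}
}

func main() {
	job := &JobState{LastAuditID: 2} // simulate a restart after audit 2
	run(job, pager([]int64{1, 2, 3, 4, 5, 6}, 2), func(int64) bool { return true })
	fmt.Printf("%+v\n", *job)
}
```

Starting from a non-zero LastAuditID is how the resume test can be expressed: audits at or below the cursor are never re-read.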
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | internal/app/consumer/crm_activity_log_migration_consumer.go | mirror ProcessUpdateUserIDJob shape (activity_log_migration_consumer.go:25); loop batches, advance last_audit_id, compute cumulative failure_rate (records_failed/records_processed), halt at >0.10 |
| create | internal/app/service/crm_migration/extractor.go | CRMExtractor interface + read-only Postgres cursor query; :cutoff = COALESCE(date_range_start, MIN(created_at)) |
| extend | internal/worker/worker_service.go | registerJobWithOptions(JobName, opts, consumer.Process, pool) (mirror line 141/147) |
| extend | internal/app/service/job_enqueuer.go | new job-name const + EnqueueJob call (line 65) |
| extend | config / Helm | CRM_AUDIT_DATABASE_URL read-only DSN; reuse existing USMAN client config |
| create | …_consumer_test.go, extractor_test.go + mocks | end-to-end happy-path + resume tests |
Implementation steps
- Explore — Read consumer/activity_log_migration_consumer.go:25 (consumer shape), worker/worker_service.go:141,147 (registerJobWithOptions), service/job_enqueuer.go:65 (EnqueueJob). Read qontak.com/db/schema.rb:47-78 for the exact audits columns and confirm there's no (auditable_type, created_at) index (cursor is on the id PK).
- Write failing tests (red) — stub CRMExtractor returning two fixture pages; assert: enqueue→consume→transform→MigrateBatch→job-doc updated; restart resumes from last_audit_id; failure_rate>0.10→halted. make test.
- Extractor — parameterized read-only cursor query (no string interpolation — SQLi guard); page size 500.
- Consumer loop — orchestrate the transformer (Task 3) + MigrateBatch (Task 4); update job doc per batch; cumulative failure-rate halt.
- Register + enqueue — wire the worker + job-name const + DSN config.
- Go green + gate — make mocks && make test && make build && make lint.
Acceptance criteria
- Job registered with gocraft/work; trigger enqueue → consumer runs.
- Cursor advances on audits.id; resumes from last_audit_id after a restart.
- Per-batch progress (progress_pct, counts) written to the job doc.
- Cumulative failure_rate > 0.10 → job halted with cursor preserved.
- Extraction query is parameterized against a read-only DSN.
Test strategy
Consumer test with a stubbed CRMExtractor (two pages) + mocked transformer/MigrateBatch: assert orchestration order, job-doc updates, the resume cursor, and the halt threshold. The live query plan (OQ-2) is validated operationally at Stage 0, not in unit tests.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 3 |
| QA | 1 |
| Total | 4 |
Assumptions: reuses the gocraft/work consumer + register/enqueue pattern; the service already speaks Postgres (chat/webhook DBs); CRM DSN provisioning (OQ-1) and query plan (OQ-2) are Stage-0 run-time gates, not build blockers.
Run to verify
make mocks && make test && make build && make lint
Depends on
- Task 3 (transformer), Task 4 (MigrateBatch), Task 5 (job repo + trigger enqueue).
- [External: OQ-1 CRM read-only DSN; OQ-2 extraction query plan — gate the run, not the build]
Task 7: [FE] Migration-status indicator + composable (CALM-S05)
While a contact's CRM history is still importing, the agent sees a non-blocking "importing…" banner with progress; it disappears once done and never breaks the page if the status API is unavailable.
Status: ⚠️ Partially blocked — buildable now against a mocked status response and the pixel3 MpBanner; the real endpoint is the IAG proxy from Task 5, and OQ-10 (Figma frame) is pending (build against the pixel3 interim, polish later).
Design reference: n/a — design pending (OQ-10) · DS version: @mekari/pixel3@1.0.10-dev.0 (verified package.json:24) · Frame: TBD · Design QA: TBD
What to build
A small status-indicator component (composed from MpBanner) mounted on DetailPage.vue, driven by a useCrmMigrationStatus composable that polls $customFetch('/v1/crm_migration/status') every 15 s and stops on completed/not_started. Four states: in_progress → banner with progress_pct; not_started/completed → hidden; API error → hidden (fail-silent).
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | features/customers/detail/components/CrmMigrationStatus/CrmMigrationStatus.vue | MpBanner (variant info) with progress_pct; props {companySsoId?, pollIntervalMs=15000} (tenant is server-derived, so companySsoId is optional/unused for the call) |
| create | features/customers/detail/components/CrmMigrationStatus/composables/useCrmMigrationStatus.ts | $customFetch('/v1/crm_migration/status') poll; reads snake_case keys as-is; stop on completed/not_started; error → hidden |
| extend | features/customers/detail/views/DetailPage.vue | mount <CrmMigrationStatus> in the activity section (lines 1-49) |
| create | features/customers/detail/components/CrmMigrationStatus/CrmMigrationStatus.spec.ts | vitest: 4 states |
Composable placement co-located with the component (matching the LoyaltySection/composables/ convention); common/composables/ is the alternative if it's later shared. useCustomFetch is at common/composables/useCustomFetch.ts.
Implementation steps
- Explore — Open DetailPage.vue (renders CustomerActivityV2/CustomerActivity conditionally) for the mount point; read an existing MpBanner usage (e.g. DrawerCreateSegment.vue) for the MpBanner/MpBannerTitle/MpBannerDescription/MpBannerIcon API; read common/composables/useCustomFetch.ts:170 for the $customFetch factory and CustomerActivityV2/CustomerActivityV2.spec.ts for the test idiom.
- Write failing tests (red) — CrmMigrationStatus.spec.ts: shown on in_progress (with progress_pct), hidden on completed/not_started, hidden on API error. pnpm test — confirm red. Mock $customFetch.
- Composable — implement the 15 s poll with start/stop; stub the call with a mock response for now — the real IAG endpoint ships in Task 5 (no company_sso_id sent; tenant is server-side).
- Component — compose MpBanner with the four-state logic; role="status" (polite) for a11y.
- Mount — add to DetailPage.vue.
- Go green + gate — pnpm test && pnpm lint && pnpm build.
Acceptance criteria
- Banner shows on in_progress with progress_pct; hidden on completed/not_started.
- API error → indicator hidden, contact detail still renders (fail-silent, CALM-S05/ERR-1).
- Poll runs every 15 s while mounted and stops on completed/not_started.
- Composable sends no company_sso_id (tenant derived server-side).
- No change to ActivityLog.vue / CustomerActivity.vue / the ActivityLog TS interface (no badge).
Test strategy
Vitest + @testing-library/vue with a mocked $customFetch: assert banner visibility across the four states and that a rejected fetch leaves the component hidden without throwing.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | 1 |
| Backend | — |
| QA | 0.5 |
| Total | 1.5 |
Assumptions: reuses pixel3 MpBanner (no new bundle cost) and the existing $customFetch; ships against the pixel3 interim while OQ-10's frame is pending; backend IAG proxy (Task 5) replaces the mock at integration.
Run to verify
pnpm test && pnpm lint && pnpm build
Depends on
- Task 5 (the GET /v1/crm_migration/status IAG proxy — mock until then).
- [External: OQ-10 Figma frame — pixel polish, not a build blocker]
Task 8: [BE] SyncWatermarkResolver — pre-sync cutoff (CALM-S07)
Only the activity that predates a contact's sync is migrated; everything after is left to the live CRM→CDP feed, so no record is imported twice.
Status: ✅ Actionable
What to build
A small transform-stage resolver that, once ContactResolver has the CDP contact, uses its created_at as the per-contact sync watermark and skips any audit with created_at >= watermark (reason post_sync_live_feed). It is the seam between this one-time backfill and the existing live Kafka feed (RFC Decision 16, grounding note #11/#12).
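The watermark rule reduces to a single strict comparison, sketched here. The function name shouldMigrate, the constant name SkipPostSyncLiveFeed, and the at helper are hypothetical; only the comparison direction and the skip reason string come from the description above.

```go
package main

import (
	"fmt"
	"time"
)

// SkipPostSyncLiveFeed is the skip reason named in the plan.
const SkipPostSyncLiveFeed = "post_sync_live_feed"

// at builds a UTC timestamp from Unix seconds (helper for the example).
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }

// shouldMigrate reports whether an audit strictly predates the contact's
// created_at watermark. Otherwise it returns the skip reason, because the
// live feed is taken to own that range.
func shouldMigrate(auditCreatedAt, contactCreatedAt time.Time) (bool, string) {
	if auditCreatedAt.Before(contactCreatedAt) {
		return true, ""
	}
	return false, SkipPostSyncLiveFeed
}

func main() {
	watermark := at(1_700_000_000)
	for _, offset := range []int64{-1, 0, 1} {
		ok, reason := shouldMigrate(at(1_700_000_000+offset), watermark)
		fmt.Println(offset, ok, reason)
	}
}
```

The boundary case matters: an audit with exactly the watermark timestamp is skipped, matching the >= rule, so the backfill and the live feed do not both claim it.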
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | internal/app/service/crm_migration/sync_watermark_resolver.go | given the resolved contact, return skip when audit.created_at >= contact.created_at; reuse the per-job contact cache (no extra read) |
| extend | internal/app/service/crm_migration/transformer.go | invoke the watermark check after ContactResolver, before transform; emit post_sync_live_feed skip metric |
| extend | internal/pkg/consts/const.go | add skip reason post_sync_live_feed |
| create | …/sync_watermark_resolver_test.go | below/at/above-watermark cases |
Implementation steps
- Explore — confirm contact.created_at is populated on the Contact returned by FindOneByCrmDataID (Task 2); read where ContactResolver slots into transformer.go (Task 3).
- Write failing tests (red) — created_at 1s below watermark → migrate; equal/above → skip post_sync_live_feed; missing contact (no watermark) → defers to contact_not_found (Task 3), not this resolver. make test.
- Implement — pure comparison against the cached contact; no new I/O.
- Wire into transformer — order: scope → contact → watermark → changes → dup.
- Quality gate — make build && make lint.
Acceptance criteria
- audit.created_at < contact.created_at → record proceeds; >= → skip with post_sync_live_feed.
- Uses the already-fetched contact (no extra DB round-trip); value cached per job.
- Skip is counted in metrics and visible in the job's skipped tally.
- Documented edge: a pre-existing-cross-channel contact (OQ-15) may under-cut — safe for migration-created contacts.
Test strategy
Table-driven unit test over (audit.created_at, contact.created_at) pairs asserting migrate-vs-skip and the skip reason. No live dependency.
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 1 |
| QA | 0.5 |
| Total | 1.5 |
Assumptions: contact.created_at is a faithful sync proxy (RFC Decision 16; the audit-based watermark was found unbuildable — qontak_customer_id is not audited); the live feed owns the post-watermark range.
Run to verify
make test && make build && make lint
Depends on
- Task 2 (FindOneByCrmDataID returns the contact incl. created_at), Task 3 (transformer pipeline this slots into).
- [External: OQ-14 seam gap/overlap proof — Stage-1 validation, not a build blocker; OQ-15 cross-channel edge]
Task 9: [BE] AssociationLogEnricher — in-place metadata patch (CALM-S08)
Association logs (deal/company/ticket/task link/unlink/resolved) always carry their referent metadata — and gaps on logs already in CDP (including ones the live feed wrote) get backfilled, without ever creating duplicates or mutating immutable fields.
Status: ⚠️ Partially blocked — fully buildable + unit-testable now against fixtures; the live-feed match-key (OQ-13) must be confirmed collision-safe before Stage 1, since live-feed association logs carry no external_id. Default: skip ambiguous matches (enrich_ambiguous) rather than patch the wrong log.
Design reference: n/a — BE-only.
What to build
An enricher over association logs only that fills missing referent metadata (crm_person_id, linked entity id + name) from CRM lookups and updates the existing activity_logs document in place — additive only (never deletes, never changes category/action/timestamp). It targets both newly-migrated rows and live-feed rows without external_id, located by the match-key (company_sso_id, customer_id, category/action, linked entity id, timestamp≈). This is the one sanctioned in-place update path under the RFC §3.D append-only rule (Decision 17 amends Decision 13).
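The additive-only rule and the ambiguity guard can be sketched as two small pure functions. The names missingOnly and pickTarget, the "no_match" reason, and the sample key deal_name are hypothetical; enrich_ambiguous is the reason named in the plan. The returned patch is what an additive $set would carry, restricted to metadata keys.

```go
package main

import "fmt"

// missingOnly returns the subset of candidate metadata whose keys are
// absent from existing. Values already present are never overwritten.
func missingOnly(existing, candidate map[string]interface{}) map[string]interface{} {
	patch := map[string]interface{}{}
	for k, v := range candidate {
		if _, present := existing[k]; present {
			continue
		}
		patch[k] = v
	}
	return patch
}

// pickTarget returns the single matching document id, or a skip reason
// when the match-key yields zero or several candidates.
func pickTarget(candidateIDs []string) (string, string) {
	switch len(candidateIDs) {
	case 0:
		return "", "no_match" // hypothetical reason, not from the RFC
	case 1:
		return candidateIDs[0], ""
	}
	return "", "enrich_ambiguous"
}

func main() {
	existing := map[string]interface{}{"crm_person_id": "p1"}
	candidate := map[string]interface{}{
		"crm_person_id": "p2",      // present already, so left untouched
		"deal_name":     "Renewal", // hypothetical missing key
	}
	patch := missingOnly(existing, candidate)
	fmt.Println(patch)

	// Applying the patch and recomputing yields nothing left to set.
	for k, v := range patch {
		existing[k] = v
	}
	fmt.Println(len(missingOnly(existing, candidate)))

	fmt.Println(pickTarget([]string{"log-a", "log-b"}))
}
```

Computing the patch from missing keys only is what makes a re-run a no-op, which is the idempotency case listed for the enricher tests.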
Implementation Plan
| Action | File | What changes |
|---|---|---|
| create | internal/app/service/crm_migration/association_enricher.go | for assoc logs: compute missing metadata via CRM lookup; resolve target doc (own external_id, else live-feed match-key); additive $set of missing fields only; enrich_ambiguous skip when >1 candidate |
| extend | internal/app/repository/activity_log/base.go | add a scoped additive-update method (e.g. EnrichAssociationMetadata(ctx, filter, fields)) — $set only the missing metadata keys; the only mutation path on source_tag='crm_migration'/assoc logs |
| extend | internal/pkg/consts/const.go | add skip/outcome reason enrich_ambiguous |
| extend | internal/app/service/crm_migration/transformer.go (or consumer post-write step) | invoke the enricher for association categories after MigrateBatch |
| create | …/association_enricher_test.go + mocks | migrated + live-feed + ambiguous + deleted-referent + idempotency cases |
Implementation steps
- Explore: read how live-feed logs are written (`kafka_consumer/{deal,company,ticket,task}_consumer.go` → `IActivityLogService.CreateActivityLogFor*`) to learn which metadata fields they populate vs leave empty, and the exact shape of `category`/`action`/entity id on those docs (this defines the match-key). Read `activity_log/base.go` to confirm there is no existing mutation method (append-only-by-absence); you are adding the single sanctioned one.
- Write failing tests (red): migrated assoc log missing `crm_person_id` → filled; live-feed assoc log (no `external_id`) matched by key → patched in place (assert no new doc); two candidates → `enrich_ambiguous` skip (assert nothing patched); deleted-in-CRM referent → left `[Deleted reference]`; re-run → patches 0 (idempotent). `make test`.
- Match resolution: own `external_id` first; else the live-feed match-key with the OQ-13 collision guard (skip if ambiguous).
- Additive update: `$set` only missing keys; never touch `category`/`action`/`timestamp`.
- Wire in + green: invoke after `MigrateBatch`; `make mocks && make test`.
- Quality gate: `make build && make lint`.
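The additive-update step above can be illustrated with plain maps, without assuming anything about the MongoDB driver in use. The sketch below builds the field set that would go under `$set`: only keys missing on the stored document, never `category`/`action`/`timestamp`. Treating absent, nil, and empty-string values as "missing" is an assumption, and `buildSet`, `apply`, `linked_entity_id`, and `linked_entity_name` are hypothetical names.

```go
package main

import "fmt"

// immutable lists the fields the plan says must never change.
var immutable = map[string]bool{"category": true, "action": true, "timestamp": true}

// isMissing treats absent, nil, and empty-string values as gaps (an assumption).
func isMissing(doc map[string]any, key string) bool {
	v, ok := doc[key]
	if !ok || v == nil {
		return true
	}
	s, isStr := v.(string)
	return isStr && s == ""
}

// buildSet returns the fields to place under $set: only keys that are missing
// on doc, and never an immutable key.
func buildSet(doc, enriched map[string]any) map[string]any {
	set := map[string]any{}
	for k, v := range enriched {
		if immutable[k] || !isMissing(doc, k) {
			continue
		}
		set[k] = v
	}
	return set
}

// apply mimics the effect of a $set on the in-memory document.
func apply(doc, set map[string]any) {
	for k, v := range set {
		doc[k] = v
	}
}

func main() {
	doc := map[string]any{"category": "deal", "action": "link", "linked_entity_id": "d-9"}
	enriched := map[string]any{
		"crm_person_id":      "p-7",
		"linked_entity_id":   "d-9",
		"linked_entity_name": "Renewal",
		"category":           "company", // must be ignored
	}
	first := buildSet(doc, enriched)
	apply(doc, first)
	second := buildSet(doc, enriched)
	fmt.Println(len(first), len(second), doc["category"]) // prints: 2 0 deal
}
```

Because the second pass produces an empty set, a re-run has nothing to write, which is the idempotency property the tests assert.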
Acceptance criteria
- Association logs gain missing `crm_person_id` + linked entity id/name; non-association logs untouched.
- Live-feed logs (no `external_id`) are patched in place via the match-key; no duplicate doc created.
- Ambiguous match (>1 candidate) → `enrich_ambiguous`, no patch (never guesses).
- Update is additive only: `category`/`action`/`timestamp` never change; no deletes (append-only exception, Decision 17).
- Idempotent: a second run patches 0; deleted referent stays `[Deleted reference]`.
Test strategy
Table-driven tests with fixture activity-log docs (migrated + live-feed shapes) and a mocked CRM lookup: assert in-place $set of only-missing fields, the no-new-doc invariant, the ambiguity skip, and idempotency. The real match-key collision-safety (OQ-13) is validated on a sample account at Stage 1 (RFC Gate G).
Effort estimate
| Discipline | Days |
|---|---|
| Frontend | — |
| Backend | 3 |
| QA | 1 |
| Total | 4 |
Assumptions: live-feed log shape is stable and exposes an entity id usable in the match-key (confirmed in step 1 / OQ-13); the additive-update method is the only mutation allowed on migrated/assoc logs; CRM lookups reuse the extractor's read-only DSN.
Run to verify
make mocks && make test && make build && make lint
Depends on
- Task 3 (category/action classification identifies association logs), Task 4 (`MigrateBatch` writes the rows this runs after).
- [External: OQ-13 live-feed match-key collision-safety: gates Stage-1 sign-off (Gate G), not the build]
Ordering rationale
- Critical path is BE chunks 1 → 3 → 4 → 5 → 6. Task 1 (schema/indexes) is foundational — idempotency, contact lookup, and the one-active-job guard all depend on its indexes, so it ships first. Task 2 (finder) and Task 3 (transformer) are the data-correctness core; Task 3 is the single largest task and carries the most estimation risk (OQ-12).
- Task 4 before Task 5/6 because both the consumer and the rollback path reuse `MigrateBatch` and `source_tag`. Task 5 brings up the job store + control-plane endpoints the consumer (Task 6) enqueues into.
- Task 6 is last on the BE critical path: it composes Tasks 3–5 into the running job. Its code is unblocked now; its execution run is what waits on the cross-team Stage-0 gates (OQ-1 DSN, OQ-2 query plan).
- Task 7 (FE) runs in parallel with the BE work: it's purely additive and ships against a mocked status response, then points at Task 5's IAG proxy at integration. Backend-first deploy order means the migration can backfill before the FE indicator lands (RFC §4.A).
- Task 8 (watermark) and Task 9 (enrichment) attach to the data path. Task 8 is a thin filter in the transform pipeline (after Task 3's `ContactResolver`); it's low-risk and can land with Task 3. Task 9 runs after `MigrateBatch` (Task 4) and is the highest-risk new task: it adds the only sanctioned in-place update and depends on the OQ-13 live-feed match-key, so schedule it after the core write path is proven and confirm OQ-13 before Stage 1.
- Phase A (CALM-S09) is folded into Task 5's trigger guard (the 422 coverage gate already lived there), so there is no separate task; the remediation (syncing unsynced contacts) is the upstream `BackfillCustomerIdWorker`, outside this breakdown.
- Push externally on, in priority order: OQ-1/OQ-2 (CRM read-only DSN + `EXPLAIN ANALYZE`; gate every run), OQ-7 (per-account `crm_data.id` ≥99% coverage; Phase-A gate, remediated by the upstream backfill), OQ-13 (live-feed match-key; gates Task 9 / Stage-1 Gate G), then OQ-12 (join classes + id-sets; safe fallthrough), OQ-14/OQ-15 (seam validation / cross-channel edge), OQ-9/OQ-10 (edge confirmation / design polish).
Skipped stories
No stories were blocked or excluded — all nine PRD/RFC stories (CALM-S01…CALM-S09) and all nine execution chunks are buildable now. The open questions in this RFC are operational Stage-0 / Stage-1 gates (confirmations on other teams' production systems, or sample-account validation), not build blockers, so they're noted inline on Tasks 3, 6, 7, 8, and 9 rather than skipped:
| Open question | Affects | Why not blocking the build |
|---|---|---|
| OQ-1 / OQ-2 | Task 6 run | DSN + query plan gate running extraction; build against a stubbed CRMExtractor |
| OQ-7 | per-account run | crm_data.id ≥99% coverage gate (Phase A), remediated by the upstream BackfillCustomerIdWorker |
| OQ-9 | Task 3 actor edge | Deactivated-user default already specified (case 2) |
| OQ-10 | Task 7 polish | Pixel3 MpBanner interim; frame is pixel polish |
| OQ-12 | Task 3 precision | Unmapped status rows fall through to …/update (no record dropped) |
| OQ-13 | Task 9 (enrichment) | Live-feed match-key built against fixtures; ambiguous matches skipped (enrich_ambiguous), confirmed collision-safe at Stage-1 Gate G |
| OQ-14 | Task 8 seam | No-dup/no-gap proof is a Stage-1 sample-account check, not a unit-test gate |
| OQ-15 | Task 8 edge | Cross-channel created_at under-cut; safe for migration-created contacts |
PRD catch-up. CALM-S07/S08/S09 are net-new vs PRD v2.3 (RFC §7) — the PRD should be backfilled with these three stories + ACs so the PRD ⇄ RFC ⇄ test-spec chain stays consistent. The build is unblocked meanwhile.