RFC: Legacy Migration — CRM Contact Activity Logs → CDP (audits → activity_logs)
Document Conventions (do not remove)
This RFC follows the Qontak RFC Template format for governance — the metadata table, Confluence sections 1–6, and Comment logs are mandatory.
It is also agent-execution-ready: §1 PRD-to-Schema Derivation (BE half) + §1 Design References (FE half), §2 Repo Reading Guide (Detail 2.0) for both layers, mermaid diagrams, §2.G Cross-Layer Contract Verification, and §4 Agent Execution Plan + Verification & Rollback Recipe are complete.
The YAML frontmatter at the top is the machine-readable index. The Metadata table below is the human-readable governance record. Both agree on every shared field.
Grounding note (anti-hallucination). Every path:line reference in this RFC was verified against the live worktrees contact-service, qontak.com, and qontak-customer-fe on 2026-06-18 (see Detail 2.0 Source Verification). Where the PRD's assumed code differed from the repo, the repo wins and the deviation is flagged. Material corrections to the PRD made during grounding:
1. The existing ActivityLogMigration* framework in contact-service is the TF-2519 user-ID replacement migration (Redis-tracked, 7-day TTL), not a CDP/CRM data import. We reuse its pattern (gocraft/work consumer + status handler), not its job.
2. ActivityLog.Actor is at activity_log/base.go:28 (PRD said :29); the struct has no Description field (Description exists only on the request payload).
3. Contact.CrmData.ID is at contact/base.go:343 (PRD said Contact.SourceID at :68, which is a different field), and there is no existing query-by-crm_data.id repository finder; one must be added.
4. The CRM audits table has 7 more columns than the PRD listed (user_type, processed_class, processed_id, queued_at, processed_at, processed_by, reindexed_at).
5. CRM audits.action only ever stores create / update / destroy (the audited gem). associate / disassociate are derived display strings (define_how), not stored action values, so the category mapper must derive link/unlink from auditable_type + action rather than read an associate action.
6. CRM mapping_who (audit.rb:1824-1842) has four outcomes, not three (PRD missed the bare "<deleted user>" fallback).
7. The PRD-named CRM fields (type/how/who/what/why/...) are not attr_accessors; the accessors are define_*-prefixed (audit.rb:19-22). The spirit of the PRD (computed at runtime by parse()) is correct.
8. CRM does not special-case "resolved" / "completed" / "won/lost"; every update renders as a generic "changed" verb (mapping_action, audit.rb:1052-1062). The PRD's …→resolved / …→completed category rows are therefore CDP-defined predicates, not values liftable from CRM. The mapper derives them from the presence of crm_stage_id / crm_task_status_id / crm_ticket_status_id in audited_changes (see §2.F CategoryMapper). Link/unlink is derivable (create → associated / destroy → unassociated on the Deal–Person and Deal–Company join audits, audit.rb:472-479, 526-533).
9. The web FE is served by the IAG group /iag/v1/* with IAGMiddleware (the gateway strips /iag, so the browser calls /v1/*), and the tenant company_sso_id is injected by RequirePermissionMiddleware (require_permission_middleware.go:97, sets consts.CompanySSOKey), not by the /openapi/v1 MAG group (that is the third-party surface). The PRD/earlier draft's "MAG" attribution for the FE read is corrected to IAG.
10. USMAN email→SSO UUID resolution already exists in contact-service: QontakLaunchpadClient.GetUsersByEmails(ctx, companySsoId, emails) (internal/app/api/qontak_launchpad.go:264, interface :26) calls GET {root}/private/users?company_sso_id=&emails= and returns LaunchpadUsersResponse, whose UserResponse[].SsoID (json sso_id) is the SSO UUID. OQ-9 is thus largely resolved (only the deactivated-user Status handling needs confirming).
Corrections added in the 2026-06-30 contact-state revision (cases: unsynced contacts, pre-sync logs, association reconciliation):
11. A live CRM→CDP activity feed already exists: the Kafka consumers DealConsumer / CompanyConsumer / TicketConsumer / TaskConsumer (internal/app/kafka_consumer/{deal,company,ticket,task}_consumer.go) call IActivityLogService.CreateActivityLogFor{Deal,Company,Ticket,Task} on CRM events (payload.CrmEvent, which carries qontak_customer_id). A CRM event can only resolve to a CDP contact once qontak_customer_id is set (i.e. after the contact is synced), so the live feed owns post-sync activity. This migration must therefore backfill only the pre-sync tail and not re-import what the live feed already wrote. Critically, live-feed logs carry no external_id, so the (company_sso_id, external_id) partial-unique index (Decision 2) cannot dedup migrated rows against live-feed rows; only a per-contact timestamp cutoff prevents double-import (Decision 16).
12. qontak_customer_id is NOT reliably audited on Crm::Person. Crm::Person is audited (person.rb:157, no except:), but the path that sets qontak_customer_id on an existing contact is contact.update_column(:qontak_customer_id, …) (app/services/crm/centralized_contacts/send.rb:125, literal comment "update without validations and callbacks"), reached by the upstream backfill (Crm::BackfillCustomerIdSingleService#create_cdp_contact → CentralizedContacts::Send). update_column bypasses the audited gem callbacks and does not bump updated_at, so no audit row records the sync moment and there is no CRM-side timestamp to derive it from. The per-contact sync watermark is therefore CDP contact.created_at, the only signal that actually exists per contact (Decision 16).
13. Contact sync/creation (Case 1) is owned upstream, not by this RFC. The existing CRM Crm::BackfillCustomerIdWorker(team_id, person_ids, create_if_not_found) (app/workers/crm/backfill_customer_id_worker.rb) → BackfillCustomerIdBatchService → BackfillCustomerIdSingleService#create_cdp_contact provisions the CDP contact (via Contact360 CentralizedContacts::Send) and writes qontak_customer_id back into CRM. This RFC depends on that worker (gate-only, Decision 18) and never writes contacts or qontak_customer_id, so Non-goal #3 (read-only on CRM) is preserved.
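The derivation rule in notes 5 and 8 can be sketched in Go as follows. This is a minimal illustration, not the §2.F CategoryMapper itself: the join auditable_type names (Crm::DealPerson, Crm::DealCompany) and the returned labels (link, unlink, status_change) are hypothetical placeholders, and only the three status keys and the create/update/destroy values come from the notes above.

```go
package main

import "fmt"

// joinTypes lists auditable_type values treated as association joins.
// Both names are hypothetical placeholders, not confirmed CRM classes.
var joinTypes = map[string]bool{
	"Crm::DealPerson":  true,
	"Crm::DealCompany": true,
}

// statusKeys are the audited_changes keys whose presence marks a
// status-style update (stage, task status, ticket status).
var statusKeys = []string{"crm_stage_id", "crm_task_status_id", "crm_ticket_status_id"}

// mapAction derives an illustrative CDP action label from the stored CRM
// action, the auditable_type, and the keys present in audited_changes.
func mapAction(auditableType, action string, changes map[string]any) string {
	if joinTypes[auditableType] {
		switch action {
		case "create":
			return "link"
		case "destroy":
			return "unlink"
		}
	}
	if action == "update" {
		for _, k := range statusKeys {
			if _, ok := changes[k]; ok {
				return "status_change"
			}
		}
	}
	return action // plain create / update / destroy
}

func main() {
	fmt.Println(mapAction("Crm::DealPerson", "create", nil))
	fmt.Println(mapAction("Crm::DealCompany", "destroy", nil))
	fmt.Println(mapAction("Crm::Deal", "update", map[string]any{"crm_stage_id": []any{1, 2}}))
	fmt.Println(mapAction("Crm::Person", "update", map[string]any{"name": []any{"A", "B"}}))
}
```

The point of the sketch is that no branch ever reads an associate or resolved value from audits.action; every non-trivial label is computed from the type, the stored action, and the changed keys.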
Metadata
| Field | Value | Notes |
|---|---|---|
| Status | RFC (IDEA) | Human label; YAML status: carries the remapped linter enum draft |
| DRI | Zhelia Alifa | RFC owner (frontmatter dri) |
| Team | cdp | Advisory squad slug carried from PRD / initiative README |
| Author(s) | Zhelia Alifa | Primary author |
| Reviewers | CDP Backend Lead, CDP Frontend Lead, CRM (qontak.com) Squad Lead, Platform / USMAN Team | Tech reviewers across affected squads (BE + Data + FE) |
| Approver(s) | CDP Tech Lead, InfoSec Approver | Tech leaders + infosec approver — assign real names before in-review (OQ-8) |
| Submitted Date | 2026-06-18 | ISO-8601 |
| Last Updated | 2026-06-30 | ISO-8601 |
| Target Release | 2026-Q3 | Quarter |
| Target Quarter | 2026-Q3 | Advisory, carried from PRD |
| Related | ../prds/prd-legacy-migration-crm-activity-logs.md | Source PRD v2.3 |
| Discussion | #cdp-migration-ops (Slack) | Alerts + discussion channel |
- Type: full-stack
- Frontend sub-type: enhancement (one targeted component change: the migration-status indicator; no new screens, no badge)
- Backend sub-type: new-feature
Sections at a Glance
- Overview (PRD-to-Schema Derivation — BE half; Design References — FE half; traceability; per-story change map)
- Technical Design (Infrastructure Topology → Technical Decisions [ADR] → Repo Reading Guide [both layers] → end-to-end mermaid → DDL → APIs → cross-layer contract verification)
- High-Availability & Security
- Backwards Compatibility and Rollout Plan (cross-layer rollout matrix, Agent Execution Plan, Verification & Rollback Recipe)
- Concern, Questions, or Known Limitations
- Comment logs
- Ready for agent execution
1. Overview
CDP (the contact-service backend + qontak-customer-fe web app) has no
mechanism to import historical contact activity from the legacy CRM. When a
client migrates from CRM (qontak.com) to CDP, all contact activity — field
changes and association events — stays behind in the CRM audits Postgres table
(verified: db/schema.rb:47-78), leaving migrated agents a blank pre-migration
timeline. This blocks CDP General Release for the largest customer segment
(existing CRM clients) and forces indefinite dual-system access for compliance.
This RFC specifies a one-time, per-account batch migration that extracts CRM
audits rows, transforms each into the CDP ActivityLog shape (re-implementing
the deterministic mapping from qontak.com's app/models/audit.rb in Go), and
writes them idempotently into CDP MongoDB (activity_logs). It is driven by a
new gocraft/work consumer in the contact-service worker deployment — the
same orchestration pattern already in the repo (ActivityLogMigrationConsumer,
internal/app/consumer/activity_log_migration_consumer.go) — but with a
durable MongoDB job-state store instead of the existing Redis 7-day TTL
(activity_log_migration_service.go:30-31), because CRM migrations run 24 h+ per
account. One small FE change surfaces progress: a non-blocking
migration-status indicator. The migrated logs themselves render in the existing
CDP activity timeline with no FE change and no source badge (PRD v2.3, §4.8).
The work reuses documented infrastructure (gocraft/work worker +
IJobEnqueuer, the /private BasicAuth S2S router group, the golang-migrate
Mongo migration tooling, Datadog StatsD metrics, the existing MongoDB
activity_logs collection and repository) rather than building those from
scratch.
Contact state & the pre-sync boundary (2026-06-30 revision). The migration is structured two-phase per account:
- Phase A: contact-coverage gate (gate-only). A contact's activity can only land on a CDP timeline if the contact exists in CDP (qontak_customer_id set / contacts.crm_data.id populated). Contacts not yet synced are created by the upstream CRM Crm::BackfillCustomerIdWorker(create_if_not_found: true) → Contact360, not by this RFC. Phase A only verifies coverage and gates the account/contact; contact-service stays read-only on CRM (Decision 18, CALM-S09).
- Phase B: migrate the pre-sync tail + reconcile associations. For synced contacts, a live CRM→CDP Kafka feed already captures post-sync activity (grounding note #11), so this batch backfills only the historical tail before each contact's sync watermark (CDP contact.created_at; grounding note #12, Decision 16, CALM-S07) and runs an association-log enrichment pass that fills missing metadata on association logs in place, including live-feed logs that carry no external_id (Decision 17, CALM-S08). The live feed owns everything at/after the watermark; this migration never re-imports it.
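The Phase B cutoff can be illustrated with a small Go filter. This is a sketch under assumptions: the Audit struct and the preSyncTail name are hypothetical, and the only rule taken from the text is the strict comparison audits.created_at < contact.created_at.

```go
package main

import (
	"fmt"
	"time"
)

// Audit is a trimmed, hypothetical view of a CRM audits row.
type Audit struct {
	ID        int64
	CreatedAt time.Time
}

// preSyncTail keeps only audits strictly older than the contact's sync
// watermark (CDP contact.created_at). Rows at or after the watermark
// belong to the live feed and are skipped.
func preSyncTail(audits []Audit, watermark time.Time) []Audit {
	var out []Audit
	for _, a := range audits {
		if a.CreatedAt.Before(watermark) {
			out = append(out, a)
		}
	}
	return out
}

// demoIDs runs the filter on three sample rows around a watermark:
// one hour before, exactly at, and one hour after.
func demoIDs() []int64 {
	wm := time.Date(2026, 1, 10, 0, 0, 0, 0, time.UTC)
	rows := []Audit{
		{ID: 1, CreatedAt: wm.Add(-time.Hour)},
		{ID: 2, CreatedAt: wm},
		{ID: 3, CreatedAt: wm.Add(time.Hour)},
	}
	var ids []int64
	for _, a := range preSyncTail(rows, wm) {
		ids = append(ids, a.ID)
	}
	return ids
}

func main() {
	fmt.Println(demoIDs()) // prints [1]
}
```

The row stamped exactly at the watermark is excluded, which matches the statement that the live feed owns everything at or after it.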
Alignment with PRD v2.3 (badge removed). PRD v2.3 dropped the "Migrated from CRM" source badge entirely: migrated logs render in the existing CDP Activity Log view, indistinguishable from native logs (PRD §4.8, CALM-S04). The only FE change is the migration-status indicator (CALM-S05). The backend source_tag field is retained (it is used for rollback, validation, and append-only enforcement, not for FE display) and is not added to the FE ActivityLog TS interface.
Success Criteria
- Migration accuracy ≥ 99% per account before client cutover: records_correctly_migrated / total_in_scope_source_records, spot-checked by random sample (PRD §11 North Star).
- Zero duplicate records on any re-run, enforced by a partial unique index on (company_sso_id, external_id) (PRD §11; CALM-S02, CALM-S06-NEG).
- Job halt rate < 2% across all accounts in controlled rollout (PRD §11).
- contact_not_found skip rate < 1% per account (gated on Contact.crm_data.id coverage ≥ 99% before the account runs; OQ-7).
- Per-account job completes within P95 ≤ 24 h; CDP write throughput target ≥ 500 records/s via batches of ≤ 20 (PRD §5 Constraints).
- 100% of migrating CRM accounts reach completed status before CDP GA (PRD §11).
- Zero double-import across the sync seam: no migrated log duplicates a live-feed log, and every migrated record satisfies audits.created_at < contact.created_at (the per-contact watermark, CALM-S07). Measured by sampling migrated rows against the contact's created_at (target 100% below-watermark).
- Association-log enrichment completeness ≥ 99%: of in-scope association logs (link/unlink/resolved/completed), the share whose required metadata (crm_person_id, linked entity id + name) is present after the enrichment pass (CALM-S08); deleted-in-CRM referents are excluded (they keep [Deleted reference]).
Metric framing (REV-9). accuracy_pct is a backend measure (rows migrated vs in-scope source rows). The FE caps the timeline at 5,000 visible entries (ActivityLog.vue:47-52), so on large accounts an agent may see fewer entries than were migrated; this is a UI display limit, not a migration gap, and must not be read as low accuracy.
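The accuracy ratio and the throughput target reduce to simple arithmetic, sketched here in Go. The function names are hypothetical, and the sample inputs are illustrative only: 15,000,000 rows echoes the table size mentioned under Decision 7, and 500 records/s is the throughput target stated in the criteria.

```go
package main

import "fmt"

// accuracyPct is records_correctly_migrated / total_in_scope_source_records,
// expressed as a percentage. It returns 0 when there is nothing in scope.
func accuracyPct(correct, total int64) float64 {
	if total == 0 {
		return 0
	}
	return 100 * float64(correct) / float64(total)
}

// etaHours estimates wall-clock hours to write rows at a steady rate.
func etaHours(rows int64, ratePerSec float64) float64 {
	return float64(rows) / ratePerSec / 3600
}

func main() {
	fmt.Printf("accuracy: %.2f%%\n", accuracyPct(9_950_000, 10_000_000))
	fmt.Printf("eta: %.1f h\n", etaHours(15_000_000, 500))
}
```

At the target rate, a 15M-row account would need roughly 8.3 hours of sustained writing, which leaves headroom inside the 24 h P95 budget for retries and pauses.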
Out of Scope
- Non-contact audits: Crm::Deal, Ticket, Crm::Task, Crm::Company records are migrated only as association events on a contact (link/unlink/resolved); standalone Deal/Ticket/Task/Company timelines are out of scope (PRD §4.1, §4).
- Real-time / ongoing sync: this is a one-time historical batch per account (PRD §4.2). The existing live CRM→CDP Kafka feed (grounding note #11) already owns post-sync activity; this migration deliberately stops at each contact's sync watermark and does not touch the live path (Decision 16, CALM-S07).
- Creating or syncing the contact itself: provisioning the CDP contact and writing qontak_customer_id back into CRM is owned by the upstream Crm::BackfillCustomerIdWorker (grounding note #13). This RFC only gates on coverage (Phase A) and never writes contacts or CRM (Decision 18, CALM-S09).
- Any write to CRM: extraction is read-only on audits (PRD §4.3).
- Activity logs from other products (Inbox, Campaign, Chatbot) (PRD §4.4).
- New FE screens: only one component edit (the migration-status indicator); migrated logs reuse the existing timeline with no FE change (PRD §4.5).
- A source badge / "Migrated from CRM" label on migrated logs: removed in PRD v2.3; migrated entries render with the standard CDP structure, identical to native logs (PRD §4.8).
- Raising the FE 5,000-entry display cap (page === 1000 × perPage 5, ActivityLog.vue:47-52): migrated logs share the existing window (PRD §4.7-4.8).
- A code-enforced retention policy: none exists in CRM (audit.rb/jobs grep clean); the available range is determined by querying MIN/MAX(created_at) FROM audits before each run (PRD §4.6, OQ-2).
Related Documents
| Title | Path / Link | What this RFC took from it |
|---|---|---|
| PRD v2.3 (source) | ../prds/prd-legacy-migration-crm-activity-logs.md | All requirements, stories, ACs, constraints, decisions |
| Initiative README | ../README.md | dri, team, target_quarter, scope |
| Confluence (epic context) | https://jurnal.atlassian.net/wiki/spaces/QON/pages/51202916409 | Migration program context (reviewed — no material code impact) |
| Jira epic | https://jurnal.atlassian.net/browse/TF-3182 | Epic anchor |
Assumptions
- CDP Contact migration has already completed for the account, and each migrated contact carries the CRM person ID in Contact.crm_data.id (contact/base.go:343); conversely the CRM stores the CDP contact ID in qontak_customer_id. Coverage is confirmed ≥ 99% per account before that account runs (OQ-7). Activity logs reference CDP customer_ids, so this is a hard ordering prerequisite (PRD §13). This prerequisite is satisfied by the upstream Crm::BackfillCustomerIdWorker(create_if_not_found: true) (grounding note #13): Phase A gates on coverage and (if below threshold) waits for that worker; it never provisions contacts itself (Decision 18). A residual contact that the backfill could not create is still skipped with contact_not_found.
- A live CRM→CDP activity feed (Kafka consumers, grounding note #11) captures activity after a contact is synced, so the per-contact migration window is bounded above by the contact's sync watermark = CDP contact.created_at (grounding note #12; Decision 16). qontak_customer_id is not reliably audited, so there is no CRM-side timestamp for this. Migrate audits.created_at < contact.created_at; the live feed owns the rest.
- contact-service can open a read-only Postgres connection to the CRM audits database (a service account / replica DSN). The service already speaks Postgres for its chat + webhook-log DBs (db/webhook_logs_migrations/, golang-migrate Postgres targets Makefile:186-192), so the driver and patterns exist; only a new read-only datasource is added. The direct-DB vs CRM-extraction-API choice is OQ-1 (default: direct read-only query).
- The CRM audits.audited_changes semantics are stable for the audited gem v5.8.0 (Gemfile.lock:153): update → {field: [old,new]}; destroy → full attribute snapshot {field: value} (audit.rb:482-483,503-505).
- A USMAN lookup that resolves a CRM user email → SSO UUID is available to contact-service (the user-management service behind LaunchpadUserService, rest_router.go:362). Grounding verified the endpoint as QontakLaunchpadClient.GetUsersByEmails (internal/app/api/qontak_launchpad.go:264; grounding note #10); only the deactivated-user Status handling remains to confirm (OQ-9).
- The migration is operated by the internal migration/engineering team via S2S tokens only, never by client admins or end users (PRD §8 permission model).
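The two audited_changes shapes named in the assumptions above can be handled with a two-branch extractor along these lines. This is a sketch, not the §2.F ChangesExtractor: it assumes the YAML has already been decoded into a Go map, the Change struct mirrors the {field, from, to} shape, and recording a destroy snapshot as value → nil is an assumption made here for illustration. Other actions are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// Change mirrors the {field, from, to} delta shape.
type Change struct {
	Field string
	From  any
	To    any
}

// extractChanges converts decoded audited_changes into deltas.
// update rows hold {field: [old, new]}; destroy rows hold a full
// snapshot {field: value}, recorded here as value -> nil (assumption).
func extractChanges(action string, decoded map[string]any) []Change {
	fields := make([]string, 0, len(decoded))
	for f := range decoded {
		fields = append(fields, f)
	}
	sort.Strings(fields) // stable output order

	var out []Change
	for _, f := range fields {
		v := decoded[f]
		switch action {
		case "update":
			pair, ok := v.([]any)
			if !ok || len(pair) != 2 {
				continue // malformed pair: skip rather than guess
			}
			out = append(out, Change{Field: f, From: pair[0], To: pair[1]})
		case "destroy":
			out = append(out, Change{Field: f, From: v, To: nil})
		}
	}
	return out
}

func main() {
	upd := extractChanges("update", map[string]any{"name": []any{"Ana", "Anna"}})
	del := extractChanges("destroy", map[string]any{"email": "a@example.com"})
	fmt.Printf("%+v\n%+v\n", upd, del)
}
```

Keeping the branch on action, rather than sniffing the value type, avoids misreading a destroy snapshot whose attribute happens to be a two-element array.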
Dependencies
| Dependency | Owner | Deliverable | Status | Blocking? |
|---|---|---|---|---|
| New S2S endpoints (/private/activity_logs/crm_migration*, /migrate) | CDP Backend | Trigger, batch-insert, status, rollback (this RFC §2.4) | needs-building | YES |
| ActivityLog schema additions + partial unique index | CDP Backend | external_id, source_tag, metadata on the struct + Mongo migration (this RFC §2.3) | needs-building | YES |
| CRM audits + users read access | CRM / Data Eng | Read-only DSN or replica with auditable_type/created_at/id filters; cursor pagination (OQ-1) | blocked-pending-confirm | YES |
| CDP Contact migration completed per account | CDP Squad | CRM person ID in Contact.crm_data.id / CDP ID in CRM qontak_customer_id, coverage ≥ 99% (OQ-7) | needs-confirm per account | YES (ordering) |
| Upstream contact backfill for unsynced contacts | CRM Squad | Crm::BackfillCustomerIdWorker(create_if_not_found: true) provisions the CDP contact + writes qontak_customer_id back; this RFC gates on it (Phase A, Decision 18) — does not call or reimplement it | exists — run before Phase B | YES (ordering, Phase A) |
| Live CRM→CDP activity feed (post-sync boundary) | CDP Squad | Kafka Deal/Company/Ticket/TaskConsumer → CreateActivityLogFor* owns activity after sync; migration must stop at the watermark to avoid double-import (live-feed logs have no external_id) | exists — boundary only | YES (correctness) |
| CRM user email → SSO UUID resolution | Platform / USMAN | Exists — QontakLaunchpadClient.GetUsersByEmails (qontak_launchpad.go:264) → sso_id. Only confirm deactivated-user Status/empty handling (OQ-9) | exists — confirm edge | no (downgraded) |
| FE migration-status indicator | CDP FE | Status indicator component on DetailPage.vue + useCrmMigrationStatus composable (no badge, no ActivityLog interface change — PRD v2.3) | needs-building | YES (Stage 1) |
| Go YAML parser for audited_changes | CDP Eng | YAML lib in the Go migration service + field-name display map (OQ-6) | needs-building | YES |
PRD-to-Schema Derivation (backend half — required)
| PRD-described entity / attribute / rule | Persisted as | Exposed via | Enforced where | Source |
|---|---|---|---|---|
| A migrated audit must be traceable to its CRM origin and deduplicated | activity_logs.external_id (string = CRM audit.id) | POST /private/activity_logs/migrate request; read on FE record | Partial unique index (company_sso_id, external_id); skip-on-conflict in service | PRD §5, §7.2, D-2 |
| A record's origin must be identifiable for rollback + validation | activity_logs.source_tag = "crm_migration" (new const) | POST .../migrate; rollback filter; validation (no FE display — PRD v2.3) | New const ActivityLogSourceCrmMigration in consts/const.go; written by migrate service | PRD §5, §7.2, D-3 |
| Migrated records carry extensible legacy metadata | activity_logs.metadata (embedded doc, {legacy:true}) | POST .../migrate; FE detail | Set by transformer for every migrated record | PRD §5, §7.2 |
| Each CRM field change becomes a structured delta | activity_logs.changes[] = {field,from,to} (existing Change) | Read via existing GET /v1/activity_logs | ChangesExtractor parses audited_changes YAML (two branches) | PRD §7.3, §3 Behavior 3, D-4, D-10 |
| CRM auditable_type + action maps to a CDP category/action | activity_logs.category, .action | POST .../migrate; read on FE | CategoryMapper lookup (derives link/unlink from type+action) | PRD §7.3 |
| The actor (who) must survive even for deleted/system users | activity_logs.user_id (SSO UUID or null) + .actor (string) | POST .../migrate; FE name | ActorResolver (4 cases) → Actor field (base.go:28) | PRD §7.3, D-6 |
| Original event time, normalized to UTC | activity_logs.timestamp (existing time.Time) | read on FE timeline | Transformer UTC-normalizes audits.created_at | PRD §3 Behavior 3 (AC-6) |
| Per-account job is observable and idempotently resumable | crm_activity_log_migration_jobs (new durable Mongo collection): job_id, company_sso_id, status, progress_pct, records_migrated/skipped/failed, last_audit_id, started_at, updated_at, completed_at, error_message | GET .../crm_migration/status | CrmActivityLogMigrationService (durable store, replaces Redis) | PRD §5, §7 Behaviors 1/5, D-8 |
| Migration is gated and reversible | feature flag cdp_crm_activity_log_migration_enabled; rollback by source_tag | trigger 403 if OFF; POST .../rollback | flag check in trigger handler; DELETE … WHERE source_tag='crm_migration' | PRD §5, §7 Behavior 6 |
| Contact resolution by CRM person id | reads existing contacts.crm_data.id | internal (no new column) | new repo finder FindOneByCrmDataID | PRD §7.3, D-9 |
| Only pre-sync history is migrated (post-sync owned by live feed) | no new field — per-contact upper-bound predicate audits.created_at < contact.created_at | internal extraction filter | SyncWatermarkResolver (reads CDP contact.created_at); CRMExtractor/transform skip-above-watermark | Case 2a; D-16 |
| Association logs carry full referent metadata | activity_logs.metadata / changes (existing) — patched in place for existing logs incl. live-feed (no external_id) | read on FE timeline | AssociationLogEnricher (additive-only update; D13 append-only exception, D-17) | Case 2b; D-17 |
| Phase-A contact coverage is a precondition, not this RFC's job to fix | reads existing contacts.crm_data.id coverage | trigger 422 contacts_not_migrated | gate check in trigger; remediated by upstream BackfillCustomerIdWorker | Case 1; D-18 |
Every §2.3 DDL field and every §2.4 endpoint traces back to a row here.
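The idempotency row in the table above (external_id plus a partial unique index, with skip-on-conflict in the service) can be modelled in memory as below. This is a sketch under assumptions: the Store type is a stand-in for the Mongo collection, and the partial filter is assumed to index only rows with a non-empty external_id, which is how live-feed rows (no external_id) escape the index.

```go
package main

import "fmt"

// Log is a trimmed activity log with only the two index fields.
type Log struct {
	CompanySSOID string
	ExternalID   string // empty for live-feed logs
}

type key struct{ company, external string }

// Store emulates a partial unique index on (company_sso_id, external_id):
// only rows with a non-empty external_id are indexed.
type Store struct {
	rows  []Log
	index map[key]bool
}

func NewStore() *Store { return &Store{index: map[key]bool{}} }

// InsertBatch writes each log and skips those that conflict on the index.
// It returns how many rows were inserted and how many were skipped.
func (s *Store) InsertBatch(batch []Log) (inserted, skipped int) {
	for _, l := range batch {
		if l.ExternalID != "" {
			k := key{l.CompanySSOID, l.ExternalID}
			if s.index[k] {
				skipped++
				continue
			}
			s.index[k] = true
		}
		s.rows = append(s.rows, l)
		inserted++
	}
	return inserted, skipped
}

func main() {
	s := NewStore()
	batch := []Log{
		{CompanySSOID: "c1", ExternalID: "101"},
		{CompanySSOID: "c1", ExternalID: "102"},
		{CompanySSOID: "c1"}, // live-feed style row, not indexed
	}
	fmt.Println(s.InsertBatch(batch)) // first run
	fmt.Println(s.InsertBatch(batch)) // re-run: indexed rows are skipped
}
```

The re-run skips both rows that carry an external_id but inserts the unkeyed row again, which is the reason the index alone cannot dedup against live-feed logs and a per-contact watermark is needed as well.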
Design References (frontend half — required)
| PRD-named surface | Figma / design link | Frame name | Design system version | Design QA contact | Notes |
|---|---|---|---|---|---|
| Migration status indicator on Contact Detail (DetailPage.vue) | n/a (design pending; PRD §8 CALM-S05 "Figma: TBD") | n/a | @mekari/pixel3@1.0.10-dev.0 (verified package.json:24) | TBD (OQ-10) | Non-blocking banner; 4 states (loading/empty/error/success) |
The migration-status indicator is the only FE surface, and it is design-pending: it is tagged Should-Have in the PRD but cannot be pixel-finalized without a frame; this is recorded as a §7 readiness gap (OQ-10). The backend (the migration itself) is not blocked by design — it can ship and backfill while the FE chrome follows. The migrated logs need no design work at all — they reuse the existing CDP timeline rendering (no badge, PRD v2.3). Use design-system components only (pixel3 banner/alert), so the deviation risk is low.
Detail 1.A — PRD Traceability (cross-layer)
Forward (PRD AC → RFC):
| PRD composite AC id | FE section / component | BE section / endpoint |
|---|---|---|
| CALM-S01/AC-1..AC-3, ERR-1..ERR-4 | n/a | §2.4 POST /private/activity_logs/crm_migration (trigger); §2.F consumer |
| CALM-S01/AC-4 | n/a | §2.4 + §2.3 contacts.crm_data.id; new FindOneByCrmDataID |
| CALM-S02/AC-1..AC-3, ERR-1 | n/a | §2.3 partial unique index; §2.4 /migrate skip-on-conflict; §2.E |
| CALM-S03/AC-1..AC-8, ERR-1..ERR-3 | n/a | §2.F transformer (CategoryMapper/ChangesExtractor/ActorResolver); §2.2 |
| CALM-S04/AC-1..AC-2, ERR-1 | n/a — renders in the existing timeline, no FE change (no badge, PRD v2.3) | existing GET /v1/activity_logs (returns migrated rows; native rendering) |
| CALM-S05/AC-1..AC-3, ERR-1 | §2.A DetailPage.vue indicator; §2.C; §2.E (FE) | §2.4 GET .../crm_migration/status |
| CALM-S06-NEG/NEG-1, NEG-2 | n/a | §2.3 unique index; §2.4 trigger flag-gate (403) |
| CALM-S07/AC-1..AC-3 | n/a | §2.F SyncWatermarkResolver; §2.F CRMExtractor upper-bound cutoff; Decision 16 |
| CALM-S08/AC-1..AC-4, ERR-1 | n/a — patched rows render natively in the existing timeline | §2.F AssociationLogEnricher (in-place patch); Decision 17 |
| CALM-S09/AC-1..AC-2, ERR-1 | n/a | §2.4 trigger Phase-A gate (422 contacts_not_migrated); Decision 18 |
Reverse (RFC → PRD AC):
| New FE component / BE endpoint / dependency | PRD composite AC id it serves |
|---|---|
| POST /private/activity_logs/crm_migration (trigger) | CALM-S01/AC-1, ERR-1, ERR-2, ERR-4; CALM-S06-NEG/NEG-2 |
| POST /private/activity_logs/migrate (batch insert) | CALM-S01/AC-1; CALM-S02/AC-1, AC-2, ERR-1; CALM-S03/AC-7 |
| GET .../crm_migration/status | CALM-S01/AC-2, AC-3; CALM-S05/AC-1, AC-2 |
| POST .../crm_migration/rollback | PRD §7 Behavior 6 (rollback) |
| Partial unique index (company_sso_id, external_id) | CALM-S02/AC-1..AC-3; CALM-S06-NEG/NEG-1 |
| external_id/source_tag/metadata fields (BE-only) | CALM-S02/*; CALM-S03/AC-7 |
| ActorResolver (4-case) | CALM-S03/AC-3, AC-4, AC-5 |
| ChangesExtractor (two-branch) | CALM-S03/AC-2, AC-2b |
| Existing GET /v1/activity_logs read (native rendering, no badge) | CALM-S04/AC-1, AC-2, ERR-1 |
| FE status indicator | CALM-S05/AC-1..AC-3, ERR-1 |
| SyncWatermarkResolver + per-contact upper-bound cutoff (created_at < watermark) | CALM-S07/AC-1..AC-3 |
| AssociationLogEnricher (in-place metadata patch, incl. live-feed logs) | CALM-S08/AC-1..AC-4, ERR-1 |
| Phase-A coverage gate (422 contacts_not_migrated; upstream backfill remediates) | CALM-S09/AC-1, AC-2, ERR-1 |
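The "in-place metadata patch" served by AssociationLogEnricher is described elsewhere in this RFC as additive only. A minimal Go sketch of that rule follows; the enrich function and the linked_entity_id / linked_entity_name key names are hypothetical, while crm_person_id is the metadata key named in the success criteria.

```go
package main

import "fmt"

// enrich copies keys from patch into meta only where meta has no entry
// yet, so existing metadata is never overwritten. meta must be non-nil.
// It returns the number of keys added.
func enrich(meta, patch map[string]any) int {
	added := 0
	for k, v := range patch {
		if _, exists := meta[k]; exists {
			continue // additive only: keep what the log already has
		}
		meta[k] = v
		added++
	}
	return added
}

func main() {
	meta := map[string]any{"crm_person_id": 7}
	n := enrich(meta, map[string]any{
		"crm_person_id":      9, // ignored: key already present
		"linked_entity_id":   42,
		"linked_entity_name": "Acme",
	})
	fmt.Println(n, meta)
}
```

Because the patch never replaces a present key, running the enrichment pass twice leaves a log unchanged the second time, which keeps the append-only exception narrow.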
UI / Consumer Surface Coverage
| PRD-named surface | Consumer | Required reads (BE) | Required writes (BE) | FE component | Status surface |
|---|---|---|---|---|---|
| Contact Detail → Activity timeline | web | GET /v1/activity_logs?customer_id=… (existing, CustomerStore.ts:466-508) | n/a (read-only view) | ActivityLog.vue / CustomerActivity.vue (existing, no change) | n/a — migrated rows render identically to native (no badge, PRD v2.3) |
| Contact Detail → migration status indicator | web | GET /v1/crm_migration/status (new IAG session-authed proxy, Decision 15; the /private status route is S2S-only) | n/a | DetailPage.vue indicator | status + progress_pct from status endpoint |
| Migration ops (trigger/rollback/status) | internal (S2S) | GET .../crm_migration/status | POST .../crm_migration, POST .../migrate, POST .../crm_migration/rollback | n/a — no UI | job document status |
Role Coverage
| PRD role | Authorization mechanism | Endpoints permitted (BE) | UI surface visibility (FE) | Cross-tenant? | Audit trail |
|---|---|---|---|---|---|
| Migration / Implementation Engineer | S2S BasicAuth on /private group (rest_router.go:69-72) | trigger, /migrate, status, rollback | none | yes — operates per company_sso_id | observability events (PRD §10) + job document |
| CS / Sales Agent (post-migration) | existing CDP web session (IAG + RequirePermissionMiddleware, rest_router.go:216; see Decision 15) | GET /v1/activity_logs (existing); GET /v1/crm_migration/status (read-only, IAG proxy) | sees the status indicator; migrated logs render as normal records | own tenant only | n/a (read-only) |
| Client admin / end user | — | none (explicitly denied — S2S only) | sees migrated logs as normal records (no badge) | n/a | n/a |
PRD Section Coverage
| PRD § | Title | Where covered |
|---|---|---|
| 1 | One-liner + Problem | §1 Overview |
| 2 | What Happens If We Don't Build This | §1 Overview (problem) |
| 3 | Target Users + Persona Context | Detail 1.A Role Coverage |
| 4 | Non-Goals | §1 Out of Scope |
| 5 | Constraints | §2 Technical Decisions; §2.3; §3 Performance |
| 5.1 | Data Lifecycle | §2.3 (retention) ; §3.D Compliance |
| 6 | New Features | §2.1 Architecture; §2.F consumer; §2.A UI |
| 7 | API & Webhook Behavior | §2.4 APIs; §2.2 Sequence; §2.F |
| 7.1 | CRM Source Schema | §2.0 Source Verification (CRM); §2.3 (source shape) |
| 7.2 | CDP Target Schema | §2.3 DDL |
| 7.3 | Field Transformation Logic | §2.F transformer; §2.2 |
| 8 | System Flow + Stories + ACs | §2.2 Sequence; Detail 1.A/1.C; §4.D Execution |
| 9 | Rollout | §4 Rollout |
| 9.1 | Migration Transition Window | §4 Rollout; §2.F.2 State Surface |
| 10 | Observability | §3 Monitoring & Alerting |
| 11 | Success Metrics | §1 Success Criteria |
| 12 | Launch Plan & Stage Gates | §4 Rollout Strategy |
| 13 | Dependencies | §1 Dependencies |
| 14 | Key Decisions + Alternatives | §2 Technical Decisions (ADR) + Detail 1.B |
| 15 | Open Questions | §5 Concerns / Open Questions |
| Appendix A | Grounded Code References | §2.0 Source Verification |
Detail 1.B — Decisions Closed (cross-layer)
| # | Decision | Chosen option | Alternatives rejected | Why rejected | Layer | §2 block |
|---|---|---|---|---|---|---|
| 1 | CDP write path | New dedicated S2S batch endpoint POST /private/activity_logs/migrate | Reuse POST /openapi/v1/customers/{id}/activity_logs | Single-record; mandatory activity_log_code (external_activity_log_request.go:85-90); category=='transaction' only (:122-127) | BE | Decision 1 |
| 2 | Idempotency key | external_id + partial unique index (company_sso_id, external_id) | CRM id as CDP _id; app-level dedup only | CDP generates ObjectID; app-only dedup races under retry | BE | Decision 2 |
| 3 | Origin tagging | New source_tag const crm_migration | Overload existing Source (external/qontak) | Overloading breaks existing Source semantics + rollback filter clarity | BE | Decision 3 |
| 4 | changes[] source | Parse audited_changes YAML | Parse the why/define_why string | define_why is free-text (audit.rb:1607-1661), unparseable | BE | Decision 4 |
| 5 | Transform engine | Re-implement Audit mapping in Go | Call a live Rails Audit#parse | Cross-language runtime dep; not batch-scalable; version-fragile | BE | Decision 5 |
| 6 | Actor resolution | 4 explicit cases; never drop a record | Skip records with unresolvable user | Deleted/system events are legitimate history; dropping lowers accuracy | BE | Decision 6 |
| 7 | CRM pagination | Cursor on audits.id (WHERE id > :cursor) | OFFSET pagination | OFFSET degrades on a 15M+ table; id PK index is O(log n) | BE/Data | Decision 7 |
| 8 | Job-state store | Durable MongoDB collection | Reuse Redis (7-day TTL, activity_log_migration_service.go:30-31) | 24 h+ pausing/resuming jobs would lose state at TTL | BE | Decision 8 |
| 9 | Contact resolution | contacts.crm_data.id via new FindOneByCrmDataID | Separate mapping table as primary | crm_data.id already carries crm_person_id; extra table is redundant | BE | Decision 9 |
| 10 | changes[] parsing | Two branches (update [old,new] / destroy snapshot) | Single uniform parser | Destroy stores a snapshot, not pairs (audit.rb:482-483) | BE | Decision 10 |
| 11 | Extraction interface | Direct read-only Postgres query on audits | New CRM REST extraction API | No such API exists; direct read is simpler (revisit if coupling hurts — OQ-1) | Data | Decision 11 |
| 12 | Endpoint namespace | underscore /private/activity_logs/… | hyphen /private/activity-logs/… (PRD) | Repo convention is underscore (rest_router.go:69-74); avoid a mixed namespace | BE | Decision 1 |
| 13 | Soft-delete policy for migrated logs | Append-only; cannot delete when source_tag='crm_migration' (except ops rollback) | Allow normal deletion | CRM audits are immutable; migrated history must match (OQ-5) | BE | Decision 2 (consequences) |
| 14 | FE deploy coupling | Backend ships first; FE status indicator follows independently | Couple FE+BE in one flag | source_tag is additive + BE-only; migrated logs render natively in any FE version | both | §4.A matrix |
| 15 | FE reads migration status | New IAG session-authed proxy GET /v1/crm_migration/status (tenant from session) | Call /private S2S route from browser; expose status under another group | Browser can't hold S2S BasicAuth; IAG + RequirePermissionMiddleware is the existing tenant-scoped pattern (rest_router.go:216) | both | §2.4, §2.G (resolves REV-1/OQ-11) |
| 16 | Pre-sync cutoff (Case 2a) | Per-contact upper bound = CDP contact.created_at; migrate audits.created_at < watermark | (a) created_at of the audit that set qontak_customer_id; (b) account-level cutoff date | (a) qontak_customer_id is set via update_column → not audited, no such row (grounding #12); (b) too coarse — gaps/overlaps for off-boundary syncs | BE | Decision 16 |
| 17 | Association reconciliation (Case 2b) | AssociationLogEnricher patches existing logs in place (additive metadata only), incl. live-feed logs without external_id | Enrich only newly-migrated rows; reconstruct missing link events from current join tables | enrich-only misses live-feed gaps the PM flagged; reconstructing events expands scope beyond "fill missing data" | BE | Decision 17 (D13 exception) |
| 18 | Unsynced contacts (Case 1) | Phase-A gate only; upstream BackfillCustomerIdWorker syncs; this RFC never writes CRM/contacts | Trigger the backfill from the migration; reimplement contact creation in contact-service | keeps Non-goal #3 (read-only CRM); avoids duplicating an existing pipeline + a new cross-service coupling | BE | Decision 18 |
Detail 1.C — Per-Story Change Map
| Story id | Title | Layer scope | FE changes | BE changes | Composite AC ids | Acceptance criteria (verifiable) | RFC anchors |
|---|---|---|---|---|---|---|---|
| CALM-S01 | Run batch migration for an account | BE-only | n/a (BE-only) | POST /private/activity_logs/crm_migration (trigger, flag-gate, prereq check, enqueue); CrmActivityLogMigrationConsumer; GET .../status; FindOneByCrmDataID | CALM-S01/AC-1..AC-4, ERR-1..ERR-4 | go test ./internal/app/service/... green; trigger returns job_id + status=in_progress; flag OFF → 403 FLAG_DISABLED; contacts-not-migrated → abort contacts_not_migrated; re-trigger completed → 409 | §2.4 rows 1,3 · §2.F · §4.D chunks 2,5,6 · §1 PRD-to-Schema rows 8,10 |
| CALM-S02 | Idempotent re-run; no duplicates | BE-only | n/a | partial unique index (company_sso_id, external_id); /migrate skip-on-conflict; last_audit_id resume cursor | CALM-S02/AC-1..AC-3, ERR-1 | integration test: re-run inserts 0 new, N skipped, status=completed; concurrent trigger → 409 JOB_ALREADY_RUNNING; duplicate insert logs duplicate_external_id | §2.3 index · §2.4 row 2 · §2.E · §4.D chunks 1,4 |
| CALM-S03 | Transform CRM audit → CDP log | BE-only | n/a | SchemaTransformer = CategoryMapper + ChangesExtractor (2 branches) + ActorResolver (4 cases) + ContactResolver | CALM-S03/AC-1..AC-8, ERR-1..ERR-3 | unit tests: update→[from,to]; destroy→to:null; active→SSO UUID; deleted→[Deleted CRM User]; system→Qontak System; UTC timestamp; comment JSON crm_person_id first; unmapped→skip | §2.F transformer · §2.2 · §4.D chunk 3 · §1 PRD-to-Schema rows 4,5,6 |
| CALM-S04 | Migrated logs visible in the existing timeline (no badge) | BE-only (no FE change) | n/a — migrated logs render via the existing GET /v1/activity_logs read path with the existing timeline components; no FE edit (no badge, PRD v2.3) | none new (existing GET /v1/activity_logs already returns the doc; BE source_tag/metadata flow through but are not FE-displayed) | CALM-S04/AC-1, AC-2, ERR-1 | migrated logs appear in the existing timeline sorted by timestamp, identical to native rows (no badge); deleted-ref renders [Deleted reference] via existing rendering | §2.3 BE fields (chunk 1) · existing GET /v1/activity_logs |
| CALM-S05 | Migration status indicator | FE + BE | new status indicator on DetailPage.vue; polls status endpoint; 4 states | GET /private/activity_logs/crm_migration/status (+ IAG proxy) | CALM-S05/AC-1..AC-3, ERR-1 | pnpm test: indicator shown on in_progress, hidden on completed/not_started; API error → fail-silent (detail renders) | §2.A · §2.C · §2.4 row 3 · §4.D chunk 7 · §1 Design References row 1 |
| CALM-S06-NEG | No duplicate records on re-run (guard rail) | BE-only | n/a | unique index + flag gate | CALM-S06-NEG/NEG-1, NEG-2 | integration: known external_id re-encounter → skip + duplicate_external_id; flag OFF → 403, no job/records | §2.3 index · §2.4 row 1 · §4.D chunks 1,4 |
| CALM-S07 | Migrate only pre-sync activity (no double-import) | BE-only | n/a | SyncWatermarkResolver (reads CDP contact.created_at); CRMExtractor/transform skip records with audits.created_at >= watermark | CALM-S07/AC-1..AC-3 | unit: record below watermark → migrated; record at/after watermark → skipped post_sync_live_feed; no-watermark (contact missing) → contact_not_found | §2.F SyncWatermarkResolver · Decision 16 · §4.D chunk 8 · §1 PRD-to-Schema row 11 |
| CALM-S08 | Reconcile association logs; fill missing metadata | BE-only (no FE change) | n/a — patched rows render natively | AssociationLogEnricher — for association logs (link/unlink/resolved/completed) fill crm_person_id + linked entity id/name via CRM lookup; patch existing CDP logs in place (incl. live-feed logs, matched on §2.F match-key — OQ-13); additive-only (D13 exception) | CALM-S08/AC-1..AC-4, ERR-1 | unit: migrated assoc log missing crm_person_id → filled; existing live-feed assoc log missing entity name → patched (no new doc); deleted referent → left [Deleted reference]; idempotent re-run patches 0 | §2.F AssociationLogEnricher · Decision 17 · §4.D chunk 9 · §1 PRD-to-Schema row 12 |
| CALM-S09 | Gate on contact coverage (Phase A) | BE-only | n/a | trigger Phase-A gate: coverage < threshold → 422 contacts_not_migrated; remediated by upstream BackfillCustomerIdWorker (no contact/CRM write here) | CALM-S09/AC-1, AC-2, ERR-1 | integration: coverage below gate → 422, no job; after upstream backfill raises coverage → trigger succeeds; residual unresolved contact → skip contact_not_found (not a CRM write) | §2.4 trigger gate · Decision 18 · §4.D chunk 5 · §1 PRD-to-Schema row 13 |
2. Technical Design
Infrastructure Topology (start here)
contact-service runs on Kubernetes (ArgoCD + Helm) on Alibaba Cloud, split
into separate deployments per binary subcommand (Makefile:60-77): API
(server), Worker (worker, gocraft/work), Consumer (consumer,
Kafka), Worker-UI. Datastores: MongoDB (primary, activity_logs +
contacts), Redis (app cache + launchpad cache), PostgreSQL (chat DB +
webhook-logs DB), Kafka (Alikafka). This RFC adds one external read
dependency (CRM audits/users Postgres) and one durable Mongo collection.
Deployment topology
flowchart TB
eng([Migration Engineer / ops script]) -->|HTTPS S2S BasicAuth| lb[Ingress / LB]
lb -->|HTTP /private| api["contact-service-api pods ×N\n(stateless, HPA/KEDA)"]
api -->|enqueue gocraft/work| redisq[(Redis — job queue)]
redisq -->|consume| worker["contact-service-worker pods ×M\n(CrmActivityLogMigrationConsumer)"]
worker -->|read-only cursor query| crmpg[("CRM Postgres\naudits + users (read-only DSN)")]
worker -->|email → SSO UUID| usman(["USMAN / LaunchpadUserService\n(HTTPS, owner: Platform)"])
worker -->|"upsert (skip-on-conflict)"| mongo[("MongoDB primary\nactivity_logs")]
worker -->|read/write job state| jobs[("MongoDB\ncrm_activity_log_migration_jobs")]
api -->|read status| jobs
agent([CS/Sales Agent]) -->|HTTPS web| lb
lb -->|"/v1/activity_logs, /v1/crm_migration/status (IAG proxy)"| api
api -->|read| mongo
worker -->|metrics| dd([Datadog StatsD])
Per-service responsibility
flowchart LR
subgraph cs["contact-service (CDP, owner: CDP Squad)"]
t1["POST /private/activity_logs/crm_migration\n(trigger job)"]
t2["POST /private/activity_logs/migrate\n(batch insert ≤20, idempotent)"]
t3["GET /private/activity_logs/crm_migration/status\n(progress)"]
t4["POST /private/activity_logs/crm_migration/rollback"]
c1["CrmActivityLogMigrationConsumer\n(extract→transform→write)"]
end
t1 -->|enqueue| c1
c1 -->|"SELECT … cursor on id"| crm(["CRM Postgres audits/users\n(owner: CRM Squad)"])
c1 -->|"email→SSO UUID"| usman(["USMAN (owner: Platform)"])
c1 -->|"shared service MigrateBatch()"| t2
t2 -->|upsert| mdb[("MongoDB activity_logs")]
c1 -->|job state| jobdb[("MongoDB crm_activity_log_migration_jobs")]
t3 --> jobdb
t4 -->|"delete where source_tag"| mdb
Both the trigger-driven consumer and the standalone /migrate endpoint call the same CrmActivityLogMigrationService.MigrateBatch(), so the write/idempotency path has one implementation (testable in isolation; reusable for manual replay).
Technical Decisions (ADR-format)
Decision 1: New dedicated S2S migration endpoints (underscore namespace)
Context. The migration must batch-insert records carrying external_id /
source_tag / metadata and skip duplicates. The only external write today is
single-record and over-constrained.
Options considered
- A: Reuse POST /openapi/v1/customers/{customer_id}/activity_logs.
  - Pros: no new route.
  - Cons: single-record (ExternalActivityLogRequest, external_activity_log_request.go:11-33); activity_log_code mandatory (:85-90); category must be transaction (:122-127); no external_id/source_tag/bulk fields. Unworkable.
- B: New /private S2S endpoints, batch ≤20, idempotent.
  - Pros: bypasses all constraints; S2S BasicAuth matches the existing /private group (rest_router.go:69-72); batch matches the proven maxBulkCreateLimit = 20 (activity_log.go:26).
  - Cons: new surface to secure + document.
Decision: B. Endpoints (underscore, per repo convention — Decision 12):
POST /private/activity_logs/crm_migration (trigger),
POST /private/activity_logs/migrate (batch insert),
GET /private/activity_logs/crm_migration/status,
POST /private/activity_logs/crm_migration/rollback.
Rationale. Reuse is impossible (three hard constraints). The /private
group is already S2S-only and is where the analogous TF-2519 migration status
lives (/private/activity_logs/migration/status, rest_router.go:74) — we sit
beside it under a distinct crm_migration sub-path to avoid colliding with that
unrelated job.
Consequences. New auth surface (mitigated: same BasicAuth middleware). The hyphen path in the PRD is overridden to underscore.
Reversibility. Routes are additive; deleting them and dropping the collection reverses with no impact on existing traffic.
Decision 2: external_id + partial unique index for idempotency
Context. 24 h+ jobs will be retried/resumed; duplicates must be impossible.
Options considered
- A: App-level dedup only (query-before-insert). Pros: no schema change. Cons: races under concurrent batches; no DB guarantee.
- B: external_id string + unique index (company_sso_id, external_id), partial (only where external_id exists), skip-on-conflict. Pros: DB-enforced; safe under retry/concurrency. Cons: schema migration; index build on a large collection.
Decision: B. Partial unique index so existing native activity_logs
documents (no external_id) are unaffected.
Rationale. Only a DB constraint makes re-runs provably duplicate-free
(CALM-S02, CALM-S06-NEG). company_sso_id is the tenant scope already on the
struct (base.go).
Consequences. Migrated logs become append-only by source_tag (Decision 13).
Index build must run as a golang-migrate step before any insert.
Reversibility. Drop the index (migrate-down) and the three fields; existing
docs are untouched (fields are omitempty).
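The skip-on-conflict behaviour can be illustrated without a database. The following is a minimal in-memory sketch, not the repository implementation: the map stands in for the partial unique index, and the type and function names (logDoc, store, insertBatch) and the sample external_id values are hypothetical.

```go
package main

import "fmt"

// migrationKey models the partial unique index (company_sso_id, external_id).
type migrationKey struct {
	CompanySsoID string
	ExternalID   string
}

// logDoc is a trimmed, hypothetical stand-in for an activity_logs document.
type logDoc struct {
	CompanySsoID string
	ExternalID   string // empty for native logs, which the partial index ignores
	Action       string
}

// store is an in-memory stand-in for the collection plus its unique index.
type store struct {
	docs  []logDoc
	index map[migrationKey]struct{}
}

func newStore() *store {
	return &store{index: make(map[migrationKey]struct{})}
}

// insertBatch writes each document unless its key already exists, and reports
// how many documents were inserted and how many were skipped as duplicates.
func (s *store) insertBatch(batch []logDoc) (inserted, skipped int) {
	for _, d := range batch {
		if d.ExternalID == "" {
			// Not covered by the partial index: always inserted.
			s.docs = append(s.docs, d)
			inserted++
			continue
		}
		k := migrationKey{d.CompanySsoID, d.ExternalID}
		if _, dup := s.index[k]; dup {
			skipped++ // the real path would log duplicate_external_id here
			continue
		}
		s.index[k] = struct{}{}
		s.docs = append(s.docs, d)
		inserted++
	}
	return inserted, skipped
}

func main() {
	s := newStore()
	batch := []logDoc{
		{"acc-123", "audit-1", "update"},
		{"acc-123", "audit-2", "destroy"},
	}
	fmt.Println(s.insertBatch(batch)) // first run: 2 0
	fmt.Println(s.insertBatch(batch)) // re-run of the same batch: 0 2
}
```

In the real write path the guarantee comes from the database index rather than an application-side map, which is what makes it safe under concurrent batches.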
Decision 3: source_tag as a new constant, not overloading Source
Context. Source today is only external / qontak (consts/const.go:105-106).
Decision. Add ActivityLogSourceCrmMigration = "crm_migration" and a new
SourceTag field (do not repurpose Source).
Rationale. A dedicated tag enables rollback (DELETE WHERE source_tag=…) and
validation/append-only filtering without perturbing existing Source semantics.
(It is not used for FE display — PRD v2.3 removed the badge.) No serious
alternative beyond overloading Source, which was rejected for blast radius.
Consequences / Reversibility. Extra field; removable with the index migration.
Decision 4 & 10: Parse changes[] from audited_changes YAML, two branches
Context. CDP changes[] = {field,from,to} must come from a structured
source. CRM stores deltas in audited_changes (YAML) — but the shape differs by
action (verified audit.rb:482-483, 503-505).
Decision. Parse audited_changes YAML with two branches:
- update → {field: [old,new]} ⇒ {field, from:old, to:new}.
- destroy → {field: value} (full snapshot) ⇒ {field, from:value, to:null}.
Reject the why/define_why string (free-text, audit.rb:1607-1661).
Rationale. A uniform parser produces wrong results for destroys. Field names are CRM DB columns; a migration-local display map (OQ-6) humanizes them.
Consequences. Need a Go YAML lib + a column→label map. Reversibility. Pure code; revertable per chunk.
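The two branches can be sketched as follows. This is an illustrative sketch only: it assumes audited_changes has already been decoded into a map by whichever Go YAML library is chosen, and the Change type and extractChanges function are hypothetical names. Actions other than update and destroy are outside Decision 4 & 10, so the sketch returns an error for them.

```go
package main

import (
	"fmt"
	"sort"
)

// Change mirrors the CDP changes[] element {field, from, to}.
type Change struct {
	Field string
	From  any
	To    any
}

// extractChanges converts already-decoded audited_changes into changes[].
func extractChanges(action string, audited map[string]any) ([]Change, error) {
	fields := make([]string, 0, len(audited))
	for f := range audited {
		fields = append(fields, f)
	}
	sort.Strings(fields) // deterministic order helps golden-file tests

	changes := make([]Change, 0, len(fields))
	for _, f := range fields {
		v := audited[f]
		switch action {
		case "update":
			// update stores a two-element [old, new] pair per field.
			pair, ok := v.([]any)
			if !ok || len(pair) != 2 {
				return nil, fmt.Errorf("update field %q: want [old,new], got %v", f, v)
			}
			changes = append(changes, Change{Field: f, From: pair[0], To: pair[1]})
		case "destroy":
			// destroy stores a full snapshot, so the value becomes "from".
			changes = append(changes, Change{Field: f, From: v, To: nil})
		default:
			return nil, fmt.Errorf("action %q is not covered by this sketch", action)
		}
	}
	return changes, nil
}

func main() {
	upd, _ := extractChanges("update", map[string]any{"name": []any{"Old", "New"}})
	del, _ := extractChanges("destroy", map[string]any{"name": "Old"})
	fmt.Printf("%+v\n%+v\n", upd, del)
}
```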
Decision 5: Re-implement the Audit mapping in Go
Context. Transformation logic lives in Rails (audit.rb mapping_type
:1495, mapping_who :1824, mapping_how_what_why :1844, parse :1998).
Decision. Port the deterministic mapping to Go inside the migration service. No alternative seriously considered beyond calling a live Rails process — rejected: cross-language runtime dependency, not batch-scalable, version-fragile.
Consequences. The Go port must track the Rails logic; pin to audited gem
v5.8.0 semantics and add a golden-file test set sampled from real audits.
Reversibility. Code-only.
Decision 6: Actor resolution — four explicit cases, never drop a record
Context. CRM mapping_who (audit.rb:1824-1842) yields four outcomes
(active Name (email); soft-deleted Name <deleted user>; Qontak system;
bare <deleted user> fallback). CDP stores the display name in Actor
(activity_log/base.go:28) — not a description field (which does not exist
on the struct).
Decision. audits.user_id is an integer FK, not an email or UUID, so the
active-user path is an explicit two-hop lookup (REV-6):
hop 1 (CRM read DSN): SELECT email, full_name, deleted_at FROM users WHERE id = :user_id
hop 2 (USMAN client): QontakLaunchpadClient.GetUsersByEmails(ctx, company_sso_id, [email])
→ LaunchpadUsersResponse.UserResponse[0].SsoID (qontak_launchpad.go:264)
Normalize to four cases:
1. Active (users.deleted_at null and USMAN returns a non-empty SsoID for the email) → user_id = SsoID, Actor = users.full_name.
2. Soft-deleted (users.deleted_at set, per acts_as_paranoid at user.rb:18; or USMAN returns no match / inactive Status) → user_id = null, Actor = "[Deleted CRM User]".
3. System (audits.user_id null, or audits.username ∈ {central, hub, Qontak system} per audit.rb:1834) → user_id = null, Actor = "Qontak System".
4. Not found / RecordNotFound → treated as case 2 (Actor = "[Deleted CRM User]").
Rationale. Deleted/system events are legitimate; dropping them lowers
accuracy. The 4th outcome corrects the PRD's 3-case model; the explicit users
join corrects the compressed integer→email hop.
Consequences. Per-record CRM users read + USMAN call — mitigated by an
in-job user_id → {email, sso_uuid} cache (§3 caching) and the username
fallback. GetUsersByEmails is company-scoped (needs company_sso_id).
Open edge (OQ-9): confirm whether GetUsersByEmails returns deactivated
users (use LaunchpadUser.Status / empty result → case 2). Reversibility.
Code-only.
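The four cases can be expressed as one small function. This is a hedged sketch: lookupUser and lookupSsoID are hypothetical stand-ins for hop 1 (CRM users read) and hop 2 (GetUsersByEmails), an empty returned SSO ID stands for user_id = null, and checking the system case before the users lookup is an assumption about ordering.

```go
package main

import "fmt"

const (
	actorDeleted = "[Deleted CRM User]"
	actorSystem  = "Qontak System"
)

// crmUser is the hop-1 row; a nil *crmUser means the lookup found nothing.
type crmUser struct {
	Email    string
	FullName string
	Deleted  bool // true when users.deleted_at is set
}

// auditActor carries the audits columns the resolver needs.
type auditActor struct {
	UserID   *int64 // nil when audits.user_id is NULL
	Username string
}

var systemUsernames = map[string]bool{"central": true, "hub": true, "Qontak system": true}

// resolveActor returns (ssoID, actor); ssoID is empty when user_id must be null.
func resolveActor(a auditActor, lookupUser func(int64) *crmUser, lookupSsoID func(string) string) (string, string) {
	if a.UserID == nil || systemUsernames[a.Username] {
		return "", actorSystem // case 3
	}
	u := lookupUser(*a.UserID)
	if u == nil || u.Deleted {
		return "", actorDeleted // cases 4 and 2
	}
	sso := lookupSsoID(u.Email)
	if sso == "" {
		return "", actorDeleted // USMAN has no active match: case 2
	}
	return sso, u.FullName // case 1
}

func main() {
	users := map[int64]*crmUser{
		1: {Email: "ana@example.com", FullName: "Ana"},
		2: {Email: "budi@example.com", FullName: "Budi", Deleted: true},
	}
	lookupUser := func(id int64) *crmUser { return users[id] }
	lookupSsoID := func(email string) string {
		if email == "ana@example.com" {
			return "sso-ana"
		}
		return ""
	}
	for _, id := range []int64{1, 2, 99} {
		id := id
		sso, actor := resolveActor(auditActor{UserID: &id}, lookupUser, lookupSsoID)
		fmt.Printf("user_id=%d sso=%q actor=%q\n", id, sso, actor)
	}
	sso, actor := resolveActor(auditActor{Username: "hub"}, lookupUser, lookupSsoID)
	fmt.Printf("system sso=%q actor=%q\n", sso, actor)
}
```

Every branch returns an actor, which is the property the decision relies on: no audit row is dropped for lack of a resolvable user.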
Decision 7: Cursor pagination on audits.id
Context. audits has no composite index on (auditable_type, created_at)
(verified db/schema.rb:68-77: created_at standalone, auditable_index is
(auditable_id, auditable_type)). OFFSET on 15M+ rows degrades per page.
Decision. Extract with WHERE id > :cursor AND auditable_type IN (scope) AND created_at >= :cutoff ORDER BY id ASC LIMIT 500, advancing the id cursor. Run
EXPLAIN ANALYZE before Stage 0; add a (auditable_type, created_at) or
(id) WHERE auditable_type IN … index on CRM if the plan is unacceptable (OQ-2).
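Rationale. id is the PK (B-tree), so each page starts with an O(log n) index seek to the cursor and then scans forward; unlike OFFSET, the cost does not grow with page depth.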
Consequences. The created_at/auditable_type predicate still filters
post-index-scan; a CRM-side index may be needed. Reversibility. Query-only.
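The extraction loop can be sketched independently of the SQL driver. In this minimal sketch, fetchPage is a hypothetical stand-in for the read-only query, and memoryFetch simulates only the WHERE id > :cursor ORDER BY id ASC LIMIT n part over a sorted slice (the auditable_type and created_at predicates are omitted).

```go
package main

import (
	"fmt"
	"sort"
)

// auditRow is trimmed to the only column the cursor needs.
type auditRow struct {
	ID int64
}

// fetchPage stands in for the paged read-only query against audits.
type fetchPage func(cursor int64, limit int) []auditRow

// extractAll pages forward from start until a page comes back empty and
// returns the final cursor, which a job would persist as last_audit_id.
func extractAll(fetch fetchPage, start int64, limit int, handle func([]auditRow)) int64 {
	cursor := start
	for {
		page := fetch(cursor, limit)
		if len(page) == 0 {
			return cursor
		}
		handle(page)
		cursor = page[len(page)-1].ID // advance to the last id seen
	}
}

// memoryFetch simulates the id-cursor query over rows sorted by ID.
func memoryFetch(rows []auditRow) fetchPage {
	return func(cursor int64, limit int) []auditRow {
		// Binary search plays the role of the B-tree seek to the cursor.
		i := sort.Search(len(rows), func(i int) bool { return rows[i].ID > cursor })
		end := i + limit
		if end > len(rows) {
			end = len(rows)
		}
		return rows[i:end]
	}
}

func main() {
	rows := []auditRow{{1}, {2}, {5}, {8}, {9}}
	last := extractAll(memoryFetch(rows), 0, 2, func(p []auditRow) {
		fmt.Println("page:", p)
	})
	fmt.Println("last_audit_id:", last)
}
```

Resuming a paused job is the same call with start set to the stored last_audit_id, so gaps in the id sequence do not matter.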
Decision 8: Durable MongoDB job-state store (not Redis)
Context. The existing migration service stores state in Redis with a 7-day
TTL (activity_log_migration_service.go:30-31, key
activity_log_migration:user_id_update). CRM migrations run 24 h+ and pause/resume.
Decision. Persist job state in a new MongoDB collection
crm_activity_log_migration_jobs. Reuse the gocraft/work consumer + status
handler pattern (ActivityLogMigrationConsumer/Handler) but swap the
backend store.
Rationale. A 7-day TTL on a multi-day job that may sit between batches risks
silent state loss; Mongo gives durable, queryable progress + the resume cursor
(last_audit_id). Consequences. New collection + repository.
Reversibility. Drop the collection (migrate-down).
Decision 9: Contact resolution via contacts.crm_data.id (+ new finder)
Context. Contact.CrmData.ID (contact/base.go:343) holds the CRM
crm_person_id on the CDP side; conversely the CRM stores the CDP contact ID in
qontak_customer_id. No query-by-crm_data.id finder exists (grep clean —
crm_data.id is referenced only in base.go:534 as a field-path constant).
Decision. Add a repository method FindOneByCrmDataID(ctx, companySsoID, crmDataID) (*Contact, error) and resolve auditable_id.to_s → customer_id. Fall
back to a separate mapping table only if crm_data.id coverage < 100% (OQ-7).
Rationale. Reuses existing data; avoids a redundant table. Consequences.
Needs a supporting Mongo index on (company_sso_id, crm_data.id) for lookup
performance (added in the same migration).
Fallback design (REV-5). crm_data.id coverage is a hard Stage-0 / Phase-A
gate (Decision 18, CALM-S09). Per account, compute
missing = db.contacts.countDocuments({company_sso_id, "crm_data.id":{$exists:false}})
and coverage = 1 - missing / total. If coverage ≥ 99%, proceed with FindOneByCrmDataID.
If coverage < 99%, the account is blocked until either (a) the unsynced
contacts are synced by the upstream Crm::BackfillCustomerIdWorker(create_if_not_found: true) (grounding note #13) — which provisions the CDP contact and backfills
crm_data.id / qontak_customer_id; this RFC waits on it but never runs it — or
(b) a fallback map collection is populated:
// crm_contact_id_map (only when crm_data.id coverage < 99%)
{ "company_sso_id": "acc-123", "crm_person_id": "12345", "customer_id": "66b…" }
// unique index (company_sso_id, crm_person_id); populated from the contact-migration output
ContactResolver then tries FindOneByCrmDataID first, falls back to
crm_contact_id_map, else skips with contact_not_found. The migration never
runs an account below the 99% gate with no fallback populated. Reversibility.
Code + index (+ drop the optional map collection).
Decision 11: Direct read-only Postgres extraction (vs CRM REST API)
Context. No CRM REST extraction API with the needed contract exists
(verified: routes.rb:364 resources :audits, only:[:index] renders an HTML
dashboard; contacts/timeline.rb pulls from CDP, it does not expose audits).
Decision. Extract via a direct read-only Postgres query against audits
(+ users for actor email). Default; revisit if schema coupling becomes a
maintenance burden (OQ-1).
Rationale. Simpler and faster than commissioning a new CRM API; the service
already runs Postgres connections. Consequences. Couples the migrator to the
CRM audits schema (pinned by tests + gem version). Reversibility. Swap the
extractor for an API client behind the same CRMExtractor interface.
Decision 16: Pre-sync cutoff via CDP contact.created_at watermark (Case 2a)
Context. A live CRM→CDP Kafka feed already writes activity logs for events
after a contact is synced (grounding note #11). Re-importing that post-sync
range would create duplicates — and live-feed logs carry no external_id, so
the (company_sso_id, external_id) partial-unique index (Decision 2) cannot dedup
them. Only a per-contact timestamp cutoff can. We need the moment a contact
started being covered by the live feed.
Options considered
- A: created_at of the CRM audit that set qontak_customer_id. Precise in theory, but does not exist: qontak_customer_id is written via contact.update_column (send.rb:125), which bypasses the audited gem and doesn't bump updated_at (grounding note #12). No audit row, no CRM timestamp.
- B: CDP contact.created_at. The CDP contact is created at sync time (by the upstream backfill's Contact360Send), right before qontak_customer_id is set, so created_at ≈ when live coverage began. Already on the contact doc; no CRM dependency.
- C: one account-level cutoff date. Coarse; gaps/overlaps for contacts synced off the account boundary.
Decision: B. Per-contact upper bound = CDP contact.created_at. Migrate
audits.created_at < contact.created_at; the live feed owns at/after.
Rationale. It is the only per-contact signal that actually exists, and it needs no new field and no CRM read.
Consequences. SyncWatermarkResolver reads created_at from the same contact
already fetched by ContactResolver (no extra round-trip). Edge (OQ-15): a CDP
contact that pre-existed CRM linkage (e.g. created via chat, later linked) has a
created_at earlier than the true CRM-sync moment → its cutoff is too early,
leaving a small pre-sync gap. For migration-created contacts (crm_data.id set by
contact migration), created_at approximates the sync moment, so the watermark is accurate; this is the normal case. Reversibility. Query-only.
Decision 17: Association-log enrichment patches existing logs in place (Case 2b)
Context. Association logs (link/unlink/resolved/completed for Deal–Person /
Deal–Company / Ticket / Task) must carry their referent metadata (crm_person_id,
linked-entity id + name). Some logs already in CDP — especially live-feed logs,
which have no external_id — are missing that metadata and need it backfilled
("double-check association logs, fill missing data").
Decision. Add an AssociationLogEnricher that fills the missing metadata and
updates the existing activity_logs document in place (additive only — it
never deletes, never changes category/action/timestamp). It targets both the
rows this migration writes and live-feed rows; the latter are located by a
match-key (company_sso_id, customer_id, category/action, linked entity id, timestamp≈) since they lack external_id (OQ-13). The patch is idempotent
(fill-when-missing).
Rationale. Enriching only newly-migrated rows would miss the live-feed gaps the requirement is about; reconstructing absent link events from current join tables would expand scope beyond "fill missing data."
Consequences — append-only exception (amends Decision 13). Decision 13 / §3.D
make migrated logs append-only. This enricher is the one sanctioned in-place
update path, narrowly scoped to additive metadata fields. The §3.D enforcement
invariant is amended: any future delete is still forbidden; an additive metadata
update by AssociationLogEnricher is the named exception. Reversibility. The
fields are additive; the source audits remain the source of truth, so a re-run
recomputes them.
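The fill-when-missing rule is what makes the patch both additive and idempotent. The sketch below assumes metadata is a flat string map; enrichMetadata and the key names other than crm_person_id are hypothetical.

```go
package main

import "fmt"

// enrichMetadata copies values from found into existing only for keys that
// are absent or empty, and returns how many keys it patched. It never
// overwrites or deletes a value. existing must be a non-nil map.
func enrichMetadata(existing, found map[string]string) int {
	patched := 0
	for k, v := range found {
		if v == "" {
			continue // nothing to fill with, e.g. a deleted referent
		}
		if cur, ok := existing[k]; ok && cur != "" {
			continue // already present: leave untouched
		}
		existing[k] = v
		patched++
	}
	return patched
}

func main() {
	// A live-feed association log that is missing part of its metadata.
	logMeta := map[string]string{"linked_entity_id": "deal-7"}
	fromCRM := map[string]string{
		"crm_person_id":      "12345",
		"linked_entity_id":   "deal-7",
		"linked_entity_name": "Renewal Q3",
	}
	fmt.Println(enrichMetadata(logMeta, fromCRM)) // first pass: 2
	fmt.Println(enrichMetadata(logMeta, fromCRM)) // re-run: 0
}
```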
Decision 18: Unsynced contacts — Phase-A gate only, sync owned upstream (Case 1)
Context. Contacts with no qontak_customer_id (not yet synced to CDP) have no
CDP timeline to attach activity to. They must be synced "first."
Decision. Phase A only verifies coverage and gates the account (trigger →
422 contacts_not_migrated when below threshold). Provisioning the contact + the
CRM qontak_customer_id write-back is done by the existing upstream
Crm::BackfillCustomerIdWorker(create_if_not_found: true) (grounding note #13),
run as a runbook step before Phase B. contact-service never creates contacts
or writes CRM.
Options considered. (a) Have the migration invoke the backfill worker and
poll — rejected: adds a live cross-service coupling and a CRM write path into a
read-only-on-CRM service. (b) Reimplement contact creation in contact-service —
rejected: duplicates an existing pipeline and forces a qontak_customer_id
write-back, breaking Non-goal #3.
Rationale. Keeps this service read-only on CRM (Non-goal #3) and reuses the
proven backfill pipeline. Consequences. A residual contact the backfill could
not create is still skipped with contact_not_found (within the < 1% budget).
Reversibility. Gate logic only.
Minimum-coverage check: Storage = Decisions 2,8,9 (Mongo). Sync/async = async gocraft/work consumer (this §). Caching = in-job email→UUID cache (§3). Third-party = direct Postgres (D11) + USMAN HTTP (D6/OQ-9). Consistency = eventual (batch); idempotency via unique index (D2) + per-contact watermark for the live-feed seam (D16). Multi-tenancy = every query/index scoped by
company_sso_id. Reuse/new = D1, D3, D8, D9; contact-sync reused upstream (D18).
Detail 2.0 — Repo Reading Guide
Repo Map (both layers)
flowchart LR
subgraph fe["qontak-customer-fe/"]
store["features/customers/store/CustomerStore.ts"]
actlog["detail/components/ActivityLog.vue"]
actv["detail/components/CustomerActivity.vue"]
detail["detail/views/DetailPage.vue"]
end
subgraph be["contact-service/internal/"]
router["server/rest_router.go"]
handler["app/handler/activity_log_migration_handler.go"]
consumer["app/consumer/activity_log_migration_consumer.go"]
svc["app/service/activity_log_migration_service.go"]
alrepo["app/repository/activity_log/base.go"]
ctrepo["app/repository/contact/base.go"]
consts["pkg/consts/const.go"]
mig["db/migrations/*.json"]
end
subgraph crm["qontak.com/"]
audit["app/models/audit.rb"]
schema["db/schema.rb (audits, users)"]
end
store --> actlog --> actv
detail --> actlog
router --> handler --> svc --> alrepo
svc --> ctrepo
consumer --> svc
svc -.reads.-> audit
svc -.reads.-> schema
Existing Code Anchors
| Layer | Path | Why the agent reads it | What pattern it teaches |
|---|---|---|---|
| BE | internal/server/rest_router.go:69-74, 279-282, 362-376 | route groups + auth + where to mount new routes | /private=BasicAuth, /api/v1=BasicAuth, /openapi/v1=MAG; existing migration status at :74 |
| BE | internal/app/repository/activity_log/base.go:17-34 | the struct to extend | field/bson/json tag conventions; Actor at :28; no Description |
| BE | internal/pkg/consts/const.go:100-106 | where to add the new source const | const block style; Source*/Category* consts |
| BE | internal/app/service/activity_log.go:26,128-146 | bulk insert + the cap | maxBulkCreateLimit=20; BulkCreateActivityLog check :129-131 |
| BE | internal/app/consumer/activity_log_migration_consumer.go:25 | consumer pattern to mirror | ProcessUpdateUserIDJob(job *work.Job) shape |
| BE | internal/app/service/activity_log_migration_service.go:22,25,30-31,47,187 | the pattern to reuse + the Redis store to replace | gocraft job name const; Redis 7-day TTL (replace with Mongo) |
| BE | internal/app/handler/activity_log_migration_handler.go | status handler shape | GetMigrationStatus response wiring |
| BE | internal/app/payload/activity_log_migration.go:18-25 | status payload shape to mirror | {job_id,status,total_updated,error_message,created_at,updated_at} |
| BE | internal/app/repository/contact/base.go:343 | CrmData.ID field | crm_data.id bson path; add FindOneByCrmDataID |
| BE | internal/app/service/job_enqueuer.go:65-67, internal/worker/worker_service.go:132,144-156 | enqueue + register a gocraft job | IJobEnqueuer.EnqueueJob; registerJobWithOptions; job.<name>.success/failed metrics |
| BE | internal/pkg/http/response.go:5-49, default_error.go | error envelope + constructors | BaseResponse{resp_code,resp_desc{id,en},meta}; ErrBadRequest* etc. |
| BE | db/migrations/008_contact_accounts_index.up.json, 016_*activity_log_category* | how Mongo indexes are created | golang-migrate JSON createIndexes; snake_case names |
| BE | internal/pkg/datadog/metric.go:69, internal/app/service/manual_sync.go:107 | metrics convention | Datadog StatsD; <feature>.<event> names |
| CRM | app/models/audit.rb:213-215, 482-505, 1824-1842, 1495/1824/1844/1998 | the mapping to port | YAML parse; update vs destroy; 4-case actor; parse() |
| CRM | db/schema.rb:47-78, 5082-5136 | source columns + indexes; users for actor | exact audits columns; no (auditable_type,created_at) index; users.email/full_name/deleted_at |
| FE | features/customers/store/CustomerStore.ts:203-214, 466-508, 722 | existing read store (read-only context — interface not extended) | ActivityLog (10 fields, no source); getActivityLogs; Pinia defineStore('c360-customer') |
| FE | features/customers/detail/components/ActivityLog.vue:24,47-52,85-97,125,190-210 | existing timeline transform + cap (read-only context — not edited; migrated logs render natively) | updateTimeline() maps raw→timeline[]; 5,000 cap |
| FE | features/customers/detail/components/CustomerActivity.vue:150,277-301 | duplicate transform (read-only context — not edited, no badge) | second updateTimeline() |
| FE | features/customers/detail/views/DetailPage.vue:1-49 | where the indicator mounts | renders V2/V1 activity + association panels |
Existing Contracts to Reuse, Extend, or Replace (BE)
| Contract | Status | Justification | Owner |
|---|---|---|---|
| POST /openapi/v1/customers/{id}/activity_logs | not reused | over-constrained (Decision 1) | CDP |
| POST /api/v1/activity_logs (BulkCreate, cap 20) | extended (pattern) | new MigrateBatch mirrors the ≤20 batch but adds idempotency + new fields | CDP |
| GET /private/activity_logs/migration/status (TF-2519) | reused (pattern only) | distinct job; new crm_migration/status sits beside it | CDP |
| ActivityLog struct + activity_logs collection | extended | add 3 fields + partial unique index | CDP |
| consts.ActivityLogSource* | extended | add crm_migration | CDP |
| Contact repository | extended | add FindOneByCrmDataID + supporting index | CDP |
| POST /private/activity_logs/crm_migration (+ /migrate, /status, /rollback) | new-with-justification | no existing batch+idempotent+multi-category endpoint (Decision 1) | CDP |
| GET /iag/v1/crm_migration/status (FE proxy) | new-with-justification | FE needs a session-authed, tenant-scoped status read; the /private route is S2S-only (REV-1) | CDP |
| QontakLaunchpadClient.GetUsersByEmails (USMAN) | reused | email→SSO UUID already implemented (qontak_launchpad.go:264) | Platform |
| CRM audits read | new (read-only DSN) | no extraction API exists (Decision 11) | CRM Squad |
Patterns to Follow
| Layer | Concern | Pattern in repo | Reference file | Deviation? |
|---|---|---|---|---|
| BE | HTTP handler shape | myHandler(h.X.Method) + permission/basic-auth middleware | rest_router.go:316,376 | none |
| BE | Repository / DB access | Mongo repo structs; omitempty bson tags | activity_log/base.go:17-34 | none |
| BE | Queue producer/consumer | IJobEnqueuer.EnqueueJob + registerJobWithOptions | job_enqueuer.go:65, worker_service.go:132 | none |
| BE | Job state | Redis (existing) → Mongo (new) | activity_log_migration_service.go:30 | yes — durable store (Decision 8) |
| BE | Error response | BaseResponse{resp_code,resp_desc{id,en},meta} | pkg/http/response.go:5-49 | none |
| BE | Logging / tracing | log/slog + Datadog dd-trace + OTel | config/logger.go, rest_router.go:65 | none |
| BE | Metrics | Datadog StatsD <feature>.<event> | datadog/metric.go:69, manual_sync.go:107 | none |
| BE | Mongo index/DDL | golang-migrate JSON createIndexes | db/migrations/016_* | none |
| FE | State management | Pinia setup store | CustomerStore.ts:722 | none |
| FE | Data fetching | $customFetch (ofetch wrapper) | useCustomFetch.ts:170, CustomerStore.ts:470 | none |
| Cross | Naming (snake_case API → FE) | FE consumes snake_case directly (no transform) | CustomerStore.ts:206-211 | none — read the status fields (progress_pct, etc.) as-is |
Reading Order for the Agent
1. internal/app/repository/activity_log/base.go:17-34 - the struct you extend.
2. internal/pkg/consts/const.go:100-106 - add the source const here.
3. db/migrations/016_*activity_log_category* - copy this to add fields/index.
4. internal/app/service/activity_log.go:26,128-146 - the batch + cap pattern.
5. internal/app/service/activity_log_migration_service.go + consumer/activity_log_migration_consumer.go - the pattern to mirror (and the Redis store you replace).
6. internal/worker/worker_service.go:132 + service/job_enqueuer.go:65 - register + enqueue the new job.
7. internal/server/rest_router.go:69-74 - mount the new /private routes.
8. qontak.com/app/models/audit.rb:213-215,482-505,1824-1842 - the mapping to port.
9. qontak.com/db/schema.rb:47-78,5082-5136 - exact source columns + users.
10. FE: DetailPage.vue:1-49 (where the status indicator mounts) + a new useCrmMigrationStatus composable calling /v1/crm_migration/status. The activity timeline (ActivityLog.vue/CustomerActivity.vue) is not edited; migrated logs render natively (no badge, PRD v2.3).
Source Verification (anti-hallucination)
| Layer | Anchor / claim | Verified by | Evidence |
|---|---|---|---|
| BE | External endpoint over-constrained | read | rest_router.go:376 route; external_activity_log_request.go:85-90 code mandatory; :122-127 category=='transaction' |
| BE | Bulk cap = 20 | read | activity_log.go:26 maxBulkCreateLimit = 20; check :129-131 |
| BE | ActivityLog lacks external_id/source_tag/metadata; Actor at :28; no Description | read | activity_log/base.go:17-34; Actor string bson actor,omitempty at :28 |
| BE | Source consts only external/qontak; no crm_migration | read + grep | consts/const.go:105-106; grep crm_migration → 0 |
| BE | Existing migration is TF-2519 user-id (Redis 7-day TTL) | read | activity_log_migration_service.go:30-31 7*24*60*60; key :25 …:user_id_update; consumer ProcessUpdateUserIDJob :25 |
| BE | Status endpoint + payload | read | rest_router.go:74; payload/activity_log_migration.go:18-25 |
| BE | Contact.CrmData.ID at :343; no finder | read + grep | contact/base.go:343; crm_data.id path constant at :534; no FindByCrmDataID |
| BE | gocraft/work present + enqueue/register | read | go.mod:14 gocraft/work v0.5.1; job_enqueuer.go:65-67; worker_service.go:132,144-156 |
| BE | Mongo indexes via golang-migrate JSON | read + ls | db/migrations/008_*.up.json, 016_*activity_log_category*; Makefile:173 migrate-up |
| BE | Test/build/lint commands | read | Makefile:80 test (go test -race), :138 lint (staticcheck), :46 build, :156 mocks |
| BE | Error envelope | read | pkg/http/response.go:5-49; default_error.go:73 ErrBadRequest |
| BE | Metrics = Datadog StatsD | read | datadog/metric.go:69; example manual_sync.go:107 bulk_manual_sync.started |
| BE | Infra = k8s/ArgoCD/Helm on AliCloud; separate worker | read | deploy-alicloud/cd/production-worker.yaml; Makefile:60-77 subcommands |
| CRM | audits columns (+7 beyond PRD) + no (auditable_type,created_at) index | read | db/schema.rb:47-78 (incl. user_type, processed_*, queued_at, reindexed_at); indexes :68-77 |
| CRM | audited_changes YAML; update [old,new]; destroy snapshot | read | audit.rb:213-215 YAML.load; :503-505 (update); :482-483 (destroy); audited gem 5.8.0 Gemfile.lock:153 |
| CRM | crm_person_id from comment JSON first | read | audit.rb:487-501 |
| CRM | mapping_who 4 outcomes; Actor not description | read | audit.rb:1824-1842 (active/deleted/system/bare) |
| CRM | action only create/update/destroy; associate/disassociate derived | read + grep | audit.rb:474-476,528-530 (define_how); grep action == → only 3 |
| CRM | users for email/full_name/deleted_at | read | db/schema.rb:5082-5136 |
| CRM | no extraction API; timeline pulls from CDP | read | routes.rb:364; services/contacts/timeline.rb:34-41 HTTParty.get |
| CRM | no retention job; arbitrary-SQL batch job | read + grep | app/jobs/audit_batch_sql_job.rb; retention grep → 0 |
| FE | ActivityLog interface (10 fields, no source) at :203-214 | read | CustomerStore.ts:203-214 |
| FE | timeline[] transform updateTimeline() in two files (existing; not edited — migrated logs render natively, no badge) | read | ActivityLog.vue:190-210; CustomerActivity.vue:277-301 |
| FE | 5,000 cap page===1000×perPage 5 | read | ActivityLog.vue:47-52,80 |
| FE | fetch via $customFetch to /v1/activity_logs | read | CustomerStore.ts:466-508; useCustomFetch.ts:170 |
| FE | Pinia; commands pnpm test(vitest)/lint(eslint)/build; no typecheck/e2e script | read | package.json scripts; pnpm-lock.yaml |
| BE/Platform | USMAN email→SSO UUID resolution | read | VERIFIED — QontakLaunchpadClient.GetUsersByEmails(ctx, companySsoId, emails) qontak_launchpad.go:264 (iface :26) → GET /private/users?company_sso_id=&emails= → LaunchpadUsersResponse.UserResponse[].SsoID (json sso_id). Only deactivated-user Status handling is OQ-9 |
| BE | FE group is IAG /iag/v1 (not MAG); tenant via RequirePermissionMiddleware | read | rest_router.go:117-118 IAGMiddleware; :216 /iag/v1/activity_logs; require_permission_middleware.go:97 sets consts.CompanySSOKey; handler reads it activity_log_handler.go:68 |
| BE | No activity_log delete path (append-only by absence) | grep + read | activity_log/base.go has Search/Count/Insert/BulkInsert/UpdateUserIDByBatch only; IActivityLogService no delete |
| CRM | No resolved/completed/associate verb — all updates render "changed"; status keys are crm_stage_id/crm_task_status_id/crm_ticket_status_id | read | audit.rb:1052-1062 mapping_action; deal won via crm_stage_id audit.rb:67-91; link/unlink audit.rb:472-479,526-533 |
| CRM | users soft-delete = acts_as_paranoid (deleted_at) | read | user.rb:18 acts_as_paranoid; Gemfile:46; schema.rb:5136 deleted_at |
| Data | CRM read-only DSN / replica access | NOT VERIFIED | access path not confirmed → OQ-1 (operational Stage-0 gate) |
Design ↔ Code Mapping (FE half)
| Figma frame / component | Implementing file | Reuse vs new | Tokens used | Backing API | Deviation |
|---|---|---|---|---|---|
| n/a (design pending: status indicator) | DetailPage.vue (new child component) | new (composed from pixel3 banner/alert) | pixel3 surface/space | GET /v1/crm_migration/status (IAG proxy) | pending frame (OQ-10) |
The migrated activity logs themselves are not in this table — they reuse the existing CDP timeline rendering with no design or code change (no badge, PRD v2.3).
Detail 2.1 — Architecture (mermaid)
End-to-end component diagram
flowchart TB
eng([Migration Engineer]) --> trig[/POST /private/activity_logs/crm_migration/]
trig --> tsvc[CrmActivityLogMigrationService.Trigger]
tsvc -->|flag + prereq check| jobrepo[(crm_activity_log_migration_jobs)]
tsvc -->|enqueue| q[[gocraft/work @ Redis]]
q --> cons[CrmActivityLogMigrationConsumer]
cons --> ext[CRMExtractor]
ext -->|cursor SELECT| crmdb[(CRM Postgres audits/users)]
cons --> xf[SchemaTransformer]
xf --> cmap[CategoryMapper]
xf --> chg[ChangesExtractor]
xf --> act[ActorResolver]
act -->|email→UUID| usman([USMAN])
xf --> cres[ContactResolver]
cres -->|FindOneByCrmDataID| ctcoll[(contacts)]
cres --> wm["SyncWatermarkResolver\n(skip >= contact.created_at)"]
cons --> mb[MigrateBatch ≤20]
mb -->|upsert skip-on-conflict| alcoll[(activity_logs)]
mb --> enr["AssociationLogEnricher\n(in-place metadata patch)"]
enr -->|"additive update (incl. live-feed rows)"| alcoll
mb --> jobrepo
livefeed["Kafka Deal/Company/Ticket/Task consumers\n(post-sync live feed — owns >= watermark)"] -->|CreateActivityLogFor*| alcoll
agent([CS Agent]) --> fe[ActivityLog.vue / DetailPage.vue]
fe -->|GET /v1/activity_logs| alcoll
fe -->|GET .../crm_migration/status| jobrepo
Data model (erDiagram)
erDiagram
ACTIVITY_LOGS ||--o{ CHANGE : has
ACTIVITY_LOGS {
objectid _id PK
objectid customer_id FK
string company_sso_id
string action
string category
string user_id
string actor
timestamp timestamp
string source
string external_id "NEW — CRM audit.id"
string source_tag "NEW — crm_migration"
object metadata "NEW — {legacy:true}"
}
CONTACTS {
objectid _id PK
string company_sso_id
string crm_data_id "= CRM crm_person_id (crm_data.id)"
}
CRM_ACTIVITY_LOG_MIGRATION_JOBS {
string job_id PK
string company_sso_id
string status
int progress_pct
int records_migrated
int records_skipped
int records_failed
int last_audit_id "resume cursor"
timestamp started_at
timestamp updated_at
timestamp completed_at
string error_message
}
CONTACTS ||--o{ ACTIVITY_LOGS : "customer_id"
State machine — migration job status
stateDiagram-v2
[*] --> not_started
not_started --> in_progress: trigger (flag ON, prereqs OK)
in_progress --> in_progress: batch processed (cursor advances)
in_progress --> halted: failure_rate > 10%
in_progress --> completed: all pages done, accuracy computed
halted --> in_progress: re-trigger (resume from last_audit_id)
completed --> rolled_back: ops rollback
rolled_back --> [*]
completed --> [*]
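The edges of the state diagram above can be encoded as a small transition table that the service consults before it writes a new `status`. This is a minimal sketch, not the service's actual code: the type and function names (`JobStatus`, `CanTransition`) are hypothetical. It follows the diagram, in which `rolled_back` is terminal; the per-status lifecycle table in Detail 2.3 additionally lists `rolled_back → in_progress`, which this sketch does not include.

```go
package main

// JobStatus mirrors the states named in the migration job state diagram.
type JobStatus string

const (
	StatusNotStarted JobStatus = "not_started"
	StatusInProgress JobStatus = "in_progress"
	StatusHalted     JobStatus = "halted"
	StatusCompleted  JobStatus = "completed"
	StatusRolledBack JobStatus = "rolled_back"
)

// allowedTransitions encodes only the edges drawn in the state diagram.
// rolled_back has no outgoing edge here (terminal in the diagram).
var allowedTransitions = map[JobStatus][]JobStatus{
	StatusNotStarted: {StatusInProgress},
	StatusInProgress: {StatusInProgress, StatusHalted, StatusCompleted},
	StatusHalted:     {StatusInProgress},
	StatusCompleted:  {StatusRolledBack},
	StatusRolledBack: {},
}

// CanTransition reports whether from -> to is an edge in the diagram.
func CanTransition(from, to JobStatus) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```

A guard such as `CanTransition(current, StatusCompleted)` before the final job update keeps a halted job from being marked completed without a re-trigger.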
Two-phase framing (Decisions 16/17/18). Phase A is a pre-job gate: the trigger checks contact-coverage and returns `422 contacts_not_migrated` with no job row when below threshold (CALM-S09). This is remediated by the upstream `BackfillCustomerIdWorker`, never by this service. Only once the gate passes does a job enter `not_started → in_progress`. Phase B is the `in_progress` work and now includes, per record: the pre-sync watermark filter (skip `>= contact.created_at`, CALM-S07) and, for association logs, the in-place enrichment sub-pass (CALM-S08). Neither adds a job state; both are per-record steps inside the batch loop (see Branch & skip flow).
Canonical `failure_rate` definition (REV-10). `failure_rate = records_failed / records_processed`, where `records_processed = records_migrated + records_skipped + records_failed`, computed **cumulatively across the job** (not per-batch) and re-evaluated **after each batch**. `records_failed` counts records that failed to write after the retry (it does **not** count `skipped` records). The job transitions to `halted` when this cumulative ratio exceeds `0.10`. Every "failure_rate > 10%" mention (§2.2, §3.A) refers to this formula.
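The formula can be checked with a few lines of Go. The sketch below is illustrative only: `JobCounters` and its methods are hypothetical names, and the fields simply mirror the `records_*` counters on the job document. The comparison is strict, so a job sitting at exactly 10% is not halted.

```go
package main

// JobCounters holds the cumulative per-job counters (hypothetical type;
// the fields mirror records_migrated / records_skipped / records_failed).
type JobCounters struct {
	Migrated int64
	Skipped  int64
	Failed   int64
}

// haltThreshold is the 0.10 ratio from the canonical definition.
const haltThreshold = 0.10

// FailureRate returns records_failed / records_processed, where
// records_processed = migrated + skipped + failed. Skipped records enlarge
// the denominator but never the numerator.
func (c JobCounters) FailureRate() float64 {
	processed := c.Migrated + c.Skipped + c.Failed
	if processed == 0 {
		return 0 // nothing processed yet, so nothing has failed
	}
	return float64(c.Failed) / float64(processed)
}

// ShouldHalt is evaluated after each batch on the cumulative counters.
func (c JobCounters) ShouldHalt() bool {
	return c.FailureRate() > haltThreshold
}
```

For example, 80 migrated, 10 skipped and 10 failed gives 10 / 100 = 0.10, which does not halt the job; one more failure in place of a skip (11 / 100) does.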
Branch & skip flow (per-record skip policy)
flowchart TD
rec([audit record]) --> scope{auditable_type in scope?}
scope -- no --> s1[skip: unmapped_action_type]
scope -- yes --> contact{contact resolvable via crm_data.id / comment JSON?}
contact -- no --> s2[skip: contact_not_found]
contact -- yes --> wm{"created_at < contact.created_at (pre-sync watermark)?"}
wm -- "no (>= watermark)" --> s5[skip: post_sync_live_feed]
wm -- yes --> chg{audited_changes valid for action?}
chg -- "null on update" --> s3[skip: invalid_change_format]
chg -- ok --> dup{external_id already exists?}
dup -- yes --> s4[skip: duplicate_external_id]
dup -- no --> ins[transform + insert]
ins --> assoc{association log? metadata missing?}
assoc -- yes --> enr[AssociationLogEnricher: fill in place]
s1 --> done([next record])
s2 --> done
s3 --> done
s4 --> done
s5 --> done
enr --> done
assoc -- no --> done
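The skip policy above is an ordered chain of checks, which the following Go sketch makes explicit. It is a minimal illustration under stated assumptions: `AuditRecord`, `RecordContext` and `DecideSkip` are hypothetical names, the lookups (scope list, contact resolution, duplicate probe) are assumed to have run already, and the post-insert enrichment step is left out.

```go
package main

import "time"

// SkipReason carries the skip codes used in the branch & skip flow.
type SkipReason string

const (
	SkipNone             SkipReason = "" // record proceeds to transform + insert
	SkipUnmappedAction   SkipReason = "unmapped_action_type"
	SkipContactNotFound  SkipReason = "contact_not_found"
	SkipPostSyncLiveFeed SkipReason = "post_sync_live_feed"
	SkipInvalidChange    SkipReason = "invalid_change_format"
	SkipDuplicate        SkipReason = "duplicate_external_id"
)

// AuditRecord is a hypothetical, trimmed view of one CRM audits row.
type AuditRecord struct {
	AuditableType  string
	Action         string
	CreatedAt      time.Time
	AuditedChanges string // raw YAML; empty stands for NULL
}

// RecordContext holds the results of lookups made for the record
// (hypothetical shape; the real resolvers are separate components).
type RecordContext struct {
	InScope          map[string]bool // auditable_type scope list
	ContactFound     bool
	ContactCreatedAt time.Time // per-contact sync watermark
	ExternalIDExists bool
}

// DecideSkip applies the checks in the same order as the flowchart.
func DecideSkip(rec AuditRecord, rc RecordContext) SkipReason {
	if !rc.InScope[rec.AuditableType] {
		return SkipUnmappedAction
	}
	if !rc.ContactFound {
		return SkipContactNotFound
	}
	// Records at or after the watermark are owned by the live Kafka feed.
	if !rec.CreatedAt.Before(rc.ContactCreatedAt) {
		return SkipPostSyncLiveFeed
	}
	if rec.Action == "update" && rec.AuditedChanges == "" {
		return SkipInvalidChange
	}
	if rc.ExternalIDExists {
		return SkipDuplicate
	}
	return SkipNone
}
```

Keeping the decision in one pure function makes each skip reason straightforward to unit-test without a CRM or Mongo connection.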
Detail 2.2 — Sequence (end-to-end, incl. failure paths)
Happy path — trigger → extract → transform → batch insert
sequenceDiagram
actor Eng as Migration Engineer
participant LB as Ingress/LB
participant API as contact-service-api
participant JOB as Mongo jobs coll
participant Q as gocraft/work (Redis)
participant W as Worker (Consumer)
participant CRM as CRM Postgres (read-only)
participant USM as USMAN
participant CT as Mongo contacts
participant AL as Mongo activity_logs
Eng->>LB: POST /private/activity_logs/crm_migration {company_sso_id} (BasicAuth)
LB->>API: HTTP
API->>API: flag ON? contacts migrated? no active job?
API->>JOB: insert job {status:in_progress, last_audit_id:0}
API->>Q: enqueue crm_activity_log_migration
API-->>Eng: 202 {job_id, status:in_progress}
loop cursor batches (LIMIT 500)
W->>CRM: SELECT … WHERE id > last_audit_id AND auditable_type IN(…) AND created_at>=cutoff ORDER BY id
Note right of CRM: cursor on id PK. Query latency depends on CRM index (OQ-2)
CRM-->>W: up to 500 rows
loop per record
W->>CT: FindOneByCrmDataID(company_sso_id, auditable_id) → contact (+created_at)
W->>W: watermark — created_at >= contact.created_at? → skip post_sync_live_feed (CALM-S07)
W->>CRM: hop 1 — SELECT email, full_name, deleted_at FROM users WHERE id=user_id
W->>USM: hop 2 — GetUsersByEmails(company_sso_id, [email]) → SsoID (cached per job)
Note right of USM: timeout 3s. deleted_at set / no match → [Deleted CRM User]
W->>W: parse audited_changes YAML (update/destroy). map category/action
end
W->>AL: MigrateBatch upsert ≤20 (skip on (company_sso_id, external_id))
AL-->>W: inserted / skipped counts
opt association logs with missing metadata (CALM-S08)
W->>AL: AssociationLogEnricher — fill crm_person_id / entity name in place (incl. live-feed rows, OQ-13 match-key)
end
W->>JOB: update progress_pct, counts, last_audit_id
W->>W: failure_rate > 10%? → halt
end
W->>JOB: status:completed, accuracy_pct
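The happy path reads pages of up to 500 audit rows and writes them in batches of at most 20, advancing `last_audit_id` as it goes. The arithmetic of that loop can be sketched as below. All names (`chunkIDs`, `nextCursor`, `batchesPerPage`) are hypothetical, and rows are reduced to their ids for brevity.

```go
package main

const (
	extractPageSize = 500 // rows per CRM cursor page (LIMIT 500)
	maxMigrateBatch = 20  // records per MigrateBatch call
)

// chunkIDs splits a page of audit ids into consecutive batches of at most
// size elements. The last batch may be shorter.
func chunkIDs(ids []int64, size int) [][]int64 {
	if size <= 0 {
		return nil
	}
	var batches [][]int64
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[start:end])
	}
	return batches
}

// nextCursor returns the highest audit id seen in the page, or prev when the
// page is empty, so a resumed job restarts at WHERE id > last_audit_id.
func nextCursor(prev int64, pageIDs []int64) int64 {
	next := prev
	for _, id := range pageIDs {
		if id > next {
			next = id
		}
	}
	return next
}

// batchesPerPage is the number of MigrateBatch calls one full page needs.
func batchesPerPage() int {
	return (extractPageSize + maxMigrateBatch - 1) / maxMigrateBatch
}
```

With the values above, a full page produces 25 batch writes, and the cursor only moves forward, which is what makes a restart from `last_audit_id` safe.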
Failure path — USMAN timeout during actor resolution
sequenceDiagram
participant W as Worker (ActorResolver)
participant USM as USMAN
W->>USM: GET user by email
Note right of USM: timeout after 3s
USM--xW: no response (after 1 retry)
W->>W: fall back to username. user_id=null, Actor="[Deleted CRM User]"
W->>W: record NOT skipped (Decision 6). increment actor_resolve_degraded metric
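The timeout-and-fallback behavior above can be sketched with `context.WithTimeout`. This is an illustration, not the real `ActorResolver`: `lookupFunc` is a hypothetical stand-in for the USMAN call, the two fakes exist only for demonstration, and the sketch follows the `Actor="[Deleted CRM User]"` outcome shown in the diagram. A real consumer would derive the context from the job rather than from `context.Background()`.

```go
package main

import (
	"context"
	"time"
)

const deletedUserLabel = "[Deleted CRM User]"

// lookupFunc is a hypothetical stand-in for the email -> SSO UUID call.
type lookupFunc func(ctx context.Context, email string) (ssoID string, err error)

// ResolvedActor is what the transformer would write onto the activity log.
type ResolvedActor struct {
	UserID   string // empty stands for a null user_id
	Actor    string
	Degraded bool // true when resolution failed and the fallback was used
}

// resolveActor calls lookup with a per-call timeout and retries once on
// error. The record is never skipped: on failure it degrades to the
// deleted-user label.
func resolveActor(lookup lookupFunc, email, fullName string, timeout time.Duration) ResolvedActor {
	const attempts = 2 // first call + 1 retry
	for i := 0; i < attempts; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		ssoID, err := lookup(ctx, email)
		cancel()
		if err != nil {
			continue // timeout or transport error: retry if attempts remain
		}
		if ssoID == "" {
			return ResolvedActor{Actor: deletedUserLabel} // no match
		}
		return ResolvedActor{UserID: ssoID, Actor: fullName}
	}
	return ResolvedActor{Actor: deletedUserLabel, Degraded: true}
}

// hangingUSMAN is a demonstration fake that never answers before the
// context expires and counts how often it was called.
type hangingUSMAN struct{ calls int }

func (h *hangingUSMAN) lookup(ctx context.Context, _ string) (string, error) {
	h.calls++
	<-ctx.Done()
	return "", ctx.Err()
}

// foundUSMAN is a demonstration fake that always resolves the email.
func foundUSMAN(_ context.Context, email string) (string, error) {
	return "sso-uuid-for-" + email, nil
}
```

The `Degraded` flag is where the `actor_resolve_degraded` metric would be incremented by the caller.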
Failure path — batch insert 5xx
sequenceDiagram
participant W as Worker (MigrateBatch)
participant AL as Mongo activity_logs
W->>AL: upsert batch ≤20
AL--xW: write error (5xx/timeout)
W->>AL: retry once
AL--xW: still failing
W->>W: log batch_insert_failed. queue records for end-of-run retry
W->>W: if cumulative failure_rate > 10% → halt + PagerDuty
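The retry-once-then-queue behavior in this failure path can be sketched as a thin wrapper around the write call. Everything here is hypothetical naming (`BatchWriter`, `writeFunc`, `errTransientWrite`), and the end-of-run drain retrying records one at a time is an assumption made for the sketch; the RFC only states that failed records are queued for an end-of-run retry.

```go
package main

import "errors"

// errTransientWrite is a demonstration sentinel that a fake writeFunc can
// return to simulate a 5xx or timeout from the activity_logs write.
var errTransientWrite = errors.New("activity_logs write error")

// MigrationRecord is a hypothetical, trimmed record shape.
type MigrationRecord struct{ ExternalID string }

// writeFunc stands in for the Mongo batch upsert.
type writeFunc func(batch []MigrationRecord) error

// BatchWriter retries a failed batch once and parks records that still fail.
type BatchWriter struct {
	Write      writeFunc
	RetryQueue []MigrationRecord // retried at the end of the run
}

// WriteBatch returns true when the batch was written on the first attempt
// or on the single retry. Otherwise the records are queued.
func (w *BatchWriter) WriteBatch(batch []MigrationRecord) bool {
	const attempts = 2 // first write + 1 retry
	for i := 0; i < attempts; i++ {
		if err := w.Write(batch); err == nil {
			return true
		}
	}
	w.RetryQueue = append(w.RetryQueue, batch...)
	return false
}

// DrainRetryQueue retries queued records one by one (assumption) and returns
// how many still failed; those would count toward records_failed.
func (w *BatchWriter) DrainRetryQueue() int {
	failed := 0
	for _, rec := range w.RetryQueue {
		if err := w.Write([]MigrationRecord{rec}); err != nil {
			failed++
		}
	}
	w.RetryQueue = nil
	return failed
}
```

The caller would feed the returned failure count into the cumulative `failure_rate` check after each batch.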
Detail 2.3 — Database Model (DDL)
MongoDB (no SQL). Schema lives in the Go struct + a golang-migrate JSON
migration mirroring db/migrations/016_*activity_log_category*.
Struct additions — internal/app/repository/activity_log/base.go:
// added to ActivityLog (after existing fields, ~line 32)
ExternalID string `json:"external_id,omitempty" bson:"external_id,omitempty"`
SourceTag string `json:"source_tag,omitempty" bson:"source_tag,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty" bson:"metadata,omitempty"`
Const — internal/pkg/consts/const.go (beside :105-106):
ActivityLogSourceCrmMigration = "crm_migration"
Migration db/migrations/0NN_crm_migration_indexes.up.json (createIndexes,
mirrors existing JSON migrations):
{
"createIndexes": "activity_logs",
"indexes": [{
"key": { "company_sso_id": 1, "external_id": 1 },
"name": "uniq_company_external_id",
"unique": true,
"partialFilterExpression": { "external_id": { "$exists": true } }
}]
}
{
"createIndexes": "contacts",
"indexes": [{
"key": { "company_sso_id": 1, "crm_data.id": 1 },
"name": "idx_company_crm_data_id"
}]
}
crm_activity_log_migration_jobs is created lazily on first insert (Mongo); a
migration adds its lookup index:
{
"createIndexes": "crm_activity_log_migration_jobs",
"indexes": [
{ "key": { "company_sso_id": 1 }, "name": "idx_company_sso_id" },
{ "key": { "company_sso_id": 1, "status": 1 }, "name": "idx_company_status",
"unique": true, "partialFilterExpression": { "status": { "$eq": "in_progress" } } }
]
}
The partial-unique index on `(company_sso_id, status=in_progress)` enforces one active job per account (CALM-S02/AC-3 → 409 `JOB_ALREADY_RUNNING`).
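The effect of that partial-unique index on the trigger handler can be illustrated without a database. The sketch below is an in-memory imitation, not the real repository: `jobStore`, `MigrationJob`, `ErrJobAlreadyRunning` and `triggerStatus` are hypothetical names, and the uniqueness check stands in for the duplicate-key error that Mongo would return.

```go
package main

import (
	"errors"
	"net/http"
)

// ErrJobAlreadyRunning stands in for the duplicate-key error raised by the
// partial-unique index (hypothetical sentinel).
var ErrJobAlreadyRunning = errors.New("JOB_ALREADY_RUNNING")

// MigrationJob is a trimmed job document.
type MigrationJob struct {
	JobID        string
	CompanySSOID string
	Status       string
}

// jobStore imitates the collection plus its partial-unique index in memory.
type jobStore struct{ jobs []MigrationJob }

// Insert rejects a second in_progress job for the same account. Documents in
// any other status fall outside the partial filter and are never checked.
func (s *jobStore) Insert(j MigrationJob) error {
	if j.Status == "in_progress" {
		for _, existing := range s.jobs {
			if existing.CompanySSOID == j.CompanySSOID && existing.Status == "in_progress" {
				return ErrJobAlreadyRunning
			}
		}
	}
	s.jobs = append(s.jobs, j)
	return nil
}

// triggerStatus maps the insert outcome to the HTTP status of the trigger.
func triggerStatus(err error) int {
	switch {
	case err == nil:
		return http.StatusAccepted // 202
	case errors.Is(err, ErrJobAlreadyRunning):
		return http.StatusConflict // 409
	default:
		return http.StatusInternalServerError
	}
}
```

Relying on the index rather than a read-then-insert check means two concurrent triggers cannot both succeed.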
- Cardinality / growth. Migrated `activity_logs` ≈ in-scope `audits` rows per account (target ≤ 1 year). Run `SELECT COUNT(*) … FROM audits WHERE auditable_type IN(scope) AND created_at>=cutoff` before each account (OQ-2). `crm_activity_log_migration_jobs`: one row per (account × run), so tiny.
- Example migrated document.
  { "company_sso_id":"acc-123", "customer_id":"66b…", "category":"customer_details","action":"update", "actor":"Budi Santoso", "user_id":"sso-uuid-…","changes":[{"field":"Phone Number","from":"+628…","to":"628…"}],"timestamp":"2025-03-04T09:12:00Z", "external_id":"1341215908","source_tag":"crm_migration", "metadata":{"legacy":true} }
- PII classification. `actor` (name), `user_id` (SSO UUID), `changes[].from/to` (may contain phone/email/name) → PII; `external_id` / `source_tag` / `metadata.legacy` → non-PII. See §3.D.
- Retention. Migrated `activity_logs` follow the existing `activity_logs` retention (unchanged). `crm_activity_log_migration_jobs`: 1 year (PRD §5.1).
Per-status lifecycle — crm_activity_log_migration_jobs.status:
| Status | Visibility | Retention | Restore semantics | Transitions allowed |
|---|---|---|---|---|
| not_started | implicit (no row) | n/a | n/a | → in_progress |
| in_progress | status API + ops | 1 yr | resume from last_audit_id | → halted, completed |
| halted | status API + alert | 1 yr | re-trigger resumes | → in_progress |
| completed | status API | 1 yr | append-only; rollback only | → rolled_back |
| rolled_back | status API + event | 1 yr | re-trigger from scratch | → in_progress |
- Partition / sharding key: none (collections already keyed by `company_sso_id`; Mongo handles scale).
- NoSQL rationale: the target store is already MongoDB, so no SQL alternative is in play for CDP writes.
Detail 2.4 — APIs
Outbound endpoints (callers → us)
| Endpoint | Method | AuthN/AuthZ | Request schema | Response schema | Status codes | Idempotency | Versioning | Reuse? |
|---|---|---|---|---|---|---|---|---|
| /private/activity_logs/crm_migration | POST | BasicAuth (/private group, rest_router.go:69-72) | {company_sso_id, date_range_start?} | {job_id, status} | 202, 400, 403 FLAG_DISABLED, 409 ALREADY_COMPLETED/JOB_ALREADY_RUNNING, 422 contacts_not_migrated | one active job per account (partial-unique on status) | path /private (unversioned, matches existing) | new-with-justification |
| /private/activity_logs/migrate | POST | BasicAuth | {records:[{company_sso_id, customer_id, external_id, source_tag, metadata, category, action, actor, user_id, changes[], timestamp}] ≤20} | {inserted, skipped, failed, results[]} | 200, 400 (>20 / bad shape), 401 | per-record (company_sso_id, external_id) skip-on-conflict | /private | new-with-justification |
| /private/activity_logs/crm_migration/status | GET | BasicAuth | ?company_sso_id= | {status, progress_pct, records_migrated, records_skipped, records_failed, started_at, duration_seconds, accuracy_pct?} | 200, 200 {status:not_started} when none | n/a (read) | /private | new (mirrors TF-2519 status payload) |
| /private/activity_logs/crm_migration/rollback | POST | BasicAuth | {company_sso_id} | {records_removed} | 200, 404 (no migrated records), 500 rollback_failed | idempotent (delete-by-tag) | /private | new-with-justification |
| /iag/v1/crm_migration/status (FE-readable proxy, REV-1) | GET | IAG session + RequirePermissionMiddleware(CustomersCustomersViewKey); company_sso_id derived from session context (consts.CompanySSOKey), not a query param; reject cross-tenant | (none; tenant from session) | {status, progress_pct, records_migrated, records_skipped, records_failed, started_at, duration_seconds} (subset of the /private payload; accuracy_pct omitted) | 200, 200 {status:not_started} when none, 401 (no session), 403 (no permission) | n/a (read) | /iag/v1 (FE calls /v1/...; gateway strips /iag) | new-with-justification |
| /v1/activity_logs (FE read) | GET | IAG session + RequirePermissionMiddleware (mirror ActivityLogHandler.Get, activity_log_handler.go:68; tenant via consts.CompanySSOKey) | ?customer_id=&page=&per_page=&after_id= | existing fields (the doc also carries BE source_tag/metadata as omitempty, ignored by the FE; no badge, PRD v2.3) | 200 | n/a | existing (/iag/v1, rest_router.go:216) | reused |
- Example (trigger request/response). `POST /private/activity_logs/crm_migration` → `{"company_sso_id":"acc-123"}` → `202 {"job_id":"crmmig-acc-123-…","status":"in_progress"}`.
- Example (flag OFF). → `403 {"resp_code":"FLAG_DISABLED","resp_desc":{"id":"…","en":"migration flag is off"}}`.
- Rate limits / payload. `/migrate` capped at 20 records/call (mirrors `maxBulkCreateLimit`). Internal S2S, so there is no per-user rate limit; the consumer self-throttles to the ≥500 rec/s target.
- Pagination. CRM extraction = cursor on `audits.id`; FE read = existing `after_id` cursor (`CustomerStore.ts:441-464`).
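The status payload listed in the endpoint table can be modelled as one Go struct that serves both the `/private` and the `/iag/v1` routes. This is a sketch under assumptions: the type name, the Go field types, and the choice to emit zero counters alongside `not_started` are not specified by the RFC; only the snake_case keys and the omission of `accuracy_pct` on the FE proxy come from the table.

```go
package main

import (
	"encoding/json"
	"time"
)

// CrmMigrationStatus mirrors the keys of the status payload. Field types are
// assumptions made for this sketch.
type CrmMigrationStatus struct {
	Status          string     `json:"status"`
	ProgressPct     int        `json:"progress_pct"`
	RecordsMigrated int64      `json:"records_migrated"`
	RecordsSkipped  int64      `json:"records_skipped"`
	RecordsFailed   int64      `json:"records_failed"`
	StartedAt       *time.Time `json:"started_at,omitempty"`
	DurationSeconds int64      `json:"duration_seconds"`
	AccuracyPct     *float64   `json:"accuracy_pct,omitempty"` // /private only
}

// ForFrontend returns the subset served by the IAG proxy: the same payload
// with accuracy_pct removed. The receiver is a copy, so the caller's value
// is left untouched.
func (s CrmMigrationStatus) ForFrontend() CrmMigrationStatus {
	s.AccuracyPct = nil
	return s
}

// notStarted is the payload returned with 200 when no job row exists.
func notStarted() CrmMigrationStatus {
	return CrmMigrationStatus{Status: "not_started"}
}

// encode renders the payload as JSON text.
func encode(s CrmMigrationStatus) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}
```

Deriving the FE payload from the ops payload keeps the two routes from drifting apart when a field is added later.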
Inbound webhooks (others → us)
n/a — this migration has no inbound webhook; CRM is read-only and there is no callback from CRM or USMAN.
Detail 2.A — UI Contract
Migrated logs: no FE contract change (PRD v2.3). Migrated activity logs are
returned by the existing `GET /v1/activity_logs` read and render in the
existing timeline (`ActivityLog.vue` / `CustomerActivity.vue`) exactly like
native logs. The FE `ActivityLog` TS interface (`CustomerStore.ts:203-214`) is
not extended (no `source_tag`/`metadata` on the FE type), and neither copy of
`updateTimeline()` is edited, so there is no badge. The `[Deleted reference]`
placeholder shown for a deleted associated reference (CALM-S04/ERR-1) is
existing CDP rendering behavior, not new work. The only FE surface in this
RFC is the status indicator below.
Status indicator — new child of DetailPage.vue:
interface MigrationStatusProps {
companySsoId: string
pollIntervalMs?: number // default 15000
}
- State owner: a small composable `useCrmMigrationStatus` calling `$customFetch('/v1/crm_migration/status')` (the IAG session-authed proxy, §2.4; resolves REV-1/OQ-11; the browser never touches the `/private` S2S route). Tenant is derived server-side from the session, so the FE sends no `company_sso_id`.
- 4 states: `in_progress` → banner "Importing CRM activity history — some records may still be loading." with `progress_pct`; `not_started` / `completed` → hidden; API error → hidden (fail-silent, CALM-S05/ERR-1).
Detail 2.B — Data-Fetching Strategy (FE)
- Library: ofetch via `$customFetch` (`useCustomFetch.ts:170`), which provides Bearer auth + retry + token refresh.
- Cache key: activity logs keyed by `customer_id + page` (existing store state, `CustomerStore.ts`); status keyed by `company_sso_id`.
- TTL / refetch: activity logs on mount + `loadMore`; status polled every 15 s while mounted, stopping on `completed` / `not_started`.
- SWR: no (direct fetch, store-held).
- Optimistic updates: n/a (read-only).
Detail 2.C — UI State Matrix
| Surface | Loading | Empty | Error | Partial | Success |
|---|---|---|---|---|---|
| Activity timeline (no badge — existing view) | existing skeleton | "No activity found" (ActivityLog.vue:37) | existing inline error | migrated + native interleaved, sorted by timestamp | full timeline; migrated rows render identically to native (no badge) |
| Migration status indicator | banner with progress_pct (in_progress) | hidden (not_started) | hidden, fail-silent (ERR-1) | banner persists while in_progress | hidden once completed |
Detail 2.D — Data Integrity Matrix
| Write path | Transaction scope | Partial failure behavior | Idempotency key + TTL | Consistency | Duplicate handling | Stale-read handling |
|---|---|---|---|---|---|---|
| MigrateBatch upsert ≤20 | per-record upsert (no multi-doc txn) | retry once; then end-of-run retry queue; halt if >10% | (company_sso_id, external_id) (no TTL, permanent) | eventual | skip-on-conflict via partial unique index → duplicate_external_id | FE reads may lag during in_progress (progressive insert, PRD §9.1) |
| job-state update | single-doc update | retried by consumer loop | job_id | strong (single doc) | n/a | status API reads latest doc |
| rollback delete-by-tag | DELETE WHERE source_tag='crm_migration' AND company_sso_id= | retry; log rollback_failed; alert | idempotent (re-delete = 0) | eventual | n/a | n/a |
Detail 2.E — Concurrency Collision Map
| Resource | Writers | Collision | Resolution | On failure |
|---|---|---|---|---|
| crm_activity_log_migration_jobs (per account) | two trigger calls | double job for one account | partial-unique (company_sso_id, status=in_progress) | 2nd trigger → 409 JOB_ALREADY_RUNNING (CALM-S02/AC-3) |
| activity_logs (same external_id) | retried batches / re-run | duplicate insert | partial-unique (company_sso_id, external_id) | conflict → skip + duplicate_external_id (CALM-S06-NEG/NEG-1) |
| last_audit_id cursor | single consumer per job | n/a (one writer) | one active job invariant | n/a |
Detail 2.F — Async Job / Event Consumer Spec
| Job/Consumer | Trigger | Input | Retry | DLQ + retention | Concurrency | Idempotency key | Per-msg timeout | Poison handling |
|---|---|---|---|---|---|---|---|---|
| CrmActivityLogMigrationConsumer (gocraft/work, job crm_activity_log_migration) | enqueue by trigger handler | {job_id, company_sso_id, date_range_start?} | job-level: resume from last_audit_id; batch: retry once then end-of-run queue | gocraft retry/dead set (existing worker config); failed-record queue retained 30 d (PRD §5.1) | 1 per account (job invariant); worker pool default | job_id (job) + external_id (record) | USMAN call 3s; batch insert per existing Mongo timeout | record-level skip + *_record_failed event; job halts at >10% failure |
Sub-components (CALM-S03):
- `CRMExtractor`: cursor query on `audits` (Decision 7/11). The date predicate is `created_at >= :cutoff` where `:cutoff = COALESCE(date_range_start, MIN(created_at) for the account)` (REV-11): the trigger's optional `date_range_start` wins; otherwise the account's oldest in-scope `audits.created_at` (taken from the Stage-0 OQ-2 `MIN/MAX/COUNT` probe).
- `CategoryMapper`: CDP-defined `category + action` from `auditable_type + action + audited_changes` keys. Grounding correction (REV-7 / #8): CRM has no `associate`/`resolved`/`completed` verb; every `update` is a generic "changed" (`mapping_action`, `audit.rb:1052-1062`). So link/unlink is derived from the join-model audit's `action` (`create` → linked / `destroy` → unlinked, per `mapping_content_people_deal` / `mapping_content_companies_deal`, `audit.rb:472-533`), and resolved/completed are predicates this mapper owns, keyed on the presence of a status/stage field in `audited_changes`:

  | Source audit | Predicate (on `audited_changes`) | CDP category | CDP action |
  |---|---|---|---|
  | `Crm::Person`, `Crm::Phone` + `update` | any field delta | `customer_details` | `update` |
  | Deal–Person join + `create`/`destroy` | join row added / removed (carries `crm_person_id` + `crm_deal_id`) | `deals` | `linked` / `unlinked` |
  | `Crm::Deal` + `update` | contains `crm_stage_id` → won stage (per `deal_won_at`, `audit.rb:67-91`) | `deals` | `resolved` |
  | `Crm::Deal` + `update` | no won-stage transition | `deals` | `update` |
  | `Ticket` + `update` | contains `crm_ticket_status_id` → a resolved status id | `tickets` | `resolved` |
  | `Ticket` + `update` | otherwise | `tickets` | `update` |
  | `Crm::Task` + `update` | contains `crm_task_status_id` → a done status id | `tasks` | `completed` |
  | `Crm::Task` + `update` | otherwise | `tasks` | `update` |
  | Deal–Company join + `create`/`destroy` | join row added / removed (carries `crm_company_id` + `crm_deal_id`) | `company` | `linked` / `unlinked` |
  | anything else | n/a | n/a | skip (`unmapped_action_type`) |

  OQ-12 (new): confirm (a) the exact join-model `auditable_type` class names (Deal–Person, Deal–Company) to put in the scope list, and (b) the concrete id-sets for "won" `crm_stage_id`, "resolved" `crm_ticket_status_id`, and "done" `crm_task_status_id`. Until confirmed, those `update` rows fall through to the generic `…/update` action (safe default: no record dropped).
- `ChangesExtractor`: two branches (Decision 10) + column→label map (OQ-6).
- `ActorResolver`: four cases via the two-hop chain (Decision 6): CRM `users` `SELECT email, full_name, deleted_at WHERE id=:user_id` → `GetUsersByEmails` (`qontak_launchpad.go:264`) → `SsoID`; per-job `user_id → {email, sso_uuid}` cache.
- `ContactResolver`: `comment` JSON `crm_person_id` first (`audit.rb:487-501`), then `audited_changes` YAML, then `FindOneByCrmDataID`.
- `SyncWatermarkResolver` (CALM-S07, Decision 16): once `ContactResolver` returns the CDP contact, read its `created_at` as the per-contact sync watermark and skip the record if `audits.created_at >= watermark` (reason `post_sync_live_feed`). That range is owned by the live Kafka feed and must not be re-imported (live-feed logs have no `external_id`, so the unique index can't catch the dup). This is a post-resolution transform-stage filter, not part of the account-level SQL `:cutoff` (which is the lower bound). The contact is already fetched by `ContactResolver`, so this adds no round-trip; the value is cached in the per-job contact cache. Edge: a pre-existing-cross-channel contact (OQ-15) may under-cut; safe for migration-created contacts.
- `AssociationLogEnricher` (CALM-S08, Decision 17): runs over association logs only (`deals`/`company`/`tickets`/`tasks` link/unlink/resolved/completed). Fills missing referent metadata (`crm_person_id`, linked entity id + name) from CRM lookups and updates the existing `activity_logs` doc in place (additive only: never deletes, never changes `category`/`action`/`timestamp`). Targets both newly-migrated rows and live-feed rows that lack `external_id`, located by the match-key `(company_sso_id, customer_id, category/action, linked entity id, timestamp≈)` (OQ-13: exact key + collision-safety to confirm). Idempotent (fill-when-missing): a deleted-in-CRM referent is left as `[Deleted reference]` (CALM-S08/ERR-1). This is the one sanctioned in-place update path under the §3.D append-only rule (Decision 17 amends Decision 13).
- `MigrationLogger` / `ValidationRunner`: per-record outcome + source-vs-inserted count → `accuracy_pct`. The completion-time `ValidationRunner` count (`countDocuments({source_tag, company_sso_id})` vs source `COUNT`) is the authoritative `accuracy_pct`; the in-progress `records_migrated` counter is a best-effort progress gauge (REV-12) and may drift by a batch on crash/resume; it is reconciled at completion.
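The `CategoryMapper` rules can be sketched as a pure Go function. This is a minimal illustration under stated assumptions: `Mapping`, `StatusSets`, `AuditInput` and `MapCategory` are hypothetical names, the two join-model type strings are placeholders because the real class names are open under OQ-12, and `audited_changes` is assumed to be already parsed into a map of changed column to new id.

```go
package main

// Mapping is the CDP category + action pair produced by the mapper.
type Mapping struct {
	Category string
	Action   string
}

// StatusSets holds the OQ-12 id-sets. While unconfirmed they stay empty and
// every status update falls through to the generic "update" action.
type StatusSets struct {
	WonStageIDs             map[int64]bool
	ResolvedTicketStatusIDs map[int64]bool
	DoneTaskStatusIDs       map[int64]bool
}

// AuditInput is a hypothetical, pre-parsed view of one audit row.
type AuditInput struct {
	AuditableType string
	Action        string           // create / update / destroy
	NewValues     map[string]int64 // changed column -> new id value
}

// Placeholder join-model type names (real class names pending OQ-12).
const (
	dealPersonJoinType  = "DealPersonJoin"
	dealCompanyJoinType = "DealCompanyJoin"
)

// MapCategory returns the mapping and true, or false when the record should
// be skipped with unmapped_action_type.
func MapCategory(in AuditInput, sets StatusSets) (Mapping, bool) {
	switch in.AuditableType {
	case "Crm::Person", "Crm::Phone":
		if in.Action == "update" {
			return Mapping{Category: "customer_details", Action: "update"}, true
		}
	case dealPersonJoinType:
		return mapJoin("deals", in.Action)
	case dealCompanyJoinType:
		return mapJoin("company", in.Action)
	case "Crm::Deal":
		return mapStatusUpdate(in, "deals", "resolved", "crm_stage_id", sets.WonStageIDs)
	case "Ticket":
		return mapStatusUpdate(in, "tickets", "resolved", "crm_ticket_status_id", sets.ResolvedTicketStatusIDs)
	case "Crm::Task":
		return mapStatusUpdate(in, "tasks", "completed", "crm_task_status_id", sets.DoneTaskStatusIDs)
	}
	return Mapping{}, false
}

// mapJoin derives linked / unlinked from the join row's create / destroy.
func mapJoin(category, action string) (Mapping, bool) {
	switch action {
	case "create":
		return Mapping{Category: category, Action: "linked"}, true
	case "destroy":
		return Mapping{Category: category, Action: "unlinked"}, true
	}
	return Mapping{}, false
}

// mapStatusUpdate emits the terminal action only when the status column
// changed to an id in the confirmed set. Reading a nil set is safe in Go.
func mapStatusUpdate(in AuditInput, category, terminal, column string, ids map[int64]bool) (Mapping, bool) {
	if in.Action != "update" {
		return Mapping{}, false
	}
	if id, changed := in.NewValues[column]; changed && ids[id] {
		return Mapping{Category: category, Action: terminal}, true
	}
	return Mapping{Category: category, Action: "update"}, true
}
```

Because an unset `StatusSets` degrades to `update`, the mapper can ship before OQ-12 is answered without dropping records.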
Detail 2.F.1 — Responsibility Boundary Matrix
| Step (order) | Owning squad / service | Inbound trigger | Outbound effect | Failure handler | PRD anchor |
|---|---|---|---|---|---|
| 1. Confirm crm_data.id coverage ≥99% | CDP / Data Eng | pre-Stage-1 check | go/no-go for account | block account | PRD §13, OQ-7 |
| 2. Provide read-only audits/users access | CRM Squad | DSN/replica request | enables extraction | block migration | PRD §13, OQ-1 |
| 3. Trigger + run job | CDP Backend | engineer S2S call | reads CRM, writes CDP | flag/prereq guards | PRD §7 Behaviors 1-4 |
| 4. Resolve email→SSO UUID | Platform / USMAN | per-record call | actor UUID or null | null → [Deleted CRM User] | PRD §5, OQ-9 |
| 5. Surface migration status | CDP FE | status API | status indicator (migrated logs render natively, no badge) | fail-silent | PRD §8 CALM-S05 |
Detail 2.F.2 — State Surface Contract
| Entity | State field / event | Default | Updated by | Read via | Stale window |
|---|---|---|---|---|---|
| migration job | status, progress_pct | not_started | consumer per batch | GET .../crm_migration/status | ≤ 15 s (FE poll) |
| migrated log | source_tag (BE-only; rollback/validation, not FE-displayed) | absent (native) | MigrateBatch | rollback/validation filter (returned by GET /v1/activity_logs but ignored by FE) | until next FE fetch |
Detail 2.G — Cross-Layer Contract Verification
| Endpoint | BE response schema | FE expected schema | Match? | Gaps |
|---|---|---|---|---|
| GET /v1/activity_logs | doc carries source_tag, metadata (snake_case, omitempty) | does not read source_tag/metadata (no badge, PRD v2.3); existing fields only | yes | none: BE fields are additive and ignored by the FE |
| GET /v1/crm_migration/status (IAG proxy) | {status, progress_pct, …} (snake_case, tenant from session) | useCrmMigrationStatus reads same keys; sends no company_sso_id | yes | resolved: the FE calls the IAG session-authed proxy (§2.4), not the /private S2S route. Tenant scoping is server-side via consts.CompanySSOKey (reject cross-tenant 403) |
| GET /private/activity_logs/crm_migration/status (ops) | same payload + accuracy_pct | n/a (S2S ops only, not called by FE) | yes (n/a) | ops/engineer surface only |
OQ-11 is resolved (REV-1): CALM-S05 consumes the new IAG proxy
GET /v1/crm_migration/status. No remaining cross-layer access mismatch.
Detail 2.H — End-to-End Data Flow
Engineer → POST /private/activity_logs/crm_migration → Trigger service (flag + prereq + job insert + enqueue) → gocraft/work → Consumer → [CRMExtractor cursor read CRM Postgres] → [SchemaTransformer: ContactResolver(FindOneByCrmDataID) + ActorResolver(USMAN, cached) + ChangesExtractor(YAML) + CategoryMapper] → MigrateBatch upsert(skip-on-conflict) activity_logs + update job doc → (later) Agent opens Contact Detail → GET /v1/activity_logs → migrated logs render in the existing timeline identically to native logs (no badge, PRD v2.3).
- Side effects: observability events (PRD §10); `accuracy_pct` at completion.
- Ownership: CDP owns trigger→write; CRM owns source read access; Platform owns USMAN.
Detail 2.I — Scope Boundaries
- BE create: `crm_activity_log_migration_{handler,service,consumer}.go`, `payload/crm_activity_log_migration.go`, `repository/.../crm_migration_job` repo, `db/migrations/0NN_crm_migration_indexes.{up,down}.json`, `CRMExtractor` + transformer pkg (incl. `SyncWatermarkResolver`, `AssociationLogEnricher`), `FindOneByCrmDataID`.
- BE modify: `activity_log/base.go` (3 fields + a scoped additive-update method for the enricher, which is the one sanctioned in-place path, Decision 17), `consts/const.go` (1 const + skip reasons `post_sync_live_feed` / `enrich_ambiguous`), `rest_router.go` (4 `/private` routes + the `/iag/v1` status proxy route), `worker_service.go` (register job), `job_enqueuer.go` (job name), config (CRM DSN, USMAN, flag).
- BE NOT touched: existing `ExternalActivityLogHandler`, `BulkCreate`, TF-2519 `ActivityLogMigration*` (left intact; distinct job).
- FE create: migration-status indicator component + `useCrmMigrationStatus`.
- FE modify: `DetailPage.vue` (mount the status indicator); that is the whole FE change.
- FE NOT touched: the 5,000-entry cap; pagination; native log rendering; the activity-timeline components (`ActivityLog.vue` / `CustomerActivity.vue`) and the FE `ActivityLog` TS interface (no badge, no `source_tag`/`metadata`; PRD v2.3).
- Shared modules impact: `ActivityLog` struct is read across the service; additions are `omitempty` and backward-compatible.
Detail 2.J — Asset Inventory (FE)
| Asset | Type | Source | Format | Path |
|---|---|---|---|---|
| status indicator icon (optional) | icon | pixel3 icon set | SVG | TBD with frame (OQ-10) |
3. High-Availability & Security
The migration runs on the existing stateless worker deployment (HPA/KEDA).
A worker restart resumes from last_audit_id (durable job doc — Decision 8). If
CRM Postgres or USMAN is unavailable, the consumer retries (3× extract / 1×
USMAN) then halts the job with state preserved; agents continue to see native
CDP logs (PRD §9.1). Existing native activity_logs and the FE read path are
unaffected throughout.
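The resume-and-halt behaviour described above can be sketched as follows. This is a minimal illustration, not the consumer's actual code: jobState, extractFn, runJob, and flakySource are hypothetical names, the source is simulated in memory, and "3×" is assumed to mean three attempts in total. Only last_audit_id and the halt-with-state-preserved rule come from this RFC.

```go
package main

import (
	"errors"
	"fmt"
)

// jobState is a hypothetical stand-in for the durable job document.
type jobState struct {
	LastAuditID int64 // cursor persisted after every written page
	Halted      bool
}

// extractFn returns the audit ids of the next page after cursor; empty means done.
type extractFn func(cursor int64) ([]int64, error)

// maxExtractAttempts assumes "3×" means three attempts in total.
const maxExtractAttempts = 3

// runJob pages forward from st.LastAuditID. A page that still fails after
// maxExtractAttempts halts the job and leaves the cursor untouched, so the
// next run resumes from the same position.
func runJob(st *jobState, extract extractFn) error {
	st.Halted = false
	for {
		var (
			page []int64
			err  error
		)
		for attempt := 1; attempt <= maxExtractAttempts; attempt++ {
			if page, err = extract(st.LastAuditID); err == nil {
				break
			}
		}
		if err != nil {
			st.Halted = true
			return fmt.Errorf("halted at audit id %d: %w", st.LastAuditID, err)
		}
		if len(page) == 0 {
			return nil // source exhausted
		}
		// The real consumer would transform and write the page here,
		// then persist the new cursor in the job document.
		st.LastAuditID = page[len(page)-1]
	}
}

// flakySource serves ids 1..maxID in pages of two and fails `failures`
// times once the cursor reaches failAt (simulating a CRM outage).
func flakySource(maxID, failAt int64, failures int) extractFn {
	return func(cursor int64) ([]int64, error) {
		if cursor == failAt && failures > 0 {
			failures--
			return nil, errors.New("crm postgres unavailable")
		}
		var page []int64
		for id := cursor + 1; id <= maxID && len(page) < 2; id++ {
			page = append(page, id)
		}
		return page, nil
	}
}

func main() {
	st := &jobState{}
	src := flakySource(5, 2, 3)
	fmt.Println(runJob(st, src), st.LastAuditID) // first run halts with the cursor still at 2
	fmt.Println(runJob(st, src), st.LastAuditID) // second run resumes and finishes at 5
}
```

Because the cursor only advances after a page succeeds, a restart or a halted job repeats at most one page, which the skip-on-conflict upsert absorbs.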
Performance Requirement
- Backend: per-account job P95 ≤ 24 h; CDP write throughput ≥ 500 rec/s via ≤20-record batches; extraction = cursor on audits.id, page size 500. Run EXPLAIN ANALYZE on the extraction query before Stage 0 (OQ-2); add a CRM index if needed. USMAN load bounded by a per-job email→UUID cache.
- Frontend: migrated logs add zero FE cost; they render in the existing timeline with no code change. Status poll = 1 request / 15 s while a contact is open; negligible. No bundle budget impact (the indicator reuses pixel3 banner/alert); browser support per existing app.
- Load test: replay one large synthetic account (Stage 1) and measure rec/s, USMAN call rate, Mongo write latency, and CRM query plan.
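The relationship between the 500-row extraction page and the ≤20-record write batches can be sketched like this. A minimal sketch, assuming a generic helper named chunk (hypothetical, not an existing function in contact-service); only the two sizes come from this RFC.

```go
package main

import "fmt"

const (
	pageSize     = 500 // extraction page size from this RFC
	maxBatchSize = 20  // "≤20-record batches" from this RFC
)

// chunk splits items into consecutive batches of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	batches := make([][]T, 0, (len(items)+size-1)/size)
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	page := make([]int, pageSize) // one extracted page of audit rows
	fmt.Println(len(chunk(page, maxBatchSize))) // 25
}
```

One full page therefore yields 25 write batches, and sustaining 500 rec/s at 20 records per batch means roughly 25 batch writes per second.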
Monitoring & Alerting
Metrics via Datadog StatsD (datadog/metric.go:69), naming
crm_activity_log_migration.<event> (mirrors bulk_manual_sync.*,
manual_sync.go:107). Events (PRD §10): …_started, …_completed, …_failed,
…_record_skipped (sampled 1:10), …_record_failed, …_rolled_back. Worker
job metrics job.crm_activity_log_migration.{success,failed} are emitted by the
existing registerJobWithOptions wrapper (worker_service.go:144-156).
Alerts: failure rate >10% → PagerDuty CDP on-call; job duration >48 h → Slack
#cdp-migration-ops; skip rate >20% → Slack (mapping gap); 3 consecutive account
failures → PagerDuty + PM; accuracy_pct < 90% → PagerDuty + PM. Dashboard owner:
CDP Engineering Squad.
Logging
Structured log/slog (config/logger.go) with job_id, company_sso_id,
external_id, skip_reason/failure_reason. PII scrubbing: never log
changes[].from/to values, actor names, or emails — log only external_id and
counts. Trace context via dd-trace + OTel (consumer/helper.go:37).
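One way to enforce the PII rule at the logger rather than at every call site is a slog ReplaceAttr hook. The sketch below is illustrative only: the masked key names (from, to, actor_name, email) and the helper names are assumptions, and the real logger setup in config/logger.go may differ.

```go
package main

import (
	"io"
	"log/slog"
	"os"
)

// piiKeys lists attribute keys that must never reach the log output.
// The key names are assumptions made for this sketch.
var piiKeys = map[string]bool{"from": true, "to": true, "actor_name": true, "email": true}

// scrub is a slog ReplaceAttr hook that masks PII-bearing attributes.
func scrub(_ []string, a slog.Attr) slog.Attr {
	if piiKeys[a.Key] {
		return slog.String(a.Key, "[REDACTED]")
	}
	return a
}

// newMigrationLogger builds a JSON logger with the scrub hook installed.
func newMigrationLogger(w io.Writer) *slog.Logger {
	return slog.New(slog.NewJSONHandler(w, &slog.HandlerOptions{ReplaceAttr: scrub}))
}

// scrubbed returns the value that would be logged for a string attribute.
func scrubbed(key, value string) string {
	return scrub(nil, slog.String(key, value)).Value.String()
}

func main() {
	log := newMigrationLogger(os.Stdout)
	log.Info("record skipped",
		"job_id", "job-1",
		"company_sso_id", "acc-1",
		"external_id", "42",
		"skip_reason", "contact_not_found",
		"email", "jane@example.com", // masked by scrub before it is written
	)
}
```

The hook matches on the attribute key only, so a struct logged whole under another key would bypass it. Logging just external_id and counts, as stated above, remains the primary control.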
Security Implications
- Threat model. Entry point is the S2S /private group (BasicAuth, rest_router.go:69-72); only the internal migration team holds credentials. Risks: (a) cross-tenant leakage if company_sso_id is not enforced on every query; (b) SSRF/SQLi via extraction; (c) PII exposure in logs.
- Mitigations. Every Mongo query + the rollback delete are scoped by company_sso_id. CRM extraction uses parameterized queries on a read-only DSN (no writes possible). Inputs validated: company_sso_id format, batch ≤20, external_id numeric-string. No user-supplied URLs (no SSRF).
- Secrets. CRM DSN, USMAN creds, BasicAuth via the existing config/secret mechanism (Helm/Vault); never hard-coded, never logged.
- Static analysis. staticcheck (Makefile:138) in CI; sonar-scanner config present (sonar-project.properties).
Role × Endpoint Authorization Matrix
| Role | Endpoint(s) | Methods | Tenant scope | UI visibility | Constraint | Audit trail |
|---|---|---|---|---|---|---|
| Migration Engineer | /private/activity_logs/crm_migration*, /migrate | POST/GET | per company_sso_id (provided) | none | S2S BasicAuth only | events + job doc |
| CS / Sales Agent | GET /v1/activity_logs; GET /v1/crm_migration/status (IAG proxy) | GET | own tenant only — company_sso_id from session (consts.CompanySSOKey); cross-tenant request → 403 | status indicator (migrated logs as normal records) | read-only | n/a |
| Client admin / end user | — | — | — | migrated logs as normal records (no badge) | no migration access | n/a |
Detail 3.A — Failure Mode & Retry Catalog
| External call | Timeout | Retries | Circuit breaker | DLQ + retention | Caller behavior on persistent failure |
|---|---|---|---|---|---|
| CRM Postgres extract | per-query (driver default) | 3× then halt | n/a | n/a (read) | halt job, preserve cursor, alert |
| USMAN email→UUID | 3 s | 1× | n/a | n/a | fall back to [Deleted CRM User], do not skip |
| Mongo MigrateBatch | Mongo driver default | 1× then end-of-run queue | halt at >10% | failed-record queue 30 d | halt + PagerDuty |
| Mongo job-state update | driver default | consumer loop retry | n/a | n/a | log; next batch retries |
Detail 3.A.1 — Branch & Skip Catalog
| Branch trigger | Where checked | Downstream effect | Audit trail | User-visible? |
|---|---|---|---|---|
| auditable_type not in scope | CategoryMapper | skip record | …_record_skipped {unmapped_action_type} | no |
| contact unresolved (crm_data.id/comment/YAML all miss) | ContactResolver | skip record | …_record_skipped {contact_not_found} | no |
| audits.created_at >= contact.created_at (post-sync; owned by live feed) | SyncWatermarkResolver | skip record (CALM-S07) | …_record_skipped {post_sync_live_feed} | no |
| audited_changes null on update | ChangesExtractor | skip record | …_record_skipped {invalid_change_format} | no |
| (company_sso_id, external_id) exists | MigrateBatch | skip insert | …_record_skipped {duplicate_external_id} | no |
| flag OFF | trigger handler | no job, no records | 403 FLAG_DISABLED | engineer only |
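The per-record branches in this catalog can be read as a single decision function. The sketch below is an illustration under stated assumptions: the audit and contact types, the skipReason helper, and the order of the checks are hypothetical, and the duplicate check is modelled with an in-memory set for one company rather than the Mongo unique index. The skip reason strings are the ones listed in the table.

```go
package main

import (
	"fmt"
	"time"
)

// audit is a trimmed, hypothetical view of one CRM audits row.
type audit struct {
	ExternalID    string
	AuditableType string
	Action        string // create / update / destroy
	HasChanges    bool   // false when audited_changes is null
	CreatedAt     time.Time
}

// contact carries the watermark (CDP contact.created_at).
type contact struct{ CreatedAt time.Time }

// skipReason returns "" when the record should be migrated, otherwise the
// skip reason. c is nil when no contact was resolved. seen holds the
// external ids already written for the same company.
func skipReason(a audit, c *contact, inScope, seen map[string]bool) string {
	switch {
	case !inScope[a.AuditableType]:
		return "unmapped_action_type"
	case c == nil:
		return "contact_not_found"
	case !a.CreatedAt.Before(c.CreatedAt): // created_at >= watermark
		return "post_sync_live_feed"
	case a.Action == "update" && !a.HasChanges:
		return "invalid_change_format"
	case seen[a.ExternalID]:
		return "duplicate_external_id"
	}
	return ""
}

// day builds a UTC date in January 2026 for the demo and tests.
func day(n int) time.Time { return time.Date(2026, 1, n, 0, 0, 0, 0, time.UTC) }

func main() {
	inScope := map[string]bool{"Crm::Person": true}
	c := &contact{CreatedAt: day(10)}
	a := audit{ExternalID: "42", AuditableType: "Crm::Person", Action: "update", HasChanges: true, CreatedAt: day(5)}
	fmt.Printf("%q\n", skipReason(a, c, inScope, map[string]bool{}))
}
```

Note the watermark comparison is inclusive on the live-feed side: an audit stamped exactly at contact.created_at is skipped, matching the `>=` in the table.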
Detail 3.B — Error Response Catalog (BE)
Shape: BaseResponse{resp_code, resp_desc{id,en}, meta} (pkg/http/response.go:5-49).
| Endpoint | Error code | HTTP | When | User-facing? |
|---|---|---|---|---|
| trigger | FLAG_DISABLED | 403 | flag OFF | no (engineer) |
| trigger | contacts_not_migrated | 422 | prereq missing | no |
| trigger | ALREADY_COMPLETED | 409 | re-trigger completed | no |
| trigger | JOB_ALREADY_RUNNING | 409 | active job exists | no |
| /migrate | BATCH_TOO_LARGE | 400 | >20 records | no |
| rollback | rollback_failed | 500 | delete error | no |
| status | — | 200 {status:not_started} | no job | no |
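The trigger rows of this catalog amount to an ordered guard chain. The following is a hedged sketch, not the handler's actual code: triggerInput and checkTrigger are hypothetical names, the guard order (flag, then prerequisite, then job state) is an assumption, and 0.99 is the Gate C coverage threshold used here as the prerequisite. The error codes and HTTP statuses are the ones in the table.

```go
package main

import "fmt"

// triggerInput collects the facts the trigger handler needs (hypothetical shape).
type triggerInput struct {
	FlagEnabled bool
	Coverage    float64 // share of contacts with crm_data.id, 0..1
	JobStatus   string  // "not_started", "in_progress" or "completed"
}

const minCoverage = 0.99 // assumed prerequisite threshold

// checkTrigger returns the HTTP status and error code of the first guard
// that fires, or (0, "") when the trigger may proceed.
func checkTrigger(in triggerInput) (int, string) {
	switch {
	case !in.FlagEnabled:
		return 403, "FLAG_DISABLED"
	case in.Coverage < minCoverage:
		return 422, "contacts_not_migrated"
	case in.JobStatus == "completed":
		return 409, "ALREADY_COMPLETED"
	case in.JobStatus == "in_progress":
		return 409, "JOB_ALREADY_RUNNING"
	}
	return 0, ""
}

func main() {
	status, code := checkTrigger(triggerInput{FlagEnabled: true, Coverage: 0.97, JobStatus: "not_started"})
	fmt.Println(status, code) // 422 contacts_not_migrated
}
```

Checking the flag first keeps a disabled account from leaking job state or coverage details through the error code.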
Detail 3.C — Error Message Catalog (FE)
| Error code | User-facing message | Surface | User-facing? |
|---|---|---|---|
| status API unavailable | (none — indicator hidden, fail-silent) | banner | no (CALM-S05/ERR-1) |
| deleted associated reference | [Deleted reference] | inline timeline | yes (CALM-S04/ERR-1) |
Detail 3.D — Compliance & Data Governance
Triggered — migrated logs contain PII (names, phone/email in changes,
actor identity) and constitute an audit trail.
| Field | Classification | Legal basis | Retention | Encryption | Access audit | Right-to-delete path |
|---|---|---|---|---|---|---|
| actor, user_id | PII | UU PDP / GDPR (legitimate interest: audit continuity) | per activity_logs policy | at rest (Mongo) + TLS in transit | S2S access + events | contact deletion cascades per existing CDP policy |
| changes[].from/to | PII (variable) | same | same | same | same | same |
| external_id, source_tag, metadata | non-PII | — | same | same | — | — |
Append-only enforcement point (REV-8). Today the activity_log repository
exposes no delete method at all (activity_log/base.go has only
Search*/Count/Insert/BulkInsert/UpdateUserIDByBatch — verified, no
DeleteOne/DeleteMany/Remove), and the service interface IActivityLogService
has no delete. So append-only holds by absence today — the only authorized
deleter is the new POST .../crm_migration/rollback service path (scoped to
source_tag='crm_migration' AND company_sso_id). Invariant for the future:
if any activity-log delete endpoint/method is ever added, it MUST reject
deletion when source_tag = 'crm_migration' (except the rollback path). This is
the named enforcement point.
Append-only exception for enrichment (Decision 17, CALM-S08). Append-only bars
deletes and mutation of category/action/timestamp — it does not
bar the AssociationLogEnricher from an additive metadata update (filling a
missing crm_person_id / linked-entity id+name on an association log, including a
live-feed log without source_tag). That single in-place update path is sanctioned
and idempotent; everything else on migrated logs stays append-only.
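What "additive and idempotent" means for the enrichment update can be illustrated with a small sketch. The activityLog shape, the string-map metadata, the enrich helper, and the linked_entity_name key are all assumptions for illustration; only crm_person_id and the rule itself (fill what is missing, never overwrite) come from this RFC.

```go
package main

import "fmt"

// activityLog is a trimmed, hypothetical view of an activity log document.
type activityLog struct {
	SourceTag string            // empty for live-feed logs
	Metadata  map[string]string // assumed to be a flat string map here
}

// enrich fills only metadata keys that are missing or empty and never
// overwrites an existing value. It returns the number of keys written,
// so a re-run over an already enriched log returns 0.
func enrich(l *activityLog, patch map[string]string) int {
	if l.Metadata == nil {
		l.Metadata = make(map[string]string)
	}
	written := 0
	for k, v := range patch {
		if v == "" || l.Metadata[k] != "" {
			continue // nothing to add, or value already present
		}
		l.Metadata[k] = v
		written++
	}
	return written
}

func main() {
	log := &activityLog{Metadata: map[string]string{"crm_person_id": ""}}
	patch := map[string]string{"crm_person_id": "77", "linked_entity_name": "Deal A"}
	fmt.Println(enrich(log, patch)) // 2
	fmt.Println(enrich(log, patch)) // 0
}
```

The function never touches category, action, or timestamp, which is what keeps this path compatible with the append-only rule for everything else.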
Cross-border (UU PDP). No cross-border transfer: the CRM audits/users
Postgres and the CDP MongoDB both reside in the same Qontak Alibaba Cloud
region (verified infra in §2.0 / deploy-alicloud/). Data stays in-jurisdiction.
Migrated logs are append-only (source_tag guard, Decision 13 / OQ-5); they are deleted only by the ops rollback path or the contact's own deletion cascade.
Detail 3.E — Accessibility
- Status banner: announced via role="status" (polite), keyboard-skippable, dismiss not required (auto-hides). Color contrast per pixel3 defaults. (Migrated logs use the existing timeline's accessibility; no badge, PRD v2.3.)
4. Backwards Compatibility and Rollout Plan
Compatibility
- BE: all changes additive: new /private routes; new omitempty fields; partial index (existing docs without external_id untouched). No existing endpoint/response shape changes. No API version bump.
- FE: the status indicator is additive; migrated logs render via the existing read path with no FE change. No saved-state/cache migration.
- Cross-layer: migrated logs render as normal records in any FE version (no badge by design, PRD v2.3); source_tag/metadata are backend-only fields the FE never reads.
Rollout Strategy
- Deploy order: Backend first (schema + index + endpoints + consumer), then the FE status indicator. The migration can run and backfill before the FE chrome ships; migrated logs appear in any FE version (no badge by design).
- Feature flag: cdp_crm_activity_log_migration_enabled (default OFF), enabled per account by the migration team. Single flag gates the whole BE path; the FE status indicator can ship independently (purely additive).
- Stages (PRD §12): Stage 0 (Eng, 1-2 sprints): schema/index/endpoints/consumer + FE status indicator in staging; Stage 1 (QA, 1 wk): 2 synthetic accounts (small+large), accuracy=100%, zero dup on re-run, 4 actor cases, status indicator; Stage 2 (Pilot, 2 wk): 5-10 client accounts, accuracy ≥99%, zero pipeline halts; Stage 3 (Controlled, ongoing): 20-50 accounts/wk, halt rate <2%.
- Stop conditions: failure rate >10% in any account; accuracy_pct <90%; 3 consecutive account failures.
- Rollback mechanism: POST /private/activity_logs/crm_migration/rollback (DELETE WHERE source_tag='crm_migration' AND company_sso_id=); flag → OFF.
- Blast radius: worst case = one account's migrated logs (removable by rollback); native logs and other accounts unaffected.
- PIC: CDP Eng (Stage 0/1), PM+CSM (Stage 2), PM+Ops (Stage 3).
Detail 4.A — Cross-Layer Rollout Compatibility Matrix
| Scenario | FE | BE | Works? | Mitigation |
|---|---|---|---|---|
| Pre-deploy | Old | Old | yes | baseline |
| Backend first | Old | New | yes | migrated logs appear as normal records (no badge by design); status indicator not yet present |
| Frontend first | New | Old | yes | status indicator hidden (status API 404/not_started); migrated logs render natively when the BE ships |
| Both deployed | New | New | yes | target state |
| Backend rollback | New | Old (rolled back) | yes | status API gone → indicator fail-silent; migrated logs removed by rollback |
| Frontend rollback | Old | New | yes | logs still migrated and render natively; just no status indicator |
Detail 4.B — Configuration Contract
| Layer | Env var / flag | Type | Default | Required | Provisioner | Secret? |
|---|---|---|---|---|---|---|
| BE | cdp_crm_activity_log_migration_enabled | bool (flag) | OFF | yes | flag system, per account | no |
| BE | CRM_AUDIT_DATABASE_URL | string (read-only DSN) | — | yes | Helm/Vault | yes |
| BE | USMAN/Launchpad client config | string | existing | reuses existing QontakLaunchpadClient config (no new var) | Helm/Vault | yes |
| BE | /private BasicAuth creds | string | existing | yes | Helm/Vault | yes |
| FE | status poll interval | number (ms) | 15000 | no | build/default | no |
Detail 4.C — Test Plan (commands sourced from repo)
| Layer | Command (source) | What it must prove |
|---|---|---|
| BE unit | make test → go test -race ./... (Makefile:80) | transformer (4 actor cases, 2 change branches, category map), idempotency skip, flag/prereq guards |
| BE mocks | make mocks (Makefile:156, mockery) | regenerate mocks for new interfaces (CRMExtractor, job repo) |
| BE migrate | make migrate-up / make migrate-down (Makefile:173/176) | indexes created + reversible |
| BE lint | make lint → staticcheck (Makefile:138) | no static-analysis regressions |
| BE build | make build (Makefile:46) | compiles |
| BE integration | n/a — no dedicated integration target (closest make diff-test, Makefile:87); add a Mongo-backed test for idempotent re-run | re-run inserts 0, skips N; concurrent trigger → 409 |
| FE unit | pnpm test → vitest (package.json) | status indicator 4 states (shown on in_progress; hidden on completed/not_started; API error → fail-silent) |
| FE lint | pnpm lint → eslint . (package.json) | lint clean |
| FE build | pnpm build (package.json) | builds |
| FE typecheck | n/a — no typecheck script exists (no vue-tsc); rely on eslint + IDE | — |
| FE e2e | n/a — no e2e suite configured (no Playwright/Cypress config) | — |
Detail 4.D — Agent Execution Plan
| Order | Layer | Chunk | Files | Commands | Acceptance criteria |
|---|---|---|---|---|---|
| 1 | BE | Schema + const + indexes | activity_log/base.go, consts/const.go, db/migrations/0NN_crm_migration_indexes.{up,down}.json | make migrate-up && make build | fields compile; uniq_company_external_id (partial) + idx_company_crm_data_id + job indexes exist; migrate-down clean |
| 2 | BE | Contact finder | repository/contact/base.go (FindOneByCrmDataID) | make mocks && make test | unit test resolves a crm_data.id to a contact; miss → not-found |
| 3 | BE | Transformer pkg | new …/crm_migration/transformer*.go (CategoryMapper, ChangesExtractor, ActorResolver, ContactResolver) | make test | golden-file tests: update→{from,to}; destroy→to:null; 4 actor cases; comment-JSON crm_person_id first; UTC timestamp; unmapped→skip |
| 4 | BE | MigrateBatch service + /migrate endpoint | service/crm_activity_log_migration_service.go, handler, payload, rest_router.go | make test && make build | batch ≤20 enforced; (company_sso_id, external_id) conflict → skip + duplicate_external_id; re-run inserts 0 |
| 5 | BE | Trigger + status (S2S) + IAG status proxy + rollback + durable job store + Phase-A coverage gate (CALM-S09) | handler/service/repo + rest_router.go (3 /private routes + 1 /iag/v1/crm_migration/status wrapped in RequirePermissionMiddleware(CustomersCustomersViewKey)) | make test | flag OFF→403; coverage < threshold→422 contacts_not_migrated (no job; remediated upstream); 2nd trigger→409; status returns counts; IAG proxy tenant-scoped (cross-tenant→403); rollback deletes by tag |
| 6 | BE | Consumer + worker registration + extractor (account-level lower-bound :cutoff) | consumer/crm_activity_log_migration_consumer.go, worker/worker_service.go, service/job_enqueuer.go, CRM DSN config | make build && make test | job registered; enqueue→consume→cursor extract→transform→write; resumes from last_audit_id |
| 7 | FE | Status indicator + composable | new component + useCrmMigrationStatus (calls /v1/crm_migration/status), DetailPage.vue | pnpm test && pnpm build | indicator on in_progress; hidden on completed/not_started; API error→hidden. Depends on chunk 5 (IAG proxy) + OQ-10 (design) |
| 8 | BE | SyncWatermarkResolver — per-contact pre-sync cutoff (CALM-S07) | new …/crm_migration/watermark*.go; wire into the transform step (after ContactResolver) | make test | record created_at < contact.created_at → migrated; >= watermark → skip post_sync_live_feed; value cached per-job; no extra contact round-trip |
| 9 | BE | AssociationLogEnricher — in-place metadata patch (CALM-S08) | new …/crm_migration/association_enricher*.go; an additive update path on the activity-log repo (scoped, append-only-exception) | make test | migrated assoc log missing crm_person_id → filled; live-feed assoc log (no external_id) matched via OQ-13 key → patched in place (no new doc); ambiguous match → enrich_ambiguous skip; deleted referent → [Deleted reference]; idempotent re-run patches 0 |
Detail 4.E — Verification & Rollback Recipe
- Pre-merge (in order):
  - BE: 1) make lint 2) make test 3) make build 4) make migrate-up && make migrate-down
  - FE: 1) pnpm lint 2) pnpm test 3) pnpm build
- Post-deploy signals: Datadog crm_activity_log_migration.completed count >0 with accuracy_pct ≥99; job.crm_activity_log_migration.failed ≈ 0; Mongo db.activity_logs.countDocuments({source_tag:"crm_migration", company_sso_id}) matches source count; status API returns completed.
- Rollback recipe:
  - Set cdp_crm_activity_log_migration_enabled = OFF for the account.
  - POST /private/activity_logs/crm_migration/rollback {company_sso_id} → confirm records_removed.
  - Verify countDocuments({source_tag:"crm_migration", company_sso_id}) == 0.
  - If schema must be reverted: make migrate-down (drops indexes/fields-by-index).
  - Revert the FE PR if the status indicator must be pulled (independent of BE; see §4.A).
Detail 4.F — Resource & Cost Notes
- Compute: runs on existing worker pods; one extra long-running job per account at a time, so bounded. No new pod types.
- DB: read load on CRM Postgres (cursor scans, gated by OQ-2 index check); Mongo write load ≤500 rec/s during a run; one small job collection.
- Storage growth: migrated activity_logs ≈ in-scope audits per account (one-time). Network: cross-service CRM Postgres reads + USMAN calls (cached).
Detail 4.G — Stage-0 Go/No-Go Checklist (closes the cross-team gates)
Stage 0 (per §4 Rollout) cannot exit until the gates below are green. Gates A/B/E are cross-team (they need the CRM/Platform squad to grant access or run a query/measurement on the CRM production DB; CDP cannot self-serve them); Gate D needs only a confirmation from Platform; gates C/F/G are CDP-owned. Each gate has a copy-pasteable check and a pass threshold so there is no ambiguity at sign-off.
Scope list used by every CRM query below (from §2.F CategoryMapper):
('Crm::Person','Crm::Phone','Crm::Deal','Ticket','Crm::Task',<Deal–Person join class>,<Deal–Company join class>)
— the two join-class names are confirmed in Gate E (OQ-12); until then run the
queries with the five known types and add the join classes once named.
Gate A — CRM read-only access (REV-3 / OQ-1) · owner: CRM Squad + Platform · blocks: extraction
- Ask to send the CRM/Platform team: "Grant contact-service a read-only Postgres role (or a read replica DSN) on the qontak.com production DB, scoped to SELECT on audits and users. We will store it as CRM_AUDIT_DATABASE_URL in Helm/Vault. No writes, no DDL from our side."
- Acceptance: CDP can run, from the worker network, SELECT 1, SELECT count(*) FROM audits LIMIT 1, and SELECT email, full_name, deleted_at FROM users LIMIT 1, and cannot write (INSERT/UPDATE/DDL return permission denied).
- Pass: read works, write denied. No-go: access not granted → Stage 0 blocked.
Gate B — Extraction query plan + volume + cutoff (REV-4 / OQ-2 / REV-11) · owner: CRM DBA + CDP · blocks: Stage 0 sizing
Run on the CRM DB (needs Gate A):
-- B1: volume + date range (sets job ETA and the :cutoff floor)
SELECT MIN(created_at) AS oldest, MAX(created_at) AS newest, COUNT(*) AS rows_in_scope
FROM audits
WHERE auditable_type IN (/* scope list */)
AND created_at >= NOW() - INTERVAL '1 year';
-- B2: extraction query plan (the cursor page the consumer runs)
EXPLAIN (ANALYZE, BUFFERS)
SELECT id, auditable_id, auditable_type, associated_id, associated_type,
user_id, username, action, audited_changes, comment, created_at
FROM audits
WHERE id > 0 -- cursor; 0 = first page
AND auditable_type IN (/* scope list */)
AND created_at >= :cutoff -- :cutoff = COALESCE(date_range_start, B1.oldest)
ORDER BY id ASC
LIMIT 500;
- Pass thresholds: B2 page latency p95 < 500 ms and not a full Seq Scan over all ~15M rows per page. :cutoff is recorded = COALESCE(date_range_start, B1.oldest).
- If it fails (Seq Scan / >500 ms): ask the CRM team to add an index via a Rails migration (their repo owns db/schema.rb), non-locking:
  CREATE INDEX CONCURRENTLY index_audits_on_auditable_type_created_at_id
    ON audits (auditable_type, created_at, id); -- DBA may prefer switching the cursor to (created_at, id)
  Re-run B2 after the index. No-go until B2 passes (a slow plan blows the P95 ≤ 24 h/account budget).
Gate C — Per-account Contact.crm_data.id coverage (REV-5 / OQ-7) · owner: CDP / Data · blocks: that account only
Run on CDP MongoDB per account before that account is enabled:
const ACC = "<company_sso_id>";
const total = db.contacts.countDocuments({ company_sso_id: ACC });
const missing = db.contacts.countDocuments({ company_sso_id: ACC,
$or: [ {"crm_data.id": {$exists:false}}, {"crm_data.id": null}, {"crm_data.id": ""} ] });
print("coverage =", total ? ((total - missing) / total) : 0); // gate >= 0.99
- Pass: coverage ≥ 0.99. If < 0.99: this is the Phase-A gate (Decision 18, CALM-S09). Run the upstream Crm::BackfillCustomerIdWorker(…, create_if_not_found: true) to sync the unsynced contacts (it provisions the CDP contact + backfills crm_data.id / qontak_customer_id), or populate the crm_contact_id_map fallback (Decision 9), then re-check. contact-service does not run the backfill or write CRM; it only waits for coverage. No-go for that account until ≥ 0.99 (else contact_not_found blows the < 1% skip budget).
Gate D — USMAN deactivated-user behavior (REV-2 / OQ-9) · owner: Platform · blocks: actor accuracy
- Confirm: how QontakLaunchpadClient.GetUsersByEmails (qontak_launchpad.go:264) represents a deactivated user: empty UserResponse vs a returned row with a non-active Status.
- Pass: behavior documented; ActorResolver maps "no active match" → case 2 ([Deleted CRM User]). (Endpoint itself already verified; this is an edge confirmation, not a blocker for building.)
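Whichever representation Platform confirms, the resolver only needs a yes/no "active match" signal. The sketch below shows the fallback together with the per-job email→UUID cache; it is a hypothetical illustration, and lookupFn, actorResolver, and fakeDirectory are invented names, not the QontakLaunchpadClient API. The single USMAN retry is omitted for brevity.

```go
package main

import "fmt"

const deletedCRMUser = "[Deleted CRM User]"

// lookupFn is a hypothetical stand-in for the USMAN email lookup.
// ok=false means "no active match" (empty result or non-active status).
type lookupFn func(email string) (uuid string, ok bool, err error)

// actorResolver caches resolutions for the lifetime of one job.
type actorResolver struct {
	lookup lookupFn
	cache  map[string]string // email -> resolved actor
	calls  int               // upstream lookups made, for observability
}

func newActorResolver(l lookupFn) *actorResolver {
	return &actorResolver{lookup: l, cache: make(map[string]string)}
}

// resolve never skips a record: a lookup error or a missing active user
// falls back to the placeholder. Errors are not cached, so a later record
// with the same email can still resolve once USMAN recovers.
func (r *actorResolver) resolve(email string) string {
	if actor, hit := r.cache[email]; hit {
		return actor
	}
	r.calls++
	uuid, ok, err := r.lookup(email)
	if err != nil {
		return deletedCRMUser
	}
	actor := deletedCRMUser
	if ok {
		actor = uuid
	}
	r.cache[email] = actor
	return actor
}

// fakeDirectory simulates USMAN with a fixed set of active users.
func fakeDirectory(active map[string]string) lookupFn {
	return func(email string) (string, bool, error) {
		uuid, ok := active[email]
		return uuid, ok, nil
	}
}

func main() {
	r := newActorResolver(fakeDirectory(map[string]string{"a@example.com": "uuid-1"}))
	fmt.Println(r.resolve("a@example.com"), r.resolve("gone@example.com"), r.calls)
}
```

Caching the "no active match" outcome as well keeps a deleted user who authored many audits from generating one USMAN call per record.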
Gate E — Mapper id-sets + join class names (OQ-12) · owner: CRM + CDP · blocks: resolved/completed/link precision (not the run)
- Ask the CRM team for: (a) the Rails class names of the Deal–Person and Deal–Company association/join audited models (the auditable_type values), and (b) the id-sets for won crm_stage_id, resolved crm_ticket_status_id, done crm_task_status_id.
- Pass: values recorded in the migration-service lookup. Until then: those update rows fall through to generic …/update (safe: no record dropped), so this gate may stay amber into Stage 1 without blocking a run.
Gate G — Sync-seam & enrichment validation (REV / OQ-14, OQ-13) · owner: CDP + Data · blocks: Stage 1 sign-off (not Stage 0 build)
On a sample account at Stage 1, validate the two contact-state cases end-to-end:
- Seam (CALM-S07 / OQ-14): confirm no migrated row has created_at >= contact.created_at (no double-import) and no in-scope pre-sync audit was dropped (no gap). Check: count audits just below the watermark vs migrated rows, and migrated rows vs live-feed rows just above it.
- Enrichment (CALM-S08 / OQ-13): confirm the live-feed match-key patches the intended logs only: sample N enriched live-feed logs, verify the crm_person_id / entity name is correct and no unrelated log was touched; ambiguous matches were skipped (enrich_ambiguous), not guessed.
- Pass: zero double-import, zero gap, enrichment precision 100% on the sample. Amber until Stage 1 (these need a real account to measure).
Gate F — CDP build prerequisites (CDP-owned) · blocks: Stage 0 exit
- make migrate-up applied in staging: indexes uniq_company_external_id (partial), idx_company_crm_data_id, and the jobs-collection indexes exist; make migrate-down reverses cleanly.
- New S2S + IAG endpoints stable in staging; FE status indicator in staging.
- Pass: §4.D chunks 1–9 acceptance criteria green in staging.
Sign-off table (fill at the Stage-0 gate review)
| Gate | Cross-team? | Owner | Status (go/no-go) | Evidence (query result / link / commit) | Date |
|---|---|---|---|---|---|
| A — CRM read access | yes | CRM + Platform | ☐ | ||
| B — Query plan + volume + cutoff | yes | CRM DBA + CDP | ☐ | B1 counts; B2 EXPLAIN; index? | |
| C — crm_data.id coverage (per account) | no | CDP / Data | ☐ | coverage ratio per account | |
| D — USMAN deactivated behavior | yes (confirm) | Platform | ☐ | ||
| E — Mapper id-sets + join classes | yes | CRM + CDP | ☐ (amber OK) | ||
| G — Sync-seam & enrichment validation | no | CDP + Data | ☐ (Stage 1) | no dup/gap; enrichment precision | |
| F — CDP build prereqs in staging | no | CDP Eng | ☐ | chunks 1–9 green | |
Go decision: Stage 0 exits to Stage 1 when A, B, F = go and D = go (or documented). C is evaluated per account at enablement, not at Stage-0 exit. E may be amber (safe fallthrough) through Stage 1 but should be green before Stage 2 pilot for resolved/completed fidelity.
5. Concern, Questions, or Known Limitations
| # | Type | Question | Default / mitigation | Owner | Deadline |
|---|---|---|---|---|---|
| OQ-1 | Decision (blocker) | Direct read-only Postgres query on audits vs a new CRM extraction API? | Default: direct read-only DSN; confirm access control with CRM/Platform | CDP + CRM | 2026-06-17 (PRD) |
| OQ-2 | Open (blocker) | Actual audits date range/volume + extraction query plan (no (auditable_type,created_at) index) | Run MIN/MAX/COUNT + EXPLAIN ANALYZE before Stage 0; add CRM index if needed | CDP + Data | 2026-06-10 (PRD) |
| OQ-3 | Risk | Partial-failure policy: rollback vs resume | Default: resume from last_audit_id (idempotent); full rollback if corruption / unresolved >48 h | PM + CDP | 2026-06-17 |
| OQ-4 | Open | auditable_type values outside the initial scope list | Default: skip + unmapped_action_type; expand scope in follow-up | PM | 2026-06-17 |
| OQ-5 | Decision | Are migrated logs immutable (no agent delete)? | Default: append-only when source_tag='crm_migration' (Decision 13); confirm with CDP platform | CDP | 2026-06-17 |
| OQ-6 | Open | Field-name display map ownership (column→label) | Default: migration-service-local; promote to shared later | CDP | 2026-06-17 |
| OQ-7 | Risk (blocker) | Contact.crm_data.id coverage per account | Run coverage count; require ≥99% before the account; backfill or fallback table otherwise | CDP + Data | 2026-06-10 (PRD) |
| OQ-8 | Process | Named reviewers + InfoSec approver | Assign before moving status to in-review | DRI | before in-review |
| OQ-9 | Dependency (minor — downgraded) | USMAN email→SSO UUID is resolved: GetUsersByEmails exists (qontak_launchpad.go:264). Only confirm how it represents a deactivated user (empty result vs a Status flag) so ActorResolver maps to [Deleted CRM User]. | Use empty/Status → case 2; confirm with Platform | Platform / USMAN | before Stage 1 |
| OQ-10 | Design | Figma frame for the status indicator (the only FE surface; migrated logs need no design — no badge, PRD v2.3) | Use a pixel3 banner/alert as interim; get a frame for pixel polish | PM + Design | before Stage 1 (FE) |
| OQ-11 | Decision — RESOLVED (REV-1) | How does the web FE read status? | Resolved: new IAG session-authed proxy GET /v1/crm_migration/status (tenant from session), §2.4 / §2.G / Decision 15. No longer a blocker. | CDP Backend + FE | done |
| OQ-12 | Open (data confirm) | Confirm (a) the join-model auditable_type class names for Deal–Person / Deal–Company associations, and (b) the id-sets for "won" crm_stage_id, "resolved" crm_ticket_status_id, "done" crm_task_status_id (CRM has no verb for these — §2.F CategoryMapper / REV-7) | Until confirmed, those update rows fall through to generic …/update (safe — no record dropped) | CDP Eng + CRM | before Stage 2 |
| OQ-13 | Open (blocker for CALM-S08) | Exact match-key to locate live-feed association logs for in-place enrichment — they carry no external_id. Candidate (company_sso_id, customer_id, category/action, linked entity id, timestamp≈); confirm it is collision-safe (no over-matching distinct events) and how to bound the timestamp≈ tolerance. | Default: match on entity id + category/action + exact customer_id; if ambiguous (>1 candidate) skip the patch and emit enrich_ambiguous rather than risk patching the wrong log | CDP Eng | before Stage 1 (S08) |
| OQ-14 | Open (correctness) | Prove the sync seam is gap-free and overlap-free: migrate where created_at < contact.created_at + live feed owns >= contact.created_at. Validate on a sample account that no audit falls in neither set and none in both. | Sample-check at Stage 1 (count audits at/just-below the watermark vs live-feed rows); accept if no dup and no gap | CDP + Data | Stage 1 |
| OQ-15 | Edge (CALM-S07) | A CDP contact that pre-existed CRM linkage (created via chat etc., linked later) has contact.created_at earlier than its true CRM-sync moment → its watermark under-cuts, leaving a small pre-sync gap | Safe for migration-created contacts (crm_data.id set by contact migration); flag the cross-channel population and decide whether to special-case (e.g. use linkage time if a durable one is added later) | CDP | before Stage 2 |
Known limitations. (1) FE 5,000-entry cap unchanged — large histories truncate
in the UI (PRD §4.7). (2) The Go port of Audit#parse must be kept in sync with
CRM audit.rb / audited gem upgrades — guarded by golden-file tests pinned to
gem v5.8.0. (3) Cross-service CRM Postgres read couples the migrator to the CRM
schema (Decision 11 trade-off). (4) The sync watermark is an approximation.
qontak_customer_id is set via update_column and is not audited (grounding
note #12), so there is no exact sync timestamp; CDP contact.created_at is the
proxy (Decision 16). It is exact for migration-created contacts and only drifts for
the pre-existing-cross-channel edge (OQ-15). (5) Live-feed association logs have no
external_id, so enrichment matches them heuristically (OQ-13) — an ambiguous
match is skipped, not guessed.
6. Comment logs
| Date | Comment(s) From | Action Item(s) |
|---|---|---|
| 2026-06-18 | rfc-starter (initial draft) | Grounded against contact-service, qontak.com, qontak-customer-fe worktrees; PRD corrections recorded in Grounding note; blockers OQ-1/2/7/9/11 raised |
| 2026-06-18 | R1-fix (post rfc-reviewer R1, score 6.5) | Resolved all 12 REV findings: REV-1 IAG status proxy (GET /v1/crm_migration/status); REV-2/6 USMAN GetUsersByEmails verified + actor two-hop; REV-7 CategoryMapper CDP-defined predicate (CRM has no resolved/completed verb); REV-8 append-only enforcement (no delete path); REV-10 failure_rate formula; REV-11 :cutoff def; REV-5 <99% fallback designed; REV-9/12 metric/counter notes; cross-border confirmed. OQ-9/OQ-11 downgraded/resolved; OQ-12 added. §7 → implementation-ready |
| 2026-06-18 | post rfc-reviewer R2 (score 8.0, PROCEED) | Added Detail 4.G — Stage-0 Go/No-Go Checklist with copy-pasteable CRM/CDP queries, pass thresholds, the cross-team asks (Gates A/B/E), and a sign-off table. Closes the operational path for the remaining external gates (OQ-1/2/7/9/12). Additive — no change to spec/decisions |
| 2026-06-30 | Sync to PRD v2.3 (badge removed) | Realigned the RFC to PRD v2.3, which dropped the "Migrated from CRM" source badge. Removed the badge as an FE deliverable everywhere: deleted the FE ActivityLog TS interface extension + TimelineInterface.isMigrated + the MpBadge render (Detail 2.A), the FE badge execution chunk (Detail 4.D — now 7 chunks, the FE status indicator is chunk 7), the SlaBadge/MpBadge repo anchors + Badge-UI pattern row + the badge Design↔Code/Design-References/Asset-Inventory rows, and reframed CALM-S04 to BE-only (migrated logs render in the existing CDP timeline, identical to native, via the existing GET /v1/activity_logs). Backend source_tag/metadata retained (rollback/validation/append-only — not FE-displayed). Updated traceability tables, role/UI coverage, rollout + compatibility matrices, performance, accessibility, OQ-10, and §7. Bumped source-PRD ref v2.1 → v2.3. No change to the backend schema, endpoints, transformer, or decisions. |
| 2026-06-30 | Contact-state revision (unsynced / pre-sync / association cases) | Extended the RFC to cover three contact-state cases. Grounded 3 new facts (notes #11–13): a live CRM→CDP Kafka activity feed already owns post-sync activity; qontak_customer_id is not audited (set via update_column, send.rb:125), so no CRM sync-timestamp exists; the upstream Crm::BackfillCustomerIdWorker(create_if_not_found: true) owns contact creation + the qontak_customer_id write-back. Added 3 stories CALM-S07 (pre-sync watermark cutoff), CALM-S08 (association-log enrichment, in-place patch incl. live-feed rows), CALM-S09 (Phase-A coverage gate). Added 3 decisions D16 (watermark = CDP contact.created_at; the audit-based watermark was found unbuildable), D17 (append-only exception for additive enrichment, amends D13), D18 (gate-only — this RFC never writes CRM/contacts; sync owned upstream); amended D9. Added components SyncWatermarkResolver, AssociationLogEnricher; 2 new execution chunks (8, 9 — now 9 chunks). Added OQ-13 (live-feed match-key), OQ-14 (seam gap/overlap proof), OQ-15 (cross-channel created_at edge) + Stage-0 Gate G. Updated overview, scope, assumptions, dependencies, traceability, PRD-to-Schema, success criteria, branch/skip flow, state-machine framing, sequence, skip catalog, append-only enforcement. These three stories are net-new vs PRD v2.3 — the PRD must catch up (see §7). |
7. Ready for agent execution
PRD alignment gap (2026-06-30 contact-state revision). Stories CALM-S07 / CALM-S08 / CALM-S09 are net-new in this RFC and are not yet in PRD v2.3. The RFC is the agent-execution spec, but the repo's source-of-truth doctrine puts requirements in the PRD first. Action: backfill these three stories (and their ACs) into the PRD before moving the RFC status past in-review, so the PRD ⇄ RFC ⇄ test-spec chain stays consistent. The build is unblocked meanwhile (the RFC fully specifies them), but the PRD must catch up.
Ready for agent execution: yes (implementation-ready). After the R1-fix revision and the contact-state revision, every specification gate passes: an agent can build all 9 chunks (Detail 4.D) from this RFC without a clarification meeting. The previously score-capping cross-layer mismatch (REV-1/OQ-11) is resolved (IAG status proxy), and the USMAN path is verified (GetUsersByEmails). The actor two-hop, CategoryMapper predicate, append-only enforcement, failure_rate formula, and :cutoff are all pinned, and the <99% contact fallback is designed.
The remaining Open Questions are operational Stage-0 go/no-go gates that gate running the migration, not building it — they are confirmations on other teams' production systems, not spec holes:
- OQ-1 / OQ-2: confirm the CRM read-only DSN and run EXPLAIN ANALYZE + MIN/MAX/COUNT before the first extraction (add a CRM index if the plan is bad).
- OQ-7: confirm per-account Contact.crm_data.id coverage ≥ 99% (else backfill or populate the designed crm_contact_id_map fallback) before that account runs.
- OQ-9: confirm the GetUsersByEmails deactivated-user representation (safe default already specified).
- OQ-10: FE pixel frame for the status indicator (chunk 7 builds against the pixel3 interim; the frame is polish, not a blocker; migrated logs need no design).
- OQ-12: confirm join-model class names + won/done/resolved id-sets (safe fallthrough to …/update until confirmed).
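The OQ-7 coverage gate above reduces to a single threshold comparison per account. The following is a minimal sketch of that decision only, not the RFC's implementation: the type and function names (AccountCoverage, Decide, GateDecision) are hypothetical, how the two counts are obtained is out of scope, and treating an account with zero CRM contacts as covered is an assumption made here for illustration.

```go
package main

// GateDecision is a hypothetical result type for the per-account
// Phase-A coverage check described under OQ-7.
type GateDecision string

const (
	GateProceed  GateDecision = "proceed"
	GateFallback GateDecision = "backfill_or_fallback"
)

// AccountCoverage holds the two counts the gate needs. Where they come
// from (CRM read replica, CDP query) is outside this sketch.
type AccountCoverage struct {
	CRMContacts    int64 // CRM contacts the account's audits refer to
	MappedContacts int64 // of those, contacts with Contact.crm_data.id set in CDP
}

// Ratio returns mapped/total for reporting. An account with no CRM
// contacts is reported as fully covered (assumption of this sketch).
func (c AccountCoverage) Ratio() float64 {
	if c.CRMContacts == 0 {
		return 1.0
	}
	return float64(c.MappedContacts) / float64(c.CRMContacts)
}

// Decide applies the >= 99% threshold. It compares integers
// (mapped*100 >= total*99) so the boundary case is not subject to
// floating-point rounding.
func (c AccountCoverage) Decide() GateDecision {
	if c.MappedContacts*100 >= c.CRMContacts*99 {
		return GateProceed
	}
	return GateFallback
}
```

Under this sketch an account with 1000 CRM contacts and 990 mapped sits exactly on the threshold and proceeds, while 989 mapped routes to the backfill or crm_contact_id_map fallback path.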
Build order: backend chunks 1–6, 8, 9 and FE chunk 7 are all unblocked for implementation now. Chunk 6's execution run (and Stage 1 QA) waits on OQ-1/2/9; any account's run waits on its OQ-7 coverage check (Phase A); chunk 8 (watermark) depends on the contact being resolved (chunk 3 + Task 2); chunk 9 (enrichment) depends on OQ-13's match-key for live-feed rows. None of these block writing and unit-testing the code.
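The run-time gating in the build-order paragraph can be expressed as a small lookup. This is an illustrative sketch under stated assumptions: the names runGates, perAccountGate, and PendingGates are hypothetical, only the Open Question dependencies named in that paragraph are encoded, and OQ-7 is modelled as applying to every per-account run. The chunk 8 dependency on chunk 3 + Task 2 is a build-order dependency rather than an Open Question, so it is left out.

```go
package main

// runGates lists, per chunk, the Open Questions its execution run waits
// on, as stated in the build-order paragraph. Chunks not listed have no
// chunk-specific gate in this sketch.
var runGates = map[int][]string{
	6: {"OQ-1", "OQ-2", "OQ-9"}, // execution run and Stage 1 QA
	9: {"OQ-13"},                // match-key for live-feed rows
}

// perAccountGate is the coverage check every account's run waits on.
const perAccountGate = "OQ-7"

// PendingGates returns the unresolved Open Questions blocking a chunk's
// run for one account, in declaration order. It says nothing about
// writing or unit-testing the code, which no gate blocks.
func PendingGates(chunk int, resolved map[string]bool) []string {
	pending := []string{}
	for _, oq := range runGates[chunk] {
		if !resolved[oq] {
			pending = append(pending, oq)
		}
	}
	if !resolved[perAccountGate] {
		pending = append(pending, perAccountGate)
	}
	return pending
}
```

For example, with only OQ-1 confirmed, PendingGates(6, ...) reports OQ-2, OQ-9, and OQ-7 as still open; once every listed question is confirmed for an account, the result is empty and the run can start.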
What is ready now (can be executed immediately):
- Infrastructure Topology + per-service responsibility: complete.
- Technical Decisions: 14 ADR blocks; minimum coverage addressed.
- Repo Reading Guide + Source Verification: every anchor verified to path:line against the live repos (the two unverifiable rows are marked blockers, not filled).
- DDL (struct + index migrations) with per-status lifecycle; APIs (outbound + n/a inbound) tagged reuse/extend/new; Data Integrity / Concurrency / Async specs; Failure/Branch/Error catalogs; Cross-Layer Rollout matrix; Config contract; Agent Execution Plan (9 chunks, repo-sourced commands); Verification & Rollback recipe.
All 9 chunks (Detail 4.D) are implementation-ready. Only their execution runs / Stage-1 QA depend on the operational gates above (OQ-1/2/7/9/12/13/14) — the code can be written and unit-tested now.
Optional next step: hand off to rfc-reviewer for a second-pass score.