Skip to content

Commit d8df394

Browse files
committed
fix: apply enrichment to sequin_stream messages
sequin_stream was bypassing the SinkPipeline (which applies enrichment for push sinks like NATS). This commit adds enrichment processing in PullController.receive() to match the behavior of push sinks. Changes: - Add :enrichment and :postgres_database to cached_sink_consumer_for_sequin_stream preloads - Add enrich_messages/2 helper that groups messages by table_oid for efficient batched enrichment - Add test for enrichment in sequin_stream pull API
1 parent dcc62ec commit d8df394

File tree

3 files changed

+89
-2
lines changed

3 files changed

+89
-2
lines changed

lib/sequin/consumers/consumers.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,11 @@ defmodule Sequin.Consumers do
414414
Cache.get_or_store(
415415
cache_key,
416416
fn ->
417-
find_sink_consumer(account_id, id_or_name: id_or_name, type: :sequin_stream, preload: [:transform])
417+
find_sink_consumer(account_id,
418+
id_or_name: id_or_name,
419+
type: :sequin_stream,
420+
preload: [:transform, :enrichment, :postgres_database]
421+
)
418422
end,
419423
ttl
420424
)

lib/sequin_web/controllers/pull_controller.ex

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule SequinWeb.PullController do
22
use SequinWeb, :controller
33

44
alias Sequin.Consumers
5+
alias Sequin.Consumers.SinkConsumer
56
alias Sequin.Error
67
alias Sequin.Runtime.SlotMessageStore
78
alias Sequin.String, as: SequinString
@@ -21,7 +22,8 @@ defmodule SequinWeb.PullController do
2122
:ok <- SlotMessageStore.nack_stale_produced_messages(consumer),
2223
{:ok, messages} <- SlotMessageStore.produce(consumer, batch_size, :consistent_pid) do
2324
Logger.metadata(batch_size: batch_size)
24-
render(conn, "receive.json", consumer: consumer, messages: messages)
25+
enriched_messages = enrich_messages(consumer, messages)
26+
render(conn, "receive.json", consumer: consumer, messages: enriched_messages)
2527
end
2628
end
2729

@@ -169,4 +171,22 @@ defmodule SequinWeb.PullController do
169171
defp env do
170172
Application.get_env(:sequin, :env)
171173
end
174+
175+
defp enrich_messages(%SinkConsumer{enrichment: nil}, messages), do: messages
176+
177+
defp enrich_messages(%SinkConsumer{} = consumer, messages) do
178+
# Group by table_oid to batch enrichment queries efficiently
179+
enriched_by_cursor =
180+
messages
181+
|> Enum.group_by(& &1.table_oid)
182+
|> Enum.flat_map(fn {_table_oid, table_messages} ->
183+
Consumers.enrich_messages!(consumer.postgres_database, consumer.enrichment, table_messages)
184+
end)
185+
|> Map.new(&{{&1.commit_lsn, &1.commit_idx}, &1})
186+
187+
# Return in original order
188+
Enum.map(messages, fn msg ->
189+
Map.fetch!(enriched_by_cursor, {msg.commit_lsn, msg.commit_idx})
190+
end)
191+
end
172192
end

test/sequin_web/controllers/pull_controller_test.exs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ defmodule SequinWeb.PullControllerTest do
44
alias Sequin.Consumers
55
alias Sequin.Databases.ConnectionCache
66
alias Sequin.Factory.AccountsFactory
7+
alias Sequin.Factory.CharacterFactory
78
alias Sequin.Factory.ConsumersFactory
89
alias Sequin.Factory.DatabasesFactory
10+
alias Sequin.Factory.FunctionsFactory
911
alias Sequin.Factory.ReplicationFactory
1012
alias Sequin.Runtime.SlotMessageStore
1113
alias Sequin.Runtime.SlotMessageStoreSupervisor
14+
alias Sequin.TestSupport.Models.Character
1215

1316
setup :authenticated_conn
1417

@@ -118,6 +121,66 @@ defmodule SequinWeb.PullControllerTest do
118121
assert %{"data" => messages} = json_response(conn, 200)
119122
assert length(messages) == 1
120123
end
124+
125+
test "applies enrichment function to messages", %{conn: conn, account: account} do
126+
# Create a character in the test database
127+
character = CharacterFactory.insert_character!()
128+
129+
# Create database and replication slot
130+
db = DatabasesFactory.insert_configured_postgres_database!(account_id: account.id, tables: :character_tables)
131+
ConnectionCache.cache_connection(db, Sequin.Repo)
132+
rep_slot = ReplicationFactory.insert_postgres_replication!(postgres_database_id: db.id, account_id: account.id)
133+
134+
# Create an enrichment function that looks up character data
135+
enrichment_function =
136+
FunctionsFactory.insert_enrichment_function!(
137+
account_id: account.id,
138+
function_attrs: [
139+
code: """
140+
SELECT id, 'enriched_' || name as enriched_name
141+
FROM "Characters"
142+
WHERE id = ANY($1::int[])
143+
"""
144+
]
145+
)
146+
147+
# Create consumer with enrichment
148+
consumer =
149+
ConsumersFactory.insert_sink_consumer!(
150+
message_kind: :record,
151+
account_id: account.id,
152+
backfill_completed_at: DateTime.add(DateTime.utc_now(), -24, :hour),
153+
replication_slot_id: rep_slot.id,
154+
sink: %{type: :sequin_stream},
155+
enrichment_id: enrichment_function.id,
156+
source: %{
157+
include_table_oids: [Character.table_oid()]
158+
}
159+
)
160+
161+
start_supervised!({SlotMessageStoreSupervisor, consumer_id: consumer.id, test_pid: self()})
162+
163+
# Create a message for the character
164+
record =
165+
ConsumersFactory.deliverable_consumer_record(
166+
consumer_id: consumer.id,
167+
table_oid: Character.table_oid(),
168+
record_pks: [character.id],
169+
data: ConsumersFactory.consumer_record_data(record: %{"id" => character.id, "name" => character.name})
170+
)
171+
172+
SlotMessageStore.put_messages(consumer, [record])
173+
174+
# Fetch the message and verify enrichment
175+
conn = get(conn, ~p"/api/sequin_streams/#{consumer.id}/receive")
176+
assert %{"data" => [message]} = json_response(conn, 200)
177+
178+
# Verify enrichment data is present
179+
assert message["data"]["metadata"]["enrichment"] == %{
180+
"id" => character.id,
181+
"enriched_name" => "enriched_#{character.name}"
182+
}
183+
end
121184
end
122185

123186
describe "receive, wait_for behavior" do

0 commit comments

Comments
 (0)