Database Subscriptions
Postgres Changes
Basic Subscription
// Listen to all changes on a table
const channel = supabase
  .channel('tasks-changes')
  .on(
    'postgres_changes',
    {
      event: '*', // INSERT, UPDATE, DELETE, or *
      schema: 'public',
      table: 'tasks',
    },
    (payload) => {
      console.log('Change received:', payload);
    }
  )
  .subscribe();
Payload Structure
// INSERT payload
{
  schema: 'public',
  table: 'tasks',
  commit_timestamp: '2024-01-15T10:00:00Z',
  eventType: 'INSERT',
  new: { id: '123', title: 'New task', status: 'todo' },
  old: {},
  errors: null
}

// UPDATE payload
{
  eventType: 'UPDATE',
  new: { id: '123', title: 'Updated task', status: 'done' },
  // old holds the full previous row only with REPLICA IDENTITY FULL;
  // by default it contains just the primary key
  old: { id: '123', title: 'New task', status: 'todo' },
}

// DELETE payload
{
  eventType: 'DELETE',
  new: {},
  // same caveat: the full old row requires REPLICA IDENTITY FULL
  old: { id: '123', title: 'Deleted task' },
}
Event-specific Listeners
Separate Handlers
const channel = supabase.channel('tasks-crud');

// Handle INSERT
channel.on(
  'postgres_changes',
  { event: 'INSERT', schema: 'public', table: 'tasks' },
  (payload) => {
    console.log('New task:', payload.new);
    // Add to local state
    setTasks((prev) => [...prev, payload.new]);
  }
);

// Handle UPDATE
channel.on(
  'postgres_changes',
  { event: 'UPDATE', schema: 'public', table: 'tasks' },
  (payload) => {
    console.log('Updated task:', payload.new);
    // Update local state
    setTasks((prev) =>
      prev.map((t) => (t.id === payload.new.id ? payload.new : t))
    );
  }
);

// Handle DELETE
channel.on(
  'postgres_changes',
  { event: 'DELETE', schema: 'public', table: 'tasks' },
  (payload) => {
    console.log('Deleted task:', payload.old);
    // Remove from local state
    setTasks((prev) => prev.filter((t) => t.id !== payload.old.id));
  }
);

channel.subscribe();
Filtering Changes
Filter by Column Value
// Only receive changes for a specific project
const channel = supabase
  .channel('project-tasks')
  .on(
    'postgres_changes',
    {
      event: '*',
      schema: 'public',
      table: 'tasks',
      filter: 'project_id=eq.project-123',
    },
    handler
  )
  .subscribe();
Dynamic Filters
// Filter by current user
const channel = supabase
  .channel('my-notifications')
  .on(
    'postgres_changes',
    {
      event: 'INSERT',
      schema: 'public',
      table: 'notifications',
      filter: `user_id=eq.${userId}`,
    },
    (payload) => {
      showNotification(payload.new);
    }
  )
  .subscribe();
Filter Operators
// Available filter operators (PostgREST syntax)
filter: 'column=eq.value' // Equal
filter: 'column=neq.value' // Not equal
filter: 'column=gt.value' // Greater than
filter: 'column=gte.value' // Greater than or equal
filter: 'column=lt.value' // Less than
filter: 'column=lte.value' // Less than or equal
filter: 'column=in.(a,b,c)' // In list
// Examples
filter: 'status=eq.active'
filter: 'priority=gte.3'
filter: 'type=in.(task,bug,feature)'
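As a quick sketch of the in operator in context (the issues table below is hypothetical, used only for illustration):
// Only receive INSERTs whose type matches one of the listed values
const channel = supabase
  .channel('typed-issues')
  .on(
    'postgres_changes',
    {
      event: 'INSERT',
      schema: 'public',
      table: 'issues', // hypothetical table, for illustration
      filter: 'type=in.(task,bug,feature)',
    },
    (payload) => console.log('New issue:', payload.new)
  )
  .subscribe();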
React Hook Pattern
useRealtimeSubscription Hook
import { useEffect, useState } from 'react';
import { createClient } from '@/lib/supabase/client';
import { RealtimePostgresChangesPayload } from '@supabase/supabase-js';

type Task = {
  id: string;
  title: string;
  status: string;
  project_id: string;
};

export function useRealtimeTasks(projectId: string) {
  const [tasks, setTasks] = useState<Task[]>([]);
  const [loading, setLoading] = useState(true);
  const supabase = createClient();

  useEffect(() => {
    // Initial fetch
    async function fetchTasks() {
      const { data } = await supabase
        .from('tasks')
        .select('*')
        .eq('project_id', projectId)
        .order('created_at', { ascending: false });
      setTasks(data || []);
      setLoading(false);
    }
    fetchTasks();

    // Subscribe to changes
    const channel = supabase
      .channel(`tasks:${projectId}`)
      .on(
        'postgres_changes',
        {
          event: '*',
          schema: 'public',
          table: 'tasks',
          filter: `project_id=eq.${projectId}`,
        },
        (payload: RealtimePostgresChangesPayload<Task>) => {
          switch (payload.eventType) {
            case 'INSERT':
              setTasks((prev) => [payload.new as Task, ...prev]);
              break;
            case 'UPDATE':
              setTasks((prev) =>
                prev.map((t) =>
                  t.id === (payload.new as Task).id ? (payload.new as Task) : t
                )
              );
              break;
            case 'DELETE':
              setTasks((prev) =>
                prev.filter((t) => t.id !== (payload.old as Task).id)
              );
              break;
          }
        }
      )
      .subscribe();

    // Cleanup
    return () => {
      supabase.removeChannel(channel);
    };
  }, [projectId]);

  return { tasks, loading };
}
Using the Hook
function TaskList({ projectId }: { projectId: string }) {
  const { tasks, loading } = useRealtimeTasks(projectId);

  if (loading) return <div>Loading...</div>;

  return (
    <ul>
      {tasks.map((task) => (
        <li key={task.id}>
          {task.title} - {task.status}
        </li>
      ))}
    </ul>
  );
}
Multiple Table Subscriptions
const channel = supabase.channel('project-updates');

// Tasks changes
channel.on(
  'postgres_changes',
  {
    event: '*',
    schema: 'public',
    table: 'tasks',
    filter: `project_id=eq.${projectId}`,
  },
  handleTaskChange
);

// Comments on tasks
channel.on(
  'postgres_changes',
  {
    event: 'INSERT',
    schema: 'public',
    table: 'comments',
  },
  handleNewComment
);

// Project settings
channel.on(
  'postgres_changes',
  {
    event: 'UPDATE',
    schema: 'public',
    table: 'projects',
    filter: `id=eq.${projectId}`,
  },
  handleProjectUpdate
);

channel.subscribe();
Enable Realtime on Tables
Via Dashboard
Supabase Dashboard
└── Database
    └── Replication
        └── Select tables to enable realtime
Via SQL
-- Add table to realtime publication
ALTER PUBLICATION supabase_realtime ADD TABLE tasks;
ALTER PUBLICATION supabase_realtime ADD TABLE comments;
-- Remove table from realtime
ALTER PUBLICATION supabase_realtime DROP TABLE tasks;
-- Check which tables have realtime enabled
SELECT * FROM pg_publication_tables
WHERE pubname = 'supabase_realtime';
Replica Identity: FULL vs DEFAULT
-- FULL: the old record includes all columns in the payload
-- (needed if you want the full row data on UPDATE/DELETE events)
ALTER TABLE tasks REPLICA IDENTITY FULL;
-- DEFAULT: the old record contains only the primary key
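With FULL enabled, handlers can diff the previous and current rows. A minimal sketch, reusing a channel and the Task type from the examples above:
// Requires: ALTER TABLE tasks REPLICA IDENTITY FULL;
// With the DEFAULT identity, payload.old would contain only the primary key.
channel.on(
  'postgres_changes',
  { event: 'UPDATE', schema: 'public', table: 'tasks' },
  (payload) => {
    const before = payload.old as Task; // full previous row (FULL identity only)
    const after = payload.new as Task;
    if (before.status !== after.status) {
      console.log(`Task ${after.id}: ${before.status} -> ${after.status}`);
    }
  }
);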
Best Practices
// ✅ GOOD: Specific filter reduces overhead
.on('postgres_changes', {
  event: 'INSERT',
  schema: 'public',
  table: 'messages',
  filter: `room_id=eq.${roomId}`,
}, handler)

// ❌ BAD: Listening to all changes on a large table
.on('postgres_changes', {
  event: '*',
  schema: 'public',
  table: 'all_events', // Millions of rows
}, handler)

// ✅ GOOD: Cleanup on unmount
useEffect(() => {
  const channel = supabase.channel('...');
  return () => supabase.removeChannel(channel);
}, []);

// ✅ GOOD: Debounce rapid updates (e.g. with lodash's debounce)
const debouncedHandler = useMemo(
  () => debounce(handleChange, 100),
  []
);
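One caveat with the debounce pattern above: cancel any pending invocation during cleanup so a queued update cannot fire after unmount. A sketch assuming a lodash-style debounce:
// ✅ GOOD: Cancel pending debounced calls on unmount
useEffect(() => {
  const debouncedHandler = debounce(handleChange, 100); // lodash-style debounce assumed
  const channel = supabase
    .channel('debounced-tasks')
    .on(
      'postgres_changes',
      { event: '*', schema: 'public', table: 'tasks' },
      debouncedHandler
    )
    .subscribe();
  return () => {
    debouncedHandler.cancel(); // drop any queued invocation
    supabase.removeChannel(channel);
  };
}, []);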
Error Handling
Handle Connection Errors
const channel = supabase
  .channel('my-channel')
  .on('postgres_changes', config, handler)
  .subscribe((status, err) => {
    if (status === 'SUBSCRIBED') {
      console.log('Connected');
    }
    if (status === 'CHANNEL_ERROR') {
      console.error('Connection error:', err);
      // Retry logic
      setTimeout(() => {
        channel.subscribe();
      }, 5000);
    }
    if (status === 'TIMED_OUT') {
      console.log('Connection timed out, retrying...');
    }
  });
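If re-subscribing the same channel proves unreliable, an alternative sketch is to tear the channel down and recreate it after a delay (the subscribeWithRetry helper is hypothetical; config and handler are the same placeholders as above):
function subscribeWithRetry(retryDelayMs = 5000) {
  const channel = supabase
    .channel('my-channel')
    .on('postgres_changes', config, handler)
    .subscribe((status) => {
      if (status === 'CHANNEL_ERROR' || status === 'TIMED_OUT') {
        // Remove the broken channel, then build a fresh one after a delay
        supabase.removeChannel(channel);
        setTimeout(() => subscribeWithRetry(retryDelayMs), retryDelayMs);
      }
    });
  return channel;
}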
Summary
Subscription Pattern
// 1. Create channel
const channel = supabase.channel('unique-name');
// 2. Add listeners
channel.on('postgres_changes', { event, schema, table, filter }, handler);
// 3. Subscribe
channel.subscribe();
// 4. Cleanup
supabase.removeChannel(channel);
Key Points
| Aspect | Recommendation |
| --- | --- |
| Channel name | Unique per subscription |
| Filter | Always use when possible |
| Cleanup | Always remove on unmount |
| Error handling | Implement retry logic |
| RLS | Realtime respects RLS |
Q&A
- Which tables need realtime?
- What criteria do we need to filter by?
- Will there be many concurrent users?