Bỏ qua

Database Subscriptions

Postgres Changes

Basic Subscription

// Subscribe to every row-level change on public.tasks and log each payload.
const channel = supabase.channel('tasks-changes');

channel
  .on(
    'postgres_changes',
    // '*' matches INSERT, UPDATE and DELETE; use one of those names to narrow it
    { event: '*', schema: 'public', table: 'tasks' },
    (change) => {
      console.log('Change received:', change);
    }
  )
  .subscribe();

Payload Structure

// INSERT payload: `new` holds the inserted row, `old` is empty
{
  schema: 'public',
  table: 'tasks',
  commit_timestamp: '2024-01-15T10:00:00Z',
  eventType: 'INSERT',
  new: { id: '123', title: 'New task', status: 'todo' },
  old: {},
  errors: null
}

// UPDATE payload: `new` is the row after the change, `old` the row before it
// (the schema, table, commit_timestamp and errors fields from the INSERT example are not repeated here)
// NOTE: `old` carries every column only when the table uses REPLICA IDENTITY FULL;
// with the default replica identity it contains only the primary key.
{
  eventType: 'UPDATE',
  new: { id: '123', title: 'Updated task', status: 'done' },
  old: { id: '123', title: 'New task', status: 'todo' },
}

// DELETE payload: `new` is empty, `old` identifies the removed row
// NOTE: columns other than the primary key (such as `title` here) appear in `old`
// only when the table uses REPLICA IDENTITY FULL.
{
  eventType: 'DELETE',
  new: {},
  old: { id: '123', title: 'Deleted task' },
}

Event-specific Listeners

Separate Handlers

// One channel with a dedicated listener per event type; each listener logs
// the change and mirrors it into the local task list.
const channel = supabase
  .channel('tasks-crud')
  // INSERT: append the created row to local state
  .on(
    'postgres_changes',
    { event: 'INSERT', schema: 'public', table: 'tasks' },
    ({ new: created }) => {
      console.log('New task:', created);
      setTasks((current) => [...current, created]);
    }
  )
  // UPDATE: swap the row with the matching id for its new version
  .on(
    'postgres_changes',
    { event: 'UPDATE', schema: 'public', table: 'tasks' },
    ({ new: updated }) => {
      console.log('Updated task:', updated);
      setTasks((current) =>
        current.map((task) => (task.id === updated.id ? updated : task))
      );
    }
  )
  // DELETE: drop the row whose id matches the old record
  .on(
    'postgres_changes',
    { event: 'DELETE', schema: 'public', table: 'tasks' },
    ({ old: removed }) => {
      console.log('Deleted task:', removed);
      setTasks((current) => current.filter((task) => task.id !== removed.id));
    }
  )
  .subscribe();

Filtering Changes

Filter by Column Value

// Only receive changes for a specific project.
// The filter uses `column=operator.value` syntax: here only rows whose
// project_id equals 'project-123' are passed to the handler.
const channel = supabase
  .channel('project-tasks')
  .on(
    'postgres_changes',
    {
      event: '*',
      schema: 'public',
      table: 'tasks',
      filter: 'project_id=eq.project-123',
    },
    // `handler` is a callback defined elsewhere
    handler
  )
  .subscribe();

Dynamic Filters

// Filter by current user.
// The filter string is built at runtime from `userId` (defined elsewhere), so only
// INSERT events on public.notifications whose user_id equals that value reach the callback.
const channel = supabase
  .channel('my-notifications')
  .on(
    'postgres_changes',
    {
      event: 'INSERT',
      schema: 'public',
      table: 'notifications',
      filter: `user_id=eq.${userId}`,
    },
    (payload) => {
      // payload.new is the inserted notification row
      showNotification(payload.new);
    }
  )
  .subscribe();

Filter Operators

// Available filter operators (PostgREST syntax)
filter: 'column=eq.value'     // Equal
filter: 'column=neq.value'    // Not equal
filter: 'column=gt.value'     // Greater than
filter: 'column=gte.value'    // Greater than or equal
filter: 'column=lt.value'     // Less than
filter: 'column=lte.value'    // Less than or equal
filter: 'column=in.(a,b,c)'   // In list

// Examples
filter: 'status=eq.active'
filter: 'priority=gte.3'
filter: 'type=in.(task,bug,feature)'

React Hook Pattern

useRealtimeTasks Hook

import { useEffect, useState } from 'react';
import { createClient } from '@/lib/supabase/client';
import { RealtimePostgresChangesPayload } from '@supabase/supabase-js';

/**
 * Shape of a task row as used by the realtime hook and the list component.
 *
 * NOTE(review): the hook's initial query orders by `created_at`, which is not
 * declared here — confirm whether that column should be part of this type.
 */
type Task = {
  /** Row identifier; used to match rows on UPDATE and DELETE events. */
  id: string;
  title: string;
  status: string;
  /** Project the task belongs to; the hook filters on this column. */
  project_id: string;
};

/**
 * Loads the tasks of one project and keeps the list in sync with realtime
 * INSERT / UPDATE / DELETE events on `public.tasks`.
 *
 * @param projectId - Project whose tasks are fetched and subscribed to. When it
 *   changes, the previous subscription is removed and the list is reloaded.
 * @returns `tasks` (the initial fetch ordered by `created_at` descending, with
 *   realtime inserts prepended) and `loading`, which is true until the initial
 *   fetch for the current project has settled.
 */
export function useRealtimeTasks(projectId: string) {
  const [tasks, setTasks] = useState<Task[]>([]);
  const [loading, setLoading] = useState(true);

  // Lazy initializer: create the client once per component instead of on every
  // render, so the effect below can list it as a stable dependency.
  const [supabase] = useState(() => createClient());

  useEffect(() => {
    // Flipped by the cleanup so a fetch that resolves after unmount or after a
    // projectId change cannot overwrite state that belongs to the new project.
    let cancelled = false;

    // Reset state left over from a previous projectId (no-op on first mount).
    setTasks((prev) => (prev.length === 0 ? prev : []));
    setLoading(true);

    // Initial fetch
    async function fetchTasks() {
      const { data, error } = await supabase
        .from('tasks')
        .select('*')
        .eq('project_id', projectId)
        .order('created_at', { ascending: false });

      if (cancelled) return;

      if (error) {
        // Surface the failure instead of silently showing an empty list
        console.error('Failed to load tasks:', error);
      }
      setTasks(data ?? []);
      setLoading(false);
    }

    void fetchTasks();

    // Subscribe to changes for this project only
    const channel = supabase
      .channel(`tasks:${projectId}`)
      .on(
        'postgres_changes',
        {
          event: '*',
          schema: 'public',
          table: 'tasks',
          filter: `project_id=eq.${projectId}`,
        },
        (payload: RealtimePostgresChangesPayload<Task>) => {
          switch (payload.eventType) {
            case 'INSERT': {
              const inserted = payload.new as Task;
              // Skip rows already in the list (e.g. the initial fetch returned
              // the same row) so React never sees duplicate keys.
              setTasks((prev) =>
                prev.some((t) => t.id === inserted.id) ? prev : [inserted, ...prev]
              );
              break;
            }
            case 'UPDATE': {
              const updated = payload.new as Task;
              setTasks((prev) =>
                prev.map((t) => (t.id === updated.id ? updated : t))
              );
              break;
            }
            case 'DELETE': {
              const removedId = (payload.old as Partial<Task>).id;
              setTasks((prev) => prev.filter((t) => t.id !== removedId));
              break;
            }
          }
        }
      )
      .subscribe();

    // Cleanup: invalidate the in-flight fetch and drop the subscription
    return () => {
      cancelled = true;
      void supabase.removeChannel(channel);
    };
  }, [projectId, supabase]);

  return { tasks, loading };
}

Using the Hook

/** Renders the realtime task list of one project as "title - status" items. */
function TaskList({ projectId }: { projectId: string }) {
  const realtime = useRealtimeTasks(projectId);

  // Show a placeholder while the hook reports that it is still loading
  if (realtime.loading) {
    return <div>Loading...</div>;
  }

  const items = realtime.tasks.map(({ id, title, status }) => (
    <li key={id}>
      {title} - {status}
    </li>
  ));

  return <ul>{items}</ul>;
}

Multiple Table Subscriptions

// A single channel can carry listeners for several tables; all of them become
// active with the one subscribe() call at the end.
const channel = supabase.channel('project-updates');

// Tasks changes: any event on tasks rows belonging to this project
channel.on(
  'postgres_changes',
  {
    event: '*',
    schema: 'public',
    table: 'tasks',
    filter: `project_id=eq.${projectId}`,
  },
  handleTaskChange
);

// Comments on tasks: new rows only
// NOTE(review): no `filter` is set here, so this listener is not limited to the
// current project's tasks — confirm whether listening to every INSERT on the
// comments table is intended.
channel.on(
  'postgres_changes',
  {
    event: 'INSERT',
    schema: 'public',
    table: 'comments',
  },
  handleNewComment
);

// Project settings: updates to the project row itself
channel.on(
  'postgres_changes',
  {
    event: 'UPDATE',
    schema: 'public',
    table: 'projects',
    filter: `id=eq.${projectId}`,
  },
  handleProjectUpdate
);

channel.subscribe();

Enable Realtime on Tables

Via Dashboard

Supabase Dashboard
└── Database
    └── Replication
        └── Select tables to enable realtime

Via SQL

-- Add table to realtime publication
ALTER PUBLICATION supabase_realtime ADD TABLE tasks;
ALTER PUBLICATION supabase_realtime ADD TABLE comments;

-- Remove table from realtime
ALTER PUBLICATION supabase_realtime DROP TABLE tasks;

-- Check which tables have realtime enabled
SELECT * FROM pg_publication_tables
WHERE pubname = 'supabase_realtime';

Full Replica vs Partial

-- Full replica identity: the old record in the payload contains all columns
-- (use this when you need the full row data, e.g. for DELETE events)
ALTER TABLE tasks REPLICA IDENTITY FULL;

-- Default replica identity: the old record contains only the primary key

Performance Considerations

Best Practices

// ✅ GOOD: Specific filter reduces overhead
.on('postgres_changes', {
  event: 'INSERT',
  schema: 'public',
  table: 'messages',
  filter: `room_id=eq.${roomId}`,
}, handler)

// ❌ BAD: Listen to all changes on large table
.on('postgres_changes', {
  event: '*',
  schema: 'public',
  table: 'all_events', // Millions of rows
}, handler)

// ✅ GOOD: Cleanup on unmount
useEffect(() => {
  const channel = supabase.channel('...');
  // Braces keep the cleanup itself returning nothing: removeChannel returns a
  // promise, and an effect cleanup should not return one.
  return () => {
    supabase.removeChannel(channel);
  };
}, []);

// ✅ GOOD: Debounce rapid updates
// useMemo with an empty dependency list keeps one debounced function across renders
const debouncedHandler = useMemo(
  () => debounce(handleChange, 100),
  []
);

Error Handling

Handle Connection Errors

// Delay before a failed channel is torn down and rebuilt
const RETRY_DELAY_MS = 5000;

/**
 * Creates 'my-channel', attaches the postgres_changes listener and subscribes
 * with a status callback. On CHANNEL_ERROR or TIMED_OUT the channel is removed
 * and a fresh one is created after RETRY_DELAY_MS, so the replacement gets the
 * same listener and the same status handling.
 *
 * Re-calling subscribe() on the existing channel is avoided on purpose: without
 * the callback argument the status handler would be lost.
 * NOTE(review): supabase-js is also expected to reject a second subscribe() on a
 * channel that has already joined — verify against the installed version.
 *
 * @returns The newly created channel.
 */
function connect(): ReturnType<typeof supabase.channel> {
  // Guards against stacking several timers when more than one failure status arrives
  let retryScheduled = false;

  const current = supabase
    .channel('my-channel')
    .on('postgres_changes', config, handler)
    .subscribe((status, err) => {
      if (status === 'SUBSCRIBED') {
        console.log('Connected');
        return;
      }
      if (status !== 'CHANNEL_ERROR' && status !== 'TIMED_OUT') {
        // Any other status (such as the close caused by removeChannel below) needs no retry
        return;
      }

      if (status === 'CHANNEL_ERROR') {
        console.error('Connection error:', err);
      } else {
        console.log('Connection timed out, retrying...');
      }

      if (retryScheduled) return;
      retryScheduled = true;

      setTimeout(async () => {
        // Drop the failed channel first, then build a replacement
        await supabase.removeChannel(current);
        channel = connect();
      }, RETRY_DELAY_MS);
    });

  return current;
}

// Always points at the most recently created channel
let channel = connect();

Tổng kết

Subscription Pattern

// 1. Create channel
const channel = supabase.channel('unique-name');

// 2. Add listeners
channel.on('postgres_changes', { event, schema, table, filter }, handler);

// 3. Subscribe
channel.subscribe();

// 4. Cleanup
supabase.removeChannel(channel);

Key Points

| Aspect | Recommendation |
| --- | --- |
| Channel name | Unique per subscription |
| Filter | Always use when possible |
| Cleanup | Always remove on unmount |
| Error handling | Implement retry logic |
| RLS | Realtime respects RLS |

Q&A

  1. Tables nào cần realtime?
  2. Cần filter theo criteria nào?
  3. Có nhiều users concurrent không?