Bỏ qua

Data Flow Patterns

Overview

Data Flow trong Light Stack

┌─────────────────────────────────────────────────────────────┐
│                     DATA FLOW OVERVIEW                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    CLIENT                            │    │
│  │                                                      │    │
│  │  ┌────────────┐  ┌────────────┐  ┌────────────┐    │    │
│  │  │   Read     │  │   Write    │  │  Realtime  │    │    │
│  │  │  (Query)   │  │  (Mutate)  │  │ (Subscribe)│    │    │
│  │  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘    │    │
│  └────────┼───────────────┼───────────────┼────────────┘    │
│           │               │               │                  │
│           ▼               ▼               ▼                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              SUPABASE POSTGREST                      │    │
│  │                                                      │    │
│  │  Request ──▶ Auth ──▶ RLS ──▶ PostgreSQL            │    │
│  │                                                      │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Read Flow

// app/projects/page.tsx
import { createClient } from '@/lib/supabase/server';

/**
 * Server-rendered projects page.
 *
 * Loads every project (newest first) together with its task count and the
 * owner's profile, then hands the rows to `ProjectList`. When the query
 * reports an error, the error is logged and `ErrorPage` is rendered instead.
 */
export default async function ProjectsPage() {
  // Columns and embedded relations requested from the `projects` table.
  const projectColumns = `
      id,
      name,
      description,
      tasks:tasks(count),
      owner:profiles!owner_id(name, avatar_url)
    `;

  const supabase = await createClient();

  // Runs on the server, so the data is part of the initial HTML.
  const result = await supabase
    .from('projects')
    .select(projectColumns)
    .order('created_at', { ascending: false });

  if (!result.error) {
    return <ProjectList projects={result.data} />;
  }

  console.error('Error fetching projects:', result.error);
  return <ErrorPage message="Failed to load projects" />;
}

Client-Side Fetching

// components/task-search.tsx
'use client';

import { useState, useEffect } from 'react';
import { createClient } from '@/lib/supabase/client';
import { useDebouncedValue } from '@/hooks/use-debounced';

/**
 * Row shape produced by the search query below (`id, title, status`).
 * NOTE(review): assumes `id` is a string — confirm against the `tasks` schema.
 */
interface TaskSearchResult {
  id: string;
  title: string;
  status: string;
}

/**
 * Search box that looks up tasks by title while the user types.
 *
 * The raw input is debounced by 300 ms; each debounced value triggers a
 * case-insensitive "contains" query limited to 10 rows. Responses that belong
 * to a superseded query (or arrive after unmount) are discarded so that slow
 * requests can never overwrite newer results.
 */
export function TaskSearch() {
  const [query, setQuery] = useState('');
  const [results, setResults] = useState<TaskSearchResult[]>([]);
  const [loading, setLoading] = useState(false);

  const debouncedQuery = useDebouncedValue(query, 300);
  const supabase = createClient();

  useEffect(() => {
    if (!debouncedQuery) {
      // Empty query: clear the list and make sure a spinner left over from a
      // cancelled request is not shown forever.
      setResults([]);
      setLoading(false);
      return;
    }

    // Flipped by the cleanup function when the query changes or the
    // component unmounts; guards every state update after the await.
    let cancelled = false;

    const search = async () => {
      setLoading(true);
      try {
        const { data, error } = await supabase
          .from('tasks')
          .select('id, title, status')
          .ilike('title', `%${debouncedQuery}%`)
          .limit(10);

        if (cancelled) return;

        if (error) {
          console.error('Error searching tasks:', error);
        }
        setResults(data ?? []);
      } catch (err: unknown) {
        if (cancelled) return;
        console.error('Error searching tasks:', err);
        setResults([]);
      } finally {
        if (!cancelled) {
          setLoading(false);
        }
      }
    };

    void search();

    return () => {
      cancelled = true;
    };
  }, [debouncedQuery]);

  return (
    <div>
      <input
        type="text"
        value={query}
        onChange={(e) => setQuery(e.target.value)}
        placeholder="Search tasks..."
      />
      {loading && <Spinner />}
      <ul>
        {results.map((task) => (
          <li key={task.id}>{task.title}</li>
        ))}
      </ul>
    </div>
  );
}

Write Flow

Simple Mutations

// Direct mutation with RLS protection
'use client';

import { createClient } from '@/lib/supabase/client';
import { useRouter } from 'next/navigation';

/**
 * Client-side form that inserts a project directly through the Supabase
 * browser client and then navigates to the new project's page.
 *
 * Form values are narrowed to strings before they are sent: `FormData.get`
 * can return `null` or a `File`, neither of which is a valid column value.
 * A blank (whitespace-only) name is rejected before any request is made.
 *
 * NOTE(review): `toast` is used here but not imported in the visible snippet —
 * confirm it is provided by the surrounding module.
 */
export function CreateProjectForm() {
  const supabase = createClient();
  const router = useRouter();

  const handleSubmit = async (formData: FormData) => {
    const rawName = formData.get('name');
    const rawDescription = formData.get('description');

    const name = typeof rawName === 'string' ? rawName.trim() : '';
    // Keep the description exactly as typed; anything that is not text
    // (missing field, file upload) is stored as null.
    const description =
      typeof rawDescription === 'string' ? rawDescription : null;

    if (!name) {
      // The `required` attribute accepts whitespace-only input, so check again.
      toast.error('Project name is required');
      return;
    }

    const { data, error } = await supabase
      .from('projects')
      .insert({ name, description })
      .select()
      .single();

    if (error) {
      toast.error(error.message);
      return;
    }

    toast.success('Project created!');
    router.push(`/projects/${data.id}`);
  };

  return (
    <form action={handleSubmit}>
      <input name="name" placeholder="Project name" required />
      <textarea name="description" placeholder="Description" />
      <button type="submit">Create Project</button>
    </form>
  );
}

Server Action Pattern

// app/actions/projects.ts
'use server';

import { createClient } from '@/lib/supabase/server';
import { revalidatePath } from 'next/cache';

/**
 * Server action that creates a project for the signed-in user.
 *
 * @param formData - Submitted form; reads the `name` and `description` fields.
 * @returns `{ data }` with the inserted row on success, or `{ error }` with a
 *   message when the caller is not signed in, the name is blank, or the
 *   insert fails. On success the `/projects` path is revalidated.
 */
export async function createProject(formData: FormData) {
  const supabase = await createClient();

  const { data: { user } } = await supabase.auth.getUser();
  if (!user) {
    return { error: 'Unauthorized' };
  }

  // `FormData.get` may return null or a File; narrow instead of asserting so
  // that bad input is rejected here rather than reaching the database.
  const rawName = formData.get('name');
  const rawDescription = formData.get('description');

  const name = typeof rawName === 'string' ? rawName.trim() : '';
  if (!name) {
    return { error: 'Project name is required' };
  }
  const description =
    typeof rawDescription === 'string' ? rawDescription : null;

  const { data, error } = await supabase
    .from('projects')
    .insert({ name, description })
    .select()
    .single();

  if (error) {
    return { error: error.message };
  }

  revalidatePath('/projects');
  return { data };
}

// Usage in component
import { createProject } from '@/app/actions/projects';

/**
 * Minimal project form whose submission is handled by the `createProject`
 * server action; it contains a required name, an optional description and a
 * submit button.
 */
export function ProjectForm() {
  const fields = (
    <>
      <input name="name" required />
      <textarea name="description" />
      <button type="submit">Create</button>
    </>
  );

  return <form action={createProject}>{fields}</form>;
}

Complex Write Flow via Worker

Multi-step Transaction

┌──────────────────────────────────────────────────────────┐
│              COMPLEX WRITE FLOW (WORKER)                  │
├──────────────────────────────────────────────────────────┤
│                                                           │
│  Client                                                   │
│    │                                                      │
│    │  POST /api/checkout                                 │
│    ▼                                                      │
│  Cloudflare Worker                                        │
│    │                                                      │
│    ├─── 1. Validate request                              │
│    │                                                      │
│    ├─── 2. Check inventory (Supabase)                    │
│    │        │                                             │
│    │        └─── SELECT * FROM products WHERE id IN (...) │
│    │                                                      │
│    ├─── 3. Create payment (Stripe)                       │
│    │        │                                             │
│    │        └─── stripe.paymentIntents.create()          │
│    │                                                      │
│    ├─── 4. Create order (Supabase RPC)                   │
│    │        │                                             │
│    │        └─── supabase.rpc('create_order_with_items') │
│    │                                                      │
│    ├─── 5. Enqueue fulfillment (Queue)                   │
│    │        │                                             │
│    │        └─── queue.send({ type: 'fulfill_order' })   │
│    │                                                      │
│    └─── 6. Return response                               │
│             │                                             │
│             ▼                                             │
│  Client receives order confirmation                       │
│                                                           │
└──────────────────────────────────────────────────────────┘

Implementation

// workers/checkout/index.ts
import { Hono } from 'hono';
import { createClient } from '@supabase/supabase-js';
import Stripe from 'stripe';

/** Bindings the checkout Worker reads from `c.env`. */
interface Env {
  /** Supabase project URL; first argument to `createClient`. */
  SUPABASE_URL: string;
  /**
   * Supabase key; second argument to `createClient`.
   * NOTE(review): presumably the service-role key, which would bypass RLS —
   * confirm and keep it server-side only.
   */
  SUPABASE_SERVICE_KEY: string;
  /** Secret key passed to the `Stripe` constructor. */
  STRIPE_SECRET_KEY: string;
  /** Queue that receives the `fulfill_order` message after an order is created. */
  ORDER_QUEUE: Queue;
}

/**
 * One line of the checkout request body.
 * NOTE(review): `productId` is accepted as string or number because the
 * `products.id` column type is not visible here — confirm.
 */
interface CheckoutItem {
  productId: string | number;
  quantity: number;
}

/** Columns read from `products` during the inventory check. */
interface ProductRow {
  id: string | number;
  name: string;
  price: number;
  stock: number;
}

/** Runtime guard: a checkout line must name a product and a positive, finite quantity. */
function isCheckoutItem(value: unknown): value is CheckoutItem {
  if (typeof value !== 'object' || value === null) return false;
  const { productId, quantity } = value as Record<string, unknown>;
  return (
    (typeof productId === 'string' || typeof productId === 'number') &&
    typeof quantity === 'number' &&
    Number.isFinite(quantity) &&
    quantity > 0
  );
}

const app = new Hono<{ Bindings: Env }>();

/**
 * POST /api/checkout
 *
 * Flow: validate body -> check inventory -> compute total -> charge via
 * Stripe -> create the order through the `create_order_with_items` RPC ->
 * enqueue fulfillment -> respond with the order id.
 *
 * Responses:
 * - 200 `{ success: true, orderId }`
 * - 400 `{ error }` for an invalid body, unknown or out-of-stock product, or a
 *   payment that did not reach `succeeded`
 * - 500 `{ error: 'Checkout failed' }` for any unexpected failure
 */
app.post('/api/checkout', async (c) => {
  // 0. Validate request. The body is untrusted: a negative quantity would
  // otherwise pass the stock check and reduce the amount charged.
  let body: unknown;
  try {
    body = await c.req.json();
  } catch {
    return c.json({ error: 'Invalid JSON body' }, 400);
  }

  const { items, paymentMethodId } = (body ?? {}) as {
    items?: unknown;
    paymentMethodId?: unknown;
  };

  if (
    !Array.isArray(items) ||
    items.length === 0 ||
    !items.every(isCheckoutItem)
  ) {
    return c.json({ error: 'Invalid items' }, 400);
  }
  if (typeof paymentMethodId !== 'string' || paymentMethodId === '') {
    return c.json({ error: 'Invalid payment method' }, 400);
  }
  const lineItems: CheckoutItem[] = items;

  const supabase = createClient(
    c.env.SUPABASE_URL,
    c.env.SUPABASE_SERVICE_KEY
  );
  const stripe = new Stripe(c.env.STRIPE_SECRET_KEY);

  try {
    // 1. Check inventory
    const { data: products, error: inventoryError } = await supabase
      .from('products')
      .select('id, name, price, stock')
      .in('id', lineItems.map((item) => item.productId));

    if (inventoryError || !products) throw new Error('Inventory check failed');

    // Index once instead of scanning the product list for every line.
    const productsById = new Map<unknown, ProductRow>();
    for (const product of products as ProductRow[]) {
      productsById.set(product.id, product);
    }

    // 2. Validate stock and calculate the total in a single pass
    let total = 0;
    for (const item of lineItems) {
      const product = productsById.get(item.productId);
      if (!product) {
        return c.json({ error: `Product ${item.productId} not found` }, 400);
      }
      if (product.stock < item.quantity) {
        return c.json({ error: `${product.name} out of stock` }, 400);
      }
      total += product.price * item.quantity;
    }

    // 3. Create payment (amount is sent in the smallest currency unit)
    const paymentIntent = await stripe.paymentIntents.create({
      amount: Math.round(total * 100),
      currency: 'usd',
      payment_method: paymentMethodId,
      confirm: true,
    });

    if (paymentIntent.status !== 'succeeded') {
      return c.json({ error: 'Payment failed' }, 400);
    }

    // 4. Create order (atomic transaction)
    const { data: order, error: orderError } = await supabase.rpc(
      'create_order_with_items',
      {
        p_items: lineItems,
        p_stripe_payment_id: paymentIntent.id,
        p_total: total,
      }
    );

    if (orderError || !order) {
      // The customer has already been charged; give the money back rather
      // than leaving a payment with no order behind it.
      try {
        await stripe.refunds.create({ payment_intent: paymentIntent.id });
      } catch (refundError: unknown) {
        console.error(
          'Refund failed for payment',
          paymentIntent.id,
          refundError
        );
      }
      throw orderError ?? new Error('Order creation returned no data');
    }

    // 5. Enqueue fulfillment. The order is already paid and stored, so a
    // queue failure must not be reported as a failed checkout (a client retry
    // would charge twice). It is logged and needs an out-of-band re-enqueue.
    try {
      await c.env.ORDER_QUEUE.send({
        type: 'fulfill_order',
        orderId: order.id,
        items: lineItems,
      });
    } catch (queueError: unknown) {
      console.error(
        'Failed to enqueue fulfillment for order',
        order.id,
        queueError
      );
    }

    return c.json({
      success: true,
      orderId: order.id,
    });
  } catch (error: unknown) {
    console.error('Checkout error:', error);
    return c.json({ error: 'Checkout failed' }, 500);
  }
});

export default app;

Realtime Data Flow

Subscribe to Changes

// components/project-tasks.tsx
'use client';

import { useEffect, useState } from 'react';
import { createClient } from '@/lib/supabase/client';

/** Task row as held in component state and received in realtime payloads. */
interface Task {
  /** Identifier; used as the React list key and to match UPDATE/DELETE events. */
  id: string;
  /** Text shown for the task in the list. */
  title: string;
  /** Status label rendered next to the title. */
  status: string;
}

/**
 * Live task list for a single project.
 *
 * Starts from `initialTasks` and then applies INSERT / UPDATE / DELETE events
 * from a `postgres_changes` subscription on `public.tasks`, filtered to
 * `projectId`. The channel is removed when `projectId` changes or the
 * component unmounts.
 *
 * @param projectId - Project whose tasks are followed.
 * @param initialTasks - Tasks used to seed the list before any event arrives.
 */
export function ProjectTasks({
  projectId,
  initialTasks
}: {
  projectId: string;
  initialTasks: Task[];
}) {
  const [tasks, setTasks] = useState<Task[]>(initialTasks);
  // Create the client once per mount. It is an effect dependency below, so a
  // new instance on every render would tear down and recreate the channel
  // each time, should `createClient` not memoize internally.
  const [supabase] = useState(() => createClient());

  useEffect(() => {
    // Subscribe to task changes
    const channel = supabase
      .channel(`project-${projectId}`)
      .on(
        'postgres_changes',
        {
          event: '*',
          schema: 'public',
          table: 'tasks',
          filter: `project_id=eq.${projectId}`,
        },
        (payload) => {
          switch (payload.eventType) {
            case 'INSERT': {
              const inserted = payload.new as Task;
              // Skip rows already present (e.g. included in `initialTasks`)
              // to avoid duplicate list entries and duplicate React keys.
              setTasks((prev) =>
                prev.some((t) => t.id === inserted.id)
                  ? prev
                  : [...prev, inserted]
              );
              break;
            }
            case 'UPDATE': {
              const updated = payload.new as Task;
              setTasks((prev) =>
                prev.map((t) => (t.id === updated.id ? updated : t))
              );
              break;
            }
            case 'DELETE': {
              const removedId = payload.old.id;
              setTasks((prev) => prev.filter((t) => t.id !== removedId));
              break;
            }
          }
        }
      )
      .subscribe();

    return () => {
      supabase.removeChannel(channel);
    };
  }, [projectId, supabase]);

  return (
    <ul>
      {tasks.map((task) => (
        <li key={task.id}>
          {task.title} - {task.status}
        </li>
      ))}
    </ul>
  );
}

Optimistic Updates

// hooks/use-optimistic-tasks.ts
import { useState, useTransition } from 'react';
import { createClient } from '@/lib/supabase/client';

/**
 * Task list state with optimistic status updates.
 *
 * `updateTaskStatus` changes the task's status locally right away and then
 * persists it. If the write fails, only that task's status is restored — and
 * only when it still shows the failed value — so other optimistic or later
 * changes made in the meantime are left intact.
 *
 * NOTE(review): `Task` and `toast` are not imported in the visible snippet —
 * confirm they are provided by the surrounding module.
 *
 * @param initialTasks - Tasks used to seed the state.
 * @returns The current tasks, the update function, and the transition's
 *   pending flag.
 */
export function useOptimisticTasks(initialTasks: Task[]) {
  const [tasks, setTasks] = useState(initialTasks);
  const [isPending, startTransition] = useTransition();
  const supabase = createClient();

  const updateTaskStatus = async (taskId: string, newStatus: string) => {
    // Remember just this task's previous status for a targeted rollback.
    const previousStatus = tasks.find((t) => t.id === taskId)?.status;

    // Optimistic update
    setTasks((prev) =>
      prev.map((t) => (t.id === taskId ? { ...t, status: newStatus } : t))
    );

    // Start transition for UI feedback
    startTransition(async () => {
      const { error } = await supabase
        .from('tasks')
        .update({ status: newStatus })
        .eq('id', taskId);

      if (error) {
        // Rollback on error. Restoring a whole-list snapshot here would also
        // discard every unrelated change made since the snapshot was taken.
        if (previousStatus !== undefined) {
          setTasks((prev) =>
            prev.map((t) =>
              t.id === taskId && t.status === newStatus
                ? { ...t, status: previousStatus }
                : t
            )
          );
        }
        toast.error('Failed to update task');
      }
    });
  };

  return { tasks, updateTaskStatus, isPending };
}

Caching Flow

KV Cache for Hot Data

┌──────────────────────────────────────────────────────────┐
│                   CACHING FLOW                            │
├──────────────────────────────────────────────────────────┤
│                                                           │
│  Request                                                  │
│    │                                                      │
│    ▼                                                      │
│  Worker                                                   │
│    │                                                      │
│    ├── Check KV Cache                                    │
│    │     │                                                │
│    │     ├── HIT ──▶ Return cached data (fast!)          │
│    │     │                                                │
│    │     └── MISS                                        │
│    │           │                                          │
│    │           ▼                                          │
│    │     Query Supabase                                  │
│    │           │                                          │
│    │           ▼                                          │
│    │     Store in KV (with TTL)                          │
│    │           │                                          │
│    │           ▼                                          │
│    └───────── Return data                                │
│                                                           │
└──────────────────────────────────────────────────────────┘

Implementation

// workers/api/products.ts
/**
 * Returns the featured products, served from KV when possible.
 *
 * On a cache miss the ten most recently created featured products are read
 * from Supabase and stored in KV for five minutes. A failed query is logged
 * and is NOT cached, so the next request retries instead of a failure being
 * written to the cache.
 *
 * @param env - Worker bindings; reads `KV`, `SUPABASE_URL`, `SUPABASE_ANON_KEY`.
 * @returns The product rows, or `null` when the query failed.
 */
async function getProducts(env: Env) {
  const cacheKey = 'products:featured';

  // Check cache
  const cached = await env.KV.get(cacheKey, 'json');
  if (cached != null) {
    return cached;
  }

  // Cache miss - fetch from Supabase
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_ANON_KEY);

  const { data, error } = await supabase
    .from('products')
    .select('id, name, price, image_url')
    .eq('featured', true)
    .order('created_at', { ascending: false })
    .limit(10);

  if (error || data == null) {
    console.error('Error fetching featured products:', error);
    return data;
  }

  // Store in cache (5 minutes TTL)
  await env.KV.put(cacheKey, JSON.stringify(data), {
    expirationTtl: 300,
  });

  return data;
}

/**
 * Removes the cached featured-products entry from KV. Intended to be called
 * after a product update.
 */
async function invalidateProductCache(env: Env) {
  const featuredProductsKey = 'products:featured';
  await env.KV.delete(featuredProductsKey);
}

Tổng kết

Data Flow Best Practices

READ:
✅ Server Components for initial data
✅ Client fetch for interactive features
✅ Use select() to limit columns
✅ Implement pagination for lists

WRITE:
✅ Server Actions for mutations
✅ Worker for complex transactions
✅ Optimistic updates for UX
✅ RPC for atomic operations

REALTIME:
✅ Subscribe on client only
✅ Cleanup subscriptions
✅ Handle all event types
✅ Combine with optimistic updates

CACHING:
✅ KV for hot data
✅ TTL based on data freshness
✅ Invalidate on mutations
✅ Cache keys with context

Performance Tips

| Technique | Benefit |
| --- | --- |
| Server Components | Faster initial load |
| Select specific columns | Less data transfer |
| Pagination | Handle large datasets |
| KV caching | Reduce DB load |
| Optimistic updates | Instant UI feedback |

Q&A

  1. Data flow patterns nào đang dùng?
  2. Có implement caching chưa?
  3. Challenges với realtime sync?