Data Flow Patterns
Overview
Data Flow trong Light Stack
┌─────────────────────────────────────────────────────────────┐
│ DATA FLOW OVERVIEW │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ CLIENT │ │
│ │ │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ Read │ │ Write │ │ Realtime │ │ │
│ │ │ (Query) │ │ (Mutate) │ │ (Subscribe)│ │ │
│ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │
│ └────────┼───────────────┼───────────────┼────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SUPABASE POSTGREST │ │
│ │ │ │
│ │ Request ──▶ Auth ──▶ RLS ──▶ PostgreSQL │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Read Flow
Server-Side Fetching (Recommended)
// app/projects/page.tsx
import { createClient } from '@/lib/supabase/server';
export default async function ProjectsPage() {
const supabase = await createClient();
// Server-side fetch - faster, SEO-friendly
const { data: projects, error } = await supabase
.from('projects')
.select(`
id,
name,
description,
tasks:tasks(count),
owner:profiles!owner_id(name, avatar_url)
`)
.order('created_at', { ascending: false });
if (error) {
console.error('Error fetching projects:', error);
return <ErrorPage message="Failed to load projects" />;
}
return <ProjectList projects={projects} />;
}
Client-Side Fetching
// components/task-search.tsx
'use client';
import { useState, useEffect } from 'react';
import { createClient } from '@/lib/supabase/client';
import { useDebouncedValue } from '@/hooks/use-debounced';
export function TaskSearch() {
const [query, setQuery] = useState('');
const [results, setResults] = useState([]);
const [loading, setLoading] = useState(false);
const debouncedQuery = useDebouncedValue(query, 300);
const supabase = createClient();
useEffect(() => {
if (!debouncedQuery) {
setResults([]);
return;
}
const search = async () => {
setLoading(true);
const { data } = await supabase
.from('tasks')
.select('id, title, status')
.ilike('title', `%${debouncedQuery}%`)
.limit(10);
setResults(data || []);
setLoading(false);
};
search();
}, [debouncedQuery]);
return (
<div>
<input
type="text"
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder="Search tasks..."
/>
{loading && <Spinner />}
<ul>
{results.map((task) => (
<li key={task.id}>{task.title}</li>
))}
</ul>
</div>
);
}
Write Flow
Simple Mutations
// Direct mutation with RLS protection
'use client';
import { createClient } from '@/lib/supabase/client';
import { useRouter } from 'next/navigation';
export function CreateProjectForm() {
const supabase = createClient();
const router = useRouter();
const handleSubmit = async (formData: FormData) => {
const { data, error } = await supabase
.from('projects')
.insert({
name: formData.get('name'),
description: formData.get('description'),
})
.select()
.single();
if (error) {
toast.error(error.message);
return;
}
toast.success('Project created!');
router.push(`/projects/${data.id}`);
};
return (
<form action={handleSubmit}>
<input name="name" placeholder="Project name" required />
<textarea name="description" placeholder="Description" />
<button type="submit">Create Project</button>
</form>
);
}
Server Action Pattern
// app/actions/projects.ts
'use server';
import { createClient } from '@/lib/supabase/server';
import { revalidatePath } from 'next/cache';
export async function createProject(formData: FormData) {
const supabase = await createClient();
const { data: { user } } = await supabase.auth.getUser();
if (!user) {
return { error: 'Unauthorized' };
}
const { data, error } = await supabase
.from('projects')
.insert({
name: formData.get('name') as string,
description: formData.get('description') as string,
})
.select()
.single();
if (error) {
return { error: error.message };
}
revalidatePath('/projects');
return { data };
}
// Usage in component
import { createProject } from '@/app/actions/projects';
export function ProjectForm() {
return (
<form action={createProject}>
<input name="name" required />
<textarea name="description" />
<button type="submit">Create</button>
</form>
);
}
Complex Write Flow via Worker
Multi-step Transaction
┌──────────────────────────────────────────────────────────┐
│ COMPLEX WRITE FLOW (WORKER) │
├──────────────────────────────────────────────────────────┤
│ │
│ Client │
│ │ │
│ │ POST /api/checkout │
│ ▼ │
│ Cloudflare Worker │
│ │ │
│ ├─── 1. Validate request │
│ │ │
│ ├─── 2. Check inventory (Supabase) │
│ │ │ │
│ │ └─── SELECT * FROM products WHERE id IN (...) │
│ │ │
│ ├─── 3. Create payment (Stripe) │
│ │ │ │
│ │ └─── stripe.paymentIntents.create() │
│ │ │
│ ├─── 4. Create order (Supabase RPC) │
│ │ │ │
│ │ └─── supabase.rpc('create_order', {...}) │
│ │ │
│ ├─── 5. Enqueue fulfillment (Queue) │
│ │ │ │
│ │ └─── queue.send({ type: 'fulfill_order' }) │
│ │ │
│ └─── 6. Return response │
│ │ │
│ ▼ │
│ Client receives order confirmation │
│ │
└──────────────────────────────────────────────────────────┘
Implementation
// workers/checkout/index.ts
import { Hono } from 'hono';
import { createClient } from '@supabase/supabase-js';
import Stripe from 'stripe';
interface Env {
SUPABASE_URL: string;
SUPABASE_SERVICE_KEY: string;
STRIPE_SECRET_KEY: string;
ORDER_QUEUE: Queue;
}
const app = new Hono<{ Bindings: Env }>();
app.post('/api/checkout', async (c) => {
const { items, paymentMethodId } = await c.req.json();
const supabase = createClient(
c.env.SUPABASE_URL,
c.env.SUPABASE_SERVICE_KEY
);
const stripe = new Stripe(c.env.STRIPE_SECRET_KEY);
try {
// 1. Check inventory
const { data: products, error: inventoryError } = await supabase
.from('products')
.select('id, name, price, stock')
.in('id', items.map((i: any) => i.productId));
if (inventoryError) throw new Error('Inventory check failed');
// Validate stock
for (const item of items) {
const product = products.find((p) => p.id === item.productId);
if (!product || product.stock < item.quantity) {
return c.json({ error: `${product?.name} out of stock` }, 400);
}
}
// 2. Calculate total
const total = items.reduce((sum: number, item: any) => {
const product = products.find((p) => p.id === item.productId);
return sum + (product!.price * item.quantity);
}, 0);
// 3. Create payment
const paymentIntent = await stripe.paymentIntents.create({
amount: Math.round(total * 100),
currency: 'usd',
payment_method: paymentMethodId,
confirm: true,
});
if (paymentIntent.status !== 'succeeded') {
return c.json({ error: 'Payment failed' }, 400);
}
// 4. Create order (atomic transaction)
const { data: order, error: orderError } = await supabase.rpc(
'create_order_with_items',
{
p_items: items,
p_stripe_payment_id: paymentIntent.id,
p_total: total,
}
);
if (orderError) throw orderError;
// 5. Enqueue fulfillment
await c.env.ORDER_QUEUE.send({
type: 'fulfill_order',
orderId: order.id,
items,
});
return c.json({
success: true,
orderId: order.id,
});
} catch (error) {
console.error('Checkout error:', error);
return c.json({ error: 'Checkout failed' }, 500);
}
});
export default app;
Realtime Data Flow
Subscribe to Changes
// components/project-tasks.tsx
'use client';
import { useEffect, useState } from 'react';
import { createClient } from '@/lib/supabase/client';
interface Task {
id: string;
title: string;
status: string;
}
export function ProjectTasks({
projectId,
initialTasks
}: {
projectId: string;
initialTasks: Task[];
}) {
const [tasks, setTasks] = useState<Task[]>(initialTasks);
const supabase = createClient();
useEffect(() => {
// Subscribe to task changes
const channel = supabase
.channel(`project-${projectId}`)
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'tasks',
filter: `project_id=eq.${projectId}`,
},
(payload) => {
if (payload.eventType === 'INSERT') {
setTasks((prev) => [...prev, payload.new as Task]);
} else if (payload.eventType === 'UPDATE') {
setTasks((prev) =>
prev.map((t) =>
t.id === payload.new.id ? (payload.new as Task) : t
)
);
} else if (payload.eventType === 'DELETE') {
setTasks((prev) => prev.filter((t) => t.id !== payload.old.id));
}
}
)
.subscribe();
return () => {
supabase.removeChannel(channel);
};
}, [projectId, supabase]);
return (
<ul>
{tasks.map((task) => (
<li key={task.id}>
{task.title} - {task.status}
</li>
))}
</ul>
);
}
Optimistic Updates
// hooks/use-optimistic-tasks.ts
import { useState, useTransition } from 'react';
import { createClient } from '@/lib/supabase/client';
export function useOptimisticTasks(initialTasks: Task[]) {
const [tasks, setTasks] = useState(initialTasks);
const [isPending, startTransition] = useTransition();
const supabase = createClient();
const updateTaskStatus = async (taskId: string, newStatus: string) => {
// Optimistic update
const previousTasks = tasks;
setTasks((prev) =>
prev.map((t) => (t.id === taskId ? { ...t, status: newStatus } : t))
);
// Start transition for UI feedback
startTransition(async () => {
const { error } = await supabase
.from('tasks')
.update({ status: newStatus })
.eq('id', taskId);
if (error) {
// Rollback on error
setTasks(previousTasks);
toast.error('Failed to update task');
}
});
};
return { tasks, updateTaskStatus, isPending };
}
Caching Flow
KV Cache for Hot Data
┌──────────────────────────────────────────────────────────┐
│ CACHING FLOW │
├──────────────────────────────────────────────────────────┤
│ │
│ Request │
│ │ │
│ ▼ │
│ Worker │
│ │ │
│ ├── Check KV Cache │
│ │ │ │
│ │ ├── HIT ──▶ Return cached data (fast!) │
│ │ │ │
│ │ └── MISS │
│ │ │ │
│ │ ▼ │
│ │ Query Supabase │
│ │ │ │
│ │ ▼ │
│ │ Store in KV (with TTL) │
│ │ │ │
│ │ ▼ │
│ └───────── Return data │
│ │
└──────────────────────────────────────────────────────────┘
Implementation
// workers/api/products.ts
async function getProducts(env: Env) {
const cacheKey = 'products:featured';
// Check cache
const cached = await env.KV.get(cacheKey, 'json');
if (cached) {
return cached;
}
// Cache miss - fetch from Supabase
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_ANON_KEY);
const { data } = await supabase
.from('products')
.select('id, name, price, image_url')
.eq('featured', true)
.order('created_at', { ascending: false })
.limit(10);
// Store in cache (5 minutes TTL)
await env.KV.put(cacheKey, JSON.stringify(data), {
expirationTtl: 300,
});
return data;
}
// Invalidate cache on product update
async function invalidateProductCache(env: Env) {
await env.KV.delete('products:featured');
}
Tổng kết
Data Flow Best Practices
READ:
✅ Server Components for initial data
✅ Client fetch for interactive features
✅ Use select() to limit columns
✅ Implement pagination for lists
WRITE:
✅ Server Actions for mutations
✅ Worker for complex transactions
✅ Optimistic updates for UX
✅ RPC for atomic operations
REALTIME:
✅ Subscribe on client only
✅ Cleanup subscriptions
✅ Handle all event types
✅ Combine with optimistic updates
CACHING:
✅ KV for hot data
✅ TTL based on data freshness
✅ Invalidate on mutations
✅ Cache keys with context
| Technique |
Benefit |
| Server Components |
Faster initial load |
| Select specific columns |
Less data transfer |
| Pagination |
Handle large datasets |
| KV caching |
Reduce DB load |
| Optimistic updates |
Instant UI feedback |
Q&A
- Data flow patterns nào đang dùng?
- Có implement caching chưa?
- Challenges với realtime sync?