Phase 6: Scheduled Jobs¶
Mục tiêu¶
Implement scheduled jobs với Supabase pg_cron và Cloudflare Cron Triggers.
Thời gian ước tính: 2 giờ
Overview: Chọn Cron nào?¶
| Use Case | Supabase pg_cron | Cloudflare Cron |
|---|---|---|
| Database cleanup | ✅ Recommended | |
| Data aggregation | ✅ Recommended | |
| Email digest | | ✅ Recommended |
| External API calls | | ✅ Recommended |
| Due date reminders | | ✅ Recommended |
Part A: Supabase pg_cron (Database Jobs)¶
Step 1: Enable pg_cron¶
-- Enable pg_cron either in Supabase Dashboard → Database → Extensions,
-- or by running the SQL below.
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Allow the postgres role to use objects in the cron schema.
GRANT USAGE ON SCHEMA cron TO postgres;
Step 2: Database Cleanup Jobs¶
-- supabase/migrations/20240101000020_cron_jobs.sql
-- Job 1: nightly archival of tasks in status 'done' whose updated_at is
-- more than 30 days old.
SELECT cron.schedule(
  'archive-old-tasks',
  '0 2 * * *', -- 02:00 UTC, every day
  $archive$
    UPDATE tasks AS t
    SET updated_at = CURRENT_TIMESTAMP,
        status = 'archived'
    WHERE t.updated_at < CURRENT_TIMESTAMP - INTERVAL '30 days'
      AND t.status = 'done';
  $archive$
);
-- Job 2: Clean up orphan attachments (rows whose task no longer exists).
-- NOT EXISTS is used instead of NOT IN: NOT IN yields NULL (and therefore
-- deletes nothing) as soon as the subquery returns a NULL, and it prevents
-- the planner from using an anti-join on large tables.
-- Rows with a NULL task_id are left untouched.
SELECT cron.schedule(
  'cleanup-orphan-attachments',
  '0 3 * * 0', -- Weekly on Sunday at 3 AM
  $$
    DELETE FROM task_attachments a
    WHERE a.task_id IS NOT NULL
      AND NOT EXISTS (
        SELECT 1 FROM tasks t WHERE t.id = a.task_id
      );
  $$
);
-- Job 3: Update project statistics
-- Recomputes the cached per-project counters in project_stats every
-- 15 minutes. Projects without tasks get a row with zero counts because of
-- the LEFT JOIN (COUNT(t.id) ignores the NULLs it produces).
-- overdue_tasks counts tasks whose due_date is before today and that are
-- neither 'done' nor 'archived'.
-- NOTE(review): assumes tasks.due_date is a DATE/TIMESTAMP column — confirm.
SELECT cron.schedule(
  'update-project-stats',
  '*/15 * * * *', -- Every 15 minutes
  $$
    INSERT INTO project_stats (project_id, total_tasks, completed_tasks, overdue_tasks, updated_at)
    SELECT
      p.id,
      COUNT(t.id),
      COUNT(t.id) FILTER (WHERE t.status = 'done'),
      COUNT(t.id) FILTER (
        WHERE t.due_date < CURRENT_DATE
          AND t.status NOT IN ('done', 'archived')
      ),
      NOW()
    FROM projects p
    LEFT JOIN tasks t ON t.project_id = p.id
    GROUP BY p.id
    ON CONFLICT (project_id)
    DO UPDATE SET
      total_tasks = EXCLUDED.total_tasks,
      completed_tasks = EXCLUDED.completed_tasks,
      overdue_tasks = EXCLUDED.overdue_tasks,
      updated_at = EXCLUDED.updated_at;
  $$
);
-- Job 4: daily purge of activity log rows created more than 90 days ago.
SELECT cron.schedule(
  'cleanup-activity-logs',
  '0 4 * * *', -- 04:00, every day
  $purge$
    DELETE FROM activity_logs AS al
    WHERE al.created_at < CURRENT_TIMESTAMP - INTERVAL '90 days';
  $purge$
);
-- Job 5: Vacuum tables (maintenance)
-- VACUUM cannot run inside a transaction block or as part of a
-- multi-statement command, so a single job containing several VACUUM
-- statements fails at run time. Each table therefore gets its own
-- single-statement job, staggered by five minutes on Sunday morning.

-- Remove the combined job if an earlier version of this migration created it
-- (a no-op when no such job exists).
SELECT cron.unschedule(jobid)
FROM cron.job
WHERE jobname = 'vacuum-tables';

SELECT cron.schedule(
  'vacuum-tasks',
  '0 5 * * 0', -- Weekly on Sunday at 5:00 AM
  'VACUUM ANALYZE tasks'
);
SELECT cron.schedule(
  'vacuum-task-comments',
  '5 5 * * 0', -- Weekly on Sunday at 5:05 AM
  'VACUUM ANALYZE task_comments'
);
SELECT cron.schedule(
  'vacuum-activity-logs',
  '10 5 * * 0', -- Weekly on Sunday at 5:10 AM
  'VACUUM ANALYZE activity_logs'
);
Step 3: Create Stats Table¶
-- Project stats table for caching
-- Holds one row per project (project_id is the primary key). The row is
-- removed automatically when the referenced project is deleted
-- (ON DELETE CASCADE). All counters default to 0.
CREATE TABLE IF NOT EXISTS project_stats (
project_id UUID PRIMARY KEY REFERENCES projects(id) ON DELETE CASCADE,
total_tasks INTEGER DEFAULT 0,
completed_tasks INTEGER DEFAULT 0,
overdue_tasks INTEGER DEFAULT 0,
-- Time the row was last written.
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Step 4: Monitor Cron Jobs¶
-- View all scheduled jobs
SELECT * FROM cron.job;
-- View the 20 most recent job runs, newest first
SELECT
jobid,
runid,
job_pid,
status,
start_time,
end_time,
return_message
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 20;
-- Unschedule a job by name ('job-name' is a placeholder)
SELECT cron.unschedule('job-name');
-- Unschedule by job id, looking the id up from the job name
SELECT cron.unschedule(jobid)
FROM cron.job
WHERE jobname = 'archive-old-tasks';
Part B: Cloudflare Cron Triggers (Edge Jobs)¶
Step 1: Configure Cron Triggers¶
# wrangler.toml
name = "taskflow-cron"
main = "src/index.ts"
[triggers]
crons = [
"0 9 * * *", # Daily digest at 9 AM UTC
"0 8 * * *", # Due date reminders at 8 AM
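"0 17 * * 5" # Weekly summary, intended for Friday 5 PM UTC. NOTE(review): Cloudflare documents day-of-week as 1-7 starting at Sunday, so 5 may fire on Thursday — confirm; if so use "0 17 * * FRI" here and in the handler's switch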
]
[vars]
APP_ENV = "production"
Step 2: Cron Handler¶
// workers/cron/src/index.ts
import { createClient } from '@supabase/supabase-js';
/**
 * Bindings and secrets made available to the cron worker.
 */
interface Env {
  /** Queue that receives the notification messages produced by the cron jobs. */
  NOTIFICATION_QUEUE: Queue;
  /** SendGrid API key; not read by the cron handlers in this section. */
  SENDGRID_API_KEY: string;
  /** Key passed to `createClient` together with `SUPABASE_URL`. */
  SUPABASE_SERVICE_KEY: string;
  /** Supabase project URL passed to `createClient`. */
  SUPABASE_URL: string;
}
export default {
  /**
   * Cron Trigger entry point: logs the cron expression that fired and runs
   * the job registered for it. Expressions without a registered job are
   * logged and otherwise ignored.
   */
  async scheduled(
    controller: ScheduledController,
    env: Env,
    ctx: ExecutionContext
  ): Promise<void> {
    const expression = controller.cron;
    console.log(`Running cron: ${expression}`);

    // Keys must match the `crons` entries in wrangler.toml exactly.
    const jobs = new Map<string, (env: Env) => Promise<void>>([
      ['0 9 * * *', sendDailyDigest],
      ['0 8 * * *', sendDueDateReminders],
      ['0 17 * * 5', sendWeeklySummary],
    ]);

    const job = jobs.get(expression);
    if (job === undefined) {
      console.log(`Unknown cron: ${expression}`);
      return;
    }
    await job(env);
  },
};
/**
 * Queues one `daily_digest` notification for every user who has open tasks
 * assigned to them.
 *
 * For each profile, up to 10 assigned tasks that are neither `done` nor
 * `archived` are loaded (earliest due date first) and sent to
 * `NOTIFICATION_QUEUE`. Users without an email address or without open tasks
 * are skipped.
 *
 * @param env Worker bindings (Supabase credentials and the notification queue).
 * @throws Error when the list of profiles cannot be loaded, so the failed
 *   invocation is visible in the worker's metrics instead of silently doing
 *   nothing.
 */
async function sendDailyDigest(env: Env): Promise<void> {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

  // Load every profile; whether a user has open tasks is decided per user below.
  const { data: users, error: usersError } = await supabase
    .from('profiles')
    .select('id, email, full_name');
  if (usersError) {
    throw new Error(`Daily digest: failed to load profiles: ${usersError.message}`);
  }
  if (!users) return;

  let queued = 0;
  for (const user of users) {
    // Nothing can be delivered without an address.
    if (!user.email) continue;

    // 'archived' is the status the archive-old-tasks job gives to old 'done'
    // tasks, so it has to be excluded together with 'done'.
    const { data: tasks, error: tasksError } = await supabase
      .from('tasks')
      .select('id, title, status, priority, due_date, project:projects(name)')
      .eq('assignee_id', user.id)
      .not('status', 'in', '(done,archived)')
      .order('due_date', { ascending: true })
      .limit(10);
    if (tasksError) {
      // One user's failure must not block the digest for everyone else.
      console.error(`Daily digest: failed to load tasks for user ${user.id}: ${tasksError.message}`);
      continue;
    }
    if (!tasks || tasks.length === 0) continue;

    // Enqueue email (use queue for reliability)
    await env.NOTIFICATION_QUEUE.send({
      type: 'daily_digest',
      userId: user.id,
      email: user.email,
      name: user.full_name,
      tasks: tasks.map((t) => ({
        id: t.id,
        title: t.title,
        status: t.status,
        priority: t.priority,
        dueDate: t.due_date,
        projectName: t.project?.name,
      })),
    });
    queued += 1;
  }

  console.log(`Daily digest: Queued emails for ${queued} of ${users.length} users`);
}
/**
 * Queues a `due_date_reminder` notification for every open task that is due
 * tomorrow (UTC calendar date) and has an assignee with an email address.
 *
 * @param env Worker bindings (Supabase credentials and the notification queue).
 * @throws Error when the task query fails, so the failed invocation is
 *   visible in the worker's metrics.
 */
async function sendDueDateReminders(env: Env): Promise<void> {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

  // Compute tomorrow's date entirely in UTC; toISOString() is always UTC, so
  // the date arithmetic must not depend on the runtime's local time zone.
  const tomorrow = new Date();
  tomorrow.setUTCDate(tomorrow.getUTCDate() + 1);
  const tomorrowStr = tomorrow.toISOString().slice(0, 10); // YYYY-MM-DD

  // Get tasks due tomorrow. 'archived' is the status the archive-old-tasks
  // job gives to old 'done' tasks, so it is excluded together with 'done'.
  const { data: tasks, error } = await supabase
    .from('tasks')
    .select(`
      id,
      title,
      due_date,
      assignee:profiles!assignee_id(id, email, full_name),
      project:projects(name)
    `)
    .eq('due_date', tomorrowStr)
    .not('status', 'in', '(done,archived)');
  if (error) {
    throw new Error(`Due date reminders: failed to load tasks: ${error.message}`);
  }
  if (!tasks) return;

  let queued = 0;
  for (const task of tasks) {
    // Unassigned tasks, or assignees without an address, have nobody to remind.
    if (!task.assignee || !task.assignee.email) continue;

    await env.NOTIFICATION_QUEUE.send({
      type: 'due_date_reminder',
      userId: task.assignee.id,
      email: task.assignee.email,
      name: task.assignee.full_name,
      task: {
        id: task.id,
        title: task.title,
        dueDate: task.due_date,
        projectName: task.project?.name,
      },
    });
    queued += 1;
  }

  console.log(`Due date reminders: Queued for ${queued} of ${tasks.length} tasks`);
}
/**
 * Queues a `weekly_summary` notification for every owner of every
 * organization, containing the number of tasks created and completed during
 * the last 7 days.
 *
 * "Completed" is approximated as tasks currently in status `done` whose
 * `updated_at` falls within the window. Organizations without a reachable
 * owner are skipped before any statistics are computed.
 *
 * @param env Worker bindings (Supabase credentials and the notification queue).
 * @throws Error when the list of organizations cannot be loaded.
 */
async function sendWeeklySummary(env: Env): Promise<void> {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

  // Start of the reporting window: 7 days ago, computed in UTC.
  const weekAgo = new Date();
  weekAgo.setUTCDate(weekAgo.getUTCDate() - 7);
  const since = weekAgo.toISOString();

  const { data: organizations, error: orgError } = await supabase
    .from('organizations')
    .select('id, name');
  if (orgError) {
    throw new Error(`Weekly summary: failed to load organizations: ${orgError.message}`);
  }
  if (!organizations) return;

  for (const org of organizations) {
    // Only members with the 'owner' role receive the summary.
    const { data: owners, error: ownersError } = await supabase
      .from('organization_members')
      .select('user_id, user:profiles(email, full_name)')
      .eq('organization_id', org.id)
      .eq('role', 'owner');
    if (ownersError) {
      // Keep going: one organization's failure must not block the others.
      console.error(`Weekly summary: failed to load owners of ${org.id}: ${ownersError.message}`);
      continue;
    }

    // Skip owners without an address, and skip the count queries entirely
    // when nobody would receive the result.
    const recipients = (owners ?? []).filter((owner) => owner.user?.email);
    if (recipients.length === 0) continue;

    // head: true returns only the exact row count, no rows.
    const [created, completed] = await Promise.all([
      supabase
        .from('tasks')
        .select('*, project:projects!inner(organization_id)', { count: 'exact', head: true })
        .eq('project.organization_id', org.id)
        .gte('created_at', since),
      supabase
        .from('tasks')
        .select('*, project:projects!inner(organization_id)', { count: 'exact', head: true })
        .eq('project.organization_id', org.id)
        .eq('status', 'done')
        .gte('updated_at', since),
    ]);
    const statsError = created.error ?? completed.error;
    if (statsError) {
      // Do not send a summary that would report misleading zeroes.
      console.error(`Weekly summary: failed to count tasks of ${org.id}: ${statsError.message}`);
      continue;
    }

    const stats = {
      tasksCreated: created.count ?? 0,
      tasksCompleted: completed.count ?? 0,
    };

    // Send to owners
    for (const owner of recipients) {
      await env.NOTIFICATION_QUEUE.send({
        type: 'weekly_summary',
        userId: owner.user_id,
        email: owner.user?.email,
        name: owner.user?.full_name,
        orgName: org.name,
        stats,
      });
    }
  }

  console.log('Weekly summary: Queued for all organization owners');
}
Step 3: Email Templates (Queue Consumer)¶
// workers/notification-consumer/src/email-templates.ts
/** Replacement entities for the characters that are significant in HTML. */
const HTML_ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#39;',
};

/**
 * Escapes text so it can be placed inside HTML element content or a quoted
 * attribute value. `null`/`undefined` become an empty string.
 */
function escapeHtml(value: string | null | undefined): string {
  return (value ?? '').replace(/[&<>"']/g, (ch) => HTML_ESCAPES[ch] ?? ch);
}

/** Builds the link to a task page; the id is URL-encoded. */
function taskUrl(id: string): string {
  return `https://taskflow.app/tasks/${encodeURIComponent(id)}`;
}

/**
 * Renders the HTML body of the daily digest email.
 *
 * Task titles, project names and the user's name are user-provided text and
 * are HTML-escaped before interpolation. A missing project name or due date
 * is rendered as `-`; a missing name falls back to `there`.
 *
 * @param data Recipient name and the tasks to list, in display order.
 * @returns An HTML fragment with a greeting, one table row per task, and a
 *   link to the dashboard.
 */
export function getDailyDigestHtml(data: {
  name: string;
  tasks: Array<{
    id: string;
    title: string;
    status: string;
    priority: string;
    dueDate: string | null;
    projectName?: string | null;
  }>;
}): string {
  const cell = 'padding: 8px; border-bottom: 1px solid #eee;';
  const taskList = data.tasks
    .map(
      (task) => `
    <tr>
      <td style="${cell}">
        <a href="${taskUrl(task.id)}">${escapeHtml(task.title)}</a>
      </td>
      <td style="${cell}">${escapeHtml(task.projectName) || '-'}</td>
      <td style="${cell}">${escapeHtml(task.priority)}</td>
      <td style="${cell}">${escapeHtml(task.dueDate) || '-'}</td>
    </tr>
  `
    )
    .join('');

  return `
    <h2>Good morning, ${escapeHtml(data.name) || 'there'}!</h2>
    <p>Here's your daily task summary:</p>
    <table style="width: 100%; border-collapse: collapse;">
      <thead>
        <tr style="background: #f5f5f5;">
          <th style="padding: 8px; text-align: left;">Task</th>
          <th style="padding: 8px; text-align: left;">Project</th>
          <th style="padding: 8px; text-align: left;">Priority</th>
          <th style="padding: 8px; text-align: left;">Due Date</th>
        </tr>
      </thead>
      <tbody>
        ${taskList}
      </tbody>
    </table>
    <p style="margin-top: 20px;">
      <a href="https://taskflow.app/dashboard" style="background: #2563eb; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px;">
        View Dashboard
      </a>
    </p>
  `;
}

/**
 * Renders the HTML body of the "task due tomorrow" reminder email.
 *
 * The task title, project name and the user's name are HTML-escaped before
 * interpolation. A missing project name is rendered as `-`; a missing name
 * falls back to `there`.
 *
 * @param data Recipient name and the task the reminder is about.
 * @returns An HTML fragment with the task details and a link to the task.
 */
export function getDueDateReminderHtml(data: {
  name: string;
  task: {
    id: string;
    title: string;
    dueDate: string;
    projectName?: string | null;
  };
}): string {
  const { task } = data;
  return `
    <h2>Task Due Tomorrow</h2>
    <p>Hi ${escapeHtml(data.name) || 'there'},</p>
    <p>This is a reminder that the following task is due tomorrow:</p>
    <div style="background: #f5f5f5; padding: 16px; border-radius: 8px; margin: 16px 0;">
      <h3 style="margin: 0 0 8px 0;">${escapeHtml(task.title)}</h3>
      <p style="margin: 0; color: #666;">Project: ${escapeHtml(task.projectName) || '-'}</p>
      <p style="margin: 8px 0 0 0; color: #dc2626;">Due: ${escapeHtml(task.dueDate)}</p>
    </div>
    <a href="${taskUrl(task.id)}" style="background: #2563eb; color: white; padding: 10px 20px; text-decoration: none; border-radius: 5px; display: inline-block;">
      View Task
    </a>
  `;
}
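Step 4: Deploy Cron Worker¶
# Store the secrets read by the worker (wrangler prompts for each value)
npx wrangler secret put SUPABASE_URL
npx wrangler secret put SUPABASE_SERVICE_KEY
# Deploy the worker together with the cron triggers from wrangler.toml
npx wrangler deploy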
Monitoring¶
pg_cron Monitoring Dashboard¶
-- Create a view for monitoring
-- Shows one row per scheduled job together with its most recent run.
CREATE OR REPLACE VIEW cron_job_status AS
SELECT
j.jobname,
j.schedule,
j.command,
d.status,
d.start_time,
d.end_time,
-- Wall-clock duration of the most recent run
d.end_time - d.start_time AS duration,
d.return_message
FROM cron.job j
-- The lateral subquery picks the latest run of each job (newest start_time,
-- LIMIT 1); LEFT JOIN keeps jobs that have never run, with NULL run columns.
LEFT JOIN LATERAL (
SELECT *
FROM cron.job_run_details
WHERE jobid = j.jobid
ORDER BY start_time DESC
LIMIT 1
) d ON true
ORDER BY j.jobname;
Cloudflare Cron Monitoring¶
- Dashboard → Workers → Your Worker → Metrics
- View invocation count, errors, duration
- Set up alerts for failures
Verification Checklist¶
- [ ] pg_cron extension enabled
- [ ] Database cleanup jobs scheduled
- [ ] Stats aggregation working
- [ ] Cloudflare Cron triggers configured
- [ ] Daily digest emails queued
- [ ] Due date reminders working
- [ ] Monitoring in place
Common Issues¶
pg_cron: Job not running¶
-- Check if job is enabled ('your-job' is a placeholder for the job name)
SELECT * FROM cron.job WHERE jobname = 'your-job';
-- Check run history (replace X with the jobid returned by the query above)
SELECT * FROM cron.job_run_details WHERE jobid = X ORDER BY start_time DESC;
Cloudflare Cron: Not triggering¶
- Check wrangler.toml cron syntax
- Verify worker is deployed
- Check Workers Analytics for errors
Next Phase¶
Chuyển sang Phase 7: CI/CD & Deployment để setup deployment pipeline.