Main process:
Inserts into user's table and enqueues the message atomically.
package main
import (
"context"
"encoding/json"
"log"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/craigpastro/pgmq-go/pgmq"
)
type User struct {
ID int64
Name string
Email string
}
type WelcomeEmailJob struct {
UserID int64 `json:"user_id"`
Email string `json:"email"`
Name string `json:"name"`
}
func main() {
ctx := context.Background()
pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost:5432/postgres")
if err != nil {
log.Fatal(err)
}
defer pool.Close()
queueName := "welcome_email_queue"
// Pseudo: ensure extension and queue exist during bootstrap, not every request.
// _ = pgmq.CreatePGMQExtension(ctx, pool)
// _ = pgmq.CreateQueue(ctx, pool, queueName)
user := User{
Name: "Aman",
Email: "aman@example.com",
}
err = createUserAndEnqueue(ctx, pool, queueName, user)
if err != nil {
log.Fatal(err)
}
log.Println("user created and welcome email queued")
}
func createUserAndEnqueue(ctx context.Context, pool *pgxpool.Pool, queueName string, user User) error {
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback(ctx)
}()
// 1) Insert into users table.
// Assumes users table has id generated by DB.
err = tx.QueryRow(ctx, `
INSERT INTO users (name, email, is_email_sent, created_at)
VALUES ($1, $2, false, now())
RETURNING id
`, user.Name, user.Email).Scan(&user.ID)
if err != nil {
return err
}
// 2) Enqueue welcome email job in PGMQ.
job := WelcomeEmailJob{
UserID: user.ID,
Email: user.Email,
Name: user.Name,
}
jobBytes, err := json.Marshal(job)
if err != nil {
return err
}
_, err = pgmq.Send(ctx, tx, queueName, jobBytes)
if err != nil {
return err
}
return tx.Commit(ctx)
}
|
Email sender worker:
This reads from PGMQ, sends the email, and archives the message on
success.
CleanerWorkFlow:
users insert
pgmq enqueue
worker reads message
worker sends email
worker updates users.is_email_sent = true
worker archives message
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/craigpastro/pgmq-go/pgmq"
)
type WelcomeEmailJob struct {
UserID int64 `json:"user_id"`
Email string `json:"email"`
Name string `json:"name"`
}
func emailSenderWorker(ctx context.Context, pool *pgxpool.Pool, queueName string) error {
for {
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
msg, err := pgmq.Read(ctx, tx, queueName, 30)
if err != nil {
_ = tx.Rollback(ctx)
time.Sleep(2 * time.Second)
continue
}
if msg == nil {
_ = tx.Rollback(ctx)
time.Sleep(2 * time.Second)
continue
}
var job WelcomeEmailJob
if err := json.Unmarshal(msg.Message, &job); err != nil {
_ = tx.Rollback(ctx)
continue
}
err = sendWelcomeEmail(job)
if err != nil {
_ = tx.Rollback(ctx)
// message becomes visible again after visibility timeout
continue
}
// Archive or delete after successful processing.
_, err = pgmq.Archive(ctx, tx, queueName, msg.MsgID)
if err != nil {
_ = tx.Rollback(ctx)
continue
}
if err := tx.Commit(ctx); err != nil {
return err
}
}
}
func sendWelcomeEmail(job WelcomeEmailJob) error {
// Pseudo: integrate SMTP / SES / SendGrid here.
// Example:
// return mailer.Send(job.Email, "Welcome!", "Hello "+job.Name)
log.Printf("sending welcome email to %s for user_id=%d\n", job.Email, job.UserID)
return nil
}
|