Postgres Inbuilt Message Queue

This is the message queue build inside the postgres. The data can be filled from the postgres table when written into the message queue.

Usecase Scenario: Sending Welcome Email after user creation
- Main application writes to the user's table
- E-mail_Sender will send the email to the user.
- Flow without pqmq: main_process writes into user's table. email_sender opens & reads the Users's table, check some entry(is_email_sent=false). if is_email_sent=false. Send the email
- Flow with pgmq: main_process writes into user's table. entry is created in pgmq. email_sender recieved message on pgmq & sends the email. Earlier the work of opening the table, read from it is not needed to be done by email_sender now

Creating & Reading queue entry

Method-1: main transaction insert into both the users table and a queue table/extension
Method-2: use triggers / stored procedures that enqueue when a new user row appears.

Main process:
Inserts into user's table and enqueues the message atomically.

package main
import (
	"context"
	"encoding/json"
	"log"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/craigpastro/pgmq-go/pgmq"
)

type User struct {
	ID    int64
	Name  string
	Email string
}

type WelcomeEmailJob struct {
	UserID int64  `json:"user_id"`
	Email  string `json:"email"`
	Name   string `json:"name"`
}

func main() {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost:5432/postgres")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	queueName := "welcome_email_queue"

	// Pseudo: ensure extension and queue exist during bootstrap, not every request.
	// _ = pgmq.CreatePGMQExtension(ctx, pool)
	// _ = pgmq.CreateQueue(ctx, pool, queueName)

	user := User{
		Name:  "Aman",
		Email: "aman@example.com",
	}

	err = createUserAndEnqueue(ctx, pool, queueName, user)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("user created and welcome email queued")
}

func createUserAndEnqueue(ctx context.Context, pool *pgxpool.Pool, queueName string, user User) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer func() {
		_ = tx.Rollback(ctx)
	}()

	// 1) Insert into users table.
	// Assumes users table has id generated by DB.
	err = tx.QueryRow(ctx, `
		INSERT INTO users (name, email, is_email_sent, created_at)
		VALUES ($1, $2, false, now())
		RETURNING id
	`, user.Name, user.Email).Scan(&user.ID)
	if err != nil {
		return err
	}

	// 2) Enqueue welcome email job in PGMQ.
	job := WelcomeEmailJob{
		UserID: user.ID,
		Email:  user.Email,
		Name:   user.Name,
	}

	jobBytes, err := json.Marshal(job)
	if err != nil {
		return err
	}

	_, err = pgmq.Send(ctx, tx, queueName, jobBytes)
	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}
                
Email sender worker: This reads from PGMQ, sends the email, and archives the message on success.

CleanerWorkFlow:
  users insert
  pgmq enqueue
  worker reads message
  worker sends email
  worker updates users.is_email_sent = true
  worker archives message

package main
import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/craigpastro/pgmq-go/pgmq"
)
type WelcomeEmailJob struct {
	UserID int64  `json:"user_id"`
	Email  string `json:"email"`
	Name   string `json:"name"`
}

func emailSenderWorker(ctx context.Context, pool *pgxpool.Pool, queueName string) error {
	for {
		tx, err := pool.Begin(ctx)
		if err != nil {
			return err
		}

		msg, err := pgmq.Read(ctx, tx, queueName, 30)
		if err != nil {
			_ = tx.Rollback(ctx)
			time.Sleep(2 * time.Second)
			continue
		}
		if msg == nil {
			_ = tx.Rollback(ctx)
			time.Sleep(2 * time.Second)
			continue
		}

		var job WelcomeEmailJob
		if err := json.Unmarshal(msg.Message, &job); err != nil {
			_ = tx.Rollback(ctx)
			continue
		}

		err = sendWelcomeEmail(job)
		if err != nil {
			_ = tx.Rollback(ctx)
			// message becomes visible again after visibility timeout
			continue
		}

		// Archive or delete after successful processing.
		_, err = pgmq.Archive(ctx, tx, queueName, msg.MsgID)
		if err != nil {
			_ = tx.Rollback(ctx)
			continue
		}

		if err := tx.Commit(ctx); err != nil {
			return err
		}
	}
}

func sendWelcomeEmail(job WelcomeEmailJob) error {
	// Pseudo: integrate SMTP / SES / SendGrid here.
	// Example:
	// return mailer.Send(job.Email, "Welcome!", "Hello "+job.Name)
	log.Printf("sending welcome email to %s for user_id=%d\n", job.Email, job.UserID)
	return nil
}