How to implement a database job queue using SKIP LOCKED


Introduction

In this article, we are going to see how we can implement a database job queue using SKIP LOCKED.

I decided to write this article while answering this Stack Overflow question asked by Rafael Winterhalter.

Since SKIP LOCKED is a lesser-known SQL feature, it’s a good opportunity to show you how to use it and why you should employ it, especially when implementing a job queue task.

Domain Model

Let’s assume we have the following Post entity which has a status Enum property looking as follows:

[Figure: Post and PostStatus]

The PostStatus Enum encapsulates the moderation status of a given Post entity. Therefore, when the Post is first created, the status is PENDING. The site moderators are going to review the pending Post entries and change the state to either APPROVED or SPAM.

The PostStatus class looks as follows:

public enum PostStatus {
    PENDING,
    APPROVED,
    SPAM
}

And the Post entity is also trivial to map as a JPA entity:

@Entity(name = "Post")
@Table(name = "post")
public class Post {

    @Id
    private Long id;

    private String title;

    private String body;

    @Enumerated
    private PostStatus status;

    //Getters and setters omitted for brevity
}

Job queue

So, the associated post table acts as a job queue since the rows need to be moderated prior to being displayed to the user. If we have multiple concurrent users trying to moderate the Post entities, we need a way to coordinate their efforts to avoid having two moderators review the same Post record.

Let’s consider that we have the following Post entries to moderate:

for (long i = 0; i < 10; i++) {
    Post post = new Post();
    post.setId(i);
    post.setTitle("High-Performance Java Persistence");
    post.setBody(String.format("Chapter %d summary", i));
    post.setStatus(PostStatus.PENDING);
    
    entityManager.persist(post);
}

The first naive implementation would be to retrieve the first N Post rows while also locking them:

public List<Post> getAndLockPosts(
            EntityManager entityManager,
            PostStatus status,
            int postCount) {
    return entityManager.createQuery(
        "select p " +
        "from Post p " +
        "where p.status = :status " +
        "order by p.id", Post.class)
    .setParameter("status", status)
    .setMaxResults(postCount)
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint(
        "javax.persistence.lock.timeout",
        LockOptions.NO_WAIT
    )
    .getResultList();
}

Notice that we are using the PESSIMISTIC_WRITE JPA LockModeType to instruct Hibernate to apply an exclusive lock on the underlying selected Post records.

The javax.persistence.lock.timeout JPA query hint instructs Hibernate to issue a NO WAIT option when applying the exclusive lock. Without using NO WAIT, the lock acquisition will block until it either acquires the row-level lock or the lock waiting period times out.

Now, if we call the getAndLockPosts method from two concurrent Java threads:

final int postCount = 2;

doInJPA(entityManager -> {
    assertEquals(
            postCount,
            getAndLockPosts(
                entityManager,
                PostStatus.PENDING,
                postCount
            ).size()
    );

    try {
        executeSync(() -> {
            doInJPA(_entityManager -> {
                assertEquals(
                    postCount,
                    getAndLockPosts(
                        _entityManager,
                        PostStatus.PENDING,
                        postCount
                    ).size()
                );
            });
        });
    } catch (Exception e) {
        assertEquals(
            1,
            Arrays.stream(ExceptionUtils.getThrowables(e))
            .map(Throwable::getClass)
            .filter(clazz -> clazz.equals(PessimisticLockException.class))
            .count()
        );
    }
});

We can see that a PessimisticLockException is indeed thrown:

[Alice]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status=0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p NOWAIT

[Bob]: 
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status=0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p NOWAIT

-- SQL Error: 0, SQLState: 55P03
-- ERROR: could not obtain lock on row in relation "post"

The reason the PessimisticLockException is thrown is that both concurrent transactions try to lock the same records since the second transaction has no way of knowing which records are already locked.
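To make the failure mode easier to reason about, here is a minimal in-memory sketch of what NOWAIT does. It is not Hibernate or PostgreSQL code: the NoWaitLockTable class, its lockFirst method, and the use of an IllegalStateException in place of the PessimisticLockException are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of row-level locks (illustration only)
class NoWaitLockTable {

    // Maps a locked row id to the name of the transaction holding the lock
    private final Map<Long, String> lockOwners = new ConcurrentHashMap<>();

    // Mimics "ORDER BY id LIMIT n FOR UPDATE NOWAIT" over the candidate ids,
    // which are assumed to be already sorted
    List<Long> lockFirst(String transaction, List<Long> orderedIds, int limit) {
        List<Long> acquired = new ArrayList<>();
        for (Long id : orderedIds) {
            if (acquired.size() == limit) {
                break;
            }
            String owner = lockOwners.putIfAbsent(id, transaction);
            if (owner != null && !owner.equals(transaction)) {
                // NOWAIT: fail right away instead of blocking. The failed
                // statement aborts the transaction, so its locks are released.
                for (Long takenId : acquired) {
                    lockOwners.remove(takenId);
                }
                throw new IllegalStateException(
                    "could not obtain lock on row " + id);
            }
            acquired.add(id);
        }
        return acquired;
    }
}
```

If Alice calls lockFirst for two rows out of the ids 0 to 3, she gets 0 and 1. When Bob runs the very same call, he also starts at row 0, finds it locked, and fails, which is the behavior we saw in the log above.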

Using SKIP LOCKED

To fix this problem, we need to use the LockOptions.SKIP_LOCKED Hibernate query hint:

public List<Post> getAndLockPostsWithSkipLocked(
            EntityManager entityManager,
            PostStatus status,
            int postCount) {
    return entityManager
    .createQuery(
        "select p " +
        "from Post p " +
        "where p.status = :status " +
        "order by p.id", Post.class)
    .setParameter("status", status)
    .setMaxResults(postCount)
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint(
        "javax.persistence.lock.timeout", 
        LockOptions.SKIP_LOCKED
    )
    .getResultList();
}

Now, when fetching the Post entries using two concurrent Java threads:

final int postCount = 2;

doInJPA(entityManager -> {
    
    List<Post> pendingPosts = getAndLockPostsWithSkipLocked(
        entityManager, 
        PostStatus.PENDING, 
        postCount
    );
    
    List<Long> ids = pendingPosts
    .stream()
    .map(Post::getId)
    .collect(toList());
        
    assertTrue(
        ids.size() == 2 && 
        ids.contains(0L) && 
        ids.contains(1L)
    );

    executeSync(() -> {
        doInJPA(_entityManager -> {
            List<Post> _pendingPosts = getAndLockPostsWithSkipLocked(
                _entityManager, 
                PostStatus.PENDING, 
                postCount
            );
            
            List<Long> _ids = _pendingPosts
            .stream()
            .map(Post::getId)
            .collect(toList());
            
            assertTrue(
                _ids.size() == 2 && 
                _ids.contains(2L) && 
                _ids.contains(3L)
            );
        });
    });
});

Everything will work just fine since the second transaction will skip the rows that were locked previously by the first transaction:

[Alice]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status = 0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

[Bob]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status = 0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

Notice the SKIP LOCKED option appended by Hibernate to the FOR UPDATE clause. The SKIP LOCKED option allows us to lock only the rows that have not been locked previously. In our example, you can see that Alice has selected and locked the Post entities with the id values of 0 and 1 while Bob selects and locks the Post records with the id values of 2 and 3.
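The skipping behavior can be sketched with a small in-memory model. This is only an illustration of the idea, not how the database implements it: the SkipLockedLockTable class and its lockFirst and release methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of row-level locks (illustration only)
class SkipLockedLockTable {

    // Maps a locked row id to the name of the transaction holding the lock
    private final Map<Long, String> lockOwners = new ConcurrentHashMap<>();

    // Mimics "ORDER BY id LIMIT n FOR UPDATE SKIP LOCKED" over the candidate
    // ids, which are assumed to be already sorted
    List<Long> lockFirst(String transaction, List<Long> orderedIds, int limit) {
        List<Long> acquired = new ArrayList<>();
        for (Long id : orderedIds) {
            if (acquired.size() == limit) {
                break;
            }
            String owner = lockOwners.putIfAbsent(id, transaction);
            if (owner == null || owner.equals(transaction)) {
                acquired.add(id);
            }
            // A row locked by another transaction is silently skipped
        }
        return acquired;
    }

    // Mimics commit or rollback: all locks of the transaction are released
    void release(String transaction) {
        lockOwners.values().removeIf(transaction::equals);
    }
}
```

With the ids 0 to 9 and a limit of 2, Alice gets rows 0 and 1, and Bob, running the same call while Alice still holds her locks, gets rows 2 and 3. Nobody waits and nobody fails.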

Without this option, implementing a job queue using a relational database would be a very complex task.
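If you are not using JPA, the same idea can be expressed with plain JDBC. The following is a rough sketch, assuming PostgreSQL syntax and the post table from this article, where the status column stores the enum ordinal. The PostModerationWorker class and the moderateBatch method are hypothetical names, and a real worker would decide the new status per post rather than for the whole batch.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical worker; names and batch semantics are assumptions
class PostModerationWorker {

    // Ordinal of PostStatus.PENDING, as stored by @Enumerated
    static final int PENDING_ORDINAL = 0;

    // PostgreSQL syntax: rows locked by other workers are skipped
    static final String CLAIM_SQL =
        "SELECT id FROM post WHERE status = ? " +
        "ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED";

    static final String UPDATE_SQL =
        "UPDATE post SET status = ? WHERE id = ?";

    // Claims up to batchSize pending posts, sets their status, and commits.
    // The row locks are held until the commit or rollback.
    static List<Long> moderateBatch(
            Connection connection,
            int batchSize,
            int newStatusOrdinal) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false);
        try {
            List<Long> ids = new ArrayList<>();
            try (PreparedStatement claim =
                     connection.prepareStatement(CLAIM_SQL)) {
                claim.setInt(1, PENDING_ORDINAL);
                claim.setInt(2, batchSize);
                try (ResultSet resultSet = claim.executeQuery()) {
                    while (resultSet.next()) {
                        ids.add(resultSet.getLong(1));
                    }
                }
            }
            if (!ids.isEmpty()) {
                try (PreparedStatement update =
                         connection.prepareStatement(UPDATE_SQL)) {
                    for (Long id : ids) {
                        update.setInt(1, newStatusOrdinal);
                        update.setLong(2, id);
                        update.addBatch();
                    }
                    update.executeBatch();
                }
            }
            connection.commit();
            return ids;
        } catch (SQLException e) {
            connection.rollback();
            throw e;
        } finally {
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

The important part is that the SELECT and the UPDATE run in the same transaction. Once the transaction ends, the locks are gone, so the status change is what prevents another worker from picking up the same rows later.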

The SKIP LOCKED option is nowadays supported by most relational database systems. The following list indicates the first database version that introduced support for SKIP LOCKED.

  • Oracle 10g
  • PostgreSQL 9.5
  • SQL Server 2005
  • MySQL 8.0
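Keep in mind that the native syntax is not the same everywhere. The sketch below, which uses a hypothetical SkipLockedSql helper, shows the query shape for three of these databases. PostgreSQL and MySQL use the FOR UPDATE SKIP LOCKED clause, while SQL Server provides the equivalent behavior via the READPAST table hint. Oracle is left out of the sketch to keep it short.

```java
// Hypothetical helper that builds the native "claim" query (illustration only)
class SkipLockedSql {

    enum Database { POSTGRESQL, MYSQL, SQL_SERVER }

    // Returns a query that locks up to 'limit' pending post rows
    // while skipping the rows locked by other transactions
    static String claimQuery(Database database, int limit) {
        switch (database) {
            case POSTGRESQL:
            case MYSQL:
                return "SELECT id FROM post WHERE status = 0 " +
                       "ORDER BY id LIMIT " + limit +
                       " FOR UPDATE SKIP LOCKED";
            case SQL_SERVER:
                // UPDLOCK takes update locks, READPAST skips locked rows
                return "SELECT TOP (" + limit + ") id " +
                       "FROM post WITH (UPDLOCK, READPAST) " +
                       "WHERE status = 0 ORDER BY id";
            default:
                throw new IllegalArgumentException(
                    "Unsupported database: " + database);
        }
    }
}
```

When using Hibernate, you don't have to write these queries yourself since the dialect generates the database-specific lock clause for you.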

Conclusion

SKIP LOCKED is a very handy option when implementing concurrency control algorithms using a relational database. Now that SKIP LOCKED is widely supported, you should definitely use it if you need to implement a job queue inside the relational database system you are using.


