How to implement a database job queue using SKIP LOCKED

Introduction

In this article, we are going to see how we can implement a database job queue using SKIP LOCKED.

I decided to write this article while answering this Stack Overflow question asked by Rafael Winterhalter.

Since SKIP LOCKED is a lesser-known SQL feature, this is a good opportunity to show you how to use it and why you should use it, especially when implementing a job queue.

Domain Model

Let’s assume we have the following Post entity, which has a status property of the PostStatus Enum type:

Post and PostStatus

The PostStatus Enum encapsulates the moderation status of a given Post entity. When a Post is first created, its status is PENDING. The site moderators then review the pending Post entries and change the status to either APPROVED or SPAM.
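The moderation lifecycle above can also be made explicit in the Enum itself. The following is a minimal sketch, assuming moderation is a one-way decision (a PENDING post may become APPROVED or SPAM, and a moderated post never changes status again). The canTransitionTo method is hypothetical and is not part of the original PostStatus class.

```java
import java.util.EnumSet;

// Hypothetical variant of PostStatus: the transition rule is an assumption,
// the article only states that PENDING posts become APPROVED or SPAM.
enum PostStatus {
    PENDING,
    APPROVED,
    SPAM;

    // Returns true if a moderator may move a post from this status to the target status.
    boolean canTransitionTo(PostStatus target) {
        return this == PENDING && EnumSet.of(APPROVED, SPAM).contains(target);
    }
}
```

Keep in mind that the constant order matters: the Post mapping uses @Enumerated, which defaults to EnumType.ORDINAL, so PENDING is persisted as 0, and reordering the constants would change the values stored in the status column.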

The PostStatus class looks as follows:

public enum PostStatus {
    PENDING,
    APPROVED,
    SPAM
}

And the Post entity is also trivial to map as a JPA entity:

@Entity(name = "Post")
@Table(name = "post")
public class Post {

    @Id
    private Long id;

    private String title;

    private String body;

    @Enumerated // defaults to EnumType.ORDINAL, so PENDING is stored as 0
    private PostStatus status;

    //Getters and setters omitted for brevity
}

Job queue

The post table therefore acts as a job queue, since every row must be moderated before it is displayed to users. If multiple moderators review Post entities concurrently, we need a way to coordinate their efforts so that no two moderators review the same Post record.

Let’s consider that we have the following Post entries to moderate:

for (long i = 0; i < 10; i++) {
    Post post = new Post();
    post.setId(i);
    post.setTitle("High-Performance Java Persistence");
    post.setBody(String.format("Chapter %d summary", i));
    post.setStatus(PostStatus.PENDING);
    
    entityManager.persist(post);
}

A naive first implementation would retrieve the first N pending Post rows while also locking them:

public List<Post> getAndLockPosts(
            EntityManager entityManager,
            PostStatus status,
            int postCount) {
    return entityManager.createQuery(
        "select p " +
        "from Post p " +
        "where p.status = :status " +
        "order by p.id", Post.class)
    .setParameter("status", status)
    .setMaxResults(postCount)
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint(
        "javax.persistence.lock.timeout",
        LockOptions.NO_WAIT
    )
    .getResultList();
}

Notice that we are using the PESSIMISTIC_WRITE JPA LockModeType to instruct Hibernate to apply an exclusive lock on the underlying selected Post records.

The javax.persistence.lock.timeout JPA query hint, set to LockOptions.NO_WAIT, instructs Hibernate to append the NOWAIT option to the FOR UPDATE clause. Without NOWAIT, the lock acquisition blocks until it either acquires the row-level lock or the lock wait times out.
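To get an intuition for the difference between NOWAIT and a lock timeout, here is an in-memory analogy, not database code: a java.util.concurrent.Semaphore with a single permit plays the role of one row-level lock. A Semaphore is used instead of a ReentrantLock because it is not reentrant, so a second acquisition attempt fails even when it comes from the same thread. The RowLockAnalogy class and its method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// In-memory analogy only: a single-permit Semaphore stands in for a row-level lock.
// RowLockAnalogy is a hypothetical name, not part of JPA or Hibernate.
class RowLockAnalogy {

    // Models FOR UPDATE NOWAIT: fail immediately if someone else holds the lock.
    static void lockNoWait(Semaphore rowLock) {
        if (!rowLock.tryAcquire()) {
            throw new IllegalStateException("could not obtain lock on row");
        }
    }

    // Models a lock timeout: wait up to timeoutMillis for the lock, then give up.
    static void lockWithTimeout(Semaphore rowLock, long timeoutMillis)
            throws InterruptedException {
        if (!rowLock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("lock wait timed out");
        }
    }
}
```

While the permit is taken, lockNoWait fails right away, which mirrors the NOWAIT error, whereas lockWithTimeout first waits for the given period and only then fails, which mirrors a blocking lock acquisition with a timeout.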

Now, let’s call the getAndLockPosts method from two concurrent Java threads:

final int postCount = 2;

doInJPA(entityManager -> {
    assertEquals(
            postCount,
            getAndLockPosts(
                entityManager,
                PostStatus.PENDING,
                postCount
            ).size()
    );

    try {
        executeSync(() -> {
            doInJPA(_entityManager -> {
                assertEquals(
                    postCount,
                    getAndLockPosts(
                        _entityManager,
                        PostStatus.PENDING,
                        postCount
                    ).size()
                );
            });
        });
        fail("Bob should not be able to lock the rows already locked by Alice");
    } catch (Exception e) {
        assertEquals(
            1,
            Arrays.stream(ExceptionUtils.getThrowables(e))
            .map(Throwable::getClass)
            .filter(clazz -> clazz.equals(PessimisticLockException.class))
            .count()
        );
    }
});

We can see that a PessimisticLockException is indeed thrown:

[Alice]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status=0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p NOWAIT

[Bob]: 
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status=0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p NOWAIT

-- SQL Error: 0, SQLState: 55P03
-- ERROR: could not obtain lock on row in relation "post"

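The PessimisticLockException is thrown because both transactions run the same query and therefore try to lock the same records. The second transaction has no way of skipping the rows already locked by the first one, and, because of NOWAIT, it fails immediately instead of waiting for those locks to be released.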
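The conflict can be reproduced without a database. The sketch below is a hypothetical in-memory model, assuming the ten PENDING posts from the earlier example: both transactions compute the same candidate ids (the first n pending ids in id order), and a candidate row locked by another transaction makes the NOWAIT acquisition fail. As a simplification, the model checks all candidates before locking any of them, whereas a real database locks rows one by one; either way, the statement fails.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the post table, used only to illustrate NOWAIT.
class NoWaitQueueModel {
    private final TreeSet<Long> pendingIds = new TreeSet<>();     // iterated in id order, like "order by p.id"
    private final Map<Long, String> lockOwners = new HashMap<>(); // row id -> transaction holding the lock

    NoWaitQueueModel(long rowCount) {
        for (long id = 0; id < rowCount; id++) {
            pendingIds.add(id);
        }
    }

    // Mimics "... order by p.id limit n for update nowait".
    synchronized List<Long> lockFirstNoWait(String tx, int n) {
        // Every transaction computes the same candidate set: the first n pending ids.
        List<Long> candidates = new ArrayList<>();
        for (Long id : pendingIds) {
            if (candidates.size() == n) break;
            candidates.add(id);
        }
        // NOWAIT: a candidate locked by another transaction makes the statement fail.
        for (Long id : candidates) {
            String owner = lockOwners.get(id);
            if (owner != null && !owner.equals(tx)) {
                throw new IllegalStateException("could not obtain lock on row " + id);
            }
        }
        for (Long id : candidates) {
            lockOwners.put(id, tx);
        }
        return candidates;
    }
}
```

Calling lockFirstNoWait("Alice", 2) locks ids 0 and 1, and a subsequent lockFirstNoWait("Bob", 2) picks the very same ids and fails, just like Bob’s query in the log above.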

Using SKIP LOCKED

To fix this problem, we need to pass LockOptions.SKIP_LOCKED as the value of the javax.persistence.lock.timeout query hint:

public List<Post> getAndLockPostsWithSkipLocked(
            EntityManager entityManager,
            PostStatus status,
            int postCount) {
    return entityManager
    .createQuery(
        "select p " +
        "from Post p " +
        "where p.status = :status " +
        "order by p.id", Post.class)
    .setParameter("status", status)
    .setMaxResults(postCount)
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint(
        "javax.persistence.lock.timeout", 
        LockOptions.SKIP_LOCKED
    )
    .getResultList();
}

Now, when fetching the Post entries using two concurrent Java threads:

final int postCount = 2;

doInJPA(entityManager -> {
    
    List<Post> pendingPosts = getAndLockPostsWithSkipLocked(
        entityManager, 
        PostStatus.PENDING, 
        postCount
    );
    
    List<Long> ids = pendingPosts
    .stream()
    .map(Post::getId)
    .collect(toList());
        
    assertTrue(
        ids.size() == 2 && 
        ids.contains(0L) && 
        ids.contains(1L)
    );

    executeSync(() -> {
        doInJPA(_entityManager -> {
            List<Post> _pendingPosts = getAndLockPostsWithSkipLocked(
                _entityManager, 
                PostStatus.PENDING, 
                postCount
            );
            
            List<Long> _ids = _pendingPosts
            .stream()
            .map(Post::getId)
            .collect(toList());
            
            assertTrue(
                _ids.size() == 2 && 
                _ids.contains(2L) && 
                _ids.contains(3L)
            );
        });
    });
});

Everything works fine this time, since the second transaction skips the rows previously locked by the first transaction:

[Alice]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status = 0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

[Bob]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status = 0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

Notice the SKIP LOCKED option appended by Hibernate to the FOR UPDATE clause. With SKIP LOCKED, the query skips any rows already locked by other transactions and locks only the remaining ones. In our example, Alice selects and locks the Post entities with the id values of 0 and 1, while Bob selects and locks the Post records with the id values of 2 and 3.
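A small in-memory model illustrates these SKIP LOCKED semantics. This is a hypothetical sketch rather than what the database does internally: instead of failing on a row locked by another transaction, the claim skips it and moves on to the next pending id. The commit method assumes the transaction moderated every row it locked, so those rows leave the PENDING set and their locks are released.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the post table, used only to illustrate SKIP LOCKED.
class SkipLockedQueueModel {
    private final TreeSet<Long> pendingIds = new TreeSet<>();     // iterated in id order, like "order by p.id"
    private final Map<Long, String> lockOwners = new HashMap<>(); // row id -> transaction holding the lock

    SkipLockedQueueModel(long rowCount) {
        for (long id = 0; id < rowCount; id++) {
            pendingIds.add(id);
        }
    }

    // Mimics "... order by p.id limit n for update skip locked":
    // rows locked by another transaction are skipped instead of causing an error.
    synchronized List<Long> lockFirstSkipLocked(String tx, int n) {
        List<Long> locked = new ArrayList<>();
        for (Long id : pendingIds) {
            if (locked.size() == n) break;
            String owner = lockOwners.get(id);
            if (owner == null || owner.equals(tx)) {
                lockOwners.put(id, tx);
                locked.add(id);
            }
        }
        return locked;
    }

    // Mimics commit, assuming tx moderated all the rows it locked:
    // those rows are no longer PENDING and their locks are released.
    synchronized void commit(String tx) {
        lockOwners.entrySet().removeIf(entry -> {
            if (entry.getValue().equals(tx)) {
                pendingIds.remove(entry.getKey());
                return true;
            }
            return false;
        });
    }
}
```

With ten pending posts, Alice claims ids 0 and 1 and Bob claims 2 and 3. Once Alice commits, a third transaction skips Bob’s locked rows and claims 4 and 5.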

Without this option, implementing a job queue using a relational database would be a very complex task.
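To show why this matters for a job queue, the following simulation runs several moderator threads against a shared queue, each repeatedly claiming a batch with SKIP LOCKED-style semantics, processing it, and committing. It is only an in-memory illustration built on assumptions: a ConcurrentSkipListMap stands in for the ordered PENDING rows, an AtomicBoolean per row stands in for its row-level lock, and ModerationSimulation is a hypothetical name. Because a worker never waits on a row claimed by another worker, no thread blocks, and every post is processed exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical simulation: no database involved, threads play moderator transactions.
class ModerationSimulation {

    // Runs the simulation and returns how many times each post id was processed.
    static Map<Long, Integer> run(int postCount, int workers, int batchSize)
            throws InterruptedException {
        // Ordered by id, like "order by p.id"; only PENDING rows stay in this map.
        ConcurrentSkipListMap<Long, AtomicBoolean> pending = new ConcurrentSkipListMap<>();
        for (long id = 0; id < postCount; id++) {
            pending.put(id, new AtomicBoolean(false)); // false = row not locked
        }
        Map<Long, Integer> processed = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                while (true) {
                    // "for update skip locked": lock up to batchSize rows nobody else holds.
                    List<Long> batch = new ArrayList<>();
                    for (Map.Entry<Long, AtomicBoolean> row : pending.entrySet()) {
                        if (batch.size() == batchSize) break;
                        if (row.getValue().compareAndSet(false, true)) {
                            batch.add(row.getKey());
                        }
                    }
                    if (batch.isEmpty()) {
                        return; // every remaining row is locked by another worker
                    }
                    for (Long id : batch) {
                        processed.merge(id, 1, Integer::sum); // moderate the post
                        pending.remove(id);                    // commit: no longer PENDING
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return processed;
    }
}
```

For instance, ModerationSimulation.run(10, 3, 2) should return ten entries, each with a count of 1, regardless of how the threads interleave. A locked row is never unlocked again, so no two workers can ever claim the same post.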

The SKIP LOCKED option is now supported by most major relational database systems. The following list indicates the first version of each database that introduced support for SKIP LOCKED:

  • Oracle 10g
  • PostgreSQL 9.5
  • SQL Server 2005 (through the READPAST table hint rather than the SKIP LOCKED keyword)
  • MySQL 8.0
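Since each database exposes this feature slightly differently, here is a hypothetical helper that builds the "claim the next N pending posts" statement for PostgreSQL, MySQL 8.0, and SQL Server. It assumes the post table from this article with an ordinal status column (0 = PENDING). PostgreSQL and MySQL accept FOR UPDATE SKIP LOCKED after LIMIT, while SQL Server has no SKIP LOCKED keyword and instead combines the UPDLOCK and READPAST table hints. Oracle is left out of this sketch.

```java
// Hypothetical helper: builds the "claim the next N pending posts" query per database.
// Assumes the post table from the article, where status = 0 means PENDING.
class SkipLockedSql {

    enum Database { POSTGRESQL, MYSQL, SQL_SERVER }

    static String claimPendingPosts(Database db, int limit) {
        switch (db) {
            case POSTGRESQL:
            case MYSQL:
                // Both accept LIMIT followed by FOR UPDATE SKIP LOCKED.
                return "SELECT id, title, body, status FROM post WHERE status = 0 "
                     + "ORDER BY id LIMIT " + limit + " FOR UPDATE SKIP LOCKED";
            case SQL_SERVER:
                // No SKIP LOCKED keyword: UPDLOCK locks the rows read, READPAST skips locked ones.
                return "SELECT TOP (" + limit + ") id, title, body, status "
                     + "FROM post WITH (UPDLOCK, ROWLOCK, READPAST) "
                     + "WHERE status = 0 ORDER BY id";
            default:
                throw new IllegalArgumentException("Unsupported database: " + db);
        }
    }
}
```

In a JPA application you would normally let Hibernate render this clause from the lock mode and the SKIP_LOCKED hint, as in the getAndLockPostsWithSkipLocked method. Concatenating an int limit cannot inject SQL, but binding it as a parameter avoids producing a different statement for every limit value.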

Conclusion

SKIP LOCKED is a very handy option when implementing concurrency control algorithms using a relational database. Now that SKIP LOCKED is widely supported, you should definitely use it if you need to implement a job queue inside the relational database system you are using.

15 Comments on “How to implement a database job queue using SKIP LOCKED”

  1. Will look into Debezium.

    Any thoughts on when to go via the abstraction-over-a-relational-database route for queueing?
    One scenario where I think it might be justified is if you’re doing an update to database as well as to a queue and ACID is an absolute necessity. The queue is also formed over the same database else we’d require XA transactions (if the queue is an entirely new resource).

    • I have no idea what you are talking about. Anyway, if you want me to provide you with more in-depth suggestions about database systems and software architecture, you should definitely check out my consulting page.

  2. Hi Vlad,
    Thanks for educating about skip locked.
    An unrelated question, if implementing a job queue in a relational database, what is the best way of notifying readers that a new row has been created that matches your query?

    • Users need to poll. Pushing does not work unless the DB implements such a feature: Oracle AQ or PostgreSQL notify/listen.

      • Thanks for your reply Vlad.
        On DBs that don’t have a notify, I’ve seen suggestions of making the readers issue a “sleep query” and then the producer notifying using a “kill sleep query” (see https://www.engineyard.com/blog/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you). Do you think it is a good idea?

        Also is there a good litmus test to decide whether to use a message queue vs create an abstraction over a relational database and put mechanisms like SKIP LOCKED to use?

        Thanks again!

      • That does not sound like a good idea to me. Better use Debezium to parse the redo log and analyze the changes so that, when a change happens, you notify the subscribed listeners.

  3. Vlad,

    I think @Anatoly is pointing out that SKIP LOCKED locks the rows only as they are fetched, rather than when the cursor is opened, which is different from a plain SELECT...FOR UPDATE, at least in Oracle. So it’s important to keep this behavior in mind when using SKIP LOCKED with Oracle; otherwise, you may run into concurrency problems.

  4. Very good post, Vlad!

    SKIP LOCKED is one of the most powerful database features I’ve seen so far! Although many databases support it, I only learned about it 2 years ago!

    I’ve been working day by day with Oracle 11G since 2016 and certainly SKIP LOCKED has been very useful to us because we often implement job/task queues to integrate systems.

      • SKIP LOCKED in Oracle is not thread-safe. I faced this issue recently, and many recommendations on the internet say you should use an additional select in this case. The problem is that Oracle checks the entries for locks after filtering them with the WHERE condition, and by that time a second thread may have already updated them.
        I would suggest fetching from a cursor instead if you really have to use the database as a queue. However, databases are not designed to be used that way, so I would say it’s better to consider different tools.

      • Sounds like a problem in Oracle that has to be fixed. You should have opened an issue. This feature works just fine in PostgreSQL and MySQL and the job queue requirement is exactly why you’d use this feature. Using a queuing system is not without issues either, so, if you have a very simple job queue that can be implemented using a table, why complicate the design?
