Lock processing logic by customer

Introduction

In the application we are currently developing, there was one use case where we wanted to synchronize message processing by message provider (the customer generating those messages). The flow looks something like this:

[Figure: message processing flow]

Messages may arrive in any order because multiple customer jobs run in parallel, but we want messages belonging to the same customer to be processed one after the other (analogous to the Serializable database isolation level), while messages coming from different customers can still be processed in parallel.

Synchronizing access

This is what the customer locking mechanism looks like:

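import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * CustomerLockedExecution - Locks execution for a given customer
 */
public class CustomerLockedExecution<K> {

    // A ConcurrentHashMap keeps lookups thread-safe; an unsynchronized
    // HashMap.get() racing with a put() from another thread is not.
    private final ConcurrentMap<K, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    private Lock getLock(K customerId) {
        // Atomically creates the lock the first time a customer is seen
        return lockMap.computeIfAbsent(customerId, id -> new ReentrantLock());
    }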

    /**
     * Lock on the customer and execute the specific logic
     *
     * @param customerId customer id
     * @param callable   custom logic callback
     */
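    public <T> void lockExecution(K customerId, Callable<T> callable) {
        Lock lock = getLock(customerId);
        try {
            // Acquire the lock before the inner try/finally, so unlock()
            // only runs when the lock is actually held
            lock.lockInterruptibly();
            try {
                callable.call();
            } finally {
                lock.unlock();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for the caller
                Thread.currentThread().interrupt();
            }
            throw new CallableException(e, callable);
        }
    }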
}
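
To illustrate the intended behavior (same customer serialized, different customers free to run in parallel), here is a minimal, self-contained sketch. It does not use the class above; PerCustomerLockDemo, its run method, and the two customer ids 0 and 1 are names made up for this illustration, and the per-customer lock is looked up with ConcurrentHashMap.computeIfAbsent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original code base
class PerCustomerLockDemo {

    // One lock per customer id, created atomically on first use
    private static final ConcurrentHashMap<Integer, ReentrantLock> LOCKS =
        new ConcurrentHashMap<>();

    // Submits messagesPerCustomer tasks for each of two customers (ids 0 and 1)
    // and returns how many messages were counted per customer.
    static int[] run(int messagesPerCustomer) {
        // Plain int counters: only the per-customer lock protects each slot
        int[] processed = new int[2];
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < messagesPerCustomer; i++) {
                for (int customerId = 0; customerId < 2; customerId++) {
                    final int id = customerId;
                    futures.add(pool.submit(() -> {
                        ReentrantLock lock =
                            LOCKS.computeIfAbsent(id, k -> new ReentrantLock());
                        lock.lock();
                        try {
                            processed[id]++; // serialized per customer
                        } finally {
                            lock.unlock();
                        }
                    }));
                }
            }
            for (Future<?> future : futures) {
                future.get(); // wait for the task and make its writes visible
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] counts = run(1000);
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

Because each counter is only ever incremented while its customer's lock is held, no update is lost, even though tasks for the two customers can run at the same time on different pool threads.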

Testing time

The unit test starts 10 threads, all using the same customerId value, so they all compete to execute their logic, which consists of adding 3 consecutive numbers (starting from an initial index) to a common buffer.

private CustomerLockedExecution<Long> execution = new CustomerLockedExecution<>();

private CopyOnWriteArrayList<Long> buffer = new CopyOnWriteArrayList<>();

private static final int appendTries = 3;

private final int threadCount = 10;

private ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

@Test
public void testAwaitExecutionForSameIntegratedSource() 
        throws InterruptedException {
    final CountDownLatch startLatch = new CountDownLatch(threadCount + 1);
    final CountDownLatch endLatch = new CountDownLatch(threadCount + 1);

    for (long i = 0; i < threadCount; i++) {
        final long index = i * threadCount;

        LOG.info("Scheduling thread index {}", index);

        executorService.submit(() -> {
            try {
                startLatch.countDown();
                startLatch.await();
                execution.lockExecution(
                    0L,
                    () -> {
                        LOG.info("Running thread index {}", index);
                        for (int j = 0; j < appendTries; j++) {
                            long number = index + j;
                            LOG.info("Adding {}", number);
                            buffer.add(number);
                        }

                        return null;
                    }
                );
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always count down, so a failing worker cannot hang the test
                endLatch.countDown();
            }
        });
    }

    startLatch.countDown();

    LOG.info("Waiting for threads to be done");

    endLatch.countDown();
    endLatch.await();

    LOG.info("Threads are done processing");

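    assertEquals(threadCount * appendTries, buffer.size());

    // Each thread's numbers must form one contiguous run in the buffer
    for (int i = 0; i < buffer.size(); i += appendTries) {
        long reference = buffer.get(i);
        for (int j = 0; j < appendTries; j++) {
            assertEquals(reference + j, (long) buffer.get(i + j));
        }
    }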
}

When executing the test case above, we get the following output:

Scheduling thread index 0
Scheduling thread index 10
Scheduling thread index 20
Scheduling thread index 30
Scheduling thread index 40
Scheduling thread index 50
Scheduling thread index 60
Scheduling thread index 70
Scheduling thread index 80
Scheduling thread index 90

Waiting for threads to be done

Running thread index 0
Adding 0
Adding 1
Adding 2

Running thread index 80
Adding 80
Adding 81
Adding 82

Running thread index 30
Adding 30
Adding 31
Adding 32

Running thread index 40
Adding 40
Adding 41
Adding 42

Running thread index 60
Adding 60
Adding 61
Adding 62

Running thread index 50
Adding 50
Adding 51
Adding 52

Running thread index 10
Adding 10
Adding 11
Adding 12

Running thread index 90
Adding 90
Adding 91
Adding 92

Running thread index 20
Adding 20
Adding 21
Adding 22

Running thread index 70
Adding 70
Adding 71
Adding 72

Threads are done processing

As you can see, the threads run in an arbitrary order even though all of them are released simultaneously, and their additions never overlap: every thread adds its three numbers without interleaving with any other thread.

Preventing deadlocks

You should be aware of deadlocks: we hold a lock while executing caller-supplied logic, which may call non-private methods, and that logic might acquire some other lock too.

Fortunately, this is not the case for us, since our message pipeline goes from one end to the other, so there is only one way of entering this processing logic.

Anyway, when multiple locks are acquired (e.g. A, B and C), it’s mandatory to always acquire those locks in the same order:

  • A -> B -> C and A -> B

Combinations like:

  • A -> B and B -> A
  • A -> B -> C and C -> B -> A

are forbidden since they may end up in a deadlock.
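
The ordering rule can be sketched as follows. This is only an illustration under assumed names (OrderedLocking, withBoth and runOpposingCallers are hypothetical), with locks keyed by a numeric id so that "the same order" simply means "lowest id first":

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of the original code base
class OrderedLocking {

    private static final ConcurrentHashMap<Long, ReentrantLock> LOCKS =
        new ConcurrentHashMap<>();

    private static ReentrantLock lockFor(long id) {
        return LOCKS.computeIfAbsent(id, k -> new ReentrantLock());
    }

    // Runs the action while holding both locks. The lock with the lower id
    // is always acquired first, whatever order the caller passes the ids in.
    static void withBoth(long idA, long idB, Runnable action) {
        ReentrantLock first = lockFor(Math.min(idA, idB));
        ReentrantLock second = lockFor(Math.max(idA, idB));
        first.lock();
        try {
            second.lock();
            try {
                action.run();
            } finally {
                second.unlock();
            }
        } finally {
            first.unlock();
        }
    }

    // Two threads request the same pair of locks in opposite argument order.
    // Returns the total number of increments performed under both locks.
    static int runOpposingCallers(int iterations) {
        int[] counter = new int[1];
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                withBoth(1L, 2L, () -> counter[0]++);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                withBoth(2L, 1L, () -> counter[0]++);
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runOpposingCallers(10_000));
    }
}
```

Although the second thread names the locks as (2, 1), both threads end up taking lock 1 before lock 2, so neither can hold one lock while waiting for the other in the opposite direction.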

Conclusion

I also always try to avoid calling external APIs while holding a lock, since such calls may be slow (e.g. a long-running web service call), which hurts our processing scalability because the lock is held for a long time.

External API calls may also acquire locks we are not aware of, increasing the chance of a deadlock if we happen to lock on the same objects as the external API.
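
One way to keep the locked section short is to make the slow call first and only take the lock to record its result. The sketch below assumes that the external call itself does not need to be serialized per customer, which may not hold for every pipeline; ShortCriticalSection, handle and snapshot are hypothetical names, and the Supplier stands in for a real remote call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical class, not part of the original code base
class ShortCriticalSection {

    private final ReentrantLock lock = new ReentrantLock();
    private final List<String> processed = new ArrayList<>();

    // slowExternalCall is a stand-in for a remote service invocation
    void handle(Supplier<String> slowExternalCall) {
        // The potentially slow call runs without holding the lock
        String result = slowExternalCall.get();

        // The lock is held only for the quick in-memory update
        lock.lock();
        try {
            processed.add(result);
        } finally {
            lock.unlock();
        }
    }

    // Returns a copy, so callers never touch the guarded list directly
    List<String> snapshot() {
        lock.lock();
        try {
            return new ArrayList<>(processed);
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, a slow or blocked remote call delays only the thread that made it, and any lock the external API takes internally is never acquired while our own lock is held.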
