Cache synchronization using jOOQ and PostgreSQL functions
Introduction
In this article, we are going to see how we can achieve cache synchronization with the help of jOOQ and PostgreSQL functions.
By using Change Data Capture, we can track how table records change over time and synchronize the application-level cache entries that were built from the table records in question.
Domain Model
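Let’s assume we are building a question-and-answer website similar to Stack Overflow. The largest and most important tables in our database are the question and answer tables. The question table has the id, title, body, score, created_on, and updated_on columns, while the answer table has the id, question_id, body, accepted, score, created_on, and updated_on columns, with question_id linking each answer to its parent question. A third table, cache_snapshot, stores the last synchronization timestamp (updated_on) of each cache region.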
Because our application has a lot of users, we want to store the most viewed questions and answers in an application-level cache like Redis.
The cache entry key is the question identifier, and the value is going to be a Question record that contains a List of Answer records, as illustrated by the following diagram:
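A minimal sketch of this cache data model, assuming Java records whose component order matches the constructor calls used in the getUpdatedQuestionsAndAnswers method further down. The QuestionCache class is hypothetical and uses a ConcurrentHashMap as an in-process stand-in for Redis, only to show the key/value shape of a cache entry.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical record types; component order follows the Answer and
// Question constructor calls used when building the hierarchy.
record Answer(
    Long id, String body, Integer score, Boolean accepted,
    LocalDateTime createdOn, LocalDateTime updatedOn) {}

record Question(
    Long id, String title, String body, Integer score,
    LocalDateTime createdOn, LocalDateTime updatedOn,
    List<Answer> answers) {}

// Hypothetical in-process stand-in for Redis:
// key = question identifier, value = the whole Question aggregate.
class QuestionCache {
    private final Map<Long, Question> entries = new ConcurrentHashMap<>();

    // Storing a fresh snapshot replaces any stale aggregate with the same id.
    void put(Question question) {
        entries.put(question.id(), question);
    }

    Optional<Question> get(Long questionId) {
        return Optional.ofNullable(entries.get(questionId));
    }

    int size() {
        return entries.size();
    }
}
```

Because the whole aggregate is the cache value, a changed answer invalidates its parent entry, which is why the synchronization logic always returns complete question and answer hierarchies.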
Tracking record changes using a PostgreSQL function
To extract the Question and Answer records that need to be synchronized with the cache, we are going to use the following get_updated_questions_and_answers PostgreSQL function:
CREATE OR REPLACE FUNCTION get_updated_questions_and_answers()
RETURNS TABLE(
    question_id bigint,
    question_title varchar(250),
    question_body text,
    question_score integer,
    question_created_on timestamp,
    question_updated_on timestamp,
    answer_id bigint,
    answer_body text,
    answer_accepted boolean,
    answer_score integer,
    answer_created_on timestamp,
    answer_updated_on timestamp
)
LANGUAGE plpgsql
AS $$
DECLARE
    previous_snapshot_timestamp timestamp;
    max_snapshot_timestamp timestamp;
    result_set_record record;
BEGIN
    -- Read and lock the last synchronization timestamp of the QA region
    previous_snapshot_timestamp = (
        SELECT updated_on
        FROM cache_snapshot
        WHERE region = 'QA'
        FOR NO KEY UPDATE
    );
    -- First call: start from the Unix epoch so that all records are fetched
    IF previous_snapshot_timestamp is null THEN
        INSERT INTO cache_snapshot(region, updated_on)
        VALUES ('QA', to_timestamp(0));

        previous_snapshot_timestamp = to_timestamp(0);
    END IF;
    -- Start from the previous snapshot, so a call that finds no changes
    -- does not reset the snapshot to the epoch
    max_snapshot_timestamp = previous_snapshot_timestamp;

    FOR result_set_record IN(
        SELECT
            q1.id as question_id,
            q1.title as question_title,
            q1.body as question_body,
            q1.score as question_score,
            q1.created_on as question_created_on,
            q1.updated_on as question_updated_on,
            a1.id as answer_id,
            a1.body as answer_body,
            a1.accepted as answer_accepted,
            a1.score as answer_score,
            a1.created_on as answer_created_on,
            a1.updated_on as answer_updated_on
        FROM question q1
        LEFT JOIN answer a1 on q1.id = a1.question_id
        WHERE
            q1.id IN (
                SELECT q2.id
                FROM question q2
                WHERE q2.updated_on > previous_snapshot_timestamp
            ) OR
            q1.id IN (
                SELECT a2.question_id
                FROM answer a2
                WHERE a2.updated_on > previous_snapshot_timestamp
            )
        -- The id tie-breakers make the order deterministic for equal timestamps
        ORDER BY question_created_on, q1.id, answer_created_on, a1.id
    ) loop
        IF result_set_record.question_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.question_updated_on;
        END IF;
        IF result_set_record.answer_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.answer_updated_on;
        END IF;

        question_id = result_set_record.question_id;
        question_title = result_set_record.question_title;
        question_body = result_set_record.question_body;
        question_score = result_set_record.question_score;
        question_created_on = result_set_record.question_created_on;
        question_updated_on = result_set_record.question_updated_on;
        answer_id = result_set_record.answer_id;
        answer_body = result_set_record.answer_body;
        answer_accepted = result_set_record.answer_accepted;
        answer_score = result_set_record.answer_score;
        answer_created_on = result_set_record.answer_created_on;
        answer_updated_on = result_set_record.answer_updated_on;
        RETURN next;
    END loop;

    UPDATE cache_snapshot
    SET updated_on = max_snapshot_timestamp
    WHERE region = 'QA';
END
$$
The get_updated_questions_and_answers function works as follows:
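- First, it reads the previous_snapshot_timestamp from the cache_snapshot table, which tracks the most recent question or answer change that we previously synchronized with the cache. The FOR NO KEY UPDATE clause locks the QA row, so concurrent calls wait for each other. On the very first call, the QA row does not exist yet, so the function creates it with the Unix epoch timestamp, and all records are fetched.
- Second, it fetches every question along with all its answer records if any modification happened inside that question and answer hierarchy after the previous_snapshot_timestamp.
- Afterward, it iterates over the question and answer records and calculates the max_snapshot_timestamp, which is saved in the cache_snapshot table and becomes the previous_snapshot_timestamp the next time we call the get_updated_questions_and_answers function.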
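To make the three steps easier to follow, they can be emulated in memory. The following is a plain-Java sketch, not the author's code: QuestionRow, AnswerRow, and SnapshotTracker are hypothetical names, the previousSnapshot field plays the role of the cache_snapshot row, isAfter mirrors the strict > comparison, and only question identifiers are returned for brevity. As a design choice, the running maximum starts from the previous snapshot rather than from the epoch, so a poll that finds no changes leaves the snapshot where it was.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory rows mirroring the question and answer tables.
record QuestionRow(long id, LocalDateTime createdOn, LocalDateTime updatedOn) {}

record AnswerRow(long id, long questionId, LocalDateTime createdOn, LocalDateTime updatedOn) {}

class SnapshotTracker {
    // Stand-in for cache_snapshot.updated_on of the 'QA' region,
    // initialized like to_timestamp(0).
    private LocalDateTime previousSnapshot = LocalDateTime.of(1970, 1, 1, 0, 0);

    // Returns the ids of the questions whose hierarchy changed since the last call.
    List<Long> changedQuestionIds(List<QuestionRow> questions, List<AnswerRow> answers) {
        // Step 2: a question qualifies if it, or any of its answers, changed.
        Set<Long> changed = new HashSet<>();
        for (QuestionRow q : questions) {
            if (q.updatedOn().isAfter(previousSnapshot)) changed.add(q.id());
        }
        for (AnswerRow a : answers) {
            if (a.updatedOn().isAfter(previousSnapshot)) changed.add(a.questionId());
        }
        // Step 3: the max timestamp of the returned hierarchies becomes the next snapshot.
        LocalDateTime max = previousSnapshot;
        for (QuestionRow q : questions) {
            if (changed.contains(q.id()) && q.updatedOn().isAfter(max)) max = q.updatedOn();
        }
        for (AnswerRow a : answers) {
            if (changed.contains(a.questionId()) && a.updatedOn().isAfter(max)) max = a.updatedOn();
        }
        previousSnapshot = max;
        // Same ordering idea as the ORDER BY clause: creation time, then id.
        return questions.stream()
            .filter(q -> changed.contains(q.id()))
            .sorted(Comparator.comparing(QuestionRow::createdOn).thenComparingLong(QuestionRow::id))
            .map(QuestionRow::id)
            .toList();
    }
}
```

Calling changedQuestionIds twice in a row without modifying any row returns an empty list the second time, while adding an answer makes its parent question reappear on the next call.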
If the get_updated_questions_and_answers function is called from a @Transactional context that executes the cache update, then, in case of a cache update failure, the transaction will be rolled back, and the cache_snapshot table is reverted to its previous consistent state.
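The rollback argument can be illustrated with a plain-Java sketch that uses neither Spring nor a database. TransactionalSnapshot is a hypothetical stand-in for the cache_snapshot row that only publishes a new value on commit, and CacheSynchronizer is a hypothetical class playing the role of the @Transactional service method. If the cache update throws, the staged snapshot is discarded, so the next run returns the same changes again.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the cache_snapshot row: a new value is only
// published on commit, emulating a database transaction.
class TransactionalSnapshot {
    static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

    private LocalDateTime committed = EPOCH;
    private LocalDateTime pending;

    void write(LocalDateTime value) { pending = value; }

    void commit() {
        if (pending != null) {
            committed = pending;
        }
        pending = null;
    }

    void rollback() { pending = null; }

    LocalDateTime committedValue() { return committed; }
}

// Plays the role of a @Transactional service method: advancing the
// snapshot and updating the cache succeed or fail together.
class CacheSynchronizer {
    static <T> void synchronize(TransactionalSnapshot snapshot,
                                LocalDateTime maxSnapshotTimestamp,
                                List<T> changes,
                                Consumer<List<T>> cacheUpdate) {
        // Like the final UPDATE of the function, staged until commit.
        snapshot.write(maxSnapshotTimestamp);
        try {
            cacheUpdate.accept(changes);
            snapshot.commit();
        } catch (RuntimeException e) {
            // The cache update failed: discard the staged snapshot.
            snapshot.rollback();
            throw e;
        }
    }
}
```

In a real application, the database transaction provides this all-or-nothing behavior, so no extra bookkeeping is needed on the Java side.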
Calling the TABLE-valued function using jOOQ
As I explained in this article, jOOQ provides the best way to call database stored procedures and functions from Java.
By using the code generator, jOOQ creates a GetUpdatedQuestionsAndAnswers utility that allows us to call the get_updated_questions_and_answers PostgreSQL function.
First, we import the static GET_UPDATED_QUESTIONS_AND_ANSWERS field declared by the generated GetUpdatedQuestionsAndAnswers class:
import static com.vladmihalcea.book.hpjp.jooq.pgsql.schema.crud.tables.GetUpdatedQuestionsAndAnswers.GET_UPDATED_QUESTIONS_AND_ANSWERS;
Afterward, we can call the get_updated_questions_and_answers PostgreSQL function like this:
Result<GetUpdatedQuestionsAndAnswersRecord> records = sql
    .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
    .fetch();
Each GetUpdatedQuestionsAndAnswersRecord provides type-safe access to one row of the TABLE result set returned by the get_updated_questions_and_answers PostgreSQL function.
From these GetUpdatedQuestionsAndAnswersRecord rows, we can build the Question and Answer hierarchy to be stored in the cache.
This logic can be encapsulated in the getUpdatedQuestionsAndAnswers method using a Java Collector composed from Collectors.toMap and Collectors.collectingAndThen:
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public List<Question> getUpdatedQuestionsAndAnswers() {
    return doInJOOQ(sql -> {
        return sql
            .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
            .collect(
                Collectors.collectingAndThen(
                    Collectors.toMap(
                        // One map entry per question identifier
                        GetUpdatedQuestionsAndAnswersRecord::getQuestionId,
                        record -> {
                            Question question = new Question(
                                record.getQuestionId(),
                                record.getQuestionTitle(),
                                record.getQuestionBody(),
                                record.getQuestionScore(),
                                record.getQuestionCreatedOn(),
                                record.getQuestionUpdatedOn(),
                                new ArrayList<>()
                            );
                            // The LEFT JOIN yields a null answer_id for questions without answers
                            Long answerId = record.getAnswerId();
                            if (answerId != null) {
                                question.answers().add(
                                    new Answer(
                                        answerId,
                                        record.getAnswerBody(),
                                        record.getAnswerScore(),
                                        record.getAnswerAccepted(),
                                        record.getAnswerCreatedOn(),
                                        record.getAnswerUpdatedOn()
                                    )
                                );
                            }
                            return question;
                        },
                        // Rows sharing a question_id are merged into the first Question
                        (Question existing, Question replacement) -> {
                            existing.answers().addAll(
                                replacement.answers()
                            );
                            return existing;
                        },
                        // LinkedHashMap preserves the row order of the function result
                        LinkedHashMap::new
                    ),
                    (Function<Map<Long, Question>, List<Question>>)
                        map -> new ArrayList<>(map.values())
                )
            );
    });
}
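The toMap merge function creates a temporary Question for every row and then folds its answers into the first one. An alternative, sketched here in plain Java, first groups the flat rows with Collectors.groupingBy and builds each Question only once. Row is a hypothetical record mirroring the columns of the function result set; in the jOOQ version, the GetUpdatedQuestionsAndAnswersRecord getters would take its place. The Question and Answer records are assumed to have the same component order as in the method above.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

record Answer(Long id, String body, Integer score, Boolean accepted,
              LocalDateTime createdOn, LocalDateTime updatedOn) {}

record Question(Long id, String title, String body, Integer score,
                LocalDateTime createdOn, LocalDateTime updatedOn,
                List<Answer> answers) {}

// Hypothetical flat row: one question joined with at most one answer.
record Row(Long questionId, String questionTitle, String questionBody,
           Integer questionScore, LocalDateTime questionCreatedOn,
           LocalDateTime questionUpdatedOn, Long answerId, String answerBody,
           Boolean answerAccepted, Integer answerScore,
           LocalDateTime answerCreatedOn, LocalDateTime answerUpdatedOn) {}

class QuestionAssembler {
    static List<Question> toQuestions(List<Row> rows) {
        // LinkedHashMap keeps the questions in result set order.
        Map<Long, List<Row>> byQuestion = rows.stream()
            .collect(Collectors.groupingBy(
                Row::questionId, LinkedHashMap::new, Collectors.toList()));

        List<Question> questions = new ArrayList<>();
        for (List<Row> group : byQuestion.values()) {
            Row first = group.get(0);
            // LEFT JOIN rows of questions without answers carry a null answerId.
            List<Answer> answers = group.stream()
                .filter(row -> row.answerId() != null)
                .map(row -> new Answer(row.answerId(), row.answerBody(),
                    row.answerScore(), row.answerAccepted(),
                    row.answerCreatedOn(), row.answerUpdatedOn()))
                .collect(Collectors.toCollection(ArrayList::new));
            questions.add(new Question(first.questionId(), first.questionTitle(),
                first.questionBody(), first.questionScore(),
                first.questionCreatedOn(), first.questionUpdatedOn(), answers));
        }
        return questions;
    }
}
```

Passing LinkedHashMap::new to groupingBy keeps the questions in the order produced by the function's ORDER BY clause, just like the LinkedHashMap used in the toMap version.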
Testing time
When inserting a parent question row with two associated answer child records:
LocalDateTime timestamp = LocalDateTime.now().minusSeconds(1);

sql
    .insertInto(QUESTION)
    .columns(
        QUESTION.ID,
        QUESTION.TITLE,
        QUESTION.BODY,
        QUESTION.SCORE,
        QUESTION.CREATED_ON,
        QUESTION.UPDATED_ON
    )
    .values(
        1L,
        "How to call jOOQ stored procedures?",
        "I have a PostgreSQL stored procedure and I'd like to call it from jOOQ.",
        1,
        timestamp,
        timestamp
    )
    .execute();

sql
    .insertInto(ANSWER)
    .columns(
        ANSWER.ID,
        ANSWER.QUESTION_ID,
        ANSWER.BODY,
        ANSWER.SCORE,
        ANSWER.ACCEPTED,
        ANSWER.CREATED_ON,
        ANSWER.UPDATED_ON
    )
    .values(
        1L,
        1L,
        "Checkout the [jOOQ docs]" +
        "(https://www.jooq.org/doc/latest/manual/sql-execution/stored-procedures/).",
        10,
        true,
        timestamp,
        timestamp
    )
    .values(
        2L,
        1L,
        "Checkout [this article]" +
        "(https://vladmihalcea.com/jooq-facts-sql-functions-made-easy/).",
        5,
        false,
        timestamp,
        timestamp
    )
    .execute();
We can see that the getUpdatedQuestionsAndAnswers method returns one Question with two Answer entries that exactly match the Question hierarchy we have just created:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(2, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());
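When inserting a new Answer into our hierarchy (the created_on and updated_on columns are omitted, so this example assumes the schema fills them with the current timestamp, for instance, through column defaults):
sql
    .insertInto(ANSWER)
    .columns(
        ANSWER.ID,
        ANSWER.QUESTION_ID,
        ANSWER.BODY
    )
    .values(
        3L,
        1L,
        "Checkout this " +
        "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
    )
    .execute();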
We can see that the Question record returned by the getUpdatedQuestionsAndAnswers method now contains three Answer child elements:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());
assertEquals(3, answers.get(2).id().intValue());
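When updating the answer table row that we have just created (since this UPDATE statement does not set the updated_on column, the change can only be detected if the schema refreshes updated_on automatically, for example, with a BEFORE UPDATE trigger, and this example assumes such a mechanism is in place):
sql
    .update(ANSWER)
    .set(
        ANSWER.BODY,
        "Checkout this [YouTube video from Toon Koppelaars]" +
        "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
    )
    .where(ANSWER.ID.eq(3L))
    .execute();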
The getUpdatedQuestionsAndAnswers method will return the updated snapshot of our Question and Answer hierarchy:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());

Answer latestAnswer = answers.get(2);
assertEquals(3, latestAnswer.id().intValue());
assertEquals(
    "Checkout this [YouTube video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y).",
    latestAnswer.body()
);
If we decide to insert a new Question:
sql
    .insertInto(QUESTION)
    .columns(
        QUESTION.ID,
        QUESTION.TITLE,
        QUESTION.BODY
    )
    .values(
        2L,
        "How to use the jOOQ MULTISET operator?",
        "I want to know how I can use the jOOQ MULTISET operator."
    )
    .execute();
The getUpdatedQuestionsAndAnswers method will capture this change and return the newly created Question that we can store in the cache:
List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(2, question.id().intValue());
assertTrue(question.answers().isEmpty());
Cool, right?
Conclusion
While caching data is easy, synchronizing the cache with the database is the difficult part.
By using jOOQ to call the PostgreSQL TABLE-valued functions that fetch the cacheable aggregates, we can simplify this task, as the result captures only the entries that have changed since the last cache synchronization.
This research was funded by Data Geekery GmbH and conducted in accordance with the blog ethics policy.
While the article was written independently and reflects entirely my opinions and conclusions, the amount of work involved in making this article happen was compensated by Data Geekery.
Very interesting! Single question that is implementation specific – why did you subtract one second from the ‘timestamp’?
I’m glad you liked it.
The reason why the INSERT statement uses a TIMESTAMP that’s one second early is that we want to emulate the case when something has been created before we actually call the database function.