Cache synchronization using jOOQ and PostgreSQL functions


Introduction

In this article, we are going to see how we can achieve cache synchronization with the help of jOOQ and PostgreSQL functions.

By using Change Data Capture, we can track how table records change over time and synchronize the application-level cache entries that were built from the table records in question.

Domain Model

Let’s assume we are building a question-and-answer website similar to Stack Overflow. The most important tables in our database are the question and answer tables, which look as follows:

Question and Answer tables

Because our application has many users, we want to store the most viewed questions and answers in an application-level cache like Redis.

The cache entry key is the question identifier, and the value is a Question record that contains a List of Answer records, as illustrated by the following diagram:

Question and Answer records
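To make the shape of a cache entry concrete, here is a minimal sketch of the Question and Answer records, with the component order inferred from the constructor calls used later in this article, plus a ConcurrentHashMap standing in for Redis. The QuestionCache class is hypothetical and only illustrates the key/value layout; a real setup would go through a Redis client instead.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Assumed shape of an answer, matching the constructor arguments used later on.
record Answer(
    Long id,
    String body,
    Integer score,
    Boolean accepted,
    LocalDateTime createdOn,
    LocalDateTime updatedOn
) {}

// Assumed shape of the cached aggregate: a question plus all of its answers.
record Question(
    Long id,
    String title,
    String body,
    Integer score,
    LocalDateTime createdOn,
    LocalDateTime updatedOn,
    List<Answer> answers
) {}

// Hypothetical in-memory stand-in for Redis: key = question id,
// value = the whole Question hierarchy.
class QuestionCache {

    private final Map<Long, Question> entries = new ConcurrentHashMap<>();

    // Stores (or replaces) the whole aggregate under the question identifier.
    void put(Question question) {
        entries.put(question.id(), question);
    }

    // Returns the cached aggregate, or null on a cache miss.
    Question get(Long questionId) {
        return entries.get(questionId);
    }
}
```

Storing the whole hierarchy under a single key means one cache lookup is enough to render a question page, at the cost of having to refresh the entire entry whenever any of its answers change.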

Tracking record changes using a PostgreSQL function

To extract the Question and Answer records that need to be synchronized with the cache, we are going to use the following get_updated_questions_and_answers PostgreSQL function:


CREATE OR REPLACE FUNCTION get_updated_questions_and_answers()
RETURNS TABLE(
    question_id bigint, question_title varchar(250),
    question_body text, question_score integer,
    question_created_on timestamp, question_updated_on timestamp,
    answer_id bigint, answer_body text,
    answer_accepted boolean, answer_score integer,
    answer_created_on timestamp, answer_updated_on timestamp
)
LANGUAGE plpgsql
AS $$
DECLARE
    previous_snapshot_timestamp timestamp;
    max_snapshot_timestamp timestamp;
    result_set_record record;
BEGIN
    -- Timestamp of the last change already synchronized for the QA region
    previous_snapshot_timestamp = (
        SELECT
            updated_on
        FROM
            cache_snapshot
        WHERE
            region = 'QA'
    );
    -- On the very first run, start tracking changes from the Unix epoch
    IF previous_snapshot_timestamp IS NULL THEN
        INSERT INTO cache_snapshot(
            region,
            updated_on
        )
        VALUES (
            'QA',
            to_timestamp(0)
        );

        previous_snapshot_timestamp = to_timestamp(0);
    END IF;

    -- Start from the previous snapshot, so a run that finds no changes
    -- keeps the current snapshot instead of going back to the epoch
    max_snapshot_timestamp = previous_snapshot_timestamp;
    FOR result_set_record IN (
        SELECT
            q1.id as question_id, q1.title as question_title,
            q1.body as question_body, q1.score as question_score,
            q1.created_on as question_created_on, q1.updated_on as question_updated_on,
            a1.id as answer_id, a1.body as answer_body,
            a1.accepted as answer_accepted, a1.score as answer_score,
            a1.created_on as answer_created_on, a1.updated_on as answer_updated_on
        FROM
            question q1
        LEFT JOIN
            answer a1 on q1.id = a1.question_id
        WHERE
            -- the question itself changed...
            q1.id IN (
                SELECT q2.id
                FROM question q2
                WHERE
                    q2.updated_on > previous_snapshot_timestamp
            ) OR
            -- ...or one of its answers changed
            q1.id IN (
                SELECT a2.question_id
                FROM answer a2
                WHERE
                    a2.updated_on > previous_snapshot_timestamp
            )
        ORDER BY
            question_created_on, answer_created_on
    ) LOOP

        IF result_set_record.question_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.question_updated_on;
        END IF;

        IF result_set_record.answer_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.answer_updated_on;
        END IF;

        -- Copy the current row into the OUT columns and emit it
        question_id = result_set_record.question_id;
        question_title = result_set_record.question_title;
        question_body = result_set_record.question_body;
        question_score = result_set_record.question_score;
        question_created_on = result_set_record.question_created_on;
        question_updated_on = result_set_record.question_updated_on;
        answer_id = result_set_record.answer_id;
        answer_body = result_set_record.answer_body;
        answer_accepted = result_set_record.answer_accepted;
        answer_score = result_set_record.answer_score;
        answer_created_on = result_set_record.answer_created_on;
        answer_updated_on = result_set_record.answer_updated_on;
        RETURN NEXT;
    END LOOP;

    -- Remember the latest synchronized change for the next call
    UPDATE
        cache_snapshot
    SET updated_on = max_snapshot_timestamp
    WHERE
        region = 'QA';
END
$$

The get_updated_questions_and_answers function works as follows:

  • First, it reads the previous_snapshot_timestamp, which tracks the latest question or answer change that we previously synchronized with the cache. On the first call, it creates the QA snapshot entry and starts from the Unix epoch.
  • Second, it fetches each question together with all its answer records if any modification occurred within that question and answer hierarchy.
  • Afterward, it iterates over the question and answer records and calculates the max_snapshot_timestamp, which becomes the previous_snapshot_timestamp the next time we call the get_updated_questions_and_answers function.
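The same change-detection idea can be sketched in plain Java to see how the snapshot timestamp acts as a watermark. This is a hypothetical in-memory model, assuming simplified QuestionRow and AnswerRow records that carry only the columns relevant to change tracking; it illustrates the technique and does not replace the PL/pgSQL function.

```java
import java.time.LocalDateTime;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical rows holding only the columns needed for change tracking.
record QuestionRow(long id, LocalDateTime updatedOn) {}
record AnswerRow(long id, long questionId, LocalDateTime updatedOn) {}

// Outcome of one pass: the stale hierarchies and the watermark for the next pass.
record SyncResult(Set<Long> changedQuestionIds, LocalDateTime nextSnapshot) {}

class SnapshotTracker {

    static SyncResult findChanges(
            List<QuestionRow> questions,
            List<AnswerRow> answers,
            LocalDateTime previousSnapshot) {
        Set<Long> changedIds = new LinkedHashSet<>();
        // A hierarchy is stale if the question itself changed...
        for (QuestionRow q : questions) {
            if (q.updatedOn().isAfter(previousSnapshot)) {
                changedIds.add(q.id());
            }
        }
        // ...or if any of its answers changed.
        for (AnswerRow a : answers) {
            if (a.updatedOn().isAfter(previousSnapshot)) {
                changedIds.add(a.questionId());
            }
        }
        // The next watermark is the latest timestamp among the rows of the
        // returned hierarchies; if nothing changed, the previous one is kept.
        LocalDateTime max = previousSnapshot;
        for (QuestionRow q : questions) {
            if (changedIds.contains(q.id()) && q.updatedOn().isAfter(max)) {
                max = q.updatedOn();
            }
        }
        for (AnswerRow a : answers) {
            if (changedIds.contains(a.questionId()) && a.updatedOn().isAfter(max)) {
                max = a.updatedOn();
            }
        }
        return new SyncResult(changedIds, max);
    }
}
```

Feeding nextSnapshot() back in as previousSnapshot on the following call yields an empty set until another question or answer row is modified.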

Calling the TABLE-valued function using jOOQ

As I explained in this article, jOOQ provides the best way to call database stored procedures and functions from Java.

Using the code generator, jOOQ creates a GetUpdatedQuestionsAndAnswers utility that allows us to call the get_updated_questions_and_answers PostgreSQL function.

First, we’ll import the static variables declared by the GetUpdatedQuestionsAndAnswers utility:


import static com.vladmihalcea.book.hpjp.jooq.pgsql.schema.crud.tables
    .GetUpdatedQuestionsAndAnswers.GET_UPDATED_QUESTIONS_AND_ANSWERS;

Afterward, we can call the get_updated_questions_and_answers PostgreSQL function like this:


Result<GetUpdatedQuestionsAndAnswersRecord> records = sql
    .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
    .fetch();

Each GetUpdatedQuestionsAndAnswersRecord is a type-safe row of the TABLE result set returned by the get_updated_questions_and_answers PostgreSQL function.

From these GetUpdatedQuestionsAndAnswersRecord rows, we can build the Question and Answer hierarchy to be stored in the cache.

This can be encapsulated in the getUpdatedQuestionsAndAnswers method using a custom Java Collector:


import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public List<Question> getUpdatedQuestionsAndAnswers() {
    return doInJOOQ(sql -> {
        return sql
            .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
            .collect(
                Collectors.collectingAndThen(
                    // Group the flat rows by question id
                    Collectors.toMap(
                        GetUpdatedQuestionsAndAnswersRecord::getQuestionId,
                        record -> {
                            Question question = new Question(
                                record.getQuestionId(),
                                record.getQuestionTitle(),
                                record.getQuestionBody(),
                                record.getQuestionScore(),
                                record.getQuestionCreatedOn(),
                                record.getQuestionUpdatedOn(),
                                new ArrayList<>()
                            );

                            // The LEFT JOIN yields null answer columns
                            // for questions without answers
                            Long answerId = record.getAnswerId();
                            if (answerId != null) {
                                question.answers().add(
                                    new Answer(
                                        answerId,
                                        record.getAnswerBody(),
                                        record.getAnswerScore(),
                                        record.getAnswerAccepted(),
                                        record.getAnswerCreatedOn(),
                                        record.getAnswerUpdatedOn()
                                    )
                                );
                            }

                            return question;
                        },
                        // Rows of the same question are merged by appending their answers
                        (Question existing, Question replacement) -> {
                            existing.answers().addAll(
                                replacement.answers()
                            );
                            return existing;
                        },
                        // LinkedHashMap preserves the result set order
                        LinkedHashMap::new
                    ),
                    (Function<Map<Long, Question>, List<Question>>) map ->
                        new ArrayList<>(map.values())
                )
            );
    });
}
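Since the toMap-based collector is the least obvious part of this method, the following standalone sketch applies the same grouping technique to plain Java objects, so it runs without a database. FlatRow, QuestionDto, and AnswerDto are hypothetical simplifications of the generated record and the cached types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical flat row: a subset of the TABLE columns. The answer columns
// are null when the LEFT JOIN finds no answer for a question.
record FlatRow(Long questionId, String questionTitle, Long answerId, String answerBody) {}

record AnswerDto(Long id, String body) {}

record QuestionDto(Long id, String title, List<AnswerDto> answers) {}

class HierarchyBuilder {

    static List<QuestionDto> toHierarchy(List<FlatRow> rows) {
        Collector<FlatRow, ?, Map<Long, QuestionDto>> byQuestionId = Collectors.toMap(
            FlatRow::questionId,
            // Each row becomes a question holding zero or one answer
            row -> {
                QuestionDto question = new QuestionDto(
                    row.questionId(), row.questionTitle(), new ArrayList<>()
                );
                if (row.answerId() != null) {
                    question.answers().add(new AnswerDto(row.answerId(), row.answerBody()));
                }
                return question;
            },
            // Rows sharing a question id are merged by appending the later row's answers
            (existing, replacement) -> {
                existing.answers().addAll(replacement.answers());
                return existing;
            },
            // LinkedHashMap keeps the questions in encounter order
            LinkedHashMap::new
        );
        return rows.stream().collect(
            Collectors.collectingAndThen(byQuestionId, map -> new ArrayList<>(map.values()))
        );
    }
}
```

Rows for the same question are merged in the order they arrive, so the function’s ORDER BY clause combined with the LinkedHashMap keeps both the questions and their answers in creation order.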

Testing time

When inserting a parent question row with two associated answer child records:


LocalDateTime timestamp = LocalDateTime.now().minusSeconds(1);

sql
.insertInto(QUESTION)
.columns(
    QUESTION.ID,
    QUESTION.TITLE,
    QUESTION.BODY,
    QUESTION.SCORE,
    QUESTION.CREATED_ON,
    QUESTION.UPDATED_ON
)
.values(
    1L,
    "How to call jOOQ stored procedures?",
    "I have a PostgreSQL stored procedure and I'd like to call it from jOOQ.",
    1,
    timestamp,
    timestamp
)
.execute();

sql
.insertInto(ANSWER)
.columns(
    ANSWER.ID,
    ANSWER.QUESTION_ID,
    ANSWER.BODY,
    ANSWER.SCORE,
    ANSWER.ACCEPTED,
    ANSWER.CREATED_ON,
    ANSWER.UPDATED_ON
)
.values(
    1L,
    1L,
    "Checkout the [jOOQ docs]" +
    "(https://www.jooq.org/doc/latest/manual/sql-execution/stored-procedures/).",
    10,
    true,
    timestamp,
    timestamp
)
.values(
    2L,
    1L,
    "Checkout [this article]" +
    "(https://vladmihalcea.com/jooq-facts-sql-functions-made-easy/).",
    5,
    false,
    timestamp,
    timestamp
)
.execute();

We can see that the getUpdatedQuestionsAndAnswers method returns one Question with two Answer entries that exactly match the Question hierarchy we’ve just created:


List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(2, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());

When inserting a new Answer into our hierarchy:


sql
.insertInto(ANSWER)
.columns(
    ANSWER.ID,
    ANSWER.QUESTION_ID,
    ANSWER.BODY
)
.values(
    3L,
    1L,
    "Checkout this " +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
)
.execute();

We can see that the Question record returned by the getUpdatedQuestionsAndAnswers method now contains three Answer child elements:


List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());
assertEquals(3, answers.get(2).id().intValue());

When updating the answer table row that we’ve just created:


sql
.update(ANSWER)
.set(
    ANSWER.BODY,
    "Checkout this [YouTube video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
)
.where(ANSWER.ID.eq(3L))
.execute();

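Since the UPDATE statement does not set the updated_on column explicitly, this relies on the column being refreshed automatically on every modification (for example, by a BEFORE UPDATE trigger). With that in place, the getUpdatedQuestionsAndAnswers method will return the updated snapshot of our Question and Answer hierarchy: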


List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());
Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());

Answer latestAnswer = answers.get(2);
assertEquals(3, latestAnswer.id().intValue());
assertEquals(
    "Checkout this [YouTube video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y).",
    latestAnswer.body()
);

If we decide to insert a new Question:


sql
.insertInto(QUESTION)
.columns(
    QUESTION.ID,
    QUESTION.TITLE,
    QUESTION.BODY
)
.values(
    2L,
    "How to use the jOOQ MULTISET operator?",
    "I want to know how I can use the jOOQ MULTISET operator."
)
.execute();

The getUpdatedQuestionsAndAnswers method will capture this change and return the newly created Question, which we can store in the cache:


List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(2, question.id().intValue());
assertTrue(question.answers().isEmpty());

Cool, right?



Conclusion

While caching data is easy, synchronizing the cache with the database is the tricky part.

By using jOOQ to call PostgreSQL TABLE-valued functions that fetch the cacheable aggregates, we can simplify this task, since the result captures the entries that have been modified since the last time we executed the cache synchronization.
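The final step, writing the fetched aggregates into the cache, can be sketched as a small generic helper. CacheSynchronizer is hypothetical: the Supplier stands in for a method like getUpdatedQuestionsAndAnswers, and the Map stands in for Redis. Because each returned Question is a complete hierarchy, the synchronization simply overwrites the cached entry.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper that applies one cache synchronization pass.
class CacheSynchronizer<K, V> {

    // Returns the aggregates modified since the previous pass.
    private final Supplier<List<V>> changedAggregates;
    // Extracts the cache key, e.g. the question identifier.
    private final Function<V, K> keyExtractor;
    // Stand-in for the application-level cache.
    private final Map<K, V> cache;

    CacheSynchronizer(Supplier<List<V>> changedAggregates,
                      Function<V, K> keyExtractor,
                      Map<K, V> cache) {
        this.changedAggregates = changedAggregates;
        this.keyExtractor = keyExtractor;
        this.cache = cache;
    }

    // Overwrites every changed entry and returns how many entries were refreshed.
    int synchronize() {
        List<V> changes = changedAggregates.get();
        for (V aggregate : changes) {
            // Each aggregate is a full snapshot, so it replaces the stale entry as a whole.
            cache.put(keyExtractor.apply(aggregate), aggregate);
        }
        return changes.size();
    }
}
```

Wired up as new CacheSynchronizer<>(this::getUpdatedQuestionsAndAnswers, Question::id, cache), the synchronize method could then be invoked periodically, for instance through ScheduledExecutorService.scheduleWithFixedDelay.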

This research was funded by Data Geekery GmbH and conducted in accordance with the blog ethics policy.

While the article was written independently and reflects entirely my opinions and conclusions, the amount of work involved in making this article happen was compensated by Data Geekery.
