Persistent messaging

Persistence is used on both server side and client side to prevent messages from being lost.

NATS Jetstream

NATS possesses a persistence module called JetStream that can be enabled on the NATS server. On the client side, it uses stream (group of queue) to send messages.

import argparse
import asyncio

import nats

async def run():
    parser = argparse.ArgumentParser()

    parser.add_argument("subject", default="hello", nargs="?")
    parser.add_argument("-d", "--data", default="hello world")
    parser.add_argument("-s", "--servers", default="nats://localhost:4222")
    parser.add_argument("--creds", default="")
    args, unknown = parser.parse_known_args()

    data = args.data
    if len(unknown) > 0:
        data = unknown[0]

    async def error_cb(e):
        print("Error:", e)

    async def reconnected_cb():
        print("Got reconnected to NATS...")

    options = {"error_cb": error_cb, "reconnected_cb": reconnected_cb}

    if len(args.creds) > 0:
        options["user_credentials"] = args.creds

    try:
        if len(args.servers) > 0:
            options["servers"] = args.servers

        nc = await nats.connect(**options)
        js = nc.jetstream()
    except Exception as e:
        print(e)

A client can still subscribe to different subject.

    async def message_handler(msg):
        res = EchoResponse.FromString(msg.data)
        print(f"Got response: '{res.message}'")

    await js.add_stream(
        name=current_user, subjects=[args.subject, f"{args.subject}.reply"]
    )
    await js.subscribe(f"{args.subject}.reply", cb=message_handler)

Because of the properties of Jetstream messaging, the Request/Reply messaging pattern is replaced by a Publish/Reply. The client is subscribed to a dedicated reply queue user_name.*.reply which will receive the Server replies.

    req = EchoRequest()
    req.message = data
    payload = req.SerializeToString()

    await jetstream.publish(subject, payload)
    print(f"Published on [{subject}] : '{payload.decode()}'")

Message buffer

In case a client is disconnected or crash before the messages are sent, a Buffer is used to save the messages. This Buffer uses a sqlite3 DB to save the messages content in chronological order on the user's machine.

It can : - save messages - get the oldest saved message - get_all messages - delete a specific message - remove the oldest saved message

In our stream-echo example that features the use of a buffer, it is used to retrieve unsent messages as well as save messages.

async def send_message(jetstream, data: str, subject: str):
    buffer = Buffer(subject=subject)
    req = EchoRequest()
    req.message = data
    payload = req.SerializeToString()
    buffer.save(payload)

    message = buffer.get()
    while message is not None:
        await jetstream.publish(subject, message)
        print(f"Published on [{subject}] : '{payload.decode()}'")
        message = buffer.get()