Bash es increiblemente permisivo, por lo que se ha convertido en mi lenguaje de prototipado por defecto. Combinado con SQLite y Jq, me faltaba una última pieza para completar los diseños: un sistema pubsub. Ahora he encontrado una manera sencilla de integrar NATS (corriendo en Docker) con Bash.
NATS es un sistema de mensajería pubsub escrito en Go. Aunque es relativamente sencillo usar las bibliotecas que ofrecen para desarrollar clientes e interaccionar con NATS, quería aprovechar la existencia de NATS CLI para integrarlo en mis scripts de manera sencilla.
El problema
Usando el cliente nats
, suscribirse a un topic es tan sencillo como:
nats sub <topic-name>
Sin embargo, al ejecutar el comando, el cliente nats
se queda escuchando, y no finaliza.
El comando muestra los mensajes que se reciben en el topic indicado, pero no he sabido capturarlos de forma que pudiera consumir los mensajes recibidos desde un script en Bash.
La solución
La solución ha venido de revisar la ayuda del comando y descubrir que se puede finalizar “la suscripción” tras recibir un número determinado de mensajes:
$ nats sub --help
usage: nats subscribe [<flags>] [<subject>]
Generic subscription client
Args:
[<subject>] Subject to subscribe to
Flags:
...
--count=COUNT Quit after receiving this many messages
Al finalizar el comando, el mensaje recibido se puede capturar en una variable, y el script continua, consumiendo el valor y haciendo con él lo que queramos…
Una simple prueba de concepto:
#!/usr/bin/env bash
while true ; do
sleep 0.1
msg=$(nats sub stuff --raw --count=1)
echo "message: $msg"
done
Añadimos --raw
para evitar el mensaje de NATS del número de mensaje recibido, etc..
message: 19:30:08 Subscribing on sample.topic
[#1] Received on "sample.topic"
hola
Una vez desbloqueada la recepción de los mensajes, la publicación no presenta ningún problema. En este ejemplo, enviamos un ID y un timestamp generados por NATS en bucle:
#!/usr/bin/env bash
while true ; do
sleep 0.5
nats pub stuff '{ "id": "{{ ID }}", "time": "{{ UnixNano }}" }'
done
El subscriber recibe el mensaje y lo imprime por pantalla usando echo
:
$ bash subscriber.sh
message: { "id": "vqUCNi4QzgspHAGj2WyfZl", "time": "1739102475245242000" }
message: { "id": "AC6pFZcnRHQ5Xzt9CV5RbP", "time": "1739102475787266000" }
message: { "id": "7s8IiQxxeife2AxTXOG1JR", "time": "1739102476327454000" }
...
Ejecutando pipelines
Imprimir un mensaje a stdout
no lleva prácticamente nada de tiempo… Pero si queremos hacer alguna cosa más “complicada”, que lleve más tiempo, el subscriber podría perder algunos mensajes… Para evitar esto, usamos la idea de las go routines, lanzando un proceso independiente para no bloquear la ejecución de subscriber.sh
.
En Bash, movemos el proceso “pesado” a segundo plano añadiendo un &
tras el comando.
Por ejemplo:
#!/bin/bash
while true ; do
sleep 0.1
msg=$(nats sub stuff --raw --count=1)
echo "processing '$msg'..."
./processor.sh "$msg" &
done
processor.sh
llevaría a cabo un proceso que requiere más tiempo (que en este caso simulamos con un sleep
), aunque en este caso sólo se trata de guardar los mensajes que recibimos en un fichero CSV:
#!/usr/bin/env bash
parser() {
local msg="$1"
id=$(echo $1 | jq -r '.id')
time=$(echo $1 | jq -r '.time')
if [[ "$id" == "" ]]; then
echo "error: malformed message '$msg'"
exit 1
fi
echo "$id,$time" >> registry.csv
sleep 2
}
if [[ ! -f ./registry.csv ]]; then
echo "ID, TimeStamp (Unix)" > ./registry.csv
fi
parser "$@"
Discriminando mensajes
Podemos tener múltiples suscriptores escuchando a un topic. La idea es que cada uno de los suscriptores sólo procese un determinado “tipo” de mensaje. Para ello, en primer lugar, deberíamos tener diferentes tipos de mensajes.
Imaginemos que nuestro sistema de mensajería pubsub recibe el resultado del lanzamiento de una moneda:
#!/bin/bash
flipper() {
(( RANDOM % 2 )) && echo "cara" || echo "cruz"
}
while true ; do
sleep 0.5
nats pub stuff '{ "result": "'$(flipper)'" }'
done
Como antes, generamos un subscriber, pero esta vez sólo actuará cuando el mensaje contenga cara
:
#!/bin/bash
while true ; do
sleep 0.1
msg=$(nats sub stuff --raw --count=1)
./flip_logger.sh "$msg" &
done
El subscriber en sí no contiene ninguna lógica; sólo pasa la información al script flip_logger.sh para que éste la procese sin bloquear la recepción de mensajes.
El análisis y procesado del contenido del mensaje se hace en flip_logger.sh
; en este caso, se muestra un mensaje en stdout
indicando que el lanzamiento ha resultado cara
:
#!/usr/bin/env bash
if [[ "$(echo $1 | jq -r '.result')" == "cara" ]]; then
echo "Ha salido 'Cara'"
fi
No es el script más útil del mundo, pero muestra cómo podemos tener múltiples scripts suscritos a un topic haciendo que cada uno de ellos reaccione únicamente cuando se detecte un mensaje -o una propiedad del mensaje- determinada.