Clojure Core Async

En Clojure, core.async facilita la programación asíncrona mediante canales. Los canales son intermediarios que se usan para conectar dos procesos1, uno haciendo las veces de emisor y otro de receptor. Pero pueden usarse para conectar múltiples procesos: sirven para relaciones many-to-many.

Los canales, como casi todo en Clojure, intentan seguir esquemas sencillos y prácticos. Siguen un modelo first-in first-out, lo que viene siendo una cola. De esta forma el orden en que un productor introduce los mensajes en un canal es el mismo orden en que va despachando dichos mensajes a medida que son reclamados por un consumidor en su otro extremo. Si son varios productores los que introducen los mensajes en el mismo canal, dichos mensajes serán igualmente despachados en el mismo orden en que fueron inyectados.

Metiendo y sacando datos en el canal

Para introducir mensajes se usa el operador >!!. La primera ! indica que se producen side-effects, la segunda que la instrucción bloquea el hilo que la ejecuta. De igual forma, <!! sirve para obtener valores de un canal.

Como bloquean el hilo donde se ejecutan, si creamos un canal e inyectamos un valor en el mismo hilo en que nos encontramos, lo bloquearemos:

(let [c (async/chan)]
  (async/>!! c "Hello World!")
  (println "Message in " c ": " (async/<!! c)))

Este código nunca imprimirá porque el hilo se queda bloqueado en la segunda línea: como estamos enviando un mensaje pero nadie escucha al otro lado, se queda esperando hasta que alguien escuche.

Lo mismo ocurre si escuchamos en un hilo y después intentamos producir un mensaje desde él:

(let [c (a/chan)]
  (println "Message: " (a/<!! c))
  (a/>!! "Hello World"))

No recibiremos ningún mensaje porque la segunda línea bloquea el hilo esperando que le llegue algún mensaje.

close!

Podemos cerrar un canal. Cuando lo hacemos, envía todos los mensajes que tiene pendientes y, por último, envía nil a todos los receptores que estaban pendientes de recibir nuevos mensajes.

Asincronía

put! take!

Estas dos funciones permiten trabajar de forma asíncrona. A put! podemos pasarle un callback opcional que se ejecutará tras recibir del receptor del mensaje una respuesta donde nos indique si ha recibido el mensaje. En take! el callback es obligatorio y será la función que se llamará al recibir un mensaje. Al usarlas, no se bloquea el hilo y podemos seguir trabajando. Por lo tanto, esto:

(let [c (async/chan)]
  (async/take! c println)
  (async/put! c "Hello World"))

y esto

(let [c (async/chan)]
  (async/put! c "Hello World")
  (async/take! c println))

serán equivalentes e imprimirán por consola el mismo valor.

thread

Thread es una macro que permite la ejecución del cuerpo en otro hilo, pasándole de forma inmediata el control al hilo principal desde el que se llama. Si mandamos un mensaje desde dicho nuevo hilo, este se bloqueará. Pero no el principal, el cual podrá consumir el valor enviado. Esto hará a su vez que se desbloquee el hilo productor que creamos con la macro.

(let [c (async/chan)]
  (async/thread (async/>!! c "Hello World!"))
  (println "Message in " c ": " (async/<!! c)))

Podemos de igual forma escuchar en un nuevo hilo y producir desde el principal:

(let [c (async/chan)]
  (async/thread (println "Message: " (async/<!! c)))
  (async/>!! c "Hello World"))

Otro hecho interesante es que esta macro crea un canal como respuesta a su llamada. Este canal recibirá el valor obtenido por la ejecución del bloque de código que se le pasa. Después de recibir dicho valor el canal se cierra, por lo que solo puede emplearse para una comunicación de un único mensaje:

(let [c (async/thread "Hello")]
  (println (async/<!! c)))

El bloque superior imprimirá el mensaje que se le pasa al canal en el cuerpo de la macro, en este caso Hello.

future

También podemos usar future en vez de core.async/thread para descargar el hilo en que nos encontremos de las tareas de emisión y/o recepción de mensajes. Como un futuro devuelve una promesa en vez de un canal, no puede usarse de forma tan natural con otras construcciones de core.async, pero si no necesitamos usar el canal que devuelve thread, podemos reemplezarlo sin problemas por future. Así, el primer y segundo ejemplos de thread donde usábamos canales creados de forma explícita pueden reescribirse usando future, no así el tercero. Los dos siguientes snippets son ambos válidos:

(let [c (async/chan)]
  (future (async/>!! c "Hello World!"))
  (println "Message in " c ": " (async/<!! c)))

(let [c (async/chan)]
  (future (println "Message: " (async/<!! c)))
  (async/>!! c "Hello World"))

Buffers

Podemos crear canales con buffers.

Buffer de tamaño fijo

Si se especifica el tamaño del buffer, y si el buffer tiene datos, bloqueará. Si no tiene, no bloquerará (TODO: compararar con primer ejemplo, como en caso de tener buffer > 1 no se bloquea).

Si el canal envía un mensaje pero nadie extrae su valor, el buffer no liberará el espacio y se bloqueará al llenarse. Si no queremos que se bloquee y en cambios se necesita seguir una política en que al llenarse puedan recibirse nuevos mensajes, podemos usar las funciones sliding-buffer y dropping-buffer para seleccionar qué tipo de política de descarte queremos seguir.

sliding-buffer

Cuando está lleno descarta los nuevos valores.

dropping-buffer

Cuando está lleno descarta los valores más antiguos.

IOC Threads

Cuando se habla de IOC2 threads nos referimos a hilos ligeros del estilo corrutinas en go o python. core.async puede trabajar con este tipo de hilos3.

Para crearlos se usa la macro go. Esta hace que se ejecute el código que se la pasa de forma secuencial, aunque internamente lo deconstruye y lo ejecuta mediante callbacks4.

(let [c (async/chan)]
  (async/go (async/>! c "Hello World!"))
  (async/go (println "Message in " c ": " (async/<! c))))

(let [c (async/chan)]
  (async/go
    (println "Vamos a recibir un mensaje")
    (println "Message: " (async/<! c))
    (println "Hemos recibido un mensaje"))
  (async/go
    (println "Vamos a ENVIAR un mensaje")
    (async/>! c "Hello World")))

Podemos ver que al estar dentro del cuerpo de la macro go pasamos a usar >! y <!, al no ser funciones que bloqueen el hilo donde se llaman. Toda llamada a estar versiones no bloqueantes ha de realizarse dentro del cuerpo de la macro go.

Es importante recalcar que estas versiones no bloquean el hilo de ejecución donde se realizan. Son usadas para decirle a la macro donde ha de separar el código en un callback que se ejecute cuando el canal reciba/envíe un mensaje. Se les llama respectivamente put parking y take parking, al servir como metáfora el hecho de que se quedan aparcadas a un lado, sin bloquear la ejecución del código, hasta que el canal tiene un valor que las hace reactivarse. Las similitudes con la forma de trabajar de un callback en un runtime con un único hilo que responde a eventos son evidentes.

Enlaces

Notas al Pie


  1. Un proceso aquí no tiene relación con un proceso de la JVM, es simplemente un bloque de código que se encarga de realizar alguna tarea. ↩︎

  2. Inversion Of Control↩︎

  3. Cuando creamos un hilo de este tipo no estamos creando uno de la JVM como sí ocurre cuando usamos thread o future↩︎

  4. Algo similar ocurre en JavaScript con async-await. Esta construcción permite ejecutar código de forma secuencial, el cual se traduce en realidad en una serie de callbacks anidados. ↩︎