En Clojure, core.async facilita la programación asíncrona mediante canales. Los canales son intermediarios que se usan para conectar dos procesos1, uno haciendo las veces de emisor y otro de receptor. Pero pueden usarse para conectar múltiples procesos: sirven para relaciones many-to-many.
Los canales, como casi todo en Clojure, intentan seguir esquemas sencillos y prácticos. Siguen un modelo first-in first-out, lo que viene siendo una cola. De esta forma el orden en que un productor introduce los mensajes en un canal es el mismo orden en que va despachando dichos mensajes a medida que son reclamados por un consumidor en su otro extremo. Si son varios productores los que introducen los mensajes en el mismo canal, dichos mensajes serán igualmente despachados en el mismo orden en que fueron inyectados.
Para introducir mensajes se usa el operador >!!
. La primera !
indica que se producen side-effects, la segunda que la instrucción bloquea el hilo que la ejecuta. De igual forma, <!!
sirve para obtener valores de un canal.
Como bloquean el hilo donde se ejecutan, si creamos un canal e inyectamos un valor en el mismo hilo en que nos encontramos, lo bloquearemos:
(let [c (async/chan)]
(async/>!! c "Hello World!")
(println "Message in " c ": " (async/<!! c)))
Este código nunca imprimirá porque el hilo se queda bloqueado en la segunda línea: como estamos enviando un mensaje pero nadie escucha al otro lado, se queda esperando hasta que alguien escuche.
Lo mismo ocurre si escuchamos en un hilo y después intentamos producir un mensaje desde él:
(let [c (a/chan)]
(println "Message: " (a/<!! c))
(a/>!! "Hello World"))
No recibiremos ningún mensaje porque la segunda línea bloquea el hilo esperando que le llegue algún mensaje.
Podemos cerrar un canal. Cuando lo hacemos, envía todos los mensajes que
tiene pendientes y, por último, envía nil
a todos los receptores que
estaban pendientes de recibir nuevos mensajes.
Estas dos funciones permiten trabajar de forma asíncrona. A put!
podemos pasarle un callback opcional que se ejecutará tras recibir del
receptor del mensaje una respuesta donde nos indique si ha recibido el
mensaje. En take!
el callback es obligatorio y será la función que
se llamará al recibir un mensaje. Al usarlas, no se bloquea el hilo y
podemos seguir trabajando. Por lo tanto, esto:
(let [c (async/chan)]
(async/take! c println)
(async/put! c "Hello World"))
y esto
(let [c (async/chan)]
(async/put! c "Hello World")
(async/take! c println))
serán equivalentes e imprimirán por consola el mismo valor.
Thread es una macro que permite la ejecución del cuerpo en otro hilo, pasándole de forma inmediata el control al hilo principal desde el que se llama. Si mandamos un mensaje desde dicho nuevo hilo, este se bloqueará. Pero no el principal, el cual podrá consumir el valor enviado. Esto hará a su vez que se desbloquee el hilo productor que creamos con la macro.
(let [c (async/chan)]
(async/thread (async/>!! c "Hello World!"))
(println "Message in " c ": " (async/<!! c)))
Podemos de igual forma escuchar en un nuevo hilo y producir desde el principal:
(let [c (async/chan)]
(async/thread (println "Message: " (async/<!! c)))
(async/>!! c "Hello World"))
Otro hecho interesante es que esta macro crea un canal como respuesta a su llamada. Este canal recibirá el valor obtenido por la ejecución del bloque de código que se le pasa. Después de recibir dicho valor el canal se cierra, por lo que solo puede emplearse para una comunicación de un único mensaje:
(let [c (async/thread "Hello")]
(println (async/<!! c)))
El bloque superior imprimirá el mensaje que se le pasa al canal en el cuerpo de la macro, en este caso Hello.
También podemos usar future
en vez de core.async/thread
para
descargar el hilo en que nos encontremos de las tareas de emisión y/o
recepción de mensajes. Como un futuro devuelve una promesa en vez de un
canal, no puede usarse de forma tan natural con otras construcciones de
core.async
, pero si no necesitamos usar el canal que devuelve
thread
, podemos reemplezarlo sin problemas por future
. Así, el
primer y segundo ejemplos de thread
donde usábamos canales creados de
forma explícita pueden reescribirse usando future
, no así el tercero.
Los dos siguientes snippets son ambos válidos:
(let [c (async/chan)]
(future (async/>!! c "Hello World!"))
(println "Message in " c ": " (async/<!! c)))
(let [c (async/chan)]
(future (println "Message: " (async/<!! c)))
(async/>!! c "Hello World"))
Podemos crear canales con buffers.
Si se especifica el tamaño del buffer, y si el buffer tiene datos, bloqueará. Si no tiene, no bloquerará (TODO: compararar con primer ejemplo, como en caso de tener buffer > 1 no se bloquea).
Si el canal envía un mensaje pero nadie extrae su valor, el buffer no
liberará el espacio y se bloqueará al llenarse. Si no queremos que se
bloquee y en cambios se necesita seguir una política en que al llenarse
puedan recibirse nuevos mensajes, podemos usar las funciones
sliding-buffer
y dropping-buffer
para seleccionar qué tipo de
política de descarte queremos seguir.
Cuando está lleno descarta los nuevos valores.
Cuando está lleno descarta los valores más antiguos.
Cuando se habla de IOC2 threads nos referimos a hilos ligeros del
estilo corrutinas en go o python. core.async
puede trabajar con
este tipo de hilos3.
Para crearlos se usa la macro go
. Esta hace que se ejecute el código
que se la pasa de forma secuencial, aunque internamente lo deconstruye y
lo ejecuta mediante callbacks4.
(let [c (async/chan)]
(async/go (async/>! c "Hello World!"))
(async/go (println "Message in " c ": " (async/<! c))))
(let [c (async/chan)]
(async/go
(println "Vamos a recibir un mensaje")
(println "Message: " (async/<! c))
(println "Hemos recibido un mensaje"))
(async/go
(println "Vamos a ENVIAR un mensaje")
(async/>! c "Hello World")))
Podemos ver que al estar dentro del cuerpo de la macro go
pasamos a
usar >!
y <!
, al no ser funciones que bloqueen el hilo donde se
llaman. Toda llamada a estar versiones no bloqueantes ha de realizarse
dentro del cuerpo de la macro go
.
Es importante recalcar que estas versiones no bloquean el hilo de ejecución donde se realizan. Son usadas para decirle a la macro donde ha de separar el código en un callback que se ejecute cuando el canal reciba/envíe un mensaje. Se les llama respectivamente put parking y take parking, al servir como metáfora el hecho de que se quedan aparcadas a un lado, sin bloquear la ejecución del código, hasta que el canal tiene un valor que las hace reactivarse. Las similitudes con la forma de trabajar de un callback en un runtime con un único hilo que responde a eventos son evidentes.
Un proceso aquí no tiene relación con un proceso de la JVM, es simplemente un bloque de código que se encarga de realizar alguna tarea. ↩︎
Inversion Of Control. ↩︎
Cuando creamos un hilo de este tipo no estamos creando uno de la
JVM como sí ocurre cuando usamos thread
o future
. ↩︎
Algo similar ocurre en JavaScript con async-await. Esta construcción permite ejecutar código de forma secuencial, el cual se traduce en realidad en una serie de callbacks anidados. ↩︎