Kafka en Scala

La librería más usada en el ecosistema de Typelevel es fs2-kafka. Aquí dejo algunos apuntes extraídos de la documentación y de algún que otro artículo revisado que dejo enlazado.

Consumidor

Podemos trabajar tanto con fs2.Stream como con Resource. Existen tanto KafkaConsumer.stream(...) como KafkaConsumer.resource(...). Ambos crean un consumidor de Java Kafka y empiezan en un hilo secundario. Los dos garantizan que se liberan/cierran los recursos abiertos, cerrando el consumidor y parando el trabajo en el background.

Podemos suscribirnos a un tema, a una lista de temas (como lista a varargs), o mediante una expresión regular que nos subscriba a todos los temas que estén contenido en dicha regex.

Configuración

Es en la configuración del consumidor1 donde definimos casi todos los parámetros del mismo.

Deserialización

Qué tipos y por lo tanto cómo deserializar los mensajes se define aquí. La clase se define como

sealed abstract class ConsumerSettings[F[_], K, V] {
  def keyDeserializer: Resource[F, KeyDeserializer[F, K]] // The `Deserializer` to use for deserializing record keys.
  def valueDeserializer: Resource[F, ValueDeserializer[F, V]] // The `Deserializer` to use for deserializing record values.
  ...
}

y por lo tanto cómo definamos tanto la clave (K) cómo el mensaje (V) a la hora de crearla nos define el deserializador que se empleará.

Si hacemos val settings = ConsumerSettings[IO, String, String] estamos definiendo tanto clave como mensajes como texto.


  1. Clase fs2.kafka.ConsumerSettings↩︎