La librería más usada en el ecosistema de Typelevel es fs2-kafka. Aquí dejo algunos apuntes extraídos de la documentación y de algún que otro artículo revisado que dejo enlazado.
Podemos trabajar tanto con fs2.Stream como con Resource. Existen tanto KafkaConsumer.stream(...)
como KafkaConsumer.resource(...)
. Ambos crean un consumidor de Java Kafka y empiezan en un hilo secundario. Los dos garantizan que se liberan/cierran los recursos abiertos, cerrando el consumidor y parando el trabajo en el background.
Podemos suscribirnos a un tema, a una lista de temas (como lista a varargs), o mediante una expresión regular que nos subscriba a todos los temas que estén contenido en dicha regex.
Es en la configuración del consumidor1 donde definimos casi todos los parámetros del mismo.
Qué tipos y por lo tanto cómo deserializar los mensajes se define aquí. La clase se define como
sealed abstract class ConsumerSettings[F[_], K, V] {
def keyDeserializer: Resource[F, KeyDeserializer[F, K]] // The `Deserializer` to use for deserializing record keys.
def valueDeserializer: Resource[F, ValueDeserializer[F, V]] // The `Deserializer` to use for deserializing record values.
...
}
y por lo tanto cómo definamos tanto la clave (K) cómo el mensaje (V) a la hora de crearla nos define el deserializador que se empleará.
Si hacemos val settings = ConsumerSettings[IO, String, String]
estamos definiendo tanto clave como mensajes como texto.
Clase fs2.kafka.ConsumerSettings. ↩︎