Publicación en Kafka

Publicar en Kafka es sencillo. Es importante saber que el orden se garantiza dentro de una partición pero no entre particiones.

Mensaje

Un mensaje de Kafka está compuesto por:

  • clave: binario (aunque podemos enviar como otro tipo, todo en Kafka al final son arrays de bytes). Si no la especificamos será null.
  • valor: binario
  • tipo de compresión
  • cabeceras (opcionales)
  • partición y offset para ubicar el mensaje en la partición
  • timestamp

Figure 1: Serialización de claves y valores en Kafka (https://learning.oreilly.com/videos/apache-kafka-series/9781789342604/9781789342604-video2_2/)

Figure 1: Serialización de claves y valores en Kafka (https://learning.oreilly.com/videos/apache-kafka-series/9781789342604/9781789342604-video2_2/)

Clave

Si queremos garantizar el orden de todos los mensajes, hemos de añadir key. Kafka usa la misma partición para los mensajes que compartan la misma. De esta forma podemos garantizar el orden absoluto de una serie de mensajes Obviamente tenemos las contrapartidas de usar una sola partición: menor rendimiento, etc. La partición a partir de la clave se elige aplicando hashing1 a dicha clave, no podemos elegir de forma explícita a qué partición queremos que envíe el mensaje.

Si queremos producir con fs2-kafka en Scala, podemos usar un valor opcional2 para no tener que especificar siempre una clave, cuando en muchas ocasiones no nos hará falta.

type Res[F, T] = Resource[F, Producer[F, T]]
type Config[F, T] = ProducerSettings[F, String, T]

def kafka[F[_]: Async, A](settings: Config[F, T], topic: String, key: Option[String]=None): Res =
  KafkaProducer.resource(settings) map { p =>
    new {
      def send(a: A): F[Unit] = p.produceOne(topic, key getOrElse "key", a).flatten.void
      def send(a: A, properties: Map[String, String]): F[Unit] = send(a)
    }
  }

En el caso más común de no tener clave, None se serializa como null y los mensajes se envían entre particiones siguiendo round-robin.


  1. La fórmula para obtener el hash es targetPartition = Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)↩︎

  2. La clave no tiene porqué ser un String, puede ser también un número o datos binarios. ↩︎