Creado para tiempo real: mensajería de big data con Apache Kafka, parte 2

En la primera mitad de esta introducción de JavaWorld a Apache Kafka, desarrolló un par de aplicaciones de productor / consumidor a pequeña escala utilizando Kafka. A partir de estos ejercicios, debe familiarizarse con los conceptos básicos del sistema de mensajería Apache Kafka. En esta segunda mitad, aprenderá a usar particiones para distribuir la carga y escalar su aplicación horizontalmente, manejando hasta millones de mensajes por día. También aprenderá cómo Kafka usa las compensaciones de mensajes para rastrear y administrar el procesamiento de mensajes complejos, y cómo proteger su sistema de mensajería Apache Kafka contra fallas en caso de que un consumidor falle. Desarrollaremos la aplicación de ejemplo de la Parte 1 para casos de uso de publicación-suscripción y de punto a punto.

Particiones en Apache Kafka

Los temas de Kafka se pueden subdividir en particiones. Por ejemplo, mientras crea un tema llamado Demo, puede configurarlo para que tenga tres particiones. El servidor crearía tres archivos de registro, uno para cada una de las particiones de demostración. Cuando un productor publicaba un mensaje sobre el tema, asignaba un ID de partición para ese mensaje. Luego, el servidor agregaría el mensaje al archivo de registro solo para esa partición.

Si luego inició dos consumidores, el servidor podría asignar las particiones 1 y 2 al primer consumidor y la partición 3 al segundo consumidor. Cada consumidor leería solo de sus particiones asignadas. Puede ver el tema de demostración configurado para tres particiones en la Figura 1.

Para ampliar el escenario, imagine un clúster de Kafka con dos agentes, alojados en dos máquinas. Cuando particionó el tema de demostración, lo configuraría para tener dos particiones y dos réplicas. Para este tipo de configuración, el servidor de Kafka asignaría las dos particiones a los dos intermediarios de su clúster. Cada corredor sería el líder de una de las particiones.

Cuando un productor publica un mensaje, se dirige al líder de la partición. El líder tomaría el mensaje y lo agregaría al archivo de registro en la máquina local. El segundo corredor replicaría pasivamente ese registro de confirmación en su propia máquina. Si el líder de la partición caía, el segundo corredor se convertiría en el nuevo líder y comenzaría a atender las solicitudes de los clientes. De la misma manera, cuando un consumidor envía una solicitud a una partición, esa solicitud irá primero al líder de la partición, que devolverá los mensajes solicitados.

Beneficios de la partición

Considere los beneficios de particionar un sistema de mensajería basado en Kafka:

  1. Escalabilidad : en un sistema con una sola partición, los mensajes publicados en un tema se almacenan en un archivo de registro, que existe en una sola máquina. La cantidad de mensajes para un tema debe caber en un solo archivo de registro de confirmación, y el tamaño de los mensajes almacenados nunca puede ser mayor que el espacio en disco de esa máquina. Particionar un tema le permite escalar su sistema almacenando mensajes en diferentes máquinas en un clúster. Si desea almacenar 30 gigabytes (GB) de mensajes para el tema de demostración, por ejemplo, puede crear un clúster de Kafka de tres máquinas, cada una con 10 GB de espacio en disco. Luego, configuraría el tema para que tenga tres particiones.
  2. Equilibrio de carga del servidor : tener varias particiones le permite distribuir las solicitudes de mensajes entre los agentes. Por ejemplo, si tiene un tema que procesa 1 millón de mensajes por segundo, puede dividirlo en 100 particiones y agregar 100 agentes a su clúster. Cada corredor sería el líder de una sola partición, responsable de responder a solo 10,000 solicitudes de clientes por segundo.
  3. Equilibrio de carga del consumidor : similar al equilibrio de carga del servidor, alojar a varios consumidores en diferentes máquinas le permite distribuir la carga del consumidor. Supongamos que desea consumir 1 millón de mensajes por segundo de un tema con 100 particiones. Podría crear 100 consumidores y ejecutarlos en paralelo. El servidor Kafka asignaría una partición a cada uno de los consumidores, y cada consumidor procesaría 10.000 mensajes en paralelo. Dado que Kafka asigna cada partición a un solo consumidor, dentro de la partición cada mensaje se consumiría en orden.

Dos formas de particionar

El productor es responsable de decidir a qué partición irá un mensaje. El productor tiene dos opciones para controlar esta asignación:

  • Particionador personalizado : puede crear una clase que implemente la org.apache.kafka.clients.producer.Partitionerinterfaz. Esta costumbre Partitionerimplementará la lógica empresarial para decidir dónde se envían los mensajes.
  • DefaultPartitioner : si no crea una clase de particionador personalizada, org.apache.kafka.clients.producer.internals.DefaultPartitionerse utilizará la clase de forma predeterminada . El particionador predeterminado es lo suficientemente bueno para la mayoría de los casos y ofrece tres opciones:
    1. Manual : cuando cree un ProducerRecord, utilice el constructor sobrecargado new ProducerRecord(topicName, partitionId,messageKey,message)para especificar un ID de partición.
    2. Hash (sensible a la localidad) : cuando crea un ProducerRecord, especifique a messageKey, llamando new ProducerRecord(topicName,messageKey,message). DefaultPartitionerutilizará el hash de la clave para garantizar que todos los mensajes de la misma clave vayan al mismo productor. Este es el enfoque más fácil y común.
    3. Pulverización (equilibrio de carga aleatorio) : si no desea controlar a qué partición van los mensajes, simplemente llame new ProducerRecord(topicName, message)para crear su ProducerRecord. En este caso, el particionador enviará mensajes a todas las particiones por turnos, asegurando una carga equilibrada del servidor.

Partición de una aplicación Apache Kafka

Para el ejemplo simple de productor / consumidor de la Parte 1, usamos a DefaultPartitioner. Ahora intentaremos crear un particionador personalizado. Para este ejemplo, supongamos que tenemos un sitio de venta minorista que los consumidores pueden utilizar para pedir productos en cualquier parte del mundo. Según el uso, sabemos que la mayoría de los consumidores se encuentran en los Estados Unidos o en la India. Queremos dividir nuestra aplicación para enviar pedidos desde los EE. UU. O la India a sus propios consumidores respectivos, mientras que los pedidos de cualquier otro lugar irán a un tercer consumidor.

Para empezar, crearemos un CountryPartitionerque implemente la org.apache.kafka.clients.producer.Partitionerinterfaz. Debemos implementar los siguientes métodos:

  1. Kafka llamará a configure () cuando inicialicemos la Partitionerclase, con una Mapde las propiedades de configuración. Este método inicializa funciones específicas de la lógica empresarial de la aplicación, como conectarse a una base de datos. En este caso queremos un particionador bastante genérico que tome countryNamecomo propiedad. Luego podemos usar configProperties.put("partitions.0","USA")para mapear el flujo de mensajes a particiones. En el futuro, podemos usar este formato para cambiar qué países obtienen su propia partición.
  2. La ProducerAPI llama a la partición () una vez por cada mensaje. En este caso, lo usaremos para leer el mensaje y analizar el nombre del país del mensaje. Si el nombre del país está en countryToPartitionMap, volverá partitionIdalmacenado en Map. De lo contrario, hará un hash del valor del país y lo usará para calcular a qué partición debe ir.
  3. Llamamos a close () para cerrar el particionador. El uso de este método garantiza que los recursos adquiridos durante la inicialización se limpien durante el apagado.

Tenga en cuenta que cuando Kafka llama configure(), el productor de Kafka pasará todas las propiedades que hemos configurado para el productor a la Partitionerclase. Es esencial que leamos solo las propiedades que comienzan con partitions., las analicemos para obtener partitionIdy almacenemos el ID countryToPartitionMap.

A continuación se muestra nuestra implementación personalizada de la Partitionerinterfaz.

Listado 1. CountryPartitioner

 public class CountryPartitioner implements Partitioner { private static Map countryToPartitionMap; public void configure(Map configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap(); for(Map.Entry entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} } 

La Producerclase en el Listado 2 (abajo) es muy similar a nuestro productor simple de la Parte 1, con dos cambios marcados en negrita:

  1. Establecemos una propiedad de configuración con una clave igual al valor de ProducerConfig.PARTITIONER_CLASS_CONFIG, que coincide con el nombre completo de nuestra CountryPartitionerclase. También establecemos countryNameen partitionId, mapeando así las propiedades a las que queremos pasar CountryPartitioner.
  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We'll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.

Listing 2. A partitioned producer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");  configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } } 

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer, thereby guaranteeing the order of message consumption. You can manually assign a partition or have it assigned automatically.

If your business logic demands more control, then you'll need to manually assign partitions. In this case you would use KafkaConsumer.assign() to pass a list of partitions that each consumer was interested in to the Kakfa server.

Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.

Digamos que está creando un tema nuevo con tres particiones. Cuando inicie el primer consumidor para el nuevo tema, Kafka asignará las tres particiones al mismo consumidor. Si luego inicia un segundo consumidor, Kafka reasignará todas las particiones, asignando una partición al primer consumidor y las dos particiones restantes al segundo consumidor. Si agrega un tercer consumidor, Kafka reasignará las particiones nuevamente, de modo que a cada consumidor se le asigne una sola partición. Finalmente, si inicia el cuarto y quinto consumidor, entonces tres de los consumidores tendrán una partición asignada, pero los demás no recibirán ningún mensaje. Si una de las tres particiones iniciales deja de funcionar, Kafka utilizará la misma lógica de partición para reasignar la partición de ese consumidor a uno de los consumidores adicionales.