Cómo crear aplicaciones de transmisión con estado con Apache Flink

Fabian Hueske es un comprometido y miembro de PMC del proyecto Apache Flink y cofundador de Data Artisans.

Apache Flink es un marco para implementar aplicaciones de procesamiento de flujo con estado y ejecutarlas a escala en un clúster de computación. En un artículo anterior, examinamos qué es el procesamiento de flujo con estado, qué casos de uso aborda y por qué debería implementar y ejecutar sus aplicaciones de flujo con Apache Flink.

En este artículo, presentaré ejemplos de dos casos de uso comunes de procesamiento de flujo con estado y discutiré cómo se pueden implementar con Flink. El primer caso de uso son las aplicaciones impulsadas por eventos, es decir, aplicaciones que ingieren flujos continuos de eventos y aplican cierta lógica empresarial a estos eventos. El segundo es el caso de uso de análisis de transmisión, donde presentaré dos consultas analíticas implementadas con la API SQL de Flink, que agregan datos de transmisión en tiempo real. En Data Artisans proporcionamos el código fuente de todos nuestros ejemplos en un repositorio público de GitHub.

Antes de profundizar en los detalles de los ejemplos, presentaré el flujo de eventos que ingieren las aplicaciones de ejemplo y explicaré cómo puede ejecutar el código que proporcionamos.

Una serie de eventos de viajes en taxi

Nuestras aplicaciones de ejemplo se basan en un conjunto de datos públicos sobre viajes en taxi que ocurrieron en la ciudad de Nueva York en 2013. Los organizadores del Gran Desafío DEBS (ACM International Conference on Distributed Event-Based Systems) 2015 reorganizaron el conjunto de datos original y lo convirtieron un solo archivo CSV del que estamos leyendo los siguientes nueve campos.

  • Medallion: un ID de suma MD5 del taxi
  • Hack_license: un ID de suma MD5 de la licencia de taxi
  • Pickup_datetime: la hora en que se recogió a los pasajeros
  • Dropoff_datetime: la hora a la que dejaron a los pasajeros
  • Pickup_longitude: la longitud del lugar de recogida
  • Pickup_latitude: la latitud del lugar de recogida
  • Dropoff_longitude: la longitud de la ubicación de entrega
  • Dropoff_latitude: la latitud de la ubicación de entrega
  • Total_amount: total pagado en dólares

El archivo CSV almacena los registros en orden ascendente según su atributo de tiempo de entrega. Por lo tanto, el archivo puede tratarse como un registro ordenado de eventos que se publicaron cuando finalizó un viaje. Para ejecutar los ejemplos que proporcionamos en GitHub, debe descargar el conjunto de datos del desafío DEBS desde Google Drive.

Todas las aplicaciones de ejemplo leen secuencialmente el archivo CSV y lo ingieren como una secuencia de eventos de viajes en taxi. A partir de ahí, las aplicaciones procesan los eventos como cualquier otro flujo, es decir, como un flujo que se ingiere desde un sistema de publicación-suscripción basado en registros, como Apache Kafka o Kinesis. De hecho, leer un archivo (o cualquier otro tipo de datos persistentes) y tratarlo como un flujo es una piedra angular del enfoque de Flink para unificar el procesamiento por lotes y el flujo.

Ejecutando los ejemplos de Flink

Como se mencionó anteriormente, publicamos el código fuente de nuestras aplicaciones de ejemplo en un repositorio de GitHub. Le recomendamos que bifurque y clone el repositorio. Los ejemplos se pueden ejecutar fácilmente desde el IDE de su elección; no es necesario instalar y configurar un clúster de Flink para ejecutarlos. Primero, importe el código fuente de los ejemplos como un proyecto de Maven. Luego, ejecute la clase principal de una aplicación y proporcione la ubicación de almacenamiento del archivo de datos (consulte más arriba el enlace para descargar los datos) como parámetro del programa.

Una vez que haya lanzado una aplicación, iniciará una instancia de Flink incrustada local dentro del proceso JVM de la aplicación y enviará la aplicación para ejecutarla. Verá un montón de declaraciones de registro mientras se inicia Flink y se programan las tareas del trabajo. Una vez que la aplicación se está ejecutando, su salida se escribirá en la salida estándar.

Construyendo una aplicación impulsada por eventos en Flink

Ahora, analicemos nuestro primer caso de uso, que es una aplicación impulsada por eventos. Las aplicaciones controladas por eventos ingieren secuencias de eventos, realizan cálculos a medida que se reciben los eventos y pueden emitir nuevos eventos o desencadenar acciones externas. Se pueden componer múltiples aplicaciones controladas por eventos conectándolas a través de sistemas de registro de eventos, de manera similar a cómo se pueden componer sistemas grandes a partir de microservicios. Las aplicaciones controladas por eventos, los registros de eventos y las instantáneas del estado de la aplicación (conocidos como puntos de guardado en Flink) comprenden un patrón de diseño muy poderoso porque puede restablecer su estado y reproducir su entrada para recuperarse de una falla, corregir un error o migrar una aplicación a un clúster diferente.

En este artículo examinaremos una aplicación impulsada por eventos que respalda un servicio, que monitorea las horas de trabajo de los taxistas. En 2016, la Comisión de Taxis y Limusinas de la Ciudad de Nueva York decidió restringir las horas de trabajo de los taxistas a turnos de 12 horas y exigir un descanso de al menos ocho horas antes de que pueda comenzar el siguiente turno. Un turno comienza con el comienzo del primer viaje. A partir de ese momento, un conductor puede iniciar nuevos viajes en un plazo de 12 horas. Nuestra aplicación rastrea los viajes de los conductores, marca la hora de finalización de su ventana de 12 horas (es decir, la hora en la que pueden comenzar el último viaje) y señala los viajes que violaron el reglamento. Puede encontrar el código fuente completo de este ejemplo en nuestro repositorio de GitHub.

Nuestra aplicación se implementa con la API DataStream de Flink y un KeyedProcessFunction. La API DataStream es una API funcional y se basa en el concepto de flujos de datos escritos. A DataStreames la representación lógica de una secuencia de eventos de tipo T. Un flujo se procesa aplicándole una función que produce otro flujo de datos, posiblemente de un tipo diferente. Flink procesa transmisiones en paralelo distribuyendo eventos a particiones de transmisión y aplicando diferentes instancias de funciones a cada partición.

El siguiente fragmento de código muestra el flujo de alto nivel de nuestra aplicación de monitoreo.

// ingesta flujo de viajes en taxi.

Paseos de DataStream = TaxiRides.getRides (env, inputPath);

Flujo de datos notificaciones = paseos

   // flujo de partición por la identificación de la licencia de conducir

   .keyBy (r -> r.licenseId)

   // monitorear eventos de viajes y generar notificaciones

   .proceso (nuevo MonitorWorkTime ());

// imprimir notificaciones

notificaciones.print ();

La aplicación comienza a ingerir una serie de eventos de viajes en taxi. En nuestro ejemplo, los eventos se leen de un archivo de texto, se analizan y se almacenan en TaxiRideobjetos POJO. Una aplicación del mundo real generalmente ingiere los eventos de una cola de mensajes o registro de eventos, como Apache Kafka o Pravega. El siguiente paso es marcar los TaxiRideeventos por el licenseIddel conductor. La keyByoperación divide la secuencia en el campo declarado, de modo que todos los eventos con la misma clave son procesados ​​por la misma instancia paralela de la siguiente función. En nuestro caso, particionamos en el licenseIdcampo porque queremos monitorear el tiempo de trabajo de cada conductor individual.

A continuación, aplicamos la MonitorWorkTimefunción a los TaxiRideeventos particionados . La función rastrea los viajes por conductor y monitorea sus turnos y tiempos de descanso. Emite eventos de tipo Tuple2, donde cada tupla representa una notificación que consta de la identificación de la licencia del conductor y un mensaje. Finalmente, nuestra aplicación emite los mensajes imprimiéndolos en la salida estándar. Una aplicación del mundo real escribiría las notificaciones en un mensaje externo o sistema de almacenamiento, como Apache Kafka, HDFS o un sistema de base de datos, o activaría una llamada externa para enviarlas inmediatamente.

Ahora que hemos analizado el flujo general de la aplicación, echemos un vistazo a la MonitorWorkTimefunción, que contiene la mayor parte de la lógica empresarial real de la aplicación. La MonitorWorkTimefunción es un estado KeyedProcessFunctionque ingiere TaxiRideeventos y emite Tuple2registros. La KeyedProcessFunctioninterfaz presenta dos métodos para procesar datos: processElement()y onTimer(). El processElement()método se llama para cada evento que llega. El onTimer()método se llama cuando se activa un temporizador registrado previamente. El siguiente fragmento muestra el esqueleto de la MonitorWorkTimefunción y todo lo que se declara fuera de los métodos de procesamiento.

clase estática pública MonitorWorkTime

    extiende KeyedProcessFunction {

  // constantes de tiempo en milisegundos

  privado estático final largo ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 horas

  privado estático final largo REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 horas

  privado estático final largo CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 horas

 formateador de DateTimeFormatter transitorio privado;

  // identificador de estado para almacenar la hora de inicio de un turno

  ValueState shiftStart;

  @Anular

  public void open (Configuración conf) {

    // registra el identificador de estado

    shiftStart = getRuntimeContext (). getState (

      nuevo ValueStateDescriptor ("shiftStart", Types.LONG));

    // inicializar el formateador de tiempo

    this.formatter = DateTimeFormat.forPattern (“aaaa-MM-dd HH: mm: ss”);

  }

  // processElement () y onTimer () se analizan en detalle a continuación.

}

La función declara algunas constantes para intervalos de tiempo en milisegundos, un formateador de tiempo y un controlador de estado para el estado con clave que es administrado por Flink. El estado administrado se controla periódicamente y se restaura automáticamente en caso de falla. El estado con clave se organiza por clave, lo que significa que una función mantendrá un valor por identificador y clave. En nuestro caso, la MonitorWorkTimefunción mantiene un Longvalor para cada tecla, es decir, para cada una licenseId. El shiftStartestado almacena la hora de inicio del turno de un conductor. El identificador de estado se inicializa en el open()método, que se llama una vez antes de que se procese el primer evento.

Ahora, echemos un vistazo al processElement()método.

@Anular

elemento de proceso de vacío público (

    Paseo en taxi,

    Contexto ctx,

    Coleccionista out) arroja Exception {

  // busca la hora de inicio del último turno

  Long startTs = shiftStart.value ();

  si (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // este es el primer viaje de un nuevo turno.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    long endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      "Se le permite aceptar nuevos pasajeros hasta" + formatter.print (endTs)));

    // registra el temporizador para limpiar el estado en 24 horas

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // este viaje comenzó después de que terminó el tiempo de trabajo permitido.

    // ¡Es una violación de las regulaciones!

    out.collect (Tuple2.of (ride.licenseId,

      “Este viaje violó las regulaciones de tiempo de trabajo”));

  }

}

El processElement()método se llama para cada TaxiRideevento. Primero, el método obtiene la hora de inicio del turno del conductor desde el identificador de estado. Si el estado no contiene una hora de inicio ( startTs == null) o si el último turno comenzó más de 20 horas ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) antes que el viaje actual, el viaje actual es el primer viaje de un nuevo turno. En cualquier caso, la función inicia un nuevo turno actualizando la hora de inicio del turno a la hora de inicio del viaje actual, emite un mensaje al conductor con la hora de finalización del nuevo turno y registra un temporizador para limpiar el estado en 24 horas.

Si el viaje actual no es el primer viaje de un nuevo turno, la función verifica si viola la regulación del tiempo de trabajo, es decir, si comenzó más de 12 horas después del inicio del turno actual del conductor. Si ese es el caso, la función emite un mensaje para informar al conductor sobre la infracción.

El processElement()método de la MonitorWorkTimefunción registra un temporizador para limpiar el estado 24 horas después del inicio de un turno. Es importante eliminar el estado que ya no se necesita para evitar el crecimiento del tamaño del estado debido al estado con fugas. Un temporizador se activa cuando la hora de la aplicación pasa la marca de tiempo del temporizador. En ese momento, onTimer()se llama al método. De manera similar al estado, los temporizadores se mantienen por clave y la función se coloca en el contexto de la clave asociada antes de onTimer()llamar al método. Por lo tanto, todo el acceso de estado se dirige a la clave que estaba activa cuando se registró el temporizador.

Echemos un vistazo al onTimer()método de MonitorWorkTime.

@Anular

public void onTimer (

    temporizadores largos,

    OnTimerContext ctx,

    Coleccionista out) arroja Exception {

  // eliminar el estado de cambio si no se ha iniciado un nuevo turno.

  Long startTs = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

El processElement()método registra temporizadores durante 24 horas después de que comenzó un turno para limpiar el estado que ya no es necesario. Limpiar el estado es la única lógica que onTimer()implementa el método. Cuando se activa un temporizador, verificamos si el conductor inició un nuevo turno mientras tanto, es decir, si cambió la hora de inicio del turno. Si ese no es el caso, borramos el estado de cambio para el conductor.