Come sviluppare un job di elaborazione dati utilizzando Apache Beam – Pipeline di streaming

Come sviluppare un job di elaborazione dati utilizzando Apache Beam – Pipeline di streaming

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Nell'ultimo post del blog abbiamo spiegato come sviluppare dei job di elaborazione dati con Apache Beam. Questa volta ci occuperemo di uno degli aspetti più difficili del moderno universo dei big data, ovvero l'elaborazione dei dati in streaming.

La differenza principale tra batch e streaming sta nel tipo di sorgente dei dati di input. Se il set di dati da elaborare è limitato (anche se di ingenti dimensioni) e non viene aggiornato durante l'elaborazione, allora è opportuno utilizzare l'elaborazione in batch. In questo caso la sorgente di input può essere un file, una tabella di database, un oggetto di uno storage di oggetti e così via. Ci tengo a sottolineare ancora una volta che, con l'elaborazione in batch, si presume che i dati non siano modificabili durante tutto il tempo dell'elaborazione e che il numero dei record di input sia costante. Ma perché prestare attenzione a questo dettaglio? Perché anche con i file è possibile avere un flusso di dati illimitati quando i file vengono costantemente aggiunti o modificati. In questo caso, dobbiamo applicare un approccio in streaming per lavorare con i dati. Se sappiamo che i dati sono limitati e immutabili, allora sviluppiamo una pipeline di elaborazione in batch.

Le cose si complicano quando il set di dati è illimitato (costantemente in arrivo) e/o mutabile. Tra questo tipo di sorgenti rientrano, ad esempio, i sistemi di messaggistica (come Apache Kafka), i nuovi file in una directory (registri del server Web) o alcuni altri sistemi di raccolta dati in tempo reale (come i sensori IoT). Tutte queste sorgenti hanno in comune il fatto che occorre aspettare i dati nuovi. Certamente è possibile suddividere i dati in batch (per ora o per dimensione dei dati) ed elaborare ogni porzione in batch, ma sarebbe abbastanza difficile applicare le stesse funzioni a tutti i set di dati utilizzati e creare l'intera pipeline. Fortunatamente esistono diversi motori di streaming che consentono di eseguire facilmente questo tipo di elaborazione dati: Apache Spark, Apache Flink, Apache Apex, Google DataFlow. Sono tutti supportati da Apache Beam ed è possibile eseguire la stessa pipeline su motori diversi senza modificare in alcun modo il codice. In aggiunta, è possibile utilizzare la stessa pipeline sia in modalità batch che in streaming con minime modifiche: è sufficiente impostare in modo opportuno la sorgente di input e voilà, il gioco è fatto. È davvero semplice. Una soluzione che sognavo un po' di tempo fa, quando dovevo riscrivere i miei job in batch per trasformarli in job in streaming.

Ci siamo dilungati abbastanza sulla teoria, ora è tempo di fare un esempio pratico e scrivere il nostro primo codice di streaming. Leggeremo dei dati da Kafka (sorgente illimitata), eseguiremo una semplice elaborazione dati e scriveremo i risultati di nuovo in Kafka.

Supponiamo di avere un flusso illimitato di coordinate geografiche (X e Y) di alcuni oggetti su una mappa (nel nostro esempio supponiamo che gli oggetti siano le automobili); il flusso arriva in tempo reale e noi vogliamo selezionare solo gli oggetti posizionati all'interno di una determinata area. In altre parole, dobbiamo utilizzare i dati di testo provenienti dall'argomento Kafka, analizzarli, filtrarli in base ai limiti specificati e riscriverli in un altro argomento Kafka. Vediamo come farlo con l'aiuto di Apache Beam.

Ogni messaggio Kafka contiene dati di testo nel seguente formato:
id,x,y

dove:
id - l'ID univoco dell'oggetto,
x e y - le coordinate sulla mappa (integer).

È necessario fare attenzione al formato e ignorare i record con formato non valido.

Creazione di una pipeline

Seguendo una procedura molto simile a quella illustrata nel precedente post, in cui abbiamo eseguito un'elaborazione in batch, creiamo una pipeline:

Pipeline pipeline = Pipeline.create(options);

Possiamo elaborare l'oggetto Options per trasferire nella pipeline le opzioni della riga di comando. Per maggiori dettagli è possibile consultare l'esempio completo su Github.

A questo punto dobbiamo leggere i dati dall'argomento di input Kafka. Come affermato in precedenza, Apache Beam fornisce già un certo numero di connettori IO, tra cui KafkaIO. Quindi creiamo un nuovo PTransform illimitato, che utilizza i messaggi in arrivo da uno specifico argomento Kafka e li propaga alla fase successiva:

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))

Per impostazione predefinita, KafkaIO incapsula tutti i messaggi utilizzati nell'oggetto KafkaRecord. Tuttavia, la trasformazione successiva recupera solo un payload (valori stringa) dal nuovo oggetto DoFn creato:

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord<Long, String> record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)

Dopo questa fase, procediamo a filtrare i record (vedete l'attività iniziale citata in precedenza), non senza avere prima analizzato il valore di stringa in base al formato definito. In questo modo potrà essere incapsulato in un oggetto funzionale che sarà utilizzato dalla trasformazione interna Filter di Beam.

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)

Quindi prepariamo messaggi filtrati per riscriverli in Kafka creando una nuova coppia di chiave/valori usando una classe KV interna a Beam, utilizzabile su diversi connettori IO, tra i quali anche KafkaIO.

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn<String, KV<String, String>>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)

La trasformazione finale serve per scrivere i messaggi in Kafka; a tale scopo, è sufficiente utilizzare KafkaIO.write() , ovvero l'implementazione del sink. Così come per la lettura, occorre configurare questa trasformazione con alcune opzioni necessarie, ad esempio i server bootstrap di Kafka, il nome dell'argomento di output e i serializzatori di chiave/valore.

.apply(
    "WriteToKafka",
    KafkaIO.<String, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);

Infine, eseguiamo la pipeline come di consueto:

pipeline.run();

Questa volta può apparire leggermente più complicato rispetto a quanto descritto nel blog precedente ma, come si può facilmente notare, non abbiamo fatto nulla di specifico per rendere la nostra pipeline compatibile per lo streaming. A questo pensa l'implementazione del modello di dati di Apache Beam, che semplifica notevolmente il passaggio dall'elaborazione in batch a quella in streaming per gli utenti Beam.

Compilazione ed esecuzione di una pipeline

Aggiungiamo le dipendenze necessarie per rendere possibile l'uso di Beam KafkaIO:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

A questo punto compiliamo un jar ed eseguiamolo con DirectRunner per testarne il funzionamento:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

Se necessario possiamo aggiungere altri argomenti utilizzati nella pipeline servendoci dell'opzione “exec.args”. Prima di eseguire la pipeline Beam, assicuriamoci anche che i server Kafka siano disponibili e che siano stati specificati correttamente. Infine, il comando Maven lancerà una pipeline e continuerà ad eseguirla finché non verrà terminata manualmente (facoltativamente è possibile specificare il tempo di esecuzione massimo). Questo significa che i dati saranno elaborati di continuo, in modalità streaming.

Come di consueto, tutto il codice di questo esempio è pubblicato sul repository GitHub.

Buono streaming!

Partecipa alla discussione

0 Comments

Scrivi una risposta

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *