Come sviluppare un job di elaborazione dati utilizzando Apache Beam

Come sviluppare un job di elaborazione dati utilizzando Apache Beam

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

Questo post è il primo di una serie dedicata ad Apache Beam.

Conoscete Apache Beam? Se non lo conoscete, tranquilli: nell'universo dell'elaborazione dati, Apache Beam, ovvero uno degli ultimi progetti sviluppati da Apache Software Foundation e rilasciato per la prima volta nel giugno 2016, è uno strumento ancora relativamente nuovo. In effetti, solo di recente ho iniziato a lavorare assiduamente con Apache Beam, imparando ad apprezzarne tutte le sue funzionalità.

Apache Beam è un modello di programmazione unificata che offre un modo semplice per implementare job di elaborazione dati in batch e in streaming ed eseguirli su un apposito motore utilizzando un set di vari IO. Sembra promettente, ma non avete ancora le idee chiare? Ecco perché ho deciso di pubblicare un blog con una serie di post dedicati ad Apache Beam. In questo post, e nei successivi, illustrerò degli esempi concreti presentando diversi casi d'uso di job di elaborazione dati con Apache Beam.

L'argomento di oggi è l'elaborazione in batch. Ipotizziamo, ad esempio, che lavorate per un rivenditore di auto e volete analizzare le vendite di automobili nel corso di un determinato periodo di tempo, ad esempio quante auto di ogni marca sono state vendute. Questo significa che il set di dati è limitato (quantità limitata di dati) e non sarà aggiornato (le vendite sono avvenute in passato). In tal caso è possibile utilizzare l'elaborazione in batch per analizzare i dati.

Come dati di input sono disponibili i registri di testo delle auto vendute nel seguente formato:

id,brand_name,model_name,sales_number

Ad esempio:
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4

Prima di avviare l'implementazione della nostra prima applicazione Beam, dobbiamo comprendere alcune nozioni fondamentali che in seguito saranno utilizzate di continuo. Sono tre i concetti principali su cui si basa Beam: Pipeline, PCollection, e PTransform.

  • Pipeline incapsula il flusso di lavoro di tutte le attività di elaborazione dati, dall'inizio alla fine.
  • PCollection è un'astrazione di un set di dati distribuito, che Beam utilizza per trasferire i dati tra i processi PTransform.
  • PTransform è un processo che opera con i dati di input (input PCollection) e produce dati di output (output PCollection). Di solito il primo e l'ultimo PTransform rappresentano un modo per specificare i dati di input/output che possono essere limitati (elaborazione in batch) o illimitati (elaborazione in streaming).

Per semplificare le cose, possiamo considerare l'elemento Pipeline come un DAG (grafo aciclico diretto) che rappresenta l'intero flusso di lavoro, i processi PTransform come nodi (che trasformano i dati) e PCollection come archi del grafo. Maggiori informazioni sono disponibili nella Guida alla programmazione di Beam.

Torniamo al nostro esempio e proviamo a implementare la prima pipeline che elaborerà il set di dati fornito.

Creazione di una pipeline

Per prima cosa creiamo la nuova pipeline:

Pipeline pipeline = Pipeline.create();

Quindi creiamo un nuovo PTransform usando il metodo pipeline.apply(), che leggerà i dati dal file di testo e creerà un nuovo PCollection di stringhe. Per farlo utilizziamo uno degli elementi IO già implementati in Beam, ovvero TextIO, che consente di leggere i file di testo e di scrivere in essi procedendo linea per linea. TextIO offre numerose altre funzioni, come la scrittura con diversi file system, il supporto dei pattern dei file e lo streaming dei file. Per maggiori informazioni consultate la documentazione di Apache Beam.

apply(TextIO.read().from("/path/to/input/file"))

L'output di questo PTransform è una nuova istanza di PCollection in cui ogni voce della collection è una riga di testo del file di input.

Poiché il nostro obiettivo è ottenere il numero totale di vendite per marca, dobbiamo raggrupparle di conseguenza. Quindi procederemo a suddividere ogni riga e a creare una coppia chiave/valore, in cui la chiave è il nome di una marca e il valore è il numero di vendite. Vale la pena ricordare che l'output PCollection fornito da un precedente PTransform sarà l'input PCollection di questo.

In questa fase utilizziamo il PTransform interno di Beam, chiamato MapElements, per creare una nuova coppia di chiave/valori per ogni voce di input utilizzando l'implementazione dell'interfaccia SimpleFunction.

Raggruppiamo quindi il numero di vendite per marca usando un'altra trasformazione di Beam, GroupByKey. L'output risultante è una PCollection di chiave/valori, in cui la chiave è il nome della marca e valore è la collection iterabile di vendite di quella marca.

.apply(GroupByKey.<String, Integer>create())

 

A questo punto siamo pronti per eseguire la somma di tutte le cifre delle vendite di auto per marca utilizzando la nostra implementazione della trasformazione ParDo:

Per finalizzare la pipeline, applichiamo un'altra trasformazione IO, che ci permetterà di ottenere la PCollection delle stringhe e scrivere queste nel file di testo:

.apply(TextIO.write().to("/path/to/output/dir").withoutSharding());

In ultimo non ci rimane che eseguire la pipeline creata:

pipeline.run();

Sembra abbastanza semplice, no? È proprio questa la forza di Apache Beam: consente di creare complicate pipeline di elaborazione dati con una quantità minima di codice.

Chi ha confidenza con Hadoop potrebbe avere notato che questa pipeline ricorda qualcosa:

  • Legge e analizza i dati di testo linea per linea creando nuove coppie chiave/valori (Map)
  • Poi raggruppa per chiave queste coppie chiave/valori (GroupBy)
  • Infine esegue l'iterazione su tutti i valori di una stessa chiave applicando una funzione dell'utente (Reduce)

Sì, è vero: questa semplice pipeline può essere eseguita con un classico job MapReduce. Ma è innegabile che in Beam risulta più semplice e chiaro (rispetto a Java) e la situazione non si complica neanche se decidiamo di estendere le pipeline aggiungendo un'altra trasformazione.

Compilazione ed esecuzione di una pipeline

Come accennato in precedenza, una pipeline Beam può essere eseguita su diversi strumenti di esecuzione (motori di elaborazione):

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

A tal fine, è sufficiente aggiungere una dipendenza corrispondente alla configurazione del progetto Maven o Gradle. Il vantaggio è che non è necessario adattare o riscrivere il codice della pipeline per eseguirlo su ogni strumento di esecuzione. Ancor meglio: non è necessario ricompilare i jar se erano già state incluse in precedenza le dipendenze di tutti gli strumenti di esecuzione, perché sarà sufficiente scegliere lo strumento di esecuzione da utilizzare.

Direct Runner è uno strumento di esecuzione locale utilizzato di solito per testare la pipeline. Usando Java si deve specificare la dipendenza da Direct Runner nel proprio pom.xml.


<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.3.0</version>
   <scope>runtime</scope>
</dependency>


In seguito si esegue la compilazione del progetto:
# mvn clean package

E si esegue la pipeline su Direct Runner:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner"

Ad esempio, se il file di input contiene i seguenti dati:
# cat /tmp/beam/cars_sales_log
1,Toyota,Prius,3
2,Nissan,Sentra,2
1,Toyota,Yaris,4
3,Ford,Fusion,5
3,Ford,Kuga,3

Il risultato finale sarà il seguente:
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8

L'elenco di tutti gli strumenti di esecuzione supportati, con le relative istruzioni di utilizzo, è disponibile in questa pagina.

Infine, tutto il codice di questo esempio è pubblicato in questo repository GitHub: https://github.com/aromanenko-dev/beam-tutorial.

Nella parte successiva di questa serie di post, illustrerò come eseguire l'elaborazione dei dati di streaming in Beam. Prenderò un altro esempio di attività di analisi dei dati utilizzando dati illimitati e vedremo quali funzionalità ci offre Beam in questo caso.

Partecipa alla discussione

0 Comments

Scrivi una risposta

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *