Apache Spark e Talend: prestazioni e ottimizzazione

Apache Spark e Talend: prestazioni e ottimizzazione

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

Per prima cosa desidero ringraziare tutti i lettori dei miei due blog precedenti dedicati a Talend e Apache Spark.

Se non conoscete questa serie di blog e non avete letto i miei post precedenti, potete trovare questi articoli qui "Talend e Apache Spark: manuale tecnico di base" e qui "Configurazioni Talend e spark-submit: in cosa differiscono?" (seconda parte).

Nei primi due post di questa serie relativa ad Apache Spark vengono fornite informazioni generali sul funzionamento di Talend con Spark, sulle somiglianze tra i job Talend Spark e spark-submit e sulle opzioni di configurazione disponibili per i job Spark in Talend.

In questo blog ci occuperemo di prestazioni e ottimizzazione di Apache Spark. Si tratta di un argomento affrontato spesso da chiunque utilizzi Apache Spark, anche all'esterno di Talend. Quando si sviluppano e si eseguono i primi job Spark, quasi sempre emergono le seguenti domande:

  • Quanti executor è opportuno assegnare per il job Spark?
  • Quanta memoria occorre per ciascun executor?
  • Quanti core è opportuno utilizzare?
  • Perché alcuni job Spark richiedono ore per elaborare 10 GB di dati e come si risolve questo problema?

In questo blog passerò in rassegna ognuna di queste domande per fornire risposte e approfondimenti. Prima di procedere e addentraci nell'argomento, presenterò alcuni concetti basilari che saranno usati nell'intero blog:

Partizione: con partizione si intende una porzione di un set di dati distribuiti. Viene creata in base alla dimensione del blocco HDFS predefinita. Spark utilizza le partizioni per elaborare parallelamente i set di dati.

Attività: le attività rappresentano delle unità di lavoro eseguibili all'interno di un executor.

Core: il core è un'unità di elaborazione all'interno di una CPU che determina il numero di attività parallele in Spark che possono essere eseguite in un executor.

Executor: un processo avviato sui nodi del worker che esegue l'invio del job nella memoria o nel disco.

Application Master: ogni applicazione YARN crea un processo Application Master che ha il compito di richiedere le risorse al Resource Manager. Dopo l'assegnazione delle risorse, il processo collabora con i Node Manager per avviare i container necessari al loro interno.

Ottimizzazione di Spark

Per iniziare, esaminiamo come ottimizzare i job Apache Spark all'interno di Talend. Come accennato in precedenza, nel job Talend Spark si trova la scheda Spark Configuration (Configurazione Spark) nella quale è possibile impostare le proprietà di ottimizzazione. In Talend è sempre deselezionata per impostazione predefinita.

In questa sezione è possibile impostare la memoria e il numero di core che saranno utilizzati dall'Application Master e dagli executor e il numero di executor richiesti dal job. Quando si inseriscono i valori in questa sezione, la domanda principale che ci si pone è in che modo poter stabilire il numero di core o la memoria che occorrono all'Application Master o agli executor per garantire delle buone prestazioni. Cerchiamo una risposta a questa domanda.

Come scegliere il numero di core per un job Spark

A questo punto, prima di procedere, è necessario prendere in considerazione alcuni fattori, vale a dire:

  1. La dimensione dei set di dati
  2. L'intervallo di tempo entro il quale il job deve essere completato
  3. Le operazioni e le azioni eseguite dal job

Tenendo a mente questi fattori, è possibile iniziare a configurare il job per massimizzare le prestazioni. Per prima cosa occorre mettere a punto l'Application Master, per il quale è possibile lasciare i valori predefiniti. L'Application Master, infatti, si occupa soltanto dell'orchestrazione delle risorse e non esegue alcuna elaborazione, pertanto non necessita di valori di memoria elevati né di numerosi core.

Il passaggio successivo consiste nel configurare la memoria e i core per gli executor. In questa fase il dubbio principale riguarda il numero di executor e di core e la quantità di memoria da utilizzare. Per trovare una risposta supponiamo di avere un cluster Hadoop con sei nodi worker, ciascuno dei quali dispone di 32 core e di 120 GB di memoria. La prima cosa che probabilmente si è portati a pensare è che le prestazioni saranno tanto migliori quanto maggiore sarà il numero di attività simultanee che si possono avere per ciascun executor. Facendo delle ricerche su questo argomento, noteremo che le linee guida relative all'ottimizzazione delle prestazioni delle distribuzioni Hadoop, ad esempio quelle di Cloudera in questo link, indicano che più di cinque core per executor producono un I/O del file system HDFS non soddisfacente. Di conseguenza, il valore ottimale di core ai fini delle prestazioni è cinque.

Proviamo ora a vedere quanti executor occorre avviare. Lo possiamo stabilire facilmente in base al numero di core e di nodi. Come già detto, cinque è il numero ottimale di core da utilizzare per ogni executor. A questo punto, da ciascuno dei 32 core disponibili per ogni nodo dobbiamo rimuovere quelli che non possiamo utilizzare per i job, poiché servono al sistema operativo e ai daemon di Hadoop in esecuzione sul nodo. È lo strumento di gestione del cluster Hadoop a eseguire questa operazione, rendendo più semplice stabilire il numero di core per nodo che è possibile usare per i job Spark.

Dopo avere eseguito il calcolo, supponiamo che rimangano 30 core per nodo utilizzabili. Dato che abbiamo stabilito che cinque è il numero ottimale di core per executor, possiamo eseguire fino a sei executor per nodo. È semplice.

L'ultimo passaggio consiste nel calcolare la quantità di memoria utilizzabile. Tenendo conto delle specifiche dell'hardware sopra stabilite, vediamo che per ogni nodo sono disponibili 120 GB di memoria ma, come già accennato per i core, non è possibile utilizzare tutta la memoria per i job perché parte di essa serve al sistema operativo. Anche in questo caso è lo strumento di gestione del cluster Hadoop a stabilire per noi la quantità di memoria utilizzabile per i job. Se il sistema operativo e i daemon Hadoop richiedono 2 GB di memoria, ne rimangono 118 GB da utilizzare per i job Spark. Poiché abbiamo stabilito che è possibile utilizzare sei executor per nodo, facendo un rapido calcolo otteniamo circa 20 GB di memoria per executor. Questo dato, tuttavia, non è attendibile al 100%, poiché va calcolato anche il sovraccarico di memoria di ogni executor. Come affermato nel mio blog precedente, il valore predefinito del sovraccarico è di 384 MB. Pertanto, sottraendo questo valore ai 20 GB, la quantità massima di memoria assegnabile a ogni executor risulterà di circa 19 GB.

Allocazione dinamica o fissa delle risorse del cluster

I numeri citati in precedenza sono validi sia in caso di allocazione dinamica che fissa delle risorse del cluster di un job Spark. La differenza tra le due è l'allocazione dinamica. Con l'allocazione dinamica è possibile specificare il numero iniziale di executor utilizzati, il numero minimo di executor utilizzabili dal job in caso di carico di lavoro scarso e il numero massimo da usare quando è necessaria maggiore potenza di elaborazione. Benché sarebbe comodo poter utilizzare tutte le risorse del cluster per il job, è necessario condividere la potenza di elaborazione con altri job in esecuzione sul cluster. Di conseguenza, in base ai requisiti stabiliti in precedenza esaminando i fattori da considerare per regolare il job Talend Spark, è possibile determinare la percentuale dei valori massimi utilizzabili.

Dopo avere configurato il job, possiamo dunque procedere con la sua effettiva esecuzione. Supponiamo però di aver notato che il completamento del job Spark richiede molto tempo, nonostante sia stato configurato con le impostazioni massime definite in precedenza. In tal caso occorre tornare al job e controllare che siano utilizzate anche altre impostazioni, per assicurarsi di ottenere le massime prestazioni.

Prestazioni di Spark

Per prima cosa supponiamo di unire due tabelle nel job Spark. Uno dei fattori considerati in precedenza, prima di avviare l'ottimizzazione dei job Spark, è stata la dimensione dei set di dati. Esaminando la dimensione delle tabelle si evince che una è di 50 GB e l'altra è di 100 MB: a questo punto occorre valutare se all'interno dei componenti Talend vengano sfruttati i join replicati.

Join replicato

Il join replicato (altrimenti chiamato "join Map-Side") è ampiamente utilizzata per unire tabelle di grandi dimensioni con tabelle di piccole dimensioni e trasmettere i dati dalla tabella più piccola agli executor. In questo caso, dato che il set di dati più piccolo può essere contenuto nella memoria, è possibile utilizzare un join replicato per trasmetterlo a ogni executor e ottimizzare le prestazioni del job Spark.

Poiché i dati della tabella devono essere combinati al livello dell'executor con i dati secondari, trasmettendo il set di dati più piccolo a tutti gli executor si eviterà di inviare nella rete i dati della tabella più grande. In Spark si verificano numerosi problemi di prestazioni per via della riproduzione casuale di grandi quantità di dati nella rete, ma è possibile evitarli facilmente attivando l'opzione "Use replicated join" (Usa join replicato) nel componente tMap del job Talend, come illustrato qui di seguito. In questo modo i dati della tabella di riferimento saranno trasmessi a tutti gli executor.

 

Il passaggio successivo consiste nel verificare se il job contenga delle operazioni che eseguono dispendiosi ricalcoli.

Cache di Spark

Per spiegare nel dettaglio i ricalcoli, analizziamo un semplice esempio di caricamento di un file contenente dati relativi agli acquisti dei clienti. Da questi dati intendiamo acquisire alcune metriche:

  • il numero totale di clienti
  • il numero di prodotti acquistati

In questo caso, se non utilizziamo la cache di Spark, ogni operazione caricherà i dati, andando così a incidere sulle prestazioni dato che saranno eseguiti ricalcoli dispendiosi. Sapendo che più avanti nel job occorrerà utilizzare questo set di dati, è preferibile salvarlo nella cache di Spark e utilizzarlo in seguito, evitando così di doverlo caricare di nuovo.

Nei job Talend Spark è possibile farlo con i componenti tCacheIn e tCacheOut, disponibili nella tavolozza Apache Spark di Talend; questi consentono di utilizzare il meccanismo di salvataggio nella cache di Spark offrendo diverse opzioni.

Inoltre, è possibile specificare se si desidera salvare i dati solo nella cache del disco, avendo in questo caso anche la possibilità di serializzare i dati salvati nella cache per la memoria, il disco o entrambi. Infine, è disponibile l'opzione per replicare i dati salvati nella cache su altri due nodi. L'opzione più utilizzata, perché la più veloce, è la memoria senza serializzazione; ma se gli RDD salvati nella cache non possono essere contenuti nella memoria e non desideriamo espanderli nel disco, allora selezioniamo la serializzazione, che ridurrà lo spazio consumato dal set di dati, pur determinando costi aggiuntivi in termini di sovraccarico che influirà sulle prestazioni. Pertanto è utile valutare le opzioni e selezionare quella più adatta alle proprie esigenze.

 

Se, nonostante tutto ciò, i problemi di prestazioni persistono, occorrerà analizzare l'interfaccia Web della cronologia di Spark per vedere cosa sta accadendo. Come affermato nel mio blog precedente, nella sezione "Spark History" (Cronologia di Spark) della Configurazione di Spark in Talend è possibile attivare l'opzione di registrazione degli eventi in Spark, che consente di risolvere i problemi relativi ai job Spark conservando i registri al termine dei job e rendendoli disponibili tramite l'interfaccia Web della cronologia di Spark. È consigliabile attivare la registrazione degli eventi in Spark per i job Spark, poiché consente di individuare e risolvere con facilità gli eventuali problemi di prestazioni.

 

Avendo attivato la registrazione degli eventi in Spark, è possibile consultare l'interfaccia Web della cronologia di Spark dove, in relazione al numero di applicazione del job, sono disponibili le seguenti schede:

Nell'interfaccia utente Spark qui sopra illustrata, andiamo nella scheda "Stages" (Fasi), individuiamo quella che influisce sulla prestazioni del job e ne esaminiamo i dettagli per verificare se è presente un comportamento simile al seguente:

 

Osserviamo che, anche avendo allocato dieci executor, uno solo di essi sta elaborando la maggior parte dei dati, mentre gli altri sono inattivi. Perché accade ciò? Per rispondere a questa domanda occorre individuare il passaggio del job in cui si verifica il problema. Ad esempio, notiamo che ciò accade in quella parte del job Spark in cui i dati vengono letti da un file compresso. Poiché per impostazione predefinita i file di archivio non sono partizionati in fase di lettura, verrà creato un RDD con una partizione singola per ogni file di archivio letto, che determinerà il comportamento rilevato. Se tale file compresso è in un formato archivio divisibile, come BZIP2, e può essere partizionato in fase di lettura, nelle impostazioni avanzate di tFileInputDelimited possiamo attivare la proprietà "Set Minimum Partitions" (Imposta partizioni minime) e quindi, come punto di partenza, impostare come valore minimo tante partizioni quanti sono gli executor.

Ma in caso di un file di archivio come GZIP che non può essere partizionato in fase di lettura, è possibile partizionarlo esplicitamente utilizzando il componente tPartition. Come illustrato qui di seguito, si tratta di un componente che permette il partizionamento del file in modo da poter distribuire uniformemente il carico tra i vari executor.

Il partizionamento in fase di lettura può essere eseguito anche durante la lettura di un database utilizzando i componenti tJDBC, con le seguenti proprietà:

Il partizionamento può essere applicato solo in determinate situazioni, come possiamo vedere qui sopra. Se dovessimo accertare che il set di dati è asimmetrico sulle chiavi usate per il join, sarà necessario impiegare altri metodi. In che modo è possibile identificare l'asimmetria dei dati? Iniziamo esaminando il set di dati per partizione: i dati sono raggruppati tra le varie chiavi utilizzate per il join. Ecco un esempio di come apparirebbe un set di dati asimmetrico per partizione:

In questo caso, se non è possibile ripartizionare con una chiave diversa, occorrerà cercare altri modi per migliorare il job Talend Spark. Una tecnica comunemente usata è chiamata "salting" e prevede l'aggiunta di una chiave falsa alla chiave effettiva per uniformare la distribuzione dei dati per partizione. Questa operazione può essere effettuata tramite il componente tMap del job Spark, come nell'esempio seguente:

In questo esempio abbiamo aggiunto a livello del tMap la chiava falsa come "numeric random" e l'abbiamo unita insieme alla chiave effettiva al set di dati di riferimento, anch'esso già aggiunto alla chiava falsa. Il fatto che l'unione venga eseguita in base alla chiave effettiva e a quella falsa generata per la distribuzione, permetterà di evitare partizioni asimmetriche che possono influire sulle prestazioni durante il join dei set di dati in Spark.

Conclusione

Esistono numerose tecniche per migliorare le prestazioni e ottimizzare i job Talend Spark. Spero che i suggerimenti illustrati in questo blog siano stati utili. Ora provate a creare altri job Spark in Talend.

Riferimenti:

https://spark.apache.org/docs/latest/tuning.html

https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning1.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tuning.html

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_spark-component-guide/content/ch_tuning-spark.html

Partecipa alla discussione

0 Comments

Scrivi una risposta

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *