Configurazioni Talend e spark-submit: in cosa differiscono?

Configurazioni Talend e spark-submit: in cosa differiscono?

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

Nel mio precedente blog, "Talend e Apache Spark: manuale tecnico di base", vi ho illustrato le analogie tra i job Talend Spark e spark-submit. In questo nuovo blog, continuerò ad analizzare le configurazioni Talend Spark con Apache spark-submit. Per prima cosa, vedremo di fare un parallelo tra le opzioni della scheda Apache Spark Configuration (Configurazione Apache Spark) di un job Talend Spark e gli argomenti che è possibile trasmettere a un comando spark-submit, soffermandoci sull'utilizzo di entrambi. Iniziamo subito.

Differenze tra i comandi

Quando si esegue un job Apache Spark (come uno degli esempi Apache Spark offerti per impostazione predefinita nel cluster Hadoop per verificare che Spark funzioni come previsto) nel proprio ambiente, si usano i seguenti comandi:

export HADOOP_CONF_DIR=XXX./bin/spark-submit  --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client  --executor-memory 5G --num-executors 10 /path/to/examples.jar 1000

I due comandi evidenziati sopra definiscono la directory in cui il job spark-submit leggerà i file di configurazione del cluster. Quindi, procediamo a inviare il nostro comando spark-submit per l'esecuzione di Spark all'interno di un cluster YARN in modalità client, utilizzando 10 executor con 5 G di memoria ciascuno per eseguire il job Spark di esempio.

Ora osserviamo come il job Spark di esempio viene eseguito in Talend. Quando eseguiamo un job Spark di esempio (come quello sopra) in Talend, tutte le informazioni di configurazione Spark vengono inserite nella seguente scheda, all'interno della scheda di esecuzione:

Figura 1

Sorgono spontanee alcune domande. Come si rapportano le informazioni che inseriamo in Talend a quanto viene inserito nel terminale per eseguire un job Spark? Come possiamo sapere quanti executor e quanta memoria sono necessari? E come facciamo a risolvere eventuali problemi? Risponderemo a tutte queste domande in questo blog.

Ma prima di procedere, vorrei presentare alcune opzioni spark-submit che verranno utilizzate nel blog. In base alla documentazione Apache Spark, queste sono alcune delle opzioni più comuni utilizzabili in uno script spark-submit:

--class: si tratta del punto di ingresso principale per l'applicazione Spark.

--master: questa opzione consente di specificare se lo Spark master è un'applicazione Spark autonoma o se si utilizza Spark su YARN.

--deploy-mode: come accennato nei miei blog precedenti, questa opzione si riferisce alle due diverse modalità YARN disponibili e indica come verrà implementato il driver Spark.

--conf: questa opzione consente di trasmettere ulteriori impostazioni di configurazione Spark che il job dovrà utilizzare, come ad esempio: "spark.executor.userClassPathFirst=true".

--application-jar: si riferisce al percorso in cui è stato inserito il codice Spark compilato che Apache Spark andrà ad eseguire.

--application-arguments: questa opzione consente di trasmettere eventuali argomenti specifici del codice Spark utilizzato.

Ora vediamo come le opzioni elencate sopra vengono utilizzate in un job Talend Spark. Come potrete notare, nella scheda "Spark Configuration", sotto la scheda di esecuzione, le varie opzioni che è possibile impostare sono logicamente ordinate nelle seguenti categorie:

  1. Cluster Version (Versione cluster)
  2. Configuration (Configurazione)
  3. Authentication (Autenticazione)
  4. Tuning (Regolazione)
  5. Spark History (Cronologia Spark)

Cluster Version (Versione cluster)

Iniziamo con una delle prime opzioni disponibili per il job Talend nella categoria "Cluster Version". Si tratta dell'opzione "Spark Mode" (Modalità Spark).

Questa opzione consente di specificare se lo Spark master è su YARN o se si utilizza un'applicazione Spark autonoma. Questa opzione corrisponde all'opzione "--deploy-mode" descritta in precedenza quando abbiamo affrontato le opzioni spark-submit, così come all'opzione "--master". Ad esempio, se si seleziona la modalità Spark "YARN Client" (Client YARN) in Talend, sarà come specificare "--master yarn --deploy-mode client" in spark-submit. Se nel menu a discesa viene selezionata la modalità "Standalone", Talend chiederà di inserire i dati dell'URL dello Spark master, esattamente come accadrebbe con spark-submit. Questo corrisponderà alla trasmissione del seguente argomento in spark-submit: "--master spark://127.0.0.1:7077".

Configuration (Configurazione)

In Talend, la categoria "Configuration" richiede le seguenti informazioni:

Nella prima serie di caselle di controllo, Talend chiede di inserire informazioni quali resource manager, indirizzo scheduler del resource manager, indirizzo jobhistory e directory di staging.

Quando si utilizza spark-submit tutte queste informazioni vengono inserite nel job Spark tramite HADOOP_CONF_DIR .Possiamo configurare questo valore come una variabile di ambiente prima di eseguire lo script spark-submit oppure come ambiente permanente in /etc/environment o /etc/profile. Tutte queste variabili di ambiente vengono anche definite in uno script shell dell'ambiente che viene ottenuto dai job Spark eseguiti tramite spark-submit. Il nome di tale file è spark-env.sh e si trova sempre nella directory "/etc/spark/conf" degli host Spark. Ecco un esempio di come può apparire questo file di configurazione nel cluster:

La casella di controllo successiva chiede se si desidera definire la directory principale Hadoop, che talvolta è richiesta per i job Spark. Nei job spark-submit, queste informazioni vengono configurate nello stesso modo, ma il nome della variabile di ambiente è HADOOP_HOME. Nei job Talend Spark, le caselle di controllo si comportano come il file "spark-env.sh" dello script spark-submit, che acquisisce tali valori al runtime del job Spark.

L'ultima opzione della categoria "Configuration" per la configurazione di job Talend Spark definisce il nome host o l'indirizzo IP del driver Spark. Questa opzione risulta molto utile se il sistema da cui viene eseguito il job Spark impiega indirizzi IP interni ed esterni o se vi sono problemi con la risoluzione del nome host che potrebbero impedire allo Spark master e agli executor di riconnettersi al driver Spark.

Per impostazione predefinita, se questa opzione non viene specificata, il sistema tenterà di utilizzare il nome host locale e di risolvere il relativo indirizzo IP. Come accennato nel blog precedente, Talend impiega la modalità client YARN – di conseguenza il driver Spark viene sempre eseguito sul sistema da cui viene avviato il job Spark. Per fare un parallelo con le opzioni disponibili in spark-submit, questa impostazione verrebbe definita utilizzando l'opzione "--conf", quindi fornendo la seguente coppia chiave/valore: "spark.driver.host=127.0.0.1". E qui si concludono le opzioni di configurazione disponibili nel sottomenu della scheda Spark Configuration.

Authentication (Autenticazione)

Nella categoria dedicata all'autenticazione possiamo selezionare il metodo di autenticazione utilizzato dal cluster Hadoop:

Se non viene effettuata alcuna selezione in questa categoria, il job darà per scontato l'utilizzo della modalità di autenticazione più semplice da parte del cluster e tenterà di connettersi al cluster Hadoop usando il nome utente specificato qui. In spark-submit, questa informazione verrebbe specificata nella configurazione Spark delle applicazioni che stiamo inviando.

Se procediamo e selezioniamo l'opzione "Use Kerberos authentication" (Usa autenticazione Kerberos), ci verrà chiesto di aggiungere le seguenti informazioni:

I primi due campi contengono i Service Principal Name (SPN) utilizzati dal Resource Manager e dal servizio Job History (Cronologia job). Se l'opzione per l'utilizzo di un file "keytab" non viene selezionata, quando viene eseguito, il job cercherà la cache dei ticket Kerberos sul sistema su cui viene eseguito e cercherà ticket Kerberos validi da utilizzare nella cache specifica dell'utente che ha avviato il job.

Se l'opzione "keytab" è selezionata, non sarà necessario specificare il file keytab da utilizzare insieme al Principal Name dell'utente per il quale il file viene emesso. Ecco perché, quando il job viene avviato, genera un ticket Kerberos basato sul file keytab per il Principal Name che verrà usato per il job. Nel caso di spark-submit, nell'applicazione dovrà essere inserita la configurazione Spark definita nel codice utilizzato da Kerberos per l'autenticazione. Prima di eseguire il comando spark-submit, dovrà essere eseguito il comando kinit Kerberos per generare un ticket, se non si utilizza un file keytab, oppure, se si utilizza un file keytab, è possibile eseguire il comando kinit con i contrassegni necessari per utilizzare un keytab per la generazione di ticket, o ancora, è possibile specificare l'accesso da keytab all'interno del codice dell'applicazione Spark.

Tuning (Regolazione)

Passiamo alla categoria "Tuning" che in Talend offre l'opzione "Set tuning properties" (Imposta proprietà di regolazione), deselezionata per impostazione predefinita. Se invece "Set tuning properties" è selezionata, vengono automaticamente offerte le seguenti opzioni:

Vediamo a cosa corrispondono tutte queste opzioni in spark-submit.

La prima opzione, "Set application master tuning properties" (Imposta proprietà di regolazione master applicazione), consente di configurare la quantità di memoria e il numero di core che YARN Application Master deve utilizzare.

Lo scopo dell'istanza di YARN Application Master è di negoziare le risorse dal Resource Manager, quindi di comunicare con i Node Manager per monitorare l'impiego delle risorse ed eseguire i container. Se questa opzione non viene selezionata, a YARN Application Master verranno assegnati 512 m e 1 core, per impostazione predefinita. Per selezionare questa stessa impostazione in spark-submit, dovremmo usare l'opzione "--conf", quindi trasmettere le seguenti due coppie chiave/valore: "spark.yarn.am.memory=512m,spark.yarn.am.cores=1".

Possiamo anche configurare una serie di impostazioni aggiuntive, incluso il numero di executor, la quantità di memoria per ciascun executor, il numero di core per executor e la quantità di memoria di overhead che può essere assegnata a ciascun executor tramite le successive opzioni.

I valori predefiniti sono 1 g di memoria per executor e 1 core per executor, mentre la memoria di overhead per executor sarà, per impostazione predefinita, pari al 10% della memoria utilizzata dall'executor, con un minimo pari a 384 m e a 2 executor richiesti. Per trasmettere questa stessa opzione in spark-submit possiamo procedere in due diversi modi. Uno consiste nell'utilizzare, come nell'esempio, il comando spark-submit di cui sopra "--executor-memory 5G --num-executors 10"; in alternativa, possiamo usare l'opzione "--conf" con le seguenti coppie chiave/valore: "spark.executor.instances=2, spark.executor.cores=1, spark.executor.memory=2, spark.yarn.executor.memoryOverhead=384m".

L'ultima opzione disponibile si riferisce all'allocazione delle risorse YARN:

È possibile selezionare tra Auto (Automatica), Fixed (Fissa) e Dynamic (Dinamica), ma qual è il loro significato? Spark consente di selezionare come devono essere assegnati gli executor.

Se si lascia l'impostazione Auto, l'opzione per definire il numero di executor scompare, in quanto verrà utilizzato il valore predefinito assegnato da YARN che, come abbiamo visto, corrisponde a 2 executor. Se si seleziona Fixed, è possibile specificare la quantità di executor che devono essere richiesti dal job. L'opzione Dynamic permette di usare il meccanismo fornito da Spark per regolare in modo dinamico l'assegnazione degli executor al job Spark, in base alle esigenze in fase di runtime. Ciò significa che l'applicazione, durante l'esecuzione, potrà richiedere executor aggiuntivi se necessario, per rilasciarli a YARN quando non servono più. Quando questa opzione viene selezionata, fornisce la seguente configurazione:

Ora è possibile selezionare quanti executor richiedere inizialmente a YARN, quindi specificare il numero minimo e massimo di executor per il job, in base al carico di lavoro necessario durante l'esecuzione del job da parte di Spark. Per configurare l'opzione dinamica in spark-submit è possibile usare l'opzione "--conf", quindi specificare le seguenti coppie chiave/valore: "spark.dynamicAllocation.enabled=true, spark.shuffle.service.enabled=true". In base alla documentazione Spark (https://spark.apache.org/docs/1.6.1/job-scheduling.html#dynamic-resource-allocation target="_blank") queste due proprietà sono richieste per l'uso di questa funzione.

Proseguendo nella categoria "Tuning" della scheda "Spark Configuration" di Talend, troviamo la casella di controllo "Set Web UI port" (Imposta porta interfaccia utente web). Se selezionata, consente di specificare una porta; l'impostazione predefinita è "4040". Se si attiva questa opzione, quando l'applicazione Spark è in esecuzione, il driver Spark avvia un'interfaccia utente web che può essere utilizzata per monitorare il job Spark in esecuzione. Se l'opzione non viene selezionata, verrà utilizzata la porta predefinita indicata sopra e, se non è disponibile, l'applicazione Spark tenterà con quella successiva fino a quando non ne trova una aperta. L'opzione viene in genere utilizzata se si sa già che la porta 4040 non è disponibile sul sistema da cui viene eseguito il job Spark e si desidera specificare la porta da utilizzare, anziché lasciare che l'applicazione Spark tenti di trovare una porta aperta. Per configurare questa stessa opzione in spark-submit è possibile usare l'opzione "--conf", quindi specificare la seguente coppia chiave/valore: "spark.ui.port=4041".

La successiva opzione disponibile è "Broadcast Factory" (Fabbrica di trasmissione) che, come possiamo vedere, offre a sua volta diverse opzioni:

A cosa serve "Broadcast Factory"? Nelle applicazioni Spark, questa opzione permette di tramettere variabili agli executor del cluster. Grazie a questa opzione, le variabili possono essere distribuite in modo rapido ed efficiente, evitando di affidare l'intera operazione a un singolo nodo. Come abbiamo accennato, è possibile scegliere tra tre diverse opzioni. Con l'opzione Auto vengono utilizzati i valori predefiniti. La seconda e terza opzione permettono di scegliere tra Torrent e HTTP come metodo di trasmissione. In spark-submit la stessa operazione viene eseguita dall'opzione "--conf" utilizzata con la coppia chiave/valore "spark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory", se non si desidera utilizzare l'impostazione predefinita, che generalmente corrisponde a Torrent.

L'ultima opzione della categoria "Tuning" permette di personalizzare il serializzatore Spark da usare:

La serializzazione è molto importante in Spark in quanto, come descritto nella documentazione (https://spark.apache.org/docs/latest/tuning.html#data-serialization), consente di serializzare i dati tra gli executor per migliorare le prestazioni in un ambiente distribuito. Dal momento che, per impostazione predefinita, questa opzione non è selezionata, Talend la imposterà su "Kryo", che è considerato tra i serializzatori più efficienti. Per utilizzare la stessa impostazione in spark-submit, è necessario specificare l'opzione "--conf" in associazione alla coppia chiave/valore "spark.serializer=org.apache.spark.serializer.KryoSerializer". Se questa opzione non viene specificata in spark-submit, verrà utilizzato il serializzatore java, mentre se viene utilizzato il server Spark SQL Thrift, verrà utilizzato il serializzatore Kryo per impostazione predefinita.

Spark History (Cronologia Spark)

Passiamo ora all'ultima categoria, "Spark History". Se abilitiamo la registrazione Spark, ci vengono proposte le seguenti opzioni:

Se si abilita la registrazione eventi, è possibile specificare una directory in HDFS dove i file della cronologia del job possono essere letti dal server della cronologia Spark, oltre a specificare l'indirizzo del server della cronologia. In spark-submit, per abilitare e configurare rispettivamente questa opzione è necessario inviare le seguenti coppie chiave/valore insieme all'opzione "--conf": "spark.eventLog.enabled=true,spark.eventLog.dir=hdfs://namenode_host:namenode_port/user/spark/applicationHistory,spark.yarn.historyServer.address=http://spark_history_server:history_port".

Configurazione aggiuntiva

Ora che abbiamo passato in rassegna le varie categorie della scheda "Spark Configuration", possiamo vedere che sono rimaste altre tre opzioni da utilizzare. La prima è "Spark "scratch" directory" (Directory dei file temporanei Spark). Questa opzione consente di specificare la directory dei file temporanei da utilizzare sul disco locale del sistema in cui viene avviato il job Spark, mentre l'applicazione è in esecuzione. In spark-submit dovremmo utilizzare l'opzione "--conf" e digitare la sintassi "spark.local.dir=/tmp". Se non si specifica nulla, verrà usata la directory /tmp predefinita.

L'opzione successiva consente di attivare i checkpoint Spark. In questo modo è possibile riprendere il job Spark da uno specifico punto nel tempo, in caso di errore. Se l'opzione viene attivata, offre la possibilità di specificare una directory nel filesystem locale o in HDFS dove salvare lo stato di avanzamento del job. Per abilitare la stessa opzione in spark-submit, è necessario, come indicato anche nella documentazione Spark, procedere all'interno del codice Spark. Per un esempio, potete consultare la documentazione Spark a questa pagina (https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing).

L'ultima opzione corrisponde ad "Advanced Properties" (Proprietà avanzate). Questa opzione consente di aggiungere proprietà Spark da trasmettere alla nostra applicazione sotto forma di coppia chiave/valore. La stessa coppia chiave/valore viene utilizzata in spark-submit, in questo caso in combinazione all'opzione "--conf".

Osservando più attentamente il cluster Hadoop e uno dei nodi del gateway Spark, è possibile notare che molte delle selezioni predefinite citate sopra sono già specificate all'interno di un file denominato spark-defaults.conf che viene utilizzato quando si esegue spark-submit. Il file si trova nel percorso "/etc/spark/conf". Se provate ad aprire il file vedrete che contiene molte delle proprietà di cui abbiamo parlato. È comunque possibile ignorarle, come accennato prima, trasmettendole come opzioni in spark-submit. Ecco un esempio:

Conclusione

Talend offre tutte le opzioni necessarie per configurare l'applicazione Spark e, grazie alle caselle di controllo e ai menu a discesa, specificare le opzioni da utilizzare e le impostazioni predefinite da conservare è più semplice che mai. Vi invito a passare in rassegna le varie impostazioni che è possibile usare nei job Talend Spark e a provare quanto è facile configurarle e ottimizzarle per l'ambiente utilizzato.

Riferimenti

https://spark.apache.org/docs/latest/submitting-applications.html

https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing

https://spark.apache.org/docs/latest/tuning.html#data-serialization

https://spark.apache.org/docs/1.6.1/job-scheduling.html#dynamic-resource-allocation

Partecipa alla discussione

0 Comments

Scrivi una risposta

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *