logo

PySparkSQL

Apache Spark è il software di maggior successo della Apache Software Foundation ed è progettato per l'elaborazione veloce. Diversi settori utilizzano Apache Spark per trovare le proprie soluzioni. PySpark SQL è un modulo di Spark che integra l'elaborazione relazionale con l'API di programmazione funzionale di Spark. Possiamo estrarre i dati utilizzando un linguaggio di query SQL. Possiamo usare le query come il linguaggio SQL.

Se hai una conoscenza di base di RDBMS, PySpark SQL sarà facile da usare e potrai estendere i limiti dell'elaborazione dati relazionale tradizionale. Spark supporta anche Hive Query Language, ma esistono limitazioni del database Hive. Spark SQL è stato sviluppato per rimuovere gli inconvenienti del database Hive. Diamo un'occhiata ai seguenti svantaggi di Hive:

Svantaggi di Hive

  • Non può riprendere l'elaborazione, il che significa che se l'esecuzione fallisce nel bel mezzo di un flusso di lavoro, non è possibile riprendere da dove si era bloccata.
  • Non possiamo eliminare i database crittografati in cascata quando il cestino è abilitato. Porta all'errore di esecuzione. Per eliminare questo tipo di database, gli utenti devono utilizzare l'opzione Elimina.
  • Le query ad hoc vengono eseguite utilizzando MapReduce, che viene lanciato da Hive ma quando analizziamo il database di medie dimensioni, ritarda le prestazioni.
  • Hive non supporta l'operazione di aggiornamento o eliminazione.
  • È limitato al supporto delle sottoquery.

Questi inconvenienti sono le ragioni per sviluppare Apache SQL.

PySpark SQL Breve introduzione

PySpark supporta l'elaborazione relazionale integrata con la programmazione funzionale di Spark. Fornisce supporto per le varie origini dati per consentire di intrecciare query SQL con trasformazioni di codice, risultando così uno strumento molto potente.

PySpark SQL stabilisce la connessione tra RDD e tabella relazionale. Fornisce un'integrazione molto più stretta tra l'elaborazione relazionale e procedurale tramite l'API Dataframe dichiarativa, che è integrata con il codice Spark.

Utilizzando SQL, può essere facilmente accessibile a più utenti e migliorare l'ottimizzazione per quelli attuali. Supporta inoltre l'ampia gamma di origini dati e algoritmi nei Big Data.

Funzionalità di PySpark SQL

Le funzionalità di PySpark SQL sono riportate di seguito:

I fratelli di Kylie Jenner

1) Accesso ai dati di coerenza

Fornisce un accesso coerente ai dati significa che SQL supporta un modo condiviso per accedere a una varietà di origini dati come Alveare, Avro, Parquet, JSON e JDBC. Svolge un ruolo significativo nell'accogliere tutti gli utenti esistenti in Spark SQL.

2) Incorporazione in Spark

Le query SQL PySpark sono integrate con i programmi Spark. Possiamo utilizzare le query all'interno dei programmi Spark.

Uno dei vantaggi principali è che gli sviluppatori non devono gestire manualmente gli errori di stato o mantenere l'applicazione sincronizzata con i processi batch.

3) Connettività standard

Fornisce una connessione tramite JDBC o ODBC e questi due sono gli standard di settore per la connettività per gli strumenti di business intelligence.

4) Funzioni definite dall'utente

PySpark SQL dispone di una funzione definita dall'utente (UDF) combinata con il linguaggio. UDF viene utilizzato per definire una nuova funzione basata su colonne che estende il vocabolario del DSL di Spark SQL per trasformare DataFrame.

5) Compatibilità con l'alveare

PySpark SQL esegue query Hive non modificate sui dati correnti. Consente la piena compatibilità con i dati Hive attuali.

Modulo SQL PySpark

Alcune classi importanti di Spark SQL e DataFrames sono le seguenti:

    pyspark.sql.SparkSession:Rappresenta il punto di ingresso principale per DataFrame e funzionalità SQL.pyspark.sql.DataFrame:Rappresenta una raccolta distribuita di dati raggruppati in colonne denominate.pyspark.sql.Colonna:Rappresenta un'espressione di colonna in a DataFrame. pyspark.sql.Riga:Rappresenta una riga di dati in a DataFrame. pyspark.sql.GroupedData:Metodi di aggregazione, restituiti da DataFrame.groupBy(). pyspark.sql.DataFrameNaFunctions:Rappresenta metodi per gestire i dati mancanti (valori nulli).pyspark.sql.DataFrameStatFunctions:Rappresenta metodi per la funzionalità statistica.pysark.sql.funzioni:Rappresenta un elenco di funzioni integrate disponibili per DataFrame. pyspark.sql.types:Rappresenta un elenco di tipi di dati disponibili.pyspark.sql.Window:Viene utilizzato per lavorare con le funzioni di Window.

Considera il seguente esempio di PySpark SQL.

stringa in carattere
 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Produzione:

 +-----+ |hello| +-----+ |spark| +-----+ 

Codice Spiegazione:

Nel codice sopra, abbiamo importato il file findpark modulo e chiamato findpark.init() costruttore; quindi, abbiamo importato il modulo SparkSession per creare una sessione spark.

da pyspark.sql importa SparkSession

È possibile utilizzare una sessione Spark per creare il set di dati e l'API DataFrame. Una SparkSession può essere utilizzata anche per creare DataFrame, registrare DataFrame come tabella, eseguire SQL su tabelle, memorizzare nella cache la tabella e leggere il file parquet.

costruttore di classi

È un costruttore di Spark Session.

getOrCreate()

Viene utilizzato per ottenere un esistente SparkSession, oppure se non ne esiste uno esistente, creane uno nuovo in base alle opzioni impostate nel builder.

Pochi altri metodi

Alcuni metodi di PySpark SQL sono i seguenti:

1. nomeapp(nome)

Viene utilizzato per impostare il nome dell'applicazione, che verrà visualizzato nell'interfaccia utente web di Spark. Il parametro nome accetta il nome del parametro.

2. config(chiave=Nessuno, valore = Nessuno, conf = Nessuno)

Viene utilizzato per impostare un'opzione di configurazione. Le opzioni impostate utilizzando questo metodo vengono propagate automaticamente a entrambi SparkConf E SparkSession la configurazione di.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

parametri:

    chiave-Una stringa del nome chiave di una proprietà di configurazione.valore-Rappresenta il valore di una proprietà di configurazione.conf-Un'istanza di SparkConf.

3. maestro(maestro)

Imposta l'URL spark master a cui connettersi, ad esempio 'local' per l'esecuzione locale, 'local[4]' per l'esecuzione locale con 4 core.

parametri:

    maestro:un URL per Spark Master.

4. SparkSession.catalogo

È un'interfaccia che l'utente può creare, eliminare, modificare o interrogare il database, le tabelle, le funzioni sottostanti, ecc.

5. SparkSession.conf

tutorial su javafx

È l'interfaccia di configurazione del runtime per Spark. Questa è l'interfaccia attraverso la quale l'utente può ottenere e impostare tutte le configurazioni Spark e Hadoop rilevanti per Spark SQL.

classe pyspark.sql.DataFrame

È una raccolta distribuita di dati raggruppati in colonne denominate. Un DataFrame è simile alla tabella relazionale in Spark SQL e può essere creato utilizzando varie funzioni in SQLContext.

 student = sqlContext.read.csv('...') 

Dopo la creazione del dataframe, possiamo manipolarlo utilizzando i diversi linguaggi specifici del dominio (DSL) che sono funzioni predefinite di DataFrame. Considera il seguente esempio.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Consideriamo il seguente esempio:

Esecuzione di query utilizzando Spark SQL

Nel codice seguente, innanzitutto creiamo un DataFrame ed eseguiamo le query SQL per recuperare i dati. Considera il seguente codice:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Produzione:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Utilizzando la funzione groupBy()

La funzione groupBy() raccoglie i dati di categorie simili.

 songdf.groupBy('Genre').count().show() 

Produzione:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

distribuzione(numpartizioni, *cols)

IL distribuzione() restituisce un nuovo DataFrame che è un'espressione di partizionamento. Questa funzione accetta due parametri numpartizioni E *col. IL numpartizioni Il parametro specifica il numero di colonne di destinazione.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Produzione:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows