建立 PySpark DataFrame
在大資料分析中,PySpark 是一個將流行的程式語言 Python 與開源大資料框架 Apache Spark 相結合的棧。PySpark 提供了一個優秀的大資料分析介面,而這個棧中的一個重要組成部分是 Spark 的 DataFrame API。在這裡,我們將為想要建立 PySpark DataFrame 的人提供技術指南,其中包括有用的提示和實際示例。
pyspark 的主要優勢是什麼?哪些行業主要使用它?
Pyspark 是 Apache Spark 的 Python API,Apache Spark 是一個分散式計算框架,它提供快速、可擴充套件和容錯的資料處理。Pyspark 的一些主要優勢包括:
可擴充套件性 - Pyspark 可以處理大型資料集,並且可以輕鬆地進行擴充套件或縮減以滿足不斷變化的資料處理需求。
速度 - Pyspark 旨在進行快速資料處理,可以快速有效地處理大型資料集。
容錯性 - Pyspark 旨在具有容錯性,這意味著它可以在不丟失資料或影響效能的情況下從硬體或軟體故障中恢復。
靈活性 - Pyspark 可用於各種資料處理任務,包括批處理、流處理、機器學習和圖處理。
與其他技術的整合 - Pyspark 可以與各種其他技術整合,包括 Hadoop、SQL 和 NoSQL 資料庫。
使用 Pyspark 的行業包括:
金融服務 - Pyspark 用於金融服務中的風險分析、欺詐檢測和其他資料處理任務。
醫療保健 - Pyspark 用於醫療保健中的醫學影像分析、疾病診斷和其他資料處理任務。
零售 - Pyspark 用於零售中的客戶細分、銷售預測和其他資料處理任務。
電信 - Pyspark 用於電信中的網路分析、呼叫資料分析和其他資料處理任務。
總的來說,Pyspark 為可擴充套件和快速的資料處理提供了一個強大的平臺,可用於各種行業和應用。
第一部分:建立 SparkSession
在 PySpark 中建立 DataFrame 之前,必須首先建立一個 SparkSession 來與 Spark 互動。SparkSession 用於建立 DataFrame、將 DataFrame 註冊為表以及執行 SQL 查詢。
語法
from pyspark.sql import SparkSession
# create a SparkSession
spark = SparkSession.builder \
.appName('my_app_name') \
.config('spark.some.config.option', 'some-value') \
.getOrCreate()
`appName` 指定 Spark 應用程式的名稱。
`config` 用於設定配置屬性,例如資料儲存選項。
`getOrCreate` 將建立一個新的 SparkSession,或者如果已存在則獲取現有 SparkSession。
第二部分:從 CSV 檔案建立 DataFrame
建立 PySpark DataFrame 的最常見方法之一是從 CSV 檔案載入資料。為此,您應該
語法
# load data from csv file
df = spark.read.csv('path/to/myfile.csv', header=True)
`header=True` 告訴 Spark CSV 檔案的第一行包含標題。
第三部分:從 SQL 查詢建立 DataFrame
從 SQL 查詢的結果建立 DataFrame 也是 PySpark 中的常見做法。為此 -
# create a Spark DataFrame from a SQL query
df = spark.sql('SELECT * FROM my_table')
`spark.sql` 從 SQL 查詢建立 DataFrame。
第四部分:從 RDD 建立 DataFrame
PySpark 還允許您從 RDD 建立 DataFrame。這是一個示例 -
# create a RDD rdd = spark.sparkContext.parallelize([(1, "John"), (2, "Sarah"), (3, "Lucas")]) # create a DataFrame df = spark.createDataFrame(rdd, ["Id", "Name"])
`parallelize` 從 Python 列表建立 RDD。
`createDataFrame` 從 RDD 建立 DataFrame。
第五部分:操作 DataFrame
建立 PySpark DataFrame 後,您通常需要對其進行操作。以下是一些常見操作:
選擇列
# select two columns
df.select('column1', 'column2')
過濾資料
# filter rows with a condition df.filter(df.column1 > 100)
分組資料
# group by column1 and calculate the mean of column2
df.groupby('column1').mean('column2')
連線 DataFrame
# 連線兩個 DataFrame df1.join(df2, df1.id == df2.id)最終程式,程式碼
# Creating a session
from pyspark.sql import SparkSession
# create a SparkSession
spark = SparkSession.builder \
.appName('my_app_name') \
.config('spark.some.config.option', 'some-value') \
.getOrCreate()
# Dataframe from CSV
# load data from csv file
df = spark.read.csv('path/to/myfile.csv', header=True)
# data frame from SQL query
# create a Spark DataFrame from a SQL query
df = spark.sql('SELECT * FROM my_table')
#Dataframe from RDD
# create a RDD
rdd = spark.sparkContext.parallelize([(1, "John"), (2, "Sarah"), (3, "Lucas")])
# create a DataFrame
df = spark.createDataFrame(rdd, ["Id", "Name"])
輸出
The output will be in the form a of a dataframe which can be accessed from different sources using different methods.
結論
在 PySpark 中建立 DataFrame 是大資料分析中的一項基本技能。透過使用 SparkSession,您可以使用 CSV 檔案、SQL 查詢或 RDD 建立 DataFrame。建立 DataFrame 後,您可以透過多種方式對其進行操作,例如選擇列、過濾資料、分組資料和連線 DataFrame。使用這些方法,您可以為您的資料分析需求建立定製管道。
資料結構
網路
關係資料庫管理系統 (RDBMS)
作業系統
Java
iOS
HTML
CSS
Android
Python
C 語言程式設計
C++
C#
MongoDB
MySQL
Javascript
PHP