pyspark-RDD基礎
spark
pyspark是spark的Python API,允許python調用spark編程模型
初始化spark
SparkContext
from pyspark import SparkContext
sc = SparkContext(master='local[2]')
核查SparkContext
./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py
配置
from pyspark import SparkConf,SparkContext
conf = (SparkConf().setMaster("local").setAppName("my APP").set("spark.executor.memory","1g"))
sc = SparkContext(conf=conf)
使用shell
pyspark shell已經為SparkContext創建了名為sc的變量
./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py
用—master參數設定Context連接到哪個Master服務器,通過傳遞逗號分隔列表至—py-files添加Python.zip、egg或.py文件到Runtime路徑
加載數據
并行集合
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p","r"])])
外部數據
使用textFile()函數從HDFS、本地文件或其它支持hadoop的文件系統里讀取文件,或使用wholeTextFiles()函數讀取目錄下所有文本文件
textFile = sc.textFile('a.txt')
textFile2 = sc.wholeTextFiles(/aa)
提取RDD信息
基礎信息
rdd.getNumPatitions() 列出分區數
rdd.count() 計算RDD的實例數量
rdd.countByKey() 按鍵計算RDD實例數量
defaultdict(<type 'int'>,('a':2,'b':1))
rdd.countByValue() 按值計算RDD實例數量
defaultdict(<type 'int'>,(('b',2):1,('a',2):1,('a',7):1))
rdd.collectAsMap() 以字典的形式返回鍵值
('a':2,'b':2)
rdd.sum() 匯總RDD元素
4959
sc.parallelize([]).isEmpty() 檢查RDD是否為空
匯總
rdd.max() RDD元素的最大值
rdd.min() RDD元素的最小值
rdd.mean() RDD元素的平均值
rdd.stdev() RDD元素的標準差
rdd.variance() RDD元素的方差
rdd.histogram(3) 分箱(bin)生成直方圖
rdd.stats() 綜合統計包括:計數、平均值、標準差、最大值和最小值
應用函數
rdd.map(lambda x:x+(x[1],x[0])).collect() 對每個RDD元素執行函數
rdd.flatMap(lambda x:x+(x[1],x[0])) 對每個RDD元素執行函數,并拉平結果
rdd.collect()
rdd.flatMapValues(lambda x:x).collect() 不改變鍵,對rdd的每個鍵值對執行flatMap函數
選擇數據
獲取
rdd.collect() 返回包含所以RDD元素的列表
rdd.take(4) 提取前4個RDD元素
rdd.first() 提取第一個RDD元素
rdd.top(2) 提取前兩個RDD元素
抽樣rdd.sample(False,0.15,81) 返回RDD的采樣子集
篩選rdd.filter(lambda x:'a' in x) 篩選RDD
rdd.distinct() 返回RDD里的唯一值
rdd.keys() 返回RDD鍵值對里的鍵
迭代
def g(x):print(x)
rdd.foreach(g)
改變數據形狀
規約
rdd.reduceByKey(lambda x,y:x+y) 合并每個鍵的值
rdd.reduce(lambda x,y:x+y) 合并RDD的值
分組rdd.groupBy(lambda x:x%2).mapValues(list) 返回RDD的分組值
rdd.groupByKey().mapValues(list) 按鍵分組RDD集合seqOp = (lambda x,y:(x[0]+y,x[1]+1))
combOP = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd.aggregate((0,0),seqOp,combOP) 匯總每個分區里的RDD元素,并輸出結果
rdd.aggregeteByKey((0,0),seqOp,combOP) 匯總每個RDD的鍵的值
rdd.fold(0,add) 匯總每個分區里的RDD元素,并輸出結果
rdd.foldByKey(0,add) 合并每個鍵的值
rdd,keyBy(lambda x:x+x) 通過執行函數,創建RDD元素的元組
數學運算
rdd.subtract(rdd2) 返回RDD2里沒有匹配鍵的rdd的兼職對
rdd2.subtractByKey(rdd) 返回rdd2里的每個(鍵、值)對,rdd中,沒有匹配的鍵
rdd.cartesian(rdd2) 返回rdd和rdd2的笛卡爾積
排序
rdd.sortBy(lambda x:x[1]) 按給定函數排序RDD
rdd.sortByKey() 按鍵排序RDD的鍵值對
重分區
rdd.repartition(4) 新建一個含4個分區的RDD
rdd.coalesce(1) 將RDD中的分區數縮減為1個
保存
rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.Apache.hadoop.mapred.TextOutputFormat')
終止SparkContext
sc.stop()
執行程序
./bin/spark-submit examples/src/main/python/pi.py
Pyspark_sql
Pyspark與Spark SQL
Spark SQL是Apache Spark處理結構化數據的模塊
初始化SparkSession
SparkSession用于創建數據框,將數據框注冊為表,執行SQL查詢,緩存表及讀取Parquet文件
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my app").config("spark.some.config.option","some-value").getOrCreate()
創建數據框
從RDD創建
from pyspark.sql.types import *
推斷Schemasc = spark.sparkContextlines = sc.textFile("people.txt")
parts = lines.map(lambda l:l.split(","))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)指定Schemapeople = parts.map(lambda p:Row(name=p[0],age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)spark.createDataFrame(people,schema).show()
從spark數據源創建
json
df = spark.read.json("customer.json")
df.show()df2 = spark.read.load("people.json",format = "json")
Parquet文件df3 = spark.read.load("users.parquet")
文本文件df4 = spark.read.text("people.txt")
查閱數據信息
df.dtypes 返回df的列名與數據類型
df.show() 顯示df內容
df.head() 返回前n行數據
df.first() 返回第一行數據
df.take(2) 返回前兩行數據
df.schema 返回df的schema
df.describe().show() 匯總統計數據
df.columns 返回df列名
df.count() 返回df的行數
df.distinct().count() 返回df中不重復的行數
df.printSchema() 返回df的Schema
df.explain() 返回邏輯與實體方案
重復值
df = df.dropDuplicates()
查詢
from pyspark.sql import functions as F
Select
df.select("firstName").show() 顯示firstName列的所有條目
df.select("firstName","lastName".show())
df.select("firstName","age",
explode("phoneNumber") 顯示firstName、age的所有條目和類型
.alias("contactInfo"))
.select("ContactInfo.type","firstName","age")
df.select(df["firstName"],df["age"]+1).show() 顯示firstName和age列的所有記錄添加
df.select(df["age"]>24).show() 顯示所有小于24的記錄
When
df.select("firstName",F.when(df.age>30,1)) 顯示firstName,且大于30歲顯示1,小于30顯示0
.otherwise(0).show()
df[df.firstName.isin("Jane","Boris")].collect() 顯示符合特定條件的firstName列的記錄
Like
df.select("firstName",df.lastName, 顯示lastName列中包含Smith的firstName列的記錄
like("Smith")).show()
Startswith-Endwith
df.select("firstName",df.lastName. 顯示lastName列中以Sm開頭的firstName列的記錄
startswith("Sm")).show()
df.select(df.lastName.endswith("th")).show() 顯示以th結尾的lastName
Substring
df.select(df.firstName.substr(1,3).alias("name"))返回firstName的子字符串
Between
df.select(df.age.between(22,24)).show() 顯示介于22到24直接的age列的所有記錄
添加、修改、刪除列
添加列
df = df.withColumn('city',df.address.city)
.withColumn('postalCode',df.address.postalCode)
.withColumn('state',df.address.state)
.withColumn('streetAddress',df.address.streetAddress)
.withColumn('telePhoneNumber',explode(df.phoneNumber.number))
.withColumn('telePhoneType',explode(df.phoneNumber.type))
修改列
df = df.withColumnRenamed('telePhoneNumber','phoneNumber')
刪除列
df = df.drop("address","phoneNumber")
df = df.drop(df.address).drop(df.phoneNumber)
分組
df.groupBy("age").count().show() 按age列分組,統計每組人數
篩選
df.filter(df["age"]>24).show() 按age列篩選,保留年齡大于24歲的
排序
peopledf.sort(peopledf.age.desc()).collect()
df.sort("age",ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1]).collect()
替換缺失值
df.na.fill(50).show() 用一個值替換空值
df.na.drop().show() 去除df中為空值的行
df.na.replace(10,20).show() 用一個值去替換另一個值
重分區
df.repartition(10).rdd.getNumPartitions() 將df拆分為10個分區
df.coalesce(1).rdd.getNumPartitions() 將df合并為1個分區
運行SQL查詢
將數據框注冊為視圖
peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")
查詢視圖
df = spark.sql("select * from customer").show()
peopledf = spark.sql("select * from global_temp.people").show()
輸出
數據結構
rdd1 = df.rdd 將df轉為rdd
df.toJSON().first() 將df轉為rdd字符串df.toPandas() 將df的內容轉為Pandas的數據框
保存至文件
df.select("firstName","city").write.save("nameAndCity.parquet")
df.select("firstName","age").write.save("nameAndAges.json",format="json")
終止SparkSession
spark.stop()