日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

入職AI算法崗?你需要會的大數據知識

 

pyspark-RDD基礎

spark

pyspark是spark的Python API,允許python調用spark編程模型

初始化spark

SparkContext

from pyspark import SparkContext
sc = SparkContext(master='local[2]')

核查SparkContext

./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py

配置

from pyspark import SparkConf,SparkContext
conf = (SparkConf().setMaster("local").setAppName("my APP").set("spark.executor.memory","1g"))
sc = SparkContext(conf=conf)

使用shell

pyspark shell已經為SparkContext創建了名為sc的變量

./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py

用—master參數設定Context連接到哪個Master服務器,通過傳遞逗號分隔列表至—py-files添加Python.zip、egg或.py文件到Runtime路徑

加載數據

并行集合

rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p","r"])])

外部數據

使用textFile()函數從HDFS、本地文件或其它支持hadoop的文件系統里讀取文件,或使用wholeTextFiles()函數讀取目錄下所有文本文件

textFile = sc.textFile('a.txt')
textFile2 = sc.wholeTextFiles(/aa)

提取RDD信息

基礎信息

rdd.getNumPatitions()								列出分區數
rdd.count()													計算RDD的實例數量
rdd.countByKey()										按鍵計算RDD實例數量
defaultdict(<type 'int'>,('a':2,'b':1))
rdd.countByValue()							按值計算RDD實例數量
defaultdict(<type 'int'>,(('b',2):1,('a',2):1,('a',7):1))
rdd.collectAsMap()							以字典的形式返回鍵值
('a':2,'b':2)
rdd.sum()									匯總RDD元素
4959
sc.parallelize([]).isEmpty()				檢查RDD是否為空

匯總

rdd.max()					RDD元素的最大值
rdd.min()					RDD元素的最小值
rdd.mean()					RDD元素的平均值
rdd.stdev()					RDD元素的標準差
rdd.variance()				RDD元素的方差
rdd.histogram(3)			分箱(bin)生成直方圖
rdd.stats()					綜合統計包括:計數、平均值、標準差、最大值和最小值

應用函數

rdd.map(lambda x:x+(x[1],x[0])).collect()		對每個RDD元素執行函數
rdd.flatMap(lambda x:x+(x[1],x[0]))				對每個RDD元素執行函數,并拉平結果
rdd.collect()
rdd.flatMapValues(lambda x:x).collect()			不改變鍵,對rdd的每個鍵值對執行flatMap函數

選擇數據

獲取
rdd.collect()							返回包含所以RDD元素的列表
rdd.take(4)								提取前4個RDD元素
rdd.first()								提取第一個RDD元素
rdd.top(2)								提取前兩個RDD元素
抽樣rdd.sample(False,0.15,81)				返回RDD的采樣子集
篩選rdd.filter(lambda x:'a' in x)			篩選RDD
rdd.distinct()							返回RDD里的唯一值
rdd.keys()								返回RDD鍵值對里的鍵

迭代

def g(x):print(x)     
rdd.foreach(g)

改變數據形狀

規約
rdd.reduceByKey(lambda x,y:x+y)				合并每個鍵的值
rdd.reduce(lambda x,y:x+y)					合并RDD的值
分組rdd.groupBy(lambda x:x%2).mapValues(list)	返回RDD的分組值
rdd.groupByKey().mapValues(list)			按鍵分組RDD集合seqOp = (lambda x,y:(x[0]+y,x[1]+1))
combOP = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd.aggregate((0,0),seqOp,combOP)		 	匯總每個分區里的RDD元素,并輸出結果
rdd.aggregeteByKey((0,0),seqOp,combOP)		匯總每個RDD的鍵的值
rdd.fold(0,add)								匯總每個分區里的RDD元素,并輸出結果
rdd.foldByKey(0,add)						合并每個鍵的值
rdd,keyBy(lambda x:x+x)						通過執行函數,創建RDD元素的元組

數學運算

rdd.subtract(rdd2)							返回RDD2里沒有匹配鍵的rdd的兼職對
rdd2.subtractByKey(rdd)						返回rdd2里的每個(鍵、值)對,rdd中,沒有匹配的鍵
rdd.cartesian(rdd2)							返回rdd和rdd2的笛卡爾積

排序

rdd.sortBy(lambda x:x[1])					按給定函數排序RDD
rdd.sortByKey()								按鍵排序RDD的鍵值對

重分區

rdd.repartition(4)							新建一個含4個分區的RDD
rdd.coalesce(1)								將RDD中的分區數縮減為1個

保存

rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.Apache.hadoop.mapred.TextOutputFormat')

終止SparkContext

sc.stop()

執行程序

./bin/spark-submit examples/src/main/python/pi.py

Pyspark_sql

Pyspark與Spark SQL

Spark SQL是Apache Spark處理結構化數據的模塊

初始化SparkSession

SparkSession用于創建數據框,將數據框注冊為表,執行SQL查詢,緩存表及讀取Parquet文件

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my app").config("spark.some.config.option","some-value").getOrCreate()

創建數據框

從RDD創建

from pyspark.sql.types import *
推斷Schemasc = spark.sparkContextlines = sc.textFile("people.txt")
parts = lines.map(lambda l:l.split(","))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)指定Schemapeople = parts.map(lambda p:Row(name=p[0],age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)spark.createDataFrame(people,schema).show()

從spark數據源創建

json
df = spark.read.json("customer.json")
df.show()df2 = spark.read.load("people.json",format = "json")
Parquet文件df3 = spark.read.load("users.parquet")
文本文件df4 = spark.read.text("people.txt")

查閱數據信息

df.dtypes						返回df的列名與數據類型
df.show()						顯示df內容
df.head()						返回前n行數據
df.first()						返回第一行數據
df.take(2)						返回前兩行數據
df.schema						返回df的schema
df.describe().show()			匯總統計數據
df.columns						返回df列名
df.count()						返回df的行數
df.distinct().count()			返回df中不重復的行數
df.printSchema()				返回df的Schema
df.explain()					返回邏輯與實體方案

重復值

df = df.dropDuplicates()

查詢

from pyspark.sql import functions as F
Select
df.select("firstName").show()					顯示firstName列的所有條目
df.select("firstName","lastName".show())					
df.select("firstName","age",
          explode("phoneNumber")				顯示firstName、age的所有條目和類型
          .alias("contactInfo"))
			.select("ContactInfo.type","firstName","age")
df.select(df["firstName"],df["age"]+1).show()	顯示firstName和age列的所有記錄添加
df.select(df["age"]>24).show()					顯示所有小于24的記錄
When
df.select("firstName",F.when(df.age>30,1))		顯示firstName,且大于30歲顯示1,小于30顯示0
				.otherwise(0).show()
df[df.firstName.isin("Jane","Boris")].collect()	顯示符合特定條件的firstName列的記錄
Like	
df.select("firstName",df.lastName,				顯示lastName列中包含Smith的firstName列的記錄
          like("Smith")).show()
Startswith-Endwith
df.select("firstName",df.lastName.				顯示lastName列中以Sm開頭的firstName列的記錄	
          startswith("Sm")).show()
df.select(df.lastName.endswith("th")).show()	顯示以th結尾的lastName
Substring
df.select(df.firstName.substr(1,3).alias("name"))返回firstName的子字符串
Between
df.select(df.age.between(22,24)).show()			顯示介于22到24直接的age列的所有記錄

添加、修改、刪除列

添加列

df = df.withColumn('city',df.address.city) 
       .withColumn('postalCode',df.address.postalCode) 
    .withColumn('state',df.address.state) 
    .withColumn('streetAddress',df.address.streetAddress) 
    .withColumn('telePhoneNumber',explode(df.phoneNumber.number)) 
    .withColumn('telePhoneType',explode(df.phoneNumber.type)) 

修改列

df = df.withColumnRenamed('telePhoneNumber','phoneNumber')

刪除列

df = df.drop("address","phoneNumber")
df = df.drop(df.address).drop(df.phoneNumber)

分組

df.groupBy("age").count().show()		按age列分組,統計每組人數

篩選

df.filter(df["age"]>24).show()			按age列篩選,保留年齡大于24歲的

排序

peopledf.sort(peopledf.age.desc()).collect()
df.sort("age",ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1]).collect()

替換缺失值

df.na.fill(50).show()				用一個值替換空值
df.na.drop().show()					去除df中為空值的行
df.na.replace(10,20).show()			用一個值去替換另一個值

重分區

df.repartition(10).rdd.getNumPartitions()	將df拆分為10個分區
df.coalesce(1).rdd.getNumPartitions()		將df合并為1個分區

運行SQL查詢

將數據框注冊為視圖

peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")

查詢視圖

df = spark.sql("select * from customer").show()
peopledf = spark.sql("select * from global_temp.people").show()

輸出

數據結構

rdd1 = df.rdd		將df轉為rdd
df.toJSON().first()	將df轉為rdd字符串df.toPandas()		將df的內容轉為Pandas的數據框

保存至文件

df.select("firstName","city").write.save("nameAndCity.parquet")
df.select("firstName","age").write.save("nameAndAges.json",format="json")

終止SparkSession

spark.stop()

分享到:
標簽:數據
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定