日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

SparkSQL在機器學習場景中應用

第四范式已經在很多行業落地了上萬個AI應用,比如在金融行業的反欺詐,媒體行業的新聞推薦,能源行業管道檢測,而SparkSQL在這些AI應用中快速實現特征變換發揮著重要的作用

半小時,將你的Spark SQL模型變為在線服務

 

SparkSQL在特征變換主要有一下幾類

  1. 多表場景,用于表之間拼接操作,比如交易信息表去拼接賬戶表
  2. 使用udf進行簡單的特征變換,比如對時間戳進行hour函數處理
  3. 使用時間窗口和udaf進行時序類特征處理,比如計算一個人最近1天的消費金額總和

SparkSQL到目前為止,解決很好的解決離線模型訓練特征變換問題,但是隨著AI應用的發展,大家對模型的期望不再只是得出離線調研效果,而是在真實的業務場景發揮出價值,而真實的業務場景是模型應用場景,它需要高性能,需要實時推理,這時候我們就會遇到以下問題

  1. 多表數據離線到在線怎么映射,即批量訓練過程中輸入很多表,到在線環境這些表該以什么形式存在,這點也會影響整個系統架構,做得好能夠提升效率,做得不好就會大大增加模型產生業務價值的成本
  2. SQL轉換成實時執行成本高,因為在線推理需要高性能,而數據科學家可能做出成千上萬個特征,每個特征都人肉轉換,會大大增加的工程成本
  3. 離線特征和在線特征保持一致困難,手動轉換就會導致一致性能,而且往往很難一致
  4. 離線效果很棒但是在線效果無法滿足業務需求

在具體的反欺詐場景,模型應用要求tp99 20ms去檢測一筆交易是否是欺詐,所以對模型應用性能要求非常高

第四范式特征工程數據庫是如何解決這些問題

半小時,將你的Spark SQL模型變為在線服務

 

通過特征工程數據庫讓SparkSQL的能力得到了補充

  1. 以數據庫的形式,解決了離線表到在線的映射問題,我們對前面給出的答案就是離線表是怎么分布的,在線也就怎么分布
  2. 通過同一套代碼去執行離線和在線特征轉換,讓在線模型效果得到了保證
  3. 數據科學家與業務開發團隊的合作以sql為傳遞介質,而不再是手工去轉換代碼,大大提升模型迭代效率
  4. 通過llvm加速的sql,相比scala實現的spark2.x和3.x在時序復雜特征場景能夠加速2~3倍,在線通過in-memory的存儲,能夠保證sql能夠在非常低延遲返回結果

快速將spark sql 模型變成實時服務demo

demo的模型訓練場景為預測一次打車行程到結束所需要的時間,這里我們將使用fedb ,pyspark,lightgbm等工具最終搭建一個http 模型推理服務,這也會是spark在機器學習場景的實踐

半小時,將你的Spark SQL模型變為在線服務

 

整個demo200多行代碼,制作時間不超過半個小時

  1. train_sql.py 特征計算與訓練, 80行代碼
  2. predict_server.py 模型推理http服務, 129行代碼

場景數據和特征介紹

整個訓練數據如下樣子

樣例數據

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration

id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856

id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198

id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303

id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330

id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496

id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935

id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904

id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331

id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674 `

場景特征變換sql腳本

特征變換

select trip_duration, passenger_count,

sum `(pickup_latitude) over w as vendor_sum_pl,`

max `(pickup_latitude) over w as vendor_max_pl,`

min `(pickup_latitude) over w as vendor_min_pl,`

avg `(pickup_latitude) over w as vendor_avg_pl,`

sum `(pickup_latitude) over w2 as pc_sum_pl,`

max `(pickup_latitude) over w2 as pc_max_pl,`

min `(pickup_latitude) over w2 as pc_min_pl,`

avg `(pickup_latitude) over w2 as pc_avg_pl ,`

count `(vendor_id) over w2 as pc_cnt,`

count `(vendor_id) over w as vendor_cnt`

from {}

window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),

w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW) `

我們選擇了vendor_id 和 passenger_count 兩個緯度做時序特征

train_df = spark.sql(train_sql)

# specify your configurations as a dict

params = {

'boosting_type' `: 'gbdt' ,

'objective' `: 'regression' ,

'metric' `: { 'l2' , 'l1' },

'num_leaves' `: 31 ,

'learning_rate' `: 0.05 ,

'feature_fraction' `: 0.9 ,

'bagging_fraction' `: 0.8 ,

'bagging_freq' `: 5 ,

'verbose' `: 0`

}

print `( 'Starting training...' )`

gbm = lgb.train(params,

lgb_train,

num_boost_round `= 20 ,`

valid_sets `= lgb_eval,

early_stopping_rounds `= 5 )`

gbm.save_model( `'model.txt' )執行模型訓練過程,最終產生model.txt

模型推理過程

導入數據代碼

import

def insert_row(line):

row = line.split( `',' )

row[ `2 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 2 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`

row[ `3 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 3 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`

insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" `% tuple (row)

driver.executeInsert( `'db_test' , insert)

with open `( 'data/taxi_tour_table_train_simple.csv' , 'r' ) as fd:

idx = 0

for line in fd:

if idx = `= 0 :

idx = idx + 1

continue

insert_row(line.replace( `'n' , ''))

idx = idx + 1 `

注:train.csv為訓練數據csv格式版本

模型推理邏輯

predict.py

def` `post( self ):

row = json.loads( `self .request.body)

ok, req = fedb_driver.getRequestBuilder( `'db_test' , sql)

if not ok or not req:

self `.write( "fail to get req" )`

return

input_schema = req.GetSchema()

if not input_schema:

self `.write( "no schema found" )`

return

str_length = 0

for i in range `(input_schema.GetColumnCnt()):`

if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) = `= 'string' :

str_length = str_length + len `(row.get(input_schema.GetColumnName(i), ''))`

req.Init(str_length)

for i in range `(input_schema.GetColumnCnt()):`

tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))

if tname = `= 'string' :

req.AppendString(row.get(input_schema.GetColumnName(i), ''))

elif tname = `= 'int32' :

req.AppendInt32( `int (row.get(input_schema.GetColumnName(i),` `0 )))`

elif tname = `= 'double' :

req.AppendDouble( `float (row.get(input_schema.GetColumnName(i),` `0 )))`

elif tname = `= 'timestamp' :

req.AppendTimestamp( `int (row.get(input_schema.GetColumnName(i),` `0 )))`

else `:`

req.AppendNULL()

if not req.Build():

self `.write( "fail to build request" )`

return

ok, rs = fedb_driver.executeQuery( `'db_test' , sql, req)

if not ok:

self `.write( "fail to execute sql" )`

return

rs. `Next ()

ins = build_feature(rs)

self `.write( "----------------ins---------------n" )`

self `.write( str (ins) + "n" )

duration = bst.predict(ins)

self `.write( "---------------predict trip_duration -------------n" )`

self `.write( "%s s" % str (duration[ 0 ]))``

最終執行效果

# 發送推理請求 ,會看到如下輸出

Python3 predict.py

----------------ins---------------

[[ 2. 40.774097 40.774097 40.774097 40.774097 40.774097 40.774097

40.774097 40.774097 1. 1. ]]

---------------predict trip_duration -------------

859.3298781277192 s `

運行demo請到 https://github.com/4paradigm/SparkSQLWithFeDB

分享到:
標簽:Spark SQL
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定