日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

用戶畫像往往是大型網站的重要模塊,基于用戶畫像不僅可以實現個性化推薦,還可以實現用戶分群、精準推送、精準營銷以及用戶行為預測、商業化轉化分析等,為商業決策提供數據支持。通常用戶畫像包括用戶屬性信息(性別、年齡、出生日期等)、用戶行為信息(瀏覽、收藏、點贊等)以及環境信息(時間、地理位置等)。

處理用戶行為數據

在數據準備階段,我們通過 Flume 已經可以將用戶行為數據收集到 Hive 的 user_action 表的 HDFS 路徑中,先來看一下這些數據長什么樣子,我們讀取當天的用戶行為數據,注意讀取之前要先關聯分區

_day = time.strftime("%Y-%m-%d", time.localtime())
_localions = '/user/hive/warehouse/profile.db/user_action/' + _dayif fs.exists(_localions):    # 如果有該文件直接關聯,捕獲關聯重復異常    try:
        self.spark.sql("alter table user_action add partition (dt='%s') location '%s'" % (_day, _localions))
    except Exception as e:
        pass    self.spark.sql("use profile")
    user_action = self.spark.sql("select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>=" + _day)

user_action 結果如下所示

新聞個性化推薦系統源碼之構建離線用戶畫像

 

useraction

可以發現,上面的一條記錄代表用戶對文章的一次行為,但通常我們需要查詢某個用戶對某篇文章的所有行為,所以,我們要將這里用戶對文章的多條行為數據合并為一條,其中包括用戶對文章的所有行為。我們需要新建一個 Hive 表 user_article_basic,這張表包括了用戶 ID、文章 ID、是否曝光、是否點擊、閱讀時間等等,隨后我們將處理好的用戶行為數據存儲到此表中

create table user_article_basic
(    user_id     BIGINT comment "userID",
    action_time STRING comment "user actions time",
    article_id  BIGINT comment "articleid",
    channel_id  INT comment "channel_id",
    shared      BOOLEAN comment "is shared",
    clicked     BOOLEAN comment "is clicked",
    collected   BOOLEAN comment "is collected",
    exposure    BOOLEAN comment "is exposured",
    read_time   STRING comment "reading time"
)    COMMENT "user_article_basic"
    CLUSTERED by (user_id) into 2 buckets
    STORED as textfile
    LOCATION '/user/hive/warehouse/profile.db/user_article_basic';

遍歷每一條原始用戶行為數據,判斷用戶對文章的行為,在 user_action_basic 中將該用戶與該文章對應的行為設置為 True

if user_action.collect():
    def _generate(row):        _list = []        if row.action == 'exposure':
            for article_id in eval(row.articleId):
                # ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]
                _list.Append(                    [row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])            return _list
        else:
            class Temp(object):
                shared = False                clicked = False                collected = False                read_time = ""
            _tp = Temp()            if row.action == 'click':
                _tp.clicked = True            elif row.action == 'share':
                _tp.shared = True            elif row.action == 'collect':
                _tp.collected = True            elif row.action == 'read':
                _tp.clicked = True            _list.append(                [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,                 True, row.readTime])            return _list
    user_action_basic = user_action.rdd.flatMap(_generate)    user_action_basic = user_action_basic.toDF(        ["user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure",
         "read_time"])

user_action_basic 結果如下所示,這里的一條記錄包括了某個用戶對某篇文章的所有行為

新聞個性化推薦系統源碼之構建離線用戶畫像

 

user_action_basic

由于 Hive 目前還不支持 pyspark 的原子性操作,所以 user_article_basic 表的用戶行為數據只能全量更新(實際場景中可以選擇其他語言或數據庫實現)。這里,我們需要將當天的用戶行為與 user_action_basic 的歷史用戶行為進行合并

old_data = uup.spark.sql("select * from user_article_basic")
new_data = old_data.unionAll(user_action_basic)

合并后又會產生一個新的問題,那就是用戶 ID 和文章 ID 可能重復,因為今天某個用戶對某篇文章的記錄可能在歷史數據中也存在,而 unionAll() 方法并沒有去重,這里我們可以按照用戶 ID 和文章 ID 進行分組,利用 max() 方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可,去重后直接存儲到 user_article_basic 表中

new_data.registerTempTable("temptable")
self.spark.sql('''insert overwrite table user_article_basic select user_id, max(action_time) as action_time,
        article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked,
        max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable
        group by user_id, article_id''')

表 user_article_basic 結果如下所示

新聞個性化推薦系統源碼之構建離線用戶畫像

 

user_article_basic

計算用戶畫像

我們選擇將用戶畫像存儲在 Hbase 中,因為 Hbase 支持原子性操作和快速讀取,并且 Hive 也可以通過創建外部表關聯到 Hbase,進行離線分析,如果要刪除 Hive 外部表的話,對 Hbase 也沒有影響。首先,在 Hbase 中創建用戶畫像表

create 'user_profile', 'basic','partial','env'

在 Hive 中創建 Hbase 外部表,注意字段類型設置為 map

create external table user_profile_hbase
(    user_id         STRING comment "userID",
    information     MAP<STRING, DOUBLE> comment "user basic information",
    article_partial MAP<STRING, DOUBLE> comment "article partial",
    env             MAP<STRING, INT> comment "user env"
)    COMMENT "user profile table"
    STORED BY 'org.Apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")
    TBLPROPERTIES ("hbase.table.name" = "user_profile");

創建外部表之后,還需要導入一些依賴包

cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/
cp -r /root/bigdata/hive/lib/h*.jar /root/bigdata/spark/jars/

接下來,讀取處理好的用戶行為數據,由于日志中的 channel_id 有可能是來自于推薦頻道(0),而不是文章真實的頻道,所以這里要將 channel_id 列刪除

spark.sql("use profile")
user_article_basic = spark.sql("select * from user_article_basic").drop('channel_id')

通過文章 ID,將用戶行為數據與文章畫像數據進行連接,從而得到文章頻道 ID 和文章主題詞

spark.sql('use article')
article_topic = spark.sql("select article_id, channel_id, topics from article_profile")
user_article_topic = user_article_basic.join(article_topic, how='left', on=['article_id'])

user_article_topic 結果如下圖所示,其中 topics 列即為文章主題詞列表,如 ['補碼', '字符串', '李白', ...]

新聞個性化推薦系統源碼之構建離線用戶畫像

 

user_article_topic

接下來,我們需要計算每一個主題詞對于用戶的權重,所以需要將 topics 列中的每個主題詞都拆分為單獨的一條記錄。可以利用 Spark 的 explode() 方法,達到類似“爆炸”的效果

import pyspark.sql.functions as F
user_article_topic = user_topic.withColumn('topic', F.explode('topics')).drop('topics')

user_article_topic 如下圖所示

新聞個性化推薦系統源碼之構建離線用戶畫像

 

user_article_topic

我們通過用戶對哪些文章發生了行為以及該文章有哪些主題詞,計算出了用戶對哪些主題詞發生了行為。這樣,我們就可以根據用戶對主題詞的行為來計算主題詞對用戶的權重,并且將這些主題詞作為用戶的標簽。那么,用戶標簽權重的計算公式為:用戶標簽權重 =(用戶行為分值之和)x 時間衰減。其中,時間衰減公式為:時間衰減系數 = 1 / (log(t) + 1),其中 t 為發生行為的時間距離當前時間的大小

不同的用戶行為對應不同的權重,如下所示

用戶行為分值閱讀時間(<1000)1閱讀時間(>=1000)2收藏2分享3點擊5

計算用戶標簽及權重,并存儲到 Hbase 中 user_profile 表的 partial 列族中。注意,這里我們將頻道 ID 和標簽一起作為 partial 列族的鍵存儲,這樣我們就方便查詢不同頻道的標簽及權重了

def compute_user_label_weights(partitions):
    """ 計算用戶標簽權重
    """
    action_weight = {        "read_min": 1,
        "read_middle": 2,
        "collect": 2,
        "share": 3,
        "click": 5
    }    from datetime import datetime    import numpy as np        # 循環處理每個用戶對應的每個主題詞    for row in partitions:
        # 計算時間衰減系數        t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
        alpha = 1 / (np.log(t.days + 1) + 1)
                if row.read_time  == '':
            read_t = 0
        else:
            read_t = int(row.read_time)                # 計算閱讀時間的行為分數        read_score = action_weight['read_middle'] if read_t > 1000 else action_weight['read_min']
                # 計算各種行為的權重和并乘以時間衰減系數        weights = alpha * (row.shared * action_weight['share'] + row.clicked * action_weight['click'] +
                          row.collected * action_weight['collect'] + read_score)
                # 更新到user_profilehbase表        with pool.connection() as conn:            table = conn.table('user_profile')
            table.put('user:{}'.format(row.user_id).encode(),
                      {'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
                          weights).encode()})            conn.close()
user_topic.foreachPartition(compute_user_label_weights)

在 Hive 中查詢用戶標簽及權重

hive> select * from user_profile_hbase limit 1;
OKuser:1  {"birthday":0.0,"gender":null}  {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++":0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code":0.23934588700996243,"18:DIR......

接下來,要將用戶屬性信息加入到用戶畫像中。讀取用戶基礎信息,存儲到用戶畫像表的 basic 列族即可

def update_user_info():
    """
    更新用戶畫像的屬性信息
    :return:
    """
    spark.sql("use toutiao")
    user_basic = spark.sql("select user_id, gender, birthday from user_profile")
    def udapte_user_basic(partition):        import happybase        #  用于讀取hbase緩存結果配置        pool = happybase.ConnectionPool(size=10, host='172.17.0.134', port=9090)
        for row in partition:
            from datetime import date
            age = 0
            if row.birthday != 'null':
                born = datetime.strptime(row.birthday, '%Y-%m-%d')
                today = date.today()
                age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))            with pool.connection() as conn:                table = conn.table('user_profile')
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:gender'.encode(): json.dumps(row.gender).encode()})
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:birthday'.encode(): json.dumps(age).encode()})
                conn.close()
    user_basic.foreachPartition(udapte_user_basic)

到這里,我們的用戶畫像就計算完成了。

Apscheduler 定時更新

定義更新用戶畫像方法,首先處理用戶行為日志,拆分文章主題詞,接著計算用戶標簽的權重,最后再將用戶屬性信息加入到用戶畫像中

def update_user_profile():
    """
    定時更新用戶畫像的邏輯
    :return:
    """
    up = UpdateUserProfile()    if up.update_user_action_basic():
        up.update_user_label()        up.update_user_info()

在 Apscheduler 中添加定時更新用戶畫像任務,設定每隔 2 個小時更新一次

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
# 創建scheduler,多進程執行executors = {    'default': ProcessPoolExecutor(3)
}scheduler = BlockingScheduler(executors=executors)# 添加一個定時運行文章畫像更新的任務, 每隔1個小時運行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一個定時運行用戶畫像更新的任務, 每隔2個小時運行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
scheduler.start()

另外說一下,在實際場景中,用戶畫像往往是非常復雜的,下面是電商場景的用戶畫像,可以了解一下。

新聞個性化推薦系統源碼之構建離線用戶畫像

 

用戶畫像

分享到:
標簽:畫像 用戶
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定