Skip to content

利用淘寶公開數據練習開發推薦模型使用fastapi,lightgbm,duckdb實作推薦api

Published: at 下午06:16

利用淘寶公開數據練習開發推薦模型使用fastapi,lightgbm,duckdb實作推薦api

最近看到 有淘寶(taobao) 有公開數據一億筆資料

想做練習看看 如何做到推薦模型

git repo

資料結構觀察

筆數 100,150,808

1,2268318,2520377,pv,1511544070
1,2333346,2520771,pv,1511561733
1,2576651,149192,pv,1511572885
1,3830808,4181361,pv,1511593493
1,4365585,2520377,pv,1511596146
1,4606018,2735466,pv,1511616481
1,230380,411153,pv,1511644942
1,3827899,2920476,pv,1511713473
1,3745169,2891509,pv,1511725471
1,1531036,2920476,pv,1511733732
1,2266567,4145813,pv,1511741471
1,2951368,1080785,pv,1511750828
列名称	说明
用户ID	整数类型,序列化后的用户ID
商品ID	整数类型,序列化后的商品ID
商品类目ID	整数类型,序列化后的商品所属类目ID
行为类型	字符串,枚举类型,包括('pv', 'buy', 'cart', 'fav')
时间戳	行为发生的时间戳

利用 duckdb 分析 產出 訓練資料(做特徵)

  1. 預設已經會使用duckdb,使用範例

  2. 加入csv title

大檔案 3GB多 於是使用 vi 將 csv header 加上 (雖然還是跑得很慢)

user_id,product_id,catalog_id,action,ts
  1. 載入資料到duckdb
-- Load the raw behavior CSV into DuckDB.
-- FIX: every downstream query reads from `logs_clean`, so create the table
-- under that name (the original created `logs`, which nothing references).
-- FIX: read_csv_auto cannot open .zip archives directly -- unzip first
-- (the header was added to the uncompressed CSV with vi anyway).
CREATE TABLE logs_clean AS
SELECT *
FROM read_csv_auto('UserBehavior.csv');
  1. 建立 Label(Ranking 的答案) 買過些甚麼東西

買 = 1 其他 = 0

-- Ranking label: one row per (user, product) pair.
-- label = 1 if the user ever performed a 'buy' action on the product, else 0.
CREATE TABLE label_table AS
SELECT
    user_id,
    product_id,
    MAX(CASE WHEN action='buy' THEN 1 ELSE 0 END) AS label
FROM logs_clean
GROUP BY user_id, product_id;
  1. User Feature 使用者購買率
-- Per-user features: total event count, buy count, and buy conversion rate.
-- SUM(action='buy') relies on DuckDB summing booleans as 0/1.
-- The `* 1.0` forces float division instead of integer division.
CREATE TABLE user_features AS
SELECT
    user_id,
    COUNT(*) AS user_actions,
    SUM(action='buy') AS user_buy_cnt,
    SUM(action='buy') * 1.0 / COUNT(*) AS user_buy_rate
FROM logs_clean
GROUP BY user_id;
  1. Product Feature 商品熱門度
-- Per-product features: popularity (total event count across all users)
-- and buy conversion rate.
CREATE TABLE product_features AS
SELECT
    product_id,
    COUNT(*) AS product_popularity,
    SUM(action='buy') * 1.0 / COUNT(*) AS product_buy_rate
FROM logs_clean
GROUP BY product_id;
  1. User × Product Feature 使用者看過幾次商品
-- Per-(user, product) interaction features: how many times the user
-- interacted with the product and the timestamp of the last event.
CREATE TABLE interaction_features AS
SELECT
    user_id,
    product_id,
    COUNT(*) AS view_cnt,
    MAX(ts) AS last_view_ts
FROM logs_clean
GROUP BY user_id, product_id;
  1. 合併
-- Assemble the final training table: label + user / product / interaction
-- features. Inner joins are safe here because every (user_id, product_id)
-- in label_table also appears in the three feature tables (all four derive
-- from the same logs_clean rows).
CREATE TABLE training_table AS
SELECT
    l.user_id,
    l.product_id,
    u.user_buy_rate,
    p.product_popularity,
    i.view_cnt,
    l.label
FROM label_table l
JOIN user_features u USING(user_id)
JOIN product_features p USING(product_id)
JOIN interaction_features i USING(user_id, product_id);
  1. 看一下
select * from training_table limit 40;

使用者id , 商品 id , 使用者購買率,商品受歡迎程度,看過幾次 ,有沒有買過

user_idproduct_iduser_buy_rateproduct_popularityview_cntlabel
75205533619950.0068027210884353742210
75206035398130.0510
75208039751410.00843881856540084330610
75208020806900.008438818565400843133310
75210243424670.016393442622950824110
75210333192440.01515151515151515233420
75215030346960.012658227848101266453810
75215129989470.0330
75221728779020.02110
75223148536260.0108108108108108118510
75226940244090.0403410
75230723660140.03076923076923077205710
75233912695790.023809523809523808336210
75235919569050.0188679245283018863110
75236434639790.026410
75236826758590.067120
7523719537700.03846153846153846478110
7523733484460.03846153846153846413710
  1. 匯出
-- Export the training table to Parquet for the Python training script.
COPY (select * from training_table)
TO 'training.parquet'
(FORMAT PARQUET);

訓練模型

main.py

import duckdb
import pandas as pd
from lightgbm import LGBMClassifier
import joblib

print("正在從 Parquet 讀取資料...")
df = duckdb.read_parquet("training.parquet").df()

# ===============================
# Feature / Label
# ===============================
X = df.drop(columns=["label"])
y = df["label"]

print(f"開始訓練模型... 總資料量: {len(df)} 筆")

model = LGBMClassifier(n_estimators=100)
model.fit(X, y)

# ===============================
# ✅ 儲存模型
# ===============================
joblib.dump(model, "lgbm_model.pkl")

# 產出需要的欄位順序
joblib.dump(X.columns.tolist(), "features.pkl")

print("✅ model.pkl + features.pkl 已儲存")
duckdb==1.4.4
joblib==1.5.3
lightgbm==4.6.0
numpy==2.4.2
pandas==3.0.1
python-dateutil==2.9.0.post0
scikit-learn==1.8.0
scipy==1.17.1
six==1.17.0
threadpoolctl==3.6.0
tzdata==2025.3
uv pip install -r requirements.txt

如何測試模型

test.py

import joblib
import pandas as pd
import duckdb

# 1. Load the trained model.
model_path = "./lgbm_model.pkl"
model = joblib.load(model_path)
print(f"✅ 已載入模型: {model_path}")

# 2. Load the data to score (here we reuse the training parquet).
# NOTE: column names and order must match the training-time X exactly.
print("🚀 讀取待預測資料...")
new_df = duckdb.read_parquet("./training.parquet").df()

# Drop the label column (if present) so only features remain.
X_new = new_df.drop(columns=["label"]) if "label" in new_df.columns else new_df

# 3. Predict: predict() yields hard 0/1 labels,
# predict_proba()[:, 1] yields the probability of class 1 (buy).
predictions = model.predict(X_new)
probabilities = model.predict_proba(X_new)[:, 1]

# 4. Attach results to the original rows and save everything.
new_df['prediction'] = predictions
new_df['score'] = probabilities

print("📊 預測完成!前 5 筆結果:")
print(new_df[['prediction', 'score']].head())

# 5. Export full results.
new_df.to_csv("predictions_result.csv", index=False)

print("💾 預測結果已存至 predictions_result.csv")

# Anyone above a 1% buy probability counts as a potential buyer.
threshold = 0.01
new_df['potential_buyer'] = (new_df['score'] > threshold).astype(int)

# Keep the 100 highest-scoring rows.
top_potential = new_df.sort_values('score', ascending=False).head(100)

# BUG FIX: the original wrote `new_df` here, so top_potential.csv contained
# the full unsorted table instead of the top-100 rows computed above.
top_potential.to_csv("top_potential.csv", index=False)

print("💾 預測結果已存至 top_potential.csv")
uv run test.py

看資料

head -n 10 top_potential.csv 
user_id,product_id,user_buy_rate,product_popularity,view_cnt,label,prediction,score,potential_buyer
752055,3361995,0.006802721088435374,22,1,0,0,0.001953612870347798,0
752060,3539813,0.0,5,1,0,0,1.183486612355564e-06,0
752080,3975141,0.008438818565400843,306,1,0,0,0.0023560931749064197,0
752080,2080690,0.008438818565400843,1333,1,0,0,0.0023220314090510253,0
752102,4342467,0.01639344262295082,41,1,0,0,0.005052367426943275,0
752103,3319244,0.015151515151515152,334,2,0,0,0.03660798523751377,1
752150,3034696,0.012658227848101266,4538,1,0,0,0.0018015359387412201,0
752151,2998947,0.0,3,3,0,0,1.160701650510975e-06,0
752217,2877902,0.0,21,1,0,0,1.183486612355564e-06,0

使用fastapi 掛載起模型 執行推薦api

 uv pip install "fastapi[standard]"

檔案名稱 myapp.py 不要叫 fastapi.py 會壞掉喔

# import joblib

# model = joblib.load("./lgbm_model.pkl")

# print("模型已成功載入!")
# print(type(model))

from fastapi import FastAPI, HTTPException
import joblib
import duckdb
import pandas as pd
from lightgbm import LGBMClassifier  # imported so joblib can unpickle the model class

# ===============================
# 1️⃣ Load artifacts at startup (only once per process)
# ===============================
print("🚀 Loading model...")

MODEL_PATH = "./lgbm_model.pkl"   # trained LGBMClassifier saved with joblib
FEATURE_PATH = "./features.pkl"   # feature-column order captured at training time
DATA_PATH = "./training.parquet"  # precomputed per-(user, product) feature table

model = joblib.load(MODEL_PATH)
feature_columns = joblib.load(FEATURE_PATH)
# The whole feature table is held in memory; lookups below filter this frame.
df = duckdb.read_parquet(DATA_PATH).df()

print("✅ Model Ready")

# ===============================
# 2️⃣ Create the FastAPI app
# ===============================
app = FastAPI(
    title="LightGBM Recommendation API",
    version="1.0"
)

# ===============================
# 3️⃣ Health Check
# ===============================
@app.get("/health")
def health():
    # Liveness probe: static payload proving the process is up.
    return {"status": "ok"}

# ===============================
# 4️⃣ 取得使用者特徵
# ===============================
def load_feature_by_vid(vid: str):
    """Return a copy of every precomputed feature row for one user."""
    uid = int(vid)
    return df[df["user_id"] == uid].copy()


# ===============================
# 5️⃣ 排序推薦
# ===============================
def rank_items(feature_df):
    """Score candidate rows with the model and return them sorted by score, descending.

    Empty input is returned unchanged. A 'score' column is added in place.
    """
    if feature_df.empty:
        return feature_df

    # Select exactly the columns (and order) the model was trained on.
    model_input = feature_df[feature_columns]

    # Probability of class 1 (buy) from LightGBM.
    feature_df["score"] = model.predict_proba(model_input)[:, 1]

    return feature_df.sort_values("score", ascending=False)


# ===============================
# 6️⃣ 推薦 API
# ===============================
@app.get("/recommend/{vid}")
def recommend(vid: str, top_k: int = 20):
    """Return the user's top_k candidate products ranked by predicted buy probability."""
    feature_df = load_feature_by_vid(vid)

    # Unknown user: no feature rows at all.
    if feature_df.empty:
        raise HTTPException(status_code=404, detail="User not found")

    ranked = rank_items(feature_df)

    top = ranked.head(top_k)
    result = top[["user_id", "product_id", "score"]].to_dict("records")

    return {
        "vid": vid,
        "top_k": top_k,
        "recommendations": result
    }
uvicorn myapp:app  --reload  --host 0.0.0.0 --port 9999
curl -X 'GET' 'http://localhost:9999/recommend/752055?top_k=2' -H 'accept: application/json'

結果

{
  "vid": "752055",
  "top_k": 2,
  "recommendations": [
    {
      "user_id": 752055,
      "product_id": 4422413,
      "score": 0.19915600225401725
    },
    {
      "user_id": 752055,
      "product_id": 4158852,
      "score": 0.0972484069020572
    }
  ]
}

小結論

但請求多次 都還是會是一樣的結果

在真正的 推薦系統 如果這樣 讓客戶一直看到 一樣的東西 應該會被哭哭

所以大概就是 多取個幾個正相關的幾品 然後 打亂重排 再給客戶看吧

這樣在local的請求耗時 大概六十毫秒 效率該是不錯的

但是 是不是真的有效 可能要再驗證

優化

因為 之前只是找出最相關的品 推送給客戶

但如 小結說的 推薦都看一樣的一定會被哭哭

所以我們再增加一路 是 同品類最可能有興趣 然後在過濾分數後 在打亂排序給客戶看

於第一步標籤 Label(Ranking 的答案) 要增加 catalog_id ( 再走一輪 重新刷出 比對資料)

-- v2 label table: same as v1 but also carries catalog_id so the API can do
-- category-based recall.
CREATE TABLE label_table AS
SELECT
    user_id,
    product_id,
    catalog_id,  -- BUG FIX: the original was missing this comma (syntax error)
    MAX(CASE WHEN action='buy' THEN 1 ELSE 0 END) AS label
FROM logs_clean
GROUP BY user_id, product_id, catalog_id;

training_table_v2 parquet

-- v2 training table: v1 columns plus catalog_id from the label table.
CREATE TABLE training_table_v2 AS
SELECT
    l.user_id,
    l.product_id,
    l.catalog_id as catalog_id, 
    u.user_buy_rate,
    p.product_popularity,
    i.view_cnt,
    l.label
FROM label_table l
JOIN user_features u USING(user_id)
JOIN product_features p USING(product_id)
JOIN interaction_features i USING(user_id, product_id)
-- No aggregate functions appear in this SELECT, so GROUP BY ALL acts purely
-- as row deduplication (equivalent to SELECT DISTINCT).
GROUP BY ALL;  -- FIX: terminate the statement (the original had no ';')

輸出

COPY (SELECT * FROM training_table_v2) TO 'training_v2.parquet' (FORMAT PARQUET);

程式改寫

# import joblib

# model = joblib.load("./lgbm_model.pkl")

# print("模型已成功載入!")
# print(type(model))

from fastapi import FastAPI, HTTPException
import joblib
import duckdb
import pandas as pd
from lightgbm import LGBMClassifier  # imported so joblib can unpickle the model class

# ===============================
# 1️⃣ Load artifacts at startup (only once per process)
# ===============================
print("🚀 Loading model...")

MODEL_PATH = "./lgbm_model.pkl"       # trained LGBMClassifier saved with joblib
FEATURE_PATH = "./features.pkl"       # feature-column order captured at training time
DATA_PATH = "./training_v2.parquet"   # v2 feature table (includes catalog_id)
# NOTE(review): the article never shows how category_top.parquet is built --
# the recall code below expects it to have catalog_id, product_id and
# pop_score columns. Verify against the data-prep step.
CATEGORY_PATH= "./category_top.parquet"
model = joblib.load(MODEL_PATH)
feature_columns = joblib.load(FEATURE_PATH)
df = duckdb.read_parquet(DATA_PATH).df()
df_cat_top = duckdb.read_parquet(CATEGORY_PATH).df() # per-category hot-items table
print("✅ Model Ready")

# ===============================
# 2️⃣ Create the FastAPI app
# ===============================
app = FastAPI(
    title="LightGBM Recommendation API",
    version="1.0"
)

# ===============================
# 3️⃣ Health Check
# ===============================
@app.get("/health")
def health():
    # Liveness probe: static payload proving the process is up.
    return {"status": "ok"}

# ===============================
# 4️⃣ 取得使用者特徵
# ===============================
def load_feature_by_vid(vid: str):
    """Return a copy of every candidate row for this user from the feature table."""
    uid = int(vid)
    return df[df["user_id"] == uid].copy()


# ===============================
# 5️⃣ 排序推薦
# ===============================
def rank_items(feature_df):
    """Add a model 'score' column and return rows ordered best-first.

    An empty frame is passed straight through.
    """
    if feature_df.empty:
        return feature_df

    # Restrict to the training-time feature columns, in the same order.
    model_input = feature_df[feature_columns]

    # LightGBM probability of the positive (buy) class.
    feature_df["score"] = model.predict_proba(model_input)[:, 1]

    return feature_df.sort_values("score", ascending=False)
# ===============================
# 4️⃣ 取得召回清單 (Retrieval)
# ===============================
def get_candidates(user_id: int):
    """Multi-path retrieval: the user's history plus hot items from categories they browsed.

    Returns one frame with the user's own rows and synthetic feature rows for
    unseen-but-hot products, deduplicated by product_id.
    """
    # Path A: history-based recall (rows the user actually interacted with).
    hist_df = df[df["user_id"] == user_id].copy()

    # Path B: category-hot recall -- categories this user has browsed.
    user_cats = hist_df["catalog_id"].unique()
    print(f"使用者 {user_id} 看過的類目: {user_cats}")

    cat_recall = df_cat_top[df_cat_top["catalog_id"].isin(user_cats)].copy()
    print(f"從類目熱門表中找到 {len(cat_recall)} 個候選商品")

    # PERF FIX: hoist loop invariants -- O(1) set membership instead of
    # scanning the product_id column for every candidate, and compute the
    # user's mean buy rate once rather than per iteration.
    seen_products = set(hist_df["product_id"])
    mean_buy_rate = hist_df["user_buy_rate"].mean() if not hist_df.empty else 0

    # Give each unseen hot item a synthetic feature row so LightGBM can score it.
    # (In production these would come from the product_features table; here the
    # category table's pop_score stands in for product_popularity.)
    new_candidates = [
        {
            "user_id": user_id,
            "product_id": row["product_id"],
            "user_buy_rate": mean_buy_rate,
            "product_popularity": row["pop_score"],
            "view_cnt": 0,  # never viewed by this user
        }
        for _, row in cat_recall.iterrows()
        if row["product_id"] not in seen_products
    ]

    new_df = pd.DataFrame(new_candidates)
    print(f"為 {len(new_df)} 個新候選商品補上特徵")

    # Merge history rows with the newly recalled ones; keep one row per product.
    full_df = pd.concat([hist_df, new_df], ignore_index=True).drop_duplicates(subset=['product_id'])
    return full_df

# ===============================
# 6️⃣ 推薦 API (更新後)
# ===============================
@app.get("/recommend/{vid}")
def recommend(vid: str, top_k: int = 20):
    """Recommend up to top_k products: multi-path recall, LightGBM ranking,
    score filtering, then a shuffle so repeated calls vary."""
    user_id = int(vid)

    # --- 1. Multi-path retrieval ---
    # feature_df contains both items the user has seen and unseen hot items
    # from the same categories.
    feature_df = get_candidates(user_id)

    print(f"使用者 {user_id} 的候選清單長這樣: feature_df.shape = {feature_df.shape} , first 5 rows = \n{feature_df.head()}")
    if feature_df.empty:
        # BUG FIX: the original called get_global_hot_items(), which is not
        # defined anywhere, so brand-new users crashed with a NameError.
        # Fail explicitly instead; TODO: implement a real global-hot
        # cold-start fallback.
        raise HTTPException(status_code=404, detail="User not found")

    # --- 2. Rank with the LightGBM model ---
    ranked = rank_items(feature_df)

    # --- 3. Keep score > 0.05, then shuffle so results differ per request ---
    ranked = ranked[ranked["score"] > 0.05].sample(frac=1).reset_index(drop=True)

    print(f"使用者 {user_id} 的推薦結果: len = {len(ranked)}")
    result = ranked.head(top_k)[
        ["user_id", "product_id", "score"]
    ].to_dict("records")

    return {
        "vid": user_id,
        "top_k": top_k,
        "recommendations": result
    }

result

就會拿到 不同結果 且更逼真

{
  "vid": 752231,
  "top_k": 5,
  "recommendations": [
    {
      "user_id": 752231,
      "product_id": 4488036,
      "score": 0.07542575804187057
    },
    {
      "user_id": 752231,
      "product_id": 2438528,
      "score": 0.3351618043839475
    },
    {
      "user_id": 752231,
      "product_id": 4051195,
      "score": 0.0744589905811745
    },
    {
      "user_id": 752231,
      "product_id": 4200647,
      "score": 0.13268716248683052
    },
    {
      "user_id": 752231,
      "product_id": 998534,
      "score": 0.07501561949841754
    }
  ]
}

ref