利用淘寶公開數據練習開發推薦模型:使用 fastapi、lightgbm、duckdb 實作推薦 API
最近看到 有淘寶(taobao) 有公開數據一億筆資料
想做練習看看 如何做到推薦模型
git repo
資料結構觀察
-
資料樣式
筆數 100,150,808
1,2268318,2520377,pv,1511544070
1,2333346,2520771,pv,1511561733
1,2576651,149192,pv,1511572885
1,3830808,4181361,pv,1511593493
1,4365585,2520377,pv,1511596146
1,4606018,2735466,pv,1511616481
1,230380,411153,pv,1511644942
1,3827899,2920476,pv,1511713473
1,3745169,2891509,pv,1511725471
1,1531036,2920476,pv,1511733732
1,2266567,4145813,pv,1511741471
1,2951368,1080785,pv,1511750828
- 來自 taobao 網站的說明
列名称 说明
用户ID 整数类型,序列化后的用户ID
商品ID 整数类型,序列化后的商品ID
商品类目ID 整数类型,序列化后的商品所属类目ID
行为类型 字符串,枚举类型,包括('pv', 'buy', 'cart', 'fav')
时间戳 行为发生的时间戳
利用 duckdb 分析 產出 訓練資料(做特徵)
-
加入csv title
大檔案 3GB多 於是使用 vi 將 csv header 加上(雖然還是跑得很慢)
user_id,product_id,catalog_id,action,ts
- 載入資料到duckdb
-- Load the raw behavior CSV into DuckDB.
-- BUGFIX: every downstream query reads from `logs_clean`, but the original
-- statement created a table named `logs` that is never referenced again
-- (presumably a cleaning step was omitted from the write-up — confirm).
CREATE TABLE logs_clean AS
SELECT *
FROM read_csv_auto('UserBehavior.csv.zip');
- 建立 Label(Ranking 的答案) 買過些甚麼東西
買 = 1 其他 = 0
-- Label table: one row per (user, product); label = 1 iff the user
-- ever performed a 'buy' action on that product, else 0.
CREATE TABLE label_table AS
SELECT
    user_id,
    product_id,
    CAST(BOOL_OR(action = 'buy') AS INTEGER) AS label
FROM logs_clean
GROUP BY user_id, product_id;
- User Feature 使用者購買率
-- Per-user features: total activity, number of purchases, and purchase rate.
CREATE TABLE user_features AS
SELECT
    user_id,
    COUNT(*) AS user_actions,
    COUNT(*) FILTER (WHERE action = 'buy') AS user_buy_cnt,
    COUNT(*) FILTER (WHERE action = 'buy') * 1.0 / COUNT(*) AS user_buy_rate
FROM logs_clean
GROUP BY user_id;
- Product Feature 商品熱門度
-- Per-product features: overall popularity (event count) and conversion rate.
CREATE TABLE product_features AS
SELECT
    product_id,
    COUNT(*) AS product_popularity,
    COUNT(*) FILTER (WHERE action = 'buy') * 1.0 / COUNT(*) AS product_buy_rate
FROM logs_clean
GROUP BY product_id;
- User × Product Feature 使用者看過幾次商品
-- User x Product interaction features: how often and how recently a user
-- has interacted with a specific product.
CREATE TABLE interaction_features AS
SELECT
user_id,
product_id,
COUNT(*) AS view_cnt,  -- counts ALL action types, not only 'pv' — NOTE(review): confirm that is intended
MAX(ts) AS last_view_ts  -- most recent interaction timestamp
FROM logs_clean
GROUP BY user_id, product_id;
- 合併
-- Assemble the training set: label joined with user-, product-, and
-- interaction-level features. All joins are inner; every (user, product)
-- pair in label_table also exists in interaction_features by construction.
CREATE TABLE training_table AS
SELECT
    l.user_id,
    l.product_id,
    u.user_buy_rate,
    p.product_popularity,
    i.view_cnt,
    l.label
FROM label_table AS l
JOIN user_features AS u ON u.user_id = l.user_id
JOIN product_features AS p ON p.product_id = l.product_id
JOIN interaction_features AS i
  ON i.user_id = l.user_id AND i.product_id = l.product_id;
- 看一下
select * from training_table limit 40;
- preview
使用者id , 商品 id , 使用者購買率,商品受歡迎程度,看過幾次 ,有沒有買過
| user_id | product_id | user_buy_rate | product_popularity | view_cnt | label |
|---|---|---|---|---|---|
| 752055 | 3361995 | 0.006802721088435374 | 22 | 1 | 0 |
| 752060 | 3539813 | 0.0 | 5 | 1 | 0 |
| 752080 | 3975141 | 0.008438818565400843 | 306 | 1 | 0 |
| 752080 | 2080690 | 0.008438818565400843 | 1333 | 1 | 0 |
| 752102 | 4342467 | 0.01639344262295082 | 41 | 1 | 0 |
| 752103 | 3319244 | 0.015151515151515152 | 334 | 2 | 0 |
| 752150 | 3034696 | 0.012658227848101266 | 4538 | 1 | 0 |
| 752151 | 2998947 | 0.0 | 3 | 3 | 0 |
| 752217 | 2877902 | 0.0 | 21 | 1 | 0 |
| 752231 | 4853626 | 0.010810810810810811 | 85 | 1 | 0 |
| 752269 | 4024409 | 0.0 | 4034 | 1 | 0 |
| 752307 | 2366014 | 0.03076923076923077 | 2057 | 1 | 0 |
| 752339 | 1269579 | 0.023809523809523808 | 3362 | 1 | 0 |
| 752359 | 1956905 | 0.018867924528301886 | 31 | 1 | 0 |
| 752364 | 3463979 | 0.0 | 264 | 1 | 0 |
| 752368 | 2675859 | 0.0 | 671 | 2 | 0 |
| 75237 | 1953770 | 0.038461538461538464 | 781 | 1 | 0 |
| 75237 | 3348446 | 0.038461538461538464 | 137 | 1 | 0 |
- 匯出
-- Export the training table to Parquet for the model-training script (main.py).
COPY (select * from training_table)
TO 'training.parquet'
(FORMAT PARQUET);
訓練模型
- 使用 uv
main.py
"""Train a binary 'will buy' LightGBM ranker from the DuckDB-exported features."""
import duckdb
import pandas as pd  # NOTE(review): not referenced directly — likely removable
from lightgbm import LGBMClassifier
import joblib

print("正在從 Parquet 讀取資料...")
df = duckdb.read_parquet("training.parquet").df()

# ===============================
# Feature / Label
# ===============================
# NOTE(review): X keeps the raw user_id / product_id columns as features;
# a tree model can split on ID ranges and memorize users/items. Consider
# dropping them (requires retraining and regenerates features.pkl).
X = df.drop(columns=["label"])
y = df["label"]

print(f"開始訓練模型... 總資料量: {len(df)} 筆")
model = LGBMClassifier(n_estimators=100)
model.fit(X, y)

# ===============================
# Persist the model plus the exact feature column order for serving.
# ===============================
joblib.dump(model, "lgbm_model.pkl")
joblib.dump(X.columns.tolist(), "features.pkl")
# BUGFIX: the message said "model.pkl" but the artifact is "lgbm_model.pkl".
print("✅ lgbm_model.pkl + features.pkl 已儲存")
duckdb==1.4.4
joblib==1.5.3
lightgbm==4.6.0
numpy==2.4.2
pandas==3.0.1
python-dateutil==2.9.0.post0
scikit-learn==1.8.0
scipy==1.17.1
six==1.17.0
threadpoolctl==3.6.0
tzdata==2025.3
uv pip install -r requirements.txt
如何測試模型
test.py
"""Smoke-test the trained model: score the training parquet and export results."""
import joblib
import pandas as pd
import duckdb

# 1. Load the trained model.
model_path = "./lgbm_model.pkl"
model = joblib.load(model_path)
print(f"✅ 已載入模型: {model_path}")

# 2. Load the data to score.
# NOTE(review): this scores the training set itself — fine as a smoke test,
# but any real evaluation needs a held-out split.
# Column names and order must match the training-time X exactly.
print("🚀 讀取待預測資料...")
new_df = duckdb.read_parquet("./training.parquet").df()

# Keep features only; drop the label column if present.
X_new = new_df.drop(columns=["label"]) if "label" in new_df.columns else new_df

# 3. Predict: hard 0/1 classes and the probability of class 1.
predictions = model.predict(X_new)
probabilities = model.predict_proba(X_new)[:, 1]

# 4. Attach results back onto the original frame.
new_df['prediction'] = predictions
new_df['score'] = probabilities
print("📊 預測完成!前 5 筆結果:")
print(new_df[['prediction', 'score']].head())

# 5. Export the full scored set.
new_df.to_csv("predictions_result.csv", index=False)
print("💾 預測結果已存至 predictions_result.csv")

# Flag potential buyers above a deliberately low probability threshold.
threshold = 0.01
new_df['potential_buyer'] = (new_df['score'] > threshold).astype(int)
top_potential = new_df.sort_values('score', ascending=False).head(100)
# BUGFIX: the original wrote new_df (the full, unsorted frame) here, so
# top_potential.csv was identical to predictions_result.csv — the head
# output shown below (unsorted scores) confirms the bug manifested.
top_potential.to_csv("top_potential.csv", index=False)
print("💾 預測結果已存至 top_potential.csv")
uv run test.py
看資料
head -n 10 top_potential.csv
user_id,product_id,user_buy_rate,product_popularity,view_cnt,label,prediction,score,potential_buyer
752055,3361995,0.006802721088435374,22,1,0,0,0.001953612870347798,0
752060,3539813,0.0,5,1,0,0,1.183486612355564e-06,0
752080,3975141,0.008438818565400843,306,1,0,0,0.0023560931749064197,0
752080,2080690,0.008438818565400843,1333,1,0,0,0.0023220314090510253,0
752102,4342467,0.01639344262295082,41,1,0,0,0.005052367426943275,0
752103,3319244,0.015151515151515152,334,2,0,0,0.03660798523751377,1
752150,3034696,0.012658227848101266,4538,1,0,0,0.0018015359387412201,0
752151,2998947,0.0,3,3,0,0,1.160701650510975e-06,0
752217,2877902,0.0,21,1,0,0,1.183486612355564e-06,0
使用 fastapi 掛載模型 執行推薦 api
- install fastapi
uv pip install "fastapi[standard]"
檔案名稱 myapp.py 不要叫 fastapi.py 會壞掉喔
from fastapi import FastAPI, HTTPException
import joblib
import duckdb
import pandas as pd
from lightgbm import LGBMClassifier  # NOTE(review): not referenced directly — likely removable
# ===============================
# 1️⃣ Load all artifacts once at process start (not per request)
# ===============================
print("🚀 Loading model...")
MODEL_PATH = "./lgbm_model.pkl"   # trained LGBMClassifier saved by main.py
FEATURE_PATH = "./features.pkl"   # feature column names in training order
DATA_PATH = "./training.parquet"  # precomputed per-(user, product) feature rows
model = joblib.load(MODEL_PATH)
feature_columns = joblib.load(FEATURE_PATH)
# Whole feature table is held in memory and filtered per request — fine for
# a demo; revisit before pointing this at the full 100M-row dataset.
df = duckdb.read_parquet(DATA_PATH).df()
print("✅ Model Ready")
# ===============================
# 2️⃣ Build the FastAPI app
# ===============================
app = FastAPI(
title="LightGBM Recommendation API",
version="1.0"
)
# ===============================
# 3️⃣ Health check — simple liveness probe
# ===============================
@app.get("/health")
def health():
return {"status": "ok"}
# ===============================
# 4️⃣ 取得使用者特徵
# ===============================
def load_feature_by_vid(vid: str) -> pd.DataFrame:
    """Return the precomputed feature rows for one user.

    BUGFIX: a non-numeric vid used to raise ValueError from int(), which
    FastAPI surfaced as HTTP 500. It now yields an empty frame, which the
    caller already turns into a 404.
    """
    try:
        user_id = int(vid)
    except ValueError:
        return df.iloc[0:0].copy()
    return df[df["user_id"] == user_id].copy()
# ===============================
# 5️⃣ 排序推薦
# ===============================
def rank_items(feature_df):
    """Score each candidate row with the model and return them best-first.

    Adds a 'score' column (probability of buy) to feature_df in place and
    sorts descending; an empty frame is returned untouched.
    """
    if feature_df.empty:
        return feature_df
    # The model expects exactly these columns, in training order.
    model_input = feature_df[feature_columns]
    buy_proba = model.predict_proba(model_input)[:, 1]
    feature_df["score"] = buy_proba
    return feature_df.sort_values(by="score", ascending=False)
# ===============================
# 6️⃣ 推薦 API
# ===============================
@app.get("/recommend/{vid}")
def recommend(vid: str, top_k: int = 20):
    """Return the user's top_k candidate products ranked by buy probability."""
    candidates = load_feature_by_vid(vid)
    if candidates.empty:
        raise HTTPException(status_code=404, detail="User not found")
    ranked = rank_items(candidates)
    top_rows = ranked.head(top_k)
    recommendations = top_rows[["user_id", "product_id", "score"]].to_dict("records")
    return {
        "vid": vid,
        "top_k": top_k,
        "recommendations": recommendations,
    }
- 執行
uvicorn myapp:app --reload --host 0.0.0.0 --port 9999
- test
curl -X 'GET' 'http://localhost:9999/recommend/752055?top_k=2' -H 'accept: application/json'
結果
{
"vid": "752055",
"top_k": 2,
"recommendations": [
{
"user_id": 752055,
"product_id": 4422413,
"score": 0.19915600225401725
},
{
"user_id": 752055,
"product_id": 4158852,
"score": 0.0972484069020572
}
]
}
小結論
但請求多次 都還是會是一樣的結果
在真正的 推薦系統 如果這樣 讓客戶一直看到 一樣的東西 應該會被哭哭
所以大概就是 多取幾個正相關的商品 然後 打亂重排 再給客戶看吧
這樣在 local 的請求耗時 大概六十毫秒 效率應該是不錯的
但是 是不是真的有效 可能要再驗證
優化
因為 之前只是找出最相關的品 推送給客戶
但如 小結說的 推薦都看一樣的一定會被哭哭
所以我們再增加一路 是 同品類最可能有興趣 然後在過濾分數後 再打亂排序給客戶看
於第一步標籤 Label(Ranking 的答案) 要增加 catalog_id ( 再走一輪 重新刷出 比對資料)
-- v2 label table: adds catalog_id so serving can do same-category recall.
-- BUGFIX: the original was missing the comma after catalog_id, which is a
-- syntax error in DuckDB.
CREATE TABLE label_table AS
SELECT
    user_id,
    product_id,
    catalog_id,
    MAX(CASE WHEN action='buy' THEN 1 ELSE 0 END) AS label
FROM logs_clean
GROUP BY user_id, product_id, catalog_id;
training_table_v2 parquet
-- v2 training set: same features as v1 plus catalog_id.
-- GROUP BY ALL deduplicates rows: label_table is now keyed by
-- (user, product, catalog) while interaction_features is keyed by
-- (user, product), so the join can repeat rows.
CREATE TABLE training_table_v2 AS
SELECT
    l.user_id,
    l.product_id,
    l.catalog_id AS catalog_id,
    u.user_buy_rate,
    p.product_popularity,
    i.view_cnt,
    l.label
FROM label_table l
JOIN user_features u USING(user_id)
JOIN product_features p USING(product_id)
JOIN interaction_features i USING(user_id, product_id)
GROUP BY ALL;  -- BUGFIX: the terminating semicolon was missing
輸出
COPY (SELECT * FROM training_table_v2) TO 'training_v2.parquet' (FORMAT PARQUET);
程式改寫
from fastapi import FastAPI, HTTPException
import joblib
import duckdb
import pandas as pd
from lightgbm import LGBMClassifier  # NOTE(review): not referenced directly — likely removable
# ===============================
# 1️⃣ Load all artifacts once at process start (not per request)
# ===============================
print("🚀 Loading model...")
MODEL_PATH = "./lgbm_model.pkl"      # trained LGBMClassifier saved by main.py
FEATURE_PATH = "./features.pkl"      # feature column names in training order
DATA_PATH = "./training_v2.parquet"  # per-(user, product, catalog) feature rows
# NOTE(review): how category_top.parquet is built is not shown in this doc;
# code below assumes it has catalog_id, product_id and pop_score columns — confirm.
CATEGORY_PATH= "./category_top.parquet"
model = joblib.load(MODEL_PATH)
feature_columns = joblib.load(FEATURE_PATH)
df = duckdb.read_parquet(DATA_PATH).df()
df_cat_top = duckdb.read_parquet(CATEGORY_PATH).df()  # per-category popular items
print("✅ Model Ready")
# ===============================
# 2️⃣ Build the FastAPI app
# ===============================
app = FastAPI(
title="LightGBM Recommendation API",
version="1.0"
)
# ===============================
# 3️⃣ Health check — simple liveness probe
# ===============================
@app.get("/health")
def health():
return {"status": "ok"}
# ===============================
# 4️⃣ 取得使用者特徵
# ===============================
def load_feature_by_vid(vid: str) -> pd.DataFrame:
    """Return the precomputed feature rows for one user.

    BUGFIX: a non-numeric vid used to raise ValueError from int(), which
    FastAPI surfaced as HTTP 500; an empty frame is returned instead so the
    caller can answer 404.
    """
    try:
        user_id = int(vid)
    except ValueError:
        return df.iloc[0:0].copy()
    return df[df["user_id"] == user_id].copy()
# ===============================
# 5️⃣ 排序推薦
# ===============================
def rank_items(feature_df):
    """Attach a model 'score' column (buy probability) and sort descending.

    An empty frame is passed through unchanged.
    """
    if feature_df.empty:
        return feature_df
    # Restrict to the columns (and order) the model was trained on.
    scores = model.predict_proba(feature_df[feature_columns])[:, 1]
    feature_df["score"] = scores
    return feature_df.sort_values("score", ascending=False)
# ===============================
# 4️⃣ 取得召回清單 (Retrieval)
# ===============================
def get_candidates(user_id: int):
    """Multi-path retrieval: merge the user's history with popular items
    from the categories the user has browsed.

    Returns one row per candidate product, with the feature columns the
    ranking model needs.
    """
    # Path A: items the user has interacted with (historical recall).
    hist_df = df[df["user_id"] == user_id].copy()
    # Path B: popular items from categories this user has browsed.
    user_cats = hist_df["catalog_id"].unique()
    print(f"使用者 {user_id} 看過的類目: {user_cats}")
    cat_recall = df_cat_top[df_cat_top["catalog_id"].isin(user_cats)].copy()
    print(f"從類目熱門表中找到 {len(cat_recall)} 個候選商品")
    # Unseen recalled items need synthetic feature rows; real systems would
    # look these up in product_features — popularity fill is a simplification.
    # PERF: build the membership set once (the original scanned the whole
    # product_id array for every recalled row).
    seen_products = set(hist_df["product_id"].values)
    fallback_buy_rate = hist_df["user_buy_rate"].mean() if not hist_df.empty else 0
    new_candidates = []
    for _, row in cat_recall.iterrows():
        pid = row["product_id"]
        if pid not in seen_products:
            new_candidates.append({
                "user_id": user_id,
                "product_id": pid,
                "user_buy_rate": fallback_buy_rate,
                # assumes category_top.parquet exposes pop_score — TODO confirm
                "product_popularity": row["pop_score"],
                "view_cnt": 0,  # never seen by this user
            })
    new_df = pd.DataFrame(new_candidates)
    print(f"為 {len(new_df)} 個新候選商品補上特徵")
    # ROBUSTNESS: concatenating a column-less empty frame is wasted work and
    # warning-prone; keep the dedup the original applied in all paths.
    if new_df.empty:
        return hist_df.drop_duplicates(subset=['product_id'])
    full_df = pd.concat([hist_df, new_df], ignore_index=True).drop_duplicates(subset=['product_id'])
    return full_df
# ===============================
# 6️⃣ 推薦 API (更新後)
# ===============================
@app.get("/recommend/{vid}")
def recommend(vid: str, top_k: int = 20):
    """Multi-path recommend: retrieve candidates, rank with LightGBM,
    filter by score, and shuffle so repeated requests differ."""
    # ROBUSTNESS: a non-numeric vid used to raise ValueError (HTTP 500).
    try:
        user_id = int(vid)
    except ValueError:
        raise HTTPException(status_code=400, detail="vid must be an integer")
    # --- 1. Multi-path retrieval: seen items + hot items in seen categories ---
    feature_df = get_candidates(user_id)
    print(f"使用者 {user_id} 的候選清單長這樣: feature_df.shape = {feature_df.shape} , first 5 rows = \n{feature_df.head()}")
    if feature_df.empty:
        # BUGFIX: this branch called get_global_hot_items(), which is never
        # defined anywhere, so cold-start users crashed with NameError
        # (HTTP 500). Until a global-popularity fallback is implemented,
        # report the user as unknown.
        raise HTTPException(status_code=404, detail="User not found")
    # --- 2. Ranking (the original LightGBM logic) ---
    ranked = rank_items(feature_df)
    # --- 3. Keep score > 0.05, then shuffle so repeat visits vary ---
    ranked = ranked[ranked["score"] > 0.05].sample(frac=1).reset_index(drop=True)
    print(f"使用者 {user_id} 的推薦結果: len = {len(ranked)}")
    result = ranked.head(top_k)[
        ["user_id", "product_id", "score"]
    ].to_dict("records")
    return {
        "vid": user_id,
        "top_k": top_k,
        "recommendations": result
    }
result
就會拿到 不同結果 且更逼真
{
"vid": 752231,
"top_k": 5,
"recommendations": [
{
"user_id": 752231,
"product_id": 4488036,
"score": 0.07542575804187057
},
{
"user_id": 752231,
"product_id": 2438528,
"score": 0.3351618043839475
},
{
"user_id": 752231,
"product_id": 4051195,
"score": 0.0744589905811745
},
{
"user_id": 752231,
"product_id": 4200647,
"score": 0.13268716248683052
},
{
"user_id": 752231,
"product_id": 998534,
"score": 0.07501561949841754
}
]
}