競馬AI予測システム設計:データ収集から機械学習モデルまで完全ガイド

公開日: 2026-01-25 20分 / 4150文字 機械学習Pythonデータ分析システム設計AI競馬

はじめに:競馬予測の課題と可能性

競馬は多くの変数が絡み合う複雑な予測問題です。単純なオッズ追従では長期的な収益は見込めません。本記事では、機械学習を活用した競馬予測システムの設計を段階的に解説します。

競馬予測の難しさ

graph TD A[競馬予測の難しさ] --> B[変数の多さ] A --> C[データの不確実性] A --> D[市場の効率性] A --> E[時系列の複雑さ] B --> B1[馬の能力] B --> B2[騎手の技術] B --> B3[馬場状態] B --> B4[展開・ペース] C --> C1[馬のコンディション] C --> C2[調教の質] C --> C3[当日の気性] D --> D1[オッズに情報が織り込み済み] D --> D2[大量の参加者] E --> E1[馬の成長曲線] E --> E2[距離適性の変化] E --> E3[季節要因]

本システムの目標

指標目標値説明
単勝的中率25%以上人気馬に偏らない予測
回収率80%以上長期的な損失を抑制
複勝的中率50%以上安定した予測精度
期待値馬券100%以上期待値がプラスの馬券を特定

システム全体アーキテクチャ

6層アーキテクチャ

graph TB subgraph "Layer 1: データ収集層" A1[JRA公式データ] A2[netkeiba API] A3[地方競馬データ] A4[オッズデータ] A5[天候・馬場データ] end subgraph "Layer 2: データストア層" B1[(PostgreSQL
マスタデータ)] B2[(TimescaleDB
時系列データ)] B3[(Redis
キャッシュ)] B4[S3/MinIO
生データ] end subgraph "Layer 3: データ処理層" C1[ETLパイプライン] C2[特徴量エンジニアリング] C3[データ品質チェック] end subgraph "Layer 4: モデル層" D1[LightGBM
着順予測] D2[XGBoost
勝率予測] D3[Neural Network
複合予測] D4[アンサンブル
統合モデル] end subgraph "Layer 5: 推論・配信層" E1[リアルタイム推論API] E2[バッチ予測] E3[通知サービス] end subgraph "Layer 6: フロントエンド層" F1[Web Dashboard] F2[モバイルアプリ] F3[LINE Bot] end A1 & A2 & A3 & A4 & A5 --> B4 B4 --> C1 C1 --> B1 & B2 B1 & B2 --> C2 C2 --> C3 C3 --> D1 & D2 & D3 D1 & D2 & D3 --> D4 D4 --> E1 & E2 E1 --> F1 & F2 & F3 E2 --> E3 B3 -.キャッシュ.-> E1

技術スタック

レイヤー技術選定理由
データ収集Python (Scrapy, requests)柔軟なスクレイピング
データストアPostgreSQL + TimescaleDB時系列データに最適
データ処理Apache Airflow + dbt堅牢なETLパイプライン
機械学習LightGBM, XGBoost, PyTorch高精度モデル
推論APIFastAPI + Redis低レイテンシ
フロントエンドNext.jsモダンなUI
インフラDocker + Kubernetesスケーラブル

データモデル設計

エンティティ関連図(ER図)

erDiagram HORSE ||--o{ RACE_RESULT : "出走" JOCKEY ||--o{ RACE_RESULT : "騎乗" TRAINER ||--o{ HORSE : "管理" RACE ||--o{ RACE_RESULT : "含む" RACECOURSE ||--o{ RACE : "開催" HORSE ||--o{ TRAINING : "調教" HORSE }|--|| BLOODLINE : "血統" HORSE { int horse_id PK string name date birth_date string gender int trainer_id FK int bloodline_id FK string color string origin } JOCKEY { int jockey_id PK string name date birth_date string belonging float career_win_rate float career_place_rate } TRAINER { int trainer_id PK string name string stable_location float career_win_rate } RACE { int race_id PK int racecourse_id FK date race_date int race_number string grade int distance string surface string condition int prize_money } RACE_RESULT { int result_id PK int race_id FK int horse_id FK int jockey_id FK int post_position int finish_position float finish_time float last_3f_time float weight float weight_diff float odds int popularity } BLOODLINE { int bloodline_id PK string sire string dam string broodmare_sire string sire_line } TRAINING { int training_id PK int horse_id FK date training_date string course float time string evaluation } RACECOURSE { int racecourse_id PK string name string location int turf_distance_min int turf_distance_max int dirt_distance_min int dirt_distance_max }

主要テーブル定義

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
-- 馬テーブル
CREATE TABLE horses (
    horse_id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    birth_date DATE,
    gender CHAR(1) CHECK (gender IN ('牡', '牝', 'セ')),
    trainer_id INTEGER REFERENCES trainers(trainer_id),
    bloodline_id INTEGER REFERENCES bloodlines(bloodline_id),
    color VARCHAR(20),
    origin VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- レース結果テーブル(時系列データ)
CREATE TABLE race_results (
    result_id SERIAL,
    race_id INTEGER NOT NULL REFERENCES races(race_id),
    horse_id INTEGER NOT NULL REFERENCES horses(horse_id),
    jockey_id INTEGER NOT NULL REFERENCES jockeys(jockey_id),
    post_position SMALLINT,
    finish_position SMALLINT,
    finish_time NUMERIC(6, 1),
    last_3f_time NUMERIC(4, 1),
    weight NUMERIC(4, 1),
    weight_diff NUMERIC(3, 1),
    odds NUMERIC(6, 1),
    popularity SMALLINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (result_id, race_id)
);

-- TimescaleDBでハイパーテーブル化
SELECT create_hypertable('race_results', 'created_at');

-- 血統テーブル
CREATE TABLE bloodlines (
    bloodline_id SERIAL PRIMARY KEY,
    sire VARCHAR(100),           -- 父
    dam VARCHAR(100),            -- 母
    broodmare_sire VARCHAR(100), -- 母父
    sire_line VARCHAR(50),       -- 父系統
    INDEX idx_sire (sire),
    INDEX idx_broodmare_sire (broodmare_sire)
);

特徴量エンジニアリング

特徴量カテゴリ

graph LR A[特徴量] --> B[馬の基本情報] A --> C[過去成績] A --> D[血統] A --> E[騎手・調教師] A --> F[レース条件] A --> G[オッズ・人気] A --> H[調教] B --> B1[年齢] B --> B2[性別] B --> B3[馬体重] B --> B4[体重増減] C --> C1[過去N走の着順] C --> C2[勝率・連対率] C --> C3[距離別成績] C --> C4[馬場別成績] C --> C5[コース別成績] D --> D1[父系スコア] D --> D2[母父系スコア] D --> D3[距離適性遺伝] D --> D4[馬場適性遺伝] E --> E1[騎手勝率] E --> E2[調教師勝率] E --> E3[騎手×馬相性] F --> F1[距離] F --> F2[馬場状態] F --> F3[コース形態] F --> F4[グレード] G --> G1[単勝オッズ] G --> G2[人気順位] G --> G3[オッズ変動] H --> H1[調教タイム] H --> H2[調教評価] H --> H3[調教パターン]

特徴量生成コード

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple

class FeatureEngineer:
    """競馬予測のための特徴量エンジニアリング"""

    def __init__(self, n_past_races: int = 5):
        self.n_past_races = n_past_races

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """全特徴量を生成"""
        features = pd.DataFrame(index=df.index)

        # 馬の基本特徴量
        features = pd.concat([
            features,
            self._create_horse_basic_features(df),
            self._create_past_performance_features(df),
            self._create_bloodline_features(df),
            self._create_jockey_trainer_features(df),
            self._create_race_condition_features(df),
            self._create_odds_features(df),
            self._create_training_features(df)
        ], axis=1)

        return features

    def _create_horse_basic_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """馬の基本特徴量"""
        return pd.DataFrame({
            # 年齢(レース日 - 誕生日)
            'age': (df['race_date'] - df['birth_date']).dt.days / 365.25,

            # 性別(ワンホットエンコーディング)
            'is_male': (df['gender'] == '牡').astype(int),
            'is_female': (df['gender'] == '牝').astype(int),
            'is_gelding': (df['gender'] == 'セ').astype(int),

            # 馬体重
            'weight': df['weight'],
            'weight_diff': df['weight_diff'],

            # 体重変動の絶対値(大きな変動は不安要素)
            'weight_diff_abs': df['weight_diff'].abs(),

            # 適正体重からの乖離(馬ごとの平均体重との差)
            'weight_deviation': df.groupby('horse_id')['weight'].transform(
                lambda x: x - x.rolling(10, min_periods=1).mean()
            )
        }, index=df.index)

    def _create_past_performance_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """過去成績の特徴量"""
        features = {}

        # 馬ごとにソートして過去成績を計算
        df_sorted = df.sort_values(['horse_id', 'race_date'])

        for i in range(1, self.n_past_races + 1):
            # 過去i走前の着順
            features[f'finish_pos_{i}'] = df_sorted.groupby('horse_id')['finish_position'].shift(i)

            # 過去i走前の上がり3F
            features[f'last_3f_{i}'] = df_sorted.groupby('horse_id')['last_3f_time'].shift(i)

        # 過去N走の統計
        past_positions = df_sorted.groupby('horse_id')['finish_position'].apply(
            lambda x: x.shift(1).rolling(self.n_past_races, min_periods=1)
        )

        features['avg_finish_pos'] = past_positions.mean()
        features['min_finish_pos'] = past_positions.min()
        features['std_finish_pos'] = past_positions.std()

        # 勝率・連対率・複勝率
        features['win_rate'] = df_sorted.groupby('horse_id').apply(
            lambda x: (x['finish_position'].shift(1) == 1).rolling(20, min_periods=1).mean()
        ).reset_index(level=0, drop=True)

        features['place_rate'] = df_sorted.groupby('horse_id').apply(
            lambda x: (x['finish_position'].shift(1) <= 2).rolling(20, min_periods=1).mean()
        ).reset_index(level=0, drop=True)

        features['show_rate'] = df_sorted.groupby('horse_id').apply(
            lambda x: (x['finish_position'].shift(1) <= 3).rolling(20, min_periods=1).mean()
        ).reset_index(level=0, drop=True)

        # 距離別成績
        features['distance_win_rate'] = self._calculate_condition_rate(
            df_sorted, 'distance_category', 'win'
        )

        # 馬場別成績
        features['surface_win_rate'] = self._calculate_condition_rate(
            df_sorted, 'surface', 'win'
        )

        # コース別成績
        features['course_win_rate'] = self._calculate_condition_rate(
            df_sorted, 'racecourse_id', 'win'
        )

        return pd.DataFrame(features, index=df.index)

    def _calculate_condition_rate(
        self, df: pd.DataFrame, condition_col: str, rate_type: str
    ) -> pd.Series:
        """条件別の成績を計算"""
        if rate_type == 'win':
            threshold = 1
        elif rate_type == 'place':
            threshold = 2
        else:
            threshold = 3

        return df.groupby(['horse_id', condition_col]).apply(
            lambda x: (x['finish_position'].shift(1) <= threshold).rolling(10, min_periods=1).mean()
        ).reset_index(level=[0, 1], drop=True)

    def _create_bloodline_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """血統特徴量"""
        # 父系統別の距離適性スコア(事前に集計したマスタを使用)
        sire_distance_score = self._load_sire_distance_scores()
        broodmare_sire_score = self._load_broodmare_sire_scores()

        return pd.DataFrame({
            # 父系統の距離適性
            'sire_sprint_score': df['sire'].map(sire_distance_score.get('sprint', {})),
            'sire_mile_score': df['sire'].map(sire_distance_score.get('mile', {})),
            'sire_middle_score': df['sire'].map(sire_distance_score.get('middle', {})),
            'sire_long_score': df['sire'].map(sire_distance_score.get('long', {})),

            # 母父系統の馬場適性
            'broodmare_turf_score': df['broodmare_sire'].map(broodmare_sire_score.get('turf', {})),
            'broodmare_dirt_score': df['broodmare_sire'].map(broodmare_sire_score.get('dirt', {})),

            # 父系統の勝率
            'sire_win_rate': df['sire'].map(self._calculate_sire_win_rate(df)),

            # インブリード係数(近親交配度)
            'inbreeding_coefficient': df.apply(self._calculate_inbreeding, axis=1)
        }, index=df.index)

    def _create_jockey_trainer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """騎手・調教師特徴量"""
        return pd.DataFrame({
            # 騎手の直近成績
            'jockey_win_rate': df.groupby('jockey_id').apply(
                lambda x: (x['finish_position'].shift(1) == 1).rolling(50, min_periods=1).mean()
            ).reset_index(level=0, drop=True),

            'jockey_place_rate': df.groupby('jockey_id').apply(
                lambda x: (x['finish_position'].shift(1) <= 3).rolling(50, min_periods=1).mean()
            ).reset_index(level=0, drop=True),

            # 調教師の直近成績
            'trainer_win_rate': df.groupby('trainer_id').apply(
                lambda x: (x['finish_position'].shift(1) == 1).rolling(50, min_periods=1).mean()
            ).reset_index(level=0, drop=True),

            # 騎手×馬の相性(過去の組み合わせ成績)
            'jockey_horse_chemistry': self._calculate_jockey_horse_chemistry(df),

            # 騎手×コースの成績
            'jockey_course_win_rate': df.groupby(['jockey_id', 'racecourse_id']).apply(
                lambda x: (x['finish_position'].shift(1) == 1).rolling(20, min_periods=1).mean()
            ).reset_index(level=[0, 1], drop=True),

            # 騎手の乗り替わりフラグ
            'jockey_change': (df.groupby('horse_id')['jockey_id'].shift(1) != df['jockey_id']).astype(int)
        }, index=df.index)

    def _create_race_condition_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """レース条件特徴量"""
        return pd.DataFrame({
            # 距離カテゴリ
            'distance': df['distance'],
            'is_sprint': (df['distance'] <= 1400).astype(int),
            'is_mile': ((df['distance'] > 1400) & (df['distance'] <= 1800)).astype(int),
            'is_middle': ((df['distance'] > 1800) & (df['distance'] <= 2200)).astype(int),
            'is_long': (df['distance'] > 2200).astype(int),

            # 馬場
            'is_turf': (df['surface'] == '芝').astype(int),
            'is_dirt': (df['surface'] == 'ダ').astype(int),

            # 馬場状態
            'condition_good': (df['condition'] == '良').astype(int),
            'condition_slightly_heavy': (df['condition'] == '稍重').astype(int),
            'condition_heavy': (df['condition'] == '重').astype(int),
            'condition_bad': (df['condition'] == '不良').astype(int),

            # コース形態
            'is_right_handed': (df['course_direction'] == '右').astype(int),
            'is_left_handed': (df['course_direction'] == '左').astype(int),

            # グレード
            'is_g1': (df['grade'] == 'G1').astype(int),
            'is_g2': (df['grade'] == 'G2').astype(int),
            'is_g3': (df['grade'] == 'G3').astype(int),
            'is_op': (df['grade'] == 'OP').astype(int),

            # 出走頭数
            'num_runners': df['num_runners'],

            # 枠順
            'post_position': df['post_position'],

            # 枠順の有利不利(コース×距離で異なる)
            'post_position_advantage': self._calculate_post_advantage(df)
        }, index=df.index)

    def _create_odds_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """オッズ特徴量"""
        return pd.DataFrame({
            # 単勝オッズ
            'odds': df['odds'],
            'odds_log': np.log1p(df['odds']),

            # 人気順
            'popularity': df['popularity'],

            # オッズと人気の乖離(穴馬指標)
            'odds_popularity_gap': df['odds'].rank() - df['popularity'],

            # オッズの変動(前日比)
            'odds_change': df['odds'] - df['odds_previous_day'],
            'odds_change_rate': (df['odds'] - df['odds_previous_day']) / df['odds_previous_day'],

            # 支持率(オッズから逆算)
            'support_rate': 0.8 / df['odds'],  # 控除率20%を仮定

            # オッズの偏差(そのレース内での相対的な人気)
            'odds_zscore': df.groupby('race_id')['odds'].transform(
                lambda x: (x - x.mean()) / x.std()
            )
        }, index=df.index)

    def _create_training_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """調教特徴量"""
        return pd.DataFrame({
            # 最終調教タイム
            'training_time': df['training_time'],

            # 調教タイムの偏差
            'training_time_deviation': df.groupby('horse_id')['training_time'].transform(
                lambda x: x - x.rolling(5, min_periods=1).mean()
            ),

            # 調教評価(A, B, C → 数値化)
            'training_score': df['training_evaluation'].map({'A': 3, 'B': 2, 'C': 1, 'D': 0}),

            # 調教コース(坂路、ウッド、ポリ等)
            'training_course_hill': (df['training_course'] == '坂路').astype(int),
            'training_course_wood': (df['training_course'] == 'ウッド').astype(int),
            'training_course_poly': (df['training_course'] == 'ポリ').astype(int),

            # 調教本数
            'training_count': df['training_count'],

            # 併せ馬の有無
            'has_companion_training': df['has_companion_training'].astype(int)
        }, index=df.index)

    def _load_sire_distance_scores(self) -> Dict[str, Dict[str, float]]:
        """父系統の距離適性スコアを読み込む(事前に集計)"""
        # 実際はDBまたはファイルから読み込む
        return {
            'sprint': {},
            'mile': {},
            'middle': {},
            'long': {}
        }

    def _load_broodmare_sire_scores(self) -> Dict[str, Dict[str, float]]:
        """母父系統の馬場適性スコアを読み込む"""
        return {
            'turf': {},
            'dirt': {}
        }

    def _calculate_sire_win_rate(self, df: pd.DataFrame) -> Dict[str, float]:
        """父系統別の勝率を計算"""
        return df.groupby('sire').apply(
            lambda x: (x['finish_position'] == 1).mean()
        ).to_dict()

    def _calculate_inbreeding(self, row: pd.Series) -> float:
        """インブリード係数を計算(簡易版)"""
        # 実際は5代血統表から計算
        return 0.0

    def _calculate_jockey_horse_chemistry(self, df: pd.DataFrame) -> pd.Series:
        """騎手と馬の相性スコアを計算"""
        return df.groupby(['jockey_id', 'horse_id']).apply(
            lambda x: (x['finish_position'].shift(1) <= 3).rolling(5, min_periods=1).mean()
        ).reset_index(level=[0, 1], drop=True)

    def _calculate_post_advantage(self, df: pd.DataFrame) -> pd.Series:
        """枠順の有利不利スコア"""
        # コース×距離別の枠順成績を事前に集計
        # 内枠有利、外枠有利などをスコア化
        return pd.Series(0, index=df.index)  # 簡易実装

特徴量の重要度分析

graph TB A[特徴量重要度 Top 15] --> B[1. 過去5走平均着順 0.15] A --> C[2. 騎手勝率 0.12] A --> D[3. オッズ 0.11] A --> E[4. 距離適性スコア 0.09] A --> F[5. 前走着順 0.08] A --> G[6. 調教タイム 0.07] A --> H[7. 馬場適性スコア 0.06] A --> I[8. 枠順有利スコア 0.05] A --> J[9. 馬体重変動 0.05] A --> K[10. 父系スコア 0.04] A --> L[11. 騎手乗り替わり 0.04] A --> M[12. 上がり3F平均 0.04] A --> N[13. 人気順 0.03] A --> O[14. 年齢 0.03] A --> P[15. コース別成績 0.03]

機械学習モデル設計

モデルアーキテクチャ

graph TB subgraph "入力層" A[特徴量
約100次元] end subgraph "モデル層" B1[LightGBM
着順予測] B2[XGBoost
勝率予測] B3[CatBoost
複勝予測] B4[Neural Network
深層学習] end subgraph "アンサンブル層" C1[スタッキング] C2[ブレンディング] end subgraph "出力層" D1[予測着順] D2[勝利確率] D3[複勝確率] D4[期待値スコア] end A --> B1 & B2 & B3 & B4 B1 & B2 & B3 & B4 --> C1 C1 --> C2 C2 --> D1 & D2 & D3 & D4

LightGBMモデル実装

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import lightgbm as lgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import log_loss, roc_auc_score
import numpy as np
import pandas as pd
from typing import Tuple, Dict, List

class HorseRacingModel:
    """競馬予測モデル"""

    def __init__(self, model_type: str = 'lightgbm'):
        self.model_type = model_type
        self.models = {}
        self.feature_importance = None

    def train(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        target_type: str = 'win',  # 'win', 'place', 'show'
        n_splits: int = 5
    ) -> Dict[str, float]:
        """時系列を考慮したクロスバリデーションで学習"""

        # 時系列分割
        tscv = TimeSeriesSplit(n_splits=n_splits)

        scores = {
            'train_logloss': [],
            'valid_logloss': [],
            'train_auc': [],
            'valid_auc': []
        }

        # ターゲット変数の作成
        if target_type == 'win':
            y_binary = (y == 1).astype(int)
        elif target_type == 'place':
            y_binary = (y <= 2).astype(int)
        else:  # show
            y_binary = (y <= 3).astype(int)

        fold_models = []

        for fold, (train_idx, valid_idx) in enumerate(tscv.split(X)):
            X_train, X_valid = X.iloc[train_idx], X.iloc[valid_idx]
            y_train, y_valid = y_binary.iloc[train_idx], y_binary.iloc[valid_idx]

            # LightGBMパラメータ
            params = {
                'objective': 'binary',
                'metric': 'binary_logloss',
                'boosting_type': 'gbdt',
                'num_leaves': 31,
                'learning_rate': 0.05,
                'feature_fraction': 0.8,
                'bagging_fraction': 0.8,
                'bagging_freq': 5,
                'verbose': -1,
                'n_jobs': -1,
                'seed': 42
            }

            # データセット作成
            train_data = lgb.Dataset(X_train, label=y_train)
            valid_data = lgb.Dataset(X_valid, label=y_valid, reference=train_data)

            # 学習
            model = lgb.train(
                params,
                train_data,
                num_boost_round=1000,
                valid_sets=[train_data, valid_data],
                valid_names=['train', 'valid'],
                callbacks=[
                    lgb.early_stopping(stopping_rounds=50),
                    lgb.log_evaluation(period=100)
                ]
            )

            fold_models.append(model)

            # 予測
            train_pred = model.predict(X_train)
            valid_pred = model.predict(X_valid)

            # スコア計算
            scores['train_logloss'].append(log_loss(y_train, train_pred))
            scores['valid_logloss'].append(log_loss(y_valid, valid_pred))
            scores['train_auc'].append(roc_auc_score(y_train, train_pred))
            scores['valid_auc'].append(roc_auc_score(y_valid, valid_pred))

            print(f"Fold {fold + 1}: "
                  f"Valid LogLoss = {scores['valid_logloss'][-1]:.4f}, "
                  f"Valid AUC = {scores['valid_auc'][-1]:.4f}")

        # モデル保存
        self.models[target_type] = fold_models

        # 特徴量重要度の集計
        self._calculate_feature_importance(fold_models, X.columns)

        # 平均スコアを返す
        return {
            'mean_train_logloss': np.mean(scores['train_logloss']),
            'mean_valid_logloss': np.mean(scores['valid_logloss']),
            'mean_train_auc': np.mean(scores['train_auc']),
            'mean_valid_auc': np.mean(scores['valid_auc'])
        }

    def predict(
        self,
        X: pd.DataFrame,
        target_type: str = 'win'
    ) -> np.ndarray:
        """予測"""
        if target_type not in self.models:
            raise ValueError(f"Model for {target_type} not trained")

        # 全foldモデルの予測を平均
        predictions = np.zeros(len(X))
        for model in self.models[target_type]:
            predictions += model.predict(X)
        predictions /= len(self.models[target_type])

        return predictions

    def predict_proba(self, X: pd.DataFrame) -> pd.DataFrame:
        """全ての確率を予測"""
        return pd.DataFrame({
            'win_prob': self.predict(X, 'win'),
            'place_prob': self.predict(X, 'place'),
            'show_prob': self.predict(X, 'show')
        }, index=X.index)

    def _calculate_feature_importance(
        self,
        models: List[lgb.Booster],
        feature_names: pd.Index
    ) -> None:
        """特徴量重要度を計算"""
        importance = np.zeros(len(feature_names))

        for model in models:
            importance += model.feature_importance(importance_type='gain')

        importance /= len(models)

        self.feature_importance = pd.DataFrame({
            'feature': feature_names,
            'importance': importance
        }).sort_values('importance', ascending=False)

    def get_feature_importance(self, top_n: int = 20) -> pd.DataFrame:
        """特徴量重要度を取得"""
        if self.feature_importance is None:
            raise ValueError("Model not trained yet")
        return self.feature_importance.head(top_n)

アンサンブルモデル

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
import catboost as cb

class EnsembleModel:
    """アンサンブル予測モデル"""

    def __init__(self):
        self.base_models = {
            'lightgbm': HorseRacingModel('lightgbm'),
            'xgboost': self._create_xgboost_model(),
            'catboost': self._create_catboost_model()
        }
        self.meta_model = LogisticRegression()

    def _create_xgboost_model(self):
        """XGBoostモデル"""
        return xgb.XGBClassifier(
            n_estimators=1000,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            early_stopping_rounds=50,
            eval_metric='logloss',
            random_state=42
        )

    def _create_catboost_model(self):
        """CatBoostモデル"""
        return cb.CatBoostClassifier(
            iterations=1000,
            depth=6,
            learning_rate=0.05,
            loss_function='Logloss',
            early_stopping_rounds=50,
            verbose=100,
            random_state=42
        )

    def train_stacking(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        n_splits: int = 5
    ) -> None:
        """スタッキングで学習"""
        tscv = TimeSeriesSplit(n_splits=n_splits)

        # ベースモデルの予測を格納
        base_predictions = np.zeros((len(X), len(self.base_models)))

        for i, (name, model) in enumerate(self.base_models.items()):
            print(f"Training {name}...")

            fold_preds = np.zeros(len(X))
            fold_count = np.zeros(len(X))

            for train_idx, valid_idx in tscv.split(X):
                X_train, X_valid = X.iloc[train_idx], X.iloc[valid_idx]
                y_train, y_valid = y.iloc[train_idx], y.iloc[valid_idx]

                if name == 'lightgbm':
                    model.train(X_train, y_train)
                    fold_preds[valid_idx] = model.predict(X_valid, 'win')
                else:
                    model.fit(X_train, y_train)
                    fold_preds[valid_idx] = model.predict_proba(X_valid)[:, 1]

                fold_count[valid_idx] += 1

            base_predictions[:, i] = fold_preds / np.maximum(fold_count, 1)

        # メタモデルの学習
        print("Training meta model...")
        self.meta_model.fit(base_predictions, y)

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """アンサンブル予測"""
        base_predictions = np.zeros((len(X), len(self.base_models)))

        for i, (name, model) in enumerate(self.base_models.items()):
            if name == 'lightgbm':
                base_predictions[:, i] = model.predict(X, 'win')
            else:
                base_predictions[:, i] = model.predict_proba(X)[:, 1]

        return self.meta_model.predict_proba(base_predictions)[:, 1]

モデル評価指標

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def evaluate_model(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    odds: np.ndarray,
    race_ids: np.ndarray
) -> Dict[str, float]:
    """モデルの評価"""

    results = {}

    # 基本指標
    results['auc'] = roc_auc_score(y_true, y_pred)
    results['logloss'] = log_loss(y_true, y_pred)

    # レースごとの的中率
    race_results = []
    for race_id in np.unique(race_ids):
        mask = race_ids == race_id
        race_pred = y_pred[mask]
        race_true = y_true[mask]
        race_odds = odds[mask]

        # 予測1位の的中
        pred_winner_idx = np.argmax(race_pred)
        actual_winner = race_true[pred_winner_idx]

        race_results.append({
            'race_id': race_id,
            'hit': actual_winner,
            'odds': race_odds[pred_winner_idx] if actual_winner else 0
        })

    race_df = pd.DataFrame(race_results)

    # 的中率
    results['hit_rate'] = race_df['hit'].mean()

    # 回収率
    total_bet = len(race_df) * 100  # 1レース100円
    total_return = (race_df['hit'] * race_df['odds'] * 100).sum()
    results['return_rate'] = total_return / total_bet

    # 期待値がプラスの馬券数
    # (予測確率 × オッズ > 1 の馬券)
    expected_values = y_pred * odds
    results['positive_ev_rate'] = (expected_values > 1).mean()

    return results

リアルタイム推論パイプライン

推論フロー

sequenceDiagram participant User as ユーザー participant API as FastAPI participant Cache as Redis Cache participant DB as PostgreSQL participant Model as MLモデル participant Notify as 通知サービス User->>API: レース予測リクエスト API->>Cache: キャッシュ確認 alt キャッシュあり Cache-->>API: キャッシュ済み予測 else キャッシュなし API->>DB: レースデータ取得 DB-->>API: 出走馬情報 API->>API: 特徴量生成 API->>Model: 予測リクエスト Model-->>API: 予測結果 API->>Cache: 結果をキャッシュ end API-->>User: 予測結果 Note over API,Notify: オッズ更新時 API->>Model: 再予測 Model-->>API: 更新された予測 API->>Notify: 大きな変動を通知 Notify-->>User: プッシュ通知

FastAPI実装

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Optional
import redis
import json

app = FastAPI(title="競馬AI予測API")

# Redis接続
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# モデルロード
model = EnsembleModel.load('models/ensemble_v1.pkl')
feature_engineer = FeatureEngineer()

class PredictionRequest(BaseModel):
    race_id: str
    include_analysis: bool = False

class HorsePrediction(BaseModel):
    horse_id: str
    horse_name: str
    win_prob: float
    place_prob: float
    show_prob: float
    expected_value: float
    recommendation: str

class RacePrediction(BaseModel):
    race_id: str
    race_name: str
    predictions: List[HorsePrediction]
    analysis: Optional[str] = None
    updated_at: str

@app.get("/api/v1/predictions/{race_id}", response_model=RacePrediction)
async def get_race_prediction(race_id: str, include_analysis: bool = False):
    """レースの予測を取得"""

    # キャッシュ確認
    cache_key = f"prediction:{race_id}"
    cached = redis_client.get(cache_key)

    if cached:
        return json.loads(cached)

    # DBからレースデータ取得
    race_data = await get_race_data(race_id)
    if not race_data:
        raise HTTPException(status_code=404, detail="Race not found")

    # 特徴量生成
    features = feature_engineer.create_features(race_data)

    # 予測
    predictions = model.predict_proba(features)

    # 期待値計算
    predictions['expected_value'] = predictions['win_prob'] * race_data['odds']

    # レコメンデーション生成
    predictions['recommendation'] = predictions.apply(
        lambda x: generate_recommendation(x), axis=1
    )

    # レスポンス作成
    response = RacePrediction(
        race_id=race_id,
        race_name=race_data['race_name'].iloc[0],
        predictions=[
            HorsePrediction(
                horse_id=row['horse_id'],
                horse_name=row['horse_name'],
                win_prob=row['win_prob'],
                place_prob=row['place_prob'],
                show_prob=row['show_prob'],
                expected_value=row['expected_value'],
                recommendation=row['recommendation']
            )
            for _, row in predictions.iterrows()
        ],
        analysis=generate_analysis(predictions) if include_analysis else None,
        updated_at=datetime.now().isoformat()
    )

    # キャッシュ保存(5分間)
    redis_client.setex(cache_key, 300, json.dumps(response.dict()))

    return response

def generate_recommendation(row: pd.Series) -> str:
    """馬券レコメンデーションを生成"""
    if row['expected_value'] > 1.5 and row['win_prob'] > 0.15:
        return "強く推奨(単勝・馬単軸)"
    elif row['expected_value'] > 1.2 and row['show_prob'] > 0.5:
        return "推奨(複勝・ワイド)"
    elif row['show_prob'] > 0.4:
        return "検討(三連複ヒモ)"
    else:
        return "見送り"

def generate_analysis(predictions: pd.DataFrame) -> str:
    """レース分析を生成"""
    top_horse = predictions.iloc[predictions['win_prob'].argmax()]
    second_horse = predictions.iloc[predictions['win_prob'].argsort()[-2]]

    analysis = f"""
    【レース展望】
    本命: {top_horse['horse_name']}(勝率: {top_horse['win_prob']*100:.1f}%)
    対抗: {second_horse['horse_name']}(勝率: {second_horse['win_prob']*100:.1f}%)

    【期待値分析】
    期待値1.0以上: {(predictions['expected_value'] > 1).sum()}    最高期待値: {predictions['expected_value'].max():.2f}{predictions.iloc[predictions['expected_value'].argmax()]['horse_name']}
    【推奨馬券】
    - 単勝: {top_horse['horse_name']}
    - 馬連: {top_horse['horse_name']} - {second_horse['horse_name']}
    """

    return analysis

モニタリングと継続的改善

モニタリングダッシュボード

graph TB subgraph "リアルタイムモニタリング" A1[予測API応答時間] A2[予測件数/分] A3[キャッシュヒット率] A4[エラー率] end subgraph "予測精度モニタリング" B1[日次的中率] B2[週次回収率] B3[モデル別AUC] B4[特徴量ドリフト] end subgraph "ビジネスモニタリング" C1[ユーザー数] C2[アクティブ率] C3[推奨採用率] C4[ユーザー回収率] end subgraph "アラート" D1[精度低下アラート] D2[システム異常アラート] D3[データ異常アラート] end A1 & A2 & A3 & A4 --> D2 B1 & B2 & B3 & B4 --> D1 B4 --> D3

精度モニタリング実装

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import mlflow
from datetime import datetime, timedelta

class ModelMonitor:
    """モデルの精度をモニタリング"""

    def __init__(self, model_name: str, alert_threshold: float = 0.05):
        self.model_name = model_name
        self.alert_threshold = alert_threshold
        self.baseline_metrics = self._load_baseline_metrics()

    def log_prediction(
        self,
        race_id: str,
        predictions: pd.DataFrame,
        actual_results: pd.DataFrame
    ) -> None:
        """予測結果をログ"""
        # 的中判定
        predicted_winner = predictions.iloc[predictions['win_prob'].argmax()]['horse_id']
        actual_winner = actual_results[actual_results['finish_position'] == 1]['horse_id'].iloc[0]
        is_hit = predicted_winner == actual_winner

        # MLflowにログ
        with mlflow.start_run(run_name=f"prediction_{race_id}"):
            mlflow.log_param("race_id", race_id)
            mlflow.log_param("num_runners", len(predictions))
            mlflow.log_metric("hit", int(is_hit))

            if is_hit:
                odds = actual_results[actual_results['horse_id'] == predicted_winner]['odds'].iloc[0]
                mlflow.log_metric("return", odds * 100)
            else:
                mlflow.log_metric("return", 0)

    def calculate_daily_metrics(self, date: datetime) -> Dict[str, float]:
        """日次メトリクスを計算"""
        # MLflowから当日の予測結果を取得
        runs = mlflow.search_runs(
            filter_string=f"params.date = '{date.strftime('%Y-%m-%d')}'"
        )

        if len(runs) == 0:
            return {}

        metrics = {
            'hit_rate': runs['metrics.hit'].mean(),
            'return_rate': runs['metrics.return'].sum() / (len(runs) * 100),
            'num_predictions': len(runs)
        }

        return metrics

    def check_degradation(self, current_metrics: Dict[str, float]) -> List[str]:
        """精度劣化をチェック"""
        alerts = []

        for metric, value in current_metrics.items():
            if metric in self.baseline_metrics:
                baseline = self.baseline_metrics[metric]
                degradation = (baseline - value) / baseline

                if degradation > self.alert_threshold:
                    alerts.append(
                        f"{metric}: {degradation*100:.1f}%劣化 "
                        f"(baseline: {baseline:.3f}, current: {value:.3f})"
                    )

        return alerts

    def detect_feature_drift(
        self,
        current_features: pd.DataFrame,
        reference_features: pd.DataFrame
    ) -> Dict[str, float]:
        """特徴量ドリフトを検出"""
        from scipy.stats import ks_2samp

        drift_scores = {}

        for col in current_features.columns:
            if current_features[col].dtype in ['float64', 'int64']:
                statistic, p_value = ks_2samp(
                    reference_features[col].dropna(),
                    current_features[col].dropna()
                )
                drift_scores[col] = statistic

        return drift_scores

    def _load_baseline_metrics(self) -> Dict[str, float]:
        """ベースラインメトリクスを読み込む"""
        # 過去30日間の平均をベースラインとする
        return {
            'hit_rate': 0.25,
            'return_rate': 0.82
        }

モデル再学習パイプライン

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'horse_racing_model_retrain',
    default_args=default_args,
    description='週次モデル再学習パイプライン',
    schedule_interval='0 3 * * 1',  # 毎週月曜3時
    catchup=False
)

def extract_training_data(**context):
    """学習データを抽出"""
    # 過去2年分のデータを抽出
    end_date = datetime.now()
    start_date = end_date - timedelta(days=730)

    query = f"""
    SELECT * FROM race_results
    WHERE race_date BETWEEN '{start_date}' AND '{end_date}'
    """
    # データ抽出処理...

def create_features(**context):
    """特徴量を生成"""
    # 特徴量エンジニアリング処理...

def train_model(**context):
    """モデルを学習"""
    model = EnsembleModel()
    model.train_stacking(X, y)
    model.save(f"models/ensemble_{datetime.now().strftime('%Y%m%d')}.pkl")

def evaluate_model(**context):
    """モデルを評価"""
    # 直近1ヶ月のデータで評価
    metrics = evaluate_model(y_test, y_pred, odds_test, race_ids_test)

    if metrics['return_rate'] < 0.75:
        raise ValueError(f"回収率が基準以下: {metrics['return_rate']}")

def deploy_model(**context):
    """モデルをデプロイ"""
    # Blue-Greenデプロイ
    # 新モデルをステージング環境にデプロイ
    # A/Bテスト開始

# タスク定義
extract_task = PythonOperator(
    task_id='extract_training_data',
    python_callable=extract_training_data,
    dag=dag
)

feature_task = PythonOperator(
    task_id='create_features',
    python_callable=create_features,
    dag=dag
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

evaluate_task = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

# 依存関係
extract_task >> feature_task >> train_task >> evaluate_task >> deploy_task

費用対効果の試算

システム運用コスト

項目月額コスト備考
クラウドインフラ¥50,000AWS/GCP 中規模構成
データソース¥10,000API利用料
MLプラットフォーム¥20,000MLflow, モデル管理
モニタリング¥5,000Datadog等
合計¥85,000/月

期待リターン

前提条件:
- 月間予測レース数: 300レース
- 1レースあたり投資額: ¥1,000
- 月間投資総額: ¥300,000

シナリオ別回収率:
┌─────────────────┬──────────┬──────────┬──────────┐
│ シナリオ        │ 回収率   │ 月間収支 │ 年間収支 │
├─────────────────┼──────────┼──────────┼──────────┤
│ 悲観的          │ 75%      │ -¥75,000 │ -¥900,000│
│ 標準的          │ 85%      │ -¥45,000 │ -¥540,000│
│ 楽観的          │ 100%     │ ¥0       │ ¥0       │
│ 理想的          │ 110%     │ +¥30,000 │ +¥360,000│
└─────────────────┴──────────┴──────────┴──────────┘

※ システム運用コスト ¥85,000/月 を考慮すると、
  回収率110%以上が損益分岐点

投資対効果の考え方

graph TB A[競馬AI予測システム] --> B[定量的効果] A --> C[定性的効果] B --> B1[回収率向上
目標: 85%→100%] B --> B2[時間削減
分析時間 80%減] B --> B3[的中率向上
目標: 20%→25%] C --> C1[データドリブンな
意思決定] C --> C2[感情に左右されない
投資判断] C --> C3[学習と改善の
サイクル確立] B1 --> D[ROI計算] B2 --> D B3 --> D

段階的な導入ロードマップ

フェーズ別実装計画

gantt title 競馬AI予測システム導入計画 dateFormat YYYY-MM-DD section Phase 1: MVP データ収集基盤構築 :a1, 2026-02-01, 14d 基本特徴量実装 :a2, after a1, 14d LightGBMモデル構築 :a3, after a2, 14d 簡易予測API開発 :a4, after a3, 7d section Phase 2: 精度向上 特徴量拡充 :b1, after a4, 21d アンサンブルモデル :b2, after b1, 14d バックテスト基盤 :b3, after b2, 7d section Phase 3: 本格運用 リアルタイム推論 :c1, after b3, 14d モニタリング基盤 :c2, after c1, 7d フロントエンド開発 :c3, after c2, 21d section Phase 4: 高度化 ニューラルネット統合 :d1, after c3, 21d 自動再学習パイプライン :d2, after d1, 14d A/Bテスト基盤 :d3, after d2, 14d

各フェーズの目標

フェーズ期間目標回収率主要成果物
Phase 1: MVP2ヶ月75%基本予測モデル、API
Phase 2: 精度向上1.5ヶ月85%アンサンブルモデル
Phase 3: 本格運用1.5ヶ月90%完全なシステム
Phase 4: 高度化2ヶ月95%+自動化・高度化

まとめ

設計のポイント

  1. 段階的なアプローチ

    • 単純なモデルから始めて徐々に複雑化
    • 各段階で効果を検証
  2. データ品質の重視

    • 特徴量エンジニアリングが精度の鍵
    • データパイプラインの堅牢性
  3. 継続的な改善サイクル

    • モニタリングと再学習の自動化
    • A/Bテストによる検証
  4. 現実的な期待値

    • 競馬は不確実性が高い
    • 長期的な視点での評価

今後の発展可能性

  • LLMによる分析: レース展望の自動生成
  • 画像解析: パドック映像からのコンディション判定
  • リアルタイムオッズ分析: オッズ変動からの情報抽出
  • 地方競馬への展開: データソースの拡充

参考資料


更新履歴:

  • 2026-01-25: 初版作成