Complete ML Pipeline
End-to-end examples showing how to build complete machine learning pipelines with Rhoa.
Basic Pipeline
A complete workflow from data to predictions.
Step-by-Step Pipeline
import pandas as pd
import rhoa
from rhoa.targets import generate_target_combinations
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
# --- Step-by-step basic pipeline: CSV -> indicators -> targets -> model ---

# Step 1: Load data
# Expects an OHLCV-style CSV; the rhoa accessor below reads its price columns.
df = pd.read_csv('stock_prices.csv')
print(f"Data shape: {df.shape}")

# Step 2: Generate technical indicator features via the rhoa accessor
df['SMA_20'] = df.rhoa.indicators.sma(20)
df['SMA_50'] = df.rhoa.indicators.sma(50)
df['RSI_14'] = df.rhoa.indicators.rsi(14)
df['ATR_14'] = df.rhoa.indicators.atr(window_size=14)
macd = df.rhoa.indicators.macd()  # result is indexed by 'macd' and 'signal' below
df['MACD'] = macd['macd']
df['MACD_Signal'] = macd['signal']

# Step 3: Generate optimized targets
# mode='auto' searches target parameters; target_class_balance=0.4 asks for
# roughly 40% positive labels.
targets, meta = generate_target_combinations(
    df,
    mode='auto',
    target_class_balance=0.4
)
print(f"\nTarget parameters for method 7:")
print(f" Period: {meta['method_7']['period']}")
print(f" Threshold: {meta['method_7']['threshold']}%")

# Step 4: Prepare ML dataset
# dropna() removes warm-up rows where indicators/targets are undefined.
ml_df = pd.concat([df, targets], axis=1).dropna()
feature_cols = ['SMA_20', 'SMA_50', 'RSI_14', 'ATR_14', 'MACD', 'MACD_Signal']
X = ml_df[feature_cols]
y = ml_df['Target_7']
print(f"\nML dataset shape: {X.shape}")
print(f"Positive class: {y.sum()} ({y.mean()*100:.1f}%)")

# Step 5: Train-test split (time-based, no shuffling — avoids look-ahead bias)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]

# Step 6: Train model
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42
)
model.fit(X_train, y_train)

# Step 7: Evaluate on the held-out (most recent) segment
y_pred = model.predict(X_test)
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Step 8: Feature importance (impurity-based importances from the fitted forest)
importance = pd.DataFrame({
    'feature': feature_cols,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nFeature Importance:")
print(importance)
Advanced Pipeline
More sophisticated pipeline with additional features and validation.
Feature Engineering Pipeline
import pandas as pd
import numpy as np
import rhoa
from rhoa.targets import generate_target_combinations
def engineer_features(df):
    """Create comprehensive feature set.

    Adds price, moving-average, momentum, volatility, trend, volume, and
    lagged columns to ``df`` in place and returns the same frame.
    """
    volume = df['Volume']
    close = df['Close']

    # Price-based features
    df['Returns'] = close.pct_change()
    df['Log_Returns'] = np.log(close / close.shift(1))

    # Moving averages
    for window in (10, 20, 50, 200):
        df[f'SMA_{window}'] = df.rhoa.indicators.sma(window)

    # MA relationships
    df['SMA_10_20_ratio'] = df['SMA_10'] / df['SMA_20']
    df['SMA_50_200_ratio'] = df['SMA_50'] / df['SMA_200']
    df['Price_SMA20_ratio'] = close / df['SMA_20']

    # Momentum indicators
    for window in (14, 28):
        df[f'RSI_{window}'] = df.rhoa.indicators.rsi(window)
    macd = df.rhoa.indicators.macd()
    df['MACD'] = macd['macd']
    df['MACD_Signal'] = macd['signal']
    df['MACD_Hist'] = macd['histogram']

    # Volatility
    for window in (14, 28):
        df[f'ATR_{window}'] = df.rhoa.indicators.atr(window_size=window)
    bb = df.rhoa.indicators.bollinger_bands(20, 2.0)
    band_range = bb['upper_band'] - bb['lower_band']
    df['BB_Upper'] = bb['upper_band']
    df['BB_Lower'] = bb['lower_band']
    df['BB_Width'] = band_range / bb['middle_band']
    df['BB_Position'] = (close - bb['lower_band']) / band_range

    # Trend strength
    adx = df.rhoa.indicators.adx(window_size=14)
    df['ADX'] = adx['ADX']
    df['Plus_DI'] = adx['+DI']
    df['Minus_DI'] = adx['-DI']

    # Volume features
    df['Volume_SMA'] = volume.rolling(20).mean()
    df['Volume_Ratio'] = volume / df['Volume_SMA']

    # Lagged features
    for lag in (1, 2, 3, 5):
        df[f'Returns_Lag{lag}'] = df['Returns'].shift(lag)
        df[f'RSI_Lag{lag}'] = df['RSI_14'].shift(lag)

    return df
# Use it: load raw prices and attach the engineered feature columns in place.
df = pd.read_csv('prices.csv')
df = engineer_features(df)
print(f"Total features created: {len(df.columns)}")
Cross-Validation Pipeline
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score, f1_score
def time_series_cv(X, y, model, n_splits=5):
    """Run walk-forward (time-series) cross-validation and report metrics.

    Fits ``model`` on each expanding-window training split, evaluates on the
    following validation window, prints per-fold and average scores, and
    returns a dict mapping metric name -> list of per-fold values.
    """
    splitter = TimeSeriesSplit(n_splits=n_splits)
    history = {name: [] for name in ('precision', 'recall', 'f1', 'accuracy')}

    for fold_no, (train_idx, val_idx) in enumerate(splitter.split(X), start=1):
        # Positional split keeps training data strictly before validation data.
        X_tr, X_va = X.iloc[train_idx], X.iloc[val_idx]
        y_tr, y_va = y.iloc[train_idx], y.iloc[val_idx]

        model.fit(X_tr, y_tr)
        preds = model.predict(X_va)

        prec = precision_score(y_va, preds)
        rec = recall_score(y_va, preds)
        f1_val = f1_score(y_va, preds)
        history['precision'].append(prec)
        history['recall'].append(rec)
        history['f1'].append(f1_val)
        history['accuracy'].append((preds == y_va).mean())

        print(f"Fold {fold_no}:")
        print(f" Precision: {prec:.3f}")
        print(f" Recall: {rec:.3f}")
        print(f" F1: {f1_val:.3f}")

    # Average scores across folds
    print("\nAverage Scores:")
    for metric, values in history.items():
        print(f" {metric}: {np.mean(values):.3f} (+/- {np.std(values):.3f})")

    return history
# Use it
# X and y come from the feature/target preparation in the earlier snippets.
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(n_estimators=100, random_state=42)
scores = time_series_cv(X, y, model, n_splits=5)
Hyperparameter Tuning
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import randint
# Define parameter space
# randint(a, b) samples integers uniformly from [a, b) during the search.
param_dist = {
    'n_estimators': randint(50, 200),
    'max_depth': randint(5, 20),
    'min_samples_split': randint(2, 20),
    'min_samples_leaf': randint(1, 10),
    'max_features': ['sqrt', 'log2']
}

# Time series split for CV
# NOTE(review): TimeSeriesSplit is imported in the cross-validation snippet
# above (from sklearn.model_selection); this snippet relies on that import.
tscv = TimeSeriesSplit(n_splits=3)

# Random search: 20 sampled configurations scored by F1 on forward-chained folds
rf = RandomForestClassifier(random_state=42)
search = RandomizedSearchCV(
    rf,
    param_distributions=param_dist,
    n_iter=20,
    cv=tscv,
    scoring='f1',
    n_jobs=-1,  # use all available CPU cores
    random_state=42
)

# Fit
search.fit(X_train, y_train)
print("Best parameters:")
print(search.best_params_)
print(f"\nBest F1 score: {search.best_score_:.3f}")

# Use best model
best_model = search.best_estimator_
y_pred = best_model.predict(X_test)
Multiple Model Comparison
Compare different algorithms.
Model Comparison Function
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report, roc_auc_score
def compare_models(X_train, X_test, y_train, y_test):
    """Train and compare several classifiers on the same train/test split.

    Fits each model on ``(X_train, y_train)``, evaluates on
    ``(X_test, y_test)``, prints per-model metrics, and returns a summary
    DataFrame with one row per model (Accuracy / Precision / Recall / F1 /
    AUC; AUC is None for models without ``predict_proba``).
    """
    # Local imports keep this snippet self-contained: the snippet's own import
    # group does not bring in pandas or the individual metric functions.
    import pandas as pd
    from sklearn.metrics import precision_score, recall_score, f1_score

    models = {
        'Logistic Regression': LogisticRegression(max_iter=1000),
        'Random Forest': RandomForestClassifier(n_estimators=100),
        'Gradient Boosting': GradientBoostingClassifier(n_estimators=100),
        'SVM': SVC(probability=True, kernel='rbf')
    }
    results = []
    for name, model in models.items():
        print(f"\n{'='*50}")
        print(f"Training {name}")
        print('='*50)
        # Train
        model.fit(X_train, y_train)
        # Predict (probability of the positive class, when available)
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
        # Metrics
        accuracy = (y_pred == y_test).mean()
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        auc = roc_auc_score(y_test, y_proba) if y_proba is not None else None
        results.append({
            'Model': name,
            'Accuracy': accuracy,
            'Precision': precision,
            'Recall': recall,
            'F1': f1,
            'AUC': auc
        })
        print(f"Accuracy: {accuracy:.3f}")
        print(f"Precision: {precision:.3f}")
        print(f"Recall: {recall:.3f}")
        print(f"F1: {f1:.3f}")
        # BUG FIX: was `if auc:` — that also suppressed a legitimate AUC of 0.0.
        if auc is not None:
            print(f"AUC: {auc:.3f}")
    # Summary
    results_df = pd.DataFrame(results)
    print("\n" + "="*50)
    print("SUMMARY")
    print("="*50)
    print(results_df.to_string(index=False))
    return results_df
# Use it — requires the train/test split created in the earlier pipeline snippets.
results = compare_models(X_train, X_test, y_train, y_test)
Production Pipeline
A production-ready pipeline with proper structure.
Pipeline Class
import pandas as pd
import numpy as np
import pickle
from sklearn.ensemble import RandomForestClassifier
from rhoa.targets import generate_target_combinations
class StockPredictionPipeline:
    """Production ML pipeline for stock prediction.

    Bundles feature engineering (via the rhoa DataFrame accessor), automatic
    target generation, model training, prediction, and pickle persistence
    behind a single object.
    """

    def __init__(self, model=None, target_meta=None):
        # Fall back to a default forest when no model is injected.
        self.model = model or RandomForestClassifier(n_estimators=100)
        self.target_meta = target_meta
        # Set by fit(); predict() refuses to run until then.
        self.feature_cols = None

    def create_features(self, df):
        """Create feature set.

        Returns a copy of ``df`` with indicator and return columns appended;
        the caller's frame is left untouched.
        """
        features = df.copy()
        # Technical indicators
        features['SMA_20'] = features.rhoa.indicators.sma(20)
        features['SMA_50'] = features.rhoa.indicators.sma(50)
        features['RSI_14'] = features.rhoa.indicators.rsi(14)
        features['ATR_14'] = features.rhoa.indicators.atr(window_size=14)
        macd = features.rhoa.indicators.macd()
        features['MACD'] = macd['macd']
        features['MACD_Signal'] = macd['signal']
        bb = features.rhoa.indicators.bollinger_bands(20, 2.0)
        # Band width normalized by the middle band: (upper - lower) / middle
        features['BB_Width'] = (bb['upper_band'] - bb['lower_band']) / bb['middle_band']
        # Returns
        features['Returns'] = df['Close'].pct_change()
        return features

    def fit(self, df, target_balance=0.4):
        """Train the pipeline.

        Builds features and targets from ``df``, drops NaN rows (indicator
        warm-up), and fits the model on the 'Target_7' label. Returns
        ``self`` so calls can be chained.
        """
        print("Creating features...")
        df_features = self.create_features(df)
        print("Generating targets...")
        # Targets are generated from the raw frame; meta records the chosen
        # period/threshold parameters for later inspection.
        targets, self.target_meta = generate_target_combinations(
            df,
            mode='auto',
            target_class_balance=target_balance
        )
        # Prepare data
        ml_df = pd.concat([df_features, targets], axis=1).dropna()
        self.feature_cols = ['SMA_20', 'SMA_50', 'RSI_14', 'ATR_14',
                             'MACD', 'MACD_Signal', 'BB_Width', 'Returns']
        X = ml_df[self.feature_cols]
        y = ml_df['Target_7']
        print(f"Training on {len(X)} samples...")
        self.model.fit(X, y)
        print("Training complete!")
        return self

    def predict(self, df):
        """Make predictions on new data.

        Returns a DataFrame (indexed like the usable rows of ``df``) with
        'prediction' (class label) and 'probability' (positive-class score)
        columns. Rows lost to indicator warm-up NaNs are dropped silently.

        Raises ValueError if the pipeline has not been fitted.
        """
        if self.feature_cols is None:
            raise ValueError("Pipeline not trained. Call fit() first.")
        # Create features
        df_features = self.create_features(df)
        # Extract feature columns
        X = df_features[self.feature_cols].dropna()
        # Predict
        predictions = self.model.predict(X)
        probabilities = self.model.predict_proba(X)
        return pd.DataFrame({
            'prediction': predictions,
            'probability': probabilities[:, 1]
        }, index=X.index)

    def save(self, path):
        """Save pipeline to disk (pickles the whole object, model included)."""
        with open(path, 'wb') as f:
            pickle.dump(self, f)

    @classmethod
    def load(cls, path):
        """Load pipeline from disk.

        NOTE(review): pickle.load executes code embedded in the file — only
        load artifacts from a trusted source.
        """
        with open(path, 'rb') as f:
            return pickle.load(f)
# Usage
# Training: fit on historical data, then persist the whole pipeline.
df_train = pd.read_csv('train_data.csv')
pipeline = StockPredictionPipeline()
pipeline.fit(df_train)
pipeline.save('model.pkl')

# Prediction: reload the pickled pipeline (e.g. in a separate process)
# and score new data.
df_new = pd.read_csv('new_data.csv')
pipeline = StockPredictionPipeline.load('model.pkl')
predictions = pipeline.predict(df_new)
print(predictions)
Backtesting Framework
def backtest_strategy(df, predictions, initial_capital=100000, hold_days=5,
                      stop_loss_pct=0.05, confidence_threshold=0.7):
    """Simple long-only backtesting framework.

    Enters a full-capital position when the model predicts 1 with probability
    above ``confidence_threshold``; exits on a ``stop_loss_pct`` drawdown from
    the entry price or after ``hold_days`` bars, whichever comes first.

    Parameters
    ----------
    df : DataFrame with a 'Close' column, indexed compatibly with ``predictions``.
    predictions : DataFrame with 'prediction' (0/1) and 'probability' columns.
    initial_capital : starting cash.
    hold_days : maximum number of bars to hold a position.
    stop_loss_pct : fractional loss from entry price that forces an exit.
    confidence_threshold : minimum probability required to enter.

    Returns
    -------
    (trades, portfolio_value) : list of trade dicts, and per-bar equity list.
    """
    capital = initial_capital
    position = 0
    entry_price = 0.0
    entry_capital = 0.0   # cost basis of the open trade, for per-trade PnL
    days_held = 0
    trades = []
    portfolio_value = []

    for idx in predictions.index:
        current_price = df.loc[idx, 'Close']
        pred = predictions.loc[idx, 'prediction']
        prob = predictions.loc[idx, 'probability']

        # Entry: buy signal with high confidence, only when flat
        if pred == 1 and prob > confidence_threshold and position == 0:
            position = capital / current_price
            entry_price = current_price
            entry_capital = capital
            capital = 0
            days_held = 0
            trades.append({
                'date': idx,
                'type': 'BUY',
                'price': current_price,
                'position': position
            })
        # Exit: stop-loss or holding period expired
        elif position > 0:
            # BUG FIX: the original tested `len(trades) >= 5`, which counts
            # trade records rather than bars held — with flat prices a
            # position was never exited. Track the holding period explicitly.
            days_held += 1
            if current_price < entry_price * (1 - stop_loss_pct) or days_held >= hold_days:
                capital = position * current_price
                trades.append({
                    'date': idx,
                    'type': 'SELL',
                    'price': current_price,
                    # BUG FIX: PnL is measured against this trade's cost
                    # basis, not against initial_capital.
                    'pnl': capital - entry_capital
                })
                position = 0

        # Track portfolio value (cash + mark-to-market of any open position)
        portfolio_value.append(capital + position * current_price)

    # Results: mark any still-open position to the last bar's close.
    # Guard against an empty predictions frame.
    if len(predictions.index):
        final_value = capital + position * df.loc[predictions.index[-1], 'Close']
    else:
        final_value = capital
    returns = (final_value - initial_capital) / initial_capital * 100
    print(f"Initial Capital: ${initial_capital:,.2f}")
    print(f"Final Value: ${final_value:,.2f}")
    print(f"Total Return: {returns:.2f}%")
    print(f"Number of Trades: {len([t for t in trades if t['type'] == 'BUY'])}")
    return trades, portfolio_value
# Use it — df must contain 'Close'; predictions comes from pipeline.predict(df).
trades, portfolio = backtest_strategy(df, predictions)
Complete Example
Putting it all together.
import pandas as pd
import rhoa
from rhoa.targets import generate_target_combinations
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
# 1. Load and prepare data
df = pd.read_csv('stock_prices.csv')
print(f"Loaded {len(df)} records")

# 2. Engineer features via the rhoa accessor
df['SMA_20'] = df.rhoa.indicators.sma(20)
df['SMA_50'] = df.rhoa.indicators.sma(50)
df['RSI_14'] = df.rhoa.indicators.rsi(14)
df['ATR_14'] = df.rhoa.indicators.atr(window_size=14)
macd = df.rhoa.indicators.macd()
df['MACD'] = macd['macd']
df['MACD_Signal'] = macd['signal']

# 3. Generate targets (auto-tuned for roughly 40% positive labels)
targets, meta = generate_target_combinations(df, mode='auto', target_class_balance=0.4)

# 4. Prepare ML data — dropna() removes indicator warm-up rows
ml_df = pd.concat([df, targets], axis=1).dropna()
feature_cols = ['SMA_20', 'SMA_50', 'RSI_14', 'ATR_14', 'MACD', 'MACD_Signal']
X = ml_df[feature_cols]
y = ml_df['Target_7']

# 5. Time-based split (no shuffling: train on the past, test on the future)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]

# 6. Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_train, y_train)

# 7. Evaluate on the held-out segment
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))

# 8. Visualize predictions over the test segment
fig = ml_df.iloc[split_idx:].rhoa.plots.signal(
    y_pred=y_pred,
    y_true=y_test,
    date_col='Date',
    price_col='Close',
    title='Model Predictions'
)
print("Pipeline complete!")
Next Steps
You now have complete working examples of ML pipelines with Rhoa. For more details:
API Reference - Detailed reference for Rhoa's modules and functions
User Guide - Conceptual guides
Frequently Asked Questions - Troubleshooting