Download Data
Learn how to download and manage market data for backtesting and live trading.
Overview
The framework provides built-in data downloading capabilities that allow you to fetch historical market data from various sources. This data is essential for backtesting strategies and can also be used for live analysis.
Getting Started with Data Download
Basic Data Download
from investing_algorithm_framework import download_data
# Download Bitcoin price data
data = download_data(
symbols=["BTC/USDT"],
start_date="2023-01-01",
end_date="2023-12-31",
timeframe="1d"
)
print(f"Downloaded {len(data)} data points")
Multiple Symbols
# Download data for multiple cryptocurrencies
symbols = ["BTC/USDT", "ETH/USDT", "ADA/USDT", "DOT/USDT"]
data = download_data(
symbols=symbols,
start_date="2023-01-01",
end_date="2023-12-31",
timeframe="1h",
data_source="binance"
)
# Access data for each symbol
for symbol in symbols:
symbol_data = data[symbol]
print(f"{symbol}: {len(symbol_data)} records")
Data Sources
Supported Exchanges
The framework supports multiple cryptocurrency exchanges:
# Binance (default)
binance_data = download_data(
symbols=["BTC/USDT"],
data_source="binance",
timeframe="1d",
start_date="2023-01-01"
)
# Coinbase Pro
coinbase_data = download_data(
symbols=["BTC-USD"],
data_source="coinbase",
timeframe="1h",
start_date="2023-01-01"
)
# Kraken
kraken_data = download_data(
symbols=["XBTUSD"],
data_source="kraken",
timeframe="1d",
start_date="2023-01-01"
)
Stock Market Data
# Download stock data using yfinance
stock_data = download_data(
symbols=["AAPL", "GOOGL", "MSFT"],
data_source="yahoo",
timeframe="1d",
start_date="2023-01-01",
end_date="2023-12-31"
)
Timeframes
Available Timeframes
timeframes = [
"1m", # 1 minute
"5m", # 5 minutes
"15m", # 15 minutes
"30m", # 30 minutes
"1h", # 1 hour
"4h", # 4 hours
"1d", # 1 day
"1w", # 1 week
"1M" # 1 month
]
# Download different timeframes
for tf in ["1h", "4h", "1d"]:
data = download_data(
symbols=["BTC/USDT"],
timeframe=tf,
start_date="2023-01-01",
end_date="2023-01-31"
)
print(f"Timeframe {tf}: {len(data['BTC/USDT'])} candles")
Data Storage
CSV Storage
# Download and save to CSV
data = download_data(
symbols=["BTC/USDT", "ETH/USDT"],
timeframe="1h",
start_date="2023-01-01",
end_date="2023-12-31",
save_to_csv=True,
csv_directory="./market_data/"
)
# Files will be saved as:
# ./market_data/BTC_USDT_1h_20230101_20231231.csv
# ./market_data/ETH_USDT_1h_20230101_20231231.csv
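Once the files are on disk, you can load them back with pandas for inspection or offline analysis. The snippet below is a minimal sketch that assumes the first column of the saved CSV holds the timestamp, followed by the OHLCV columns.
import pandas as pd
# Load a previously saved CSV (filename taken from the example above)
btc_df = pd.read_csv(
    "./market_data/BTC_USDT_1h_20230101_20231231.csv",
    index_col=0,       # assume the first column is the timestamp
    parse_dates=True
)
print(f"Loaded {len(btc_df)} hourly candles")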
Database Storage
# Download and save to database
from investing_algorithm_framework import create_app
app = create_app()
# Download data directly to app database
app.download_data(
symbols=["BTC/USDT"],
timeframe="1d",
start_date="2023-01-01",
end_date="2023-12-31"
)
# Data is now available for backtesting
results = app.run_backtest(
start_date="2023-01-01",
end_date="2023-12-31"
)
Custom Storage
import os
import pandas as pd
class CustomDataStorage:
def __init__(self, storage_path):
self.storage_path = storage_path
def save_data(self, symbol, data):
"""Save data with custom format"""
# Convert to pandas DataFrame
df = pd.DataFrame(data)
# Add metadata
df.attrs['symbol'] = symbol
df.attrs['downloaded_at'] = pd.Timestamp.now()
# Save with compression
filename = f"{symbol.replace('/', '_')}.parquet"
filepath = os.path.join(self.storage_path, filename)
df.to_parquet(filepath, compression='snappy')
print(f"Saved {symbol} data to {filepath}")
# Use custom storage
storage = CustomDataStorage("./custom_data/")
data = download_data(
symbols=["BTC/USDT"],
timeframe="1h",
start_date="2023-01-01"
)
storage.save_data("BTC/USDT", data["BTC/USDT"])
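To read the Parquet files back later, pandas' read_parquet can be used. This is a small sketch that assumes a Parquet engine such as pyarrow is installed and that the file was written by the CustomDataStorage class above.
# Load the Parquet file written by CustomDataStorage
loaded_df = pd.read_parquet("./custom_data/BTC_USDT.parquet")
print(loaded_df.head())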
Advanced Data Download
Progress Tracking
from tqdm import tqdm
def download_with_progress(symbols, **kwargs):
"""Download data with progress bar"""
results = {}
for symbol in tqdm(symbols, desc="Downloading data"):
try:
data = download_data(
symbols=[symbol],
**kwargs
)
results[symbol] = data[symbol]
except Exception as e:
print(f"Failed to download {symbol}: {e}")
results[symbol] = None
return results
# Download large number of symbols with progress
crypto_symbols = [
"BTC/USDT", "ETH/USDT", "BNB/USDT", "ADA/USDT",
"DOT/USDT", "LINK/USDT", "XRP/USDT", "LTC/USDT"
]
data = download_with_progress(
crypto_symbols,
timeframe="1d",
start_date="2023-01-01",
end_date="2023-12-31"
)
Retry Logic
import time
def robust_download(symbols, max_retries=3, retry_delay=5, **kwargs):
"""Download data with retry logic"""
results = {}
for symbol in symbols:
for attempt in range(max_retries):
try:
data = download_data(symbols=[symbol], **kwargs)
results[symbol] = data[symbol]
break
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
print(f"Failed to download {symbol} after {max_retries} attempts")
results[symbol] = None
return results
Data Validation
def validate_data(data, symbol):
"""Validate downloaded data quality"""
if not data or len(data) == 0:
raise ValueError(f"No data downloaded for {symbol}")
df = pd.DataFrame(data)
# Check for required columns
required_columns = ['open', 'high', 'low', 'close', 'volume']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Missing columns in {symbol} data: {missing_columns}")
# Check for null values
null_counts = df[required_columns].isnull().sum()
if null_counts.sum() > 0:
print(f"Warning: {symbol} has null values: {null_counts.to_dict()}")
    # Check for duplicate timestamps (assumes a "datetime" column if present,
    # otherwise falls back to the DataFrame index)
    timestamps = df["datetime"] if "datetime" in df.columns else df.index
    if timestamps.duplicated().any():
        print(f"Warning: {symbol} has duplicate timestamps")
# Validate price relationships
invalid_ohlc = (
(df['high'] < df['low']) |
(df['high'] < df['open']) |
(df['high'] < df['close']) |
(df['low'] > df['open']) |
(df['low'] > df['close'])
)
if invalid_ohlc.any():
print(f"Warning: {symbol} has invalid OHLC relationships")
print(f"✓ {symbol} data validation passed")
return True
# Download and validate
data = download_data(
symbols=["BTC/USDT"],
timeframe="1h",
start_date="2023-01-01",
end_date="2023-01-31"
)
for symbol, symbol_data in data.items():
validate_data(symbol_data, symbol)
Data Management
Data Updates
import os
import pandas as pd

class DataManager:
def __init__(self, data_directory):
self.data_directory = data_directory
def update_data(self, symbols, timeframe="1d"):
"""Update existing data with latest records"""
for symbol in symbols:
# Check existing data
existing_data = self.load_existing_data(symbol, timeframe)
if existing_data is not None:
last_date = existing_data.index[-1]
start_date = last_date + pd.Timedelta(days=1)
else:
start_date = "2020-01-01" # Default start date
# Download new data
new_data = download_data(
symbols=[symbol],
timeframe=timeframe,
start_date=start_date.strftime('%Y-%m-%d'),
end_date=pd.Timestamp.now().strftime('%Y-%m-%d')
)
if existing_data is not None:
# Merge with existing data
combined_data = pd.concat([existing_data, new_data[symbol]])
combined_data = combined_data[~combined_data.index.duplicated(keep='last')]
else:
combined_data = pd.DataFrame(new_data[symbol])
# Save updated data
self.save_data(symbol, timeframe, combined_data)
print(f"Updated {symbol} data")
def load_existing_data(self, symbol, timeframe):
"""Load existing data from storage"""
filename = f"{symbol.replace('/', '_')}_{timeframe}.csv"
filepath = os.path.join(self.data_directory, filename)
if os.path.exists(filepath):
return pd.read_csv(filepath, index_col=0, parse_dates=True)
return None
def save_data(self, symbol, timeframe, data):
"""Save data to storage"""
filename = f"{symbol.replace('/', '_')}_{timeframe}.csv"
filepath = os.path.join(self.data_directory, filename)
data.to_csv(filepath)
# Usage
data_manager = DataManager("./market_data/")
data_manager.update_data(["BTC/USDT", "ETH/USDT"], timeframe="1d")
Data Cleanup
def cleanup_data(data):
"""Clean and prepare market data"""
df = pd.DataFrame(data)
# Remove duplicates
df = df[~df.index.duplicated(keep='last')]
# Forward fill missing values
    df = df.ffill()
# Remove outliers (prices that differ more than 20% from previous candle)
for col in ['open', 'high', 'low', 'close']:
price_change = df[col].pct_change().abs()
outliers = price_change > 0.2
if outliers.any():
print(f"Removing {outliers.sum()} outliers from {col}")
            df.loc[outliers, col] = df[col].shift(1)[outliers]  # replace with previous candle's value
# Ensure volume is positive
df['volume'] = df['volume'].abs()
return df.to_dict('records')
# Download and clean data
raw_data = download_data(
symbols=["BTC/USDT"],
timeframe="1h",
start_date="2023-01-01"
)
clean_data = {}
for symbol, data in raw_data.items():
clean_data[symbol] = cleanup_data(data)
Command Line Interface
CLI Download Tool
# Download data via command line
python -m investing_algorithm_framework download \
--symbols BTC/USDT ETH/USDT \
--timeframe 1d \
--start-date 2023-01-01 \
--end-date 2023-12-31 \
--output ./data/
Scheduled Downloads
# Create a task for regular data downloads
from datetime import datetime, timedelta

from investing_algorithm_framework import Task, create_app, download_data
class DataDownloadTask(Task):
def __init__(self):
super().__init__(
name="data_download",
interval="daily",
time="01:00" # Run at 1 AM daily
)
self.symbols = ["BTC/USDT", "ETH/USDT", "ADA/USDT"]
def run(self, algorithm):
"""Download latest data daily"""
# Get yesterday's date
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# Download data for yesterday
data = download_data(
symbols=self.symbols,
timeframe="1h",
start_date=yesterday,
end_date=yesterday,
save_to_csv=True,
csv_directory="./daily_data/"
)
print(f"Downloaded data for {len(self.symbols)} symbols")
# Register the task
app = create_app()
app.add_task(DataDownloadTask())
Best Practices
1. Rate Limiting
import time
def download_with_rate_limit(symbols, rate_limit=0.5, **kwargs):
"""Download data with rate limiting to avoid API limits"""
results = {}
for i, symbol in enumerate(symbols):
if i > 0:
time.sleep(rate_limit) # Wait between requests
data = download_data(symbols=[symbol], **kwargs)
results[symbol] = data[symbol]
print(f"Downloaded {symbol} ({i+1}/{len(symbols)})")
return results
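A possible way to call this helper, reusing the download_data keyword arguments shown earlier on this page (the symbol list and one-second delay are illustrative):
symbols = ["BTC/USDT", "ETH/USDT", "ADA/USDT"]
data = download_with_rate_limit(
    symbols,
    rate_limit=1.0,  # seconds between requests; tune to your exchange's limits
    timeframe="1d",
    start_date="2023-01-01",
    end_date="2023-12-31"
)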
2. Error Handling
def safe_download(symbols, **kwargs):
"""Download data with comprehensive error handling"""
successful = {}
failed = {}
for symbol in symbols:
try:
data = download_data(symbols=[symbol], **kwargs)
successful[symbol] = data[symbol]
except Exception as e:
failed[symbol] = str(e)
print(f"Failed to download {symbol}: {e}")
print(f"Successfully downloaded: {len(successful)} symbols")
print(f"Failed downloads: {len(failed)} symbols")
return successful, failed
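For example, the failed dictionary can be used to retry only the symbols that did not download on the first pass; this sketch simply calls safe_download a second time with those symbols:
successful, failed = safe_download(
    ["BTC/USDT", "ETH/USDT", "DOGE/USDT"],
    timeframe="1d",
    start_date="2023-01-01"
)
if failed:
    # Retry only the symbols that failed the first time
    retried, still_failed = safe_download(
        list(failed.keys()),
        timeframe="1d",
        start_date="2023-01-01"
    )
    successful.update(retried)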
3. Data Backup
def backup_data(data_directory, backup_directory):
"""Backup downloaded data"""
    import os
    import shutil
    from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = os.path.join(backup_directory, f"data_backup_{timestamp}")
shutil.copytree(data_directory, backup_path)
print(f"Data backed up to {backup_path}")
Next Steps
With market data downloaded, you can now:
- Explore Market Data Sources to understand different data providers
- Learn about Multiple Data Sources for comprehensive analysis
- Start backtesting your strategies with the downloaded data
Remember to regularly update your data to ensure your backtests and live trading use the most current market information!