RivaCube/utils/Forex/utils.py
2025-02-04 19:31:18 +01:00

168 lines
5.4 KiB
Python

import logging
from datetime import date, datetime, timedelta  # Added timedelta import
from typing import Dict, List, Any, Optional, Tuple

import psycopg2
from psycopg2.extras import execute_values
def get_last_price_date(
    conn: "psycopg2.extensions.connection",
    pair_id: int,
) -> Optional[date]:
    """Return the most recent price date stored for a forex pair.

    Args:
        conn: Open database connection; only ``cursor()`` is called on it.
        pair_id: Value matched against ``forex_prices.pair_id``.

    Returns:
        The ``MAX(date)`` value stored for the pair, or ``None`` when the
        query yields no row or a NULL/falsy value (i.e. the pair has no
        prices yet).

    Raises:
        Any exception raised by the driver while executing the query
        propagates to the caller; the cursor is closed in every case.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT MAX(date)
            FROM forex_prices
            WHERE pair_id = %s
        """, (pair_id,))
        row = cursor.fetchone()
        # An aggregate over zero rows still returns one row holding NULL,
        # so both the row and its first column have to be checked.
        if not row or not row[0]:
            return None
        return row[0]
    finally:
        if cursor:
            cursor.close()
def update_forex_prices(
    conn: "psycopg2.extensions.connection",
    pair_id: int,
    symbol: str,
    collector: Any,
    last_update: Optional[date],
    batch_size: int = 5000
) -> Tuple[int, Optional[str]]:
    """Fetch and upsert historical OHLC prices for a forex pair.

    The fetch window runs from ``last_update`` minus three days (or from
    1962-01-01 when ``last_update`` is ``None``) up to yesterday. Records
    dated today or later are dropped, so only past days are stored. Rows
    are upserted into ``forex_prices`` in batches, each committed on its
    own. ``forex_pairs.last_update`` is moved to yesterday only when every
    batch succeeded, so a failed batch is fetched again on a later run
    instead of being skipped.

    Args:
        conn: Open database connection used for the upserts and commits.
        pair_id: ``forex_pairs.id`` / ``forex_prices.pair_id`` of the pair.
        symbol: Symbol passed to ``collector.get_historical_prices``.
        collector: Object exposing
            ``get_historical_prices(symbol, from_date, to_date)`` with both
            dates formatted as ``YYYY-MM-DD``. It must return a sequence of
            mappings with the keys ``date`` (``YYYY-MM-DD`` string),
            ``open``, ``high``, ``low`` and ``close``.
        last_update: Date of the last stored update, or ``None`` for a
            first full load.
        batch_size: Maximum number of rows per upsert statement; must be
            at least 1.

    Returns:
        ``(rows_written, error_message)``. ``rows_written`` counts rows in
        batches that were committed. ``error_message`` is ``None`` on full
        success, otherwise a description of the last problem encountered.
        The function does not raise for database or collector errors
        unless the rollback in the final error handler itself fails.
    """
    cursor = None
    rows_inserted = 0
    error_message = None
    try:
        if batch_size < 1:
            # A negative step would yield an empty range and silently skip
            # every record; fail loudly instead (reported via the tuple).
            raise ValueError("batch_size must be a positive integer")

        cursor = conn.cursor()
        today = datetime.now().date()
        yesterday = today - timedelta(days=1)

        if last_update is None:
            # No previous data: request the full history.
            window_start = datetime(1962, 1, 1).date()
        else:
            # Re-fetch a 3-day overlap; the upsert below overwrites rows
            # that already exist for those days.
            window_start = last_update - timedelta(days=3)

        from_date = window_start.strftime('%Y-%m-%d')
        to_date = yesterday.strftime('%Y-%m-%d')
        historical_data = collector.get_historical_prices(symbol, from_date, to_date)
        if not historical_data:
            return 0, "No historical data available"

        # Keep only records dated before today, whatever the collector
        # returned for the requested window.
        records = [
            (pair_id, record['date'], record['open'], record['high'],
             record['low'], record['close'])
            for record in historical_data
            if datetime.strptime(record['date'], '%Y-%m-%d').date() < today
        ]
        if not records:
            return 0, "No new complete day data available"

        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]
            try:
                execute_values(
                    cursor,
                    """
                    INSERT INTO forex_prices
                        (pair_id, date, open, high, low, close)
                    VALUES %s
                    ON CONFLICT (pair_id, date) DO UPDATE SET
                        open = EXCLUDED.open,
                        high = EXCLUDED.high,
                        low = EXCLUDED.low,
                        close = EXCLUDED.close
                    """,
                    batch
                )
                conn.commit()
                rows_inserted += len(batch)
                logging.info(
                    "Committed batch of %d records for %s", len(batch), symbol
                )
            except Exception as e:
                # Drop only this batch and keep going with the next one.
                conn.rollback()
                error_message = f"Error in batch update: {str(e)}"
                logging.error(error_message)

        if error_message is None:
            cursor.execute(
                "UPDATE forex_pairs SET last_update = %s WHERE id = %s",
                (yesterday, pair_id)
            )
            conn.commit()
        else:
            # Advancing the marker here would hide the failed rows from
            # later runs, which only look back three days.
            logging.warning(
                "Not advancing last_update for %s because a batch failed",
                symbol,
            )
        return rows_inserted, error_message
    except Exception as e:
        error_message = f"Error updating forex prices: {str(e)}"
        logging.error(error_message)
        if conn:
            conn.rollback()
        # Batches committed before the failure stay in the database, so
        # report them instead of claiming nothing was written.
        return rows_inserted, error_message
    finally:
        if cursor:
            cursor.close()
def get_forex_statistics(conn: "psycopg2.extensions.connection") -> Dict[str, Any]:
    """Collect summary statistics about the stored forex OHLC data.

    Args:
        conn: Open database connection used to run two read-only queries.

    Returns:
        A dict with the keys:

        * ``total_prices``: number of rows in ``forex_prices``.
        * ``earliest_date`` / ``latest_date``: ISO-formatted minimum and
          maximum ``date``, or ``None`` when the table is empty.
        * ``pairs_with_prices``: number of distinct ``pair_id`` values.
        * ``coverage``: one dict per active pair that has prices, ordered
          by symbol, with ``symbol``, ``first_date``, ``last_date``
          (ISO strings or ``None``) and ``records``.

        An empty dict is returned when anything fails. The error is logged
        and the connection is rolled back so it stays usable.
    """
    cursor = None
    try:
        cursor = conn.cursor()

        # Table-wide totals.
        cursor.execute("""
            SELECT
                COUNT(*) as total_prices,
                MIN(date) as earliest_date,
                MAX(date) as latest_date,
                COUNT(DISTINCT pair_id) as pairs_with_prices
            FROM forex_prices
        """)
        total, earliest, latest, pair_count = cursor.fetchone()
        stats = {
            'total_prices': total,
            'earliest_date': earliest.isoformat() if earliest else None,
            'latest_date': latest.isoformat() if latest else None,
            'pairs_with_prices': pair_count,
        }

        # Per-pair coverage; the inner join drops pairs without prices.
        cursor.execute("""
            SELECT
                fp.symbol,
                MIN(p.date) as first_date,
                MAX(p.date) as last_date,
                COUNT(*) as records
            FROM forex_pairs fp
            JOIN forex_prices p ON fp.id = p.pair_id
            WHERE fp.is_active = true
            GROUP BY fp.symbol
            ORDER BY fp.symbol
        """)
        stats['coverage'] = [
            {
                'symbol': symbol,
                'first_date': first.isoformat() if first else None,
                'last_date': last.isoformat() if last else None,
                'records': count,
            }
            for symbol, first, last, count in cursor.fetchall()
        ]
        return stats
    except Exception as e:
        logging.error(f"Error getting forex statistics: {e}")
        # A failed statement leaves the transaction aborted; without a
        # rollback later statements on this connection would fail too.
        try:
            conn.rollback()
        except Exception as rollback_error:
            # Keep the "never raises, returns {}" contract.
            logging.error(
                f"Rollback after statistics failure failed: {rollback_error}"
            )
        return {}
    finally:
        if cursor:
            cursor.close()