import logging from datetime import datetime, timedelta # Added timedelta import from typing import Dict, List, Any, Optional, Tuple import psycopg2 from psycopg2.extras import execute_values def get_last_price_date(conn: psycopg2.extensions.connection, pair_id: int) -> Optional[datetime.date]: """Get the last available price date for a forex pair.""" cursor = None try: cursor = conn.cursor() cursor.execute(""" SELECT MAX(date) FROM forex_prices WHERE pair_id = %s """, (pair_id,)) result = cursor.fetchone() return result[0] if result and result[0] else None finally: if cursor: cursor.close() def update_forex_prices( conn: psycopg2.extensions.connection, pair_id: int, symbol: str, collector: Any, last_update: datetime.date, batch_size: int = 5000 ) -> Tuple[int, Optional[str]]: """Update historical OHLC prices for a forex pair.""" cursor = None rows_inserted = 0 error_message = None try: cursor = conn.cursor() today = datetime.now().date() # If last_update is None, start from 1962 if last_update is None: last_update = datetime(1962, 1, 1).date() else: # For existing data, start 3 days before last update last_update = last_update - timedelta(days=3) from_date = last_update.strftime('%Y-%m-%d') to_date = (today - timedelta(days=1)).strftime('%Y-%m-%d') historical_data = collector.get_historical_prices(symbol, from_date, to_date) if not historical_data: return 0, "No historical data available" # Filter out today's data filtered_data = [ record for record in historical_data if datetime.strptime(record['date'], '%Y-%m-%d').date() < today ] if not filtered_data: return 0, "No new complete day data available" records = [ (pair_id, record['date'], record['open'], record['high'], record['low'], record['close']) for record in filtered_data ] for i in range(0, len(records), batch_size): batch = records[i:i + batch_size] try: execute_values( cursor, """ INSERT INTO forex_prices (pair_id, date, open, high, low, close) VALUES %s ON CONFLICT (pair_id, date) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close """, batch ) rows_inserted += len(batch) conn.commit() logging.info(f"Committed batch of {len(batch)} records for {symbol}") except Exception as e: conn.rollback() error_message = f"Error in batch update: {str(e)}" logging.error(error_message) continue # Update last_update timestamp cursor.execute( "UPDATE forex_pairs SET last_update = %s WHERE id = %s", (today - timedelta(days=1), pair_id) ) conn.commit() return rows_inserted, error_message except Exception as e: error_message = f"Error updating forex prices: {str(e)}" logging.error(error_message) if conn: conn.rollback() return 0, error_message finally: if cursor: cursor.close() def get_forex_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]: """Get statistics about forex OHLC data.""" cursor = None try: cursor = conn.cursor() stats = {} # Get basic statistics cursor.execute(""" SELECT COUNT(*) as total_prices, MIN(date) as earliest_date, MAX(date) as latest_date, COUNT(DISTINCT pair_id) as pairs_with_prices FROM forex_prices """) row = cursor.fetchone() stats.update({ 'total_prices': row[0], 'earliest_date': row[1].isoformat() if row[1] else None, 'latest_date': row[2].isoformat() if row[2] else None, 'pairs_with_prices': row[3] }) # Get coverage per pair cursor.execute(""" SELECT fp.symbol, MIN(p.date) as first_date, MAX(p.date) as last_date, COUNT(*) as records FROM forex_pairs fp JOIN forex_prices p ON fp.id = p.pair_id WHERE fp.is_active = true GROUP BY fp.symbol ORDER BY fp.symbol """) stats['coverage'] = [ { 'symbol': row[0], 'first_date': row[1].isoformat() if row[1] else None, 'last_date': row[2].isoformat() if row[2] else None, 'records': row[3] } for row in cursor.fetchall() ] return stats except Exception as e: logging.error(f"Error getting forex statistics: {e}") return {} finally: if cursor: cursor.close()