168 lines
5.4 KiB
Python
168 lines
5.4 KiB
Python
import logging
from datetime import date, datetime, timedelta # Added timedelta import
from typing import Dict, List, Any, Optional, Tuple

import psycopg2
from psycopg2.extras import execute_values
|
|
|
|
def get_last_price_date(conn: "psycopg2.extensions.connection", pair_id: int) -> Optional[date]:
    """Get the last available price date for a forex pair.

    Runs ``SELECT MAX(date) FROM forex_prices WHERE pair_id = %s`` with
    ``pair_id`` bound as a query parameter.

    Args:
        conn: Open database connection; only ``conn.cursor()`` is used here.
        pair_id: Value matched against ``forex_prices.pair_id``.

    Returns:
        The first column of the result row, or ``None`` when no row comes
        back or the aggregate is NULL/falsy (i.e. the pair has no prices).

    Raises:
        Whatever the driver raises from ``cursor()``/``execute()``/
        ``fetchone()``; nothing is caught here. The cursor is closed either way.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT MAX(date)
            FROM forex_prices
            WHERE pair_id = %s
        """, (pair_id,))
        row = cursor.fetchone()
        if not row or not row[0]:
            return None
        return row[0]
    finally:
        cursor.close()
|
|
|
|
def update_forex_prices(
    conn: "psycopg2.extensions.connection",
    pair_id: int,
    symbol: str,
    collector: Any,
    last_update: Optional[date],
    batch_size: int = 5000
) -> Tuple[int, Optional[str]]:
    """Update historical OHLC prices for a forex pair.

    Asks ``collector.get_historical_prices(symbol, from_date, to_date)`` for
    data (both dates formatted ``YYYY-MM-DD``), drops any record dated today
    or later, and upserts the rest into ``forex_prices`` in batches.

    Args:
        conn: Open database connection (cursor/commit/rollback are used).
        pair_id: Written to ``forex_prices.pair_id`` and used to select the
            ``forex_pairs`` row whose ``last_update`` is advanced.
        symbol: Passed to the collector and used in log messages.
        collector: Object exposing ``get_historical_prices``. Each returned
            record is indexed by ``'date'`` (parsed with ``%Y-%m-%d``),
            ``'open'``, ``'high'``, ``'low'`` and ``'close'``.
        last_update: Date of the last stored price. ``None`` starts the
            request at 1962-01-01; otherwise the request starts 3 days
            earlier than this date so recent rows are re-fetched.
        batch_size: Maximum number of rows sent per ``execute_values`` call.

    Returns:
        ``(rows_written, error_message)``. ``rows_written`` counts the rows
        in batches that were committed (inserted or updated via
        ``ON CONFLICT``). ``error_message`` is ``None`` on full success,
        otherwise the most recent error text.

    Notes:
        * Each batch is committed on its own; a failing batch is rolled
          back, logged, and skipped so later batches still run.
        * ``forex_pairs.last_update`` is only advanced (to yesterday) when
          every batch succeeded, so a rolled-back gap is not marked done.
        * If a failure happens after some batches were committed, the count
          of those committed rows is still returned alongside the error.
    """
    upsert_sql = """
        INSERT INTO forex_prices
        (pair_id, date, open, high, low, close)
        VALUES %s
        ON CONFLICT (pair_id, date) DO UPDATE SET
        open = EXCLUDED.open,
        high = EXCLUDED.high,
        low = EXCLUDED.low,
        close = EXCLUDED.close
    """
    cursor = None
    rows_inserted = 0
    error_message = None

    try:
        cursor = conn.cursor()
        today = datetime.now().date()
        # Only complete days are loaded, so the window ends yesterday.
        yesterday = today - timedelta(days=1)

        if last_update is None:
            # No stored prices yet: request the full history from 1962.
            start = date(1962, 1, 1)
        else:
            # For existing data, start 3 days before the last update.
            start = last_update - timedelta(days=3)

        historical_data = collector.get_historical_prices(
            symbol,
            start.strftime('%Y-%m-%d'),
            yesterday.strftime('%Y-%m-%d'),
        )
        if not historical_data:
            return 0, "No historical data available"

        # Keep only records strictly before today and shape them as row tuples.
        records = [
            (pair_id, record['date'], record['open'], record['high'],
             record['low'], record['close'])
            for record in historical_data
            if datetime.strptime(record['date'], '%Y-%m-%d').date() < today
        ]
        if not records:
            return 0, "No new complete day data available"

        for offset in range(0, len(records), batch_size):
            batch = records[offset:offset + batch_size]
            try:
                execute_values(cursor, upsert_sql, batch)
                conn.commit()
            except Exception as e:
                # Discard only this batch; earlier batches are already committed.
                conn.rollback()
                error_message = f"Error in batch update: {str(e)}"
                logging.error(error_message)
                continue
            rows_inserted += len(batch)
            logging.info(f"Committed batch of {len(batch)} records for {symbol}")

        # Advance the timestamp only when nothing was rolled back; otherwise
        # the pair would be marked current despite missing rows.
        if error_message is None:
            cursor.execute(
                "UPDATE forex_pairs SET last_update = %s WHERE id = %s",
                (yesterday, pair_id)
            )
            conn.commit()

        return rows_inserted, error_message

    except Exception as e:
        error_message = f"Error updating forex prices: {str(e)}"
        logging.error(error_message)
        if conn:
            conn.rollback()
        # Batches committed before the failure stay in the database, so
        # report them rather than claiming zero rows.
        return rows_inserted, error_message
    finally:
        if cursor:
            cursor.close()
|
|
|
|
def get_forex_statistics(conn: "psycopg2.extensions.connection") -> Dict[str, Any]:
    """Get statistics about forex OHLC data.

    Runs two read-only queries: overall totals from ``forex_prices``, and a
    per-symbol summary joining ``forex_pairs`` (rows with
    ``is_active = true``) to ``forex_prices``.

    Args:
        conn: Open database connection.

    Returns:
        A dict with keys ``'total_prices'``, ``'earliest_date'``,
        ``'latest_date'``, ``'pairs_with_prices'`` and ``'coverage'``.
        Date values are ``isoformat()`` strings, or ``None`` when the
        database returned NULL. ``'coverage'`` is a list of dicts with keys
        ``'symbol'``, ``'first_date'``, ``'last_date'`` and ``'records'``,
        ordered by symbol. On any error the problem is logged and an empty
        dict is returned.

    Notes:
        On failure the connection is rolled back (best effort) so that a
        failed statement does not leave it stuck in an aborted transaction.
        This also discards any uncommitted work the caller had pending on
        the same connection.
    """
    cursor = None
    try:
        cursor = conn.cursor()

        # Overall totals across every stored price row.
        cursor.execute("""
            SELECT
                COUNT(*) as total_prices,
                MIN(date) as earliest_date,
                MAX(date) as latest_date,
                COUNT(DISTINCT pair_id) as pairs_with_prices
            FROM forex_prices
        """)
        totals = cursor.fetchone()
        stats: Dict[str, Any] = {
            'total_prices': totals[0],
            'earliest_date': totals[1].isoformat() if totals[1] else None,
            'latest_date': totals[2].isoformat() if totals[2] else None,
            'pairs_with_prices': totals[3],
        }

        # Date range and row count for each active pair that has prices.
        cursor.execute("""
            SELECT
                fp.symbol,
                MIN(p.date) as first_date,
                MAX(p.date) as last_date,
                COUNT(*) as records
            FROM forex_pairs fp
            JOIN forex_prices p ON fp.id = p.pair_id
            WHERE fp.is_active = true
            GROUP BY fp.symbol
            ORDER BY fp.symbol
        """)
        stats['coverage'] = [
            {
                'symbol': row[0],
                'first_date': row[1].isoformat() if row[1] else None,
                'last_date': row[2].isoformat() if row[2] else None,
                'records': row[3],
            }
            for row in cursor.fetchall()
        ]

        return stats
    except Exception as e:
        logging.error(f"Error getting forex statistics: {e}")
        # The error is swallowed, so clear the failed transaction here;
        # a rollback failure must not turn the best-effort {} into a crash.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logging.error(f"Rollback after forex statistics failure failed: {rollback_error}")
        return {}
    finally:
        if cursor:
            cursor.close()
|