RivaCube/utils/Stocks/valuations/collector.py
2025-02-04 19:31:18 +01:00

54 lines
2.4 KiB
Python

import requests
import logging
from time import sleep
from typing import List, Dict, Optional
from datetime import datetime
class ValuationCollector:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://financialmodelingprep.com/api/v3"
self.base_url_v4 = "https://financialmodelingprep.com/api/v4"
self.rate_limit_pause = 0.2
self.session = requests.Session()
self.max_retries = 3
self.retry_delay = 1 # Initial delay in seconds
def _make_request(self, url: str, params: Dict) -> Optional[List[Dict]]:
"""Make API request with retry mechanism and error handling."""
try:
sleep(self.rate_limit_pause)
response = self.session.get(url, params=params, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"API request error for {url}: {str(e)}")
return None
except ValueError as e:
logging.error(f"JSON parsing error for {url}: {str(e)}")
return None
def get_simple_dcf(self, ticker: str) -> Optional[List[Dict]]:
"""Get simple DCF valuation."""
params = {'apikey': self.api_key}
return self._make_request(f"{self.base_url}/discounted-cash-flow/{ticker}", params)
def get_advanced_dcf(self, ticker: str) -> Optional[List[Dict]]:
"""Get advanced DCF valuation."""
params = {'apikey': self.api_key, 'symbol': ticker}
return self._make_request(f"{self.base_url_v4}/advanced_discounted_cash_flow", params)
def get_levered_dcf(self, ticker: str) -> Optional[List[Dict]]:
"""Get levered DCF valuation."""
params = {'apikey': self.api_key, 'symbol': ticker}
return self._make_request(f"{self.base_url_v4}/advanced_levered_discounted_cash_flow", params)
def get_company_rating(self, ticker: str) -> Optional[List[Dict]]:
"""Get current company rating."""
params = {'apikey': self.api_key}
return self._make_request(f"{self.base_url}/rating/{ticker}", params)
def get_historical_rating(self, ticker: str) -> Optional[List[Dict]]:
"""Get historical company ratings."""
params = {'apikey': self.api_key}
return self._make_request(f"{self.base_url}/historical-rating/{ticker}", params)