# File viewer metadata (not code): 102 lines, 3.8 KiB, Python
import logging
import re
from datetime import datetime
from time import sleep
from typing import List, Dict, Optional, Set

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


class NewsCollector:
    """Collect news items for a stock ticker from financialmodelingprep.com.

    All requests go through one shared ``requests.Session`` that retries
    transient failures, and each request is preceded by a short pause as
    client-side rate limiting. The ``get_*`` methods return the raw item
    dictionaries delivered by the API, minus the items whose title the
    caller already has.

    The collector may be used as a context manager so that the session is
    closed when the work is done.
    """

    def __init__(self, api_key: str, rate_limit_pause: float = 0.2,
                 timeout: float = 10):
        """Store the credentials and open the HTTP session.

        Args:
            api_key: Key sent as the ``apikey`` query parameter.
            rate_limit_pause: Seconds to sleep before every request.
            timeout: Timeout in seconds handed to each ``session.get`` call.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v3"
        self.base_url_v4 = "https://financialmodelingprep.com/api/v4"
        self.rate_limit_pause = rate_limit_pause
        self.timeout = timeout
        self.session = self._create_session()

    def __enter__(self) -> "NewsCollector":
        """Return the collector itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the session when the ``with`` block ends."""
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def _create_session(self) -> "requests.Session":
        """Create a session with retry strategy.

        Up to three retries with a backoff factor of 1 are attempted when the
        server answers 429, 500, 502, 503 or 504. The adapter is mounted for
        ``https://`` only, which covers both base URLs used by this class.
        """
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        session = requests.Session()
        session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
        return session

    def _redact(self, message: str) -> str:
        """Return *message* with the API key masked.

        The key travels as a query parameter, and exception messages may
        quote the full request URL, so it is masked before logging.
        """
        if self.api_key:
            return message.replace(self.api_key, "***")
        return message

    def _make_request(self, url: str, params: Dict) -> Optional[List[Dict]]:
        """Make API request with error handling.

        Args:
            url: Endpoint to call.
            params: Query parameters for the call.

        Returns:
            The decoded JSON payload when it is a list, otherwise ``None``.
            ``None`` is also returned (after logging) when the request
            fails, the status is an error, or the body is not valid JSON.
        """
        sleep(self.rate_limit_pause)
        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers invalid JSON on requests versions whose
            # decode error is not a RequestException.
            logging.error("API request error for %s: %s", url, self._redact(str(exc)))
            return None

        if not isinstance(payload, list):
            # Callers iterate the payload and call .get() on every entry; a
            # JSON object here would be iterated key by key and crash them.
            logging.error(
                "API request error for %s: expected a JSON list, got %s",
                url,
                type(payload).__name__,
            )
            return None
        return payload

    def _is_relevant_to_ticker(self, text: str, ticker: str) -> bool:
        """Check if the text is relevant to the given ticker.

        The ticker must appear as a standalone token: it may not be directly
        preceded or followed by an ASCII letter or digit. This accepts forms
        such as ``AAPL``, ``(AAPL)``, ``$AAPL``, ``NASDAQ:AAPL`` and
        ``AAPL's`` while rejecting longer symbols that merely contain the
        ticker (``$AAPLX``, ``XAAPL:``). Matching is case-sensitive.

        Returns:
            ``True`` when the ticker is found; ``False`` otherwise, including
            for an empty ticker or a *text* that is not a string.
        """
        if not ticker or not isinstance(text, str):
            return False
        # re.escape keeps symbols such as "BRK.B" literal.
        pattern = rf"(?<![A-Za-z0-9]){re.escape(ticker)}(?![A-Za-z0-9])"
        return re.search(pattern, text) is not None

    def _select_new_items(self, items: Optional[List[Dict]],
                          existing_titles: Set[str],
                          ticker: Optional[str] = None,
                          text_fields: tuple = ()) -> List[Dict]:
        """Drop items that are already known or not about *ticker*.

        Args:
            items: Payload from ``_make_request``; ``None`` or empty gives ``[]``.
            existing_titles: Titles to exclude.
            ticker: Symbol used for the relevance check.
            text_fields: Item keys searched for the ticker. When empty, no
                relevance check is made.

        Returns:
            The surviving items in their original order.
        """
        if not items:
            return []

        fresh = []
        for item in items:
            if item.get('title') in existing_titles:
                continue
            if text_fields and not any(
                self._is_relevant_to_ticker(item.get(field), ticker)
                for field in text_fields
            ):
                continue
            fresh.append(item)
        return fresh

    def get_general_news(self, ticker: str, from_date: datetime,
                         existing_titles: Set[str]) -> List[Dict]:
        """Get general news, filtering out existing ones.

        Calls the v3 ``stock_news`` endpoint for *ticker* starting at
        *from_date* (sent as ``YYYY-MM-DD``). An item is kept when its title
        is not in *existing_titles* and the ticker appears in its ``title``
        or ``text``.

        Returns:
            The matching items, or ``[]`` when the request fails.
        """
        params = {
            'apikey': self.api_key,
            'tickers': ticker,
            'from': from_date.strftime('%Y-%m-%d'),
        }
        items = self._make_request(f"{self.base_url}/stock_news", params)
        return self._select_new_items(items, existing_titles, ticker, ('title', 'text'))

    def get_sentiment_news(self, ticker: str, existing_titles: Set[str]) -> List[Dict]:
        """Get sentiment news, filtering out existing ones.

        The v4 ``stock-news-sentiments-rss-feed`` endpoint is called without
        a ticker parameter, so relevance is decided locally: an item is kept
        when its title is not in *existing_titles* and the ticker appears in
        its ``title`` or ``content``.

        Returns:
            The matching items, or ``[]`` when the request fails.
        """
        params = {'apikey': self.api_key}
        items = self._make_request(
            f"{self.base_url_v4}/stock-news-sentiments-rss-feed", params
        )
        return self._select_new_items(items, existing_titles, ticker, ('title', 'content'))

    def get_press_releases(self, ticker: str, existing_titles: Set[str]) -> List[Dict]:
        """Get press releases, filtering out existing ones.

        The ticker is part of the endpoint path, so no local relevance check
        is applied; only titles in *existing_titles* are removed.

        Returns:
            The remaining items, or ``[]`` when the request fails.
        """
        params = {'apikey': self.api_key}
        items = self._make_request(f"{self.base_url}/press-releases/{ticker}", params)
        return self._select_new_items(items, existing_titles)