cached_historical_data_fetcher package
- class cached_historical_data_fetcher.HistoricalDataCache
Bases: object
Base class for historical data cache.
Usage
Override self.get() to implement the logic.
Override self.to_update() if the index is not Timestamp.
Call self.update() to get historical data.
Examples
    from cached_historical_data_fetcher import HistoricalDataCache
    from pandas import DataFrame, Timedelta, Timestamp, date_range

    class MyCache(HistoricalDataCache):
        interval: Timedelta = Timedelta(days=1)

        # Override the abstract get() method here to fetch the data.

    df = await MyCache().update()
- add_interval: bool = True
If True, start in self.get() is the last index of historical data + self.interval. If False, start in self.get() is the last index of historical data.
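The rule above can be sketched with the standard library alone. This is a minimal illustration, not the library's code: the helper name `next_start` is hypothetical, `datetime`/`timedelta` stand in for pandas `Timestamp`/`Timedelta`, and returning None for an empty cache is an assumption.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_start(
    last_index: Optional[datetime],
    interval: timedelta,
    add_interval: bool = True,
) -> Optional[datetime]:
    """Return the value that would be passed as `start` (hypothetical helper)."""
    if last_index is None:
        # Assumption: with no cached data there is no last index to start from.
        return None
    # add_interval=True skips past the last cached row;
    # add_interval=False requests the last cached row again.
    return last_index + interval if add_interval else last_index
```

With `add_interval=False` the last cached row is fetched again, which is useful when that row may still change after it was first cached.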
- compress: int | str | tuple[str, int] = ('lz4', 3)
The compression setting used when saving the cache file: a level, a method name, or a (method, level) tuple.
- folder: str
The folder name. By default, the class name.
- abstract async get(start: Timestamp | Any | None, *args: Any, **kwargs: Any) -> DataFrame
Get historical data. Override this method to implement the logic.
- Parameters:
start (Timestamp | Any | None) – The last index of historical data, plus self.interval if self.add_interval is True.
- Returns:
The historical data. It is recommended to set the index to a Timestamp or a unique, incrementing number. If the index is not a Timestamp, also override self.to_update(). MultiIndex is supported; in that case it is recommended to make the first level a Timestamp.
- Return type:
DataFrame
- interval: Timedelta
The interval at which the cache file is updated.
- keep: Literal['first', 'last'] = 'last'
Which entry to keep when an index value is duplicated.
- mismatch: Literal['warn', 'raise'] | int | None = 'warn'
The action to take when the data mismatches. If an int, log at that level. If None, do nothing.
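How keep and mismatch might interact can be sketched with plain dicts standing in for DataFrames. Everything here is illustrative: `merge_rows` is a hypothetical helper, and treating a "mismatch" as an index present in both old and new data with different values is an assumption about the library's behaviour.

```python
import logging
import warnings
from typing import Any, Dict, Literal, Union

Mismatch = Union[Literal["warn", "raise"], int, None]


def merge_rows(
    old: Dict[int, Any],
    new: Dict[int, Any],
    *,
    keep: Literal["first", "last"] = "last",
    mismatch: Mismatch = "warn",
) -> Dict[int, Any]:
    """Merge cached rows with new rows, keyed by index (hypothetical helper)."""
    # Indexes that appear on both sides but carry different values.
    conflicts = sorted(k for k in old.keys() & new.keys() if old[k] != new[k])
    if conflicts:
        message = f"data mismatch at index {conflicts}"
        if mismatch == "raise":
            raise ValueError(message)
        if mismatch == "warn":
            warnings.warn(message)
        elif isinstance(mismatch, int):
            # An int is interpreted as a logging level.
            logging.getLogger(__name__).log(mismatch, message)
        # mismatch=None: stay silent.
    # keep="first" prefers the cached value, keep="last" the new one.
    merged = {**new, **old} if keep == "first" else {**old, **new}
    return dict(sorted(merged.items()))
```

In this sketch keep only matters when mismatch does not raise, which matches the note on the keep parameter of io.update().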
- protocol: int | None = None
The pickle protocol.
- to_update(end: Timestamp | Any | None, *args: Any, **kwargs: Any) -> bool
Check whether the cache file needs to be updated. Override this method to implement the logic. By default, update if the cache file is older than self.interval.
- Parameters:
end (Timestamp | Any | None) – The last index of historical data. If the cache file is empty, end is None.
- Returns:
Whether to update cache file.
- Return type:
bool
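The default check described above can be approximated as follows. This is a sketch under stated assumptions: `needs_update` is a hypothetical stand-alone function, "older than self.interval" is read as "the last index is at least one interval behind the current time", and whether the real comparison is strict is not confirmed by this page.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_update(
    end: Optional[datetime],
    interval: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Decide whether the cache should be refreshed (hypothetical helper)."""
    if end is None:
        # An empty cache always needs data.
        return True
    if now is None:
        now = datetime.now(timezone.utc)
    # Assumption: refresh once a full interval has elapsed since the last index.
    return now - end >= interval
```

An override for a non-Timestamp index would replace the time comparison with whatever "stale" means for that index, for example comparing an integer id against the latest id known upstream.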
- async update(reload: bool = False, *args: Any, **kwargs: Any) -> DataFrame
Update cache file with DataFrame.
- Parameters:
reload (bool, optional) – Whether to ignore cache file and reload, by default False
*args (Any) – The arguments for self.get() and self.to_update().
**kwargs (Any) – The keyword arguments for self.get() and self.to_update().
- Returns:
The DataFrame read from cache file.
- Return type:
DataFrame
- Raises:
RuntimeError – If an unexpected type is read from the cache file, self.get() does not return a DataFrame, or self.to_update() does not return a bool.
- class cached_historical_data_fetcher.HistoricalDataCacheWithChunk
Bases: HistoricalDataCache
Base class for historical data cache with chunk.
Usage
Override self.get_one() to implement the logic.
Override self.to_update() if the index is not Timestamp or interval is not fixed.
Call self.update() to get historical data.
Examples
    from cached_historical_data_fetcher import HistoricalDataCacheWithChunk
    from pandas import DataFrame, Timedelta, Timestamp

    class MyCacheWithChunk(HistoricalDataCacheWithChunk):
        delay_seconds: float = 0
        interval: Timedelta = Timedelta(days=1)
        start_init: Timestamp = Timestamp.utcnow().floor("10D")

        # Override the abstract get_one() method here to fetch one chunk.

    df = await MyCacheWithChunk().update()
- property delay: float
Delay between chunks in seconds. (Alias of self.delay_seconds.)
- delay_seconds: float
Delay between chunks in seconds.
- async get(start: Timestamp | None, *args: Any, **kwargs: Any) -> DataFrame
Get historical data. This method does not need to be overridden.
- Parameters:
start (Timestamp | Any | None) – The last index of historical data.
- Returns:
The historical data. It is recommended to set the index to a Timestamp or a unique, incrementing number. If the index is not a Timestamp, also override self.to_update().
- Return type:
DataFrame
- get_latest_uncompleted_chunk: bool = False
Whether to get the latest uncompleted chunk. If True, make sure to set self.add_interval to False to avoid leaving an uncompleted chunk in the cache file.
- abstract async get_one(start: Timestamp, *args: Any, **kwargs: Any) -> DataFrame
Get one chunk of historical data. Override this method to implement the logic.
- Parameters:
start (Timestamp) – The start index of historical data.
- Returns:
The chunk of historical data. It is recommended to set the index to a Timestamp or a unique, incrementing number. If the index is not a Timestamp, also override self.to_update(). MultiIndex is supported; in that case it is recommended to make the first level a Timestamp.
- Return type:
DataFrame
- start_init: Timestamp
The initial start index of historical data. Used when no cache file exists.
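To make start_init and get_latest_uncompleted_chunk concrete, the sketch below enumerates chunk start times for a fixed interval. It is only an illustration: `chunk_starts` is a hypothetical helper, `datetime`/`timedelta` stand in for pandas types, and treating a chunk as "uncompleted" when its end lies after the current time is an assumption.

```python
from datetime import datetime, timedelta
from typing import List


def chunk_starts(
    first_start: datetime,
    interval: timedelta,
    now: datetime,
    include_uncompleted: bool = False,
) -> List[datetime]:
    """List the start of every chunk from first_start up to now (hypothetical)."""
    starts: List[datetime] = []
    current = first_start
    while current < now:
        # A chunk covers [current, current + interval); it is complete
        # only once its end is not after `now`.
        completed = current + interval <= now
        if completed or include_uncompleted:
            starts.append(current)
        current += interval
    return starts
```

Here `first_start` plays the role of start_init when no cache file exists, or of the start derived from the last cached index otherwise.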
- class cached_historical_data_fetcher.HistoricalDataCacheWithFixedChunk
Bases: HistoricalDataCacheWithChunk
Base class for historical data cache with chunk.
This class only supports fixed interval. To support variable interval, use HistoricalDataCacheWithChunk instead.
Whereas HistoricalDataCacheWithChunk calls self.get_one() one chunk at a time, HistoricalDataCacheWithFixedChunk calls self.get_one() in parallel. This makes it impossible to guarantee that rate limits are not exceeded: depending on network conditions and similar factors, all the requests could in theory reach the server at the same time. Make sure to set self.delay_seconds large enough to avoid overloading the server or being banned.
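The difference between the two strategies can be sketched with asyncio alone. The names `run_one_by_one`, `run_in_parallel` and `fake_get_one` are hypothetical, and staggering the i-th parallel call by `i * delay_seconds` is an assumption about how the delay is applied, not confirmed library behaviour.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, TypeVar

S = TypeVar("S")
R = TypeVar("R")


async def run_one_by_one(
    get_one: Callable[[S], Awaitable[R]], starts: Sequence[S], delay_seconds: float
) -> List[R]:
    """Fetch chunks sequentially, pausing between consecutive requests."""
    results: List[R] = []
    for i, start in enumerate(starts):
        if i:
            await asyncio.sleep(delay_seconds)
        results.append(await get_one(start))
    return results


async def run_in_parallel(
    get_one: Callable[[S], Awaitable[R]], starts: Sequence[S], delay_seconds: float
) -> List[R]:
    """Launch all chunks concurrently, staggering each launch by the delay."""

    async def delayed(i: int, start: S) -> R:
        # The stagger spaces out when requests are sent, not when they arrive,
        # so it cannot strictly guarantee a server-side rate limit.
        await asyncio.sleep(i * delay_seconds)
        return await get_one(start)

    tasks = (delayed(i, s) for i, s in enumerate(starts))
    # gather() returns results in the order the awaitables were passed.
    return list(await asyncio.gather(*tasks))


async def fake_get_one(start: int) -> int:
    """Stand-in for a network request (illustrative only)."""
    await asyncio.sleep(0.01)
    return start * 10
```

Both helpers return the chunks in request order; the parallel version finishes sooner but offers weaker control over how closely spaced the requests are when they reach the server.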
Usage
Override self.get_one() to implement the logic.
Call self.update() to get historical data.
Examples
    from cached_historical_data_fetcher import HistoricalDataCacheWithFixedChunk
    from pandas import DataFrame, Timedelta, Timestamp

    class MyCacheWithFixedChunk(HistoricalDataCacheWithFixedChunk):
        delay_seconds: float = 0
        interval: Timedelta = Timedelta(days=1)
        start_init: Timestamp = Timestamp.utcnow().floor("10D")

        # Override the abstract get_one() method here to fetch one chunk.

    df = await MyCacheWithFixedChunk().update()
- delay_seconds: float
Delay between chunks in seconds.
- folder: str
The folder name. By default, the class name.
- async get(start: Timestamp | None, *args: Any, **kwargs: Any) -> DataFrame
Get historical data. This method does not need to be overridden.
- Parameters:
start (Timestamp | Any | None) – The last index of historical data.
- Returns:
The historical data. It is recommended to set index to Timestamp or unique incremental number. If the index is not Timestamp, override self.to_update() to implement the logic as well.
- Return type:
DataFrame
- interval: Timedelta
The interval at which the cache file is updated.
- start_init: Timestamp
The initial start index of historical data. Used when no cache file exists.
- final to_update(end: Timestamp | None, *args: Any, **kwargs: Any) -> bool
Check whether the cache file needs to be updated. This method is final in this class and is not meant to be overridden. Update if the cache file is older than self.interval.
- Parameters:
end (Timestamp | Any | None) – The last index of historical data. If the cache file is empty, end is None.
- Returns:
Whether to update cache file.
- Return type:
bool
Subpackages
- cached_historical_data_fetcher.cache package
- Submodules
- cached_historical_data_fetcher.cache.base module
HistoricalDataCache: add_interval, compress, folder, get(), interval, keep, mismatch, path(), protocol, to_update(), update()
- cached_historical_data_fetcher.cache.chunk module
HistoricalDataCacheWithChunk: delay, delay_seconds, folder, get(), get_latest_uncompleted_chunk, get_one(), interval, start_init
HistoricalDataCacheWithFixedChunk
Submodules
cached_historical_data_fetcher.io module
- cached_historical_data_fetcher.io.get_path(folder: str, name: str) -> Path
Get path to cache file.
- Parameters:
folder (str) – The folder name.
name (str) – The file name.
- Returns:
The path to cache file.
- Return type:
Path
- async cached_historical_data_fetcher.io.read(path: Path) -> DataFrame
Read cache file using joblib and aiofiles, and return DataFrame. If cache file does not exist, return empty DataFrame.
- Parameters:
path (Path) – The path to cache file.
- Returns:
The DataFrame read from cache file.
- Return type:
DataFrame
- async cached_historical_data_fetcher.io.save(path: Path, df: DataFrame, *, compress: int | str | tuple[str, int] = ('lz4', 3), protocol: int | None = None) -> None
Save DataFrame to cache file using joblib and aiofiles.
- Parameters:
path (Path) – The path to cache file.
df (DataFrame) – The DataFrame to save.
compress (int | str | tuple[str, int], optional) – The compression level, by default ("lz4", 3)
protocol (int | None, optional) – The pickle protocol, by default None (latest protocol)
- async cached_historical_data_fetcher.io.update(path: Path, df: DataFrame, *, reload: bool = False, mismatch: Literal['warn', 'raise'] | int | None = 'warn', keep: Literal['first', 'last'] = 'last', compress: int | str | tuple[str, int] = ('lz4', 3), protocol: int | None = None) -> DataFrame
Update cache file with DataFrame.
- Parameters:
path (Path) – The path to cache file.
df (DataFrame) – The DataFrame to save.
reload (bool, optional) – Whether to ignore cache file and reload, by default False
mismatch (Literal["warn", "raise"] | int | None, optional) – The action to take when the data mismatches, by default "warn". If an int, log at that level. If None, do nothing.
keep (Literal["first", "last"], optional) – Which duplicated index to keep, by default "last" (has no effect if mismatch is "raise")
compress (int | str | tuple[str, int], optional) – The compression level, by default ("lz4", 3)
protocol (int | None, optional) – The pickle protocol, by default None (latest protocol)
- Returns:
The updated DataFrame.
- Return type:
DataFrame