cached_historical_data_fetcher.cache package

class cached_historical_data_fetcher.cache.HistoricalDataCache

Bases: object

Base class for historical data cache.

Usage

  1. Override self.get() to implement the logic.

  2. Override self.to_update() if the index is not Timestamp.

  3. Call self.update() to get historical data.

Examples


from typing import Any

from cached_historical_data_fetcher import HistoricalDataCache
from pandas import DataFrame, Timedelta, Timestamp, date_range


class MyCache(HistoricalDataCache):
    interval: Timedelta = Timedelta(days=1)

    async def get(self, start: Timestamp | None, *args: Any, **kwargs: Any) -> DataFrame:
        # start is None when nothing is cached yet, so pick a starting point.
        start = start or Timestamp.utcnow().floor("10D")
        date_range_chunk = date_range(start, Timestamp.utcnow(), freq="D")
        return DataFrame({"day": [d.day for d in date_range_chunk]}, index=date_range_chunk)


df = await MyCache().update()

add_interval: bool = True

If True, start in self.get() is the last index of historical data + self.interval. If False, start in self.get() is the last index of historical data.
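As a rough illustration of the add_interval rule, the sketch below computes the start value that would be handed to self.get(). It uses the standard library's datetime in place of pandas types, and next_start is a hypothetical helper, not part of the package.

```python
from datetime import datetime, timedelta, timezone


def next_start(last_index: datetime, interval: timedelta, add_interval: bool) -> datetime:
    """Return the start value for get(), following the rule described above."""
    # add_interval=True skips the last cached row;
    # add_interval=False requests it again.
    return last_index + interval if add_interval else last_index


last = datetime(2024, 1, 10, tzinfo=timezone.utc)
day = timedelta(days=1)
start_true = next_start(last, day, add_interval=True)    # 2024-01-11
start_false = next_start(last, day, add_interval=False)  # 2024-01-10
```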

compress: int | str | tuple[str, int] = ('lz4', 3)

The compression setting: a level (int), a method name (str), or a (method, level) tuple.

folder: str

The folder name. By default, the class name.

abstract async get(start: Timestamp | Any | None, *args: Any, **kwargs: Any) → DataFrame

Get historical data. Override this method to implement the logic.

Parameters:

start (Timestamp | Any | None) – The last index of historical data.

Returns:

The historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well. MultiIndex is supported; it is recommended to make the first level a Timestamp.

Return type:

DataFrame

interval: Timedelta

The interval to update cache file.

keep: Literal['first', 'last'] = 'last'

Which duplicated index to keep.
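The effect of keep can be pictured with a plain-Python sketch: when the same index value appears twice (for example once in the cache and once in freshly fetched data), only one row survives. drop_duplicated_index is a hypothetical helper for illustration. With pandas, a similar result is commonly obtained with df[~df.index.duplicated(keep=...)], but whether the package does exactly that is an assumption.

```python
from typing import Literal


def drop_duplicated_index(
    rows: list[tuple[int, str]], keep: Literal["first", "last"]
) -> list[tuple[int, str]]:
    """Keep one row per index value, ordered by first appearance of the index."""
    kept: dict[int, str] = {}
    for index, value in rows:
        # "last" always overwrites; "first" only stores an index not seen before.
        if keep == "last" or index not in kept:
            kept[index] = value
    return list(kept.items())


rows = [(1, "cached"), (2, "cached"), (2, "fetched"), (3, "fetched")]
kept_last = drop_duplicated_index(rows, keep="last")
kept_first = drop_duplicated_index(rows, keep="first")
```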

mismatch: Literal['warn', 'raise'] | int | None = 'warn'

The action to take when a data mismatch is detected. If int, it is used as the log level. If None, nothing is done.

path(name: str) → Path

protocol: int | None = None

The pickle protocol.

to_update(end: Timestamp | Any | None, *args: Any, **kwargs: Any) → bool

Check whether the cache file needs to be updated. Override this method to implement the logic. By default, update if the cache file is older than self.interval.

Parameters:

end (Timestamp | Any | None) – The last index of historical data. If the cache file is empty, end is None.

Returns:

Whether to update cache file.

Return type:

bool
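The usage notes ask for a to_update() override when the index is not a Timestamp. A minimal sketch of such an override for an integer index follows. The class is a stand-in so the snippet runs without the package installed, and latest_available_id is a hypothetical attribute, not part of the documented API.

```python
from typing import Any, Optional


class IntegerIndexPolicy:
    """Stand-in for a HistoricalDataCache subclass indexed by an incrementing integer."""

    def __init__(self, latest_available_id: int) -> None:
        # Hypothetical: the newest ID known to exist at the data source.
        self.latest_available_id = latest_available_id

    def to_update(self, end: Optional[int], *args: Any, **kwargs: Any) -> bool:
        # end is None when the cache file is empty, so a fetch is always needed.
        if end is None:
            return True
        # Otherwise fetch only when the source has rows beyond the cached ones.
        return end < self.latest_available_id


policy = IntegerIndexPolicy(latest_available_id=100)
empty_cache = policy.to_update(None)
stale_cache = policy.to_update(42)
fresh_cache = policy.to_update(100)
```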

async update(reload: bool = False, *args: Any, **kwargs: Any) → DataFrame

Update cache file with DataFrame.

Parameters:
  • reload (bool, optional) – Whether to ignore the cache file and reload. Defaults to False.

  • *args (Any) – The arguments for self.get() and self.to_update().

  • **kwargs (Any) – The keyword arguments for self.get() and self.to_update().

Returns:

The DataFrame read from cache file.

Return type:

DataFrame

Raises:

RuntimeError – If an unexpected type is read from the cache file, self.get() does not return a DataFrame, or self.to_update() does not return a bool.
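Because update() is a coroutine, a bare await only works where an event loop is already running, such as a notebook or the asyncio REPL. In a plain script the call can be wrapped with asyncio.run(). The sketch below uses a stand-in class with the same calling shape so that it runs without the package installed; StandInCache and fetch_sync are illustrative names.

```python
import asyncio


class StandInCache:
    """Stand-in with the same calling shape as HistoricalDataCache.update()."""

    async def update(self, reload: bool = False) -> list[int]:
        # A real subclass would return a DataFrame read from the cache file.
        await asyncio.sleep(0)
        return [1, 2, 3]


def fetch_sync() -> list[int]:
    # asyncio.run() creates an event loop, runs the coroutine, and closes the loop.
    return asyncio.run(StandInCache().update())


result = fetch_sync()
```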

class cached_historical_data_fetcher.cache.HistoricalDataCacheWithChunk

Bases: HistoricalDataCache

Base class for historical data cache with chunk.

Usage

  1. Override self.get_one() to implement the logic.

  2. Override self.to_update() if the index is not Timestamp or interval is not fixed.

  3. Call self.update() to get historical data.

Examples


from typing import Any

from cached_historical_data_fetcher import HistoricalDataCacheWithChunk
from pandas import DataFrame, Timedelta, Timestamp


class MyCacheWithChunk(HistoricalDataCacheWithChunk):
    delay_seconds: float = 0
    interval: Timedelta = Timedelta(days=1)
    start_init: Timestamp = Timestamp.utcnow().floor("10D")

    async def get_one(self, start: Timestamp, *args: Any, **kwargs: Any) -> DataFrame:
        # Return a single chunk whose index is the chunk's start.
        return DataFrame({"day": [start.day]}, index=[start])


df = await MyCacheWithChunk().update()

property delay: float

Delay between chunks in seconds. (Alias of self.delay_seconds.)

delay_seconds: float

Delay between chunks in seconds.

folder: str

The folder name. By default, the class name.

async get(start: Timestamp | None, *args: Any, **kwargs: Any) → DataFrame

Get historical data. This method does not need to be overridden.

Parameters:

start (Timestamp | Any | None) – The last index of historical data.

Returns:

The historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well.

Return type:

DataFrame

get_latest_uncompleted_chunk: bool = False

Whether to fetch the latest uncompleted chunk. If True, also set self.add_interval to False so that an uncompleted chunk is not left in the cache file.
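To make "uncompleted chunk" concrete, here is a small sketch that lists chunk start times between a start and the current time. It uses datetime instead of pandas types, and chunk_starts is a hypothetical helper; how the package actually enumerates chunks is an assumption.

```python
from datetime import datetime, timedelta, timezone


def chunk_starts(
    start: datetime, now: datetime, interval: timedelta, include_uncompleted: bool
) -> list[datetime]:
    """List chunk start times from start up to now."""
    starts = []
    current = start
    while current + interval <= now:
        # The chunk [current, current + interval) has fully elapsed.
        starts.append(current)
        current += interval
    if include_uncompleted and current < now:
        # The final chunk is still in progress and may change on later fetches.
        starts.append(current)
    return starts


day = timedelta(days=1)
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 1, 3, 12, tzinfo=timezone.utc)
completed_only = chunk_starts(start, now, day, include_uncompleted=False)
with_uncompleted = chunk_starts(start, now, day, include_uncompleted=True)
```

Under this reading, the chunk starting on 2024-01-03 is only half filled when it is cached. With add_interval set to False, the next update starts again from that same index, so the partial chunk can be fetched again and replaced.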

abstract async get_one(start: Timestamp, *args: Any, **kwargs: Any) → DataFrame

Get one chunk of historical data. Override this method to implement the logic.

Parameters:

start (Timestamp) – The start index of historical data.

Returns:

The chunk of historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well. MultiIndex is supported; it is recommended to make the first level a Timestamp.

Return type:

DataFrame

interval: Timedelta

The interval to update cache file.

start_init: Timestamp

The initial start index of historical data. Used when no cache file exists.

class cached_historical_data_fetcher.cache.HistoricalDataCacheWithFixedChunk

Bases: HistoricalDataCacheWithChunk

Base class for historical data cache with fixed-interval chunks.

This class only supports a fixed interval. To support a variable interval, use HistoricalDataCacheWithChunk instead.

Whereas HistoricalDataCacheWithChunk calls self.get_one() one chunk at a time, HistoricalDataCacheWithFixedChunk calls self.get_one() in parallel. Parallel calls make it impossible to guarantee that rate limits are not exceeded: depending on network conditions, all requests could in theory reach the server at the same time. Set self.delay_seconds large enough to avoid overloading the server or being banned.
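The difference between one-by-one and parallel calls can be sketched with asyncio alone. get_one here is a hypothetical stand-in for a network request, and staggering the parallel launches by delay_seconds is an assumption about how the delay is applied, not a description of the package's internals.

```python
import asyncio


async def get_one(start: int) -> int:
    # Hypothetical request: yield to the event loop, then return a result.
    await asyncio.sleep(0)
    return start * 10


async def fetch_sequential(starts: list[int], delay_seconds: float) -> list[int]:
    results = []
    for start in starts:
        # The next request is not sent until this one has finished.
        results.append(await get_one(start))
        await asyncio.sleep(delay_seconds)
    return results


async def fetch_parallel(starts: list[int], delay_seconds: float) -> list[int]:
    async def delayed(position: int, start: int) -> int:
        # Launches are staggered, but arrival order at the server is not guaranteed.
        await asyncio.sleep(position * delay_seconds)
        return await get_one(start)

    tasks = [delayed(position, start) for position, start in enumerate(starts)]
    # gather() runs the coroutines concurrently and returns results in input order.
    return list(await asyncio.gather(*tasks))


sequential = asyncio.run(fetch_sequential([1, 2, 3], delay_seconds=0))
parallel = asyncio.run(fetch_parallel([1, 2, 3], delay_seconds=0))
```

Both variants return the same data. Only the timing of the requests differs, which is why the parallel form needs a conservative delay.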

Usage

  1. Override self.get_one() to implement the logic.

  2. Call self.update() to get historical data.

Examples


from typing import Any

from cached_historical_data_fetcher import HistoricalDataCacheWithFixedChunk
from pandas import DataFrame, Timedelta, Timestamp


class MyCacheWithFixedChunk(HistoricalDataCacheWithFixedChunk):
    delay_seconds: float = 0
    interval: Timedelta = Timedelta(days=1)
    start_init: Timestamp = Timestamp.utcnow().floor("10D")

    async def get_one(self, start: Timestamp, *args: Any, **kwargs: Any) -> DataFrame:
        # Return a single chunk whose index is the chunk's start.
        return DataFrame({"day": [start.day]}, index=[start])


df = await MyCacheWithFixedChunk().update()

delay_seconds: float

Delay between chunks in seconds.

folder: str

The folder name. By default, the class name.

async get(start: Timestamp | None, *args: Any, **kwargs: Any) → DataFrame

Get historical data. This method does not need to be overridden.

Parameters:

start (Timestamp | Any | None) – The last index of historical data.

Returns:

The historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well.

Return type:

DataFrame

interval: Timedelta

The interval to update cache file.

start_init: Timestamp

The initial start index of historical data. Used when no cache file exists.

final to_update(end: Timestamp | None, *args: Any, **kwargs: Any) → bool

Check whether the cache file needs to be updated. This method is marked final in this class, so it is not meant to be overridden. By default, update if the cache file is older than self.interval.

Parameters:

end (Timestamp | Any | None) – The last index of historical data. If the cache file is empty, end is None.

Returns:

Whether to update cache file.

Return type:

bool

Submodules

cached_historical_data_fetcher.cache.base module

class cached_historical_data_fetcher.cache.base.HistoricalDataCache

Bases: object

Base class for historical data cache.

Usage

  1. Override self.get() to implement the logic.

  2. Override self.to_update() if the index is not Timestamp.

  3. Call self.update() to get historical data.

Examples


from typing import Any

from cached_historical_data_fetcher import HistoricalDataCache
from pandas import DataFrame, Timedelta, Timestamp, date_range


class MyCache(HistoricalDataCache):
    interval: Timedelta = Timedelta(days=1)

    async def get(self, start: Timestamp | None, *args: Any, **kwargs: Any) -> DataFrame:
        # start is None when nothing is cached yet, so pick a starting point.
        start = start or Timestamp.utcnow().floor("10D")
        date_range_chunk = date_range(start, Timestamp.utcnow(), freq="D")
        return DataFrame({"day": [d.day for d in date_range_chunk]}, index=date_range_chunk)


df = await MyCache().update()

add_interval: bool = True

If True, start in self.get() is the last index of historical data + self.interval. If False, start in self.get() is the last index of historical data.

compress: int | str | tuple[str, int] = ('lz4', 3)

The compression setting: a level (int), a method name (str), or a (method, level) tuple.

folder: str

The folder name. By default, the class name.

abstract async get(start: Timestamp | Any | None, *args: Any, **kwargs: Any) → DataFrame

Get historical data. Override this method to implement the logic.

Parameters:

start (Timestamp | Any | None) – The last index of historical data.

Returns:

The historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well. MultiIndex is supported; it is recommended to make the first level a Timestamp.

Return type:

DataFrame

interval: Timedelta

The interval to update cache file.

keep: Literal['first', 'last'] = 'last'

Which duplicated index to keep.

mismatch: Literal['warn', 'raise'] | int | None = 'warn'

The action to take when a data mismatch is detected. If int, it is used as the log level. If None, nothing is done.
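The four kinds of value accepted by mismatch can be read as a small dispatch. The sketch below is one way to write it with the standard library; report_mismatch is a hypothetical function, and the warning and exception classes it uses are assumptions, since the documentation does not name them.

```python
import logging
import warnings
from typing import Literal, Union

logger = logging.getLogger(__name__)


def report_mismatch(
    message: str, mismatch: Union[Literal["warn", "raise"], int, None]
) -> None:
    """Hypothetical dispatcher covering the documented kinds of value."""
    if mismatch is None:
        return  # do nothing
    if mismatch == "warn":
        warnings.warn(message, RuntimeWarning)  # assumed warning class
    elif mismatch == "raise":
        raise RuntimeError(message)  # assumed exception class
    else:
        # An int is treated as a logging level, e.g. logging.INFO.
        logger.log(mismatch, message)


report_mismatch("data mismatch detected", logging.DEBUG)
```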

path(name: str) → Path

protocol: int | None = None

The pickle protocol.
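For reference, this is how a protocol value behaves in the standard pickle module: None selects the interpreter's default protocol, while an explicit number pins the format. Whether the package forwards the value to pickle directly or through another serializer is an assumption.

```python
import pickle

payload = {"day": [1, 2, 3]}

# protocol=None selects pickle.DEFAULT_PROTOCOL for the running interpreter.
default_bytes = pickle.dumps(payload, protocol=None)
# An explicit protocol can be pinned for compatibility with older readers.
pinned_bytes = pickle.dumps(payload, protocol=4)

restored = pickle.loads(pinned_bytes)
```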

to_update(end: Timestamp | Any | None, *args: Any, **kwargs: Any) → bool

Check whether the cache file needs to be updated. Override this method to implement the logic. By default, update if the cache file is older than self.interval.

Parameters:

end (Timestamp | Any | None) – The last index of historical data. If the cache file is empty, end is None.

Returns:

Whether to update cache file.

Return type:

bool

async update(reload: bool = False, *args: Any, **kwargs: Any) → DataFrame

Update cache file with DataFrame.

Parameters:
  • reload (bool, optional) – Whether to ignore the cache file and reload. Defaults to False.

  • *args (Any) – The arguments for self.get() and self.to_update().

  • **kwargs (Any) – The keyword arguments for self.get() and self.to_update().

Returns:

The DataFrame read from cache file.

Return type:

DataFrame

Raises:

RuntimeError – If an unexpected type is read from the cache file, self.get() does not return a DataFrame, or self.to_update() does not return a bool.

cached_historical_data_fetcher.cache.chunk module

class cached_historical_data_fetcher.cache.chunk.HistoricalDataCacheWithChunk

Bases: HistoricalDataCache

Base class for historical data cache with chunk.

Usage

  1. Override self.get_one() to implement the logic.

  2. Override self.to_update() if the index is not Timestamp or interval is not fixed.

  3. Call self.update() to get historical data.

Examples


from typing import Any

from cached_historical_data_fetcher import HistoricalDataCacheWithChunk
from pandas import DataFrame, Timedelta, Timestamp


class MyCacheWithChunk(HistoricalDataCacheWithChunk):
    delay_seconds: float = 0
    interval: Timedelta = Timedelta(days=1)
    start_init: Timestamp = Timestamp.utcnow().floor("10D")

    async def get_one(self, start: Timestamp, *args: Any, **kwargs: Any) -> DataFrame:
        # Return a single chunk whose index is the chunk's start.
        return DataFrame({"day": [start.day]}, index=[start])


df = await MyCacheWithChunk().update()

property delay: float

Delay between chunks in seconds. (Alias of self.delay_seconds.)

delay_seconds: float

Delay between chunks in seconds.

folder: str

The folder name. By default, the class name.

async get(start: Timestamp | None, *args: Any, **kwargs: Any) → DataFrame

Get historical data. This method does not need to be overridden.

Parameters:

start (Timestamp | Any | None) – The last index of historical data.

Returns:

The historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well.

Return type:

DataFrame

get_latest_uncompleted_chunk: bool = False

Whether to fetch the latest uncompleted chunk. If True, also set self.add_interval to False so that an uncompleted chunk is not left in the cache file.

abstract async get_one(start: Timestamp, *args: Any, **kwargs: Any) → DataFrame

Get one chunk of historical data. Override this method to implement the logic.

Parameters:

start (Timestamp) – The start index of historical data.

Returns:

The chunk of historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well. MultiIndex is supported; it is recommended to make the first level a Timestamp.

Return type:

DataFrame

interval: Timedelta

The interval to update cache file.

start_init: Timestamp

The initial start index of historical data. Used when no cache file exists.

class cached_historical_data_fetcher.cache.chunk.HistoricalDataCacheWithFixedChunk

Bases: HistoricalDataCacheWithChunk

Base class for historical data cache with fixed-interval chunks.

This class only supports a fixed interval. To support a variable interval, use HistoricalDataCacheWithChunk instead.

Whereas HistoricalDataCacheWithChunk calls self.get_one() one chunk at a time, HistoricalDataCacheWithFixedChunk calls self.get_one() in parallel. Parallel calls make it impossible to guarantee that rate limits are not exceeded: depending on network conditions, all requests could in theory reach the server at the same time. Set self.delay_seconds large enough to avoid overloading the server or being banned.

Usage

  1. Override self.get_one() to implement the logic.

  2. Call self.update() to get historical data.

Examples


from typing import Any

from cached_historical_data_fetcher import HistoricalDataCacheWithFixedChunk
from pandas import DataFrame, Timedelta, Timestamp


class MyCacheWithFixedChunk(HistoricalDataCacheWithFixedChunk):
    delay_seconds: float = 0
    interval: Timedelta = Timedelta(days=1)
    start_init: Timestamp = Timestamp.utcnow().floor("10D")

    async def get_one(self, start: Timestamp, *args: Any, **kwargs: Any) -> DataFrame:
        # Return a single chunk whose index is the chunk's start.
        return DataFrame({"day": [start.day]}, index=[start])


df = await MyCacheWithFixedChunk().update()

delay_seconds: float

Delay between chunks in seconds.

folder: str

The folder name. By default, the class name.

async get(start: Timestamp | None, *args: Any, **kwargs: Any) → DataFrame

Get historical data. This method does not need to be overridden.

Parameters:

start (Timestamp | Any | None) – The last index of historical data.

Returns:

The historical data. It is recommended to set the index to Timestamp or a unique incremental number. If the index is not Timestamp, override self.to_update() as well.

Return type:

DataFrame

interval: Timedelta

The interval to update cache file.

start_init: Timestamp

The initial start index of historical data. Used when no cache file exists.

final to_update(end: Timestamp | None, *args: Any, **kwargs: Any) → bool

Check whether the cache file needs to be updated. This method is marked final in this class, so it is not meant to be overridden. By default, update if the cache file is older than self.interval.

Parameters:

end (Timestamp | Any | None) – The last index of historical data. If the cache file is empty, end is None.

Returns:

Whether to update cache file.

Return type:

bool