Skip to content

get_test_data

Classes:

ZenodoDataDownloader

ZenodoDataDownloader(
    data_cache_folder: Path = Path(__file__).parent.parent.joinpath("data"),
)

Methods:

Source code in geokit/core/get_test_data.py
def __init__(
    self,
    data_cache_folder: pathlib.Path = pathlib.Path(__file__).parent.parent.joinpath("data"),
) -> None:
    """Remember the folder that is used as the download cache.

    Parameters
    ----------
    data_cache_folder : pathlib.Path
        Folder that downloaded files are stored in. Defaults to the ``data``
        folder located in the parent of the directory containing this
        module. The default is computed once, when the function is defined.
        The folder is not created here.
    """
    self.data_cache_folder = data_cache_folder

download_and_extract_parallel

download_and_extract_parallel(
    download_list: list[
        tuple[str, str | None, str | None, dict | None]
    ],
    max_workers: int = 4,
) -> list[Path]

Parallelize multiple downloads with optional extraction per job.

Each download_list entry: (url, filename_or_none, extract_dir_or_none, header). If extract_dir is provided, the downloaded file is extracted there (zip only).

Args:

  • download_list: List of jobs, each defined by a tuple of (url, filename_or_none, extract_dir_or_none, header).

  • max_workers: Maximum number of threads to use for concurrent downloads.

Returns:

  • list[pathlib.Path]: Paths to the downloaded files or extraction folders, in the same order as the input download_list.

Raises:

  • Exception: If any download or extraction job fails. The original exception from the worker is attached as the cause of the raised Exception.

Source code in geokit/core/get_test_data.py
def download_and_extract_parallel(
    self,
    download_list: list[tuple[str, str | None, str | None, dict | None]],
    max_workers: int = 4,
) -> list[pathlib.Path]:
    """Parallelize multiple downloads with optional extraction per job.

    Each download_list entry: (url, filename_or_none, extract_dir_or_none, header).
    If extract_dir is provided, the downloaded file is extracted there (zip only).

    Args:
        download_list: List of jobs, each defined by a tuple of
            (url, filename_or_none, extract_dir_or_none, header).
        max_workers: Maximum number of threads to use for concurrent downloads.

    Returns
    -------
        list[pathlib.Path]: Paths to the downloaded files or extraction folders
        in the same order as the input download_list.

    Raises
    ------
        Exception: If any download or extraction job fails. The original
            exception from the worker is attached as the cause of the
            raised Exception.
    """

    def _worker(idx: int, job: tuple[str, str | None, str | None, dict | None]) -> tuple[int, pathlib.Path]:
        url, filename, extract_dir, headers = job
        downloaded_path = self.download_file(url=url, filename=filename, headers=headers)
        if extract_dir is None:
            return idx, downloaded_path
        extracted_path = self.extract_zip_archive(
            path_to_archive=downloaded_path,
            extract_folder=extract_dir,
        )
        return idx, extracted_path

    indexed_jobs = list(enumerate(download_list))
    results: list[pathlib.Path | None] = [None for _ in download_list]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_worker, idx, job): idx for idx, job in indexed_jobs}
        for future in as_completed(futures):
            idx = futures[future]
            try:
                out_idx, path = future.result()
            except Exception as exc:  # bubble up with context
                raise Exception(f"Batch job failed for index {idx}: {exc}") from exc
            results[out_idx] = path

    return [path for path in results if path is not None]

download_file

download_file(
    url: str,
    filename: str | None = None,
    headers: dict | None = None,
    overwrite: bool = False,
    max_attempts: int = 3,
    backoff_seconds: float = 5.0,
) -> Path

Download a single file to the cache folder.

If filename is omitted, it is derived from the URL path. Set overwrite to re-download an existing file. max_attempts controls retries for transient failures.

Source code in geokit/core/get_test_data.py
def download_file(
    self,
    url: str,
    filename: str | None = None,
    headers: dict | None = None,
    overwrite: bool = False,
    max_attempts: int = 3,
    backoff_seconds: float = 5.0,
) -> pathlib.Path:
    """Download a single file to the cache folder.

    If ``filename`` is omitted, it is derived from the URL path. Set
    ``overwrite`` to re-download an existing file.
    ``max_attempts`` controls retries for transient failures.
    """
    parsed = urlparse(url)
    derived_name = pathlib.Path(parsed.path).name or "download"
    target_name = filename if isinstance(filename, str) else derived_name
    target_path = self.data_cache_folder.joinpath(target_name)

    if target_path.exists() and not overwrite:
        print(f"File already exists at {target_path}, skipping download.")
        return target_path

    if headers is None:
        headers_internal = {}
    else:
        headers_internal = headers

    self.data_cache_folder.mkdir(parents=True, exist_ok=True)
    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            with requests.get(
                url,
                timeout=600,
                allow_redirects=True,
                headers=headers_internal,
                stream=True,
            ) as response:
                response.raise_for_status()
                with open(target_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024 * 10):  # 10MB chunks
                        f.write(chunk)
            last_error = None
            break
        except requests.RequestException as exc:
            last_error = exc
            if attempt >= max_attempts:
                break
            sleep_for = backoff_seconds * (2 ** (attempt - 1))
            print(f"Download failed (attempt {attempt}/{max_attempts}); retrying in {sleep_for:.1f}s...")
            time.sleep(sleep_for)

    if last_error is not None:
        raise last_error

    print(f"Downloaded to {target_path}")
    return target_path