Source code for aws_sdk_polars.s3.partition

# -*- coding: utf-8 -*-

"""
Datalake partition utilities.

This module provides functions and classes for managing and manipulating
partitions in a data lake stored on S3. It includes utilities for extracting
partition information, encoding partition data, and listing partitions.
"""

import typing as T
import dataclasses

from s3pathlib import S3Path

if T.TYPE_CHECKING:  # pragma: no cover
    from mypy_boto3_s3.client import S3Client


def decode_hive_partition(
    s3dir_root: S3Path,
    s3dir_partition: S3Path,
) -> T.Dict[str, str]:
    """
    Parse the hive-style ``key=value`` folders of a partition into a dictionary.

    Only the path segments of ``s3dir_partition`` that lie below
    ``s3dir_root`` are inspected. Each segment is split on its first ``=``
    character, so a value may itself contain ``=``.

    :param s3dir_root: The root S3 directory.
    :param s3dir_partition: The partition S3 directory.

    :return: Partition keys mapped to their (string) values, in the order
        the folders appear in the path.

    :raises ValueError: If a path segment does not contain ``=``.

    **Example**

        >>> s3dir_root = S3Path("s3://bucket/folder/")
        >>> s3dir_partition = S3Path("s3://bucket/folder/year=2021/month=01/day=15/")
        >>> decode_hive_partition(s3dir_root, s3dir_partition)
        {'year': '2021', 'month': '01', 'day': '15'}
    """
    relative_parts = s3dir_partition.relative_to(s3dir_root).parts
    # maxsplit=1 keeps any additional "=" characters inside the value.
    key_value_pairs = (segment.split("=", 1) for segment in relative_parts)
    return {key: value for key, value in key_value_pairs}
def encode_hive_partition(kvs: T.Dict[str, str]) -> str:
    """
    Render partition key-value pairs as a hive-style relative path.

    Each pair becomes one ``key=value`` segment; segments are joined with
    ``/`` in the dictionary's iteration order. No leading or trailing slash
    is added, and an empty dictionary yields an empty string.

    :param kvs: A dictionary of partition key-value pairs.

    :return: The hive-style partition string.

    For example:

        >>> encode_hive_partition({"year": "2021", "month": "01", "day": "01"})
        'year=2021/month=01/day=01'
    """
    segments = []
    for key, value in kvs.items():
        segments.append(f"{key}={value}")
    return "/".join(segments)
def build_hive_partition_dir(
    s3dir_root: S3Path,
    kvs: T.Dict[str, str],
) -> S3Path:
    """
    Build the S3 directory of a partition from its root and key-value pairs.

    The pairs are encoded with :func:`encode_hive_partition`, appended to
    ``s3dir_root`` and the result is converted to a directory path.

    :param s3dir_root: The root S3 directory.
    :param kvs: A dictionary of partition key-value pairs.

    :return: The S3 directory path of the partition.

    **Example**

        >>> s3dir_root = S3Path("s3://bucket/folder/")
        >>> s3dir_partition = build_hive_partition_dir(s3dir_root, {"year": "2021", "month": "01", "day": "01"})
        >>> s3dir_partition.uri
        's3://bucket/folder/year=2021/month=01/day=01/'
    """
    relative_key = encode_hive_partition(kvs)
    s3path_partition = s3dir_root / relative_key
    return s3path_partition.to_dir()
@dataclasses.dataclass
class S3Partition:
    """
    Represents a partition in an S3-based data lake.

    A partition is a directory in S3 that contains data files but no
    subdirectories. It typically follows a hierarchical structure based on
    partition keys.

    For example, in the following S3 directory structure::

        s3://bucket/folder/year=2021/month=01/day=01/data.json
        s3://bucket/folder/year=2021/month=01/day=02/data.json
        s3://bucket/folder/year=2021/month=02/day=01/data.json
        s3://bucket/folder/year=2021/month=02/day=02/data.json

    Then:

    - ``s3://bucket/folder/year=2021/month=01/day=01/`` is a partition.
    - ``s3://bucket/folder/year=2021/month=01/`` is NOT a partition.
    - ``s3://bucket/folder/year=2021/`` is NOT a partition.

    :param root_uri: The S3 URI of the root folder of partition. For example,
        the root folder of ``s3://bucket/folder/year=2021/month=01/day=01/``
        is ``s3://bucket/folder/``. A trailing ``/`` is appended on
        construction when it is missing.
    :param data: A dictionary of partition data. Note that the value is
        always a string, even if it represents a number. For example:
        ``{"year": "2021", "month": "01", "day": "01"}``
    """

    root_uri: str = dataclasses.field()
    data: T.Dict[str, str] = dataclasses.field()

    def __post_init__(self):
        # Normalize the root so it is always spelled as a directory URI.
        if not self.root_uri.endswith("/"):
            self.root_uri = self.root_uri + "/"

    @property
    def s3dir_root(self) -> S3Path:
        """
        The S3 directory path of the root directory.

        If the partition is "s3://bucket/folder/year=2021/month=01/day=15/",
        then the root directory is "s3://bucket/folder/".
        """
        return S3Path.from_s3_uri(self.root_uri)

    @property
    def s3dir_part(self) -> S3Path:
        """
        The S3 directory path of the partition, built from the root
        directory and the partition data.
        """
        return build_hive_partition_dir(self.s3dir_root, self.data)

    @property
    def part_uri(self) -> str:
        """
        The S3 URI of the partition directory.
        """
        return self.s3dir_part.uri

    @classmethod
    def from_uri(
        cls,
        s3uri_part: str,
        s3uri_root: str,
    ) -> "S3Partition":
        """
        Construct a partition object from S3 URIs.

        Both URIs are treated as directories. The partition data is decoded
        from the ``key=value`` folders found between the root and the
        partition.

        :param s3uri_part: The S3 URI of the partition.
        :param s3uri_root: The S3 URI of the root directory.

        :return: A new :class:`S3Partition` object.
        """
        s3dir_part = S3Path.from_s3_uri(s3uri_part).to_dir()
        s3dir_root = S3Path.from_s3_uri(s3uri_root).to_dir()
        data = decode_hive_partition(s3dir_root, s3dir_part)
        return cls(root_uri=s3uri_root, data=data)

    @classmethod
    def from_part_uri(
        cls,
        part_uri: str,
        n_levels: int,
    ) -> "S3Partition":
        """
        Construct a partition object from a partition URI.

        The root directory is found by taking the parent of the partition
        directory ``n_levels`` times.

        :param part_uri: The S3 URI of the partition.
        :param n_levels: The number of levels to go up to reach the root
            directory, i.e. the number of partition keys in the URI.

        :return: A new :class:`S3Partition` object.
        """
        s3dir_part = S3Path.from_s3_uri(part_uri).to_dir()
        s3dir_root = s3dir_part
        for _ in range(n_levels):
            s3dir_root = s3dir_root.parent
        return cls.from_uri(s3uri_part=part_uri, s3uri_root=s3dir_root.uri)

    def list_files_by_ext(
        self,
        s3_client: "S3Client",
        ext: str,
    ) -> T.List[S3Path]:
        """
        List files in the partition by file extension.

        Files in subdirectories are not included.

        :param s3_client: The boto3 S3 client used to list the partition
            directory.
        :param ext: File extension to filter. For example, ".parquet".

        :return: A list of S3Path objects whose basename ends with ``ext``.
        """
        # NOTE(review): if ``iterdir`` also yields sub-directory entries, a
        # folder whose name ends with ``ext`` would pass this filter as well;
        # confirm whether an additional file-only check is needed.
        return (
            self.s3dir_part.iterdir(bsm=s3_client)
            .filter(lambda x: x.basename.endswith(ext))
            .all()
        )
def list_partitions(
    s3_client: "S3Client",
    s3dir_root: S3Path,
) -> T.List[S3Partition]:
    """
    Scan the S3 directory and return a list of partitions.

    For example, for the following S3 structure::

        s3://bucket/folder/year=2021/month=01/day=01/data.json
        s3://bucket/folder/year=2021/month=01/day=02/data.json
        s3://bucket/folder/year=2021/month=02/day=01/data.json
        s3://bucket/folder/year=2021/month=02/day=02/data.json

    The function will return partitions::

        s3://bucket/folder/year=2021/month=01/day=01/
        s3://bucket/folder/year=2021/month=01/day=02/
        s3://bucket/folder/year=2021/month=02/day=01/
        s3://bucket/folder/year=2021/month=02/day=02/

    A folder is reported as a partition when it directly contains at least
    one object and every folder name between the root and that folder is a
    hive-style ``key=value`` segment. The root itself is reported (with
    empty partition data) when objects live directly in it. Folders located
    anywhere under a non-partition folder (for example ``.hoodie/`` or
    ``_temporary/``) are ignored.

    .. note::

        All objects under the root are enumerated through a single
        ``iter_objects`` call and partitions are derived from their parent
        folders, instead of walking the folder tree level by level.

    :param s3_client: The boto3 S3 client used to list objects.
    :param s3dir_root: The root S3 directory of the partitioned dataset.

    :return: Partition objects sorted by their partition URI.
    """
    root_uri = s3dir_root.uri
    len_root_uri = len(root_uri)

    # Locate every S3 folder that directly holds at least one object.
    folder_uris = {
        s3path.parent.uri for s3path in s3dir_root.iter_objects(bsm=s3_client)
    }

    partition_uris = []
    for s3_uri in folder_uris:
        # Folder names below the root; the root itself yields no segments
        # and is therefore accepted by ``all`` below.
        segments = [seg for seg in s3_uri[len_root_uri:].split("/") if seg]
        # Require "=" in EVERY segment, not only the last one: a partition
        # nested under a non-partition folder (e.g. ``_temporary/0/year=2021/``)
        # could not be decoded and would raise ``ValueError`` later on.
        if all("=" in seg for seg in segments):
            partition_uris.append(s3_uri)
    partition_uris.sort()

    # Convert the partition URIs into partition objects.
    return [
        S3Partition(
            root_uri=root_uri,
            data=decode_hive_partition(s3dir_root, S3Path.from_s3_uri(s3_uri)),
        )
        for s3_uri in partition_uris
    ]