Source code for aws_sdk_polars.s3.write
# -*- coding: utf-8 -*-
"""
todo: docstring
"""
import typing as T
import io
import polars as pl
from s3pathlib import S3Path
from polars_writer.writer import Writer, ParquetCompressionEnum
from compress.api import Algorithm, compress as do_compress
from ..constants import (
S3_METADATA_KEY_N_RECORD,
S3_METADATA_KEY_N_COLUMN,
)
from ..typehint import T_OPTIONAL_KWARGS
from .partition import encode_hive_partition
if T.TYPE_CHECKING: # pragma: no cover
from mypy_boto3_s3.client import S3Client
_content_encoding_mapping = {
Algorithm.gzip.value: "gzip",
Algorithm.bz2.value: "bz2",
Algorithm.snappy.value: "snappy",
Algorithm.lz4.value: "lz4",
Algorithm.lzma.value: "lzo",
Algorithm.zstd.value: "zstd",
}
_file_ext_mapping = {
Algorithm.gzip.value: ".gz",
Algorithm.bz2.value: ".bz2",
Algorithm.snappy.value: ".snappy",
Algorithm.lz4.value: ".lz4",
Algorithm.lzma.value: ".lzo",
Algorithm.zstd.value: ".zst",
}
[docs]def configure_s3_write_options(
df: pl.DataFrame,
polars_writer: Writer,
compress: Algorithm,
s3pathlib_write_bytes_kwargs: T.Dict[str, T.Any],
) -> str:
"""
Configure S3 write options based on the polars writer.
This function sets up the necessary metadata and content-related parameters for
writing a Polars DataFrame to S3. It determines the appropriate file extension
and configures compression settings based on the writer format and user preferences.
:param df: The Polars DataFrame to be written.
:param polars_writer: The Polars writer object specifying the output format.
:param gzip_compress: Whether to apply gzip compression (where applicable).
:param s3pathlib_write_bytes_kwargs: Dictionary of keyword arguments
for S3 write operation, to be modified in-place.
:return: The appropriate file extension for the configured write operation.
"""
more_metadata = {
S3_METADATA_KEY_N_RECORD: str(df.shape[0]),
S3_METADATA_KEY_N_COLUMN: str(df.shape[1]),
}
if "metadata" in s3pathlib_write_bytes_kwargs:
s3pathlib_write_bytes_kwargs["metadata"].update(more_metadata)
else:
s3pathlib_write_bytes_kwargs["metadata"] = more_metadata
compress_str: str = Algorithm.ensure_str(compress)
content_encoding = _content_encoding_mapping.get(compress_str)
if content_encoding is not None:
s3pathlib_write_bytes_kwargs["content_encoding"] = content_encoding
compress_ext = _file_ext_mapping.get(compress_str, "")
if polars_writer.is_csv():
s3pathlib_write_bytes_kwargs["content_type"] = "text/plain"
return f".csv{compress_ext}"
elif polars_writer.is_json() or polars_writer.is_ndjson():
s3pathlib_write_bytes_kwargs["content_type"] = "application/json"
return f".json{compress_ext}"
elif polars_writer.is_parquet():
s3pathlib_write_bytes_kwargs["content_type"] = "application/x-parquet"
compression = polars_writer.parquet_compression
if isinstance(compression, str):
if compression == ParquetCompressionEnum.uncompressed:
return ".parquet"
else:
s3pathlib_write_bytes_kwargs["content_encoding"] = compression
return f".{compression}.parquet"
else:
return ".parquet"
elif polars_writer.is_delta(): # pragma: no cover
raise NotImplementedError
else: # pragma: no cover
raise ValueError(f"Unsupported format: {polars_writer.format}")
[docs]def configure_s3path(
s3dir: T.Optional[S3Path] = None,
fname: T.Optional[str] = None,
ext: T.Optional[str] = None,
s3path: T.Optional[S3Path] = None,
):
"""
Configure and return an S3Path object for file operations.
This function allows flexible specification of an S3 path. It can either construct
a path from individual components (directory, filename, and extension) or use a
pre-configured S3Path object.
:param s3dir: The S3 directory path. Required if s3path is not provided.
:param fname: The filename without extension. Required if s3path is not provided.
for example, if the full file name is "data.csv", then fname is "data".
:param ext: The file extension, including the dot (e.g., '.csv').
Required if s3path is not provided.
:param s3path: A pre-configured S3Path object. If provided, other arguments are ignored.
:return The configured S3Path object representing the full file path in S3.
"""
if s3path is None:
if (s3dir is None) or (fname is None) or (ext is None):
raise ValueError(
"s3dir, fname, and ext must be provided when s3path is not provided"
)
return s3dir.joinpath(fname + ext)
else:
return s3path
[docs]def partition_df_for_s3(
df: pl.DataFrame,
s3dir: S3Path,
part_keys: T.List[str],
) -> T.Iterator[T.Tuple[pl.DataFrame, S3Path]]:
"""
Group dataframe by partition keys and locate the S3 location for each partition.
:param df: ``polars.DataFrame`` object.
:param s3dir: ``s3pathlib.S3Path`` object, the root directory of the
S3 location of the datalake table.
:param part_keys: list of partition keys. for example: ["year", "month"].
"""
part_values: T.List[str]
for part_values, sub_df in df.group_by(part_keys):
sub_df = sub_df.drop(part_keys)
kvs = dict(zip(part_keys, part_values))
partition_relpath = encode_hive_partition(kvs=kvs)
s3dir_partition = s3dir.joinpath(partition_relpath).to_dir()
yield (sub_df, s3dir_partition)
[docs]def write(
df: pl.DataFrame,
s3_client: "S3Client",
polars_writer: Writer,
compression: T.Union[str, Algorithm] = Algorithm.uncompressed,
compression_kwargs: T_OPTIONAL_KWARGS = None,
s3pathlib_write_bytes_kwargs: T_OPTIONAL_KWARGS = None,
s3dir: T.Optional[S3Path] = None,
fname: T.Optional[str] = None,
s3path: T.Optional[S3Path] = None,
) -> S3Path:
"""
Write the DataFrame to the given S3Path object, also attach
additional information related to the dataframe.
The original ``polars.write_parquet`` method doesn't work with moto,
so we use buffer to store the parquet file and then write it to S3.
:param df: ``polars.DataFrame`` object.
:param s3_client: ``boto3.client("s3")`` object.
:param polars_writer: `polars_writer.api.Writer <https://github.com/MacHu-GWU/polars_writer-project>`_
object.
:param compression: compression method for CSV, JSON. This option is ignored
for parquet, deltalake formats. Because it is already defined in polars_writer.
:param compression_kwargs: compression method keyword arguments.
For example, for `gzip <https://docs.python.org/3/library/gzip.html>`,
you can provide `{"compresslevel": 9}`.
:param s3pathlib_write_bytes_kwargs: Keyword arguments for
``s3path.write_bytes`` method. See
https://s3pathlib.readthedocs.io/en/latest/s3pathlib/core/rw.html#s3pathlib.core.rw.ReadAndWriteAPIMixin.write_bytes
:param s3dir: The S3 directory path. Required if s3path is not provided.
:param fname: The filename without extension. Required if s3path is not provided.
for example, if the full file name is "data.csv", then fname is "data".
:param s3path: A pre-configured S3Path object. If provided, other arguments are ignored.
:return: the S3Path object representing the created file on S3. You could
access its attribute like 'size', 'etag', 'last_modified_at'
"""
# --- preprocess input arguments
if s3pathlib_write_bytes_kwargs is None:
s3pathlib_write_bytes_kwargs = {}
ext = configure_s3_write_options(
df=df,
polars_writer=polars_writer,
compress=compression,
s3pathlib_write_bytes_kwargs=s3pathlib_write_bytes_kwargs,
)
if (
polars_writer.is_csv()
or polars_writer.is_json()
or polars_writer.is_ndjson()
or polars_writer.is_parquet()
):
buffer = io.BytesIO()
polars_writer.write(df, file_args=[buffer])
b = buffer.getvalue()
b = do_compress(algo=compression, data=b, kwargs=compression_kwargs)
s3path = configure_s3path(
s3dir=s3dir,
fname=fname,
ext=ext,
s3path=s3path,
)
s3path_new = s3path.write_bytes(
b,
bsm=s3_client,
**s3pathlib_write_bytes_kwargs,
)
return s3path_new
elif polars_writer.is_delta(): # pragma: no cover
# if s3dir is None:
# raise ValueError("s3dir must be provided for deltalake formats")
# polars_writer.write(df, file_args=[s3dir.uri])
# return s3dir
raise NotImplementedError
else: # pragma: no cover
raise NotImplementedError