# Source code for aws_dynamodb_io.import_table

# -*- coding: utf-8 -*-

"""
Toolbox for importing DynamoDB tables from data stored in Amazon S3.

Reference:

- DynamoDB data import from Amazon S3: how it works: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.HowItWorks.html
"""

import typing as T
import enum
import gzip
import dataclasses
from datetime import datetime

import botocore.exceptions

from .importer import amazon_ion, dynamodb_json

from .waiter import Waiter
from .utils import split_s3_uri


if T.TYPE_CHECKING:  # pragma: no cover
    from mypy_boto3_s3.client import S3Client
    from mypy_boto3_s3.type_defs import PutObjectOutputTypeDef
    from mypy_boto3_dynamodb.client import DynamoDBClient


class ImportStatusEnum(str, enum.Enum):
    """
    Possible values of the ``ImportStatus`` field that DynamoDB reports
    for an import-from-S3 job.
    """

    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
class ImportFormatEnum(str, enum.Enum):
    """
    Possible values of the ``InputFormat`` field of a DynamoDB
    import-from-S3 job, i.e. the format of the source files in S3.
    """

    DYNAMODB_JSON = "DYNAMODB_JSON"
    ION = "ION"
    CSV = "CSV"
@dataclasses.dataclass
class ImportJob:
    """
    The DynamoDB import table job data model.

    Instances are normally built from the description dictionaries returned
    by the DynamoDB API, see :meth:`from_import_description`.

    Ref:

    - import_table: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/import_table.html
    - describe_import: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/describe_import.html
    - list_imports: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/list_imports.html
    - How it works: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.HowItWorks.html
    """

    # fmt: off
    arn: str = dataclasses.field()
    status: str = dataclasses.field()
    table_arn: T.Optional[str] = dataclasses.field(default=None)
    table_id: T.Optional[str] = dataclasses.field(default=None)
    client_token: T.Optional[str] = dataclasses.field(default=None)
    s3_bucket_owner: T.Optional[str] = dataclasses.field(default=None)
    s3_bucket: T.Optional[str] = dataclasses.field(default=None)
    s3_prefix: T.Optional[str] = dataclasses.field(default=None)
    error_count: T.Optional[int] = dataclasses.field(default=None)
    cloudwatch_log_group_arn: T.Optional[str] = dataclasses.field(default=None)
    input_format: T.Optional[str] = dataclasses.field(default=None)
    import_format_options: T.Optional[T.Dict[str, str]] = dataclasses.field(default=None)
    import_compression_type: T.Optional[str] = dataclasses.field(default=None)
    table_creation_parameters: T.Optional[T.Dict[str, str]] = dataclasses.field(default=None)
    start_time: T.Optional[datetime] = dataclasses.field(default=None)
    end_time: T.Optional[datetime] = dataclasses.field(default=None)
    processed_size_bytes: T.Optional[int] = dataclasses.field(default=None)
    processed_item_count: T.Optional[int] = dataclasses.field(default=None)
    imported_item_count: T.Optional[int] = dataclasses.field(default=None)
    failure_code: T.Optional[str] = dataclasses.field(default=None)
    failure_message: T.Optional[str] = dataclasses.field(default=None)
    # fmt: on

    def __post_init__(self):
        # Normalize a non-empty S3 prefix so it always ends with "/".
        # An empty prefix is left alone; turning it into "/" would point at
        # keys that literally start with a slash.
        if self.s3_prefix and not self.s3_prefix.endswith("/"):
            self.s3_prefix += "/"

    @classmethod
    def list_imports(
        cls,
        dynamodb_client: "DynamoDBClient",
        table_arn: str,
        page_size: int = 25,
        max_results: int = 1000,
        get_details: bool = False,
    ) -> T.List["ImportJob"]:
        """
        List the import jobs of a DynamoDB table, following pagination.

        :param dynamodb_client: the boto3 DynamoDB client.
        :param table_arn: ARN of the table whose imports are listed.
        :param page_size: number of results requested per ``list_imports``
            call (sent as the API's ``PageSize`` parameter).
        :param max_results: stop as soon as this many jobs have been collected.
        :param get_details: if True, call :meth:`describe_import` for every
            job to populate all fields. Otherwise only ``arn`` and ``status``
            are set.

        :return: at most ``max_results`` :class:`ImportJob` objects.

        Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/list_imports.html
        """
        imports: T.List["ImportJob"] = []
        next_token: T.Optional[str] = None
        while True:
            # ListImports accepts ``PageSize`` (not ``MaxResults``). An unknown
            # parameter makes botocore reject the request outright.
            kwargs: T.Dict[str, T.Any] = dict(
                TableArn=table_arn,
                PageSize=page_size,
            )
            if next_token is not None:
                kwargs["NextToken"] = next_token
            res = dynamodb_client.list_imports(**kwargs)
            for dct in res.get("ImportSummaryList", []):
                import_arn = dct["ImportArn"]
                if get_details:
                    import_job = cls.describe_import(dynamodb_client, import_arn)
                    # describe_import returns None when the job is not found;
                    # skip it rather than putting None into the result list.
                    if import_job is None:
                        continue
                else:
                    import_job = cls(arn=import_arn, status=dct["ImportStatus"])
                imports.append(import_job)
                if len(imports) >= max_results:
                    return imports
            # A missing or empty token means this was the last page.
            next_token = res.get("NextToken")
            if not next_token:
                break
        return imports

    @classmethod
    def from_import_description(cls, desc: dict):
        """
        Build an :class:`ImportJob` from an API description dictionary.

        :param desc: The import description dictionary from ``import_table``,
            ``describe_import`` or ``list_imports``. ``ImportArn`` and
            ``ImportStatus`` are required; every other key is optional.
        """
        s3_source = desc.get("S3BucketSource", {})
        return cls(
            arn=desc["ImportArn"],
            status=desc["ImportStatus"],
            table_arn=desc.get("TableArn"),
            table_id=desc.get("TableId"),
            client_token=desc.get("ClientToken"),
            s3_bucket_owner=s3_source.get("S3BucketOwner"),
            s3_bucket=s3_source.get("S3Bucket"),
            s3_prefix=s3_source.get("S3KeyPrefix"),
            error_count=desc.get("ErrorCount"),
            cloudwatch_log_group_arn=desc.get("CloudWatchLogGroupArn"),
            input_format=desc.get("InputFormat"),
            import_format_options=desc.get("InputFormatOptions"),
            import_compression_type=desc.get("InputCompressionType"),
            table_creation_parameters=desc.get("TableCreationParameters"),
            start_time=desc.get("StartTime"),
            end_time=desc.get("EndTime"),
            processed_size_bytes=desc.get("ProcessedSizeBytes"),
            processed_item_count=desc.get("ProcessedItemCount"),
            imported_item_count=desc.get("ImportedItemCount"),
            failure_code=desc.get("FailureCode"),
            failure_message=desc.get("FailureMessage"),
        )

    @classmethod
    def describe_import(
        cls,
        dynamodb_client: "DynamoDBClient",
        import_arn: str,
    ) -> T.Optional["ImportJob"]:
        """
        Fetch the full description of one import job.

        :param dynamodb_client: the boto3 DynamoDB client.
        :param import_arn: ARN of the import job.

        :return: the :class:`ImportJob`, or ``None`` if DynamoDB reports
            ``ImportNotFoundException``.
        :raises botocore.exceptions.ClientError: for any other API error.

        Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/describe_import.html
        """
        try:
            res = dynamodb_client.describe_import(ImportArn=import_arn)
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "ImportNotFoundException":
                return None
            # Propagate every other API error unchanged so callers can see
            # the real cause (throttling, access denied, ...).
            raise
        return cls.from_import_description(res["ImportTableDescription"])

    def is_in_progress(self) -> bool:
        return self.status == ImportStatusEnum.IN_PROGRESS.value

    def is_completed(self) -> bool:
        return self.status == ImportStatusEnum.COMPLETED.value

    def is_cancelling(self) -> bool:
        return self.status == ImportStatusEnum.CANCELLING.value

    def is_cancelled(self) -> bool:
        return self.status == ImportStatusEnum.CANCELLED.value

    def is_failed(self) -> bool:
        return self.status == ImportStatusEnum.FAILED.value

    @classmethod
    def wait_until_complete(
        cls,
        dynamodb_client: "DynamoDBClient",
        import_arn: str,
        delays: int = 10,
        timeout: int = 900,
        verbose: bool = True,
    ) -> "ImportJob":
        """
        Wait until the DynamoDB import is completed by polling ``describe_import``.

        :param dynamodb_client: the boto3 DynamoDB client.
        :param import_arn: ARN of the import job to watch.
        :param delays: polling interval, passed to ``Waiter`` (presumably
            seconds; confirm against ``Waiter``).
        :param timeout: overall limit, passed to ``Waiter``.
        :param verbose: passed to ``Waiter``.

        :return: the completed :class:`ImportJob`.
        :raises ValueError: if the job cannot be found, or if it ends up
            CANCELLING, CANCELLED or FAILED.
        :raises TimeoutError: if the waiter stops before the job completes.
        """
        for _attempt, _elapse in Waiter(
            delays=delays,
            timeout=timeout,
            instant=True,
            verbose=verbose,
        ):
            import_job = cls.describe_import(
                dynamodb_client=dynamodb_client,
                import_arn=import_arn,
            )
            if import_job is None:
                raise ValueError(f"Import not found: {import_arn}")
            if import_job.is_completed():
                return import_job
            if import_job.status in (
                ImportStatusEnum.CANCELLING.value,
                ImportStatusEnum.CANCELLED.value,
                ImportStatusEnum.FAILED.value,
            ):
                raise ValueError(f"Import failed: {import_job.failure_message}")
        # NOTE(review): Waiter may already raise on timeout. This guard covers
        # the case where it simply stops yielding, so the method never
        # silently returns None despite its ``ImportJob`` return type.
        raise TimeoutError(
            f"Import {import_arn} did not complete before timeout ({timeout})"
        )
# Top-level key that wraps every record in the import data files written by
# this module, e.g. one ``{"Item": <dynamodb json>}`` object per line in the
# DynamoDB JSON format.
ITEM = "Item"
def write_amazon_ion(
    records: T.Iterable[T.Dict[str, T.Any]],
    s3uri: str,
    s3_client: "S3Client",
) -> "PutObjectOutputTypeDef":
    """
    Write records to S3 in the Amazon Ion format for a later DynamoDB
    import. This writer is intentionally NOT implemented.

    Ref:

    - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.Format.html#S3DataImport.Requesting.Formats.Ion

    .. note::

        The ``amazon.ion`` python library doesn't work well with the DynamoDB
        import table feature. It encodes an integer value the same way as
        the standard ``json`` library (``1`` as ``1``), but import table
        data requires a trailing dot after the integer (``1.``). There is
        no way to make ``amazon.ion`` add the dot. To build a dataset for
        DynamoDB import table, use :func:`write_dynamodb_json` instead.

    :param records: The list of python dictionaries representing the
        DynamoDB items to write to S3.
    :param s3uri: The S3 URI to write the data to.
    :param s3_client: The S3 client to use.

    :raises NotImplementedError: always.
    """
    # The natural implementation did not produce the trailing dot, so the
    # import failed. It wrote ``amazon_ion.dumps({ITEM: record}, binary=False)``
    # once per record, newline-joined and gzip-compressed, then uploaded the
    # result with ``put_object``. A single ``dumps`` of the whole list had the
    # same problem.
    raise NotImplementedError(
        "amazon.ion library doesn't work well with DynamoDB import table feature. "
        "Use DynamoDB JSON instead."
    )
def write_dynamodb_json(
    records: T.Iterable[T.Dict[str, T.Any]],
    s3uri: str,
    s3_client: "S3Client",
) -> "PutObjectOutputTypeDef":
    """
    Upload ``records`` to S3 as one gzip-compressed DynamoDB JSON file that a
    later DynamoDB "import table from S3" job can consume.

    Each record becomes one line of the form ``{"Item": <dynamodb json>}``,
    and every line, including the last one, ends with a newline.

    Ref:

    - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.Format.html#S3DataImport.Requesting.Formats.DDBJson

    :param records: python dictionaries, one per DynamoDB item.
    :param s3uri: destination S3 URI.
    :param s3_client: the boto3 S3 client used for the upload.

    :return: the ``put_object`` response.
    """
    # One newline-terminated JSON object per record (JSON Lines layout).
    serialized = "".join(
        f'{{"{ITEM}": {dynamodb_json.dumps(record)}}}\n' for record in records
    )
    compressed = gzip.compress(serialized.encode("utf-8"))

    bucket, key = split_s3_uri(s3uri=s3uri)
    return s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=compressed,
        ContentType="application/json",
        ContentEncoding="gzip",
    )