Skip to content

Reference

async_s3

S3 bucket helper utilities: list objects asynchronously, folder by folder.

This file is required for the build system to find the package.

S3BucketObjects

Source code in src/async_s3/s3_bucket_objects.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class S3BucketObjects:
    """List the objects of one S3 bucket, fanning out one task per "folder".

    Folders discovered through `Delimiter` listings are listed concurrently in
    separate asyncio tasks; `self.semaphore` caps how many of those listings
    talk to S3 at the same time.
    """

    def __init__(self, bucket: str, *, parallelism: int = DEFAULT_PARALLELISM) -> None:
        """Initialize the S3BucketObjects object.

        bucket: The name of the S3 bucket.
        parallelism: The maximum number of concurrent requests to AWS S3.
        """
        self._bucket = bucket
        # Held only while a task is paginating S3 results (see `_list_objects`).
        self.semaphore = asyncio.Semaphore(parallelism)

    async def _list_objects(  # noqa: PLR0913
        self,
        s3_client: aiobotocore.client.AioBaseClient,
        prefix: str,
        current_depth: int,
        max_level: Optional[int],
        max_folders: Optional[int],
        delimiter: str,
        objects_keys: set[str],
        queue: "asyncio.Queue[list[dict[str, Any]] | BaseException]",
        active_tasks: set[asyncio.Task[None]],
    ) -> None:
        """Emit object pages to the queue and spawn one task per sub-folder.

        current_depth: Folder depth of `prefix`; -1 means "list everything under
            `prefix` without a delimiter" (used once folders were grouped).
        objects_keys: Keys already emitted. It is shared by all tasks of one
            listing so that overlapping prefixes do not produce duplicates.
        queue: Receives one (possibly empty) list of object dicts per page.
        active_tasks: Child tasks are added here before this coroutine returns.
        """
        paginator = s3_client.get_paginator("list_objects_v2")
        prefixes: list[str] = []

        params = {"Bucket": self._bucket, "Prefix": prefix}
        if (current_depth != -1) and (max_level is None or current_depth < max_level):
            params["Delimiter"] = delimiter

        # Hold a permit only while requests to S3 are in flight. Holding it for
        # the task's whole lifetime, including while waiting for permits for
        # child tasks, deadlocks as soon as every permit holder is waiting for a
        # child permit (always the case with parallelism=1).
        async with self.semaphore:
            async for page in paginator.paginate(**params):
                objects = page.get("Contents", [])
                # Drop keys ending in the delimiter ("folder" markers) and keys
                # another task has already emitted.
                new_keys = {
                    obj["Key"]
                    for obj in objects
                    if not obj["Key"].endswith(delimiter) and obj["Key"] not in objects_keys
                }
                cleared_objects = [obj for obj in objects if obj["Key"] in new_keys]
                objects_keys.update(new_keys)
                await queue.put(cleared_objects)

                if "Delimiter" in params:
                    prefixes.extend(
                        common["Prefix"] for common in page.get("CommonPrefixes", [])
                    )

        level = -1 if current_depth == -1 else current_depth + 1
        if max_folders is not None and (len(prefixes) > max_folders):
            # Too many folders: list coarser prefix groups recursively instead.
            prefixes = list(group_by_prefix(prefixes, max_folders))
            level = -1

        # Children wait for their own permit inside `_list_objects`, so this loop
        # never blocks and every child is registered before this task completes.
        for folder in prefixes:
            task = asyncio.create_task(
                self._list_objects(
                    s3_client,
                    folder,
                    level,
                    max_level,
                    max_folders,
                    delimiter,
                    objects_keys,
                    queue,
                    active_tasks,
                ),
            )
            active_tasks.add(task)
            task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue))

    async def iter(
        self,
        prefix: str = "/",
        *,
        max_level: Optional[int] = None,
        max_folders: Optional[int] = None,
        delimiter: str = "/",
    ) -> AsyncIterator[list[dict[str, Any]]]:
        """Generator that yields objects in the bucket with the given prefix.

        Yield objects by partial chunks (list of AWS S3 object dicts) as they
         are collected from AWS asynchronously.

        max_level: The maximum folders depth to traverse in separate requests.
        If None, traverse all levels.
        max_folders: The maximum number of folders to load in separate requests.
        If None, requests all folders.
        Otherwise, the folders are grouped by prefixes before loading in separate requests.
        Try to group in the given number of folders if possible.
        delimiter: The delimiter for "folders".

        Raises:
            Any exception raised by a listing task (e.g. a botocore ClientError)
            is re-raised here; the remaining tasks are cancelled.
        """
        # if we group by prefixes, some objects may be listed multiple times
        # to avoid this, we store the keys of the objects already listed
        objects_keys: set[str] = set()

        # pages from the tasks, empty "task finished" signals, or a task's exception
        queue: "asyncio.Queue[list[dict[str, Any]] | BaseException]" = asyncio.Queue()

        # set to keep track of active tasks
        active_tasks: set[asyncio.Task[None]] = set()

        async with get_s3_client() as s3_client:
            root_task = asyncio.create_task(
                self._list_objects(
                    s3_client,
                    prefix,
                    0,
                    max_level,
                    max_folders,
                    delimiter,
                    objects_keys,
                    queue,
                    active_tasks,
                ),
            )
            active_tasks.add(root_task)
            root_task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue))

            try:
                # Keep draining after the last task is gone: pages it queued before
                # its completion signal must still be delivered to a slow consumer.
                while active_tasks or not queue.empty():
                    item = await queue.get()
                    if isinstance(item, BaseException):
                        raise item
                    if item:
                        yield item
            finally:
                # Early `break`, consumer error or a failed task: stop outstanding
                # listings before the S3 client is closed underneath them.
                pending = [task for task in active_tasks if not task.done()]
                for task in pending:
                    task.cancel()
                if pending:
                    await asyncio.gather(*pending, return_exceptions=True)

    def _task_done(
        self,
        task: asyncio.Task[None],
        active_tasks: set[asyncio.Task[None]],
        queue: "asyncio.Queue[list[dict[str, Any]] | BaseException]",
    ) -> None:
        """Done-callback: deregister `task` and wake the consumer in `iter`.

        Puts the task's exception on the queue if it failed, otherwise an empty
        page as a wake-up signal. Runs synchronously in the event loop, so the
        discard and the put cannot be interleaved with the consumer.
        """
        active_tasks.discard(task)
        failure = None if task.cancelled() else task.exception()
        queue.put_nowait([] if failure is None else failure)

    async def list(
        self,
        prefix: str = "/",
        *,
        max_level: Optional[int] = None,
        max_folders: Optional[int] = None,
        delimiter: str = "/",
    ) -> list[dict[str, Any]]:
        """List all objects in the bucket with the given prefix.

        max_level: The maximum folders depth to traverse in separate requests.
        If None, traverse all levels.
        max_folders: The maximum number of folders to load in separate requests.
        If None, requests all folders.
        Otherwise, the folders are grouped by prefixes before loading in separate requests.
        Try to group to the given `max_folders` if possible.
        delimiter: The delimiter for "folders".
        """
        objects = []
        async for objects_page in self.iter(
            prefix,
            max_level=max_level,
            max_folders=max_folders,
            delimiter=delimiter,
        ):
            objects.extend(objects_page)
        return objects

__init__(bucket, *, parallelism=DEFAULT_PARALLELISM)

Initialize the S3BucketObjects object.

bucket: the name of the S3 bucket. parallelism: the maximum number of concurrent requests to AWS S3.

Source code in src/async_s3/s3_bucket_objects.py
32
33
34
35
36
37
38
39
def __init__(self, bucket: str, *, parallelism: int = DEFAULT_PARALLELISM) -> None:
    """Set up a lister for a single bucket.

    bucket: Name of the S3 bucket to list.
    parallelism: Upper bound on concurrent requests sent to AWS S3.
    """
    self._bucket = bucket
    limiter = asyncio.Semaphore(parallelism)
    self.semaphore = limiter

iter(prefix='/', *, max_level=None, max_folders=None, delimiter='/') async

Generator that yields objects in the bucket with the given prefix.

Yield objects by partial chunks (list of AWS S3 object dicts) as they are collected from AWS asynchronously.

max_level: the maximum folder depth to traverse in separate requests; if None, traverse all levels. max_folders: the maximum number of folders to load in separate requests; if None, request all folders, otherwise group the folders by prefix before loading them in separate requests, aiming for the given number of groups where possible. delimiter: the delimiter that separates "folders".

Source code in src/async_s3/s3_bucket_objects.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def iter(
    self,
    prefix: str = "/",
    *,
    max_level: Optional[int] = None,
    max_folders: Optional[int] = None,
    delimiter: str = "/",
) -> AsyncIterator[list[dict[str, Any]]]:
    """Generator that yields objects in the bucket with the given prefix.

    Yield objects by partial chunks (list of AWS S3 object dicts) as they
     are collected from AWS asynchronously.

    max_level: The maximum folders depth to traverse in separate requests.
    If None, traverse all levels.
    max_folders: The maximum number of folders to load in separate requests.
    If None, requests all folders.
    Otherwise, the folders are grouped by prefixes before loading in separate requests.
    Try to group in the given number of folders if possible.
    delimiter: The delimiter for "folders".
    """
    # if we group by prefixes, some objects may be listed multiple times
    # to avoid this, we store the keys of the objects already listed
    objects_keys: set[str] = set()

    # queue to store the objects pages from the tasks
    queue: asyncio.Queue[list[dict[str, Any]]] = asyncio.Queue()

    # set to keep track of active tasks
    active_tasks: set[asyncio.Task[None]] = set()

    async with get_s3_client() as s3_client:
        await self.semaphore.acquire()
        try:
            root_task = asyncio.create_task(
                self._list_objects(
                    s3_client,
                    prefix,
                    0,
                    max_level,
                    max_folders,
                    delimiter,
                    objects_keys,
                    queue,
                    active_tasks,
                ),
            )
        except Exception:
            self.semaphore.release()
            raise
        active_tasks.add(root_task)
        root_task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue))

        try:
            # Keep draining once the last task is gone: pages queued before its
            # completion signal must still reach a consumer that was slow to ask.
            # (`queue.get()` waits; it never raises QueueEmpty.)
            while active_tasks or not queue.empty():
                page = await queue.get()
                if page:
                    yield page
        finally:
            # On early exit, stop outstanding listings before the client closes.
            pending = [task for task in active_tasks if not task.done()]
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

list(prefix='/', *, max_level=None, max_folders=None, delimiter='/') async

List all objects in the bucket with the given prefix.

max_level: the maximum folder depth to traverse in separate requests; if None, traverse all levels. max_folders: the maximum number of folders to load in separate requests; if None, request all folders, otherwise group the folders by prefix before loading them in separate requests, aiming for max_folders groups where possible. delimiter: the delimiter that separates "folders".

Source code in src/async_s3/s3_bucket_objects.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def list(
    self,
    prefix: str = "/",
    *,
    max_level: Optional[int] = None,
    max_folders: Optional[int] = None,
    delimiter: str = "/",
) -> list[dict[str, Any]]:
    """Collect every object under `prefix` into one flat list.

    Thin wrapper over `self.iter` that concatenates the yielded pages in order.

    max_level: The maximum folders depth to traverse in separate requests.
    If None, traverse all levels.
    max_folders: The maximum number of folders to load in separate requests.
    If None, requests all folders.
    Otherwise, the folders are grouped by prefixes before loading in separate requests.
    Try to group to the given `max_folders` if possible.
    delimiter: The delimiter for "folders".
    """
    pages = self.iter(
        prefix,
        max_level=max_level,
        max_folders=max_folders,
        delimiter=delimiter,
    )
    return [obj async for page in pages for obj in page]

group_by_prefix

find_longest_common_prefix(words)

Finds the longest common prefix among a list of words.

Source code in src/async_s3/group_by_prefix.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def find_longest_common_prefix(words: list[str]) -> str:
    """Finds the longest common prefix among a list of words.

    Returns "" for an empty list and the word itself for a single word.
    """
    if not words:
        return ""

    # Walk the words column by column (zip stops at the shortest word) and
    # count how many leading columns hold one and the same character.
    matched = 0
    for column in zip(*words):
        if len(set(column)) != 1:
            break
        matched += 1

    return words[0][:matched]

group_by_prefix(words, desired_group_count)

Groups words by prefixes to create a desired number of word groups.

Try to create the desired number of groups if possible.

Source code in src/async_s3/group_by_prefix.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def group_by_prefix(words: list[str], desired_group_count: int) -> list[str]:  # noqa: C901
    """Groups words by prefixes to create a desired number of word groups.

    Try to create the desired number of groups if possible.

    words: The strings to group. The caller's list is left untouched; the
        function works on a sorted copy.
    desired_group_count: The target number of groups. Groups are refined one
        character deeper per pass while there are fewer groups than this and at
        least one group can still be split, so the result may end up smaller or
        larger than the target.

    Returns the prefixes of the resulting groups.
    """
    # Work on a sorted copy: words sharing a prefix become contiguous, which the
    # (start_index, count) bookkeeping below relies on, and the caller's list is
    # not reordered as a side effect.
    words = sorted(words)

    common_prefix = find_longest_common_prefix(words)
    common_prefix_length = len(common_prefix)
    prefix_groups = {}  # {prefix: [start_index, count, can_split]}

    for i, s in enumerate(words):
        if len(s) <= common_prefix_length:
            # The common prefix is itself one of the words: start from a single
            # group covering every word.
            prefix_groups = {s: [0, len(words), True]}
            break
        prefix = common_prefix + s[common_prefix_length]
        if prefix not in prefix_groups:
            prefix_groups[prefix] = [i, 1, True]
        else:
            prefix_groups[prefix][1] += 1

    def split_prefix_groups(groups: dict[str, list[int]]) -> dict[str, list[int]]:
        """Extend each splittable group's prefix by one character."""
        new_groups = {}
        for prefix, (start_index, count, can_split) in groups.items():
            # Small groups are not worth splitting; freeze them.
            if not can_split or count < max(2, (len(words) // desired_group_count)):
                new_groups[prefix] = [start_index, count, False]
                continue

            subgroups = {}
            can_further_split = True
            for i in range(start_index, start_index + count):
                s = words[i]
                if len(prefix) < len(s):
                    new_prefix = prefix + s[len(prefix)]
                else:
                    can_further_split = False
                    break  # A string with the length of the group prefix prevents splitting
                if new_prefix not in subgroups:
                    subgroups[new_prefix] = [i, 1, True]
                else:
                    subgroups[new_prefix][1] += 1

            # Only accept the split if it actually merges some words together.
            if can_further_split and len(subgroups) < count:
                new_groups.update(subgroups)
            else:
                new_groups[prefix] = [start_index, count, False]

        return new_groups

    while len(prefix_groups) < desired_group_count and any(
        can_split for _, _, can_split in prefix_groups.values()
    ):
        prefix_groups = split_prefix_groups(prefix_groups)

    return list(prefix_groups.keys())

main

async-s3.

ListingConfig dataclass

Configuration for S3 object listing operations.

Source code in src/async_s3/main.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@dataclass
class ListingConfig:
    """Options for one S3 listing run: where to list and how to traverse."""

    s3_url: str  # "s3://bucket/prefix" URL to list
    max_level: Optional[int] = None
    max_folders: Optional[int] = None
    delimiter: str = "/"
    parallelism: int = 100
    repeat: int = 1

    @property
    def bucket(self) -> str:
        """Bucket part of `s3_url`."""
        bucket_name, _ = split_s3_url(self.s3_url)
        return bucket_name

    @property
    def key(self) -> str:
        """Key/prefix part of `s3_url` (may be empty)."""
        _, key_prefix = split_s3_url(self.s3_url)
        return key_prefix
bucket property

Extract bucket name from S3 URL.

key property

Extract key/prefix from S3 URL.

S3ListCommand

Base class for S3 listing commands.

Source code in src/async_s3/main.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class S3ListCommand:
    """Shared plumbing for the `ls` and `du` commands.

    Packs the CLI options into a `ListingConfig` and runs the listing.
    """

    def __init__(  # noqa: PLR0913
        self,
        s3_url: str,
        max_level: Optional[int],
        max_folders: Optional[int],
        repeat: int,
        parallelism: int,
        delimiter: str,
    ):
        options = {
            "s3_url": s3_url,
            "delimiter": delimiter,
            "max_level": max_level,
            "max_folders": max_folders,
            "parallelism": parallelism,
            "repeat": repeat,
        }
        self.config = ListingConfig(**options)

    def validate_and_execute(self) -> Iterable[dict[str, Any]]:
        """Reject URLs without the S3 scheme, then list the objects."""
        config = self.config
        if not config.s3_url.startswith(S3PROTO):
            # error() aborts, so listing never runs for an invalid URL.
            error("Invalid S3 URL. It should start with s3://")
        return list_objects(config)
validate_and_execute()

Validate S3 URL and execute listing.

Source code in src/async_s3/main.py
89
90
91
92
93
def validate_and_execute(self) -> Iterable[dict[str, Any]]:
    """Reject URLs without the S3 scheme, then list the objects."""
    config = self.config
    if not config.s3_url.startswith(S3PROTO):
        # error() aborts, so listing never runs for an invalid URL.
        error("Invalid S3 URL. It should start with s3://")
    return list_objects(config)

as3()

Async S3.

Source code in src/async_s3/main.py
62
63
64
65
@click.group()
@click.version_option(version=__version__, prog_name="as3")
def as3() -> None:
    """Async S3."""
    # Command group only: subcommands such as `ls` and `du` attach via `@as3.command()`.
    return None

du(s3_url, max_level, max_folders, repeat, parallelism, delimiter)

Show count and size for objects in an S3 bucket.

Example: as3 du s3://bucket/key

Source code in src/async_s3/main.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@list_objects_options
@as3.command()
def du(  # noqa: PLR0913
    s3_url: str,
    max_level: Optional[int],
    max_folders: Optional[int],
    repeat: int,
    parallelism: int,
    delimiter: str,
) -> None:
    """
    Show count and size for objects in an S3 bucket.

    Example:
    as3 du s3://bucket/key
    """
    # Only the summary is printed; individual keys are not echoed.
    command = S3ListCommand(s3_url, max_level, max_folders, repeat, parallelism, delimiter)
    print_summary(command.validate_and_execute())

error(message)

Print an error message and exit.

Source code in src/async_s3/main.py
44
45
46
47
def error(message: str) -> None:
    """Print an error message in bold red to stderr and abort.

    Raises:
        click.Abort: always.
    """
    # stderr keeps error text out of listing output on stdout
    # (e.g. `as3 ls s3://bucket > keys.txt`).
    click.secho(message, fg="red", bold=True, err=True)
    raise click.Abort()

human_readable_size(size, decimal_places=2)

Convert bytes to a human-readable format.

Source code in src/async_s3/main.py
193
194
195
196
197
198
199
200
def human_readable_size(size: float, decimal_places: int = 2) -> str:
    """Convert bytes to a human-readable format.

    size: Number of bytes.
    decimal_places: Digits after the decimal point.

    Uses binary (1024-based) units up to TB; larger values stay in TB
    (e.g. 1024**5 bytes -> "1024.00 TB").
    """
    bytes_in_kilo = 1024.0
    units = ["B", "KB", "MB", "GB", "TB"]
    # Only divide while a larger unit exists. Dividing on the last unit too
    # would shrink the value without changing its label.
    for unit in units[:-1]:
        if size < bytes_in_kilo:
            return f"{size:.{decimal_places}f} {unit}"
        size /= bytes_in_kilo
    return f"{size:.{decimal_places}f} {units[-1]}"

list_objects(config)

List objects in an S3 bucket.

Source code in src/async_s3/main.py
203
204
205
def list_objects(config: ListingConfig) -> Iterable[dict[str, Any]]:
    """Synchronous entry point: run the async listing with `asyncio.run`."""
    listing = list_objects_async(config)
    return asyncio.run(listing)

list_objects_async(config) async

List objects in an S3 bucket.

Source code in src/async_s3/main.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
async def list_objects_async(config: ListingConfig) -> Iterable[dict[str, Any]]:
    """List objects in an S3 bucket.

    Runs the listing `config.repeat` times. It prints the elapsed time of each
    attempt (and the average when repeated) and returns the objects from the
    last attempt.

    Raises:
        ValueError: if `config.repeat` is less than 1.
    """
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if config.repeat < 1:
        raise ValueError(f"repeat must be at least 1, got {config.repeat}")
    print_start_info(config)

    s3_list = S3BucketObjects(config.bucket, parallelism=config.parallelism)
    total_time = 0.0
    result: Iterable[dict[str, Any]] = []

    for attempt in range(config.repeat):
        result, duration = await list_objects_with_progress(s3_list, config)
        total_time += duration
        print_attempt_info(attempt, duration)

    if config.repeat > 1:
        print_average_time(total_time, config.repeat)

    return result

list_objects_options(func)

Add common options to commands using list_objects.

Source code in src/async_s3/main.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def list_objects_options(func: Callable[[Any], None]) -> Callable[[Any], None]:
    """Add common options to commands using list_objects.

    Adds the `s3_url` argument and the --max-level, --max-folders, --repeat,
    --parallelism and --delimiter options. --repeat and --parallelism must be
    at least 1.
    """
    func = click.argument("s3_url")(func)
    func = click.option(
        "--max-level",
        "-l",
        type=int,
        default=None,
        help=(
            "The maximum folder levels to traverse in separate requests. "
            "By default, traverse all levels."
        ),
    )(func)
    func = click.option(
        "--max-folders",
        "-f",
        type=int,
        default=None,
        help="The maximum number of folders to list in one request. By default, list all folders.",
    )(func)
    func = click.option(
        "--repeat",
        "-r",
        # At least one run is needed to produce a result and a timing.
        type=click.IntRange(min=1),
        default=1,
        help=(
            "Repeat the operation multiple times to calculate average elapsed time. "
            "By default, repeat once."
        ),
    )(func)
    func = click.option(
        "--parallelism",
        "-p",
        # Becomes the semaphore size; 0 permits would block every S3 request forever.
        type=click.IntRange(min=1),
        default=100,
        help="The maximum number of concurrent requests to AWS S3. By default, 100.",
    )(func)
    return click.option(
        "--delimiter",
        "-d",
        type=str,
        callback=validate_delimiter,
        default="/",
        help="Delimiter for 'folders'. Default is '/'.",
    )(func)

list_objects_with_progress(s3_list, config) async

List objects in an S3 bucket with a progress bar.

Returns:

Type Description
tuple[Iterable[dict[str, Any]], float]

(The objects, the elapsed time)

Source code in src/async_s3/main.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
async def list_objects_with_progress(
    s3_list: S3BucketObjects,
    config: ListingConfig,
) -> tuple[Iterable[dict[str, Any]], float]:
    """List objects in an S3 bucket with a progress bar.

    Returns:
        (The objects, the elapsed time)

    A botocore ClientError raised while listing is reported via `error()`,
    which aborts.
    """
    # Monotonic clock: elapsed time must not jump with wall-clock adjustments.
    start_time = time.monotonic()
    result: list[dict[str, Any]] = []
    total_size = 0
    last_update_time = start_time - PROGRESS_REFRESH_INTERVAL

    try:
        with Progress(
            TextColumn("[progress.description]{task.description}{task.completed:>,}"),
            BarColumn(),
            TaskProgressColumn(),
            transient=True,
        ) as progress:
            objects_bar = progress.add_task("[green]Objects: ", total=None)
            size_bar = progress.add_task("[green]Size:    ", total=None)
            async for objects_page in s3_list.iter(
                config.key,
                max_level=config.max_level,
                max_folders=config.max_folders,
                delimiter=config.delimiter,
            ):
                result.extend(objects_page)
                total_size += sum(obj["Size"] for obj in objects_page)
                current_time = time.monotonic()
                # Throttle redraws, but report absolute totals: advancing by the
                # current page alone would drop every page that arrived between
                # two refreshes.
                if current_time - last_update_time >= PROGRESS_REFRESH_INTERVAL:
                    progress.update(objects_bar, completed=len(result))
                    progress.update(size_bar, completed=total_size)
                    last_update_time = current_time
            progress.remove_task(objects_bar)
            progress.remove_task(size_bar)
    except botocore.exceptions.ClientError as exc:
        error(f"Error: {exc}")

    duration = time.monotonic() - start_time
    return result, duration

ls(s3_url, max_level, max_folders, repeat, parallelism, delimiter)

List objects in an S3 bucket.

Example: as3 ls s3://bucket/key

Source code in src/async_s3/main.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@list_objects_options
@as3.command()
def ls(  # noqa: PLR0913
    s3_url: str,
    max_level: Optional[int],
    max_folders: Optional[int],
    repeat: int,
    parallelism: int,
    delimiter: str,
) -> None:
    """
    List objects in an S3 bucket.

    Example:
    as3 ls s3://bucket/key
    """
    command = S3ListCommand(s3_url, max_level, max_folders, repeat, parallelism, delimiter)
    objects = command.validate_and_execute()
    # One key per line, followed by the count/size summary.
    click.echo("\n".join(obj["Key"] for obj in objects))
    print_summary(objects)

print_attempt_info(attempt, duration)

Print the elapsed time for an attempt.

Source code in src/async_s3/main.py
301
302
303
304
305
306
307
def print_attempt_info(attempt: int, duration: float) -> None:
    """Echo the 1-based attempt number and its duration in seconds, in green."""
    label = click.style(f"({attempt + 1}) Elapsed time: ", fg="green")
    value = click.style(f"{duration:.2f}", fg="green", bold=True)
    unit = click.style("seconds", fg="green")
    click.echo(f"{label}{value} {unit}")

print_average_time(total_time, repeat)

Print the average elapsed time.

Source code in src/async_s3/main.py
310
311
312
313
314
315
316
def print_average_time(total_time: float, repeat: int) -> None:
    """Echo the mean duration of `repeat` runs that took `total_time` seconds in total."""
    average = total_time / repeat
    label = click.style("Average time: ", fg="yellow")
    value = click.style(f"{average:.2f}", fg="yellow", bold=True)
    unit = click.style("seconds", fg="yellow")
    click.echo(f"{label}{value} {unit}")

print_start_info(config)

Print the command parameters.

Source code in src/async_s3/main.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def print_start_info(config: ListingConfig) -> None:
    """Echo the listing target and the traversal options, in yellow."""
    click.echo(
        click.style("Listing objects in ", fg="yellow")
        + click.style(config.s3_url, fg="yellow", bold=True),
    )
    labelled = [
        ("max_level: ", str(config.max_level)),
        ("max_folders: ", str(config.max_folders)),
        ("delimiter: ", config.delimiter),
        ("parallelism: ", str(config.parallelism)),
    ]
    # Each option renders as "<label><bold value>, ".
    chunks = [
        f"{click.style(label, fg='yellow')}{click.style(value, fg='yellow', bold=True)}, "
        for label, value in labelled
    ]
    # The repeat count has no label; it reads as "<N> times.".
    chunks.append(click.style(str(config.repeat), fg="yellow", bold=True))
    chunks.append(click.style(" times.", fg="yellow"))
    click.echo("".join(chunks))

print_summary(objects)

Print a summary of the objects.

Source code in src/async_s3/main.py
50
51
52
53
54
55
56
57
58
59
def print_summary(objects: Iterable[dict[str, Any]]) -> None:
    """Print a summary of the objects: their count and total size.

    `objects` is traversed exactly once. Summing sizes and then calling
    `len(list(objects))` would report 0 objects for a one-shot iterable such
    as a generator.
    """
    count = 0
    total_size = 0
    for obj in objects:
        count += 1
        total_size += obj["Size"]
    message = (
        f"{click.style('Total objects: ', fg='yellow')}"
        f"{click.style(f'{count:,}', fg='yellow', bold=True)}, "
        f"{click.style('size: ', fg='yellow')}"
        f"{click.style(human_readable_size(total_size), fg='yellow', bold=True)}"
    )
    click.echo(message)

split_s3_url(s3_url)

Split an S3 URL into bucket and key.

Source code in src/async_s3/main.py
248
249
250
251
def split_s3_url(s3_url: str) -> tuple[str, str]:
    """Split an S3 URL into (bucket, key); the key is "" when absent.

    Assumes `s3_url` starts with S3PROTO; that many leading characters are dropped.
    """
    without_scheme = s3_url[len(S3PROTO) :]
    bucket, _, key = without_scheme.partition("/")
    return (bucket, key)

validate_delimiter(ctx, param, value)

Validate the Delimiter option.

Source code in src/async_s3/main.py
143
144
145
146
147
def validate_delimiter(ctx: click.Context, param: click.Parameter, value: str) -> str:  # noqa: ARG001
    """Click callback for `--delimiter`: accept only a single character."""
    if len(value) == 1:
        return value
    raise click.BadParameter("Delimiter must be exactly one character.")

s3_bucket_objects

S3BucketObjects

Source code in src/async_s3/s3_bucket_objects.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class S3BucketObjects:
    """List the objects of one S3 bucket using concurrent per-folder requests."""

    def __init__(self, bucket: str, *, parallelism: int = DEFAULT_PARALLELISM) -> None:
        """Initialize the S3BucketObjects object.

        bucket: The name of the S3 bucket.
        parallelism: The maximum number of concurrent requests to AWS S3.
        """
        self._bucket = bucket
        # Bounds how many listings talk to S3 at once. It is shared by every
        # `iter` call made on this instance.
        self.semaphore = asyncio.Semaphore(parallelism)

    async def _list_objects(  # noqa: PLR0913
        self,
        s3_client: aiobotocore.client.AioBaseClient,
        prefix: str,
        current_depth: int,
        max_level: Optional[int],
        max_folders: Optional[int],
        delimiter: str,
        objects_keys: set[str],
        queue: asyncio.Queue[list[dict[str, Any]]],
        active_tasks: set[asyncio.Task[None]],
        errors: Optional[list[BaseException]] = None,
    ) -> None:
        """Emit object pages under `prefix` to the queue, then spawn tasks for its sub-folders.

        current_depth: folder depth of `prefix` (`iter` starts at 0), or -1 to list
            everything under `prefix` without a delimiter, i.e. without further
            splitting into sub-folder requests.
        objects_keys: keys already emitted by any task. Used to drop duplicates,
            which can appear when folders are grouped by prefix.
        queue: receives one list of object dicts per listed page.
        active_tasks: the running listing tasks; spawned tasks are added to it.
        errors: if given, exceptions raised by spawned tasks are appended to it.
        """
        paginator = s3_client.get_paginator("list_objects_v2")
        sub_folders: list[str] = []

        params = {"Bucket": self._bucket, "Prefix": prefix}
        if (current_depth != -1) and (max_level is None or current_depth < max_level):
            params["Delimiter"] = delimiter

        # Hold a slot only while talking to S3, never while waiting to spawn
        # children. Holding one slot while waiting for another deadlocks once
        # every slot belongs to a task that is itself waiting to spawn.
        async with self.semaphore:
            async for page in paginator.paginate(**params):
                objects = page.get("Contents", [])
                new_keys = {
                    obj["Key"]
                    for obj in objects
                    if not obj["Key"].endswith(delimiter) and obj["Key"] not in objects_keys
                }
                cleared_objects = [obj for obj in objects if obj["Key"] in new_keys]
                objects_keys.update(new_keys)
                await queue.put(cleared_objects)

                if "Delimiter" in params:
                    sub_folders.extend(
                        common_prefix["Prefix"] for common_prefix in page.get("CommonPrefixes", [])
                    )

        level = -1 if current_depth == -1 else current_depth + 1
        if max_folders is not None and len(sub_folders) > max_folders:
            sub_folders = list(group_by_prefix(sub_folders, max_folders))
            # Grouped prefixes are listed flat (no delimiter) to avoid re-splitting.
            level = -1

        for folder in sub_folders:
            self._start_task(
                self._list_objects(
                    s3_client,
                    folder,
                    level,
                    max_level,
                    max_folders,
                    delimiter,
                    objects_keys,
                    queue,
                    active_tasks,
                    errors,
                ),
                active_tasks,
                queue,
                errors,
            )

    def _start_task(
        self,
        coro: Any,
        active_tasks: set[asyncio.Task[None]],
        queue: asyncio.Queue[list[dict[str, Any]]],
        errors: Optional[list[BaseException]],
    ) -> None:
        """Schedule the `_list_objects` coroutine `coro` as a tracked task.

        The task is added to `active_tasks` synchronously, before any await, so
        the consumer in `iter` never sees an empty set while work is pending.
        The set also holds the strong reference asyncio needs to keep a running
        task from being garbage-collected.
        """
        task = asyncio.create_task(coro)
        active_tasks.add(task)
        task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue, errors))

    async def iter(
        self,
        prefix: str = "/",
        *,
        max_level: Optional[int] = None,
        max_folders: Optional[int] = None,
        delimiter: str = "/",
    ) -> AsyncIterator[list[dict[str, Any]]]:
        """Generator that yields objects in the bucket with the given prefix.

        Yield objects by partial chunks (list of AWS S3 object dicts) as they
         are collected from AWS asynchronously.

        max_level: The maximum folders depth to traverse in separate requests.
        If None, traverse all levels.
        max_folders: The maximum number of folders to load in separate requests.
        If None, requests all folders.
        Otherwise, the folders are grouped by prefixes before loading in separate requests.
        Try to group in the given number of folders if possible.
        delimiter: The delimiter for "folders".

        Raises the first exception raised by any listing task. Outstanding tasks
        are cancelled when iteration stops, whether normally, by error or by
        closing the generator.
        """
        # if we group by prefixes, some objects may be listed multiple times
        # to avoid this, we store the keys of the objects already listed
        objects_keys: set[str] = set()

        # queue to store the objects pages from the tasks; an empty page only
        # signals that some task finished
        queue: asyncio.Queue[list[dict[str, Any]]] = asyncio.Queue()

        # set to keep track of active tasks
        active_tasks: set[asyncio.Task[None]] = set()

        # exceptions raised by listing tasks, filled by `_task_done`
        errors: list[BaseException] = []

        async with get_s3_client() as s3_client:
            try:
                self._start_task(
                    self._list_objects(
                        s3_client,
                        prefix,
                        0,
                        max_level,
                        max_folders,
                        delimiter,
                        objects_keys,
                        queue,
                        active_tasks,
                        errors,
                    ),
                    active_tasks,
                    queue,
                    errors,
                )
                # Keep draining after the last task is gone: while the caller was
                # busy between pages, tasks may have queued pages and finished.
                while active_tasks or not queue.empty():
                    page = await queue.get()
                    if errors:
                        raise errors[0]
                    if page:
                        yield page
            finally:
                # Stop leftover tasks before the S3 client is closed.
                pending = list(active_tasks)
                for task in pending:
                    task.cancel()
                if pending:
                    await asyncio.gather(*pending, return_exceptions=True)

    def _task_done(
        self,
        task: asyncio.Task[None],
        active_tasks: set[asyncio.Task[None]],
        queue: asyncio.Queue[list[dict[str, Any]]],
        errors: Optional[list[BaseException]] = None,
    ) -> None:
        """Done-callback of a listing task.

        Untracks the task, records its exception in `errors` (when given), and
        puts an empty page on the queue to wake the consumer in `iter`.
        It runs synchronously, so untracking and wake-up happen in the same
        event-loop step. `put_nowait` cannot fail on the unbounded queue that
        `iter` creates.
        """
        active_tasks.discard(task)
        if errors is not None and not task.cancelled():
            exc = task.exception()
            if exc is not None:
                errors.append(exc)
        queue.put_nowait([])  # signal that the task is done

    async def list(
        self,
        prefix: str = "/",
        *,
        max_level: Optional[int] = None,
        max_folders: Optional[int] = None,
        delimiter: str = "/",
    ) -> list[dict[str, Any]]:
        """List all objects in the bucket with the given prefix.

        max_level: The maximum folders depth to traverse in separate requests.
        If None, traverse all levels.
        max_folders: The maximum number of folders to load in separate requests.
        If None, requests all folders.
        Otherwise, the folders are grouped by prefixes before loading in separate requests.
        Try to group to the given `max_folders` if possible.
        delimiter: The delimiter for "folders".
        """
        objects = []
        async for objects_page in self.iter(
            prefix,
            max_level=max_level,
            max_folders=max_folders,
            delimiter=delimiter,
        ):
            objects.extend(objects_page)
        return objects
__init__(bucket, *, parallelism=DEFAULT_PARALLELISM)

Initialize the S3BucketObjects object.

bucket: the name of the S3 bucket.
parallelism: the maximum number of concurrent requests to AWS S3.

Source code in src/async_s3/s3_bucket_objects.py
32
33
34
35
36
37
38
39
def __init__(self, bucket: str, *, parallelism: int = DEFAULT_PARALLELISM) -> None:
    """Remember the target bucket and create the request-limiting semaphore.

    bucket: name of the S3 bucket to list.
    parallelism: upper bound on concurrent requests to AWS S3.
    """
    self._bucket = bucket  # bucket name used for every listing request
    self.semaphore = asyncio.Semaphore(value=parallelism)
iter(prefix='/', *, max_level=None, max_folders=None, delimiter='/') async

Generator that yields objects in the bucket with the given prefix.

Yields objects in partial chunks (lists of AWS S3 object dicts) as they are collected from AWS asynchronously.

max_level: the maximum folder depth to traverse with separate requests; if None, all levels are traversed.
max_folders: the maximum number of folders to load with separate requests; if None, every folder is requested separately. Otherwise, folders are grouped by common prefix before loading, aiming for the given number of folders where possible.
delimiter: the delimiter that separates "folders".

Source code in src/async_s3/s3_bucket_objects.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def iter(
    self,
    prefix: str = "/",
    *,
    max_level: Optional[int] = None,
    max_folders: Optional[int] = None,
    delimiter: str = "/",
) -> AsyncIterator[list[dict[str, Any]]]:
    """Generator that yields objects in the bucket with the given prefix.

    Yield objects by partial chunks (list of AWS S3 object dicts) as they
     are collected from AWS asynchronously.

    max_level: The maximum folders depth to traverse in separate requests.
    If None, traverse all levels.
    max_folders: The maximum number of folders to load in separate requests.
    If None, requests all folders.
    Otherwise, the folders are grouped by prefixes before loading in separate requests.
    Try to group in the given number of folders if possible.
    delimiter: The delimiter for "folders".

    Outstanding listing tasks are cancelled when iteration stops.
    """
    # if we group by prefixes, some objects may be listed multiple times
    # to avoid this, we store the keys of the objects already listed
    objects_keys: set[str] = set()

    # queue to store the objects pages from the tasks; an empty page is the
    # "task finished" signal put by `_task_done`
    queue: asyncio.Queue[list[dict[str, Any]]] = asyncio.Queue()

    # set to keep track of active tasks
    active_tasks: set[asyncio.Task[None]] = set()

    async with get_s3_client() as s3_client:
        await self.semaphore.acquire()
        try:
            root_task = asyncio.create_task(
                self._list_objects(
                    s3_client,
                    prefix,
                    0,
                    max_level,
                    max_folders,
                    delimiter,
                    objects_keys,
                    queue,
                    active_tasks,
                ),
            )
        except Exception:
            self.semaphore.release()
            raise
        active_tasks.add(root_task)
        root_task.add_done_callback(lambda t: self._task_done(t, active_tasks, queue))

        try:
            # Keep draining after the last task is gone: while the caller was
            # busy between pages, tasks may have queued pages and finished.
            # `queue.get()` waits instead of raising, so no QueueEmpty handling.
            while active_tasks or not queue.empty():
                page = await queue.get()
                if page:
                    yield page
        finally:
            # Stop leftover tasks (e.g. the caller stopped early) before the
            # S3 client is closed.
            pending = list(active_tasks)
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
list(prefix='/', *, max_level=None, max_folders=None, delimiter='/') async

List all objects in the bucket with the given prefix.

max_level: the maximum folder depth to traverse with separate requests; if None, all levels are traversed.
max_folders: the maximum number of folders to load with separate requests; if None, every folder is requested separately. Otherwise, folders are grouped by common prefix before loading, aiming for max_folders where possible.
delimiter: the delimiter that separates "folders".

Source code in src/async_s3/s3_bucket_objects.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def list(
    self,
    prefix: str = "/",
    *,
    max_level: Optional[int] = None,
    max_folders: Optional[int] = None,
    delimiter: str = "/",
) -> list[dict[str, Any]]:
    """Collect every object under `prefix` into one list.

    This is a convenience wrapper that concatenates all pages produced by
    `self.iter` with the same arguments, in the order they are yielded.

    max_level: maximum folder depth traversed with separate requests
        (None means all levels).
    max_folders: maximum number of folders loaded with separate requests
        (None means every folder). Otherwise folders are grouped by prefix,
        aiming for `max_folders` groups where possible.
    delimiter: the "folder" delimiter.
    """
    collected = []
    pages = self.iter(
        prefix,
        max_level=max_level,
        max_folders=max_folders,
        delimiter=delimiter,
    )
    async for chunk in pages:
        collected += chunk
    return collected

create_session() cached

Create a session object.

Source code in src/async_s3/s3_bucket_objects.py
16
17
18
19
@functools.lru_cache
def create_session() -> aiobotocore.session.AioSession:
    """Return the aiobotocore session, created on the first call and memoized."""
    session = aiobotocore.session.get_session()
    return session

get_s3_client()

Get S3 client.

Source code in src/async_s3/s3_bucket_objects.py
22
23
24
25
26
27
28
def get_s3_client() -> ClientCreatorContext:
    """Return a client-creator context for S3, to be entered with `async with`.

    The client is built from the memoized session and is configured for up to
    3 attempts per call with the "adaptive" retry mode.
    """
    retry_settings = {"max_attempts": 3, "mode": "adaptive"}
    return create_session().create_client("s3", config=Config(retries=retry_settings))