Using DAPSession.stream_resource
Hello,
I am trying to use DAPSession.stream_resource to stream resources/files directly into an S3 bucket.
The idea is to pipe the stream directly into S3, rather than downloading and storing the entire file locally first and then re-uploading it to S3.
However, I am not sure how to use that method. First, I am unsure how to handle its async nature, and how/if/when to await it. Second, I am confused that it returns an iterator of StreamReaders (i.e. potentially several) rather than just one. Since we pass in a single resource, and that resource represents a single download URL, shouldn't there be only one StreamReader?
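For reference, here is how I understand a plain async generator is normally consumed in Python; awaiting the generator object itself is what I would expect to fail (generic example, not DAP-specific):

import asyncio

async def chunks():
    # Toy async generator standing in for stream_resource.
    yield b"first"
    yield b"second"

async def main():
    # Async generators are driven with `async for`, not `await`.
    async for part in chunks():
        print(part)

asyncio.run(main())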
On the S3/AWS side of my code, I want to use the boto3 S3 client's upload_fileobj() method (see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj....)
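Since upload_fileobj() expects a synchronous file-like object with a blocking read(), while the DAP stream is async, my rough plan is an adapter along the following lines. This is an untested sketch; AsyncStreamAdapter and upload_stream are my own hypothetical names, not part of either library:

import asyncio

class AsyncStreamAdapter:
    # Hypothetical wrapper that gives an async stream a blocking
    # read(), so boto3 can consume it from a worker thread.

    def __init__(self, stream, loop):
        self._stream = stream  # e.g. an aiohttp StreamReader
        self._loop = loop      # the running event loop

    def read(self, size=-1):
        # boto3 calls this from a worker thread: schedule the async
        # read on the event loop and block here until data arrives.
        future = asyncio.run_coroutine_threadsafe(
            self._stream.read(size), self._loop
        )
        return future.result()

async def upload_stream(s3_client, stream, bucket, key):
    adapter = AsyncStreamAdapter(stream, asyncio.get_running_loop())
    # upload_fileobj() blocks, so run it in a thread to keep the
    # event loop (and therefore the download) running.
    await asyncio.to_thread(s3_client.upload_fileobj, adapter, bucket, key)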
So, given a job ID, what would my Python code need to look like to get the resources for that job ID and stream them into S3 (self._dap_session is an instance of DAPSession)?
I tried this, but got an error:
async def stream_job_files_to_s3(self, job_id: str, namespace: str, tablename: str):
    keys = []  # S3 keys of the uploaded files
    async with self._dap_session as session:
        objects = await session.get_objects(job_id)
        if objects is not None:
            urls = await session.get_resources(objects)
            for key in urls.keys():
                rsrc = urls[key]
                key = key.replace('/', '_')
                aiter = await session.stream_resource(rsrc)  # <-- raises here
                s3_key = f'{self._prefix}/{namespace}/{tablename}/{key.strip("/")}'
                async for stream in aiter:
                    self._s3_client.upload_fileobj(stream, self._bucket, s3_key)
                keys.append(s3_key)
    return keys
TypeError: object async_generator can't be used in 'await' expression
aiter = await session.stream_resource(rsrc)
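Reading that error back, I now suspect stream_resource() returns an async generator that must be iterated rather than awaited, so the failing lines would instead look roughly like this (my guess, combined with the adapter sketched above; not yet verified):

# Guessed fix: drive the async generator with `async for` instead of
# awaiting it, then hand each StreamReader to the adapter above.
async for stream in session.stream_resource(rsrc):
    await upload_stream(self._s3_client, stream, self._bucket, s3_key)
keys.append(s3_key)

Does that match the intended usage, or am I still missing something?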