Using DAPSession.stream_resource

msarnold
Community Member

Hello,


I am trying to use DAPSession.stream_resource to stream resources/files directly into an S3 bucket.

The idea is to pipe the stream directly into S3, without first downloading and storing the entire file locally and then re-uploading it to S3.

However, I am not sure how to use that method. First, I don't know how to handle its async nature: whether, how, and when to await it. Second, I am confused that it returns an iterator of StreamReaders (i.e. potentially several) rather than just one. Since we pass in a single resource, and that resource represents a single download URL, shouldn't there be exactly one StreamReader rather than an iterator of them?!
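The `TypeError` reported further down (`object async_generator can't be used in 'await' expression`) indicates that `stream_resource` is an async generator function. Calling it returns an async generator object immediately, and that object is consumed with `async for`, not awaited. One plausible reason for an iterator interface even for a single resource is that a generator can keep the HTTP response open while the caller reads from the stream and release it afterwards; that is an inference, not something confirmed here. The sketch below uses hypothetical stand-ins (`FakeStream`, `fake_stream_resource`) instead of the real DAP client to show the failing and the working pattern side by side.

```python
import asyncio
from typing import AsyncIterator


class FakeStream:
    """Hypothetical stand-in for a stream reader whose read() is a coroutine."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self.closed = False

    async def read(self) -> bytes:
        if self.closed:
            raise RuntimeError("stream already closed")
        return self._data


async def fake_stream_resource(data: bytes) -> AsyncIterator[FakeStream]:
    """Hypothetical async generator that yields exactly one stream."""
    stream = FakeStream(data)  # stands in for opening the HTTP response
    try:
        yield stream  # the caller reads while the "response" is still open
    finally:
        stream.closed = True  # stands in for releasing the response


async def read_all(data: bytes) -> list[bytes]:
    bodies = []
    # Correct: iterate the async generator with `async for`; no `await` on the call.
    async for stream in fake_stream_resource(data):
        bodies.append(await stream.read())
    return bodies


async def await_directly(data: bytes) -> str:
    gen = fake_stream_resource(data)
    try:
        await gen  # the mistake from the question: raises TypeError
    except TypeError as exc:
        return str(exc)
    finally:
        await gen.aclose()
    return "no error"


print(asyncio.run(read_all(b"hello")))  # [b'hello']
print(asyncio.run(await_directly(b"hello")))
```

Note that the stream is read inside the `async for` body; once the loop ends, the generator's `finally` runs and (in this sketch) the stream is closed.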

On the S3/AWS side of my code, I want to use the boto3 S3 client's upload_fileobj() method (see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj....)
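`upload_fileobj` expects a synchronous file-like object whose `read()` returns bytes. Assuming the yielded readers are `aiohttp.StreamReader` objects, their `read()` is a coroutine, so a reader most likely cannot be handed to `upload_fileobj` directly. One alternative, swapped in here and not provided by the DAP client, is boto3's lower-level multipart upload (`create_multipart_upload`, `upload_part`, `complete_multipart_upload`), buffering chunks until each part reaches S3's 5 MiB minimum (only the last part may be smaller). This sketch assumes Python 3.9+ for `asyncio.to_thread`, which keeps the blocking boto3 calls off the event loop; the function `upload_chunks_to_s3` and its `chunks` parameter are hypothetical. Data smaller than one part falls back to a single `put_object`, and a failed transfer aborts the multipart upload so no orphaned parts are left behind.

```python
import asyncio
from typing import Any, AsyncIterator, Optional

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum for every part except the last


async def upload_chunks_to_s3(
    s3_client: Any,  # a boto3 S3 client, e.g. boto3.client("s3")
    chunks: AsyncIterator[bytes],
    bucket: str,
    key: str,
    part_size: int = MIN_PART_SIZE,
) -> int:
    """Hypothetical helper: write async byte chunks to one S3 object.

    Returns the number of multipart parts uploaded (0 if put_object was used).
    """
    upload_id: Optional[str] = None
    parts: list[dict] = []
    buffer = bytearray()

    async def flush() -> None:
        nonlocal upload_id
        if upload_id is None:  # start the multipart upload lazily
            mpu = await asyncio.to_thread(
                s3_client.create_multipart_upload, Bucket=bucket, Key=key
            )
            upload_id = mpu["UploadId"]
        number = len(parts) + 1
        resp = await asyncio.to_thread(
            s3_client.upload_part, Bucket=bucket, Key=key, UploadId=upload_id,
            PartNumber=number, Body=bytes(buffer),
        )
        parts.append({"ETag": resp["ETag"], "PartNumber": number})
        buffer.clear()

    try:
        async for chunk in chunks:
            buffer.extend(chunk)
            if len(buffer) >= part_size:
                await flush()
        if upload_id is None:
            # Everything fit below part_size: a single put_object is enough.
            await asyncio.to_thread(
                s3_client.put_object, Bucket=bucket, Key=key, Body=bytes(buffer)
            )
            return 0
        if buffer:
            await flush()  # the last part may be smaller than part_size
        await asyncio.to_thread(
            s3_client.complete_multipart_upload, Bucket=bucket, Key=key,
            UploadId=upload_id, MultipartUpload={"Parts": parts},
        )
    except BaseException:
        if upload_id is not None:  # don't leave orphaned parts behind
            await asyncio.to_thread(
                s3_client.abort_multipart_upload,
                Bucket=bucket, Key=key, UploadId=upload_id,
            )
        raise
    return len(parts)
```

With an `aiohttp.StreamReader` as the source, `stream.iter_chunked(1024 * 1024)` would supply the `chunks` argument.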

So, given a job ID, what would my Python code need to look like to get the resources for that job ID and stream them into S3? (self._dap_session is an instance of DAPSession.)

I tried this, but got an error:

    async def stream_job_files_to_s3(self, job_id: str, namespace: str, tablename: str):
        keys = []  # S3 keys written so far
        async with self._dap_session as session:
            objects = await session.get_objects(job_id)
            if objects is not None:
                urls = await session.get_resources(objects)
                for key in urls.keys():
                    rsrc = urls[key]
                    key = key.replace('/', '_')
                    aiter = await session.stream_resource(rsrc)
                    s3_key = f'{self._prefix}/{namespace}/{tablename}/{key.strip("/")}'
                    async for stream in aiter:
                        self._s3_client.upload_fileobj(stream, self._bucket, s3_key)
                    keys.append(s3_key)
        return keys

The error is:

    TypeError: object async_generator can't be used in 'await' expression

and it is happening on this line:

    aiter = await session.stream_resource(rsrc)

It also obviously won't behave well if the iterator yields more than one stream, but that's the next question once the basic mechanism is solved...
 
Thank you,
Mark