DAPClientError (malformed HTTP response) and ProcessingError occurred while downloading the CD2 data

OmkarIndulkar
Community Member

The error "DAPClientError: malformed HTTP response" occurs at least twice a month, and I have been unable to identify the exact cause.

 

DAPClientError                            Traceback (most recent call last)
Cell In [35], line 3
      1 if __name__ == '__main__':
      2     loop = asyncio.get_event_loop()
----> 3     res = loop.run_until_complete(main())
      4     try:
      5         loop.close()    

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/nest_asyncio.py:90, in _patch_loop.<locals>.run_until_complete(self, future)
     87 if not f.done():
     88     raise RuntimeError(
     89         'Event loop stopped before Future completed.')
---> 90 return f.result()

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception.with_traceback(self._exception_tb)
    202 return self._result

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the send method directly, because coroutines
    231         # don't have __iter__ and __next__ methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

Cell In [31], line 3, in main()
      1 async def main():
      2     async with ClientSession() as session:
----> 3         data_d = await asyncio.gather(*[download_and_upload_to_blob(table, session) for table in valid_tables])

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
    302 def __wakeup(self, future):
    303     try:
--> 304         future.result()
    305     except BaseException as exc:
    306         # This may also be a cancellation.
    307         self.__step(exc)

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the send method directly, because coroutines
    231         # don't have __iter__ and __next__ methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

Cell In [27], line 27, in download_and_upload_to_blob(table, session)
     25             await asyncio.sleep(BACKOFF_FACTOR ** attempt)
     26         else:
---> 27             raise e
     31 # Create a BlobServiceClient
     32 blob_service_client = BlobServiceClient(
     33     account_url=account_url,
     34     credential=account_key
     35 )

Cell In [27], line 20, in download_and_upload_to_blob(table, session)
     13     async with DAPClient() as session:
     14         query = IncrementalQuery(
     15             format=Format.Parquet,
     16             mode=None,
     17             since=last_seen,
     18             until=None,
     19         )
---> 20         await session.download_table_data("canvas", table, query, output_directory)
     21     break
     22 except Exception as e:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:676, in DAPSession.download_table_data(self, namespace, table, query, output_directory, decompress)
    673 objects = await self.get_objects(job.id)
    674 directory = os.path.join(output_directory, f"job_{job.id}")
--> 676 downloaded_files = await self.download_objects(objects, directory, decompress)
    677 logger.info(f"Files from server downloaded to folder: {directory}")
    679 if isinstance(job, CompleteSnapshotJob):

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:566, in DAPSession.download_objects(self, objects, output_directory, decompress)
    553 """
    554 Save data stored remotely into a local directory.
    555 
   (...)
    559 :returns: A list of paths to files saved in the local file system.
    560 """
    562 downloads = [
    563     self.download_object(object, output_directory, decompress)
    564     for object in objects
    565 ]
--> 566 local_files = await gather_n(downloads, concurrency=DOWNLOAD_CONCURRENCY)
    568 return local_files

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/concurrency.py:186, in gather_n(coroutines, concurrency, return_exceptions)
    169 async def gather_n(
    170     coroutines: Iterable[Invokable[T]],
    171     *,
    172     concurrency: int,
    173     return_exceptions: bool = False
    174 ) -> Iterable[T]:
    175     """
    176     Runs awaitable objects, with at most the specified degree of concurrency.
    177 
   (...)
    183     :raises asyncio.CancelledError: Raised when one of the tasks in cancelled.
    184     """
--> 186     results = await _gather_n(
    187         coroutines, concurrency=concurrency, return_exceptions=return_exceptions
    188     )
    190     if isinstance(coroutines, list):
    191         return list(results)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/concurrency.py:111, in _gather_n(coroutines, concurrency, return_exceptions)
    109     exc = task.exception()
    110     if exc and not return_exceptions:
--> 111         raise exc
    113 # schedule some tasks up to degree of concurrency
    114 for _, coroutine in zip(range(len(done)), iterator):

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the send method directly, because coroutines
    231         # don't have __iter__ and __next__ methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:582, in DAPSession.download_object(self, object, output_directory, decompress)
    570 async def download_object(
    571     self, object: Object, output_directory: str, decompress: bool = False
    572 ) -> str:
    573     """
    574     Save a single remote file to a local directory.
    575 
   (...)
    579     :returns: A path of the file saved in the local file system.
    580     """
--> 582     resource = (await self.get_resources([object]))[object.id]
    584     return await self.download_resource(resource, output_directory, decompress)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:490, in DAPSession.get_resources(self, objects)
    487 logger.debug("Retrieve resource URLs for objects:")
    488 logger.debug([o.id for o in objects])
--> 490 response = await self._post("/dap/object/url", objects, ResourceResult)
    492 return response.urls

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:311, in DAPSession._post(self, path, request_data, response_type)
    304 logger.debug(f"POST request payload:\n{repr(request_payload)}")
    306 async with self._session.post(
    307     f"{self._base_url}{path}",
    308     data=json_dump_string(request_payload),
    309     headers={"Content-Type": "application/json"},
    310 ) as response:
--> 311     return await self._process(response, response_type)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:348, in DAPSession._process(self, response, response_type, suppress_output)
    345     if response_text:
    346         logger.error(f"malformed HTTP response:\n{response_text}")
--> 348     raise DAPClientError("malformed HTTP response")
    350 if not suppress_output:
    351     logger.debug(f"GET/POST response payload:\n{repr(response_payload)}")

DAPClientError: malformed HTTP response

 

And starting today, a new error, 'ProcessingError', has also occurred:

 

Operation on target Incremental Load failed: ---------------------------------------------------------------------------
ProcessingError                           Traceback (most recent call last)
Cell In [51], line 3
      1 if __name__ == '__main__':
      2     loop = asyncio.get_event_loop()
----> 3     res = loop.run_until_complete(main())
      4     try:
      5         loop.close()    

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/nest_asyncio.py:90, in _patch_loop.<locals>.run_until_complete(self, future)
     87 if not f.done():
     88     raise RuntimeError(
     89         'Event loop stopped before Future completed.')
---> 90 return f.result()

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception.with_traceback(self._exception_tb)
    202 return self._result

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the `send` method directly, because coroutines
    231         # don't have `__iter__` and `__next__` methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

Cell In [47], line 3, in main()
      1 async def main():
      2     async with ClientSession() as session:
----> 3         data_d = await asyncio.gather(*[download_and_upload_to_blob_logs(table, session) for table in valid_tables_logs])

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
    302 def __wakeup(self, future):
    303     try:
--> 304         future.result()
    305     except BaseException as exc:
    306         # This may also be a cancellation.
    307         self.__step(exc)

File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the `send` method directly, because coroutines
    231         # don't have `__iter__` and `__next__` methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

Cell In [45], line 23, in download_and_upload_to_blob_logs(table, session)
     21             await asyncio.sleep(BACKOFF_FACTOR ** attempt)
     22         else:
---> 23             raise e
     25 # Create a BlobServiceClient
     26 blob_service_client = BlobServiceClient(
     27     account_url=account_url,
     28     credential=account_key
     29 )

Cell In [45], line 16, in download_and_upload_to_blob_logs(table, session)
      9     async with DAPClient() as session:
     10         query = IncrementalQuery(
     11             format=Format.Parquet,
     12             mode=None,
     13             since=last_seen,
     14             until=None,
     15         )
---> 16         await session.download_table_data("canvas_logs", table, query, output_directory)
     17     break
     18 except Exception as e:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:668, in DAPSession.download_table_data(self, namespace, table, query, output_directory, decompress)
    665 # fail early if output directory does not exist and cannot be created
    666 os.makedirs(output_directory, exist_ok=True)
--> 668 job = await self.execute_job(namespace, table, query)
    670 if job.status is not JobStatus.Complete:
    671     raise DAPClientError(f"query job ended with status: {job.status.value}")

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:636, in DAPSession.execute_job(self, namespace, table, query)
    634     job = await self.query_snapshot(namespace, table, query)
    635 elif isinstance(query, IncrementalQuery):
--> 636     job = await self.query_incremental(namespace, table, query)
    637 else:
    638     raise TypeError(f"type mismatch for parameter `query`: {type(query)}")

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:404, in DAPSession.query_incremental(self, namespace, table, query)
    399 """
    400 Starts an incremental query.
    401 """
    403 logger.debug(f"Query updates for table: {table}")
--> 404 job = await self._post(f"/dap/query/{namespace}/table/{table}/data", query, Job)  # type: ignore
    405 return job

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:311, in DAPSession._post(self, path, request_data, response_type)
    304 logger.debug(f"POST request payload:\n{repr(request_payload)}")
    306 async with self._session.post(
    307     f"{self._base_url}{path}",
    308     data=json_dump_string(request_payload),
    309     headers={"Content-Type": "application/json"},
    310 ) as response:
--> 311     return await self._process(response, response_type)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:358, in DAPSession._process(self, response, response_type, suppress_output)
    356     error_object = self._map_to_error_type(response.status, response_payload)
    357     logger.warning(f"Received error in response: {error_object}")
--> 358     raise error_object
    359 else:
    360     response_object = json_to_object(response_type, response_payload)

ProcessingError: 

 

Labels (2)
Users who also had this question