DAPClientError: malformed HTTP response and ProcessingError occurred while downloading CD2 data
08-07-2024 03:48 AM
DAPClientError: malformed HTTP response occurs at least twice a month, and I have been unable to identify the exact cause.
DAPClientError Traceback (most recent call last)
Cell In [35], line 3
1 if __name__ == '__main__':
2 loop = asyncio.get_event_loop()
----> 3 res = loop.run_until_complete(main())
4 try:
5 loop.close()
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/nest_asyncio.py:90, in _patch_loop.<locals>.run_until_complete(self, future)
87 if not f.done():
88 raise RuntimeError(
89 'Event loop stopped before Future completed.')
---> 90 return f.result()
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
199 self.__log_traceback = False
200 if self._exception is not None:
--> 201 raise self._exception.with_traceback(self._exception_tb)
202 return self._result
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the send method directly, because coroutines
231 # don't have __iter__ and __next__ methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [31], line 3, in main()
1 async def main():
2 async with ClientSession() as session:
----> 3 data_d = await asyncio.gather(*[download_and_upload_to_blob(table, session) for table in valid_tables])
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
302 def __wakeup(self, future):
303 try:
--> 304 future.result()
305 except BaseException as exc:
306 # This may also be a cancellation.
307 self.__step(exc)
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the send method directly, because coroutines
231 # don't have __iter__ and __next__ methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [27], line 27, in download_and_upload_to_blob(table, session)
25 await asyncio.sleep(BACKOFF_FACTOR ** attempt)
26 else:
---> 27 raise e
31 # Create a BlobServiceClient
32 blob_service_client = BlobServiceClient(
33 account_url=account_url,
34 credential=account_key
35 )
Cell In [27], line 20, in download_and_upload_to_blob(table, session)
13 async with DAPClient() as session:
14 query = IncrementalQuery(
15 format=Format.Parquet,
16 mode=None,
17 since=last_seen,
18 until=None,
19 )
---> 20 await session.download_table_data("canvas", table, query, output_directory)
21 break
22 except Exception as e:
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:676, in DAPSession.download_table_data(self, namespace, table, query, output_directory, decompress)
673 objects = await self.get_objects(job.id)
674 directory = os.path.join(output_directory, f"job_{job.id}")
--> 676 downloaded_files = await self.download_objects(objects, directory, decompress)
677 logger.info(f"Files from server downloaded to folder: {directory}")
679 if isinstance(job, CompleteSnapshotJob):
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:566, in DAPSession.download_objects(self, objects, output_directory, decompress)
553 """
554 Save data stored remotely into a local directory.
555
(...)
559 :returns: A list of paths to files saved in the local file system.
560 """
562 downloads = [
563 self.download_object(object, output_directory, decompress)
564 for object in objects
565 ]
--> 566 local_files = await gather_n(downloads, concurrency=DOWNLOAD_CONCURRENCY)
568 return local_files
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/concurrency.py:186, in gather_n(coroutines, concurrency, return_exceptions)
169 async def gather_n(
170 coroutines: Iterable[Invokable[T]],
171 *,
172 concurrency: int,
173 return_exceptions: bool = False
174 ) -> Iterable[T]:
175 """
176 Runs awaitable objects, with at most the specified degree of concurrency.
177
(...)
183 :raises asyncio.CancelledError: Raised when one of the tasks in cancelled.
184 """
--> 186 results = await _gather_n(
187 coroutines, concurrency=concurrency, return_exceptions=return_exceptions
188 )
190 if isinstance(coroutines, list):
191 return list(results)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/concurrency.py:111, in _gather_n(coroutines, concurrency, return_exceptions)
109 exc = task.exception()
110 if exc and not return_exceptions:
--> 111 raise exc
113 # schedule some tasks up to degree of concurrency
114 for _, coroutine in zip(range(len(done)), iterator):
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the send method directly, because coroutines
231 # don't have __iter__ and __next__ methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:582, in DAPSession.download_object(self, object, output_directory, decompress)
570 async def download_object(
571 self, object: Object, output_directory: str, decompress: bool = False
572 ) -> str:
573 """
574 Save a single remote file to a local directory.
575
(...)
579 :returns: A path of the file saved in the local file system.
580 """
--> 582 resource = (await self.get_resources([object]))[object.id]
584 return await self.download_resource(resource, output_directory, decompress)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:490, in DAPSession.get_resources(self, objects)
487 logger.debug("Retrieve resource URLs for objects:")
488 logger.debug([o.id for o in objects])
--> 490 response = await self._post("/dap/object/url", objects, ResourceResult)
492 return response.urls
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:311, in DAPSession._post(self, path, request_data, response_type)
304 logger.debug(f"POST request payload:\n{repr(request_payload)}")
306 async with self._session.post(
307 f"{self._base_url}{path}",
308 data=json_dump_string(request_payload),
309 headers={"Content-Type": "application/json"},
310 ) as response:
--> 311 return await self._process(response, response_type)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:348, in DAPSession._process(self, response, response_type, suppress_output)
345 if response_text:
346 logger.error(f"malformed HTTP response:\n{response_text}")
--> 348 raise DAPClientError("malformed HTTP response")
350 if not suppress_output:
351 logger.debug(f"GET/POST response payload:\n{repr(response_payload)}")
DAPClientError: malformed HTTP response
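For context, this is roughly the retry pattern the traceback comes from, reconstructed as a minimal sketch from the cell fragments above (MAX_RETRIES, BACKOFF_FACTOR, last_seen, and output_directory stand in for values defined elsewhere in my notebook, and DAPClientError is assumed to be importable from dap.api, where the traceback shows it being raised):

import asyncio
from datetime import datetime

from dap.api import DAPClient, DAPClientError
from dap.dap_types import Format, IncrementalQuery

MAX_RETRIES = 3       # placeholder retry budget
BACKOFF_FACTOR = 2    # placeholder, matching the BACKOFF_FACTOR in the fragments above

async def download_table_with_retry(table: str, last_seen: datetime, output_directory: str) -> None:
    """Retry the incremental download when the DAP API returns a client error."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            # A fresh DAPClient per attempt; credentials come from the
            # environment, as in the original notebook.
            async with DAPClient() as session:
                query = IncrementalQuery(
                    format=Format.Parquet,
                    mode=None,
                    since=last_seen,
                    until=None,
                )
                await session.download_table_data("canvas", table, query, output_directory)
            return
        except DAPClientError:
            if attempt == MAX_RETRIES:
                raise
            # Exponential backoff before the next attempt.
            await asyncio.sleep(BACKOFF_FACTOR ** attempt)

Even with the backoff in place, the malformed HTTP response still surfaces once all attempts are exhausted.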
And as of today, a new error has started occurring: ProcessingError.
Operation on target Incremental Load failed: ---------------------------------------------------------------------------
ProcessingError Traceback (most recent call last)
Cell In [51], line 3
1 if __name__ == '__main__':
2 loop = asyncio.get_event_loop()
----> 3 res = loop.run_until_complete(main())
4 try:
5 loop.close()
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/nest_asyncio.py:90, in _patch_loop.<locals>.run_until_complete(self, future)
87 if not f.done():
88 raise RuntimeError(
89 'Event loop stopped before Future completed.')
---> 90 return f.result()
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
199 self.__log_traceback = False
200 if self._exception is not None:
--> 201 raise self._exception.with_traceback(self._exception_tb)
202 return self._result
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the `send` method directly, because coroutines
231 # don't have `__iter__` and `__next__` methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [47], line 3, in main()
1 async def main():
2 async with ClientSession() as session:
----> 3 data_d = await asyncio.gather(*[download_and_upload_to_blob_logs(table, session) for table in valid_tables_logs])
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
302 def __wakeup(self, future):
303 try:
--> 304 future.result()
305 except BaseException as exc:
306 # This may also be a cancellation.
307 self.__step(exc)
File ~/cluster-env/clonedenv/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
228 try:
229 if exc is None:
230 # We use the `send` method directly, because coroutines
231 # don't have `__iter__` and `__next__` methods.
--> 232 result = coro.send(None)
233 else:
234 result = coro.throw(exc)
Cell In [45], line 23, in download_and_upload_to_blob_logs(table, session)
21 await asyncio.sleep(BACKOFF_FACTOR ** attempt)
22 else:
---> 23 raise e
25 # Create a BlobServiceClient
26 blob_service_client = BlobServiceClient(
27 account_url=account_url,
28 credential=account_key
29 )
Cell In [45], line 16, in download_and_upload_to_blob_logs(table, session)
9 async with DAPClient() as session:
10 query = IncrementalQuery(
11 format=Format.Parquet,
12 mode=None,
13 since=last_seen,
14 until=None,
15 )
---> 16 await session.download_table_data("canvas_logs", table, query, output_directory)
17 break
18 except Exception as e:
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:668, in DAPSession.download_table_data(self, namespace, table, query, output_directory, decompress)
665 # fail early if output directory does not exist and cannot be created
666 os.makedirs(output_directory, exist_ok=True)
--> 668 job = await self.execute_job(namespace, table, query)
670 if job.status is not JobStatus.Complete:
671 raise DAPClientError(f"query job ended with status: {job.status.value}")
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:636, in DAPSession.execute_job(self, namespace, table, query)
634 job = await self.query_snapshot(namespace, table, query)
635 elif isinstance(query, IncrementalQuery):
--> 636 job = await self.query_incremental(namespace, table, query)
637 else:
638 raise TypeError(f"type mismatch for parameter `query`: {type(query)}")
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:404, in DAPSession.query_incremental(self, namespace, table, query)
399 """
400 Starts an incremental query.
401 """
403 logger.debug(f"Query updates for table: {table}")
--> 404 job = await self._post(f"/dap/query/{namespace}/table/{table}/data", query, Job) # type: ignore
405 return job
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:311, in DAPSession._post(self, path, request_data, response_type)
304 logger.debug(f"POST request payload:\n{repr(request_payload)}")
306 async with self._session.post(
307 f"{self._base_url}{path}",
308 data=json_dump_string(request_payload),
309 headers={"Content-Type": "application/json"},
310 ) as response:
--> 311 return await self._process(response, response_type)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/dap/api.py:358, in DAPSession._process(self, response, response_type, suppress_output)
356 error_object = self._map_to_error_type(response.status, response_payload)
357 logger.warning(f"Received error in response: {error_object}")
--> 358 raise error_object
359 else:
360 response_object = json_to_object(response_type, response_payload)
ProcessingError:
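The ProcessingError appears to arrive with an empty message, and the traceback shows it is raised while the incremental query itself is being submitted (query_incremental → _post → _process), so the job never starts. As a workaround I am considering falling back to a full snapshot for the affected table when the incremental query is rejected. A version-agnostic sketch is below (SnapshotQuery and IncrementalQuery come from dap.dap_types; the exception is matched by class name because the module that defines ProcessingError may differ between dap releases):

from datetime import datetime

from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery

async def download_incremental_with_snapshot_fallback(
    namespace: str, table: str, last_seen: datetime, output_directory: str
) -> bool:
    """Try an incremental pull; fall back to a snapshot if the query is rejected.

    Returns True when a snapshot was taken, so the caller knows to reset its
    stored `last_seen` bookmark for this table.
    """
    async with DAPClient() as session:
        try:
            query = IncrementalQuery(
                format=Format.Parquet, mode=None, since=last_seen, until=None
            )
            await session.download_table_data(namespace, table, query, output_directory)
            return False
        except Exception as exc:
            # Matching on the class name keeps the sketch independent of where
            # ProcessingError is defined in the installed dap version.
            if type(exc).__name__ != "ProcessingError":
                raise
            snapshot = SnapshotQuery(format=Format.Parquet, mode=None)
            await session.download_table_data(namespace, table, snapshot, output_directory)
            return True

Has anyone seen either of these errors with the CD2 client, and is there a recommended way to handle them?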