Source code for labelbox.schema.export_task

import json
import os
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    Iterator,
    Optional,
    Tuple,
    TypeVar,
    Union,
)
import warnings

import requests
from pydantic import BaseModel

from labelbox.schema.task import Task
from labelbox.utils import _CamelCaseMixin

if TYPE_CHECKING:
    from labelbox import Client

OutputT = TypeVar("OutputT")


[docs]class StreamType(Enum): """The type of the stream.""" RESULT = "RESULT" ERRORS = "ERRORS"
class Range(_CamelCaseMixin, BaseModel): # pylint: disable=too-few-public-methods """Represents a range.""" start: int end: int class _MetadataHeader(_CamelCaseMixin, BaseModel): # pylint: disable=too-few-public-methods total_size: int total_lines: int class _MetadataFileInfo(_CamelCaseMixin, BaseModel): # pylint: disable=too-few-public-methods offsets: Range lines: Range file: str @dataclass class _TaskContext: client: "Client" task_id: str stream_type: StreamType metadata_header: _MetadataHeader class Converter(ABC, Generic[OutputT]): """Abstract class for transforming data.""" @dataclass class ConverterInputArgs: """Input for the converter.""" ctx: _TaskContext file_info: _MetadataFileInfo raw_data: str def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return False @abstractmethod def convert(self, input_args: ConverterInputArgs) -> Iterator[OutputT]: """Converts the data. Returns an iterator that yields the converted data. Args: current_offset: The global offset indicating the position of the data within the exported files. It represents a cumulative offset in characters across multiple files. raw_data: The raw data to convert. Yields: Iterator[OutputT]: The converted data. """ class FileRetrieverStrategy(ABC): # pylint: disable=too-few-public-methods """Abstract class for retrieving files.""" def __init__(self, ctx: _TaskContext) -> None: super().__init__() self._ctx = ctx @abstractmethod def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: """Retrieves the file.""" def _get_file_content( self, query: str, variables: dict, result_field_name: str ) -> Tuple[_MetadataFileInfo, str]: """Runs the query.""" res = self._ctx.client.execute(query, variables, error_log_key="errors") res = res["task"][result_field_name] file_info = _MetadataFileInfo(**res) if res else None if not file_info: raise ValueError( f"Task {self._ctx.task_id} does not have a metadata file for the " f"{self._ctx.stream_type.value} stream" ) response = requests.get(file_info.file, timeout=30) response.raise_for_status() response.encoding = "utf-8" assert ( len(response.content) == file_info.offsets.end - file_info.offsets.start + 1 ), ( f"expected {file_info.offsets.end - file_info.offsets.start + 1} bytes, " f"got {len(response.content)} bytes" ) return file_info, response.text class _Reader(ABC): # pylint: disable=too-few-public-methods """Abstract class for reading data from a source.""" @abstractmethod def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: """Sets the retrieval strategy.""" @abstractmethod def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: """Reads data from the source.""" class _BufferedFileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods """Retrieves files by offset.""" def __init__( self, ctx: _TaskContext, offset: int, ) -> None: super().__init__(ctx) self._current_offset = offset self._current_line = 0 if self._current_offset >= self._ctx.metadata_header.total_size: raise ValueError( f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}" ) def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: if self._current_offset >= self._ctx.metadata_header.total_size: return None query = ( f"query GetExportFileFromOffsetPyApi" f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)" f"{{task(where: $where)" f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" f"{{offsets {{start end}} lines {{start end}} file}}" f"}}}}" ) variables = { "where": {"id": self._ctx.task_id}, "streamType": self._ctx.stream_type.value, "offset": str(self._current_offset), } file_info, file_content = self._get_file_content( query, variables, "exportFileFromOffset" ) file_info.offsets.start = self._current_offset file_info.lines.start = self._current_line self._current_offset = file_info.offsets.end + 1 self._current_line = file_info.lines.end + 1 return file_info, file_content
[docs]class BufferedStream(Generic[OutputT]): """Streams data from a Reader.""" def __init__( self, ctx: _TaskContext, ): self._ctx = ctx self._reader = _BufferedGCSFileReader() self._converter = _BufferedJsonConverter() self._reader.set_retrieval_strategy( _BufferedFileRetrieverByOffset(self._ctx, 0) ) def __iter__(self): yield from self._fetch() def _fetch( self, ) -> Iterator[OutputT]: """Fetches the result data. Returns an iterator that yields the offset and the data. """ if self._ctx.metadata_header.total_size is None: return stream = self._reader.read() with self._converter as converter: for file_info, raw_data in stream: for output in converter.convert( Converter.ConverterInputArgs(self._ctx, file_info, raw_data) ): yield output
[docs] def start( self, stream_handler: Optional[Callable[[OutputT], None]] = None ) -> None: """Starts streaming the result data. Calls the stream_handler for each result. """ # this calls the __iter__ method, which in turn calls the _fetch method for output in self: if stream_handler: stream_handler(output)
[docs]@dataclass class BufferedJsonConverterOutput: """Output with the JSON object""" json: Any
class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]): """Converts JSON data in a buffered manner""" def convert( self, input_args: Converter.ConverterInputArgs ) -> Iterator[BufferedJsonConverterOutput]: yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data)) class _BufferedGCSFileReader(_Reader): """Reads data from multiple GCS files and buffer them to disk""" def __init__(self): super().__init__() self._retrieval_strategy = None def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: """Sets the retrieval strategy.""" self._retrieval_strategy = strategy def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: if not self._retrieval_strategy: raise ValueError("retrieval strategy not set") # create a buffer with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file: result = self._retrieval_strategy.get_next_chunk() while result: _, raw_data = result # there is something wrong with the way the offsets are being calculated # so just write all of the chunks as is too the file, with pointer initially # pointed to the start of the file (like what is in GCS) and do not # rely on offsets for file location # temp_file.seek(file_info.offsets.start) temp_file.write(raw_data) result = self._retrieval_strategy.get_next_chunk() # read buffer with open(temp_file.name, "r") as temp_file_reopened: for idx, line in enumerate(temp_file_reopened): yield ( _MetadataFileInfo( offsets=Range(start=0, end=len(line) - 1), lines=Range(start=idx, end=idx + 1), file=temp_file.name, ), line, ) # manually delete buffer os.unlink(temp_file.name)
[docs]class ExportTask: """ An adapter class for working with task objects, providing extended functionality and convenient access to task-related information. This class wraps a `Task` object, allowing you to interact with tasks of this type. It offers methods to retrieve task results, errors, and metadata, as well as properties for accessing task details such as UID, status, and creation time. """
[docs] class ExportTaskException(Exception): """Raised when the task is not ready yet."""
def __init__(self, task: Task, is_export_v2: bool = False) -> None: self._is_export_v2 = is_export_v2 self._task = task def __repr__(self): return ( f"<ExportTask ID: {self.uid}>" if getattr(self, "uid", None) else "<ExportTask>" ) def __str__(self): properties_to_include = [ "completion_percentage", "created_at", "metadata", "name", "result", "result_url", "errors", "errors_url", "status", "type", "uid", "updated_at", ] props = {prop: getattr(self, prop) for prop in properties_to_include} return f"<ExportTask {json.dumps(props, indent=4, default=str)}>" def __eq__(self, other): return self._task.__eq__(other) def __hash__(self): return self._task.__hash__() @property def uid(self): """Returns the uid of the task.""" return self._task.uid @property def deleted(self): """Returns whether the task is deleted.""" return self._task.deleted @property def updated_at(self): """Returns the last time the task was updated.""" return self._task.updated_at @property def created_at(self): """Returns the time the task was created.""" return self._task.created_at @property def name(self): """Returns the name of the task.""" return self._task.name @property def status(self): """Returns the status of the task.""" return self._task.status @property def metadata(self): """Returns the metadata of the task.""" return self._task.metadata @property def result_url(self): """Returns the result URL of the task.""" if not self._is_export_v2: raise ExportTask.ExportTaskException( "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) base_url = self._task.client.rest_endpoint return ( base_url + "/export-results/" + self._task.uid + "/" + self._task.client.get_organization().uid ) @property def errors_url(self): """Returns the errors URL of the task.""" if not self._is_export_v2: raise ExportTask.ExportTaskException( "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) if not self.has_errors(): return None base_url = self._task.client.rest_endpoint return ( base_url + "/export-errors/" + self._task.uid + "/" + self._task.client.get_organization().uid ) @property def errors(self): """Returns the errors of the task.""" if not self._is_export_v2: raise ExportTask.ExportTaskException( "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) if self.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") if not self.has_errors(): return None data = [] metadata_header = ExportTask._get_metadata_header( self._task.client, self._task.uid, StreamType.ERRORS ) if metadata_header is None: return None BufferedStream( _TaskContext( self._task.client, self._task.uid, StreamType.ERRORS, metadata_header, ), ).start(stream_handler=lambda output: data.append(output.json)) return data @property def result(self): """Returns the result of the task.""" if self._is_export_v2: if self.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") data = [] metadata_header = ExportTask._get_metadata_header( self._task.client, self._task.uid, StreamType.RESULT ) if metadata_header is None: return [] BufferedStream( _TaskContext( self._task.client, self._task.uid, StreamType.RESULT, metadata_header, ), ).start(stream_handler=lambda output: data.append(output.json)) return data return self._task.result_url @property def completion_percentage(self): """Returns the completion percentage of the task.""" return self._task.completion_percentage @property def type(self): """Returns the type of the task.""" return self._task.type @property def created_by(self): """Returns the user who created the task.""" return self._task.created_by @property def organization(self): """Returns the organization of the task.""" return self._task.organization def wait_until_done(self, timeout_seconds: int = 7200) -> None: warnings.warn( "The method wait_until_done for ExportTask is deprecated and will be removed in the next major release. Use the wait_till_done method instead.", DeprecationWarning, stacklevel=2, ) self.wait_till_done(timeout_seconds)
[docs] def wait_till_done(self, timeout_seconds: int = 7200) -> None: """Waits until the task is done.""" return self._task.wait_till_done(timeout_seconds)
@staticmethod @lru_cache(maxsize=5) def _get_metadata_header( client, task_id: str, stream_type: StreamType ) -> Union[_MetadataHeader, None]: """Returns the total file size for a specific task.""" query = ( f"query GetExportMetadataHeaderPyApi" f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!)" f"{{task(where: $where)" f"{{{'exportMetadataHeader'}(streamType: $streamType)" f"{{totalSize totalLines}}" f"}}}}" ) variables = {"where": {"id": task_id}, "streamType": stream_type.value} res = client.execute(query, variables, error_log_key="errors") res = res["task"]["exportMetadataHeader"] return _MetadataHeader(**res) if res else None
[docs] def get_total_file_size(self, stream_type: StreamType) -> Union[int, None]: """Returns the total file size for a specific task.""" if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") header = ExportTask._get_metadata_header( self._task.client, self._task.uid, stream_type ) return header.total_size if header else None
[docs] def get_total_lines(self, stream_type: StreamType) -> Union[int, None]: """Returns the total file size for a specific task.""" if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") header = ExportTask._get_metadata_header( self._task.client, self._task.uid, stream_type ) return header.total_lines if header else None
[docs] def has_result(self) -> bool: """Returns whether the task has a result.""" total_size = self.get_total_file_size(StreamType.RESULT) return total_size is not None and total_size > 0
[docs] def has_errors(self) -> bool: """Returns whether the task has errors.""" total_size = self.get_total_file_size(StreamType.ERRORS) return total_size is not None and total_size > 0
[docs] def get_buffered_stream( self, stream_type: StreamType = StreamType.RESULT, ) -> BufferedStream: """ Returns the result of the task. Args: stream_type (StreamType, optional): The type of stream to retrieve. Defaults to StreamType.RESULT. Returns: Stream: The buffered stream object. Raises: ExportTask.ExportTaskException: If the task has failed or is not ready yet. ValueError: If the task does not have the specified stream type. """ if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") metadata_header = self._get_metadata_header( self._task.client, self._task.uid, stream_type ) if metadata_header is None: raise ValueError( f"Task {self._task.uid} does not have a {stream_type.value} stream" ) return BufferedStream( _TaskContext( self._task.client, self._task.uid, stream_type, metadata_header ), )
[docs] @staticmethod def get_task(client, task_id): """Returns the task with the given id.""" return ExportTask(Task.get_task(client, task_id))