Source code for labelbox.schema.export_task

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from io import TextIOWrapper
import json
from pathlib import Path
from typing import (
    Callable,
    Generic,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    TYPE_CHECKING,
    overload,
    Any,
)

import requests
import warnings
import tempfile
import os
from labelbox import pydantic_compat

from labelbox.schema.task import Task
from labelbox.utils import _CamelCaseMixin

if TYPE_CHECKING:
    from labelbox import Client

OutputT = TypeVar("OutputT")


[docs]class StreamType(Enum): """The type of the stream.""" RESULT = "RESULT" ERRORS = "ERRORS"
class Range(_CamelCaseMixin, pydantic_compat.BaseModel): # pylint: disable=too-few-public-methods """Represents a range.""" start: int end: int class _MetadataHeader(_CamelCaseMixin, pydantic_compat.BaseModel): # pylint: disable=too-few-public-methods total_size: int total_lines: int class _MetadataFileInfo(_CamelCaseMixin, pydantic_compat.BaseModel): # pylint: disable=too-few-public-methods offsets: Range lines: Range file: str @dataclass class _TaskContext: client: "Client" task_id: str stream_type: StreamType metadata_header: _MetadataHeader class Converter(ABC, Generic[OutputT]): """Abstract class for transforming data.""" @dataclass class ConverterInputArgs: """Input for the converter.""" ctx: _TaskContext file_info: _MetadataFileInfo raw_data: str def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return False @abstractmethod def convert(self, input_args: ConverterInputArgs) -> Iterator[OutputT]: """Converts the data. Returns an iterator that yields the converted data. Args: current_offset: The global offset indicating the position of the data within the exported files. It represents a cumulative offset in characters across multiple files. raw_data: The raw data to convert. Yields: Iterator[OutputT]: The converted data. """
[docs]@dataclass class JsonConverterOutput: """Output with the JSON string.""" current_offset: int current_line: int json_str: str
[docs]class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-public-methods """Converts JSON data. Deprecated: This converter is deprecated and will be removed in a future release. """ def __init__(self) -> None: warnings.warn("JSON converter is deprecated and will be removed in a future release") super().__init__() def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]: object_offsets: List[Tuple[int, int]] = [] stack = [] current_object_start = None for index, char in enumerate(data): if char == "{": stack.append(char) if len(stack) == 1: current_object_start = index # we need to account for scenarios where data lands in the middle of an object # and the object is not the last one in the data if index > 0 and data[index - 1] == "\n" and not object_offsets: object_offsets.append((0, index - 1)) elif char == "}" and stack: stack.pop() # this covers cases where the last object is either followed by a newline or # it is missing if len(stack) == 0 and (len(data) == index + 1 or data[index + 1] == "\n" ) and current_object_start is not None: object_offsets.append((current_object_start, index + 1)) current_object_start = None # we also need to account for scenarios where data lands in the middle of the last object return object_offsets if object_offsets else [(0, len(data) - 1)]
[docs] def convert( self, input_args: Converter.ConverterInputArgs ) -> Iterator[JsonConverterOutput]: current_offset, current_line, raw_data = ( input_args.file_info.offsets.start, input_args.file_info.lines.start, input_args.raw_data, ) offsets = self._find_json_object_offsets(raw_data) for line, (offset_start, offset_end) in enumerate(offsets): yield JsonConverterOutput( current_offset=current_offset + offset_start, current_line=current_line + line, json_str=raw_data[offset_start:offset_end + 1].strip(), )
[docs]@dataclass class FileConverterOutput: """Output with statistics about the written file.""" file_path: Path total_size: int total_lines: int current_offset: int current_line: int bytes_written: int
[docs]class FileConverter(Converter[FileConverterOutput]): """Converts data to a file. """ def __init__(self, file_path: str) -> None: super().__init__() self._file: Optional[TextIOWrapper] = None self._file_path = file_path def __enter__(self): self._file = open(self._file_path, "w", encoding="utf-8") return self def __exit__(self, exc_type, exc_val, exc_tb): if self._file: self._file.close() return False
[docs] def convert( self, input_args: Converter.ConverterInputArgs ) -> Iterator[FileConverterOutput]: # appends data to the file assert self._file is not None self._file.write(input_args.raw_data) yield FileConverterOutput( file_path=Path(self._file_path), total_size=input_args.ctx.metadata_header.total_size, total_lines=input_args.ctx.metadata_header.total_lines, current_offset=input_args.file_info.offsets.start, current_line=input_args.file_info.lines.start, bytes_written=len(input_args.raw_data), )
class FileRetrieverStrategy(ABC): # pylint: disable=too-few-public-methods """Abstract class for retrieving files.""" def __init__(self, ctx: _TaskContext) -> None: super().__init__() self._ctx = ctx @abstractmethod def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: """Retrieves the file.""" def _get_file_content( self, query: str, variables: dict, result_field_name: str) -> Tuple[_MetadataFileInfo, str]: """Runs the query.""" res = self._ctx.client.execute(query, variables, error_log_key="errors") res = res["task"][result_field_name] file_info = _MetadataFileInfo(**res) if res else None if not file_info: raise ValueError( f"Task {self._ctx.task_id} does not have a metadata file for the " f"{self._ctx.stream_type.value} stream") response = requests.get(file_info.file, timeout=30) response.raise_for_status() assert len( response.content ) == file_info.offsets.end - file_info.offsets.start + 1, ( f"expected {file_info.offsets.end - file_info.offsets.start + 1} bytes, " f"got {len(response.content)} bytes") return file_info, response.text class FileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods """Retrieves files by offset.""" def __init__( self, ctx: _TaskContext, offset: int, ) -> None: super().__init__(ctx) self._current_offset = offset self._current_line: Optional[int] = None if self._current_offset >= self._ctx.metadata_header.total_size: raise ValueError( f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}" ) def _find_line_at_offset(self, file_content: str, target_offset: int) -> int: # TODO: Remove this, incorrect parsing of JSON to find braces stack = [] line_number = 0 for index, char in enumerate(file_content): if char == "{": stack.append(char) if len(stack) == 1 and index > 0: line_number += 1 elif char == "}" and stack: stack.pop() if index == target_offset: break return line_number def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: if self._current_offset >= self._ctx.metadata_header.total_size: return None query = ( f"query GetExportFileFromOffsetPyApi" f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)" f"{{task(where: $where)" f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)" f"{{offsets {{start end}} lines {{start end}} file}}" f"}}}}") variables = { "where": { "id": self._ctx.task_id }, "streamType": self._ctx.stream_type.value, "offset": str(self._current_offset), } file_info, file_content = self._get_file_content( query, variables, "exportFileFromOffset") if self._current_line is None: self._current_line = self._find_line_at_offset( file_content, self._current_offset - file_info.offsets.start) self._current_line += file_info.lines.start file_content = file_content[self._current_offset - file_info.offsets.start:] file_info.offsets.start = self._current_offset file_info.lines.start = self._current_line self._current_offset = file_info.offsets.end + 1 self._current_line = file_info.lines.end + 1 return file_info, file_content class FileRetrieverByLine(FileRetrieverStrategy): # pylint: disable=too-few-public-methods """Retrieves files by line.""" def __init__( self, ctx: _TaskContext, line: int, ) -> None: super().__init__(ctx) self._current_line = line self._current_offset: Optional[int] = None if self._current_line >= self._ctx.metadata_header.total_lines: raise ValueError( f"line is out of range, max line is {self._ctx.metadata_header.total_lines - 1}" ) def _find_offset_of_line(self, file_content: str, target_line: int): # TODO: Remove this, incorrect parsing of JSON to find braces start_offset = None stack = [] line_number = 0 for index, char in enumerate(file_content): if char == "{": stack.append(char) if len(stack) == 1: if line_number == target_line: start_offset = index line_number += 1 elif char == "}" and stack: stack.pop() if line_number > target_line: break return start_offset def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]: if self._current_line >= self._ctx.metadata_header.total_lines: return None query = ( f"query GetExportFileFromLinePyApi" f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $line: UInt64!)" f"{{task(where: $where)" f"{{{'exportFileFromLine'}(streamType: $streamType, line: $line)" f"{{offsets {{start end}} lines {{start end}} file}}" f"}}}}") variables = { "where": { "id": self._ctx.task_id }, "streamType": self._ctx.stream_type.value, "line": self._current_line, } file_info, file_content = self._get_file_content( query, variables, "exportFileFromLine") if self._current_offset is None: self._current_offset = self._find_offset_of_line( file_content, self._current_line - file_info.lines.start) self._current_offset += file_info.offsets.start file_content = file_content[self._current_offset - file_info.offsets.start:] file_info.offsets.start = self._current_offset file_info.lines.start = self._current_line self._current_offset = file_info.offsets.end + 1 self._current_line = file_info.lines.end + 1 return file_info, file_content class _Reader(ABC): # pylint: disable=too-few-public-methods """Abstract class for reading data from a source.""" @abstractmethod def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: """Sets the retrieval strategy.""" @abstractmethod def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: """Reads data from the source.""" class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods """Reads data from multiple GCS files in a seamless way. Deprecated: This reader is deprecated and will be removed in a future release. """ def __init__(self): warnings.warn("_MultiGCSFileReader is deprecated and will be removed in a future release") super().__init__() self._retrieval_strategy = None def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: """Sets the retrieval strategy.""" self._retrieval_strategy = strategy def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: if not self._retrieval_strategy: raise ValueError("retrieval strategy not set") result = self._retrieval_strategy.get_next_chunk() while result: file_info, raw_data = result yield file_info, raw_data result = self._retrieval_strategy.get_next_chunk()
[docs]@dataclass class BufferedJsonConverterOutput: """Output with the JSON object""" json: Any
class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]): """Converts JSON data in a buffered manner """ def convert( self, input_args: Converter.ConverterInputArgs ) -> Iterator[BufferedJsonConverterOutput]: yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data)) class _BufferedGCSFileReader(_Reader): """Reads data from multiple GCS files and buffer them to disk""" def __init__(self): super().__init__() self._retrieval_strategy = None def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None: """Sets the retrieval strategy.""" self._retrieval_strategy = strategy def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]: if not self._retrieval_strategy: raise ValueError("retrieval strategy not set") # create a buffer with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file: result = self._retrieval_strategy.get_next_chunk() while result: file_info, raw_data = result temp_file.seek(file_info.offsets.start) temp_file.write(raw_data) result = self._retrieval_strategy.get_next_chunk() # read buffer with open(temp_file.name, 'r') as temp_file_reopened: for idx, line in enumerate(temp_file_reopened): yield _MetadataFileInfo( offsets=Range(start=0, end=len(line) - 1), lines=Range(start=idx, end=idx + 1), file=temp_file.name), line # manually delete buffer os.unlink(temp_file.name)
[docs]class Stream(Generic[OutputT]): """Streams data from a Reader.""" def __init__( self, ctx: _TaskContext, reader: _Reader, converter: Converter, ): self._ctx = ctx self._reader = reader self._converter = converter # default strategy is to retrieve files by offset, starting from 0 self.with_offset(0) def __iter__(self): yield from self._fetch() def _fetch(self,) -> Iterator[OutputT]: """Fetches the result data. Returns an iterator that yields the offset and the data. """ if self._ctx.metadata_header.total_size is None: return stream = self._reader.read() with self._converter as converter: for file_info, raw_data in stream: for output in converter.convert( Converter.ConverterInputArgs(self._ctx, file_info, raw_data)): yield output
[docs] def with_offset(self, offset: int) -> "Stream[OutputT]": """Sets the offset for the stream.""" self._reader.set_retrieval_strategy( FileRetrieverByOffset(self._ctx, offset)) return self
[docs] def with_line(self, line: int) -> "Stream[OutputT]": """Sets the line number for the stream.""" self._reader.set_retrieval_strategy(FileRetrieverByLine( self._ctx, line)) return self
[docs] def start( self, stream_handler: Optional[Callable[[OutputT], None]] = None) -> None: """Starts streaming the result data. Calls the stream_handler for each result. """ # this calls the __iter__ method, which in turn calls the _fetch method for output in self: if stream_handler: stream_handler(output)
[docs]class ExportTask: """ An adapter class for working with task objects, providing extended functionality and convenient access to task-related information. This class wraps a `Task` object, allowing you to interact with tasks of this type. It offers methods to retrieve task results, errors, and metadata, as well as properties for accessing task details such as UID, status, and creation time. """
[docs] class ExportTaskException(Exception): """Raised when the task is not ready yet."""
def __init__(self, task: Task, is_export_v2: bool = False) -> None: self._is_export_v2 = is_export_v2 self._task = task def __repr__(self): return f"<ExportTask ID: {self.uid}>" if getattr( self, "uid", None) else "<ExportTask>" def __str__(self): properties_to_include = [ "completion_percentage", "created_at", "metadata", "name", "result", "result_url", "errors", "errors_url", "status", "type", "uid", "updated_at", ] props = {prop: getattr(self, prop) for prop in properties_to_include} return f"<ExportTask {json.dumps(props, indent=4, default=str)}>" def __eq__(self, other): return self._task.__eq__(other) def __hash__(self): return self._task.__hash__() @property def uid(self): """Returns the uid of the task.""" return self._task.uid @property def deleted(self): """Returns whether the task is deleted.""" return self._task.deleted @property def updated_at(self): """Returns the last time the task was updated.""" return self._task.updated_at @property def created_at(self): """Returns the time the task was created.""" return self._task.created_at @property def name(self): """Returns the name of the task.""" return self._task.name @property def status(self): """Returns the status of the task.""" return self._task.status @property def metadata(self): """Returns the metadata of the task.""" return self._task.metadata @property def result_url(self): """Returns the result URL of the task.""" if not self._is_export_v2: raise ExportTask.ExportTaskException( "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) base_url = self._task.client.rest_endpoint return base_url + '/export-results/' + self._task.uid + '/' + self._task.client.get_organization( ).uid @property def errors_url(self): """Returns the errors URL of the task.""" if not self._is_export_v2: raise ExportTask.ExportTaskException( "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) if not self.has_errors(): return None base_url = self._task.client.rest_endpoint return base_url + '/export-errors/' + self._task.uid + '/' + self._task.client.get_organization( ).uid @property def errors(self): """Returns the errors of the task.""" if not self._is_export_v2: raise ExportTask.ExportTaskException( "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead" ) if self.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") if not self.has_errors(): return None data = [] metadata_header = ExportTask._get_metadata_header( self._task.client, self._task.uid, StreamType.ERRORS) if metadata_header is None: return None Stream( _TaskContext(self._task.client, self._task.uid, StreamType.ERRORS, metadata_header), _BufferedGCSFileReader(), _BufferedJsonConverter(), ).start(stream_handler=lambda output: data.append(output.json)) return data @property def result(self): """Returns the result of the task.""" if self._is_export_v2: if self.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") data = [] metadata_header = ExportTask._get_metadata_header( self._task.client, self._task.uid, StreamType.RESULT) if metadata_header is None: return [] Stream( _TaskContext(self._task.client, self._task.uid, StreamType.RESULT, metadata_header), _BufferedGCSFileReader(), _BufferedJsonConverter(), ).start(stream_handler=lambda output: data.append(output.json)) return data return self._task.result_url @property def completion_percentage(self): """Returns the completion percentage of the task.""" return self._task.completion_percentage @property def type(self): """Returns the type of the task.""" return self._task.type @property def created_by(self): """Returns the user who created the task.""" return self._task.created_by @property def organization(self): """Returns the organization of the task.""" return self._task.organization
[docs] def wait_till_done(self, timeout_seconds: int = 7200) -> None: """Waits until the task is done.""" return self._task.wait_till_done(timeout_seconds)
@staticmethod @lru_cache(maxsize=5) def _get_metadata_header( client, task_id: str, stream_type: StreamType) -> Union[_MetadataHeader, None]: """Returns the total file size for a specific task.""" query = (f"query GetExportMetadataHeaderPyApi" f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!)" f"{{task(where: $where)" f"{{{'exportMetadataHeader'}(streamType: $streamType)" f"{{totalSize totalLines}}" f"}}}}") variables = {"where": {"id": task_id}, "streamType": stream_type.value} res = client.execute(query, variables, error_log_key="errors") res = res["task"]["exportMetadataHeader"] return _MetadataHeader(**res) if res else None
[docs] def get_total_file_size(self, stream_type: StreamType) -> Union[int, None]: """Returns the total file size for a specific task.""" if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") header = ExportTask._get_metadata_header(self._task.client, self._task.uid, stream_type) return header.total_size if header else None
[docs] def get_total_lines(self, stream_type: StreamType) -> Union[int, None]: """Returns the total file size for a specific task.""" if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") header = ExportTask._get_metadata_header(self._task.client, self._task.uid, stream_type) return header.total_lines if header else None
[docs] def has_result(self) -> bool: """Returns whether the task has a result.""" total_size = self.get_total_file_size(StreamType.RESULT) return total_size is not None and total_size > 0
[docs] def has_errors(self) -> bool: """Returns whether the task has errors.""" total_size = self.get_total_file_size(StreamType.ERRORS) return total_size is not None and total_size > 0
@overload def get_stream( self, converter: JsonConverter = JsonConverter(), stream_type: StreamType = StreamType.RESULT, ) -> Stream[JsonConverterOutput]: """Overload for getting the right typing hints when using a JsonConverter.""" @overload def get_stream( self, converter: FileConverter, stream_type: StreamType = StreamType.RESULT, ) -> Stream[FileConverterOutput]: """Overload for getting the right typing hints when using a FileConverter."""
[docs] def get_stream( self, converter: Converter = JsonConverter(), stream_type: StreamType = StreamType.RESULT, ) -> Stream: """Returns the result of the task.""" if self._task.status == "FAILED": raise ExportTask.ExportTaskException("Task failed") if self._task.status != "COMPLETE": raise ExportTask.ExportTaskException("Task is not ready yet") metadata_header = self._get_metadata_header(self._task.client, self._task.uid, stream_type) if metadata_header is None: raise ValueError( f"Task {self._task.uid} does not have a {stream_type.value} stream" ) return Stream( _TaskContext(self._task.client, self._task.uid, stream_type, metadata_header), _MultiGCSFileReader(), converter, )
[docs] @staticmethod def get_task(client, task_id): """Returns the task with the given id.""" return ExportTask(Task.get_task(client, task_id))