import logging
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
import json
import warnings
from labelbox.orm import query
from labelbox.orm.db_object import (
    DbObject,
    Updateable,
    BulkDeletable,
)
from labelbox.orm.model import Entity, Field, Relationship
from labelbox.schema.asset_attachment import AttachmentType
from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore
from labelbox.schema.export_filters import (
    DatarowExportFilters,
    build_filters,
    validate_at_least_one_of_data_row_ids_or_global_keys,
)
from labelbox.schema.export_params import (
    CatalogExportParams,
    validate_catalog_export_params,
)
from labelbox.schema.export_task import ExportTask
from labelbox.schema.task import Task

if TYPE_CHECKING:
    from labelbox import AssetAttachment, Client

logger = logging.getLogger(__name__)


class KeyType(str, Enum):
    ID = "ID"
    """An existing CUID"""
    GKEY = "GKEY"
    """A global key, which may or may not already exist"""
    AUTO = "AUTO"
    """The key will be auto-generated. Only usable when creating data rows"""


class DataRow(DbObject, Updateable, BulkDeletable):
    """Internal Labelbox representation of a single piece of data
    (e.g. image, video, text).

    Attributes:
        external_id (str): User-generated file name or identifier
        global_key (str): User-generated globally unique identifier
        row_data (str): Paths to local files are uploaded to Labelbox's
            server; any other value is treated as an external URL.
        updated_at (datetime)
        created_at (datetime)
        media_attributes (dict): generated media attributes for the data row
        metadata_fields (list): metadata associated with the data row
        metadata (list): metadata associated with the data row as a list of
            DataRowMetadataField. When importing data rows with metadata,
            use `metadata_fields` instead.

        dataset (Relationship): `ToOne` relationship to Dataset
        created_by (Relationship): `ToOne` relationship to User
        organization (Relationship): `ToOne` relationship to Organization
        labels (Relationship): `ToMany` relationship to Label
        attachments (Relationship): `ToMany` relationship to AssetAttachment
    """
external_id = Field.String("external_id")
global_key = Field.String("global_key")
row_data = Field.String("row_data")
updated_at = Field.DateTime("updated_at")
created_at = Field.DateTime("created_at")
media_attributes = Field.Json("media_attributes")
metadata_fields = Field.List(
dict,
graphql_type="DataRowCustomMetadataUpsertInput!",
name="metadata_fields",
result_subquery="metadataFields { schemaId name value kind }",
)
metadata = Field.List(
DataRowMetadataField,
name="metadata",
graphql_name="customMetadata",
result_subquery="customMetadata { schemaId value }",
)
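    # `metadata` mirrors the read-side `customMetadata` query result; per the
    # class docstring, use `metadata_fields` when importing rows with metadata.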
    # Relationships
    dataset = Relationship.ToOne("Dataset")
    created_by = Relationship.ToOne("User", False, "created_by")
    organization = Relationship.ToOne("Organization", False)
    labels = Relationship.ToMany("Label", True)
    attachments = Relationship.ToMany("AssetAttachment", False, "attachments")

    supported_meta_types = supported_attachment_types = set(
        AttachmentType.__members__
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.attachments.supports_filtering = False
        self.attachments.supports_sorting = False

    def update(self, **kwargs):
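        """Updates this DataRow.

        At least one of `external_id`, `global_key` or `row_data` must be
        provided, and none of them may be set to an empty value. A dict
        passed as `row_data` is serialized to a JSON string before upload.

        For example:

        >>> data_row.update(external_id="new-external-id")
        """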
        # Convert row data to string if it is an object.
        # All other updates pass through.
        primary_fields = ["external_id", "global_key", "row_data"]
        for field in primary_fields:
            data = kwargs.get(field)
            if data == "" or data == {}:
                raise ValueError(f"{field} cannot be empty if it is set")
        if not any(kwargs.get(field) for field in primary_fields):
            raise ValueError(
                f"At least one of these fields needs to be present: {primary_fields}"
            )

        row_data = kwargs.get("row_data")
        if isinstance(row_data, dict):
            kwargs["row_data"] = json.dumps(row_data)
        super().update(**kwargs)

    @staticmethod
    def bulk_delete(data_rows) -> None:
        """Deletes all the given DataRows.

        Args:
            data_rows (list of DataRow): The DataRows to delete.
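
        For example, assuming `dataset` is an existing Dataset:

        >>> DataRow.bulk_delete(dataset.data_rows.list())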
"""
BulkDeletable._bulk_delete(data_rows, True)

    def get_winning_label_id(self, project_id: str) -> Optional[str]:
        """Retrieves the winning label ID, i.e. the one that was marked as
        the best for a particular data row, in a project's workflow.

        Args:
            project_id (str): ID of the project containing the data row
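
        Returns:
            The ID of the label selected as the winner, or None if no label
            has been selected for this data row in that project.

        For example, assuming `project` is a Project containing this data row:

        >>> label_id = data_row.get_winning_label_id(project.uid)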
"""
data_row_id_param = "dataRowId"
project_id_param = "projectId"
query_str = """query GetWinningLabelIdPyApi($%s: ID!, $%s: ID!) {
dataRow(where: { id: $%s }) {
labelingActivity(where: { projectId: $%s }) {
selectedLabelId
}
}} """ % (
data_row_id_param,
project_id_param,
data_row_id_param,
project_id_param,
)
res = self.client.execute(
query_str,
{
data_row_id_param: self.uid,
project_id_param: project_id,
},
)
return res["dataRow"]["labelingActivity"]["selectedLabelId"]

    def create_attachment(
        self, attachment_type, attachment_value, attachment_name=None
    ) -> "AssetAttachment":
        """Adds an AssetAttachment to a DataRow.
        Labelers can view these attachments while labeling.

        >>> datarow.create_attachment("TEXT", "This is a text message")
        Args:
            attachment_type (str): Asset attachment type, must be one of:
                VIDEO, IMAGE, TEXT, IMAGE_OVERLAY (AttachmentType)
            attachment_value (str): Asset attachment value.
            attachment_name (str): (Optional) Asset attachment name.
        Returns:
            `AssetAttachment` DB object.
        Raises:
            ValueError: attachment_type must be one of the supported types.
            ValueError: attachment_value must be a non-empty string.
        """
        Entity.AssetAttachment.validate_attachment_json(
            {"type": attachment_type, "value": attachment_value}
        )
        attachment_type_param = "type"
        attachment_value_param = "value"
        attachment_name_param = "name"
        data_row_id_param = "dataRowId"
        query_str = """mutation CreateDataRowAttachmentPyApi(
            $%s: AttachmentType!, $%s: String!, $%s: String, $%s: ID!) {
            createDataRowAttachment(data: {
                type: $%s value: $%s name: $%s dataRowId: $%s}) {%s}} """ % (
            attachment_type_param,
            attachment_value_param,
            attachment_name_param,
            data_row_id_param,
            attachment_type_param,
            attachment_value_param,
            attachment_name_param,
            data_row_id_param,
            query.results_query_part(Entity.AssetAttachment),
        )
        res = self.client.execute(
            query_str,
            {
                attachment_type_param: attachment_type,
                attachment_value_param: attachment_value,
                attachment_name_param: attachment_name,
                data_row_id_param: self.uid,
            },
        )
        return Entity.AssetAttachment(
            self.client, res["createDataRowAttachment"]
        )

    @staticmethod
    def export(
        client: "Client",
        data_rows: Optional[List[Union[str, "DataRow"]]] = None,
        global_keys: Optional[List[str]] = None,
        task_name: Optional[str] = None,
        params: Optional[CatalogExportParams] = None,
    ) -> ExportTask:
        """
        Creates a data rows export task with the given list and params, and
        returns the task.

        Args:
            client (Client): client to use to make the export request
            data_rows (list of DataRow or str): list of data row objects or data row ids to export
            global_keys (list of str): list of global keys of the data rows to export
            task_name (str): name of remote task
            params (CatalogExportParams): export params

        Pass exactly one of data_rows or global_keys per call; if data row
        ids are present, global keys are ignored.

        >>> dataset = client.get_dataset(DATASET_ID)
        >>> task = DataRow.export(
        >>>     data_rows=[data_row.uid for data_row in dataset.data_rows.list()],
        >>>     # or a list of DataRow objects: data_rows=dataset.data_rows.list()
        >>>     # or a list of global keys: global_keys=["global_key_1", "global_key_2"]
        >>>     params={
        >>>         "performance_details": False,
        >>>         "label_details": True
        >>>     })
        >>> task.wait_till_done()
        >>> task.result
        """
        task, _ = DataRow._export(
            client, data_rows, global_keys, task_name, params, streamable=True
        )
        return ExportTask(task)

    @staticmethod
    def export_v2(
        client: "Client",
        data_rows: Optional[List[Union[str, "DataRow"]]] = None,
        global_keys: Optional[List[str]] = None,
        task_name: Optional[str] = None,
        params: Optional[CatalogExportParams] = None,
    ) -> Union[Task, ExportTask]:
        """
        Creates a data rows export task with the given list and params, and
        returns the task. Deprecated; use `export` instead.

        Args:
            client (Client): client to use to make the export request
            data_rows (list of DataRow or str): list of data row objects or data row ids to export
            global_keys (list of str): list of global keys of the data rows to export
            task_name (str): name of remote task
            params (CatalogExportParams): export params

        Pass exactly one of data_rows or global_keys per call; if data row
        ids are present, global keys are ignored.

        >>> dataset = client.get_dataset(DATASET_ID)
        >>> task = DataRow.export_v2(
        >>>     data_rows=[data_row.uid for data_row in dataset.data_rows.list()],
        >>>     # or a list of DataRow objects: data_rows=dataset.data_rows.list()
        >>>     # or a list of global keys: global_keys=["global_key_1", "global_key_2"]
        >>>     params={
        >>>         "performance_details": False,
        >>>         "label_details": True
        >>>     })
        >>> task.wait_till_done()
        >>> task.result
        """
        warnings.warn(
            "The method export_v2 for DataRow is deprecated and will be removed in the next major release. Use the export method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        task, is_streamable = DataRow._export(
            client, data_rows, global_keys, task_name, params
        )
        if is_streamable:
            return ExportTask(task, True)
        return task

    @staticmethod
    def _export(
        client: "Client",
        data_rows: Optional[List[Union[str, "DataRow"]]] = None,
        global_keys: Optional[List[str]] = None,
        task_name: Optional[str] = None,
        params: Optional[CatalogExportParams] = None,
        streamable: bool = False,
    ) -> Tuple[Task, bool]:
        # Fall back to a minimal export configuration when no params are given.
        _params = params or CatalogExportParams(
            {
                "attachments": False,
                "embeddings": False,
                "metadata_fields": False,
                "data_row_details": False,
                "project_details": False,
                "performance_details": False,
                "label_details": False,
                "media_type_override": None,
                "model_run_ids": None,
                "project_ids": None,
                "interpolated_frames": False,
                "all_projects": False,
                "all_model_runs": False,
            }
        )
        validate_catalog_export_params(_params)

        mutation_name = "exportDataRowsInCatalog"
        create_task_query_str = (
            f"mutation {mutation_name}PyApi"
            f"($input: ExportDataRowsInCatalogInput!)"
            f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}"
        )

        # Accept both DataRow objects and plain data row id strings.
        data_row_ids = []
        if data_rows is not None:
            for dr in data_rows:
                if isinstance(dr, DataRow):
                    data_row_ids.append(dr.uid)
                elif isinstance(dr, str):
                    data_row_ids.append(dr)

        # Filter by explicit data row ids when present; otherwise fall back
        # to global keys.
        filters = (
            DatarowExportFilters(
                {
                    "data_row_ids": data_row_ids,
                    "global_keys": None,
                }
            )
            if data_row_ids
            else DatarowExportFilters(
                {
                    "data_row_ids": None,
                    "global_keys": global_keys,
                }
            )
        )
        validate_at_least_one_of_data_row_ids_or_global_keys(filters)

        search_query = build_filters(client, filters)
        media_type_override = _params.get("media_type_override", None)

        if task_name is None:
            task_name = f"Export v2: data rows {len(data_row_ids)}"
        query_params = {
            "input": {
                "taskName": task_name,
                "filters": {
                    "searchQuery": {"scope": None, "query": search_query}
                },
                "isStreamableReady": True,
                "params": {
                    "mediaTypeOverride": media_type_override.value
                    if media_type_override is not None
                    else None,
                    "includeAttachments": _params.get("attachments", False),
                    "includeEmbeddings": _params.get("embeddings", False),
                    "includeMetadata": _params.get("metadata_fields", False),
                    "includeDataRowDetails": _params.get(
                        "data_row_details", False
                    ),
                    "includeProjectDetails": _params.get(
                        "project_details", False
                    ),
                    "includePerformanceDetails": _params.get(
                        "performance_details", False
                    ),
                    "includeLabelDetails": _params.get("label_details", False),
                    "includeInterpolatedFrames": _params.get(
                        "interpolated_frames", False
                    ),
                    "projectIds": _params.get("project_ids", None),
                    "modelRunIds": _params.get("model_run_ids", None),
                    "allProjects": _params.get("all_projects", False),
                    "allModelRuns": _params.get("all_model_runs", False),
                },
                "streamable": streamable,
            }
        }

        res = client.execute(
            create_task_query_str, query_params, error_log_key="errors"
        )
        res = res[mutation_name]
        task_id = res["taskId"]
        is_streamable = res["isStreamable"]
        return Task.get_task(client, task_id), is_streamable