# type: ignore
from datetime import datetime
import warnings
from copy import deepcopy
from enum import Enum
from itertools import chain
from typing import List, Optional, Dict, Union, Callable, Type, Any, Generator
from pydantic import BaseModel, conlist, constr
from labelbox.schema.ontology import SchemaId
from labelbox.utils import camel_case
class DataRowMetadataKind(Enum):
number = "CustomMetadataNumber"
datetime = "CustomMetadataDateTime"
enum = "CustomMetadataEnum"
string = "CustomMetadataString"
option = "CustomMetadataEnumOption"
embedding = "CustomMetadataEmbedding"
# Metadata schema
class DataRowMetadataSchema(BaseModel):
uid: SchemaId
name: constr(strip_whitespace=True, min_length=1, max_length=100)
reserved: bool
kind: DataRowMetadataKind
options: Optional[List["DataRowMetadataSchema"]]
parent: Optional[SchemaId]
@property
def id(self):
""" DataRowMetadataSchema.id will be removed after 2021-12-06
use DataRowMetadataSchema.uid instead
"""
warnings.warn(
"DataRowMetadataSchema.id will be removed after 2021-12-06 "
"use DataRowMetadataSchema.uid instead")
return self.uid
DataRowMetadataSchema.update_forward_refs()
Embedding: Type[List[float]] = conlist(float, min_items=128, max_items=128)
String: Type[str] = constr(max_length=500)
class _CamelCaseMixin(BaseModel):
class Config:
allow_population_by_field_name = True
alias_generator = camel_case
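# A minimal sketch of what the mixin buys (illustrative; the 25-character id
# below is a made-up placeholder for a cuid-style SchemaId):
#   >>> DataRowMetadataField(schema_id="c" * 25, value=1).dict(by_alias=True)
#   {'schemaId': 'ccccccccccccccccccccccccc', 'value': 1}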
# Metadata base class
class DataRowMetadataField(_CamelCaseMixin):
schema_id: SchemaId
# value is of type `Any` so that we do not improperly coerce the value to the wrong type
# Additional validation is performed before upload using the schema information
value: Any
class DataRowMetadata(_CamelCaseMixin):
data_row_id: str
fields: List[DataRowMetadataField]
class DeleteDataRowMetadata(_CamelCaseMixin):
data_row_id: str
fields: List[SchemaId]
class DataRowMetadataBatchResponse(_CamelCaseMixin):
data_row_id: str
error: Optional[str] = None
fields: List[Union[DataRowMetadataField, SchemaId]]
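# Putting the models together (sketch; the ids are placeholders, see the
# bulk_upsert/bulk_delete docstrings below for the real call patterns):
#   >>> upsert = DataRowMetadata(
#   ...     data_row_id="<data-row-id>",
#   ...     fields=[DataRowMetadataField(schema_id="<schema-id>", value="msg")])
#   >>> delete = DeleteDataRowMetadata(
#   ...     data_row_id="<data-row-id>", fields=["<schema-id>"])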
# --- Batch GraphQL Objects ---
# Don't want to crowd the namespace with internals
# Bulk upsert values
class _UpsertDataRowMetadataInput(_CamelCaseMixin):
schema_id: str
value: Any
# Batch of upsert values for a data row
class _UpsertBatchDataRowMetadata(_CamelCaseMixin):
data_row_id: str
fields: List[_UpsertDataRowMetadataInput]
class _DeleteBatchDataRowMetadata(_CamelCaseMixin):
data_row_id: str
schema_ids: List[SchemaId]
_BatchInputs = Union[List[_UpsertBatchDataRowMetadata],
List[_DeleteBatchDataRowMetadata]]
_BatchFunction = Callable[[_BatchInputs], List[DataRowMetadataBatchResponse]]
class DataRowMetadataOntology:
""" Ontology for data row metadata
Metadata provides additional context for data rows. Metadata is broken into two classes:
reserved and custom. Reserved fields are defined by Labelbox and used for creating
specific experiences in the platform.
>>> mdo = client.get_data_row_metadata_ontology()
"""
def __init__(self, client):
self._client = client
self._batch_size = 50 # used for uploads and deletes
self._raw_ontology = self._get_ontology()
self._build_ontology()
def _build_ontology(self):
# all fields
self.fields = self._parse_ontology(self._raw_ontology)
self.fields_by_id = self._make_id_index(self.fields)
# reserved fields
self.reserved_fields: List[DataRowMetadataSchema] = [
f for f in self.fields if f.reserved
]
self.reserved_by_id = self._make_id_index(self.reserved_fields)
self.reserved_by_name: Dict[
str,
DataRowMetadataSchema] = self._make_name_index(self.reserved_fields)
# custom fields
self.custom_fields: List[DataRowMetadataSchema] = [
f for f in self.fields if not f.reserved
]
self.custom_by_id = self._make_id_index(self.custom_fields)
self.custom_by_name: Dict[
str,
DataRowMetadataSchema] = self._make_name_index(self.custom_fields)
@staticmethod
def _make_name_index(fields: List[DataRowMetadataSchema]):
index = {}
for f in fields:
if f.options:
index[f.name] = {}
for o in f.options:
index[f.name][o.name] = o
else:
index[f.name] = f
return index
@staticmethod
def _make_id_index(
fields: List[DataRowMetadataSchema]
) -> Dict[SchemaId, DataRowMetadataSchema]:
index = {}
for f in fields:
index[f.uid] = f
if f.options:
for o in f.options:
index[o.uid] = o
return index
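# Index shapes (illustrative): _make_id_index flattens enum options into the
# same mapping as their parents, while _make_name_index nests option names
# under the parent field's name. Assuming a reserved enum field named "split"
# with a "train" option:
#   >>> option = mdo.reserved_by_name["split"]["train"]
#   >>> mdo.fields_by_id[option.parent].name
#   'split'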
def _get_ontology(self) -> List[Dict[str, Any]]:
query = """query GetMetadataOntologyBetaPyApi {
customMetadataOntology {
id
name
kind
reserved
options {
id
kind
name
reserved
}
}}
"""
return self._client.execute(query)["customMetadataOntology"]
@staticmethod
def _parse_ontology(raw_ontology) -> List[DataRowMetadataSchema]:
fields = []
copy = deepcopy(raw_ontology)
for schema in copy:
schema["uid"] = schema["id"]
options = None
if schema.get("options"):
options = []
for option in schema["options"]:
option["uid"] = option["id"]
options.append(
DataRowMetadataSchema(**{
**option,
**{
"parent": schema["uid"]
}
}))
schema["options"] = options
fields.append(DataRowMetadataSchema(**schema))
return fields
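# _parse_ontology consumes rows shaped like the GraphQL response above,
# re-keying "id" to "uid", e.g. (illustrative values):
#   {"id": "<uid>", "name": "split", "kind": "CustomMetadataEnum",
#    "reserved": True, "options": [{"id": "<uid>", "name": "train", ...}]}
# Each option becomes its own DataRowMetadataSchema with `parent` set to the
# enclosing field's uid.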
def parse_metadata(
self, unparsed: List[Dict[str,
List[Union[str,
Dict]]]]) -> List[DataRowMetadata]:
""" Parse metadata responses
>>> mdo.parse_metadata([metadata])
Args:
unparsed: An unparsed metadata export
Returns:
metadata: List of `DataRowMetadata`
"""
parsed = []
if isinstance(unparsed, dict):
raise ValueError("Pass a list of dictionaries")
for dr in unparsed:
fields = []
for f in dr["fields"]:
schema = self.fields_by_id[f["schemaId"]]
if schema.kind == DataRowMetadataKind.enum:
continue
elif schema.kind == DataRowMetadataKind.option:
field = DataRowMetadataField(schema_id=schema.parent,
value=schema.uid)
elif schema.kind == DataRowMetadataKind.datetime:
field = DataRowMetadataField(
schema_id=schema.uid,
value=datetime.fromisoformat(f["value"][:-1] +
"+00:00"))
else:
field = DataRowMetadataField(schema_id=schema.uid,
value=f["value"])
fields.append(field)
parsed.append(
DataRowMetadata(data_row_id=dr["dataRowId"], fields=fields))
return parsed
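# The unparsed export rows handled above look roughly like (illustrative):
#   {"dataRowId": "<id>", "fields": [{"schemaId": "<id>", "value": ...}]}
# Enum parents are skipped because the selected option row alone is enough to
# rebuild the field (schema_id=parent uid, value=option uid).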
def bulk_upsert(
self, metadata: List[DataRowMetadata]
) -> List[DataRowMetadataBatchResponse]:
"""Upsert datarow metadata
>>> metadata = DataRowMetadata(
>>> data_row_id="datarow-id",
>>> fields=[
>>> DataRowMetadataField(schema_id="schema-id", value="my-message"),
>>> ...
>>> ]
>>> )
>>> mdo.bulk_upsert([metadata])
Args:
metadata: List of DataRow Metadata to upsert
Returns:
list of unsuccessful upserts.
An empty list means the upload was successful.
"""
if not len(metadata):
raise ValueError("Empty list passed")
def _batch_upsert(
upserts: List[_UpsertBatchDataRowMetadata]
) -> List[DataRowMetadataBatchResponse]:
query = """mutation UpsertDataRowMetadataBetaPyApi($metadata: [DataRowCustomMetadataBatchUpsertInput!]!) {
upsertDataRowCustomMetadata(data: $metadata){
dataRowId
error
fields {
value
schemaId
}
}
}"""
res = self._client.execute(
query, {"metadata": upserts})['upsertDataRowCustomMetadata']
return [
DataRowMetadataBatchResponse(data_row_id=r['dataRowId'],
error=r['error'],
fields=self.parse_metadata(
[r])[0].fields) for r in res
]
items = []
for m in metadata:
items.append(
_UpsertBatchDataRowMetadata(
data_row_id=m.data_row_id,
fields=list(
chain.from_iterable(
self._parse_upsert(f) for f in m.fields))).dict(
by_alias=True))
res = _batch_operations(_batch_upsert, items, self._batch_size)
return res
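# Callers typically treat a non-empty return value as a partial failure
# (sketch):
#   >>> failures = mdo.bulk_upsert([metadata])
#   >>> assert not failures, failures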
def bulk_delete(
self, deletes: List[DeleteDataRowMetadata]
) -> List[DataRowMetadataBatchResponse]:
""" Delete metadata from a datarow by specifiying the fields you want to remove
>>> delete = DeleteDataRowMetadata(
>>> data_row_id="datarow-id",
>>> fields=[
>>> "schema-id-1",
>>> "schema-id-2"
>>> ...
>>> ]
>>> )
>>> mdo.bulk_delete([delete])
Args:
deletes: Data row and schema ids to delete
Returns:
list of unsuccessful deletions.
An empty list means all data rows were successfully deleted.
"""
if not len(deletes):
raise ValueError("Empty list passed")
def _batch_delete(
deletes: List[_DeleteBatchDataRowMetadata]
) -> List[DataRowMetadataBatchResponse]:
query = """mutation DeleteDataRowMetadataBetaPyApi($deletes: [DataRowCustomMetadataBatchDeleteInput!]!) {
deleteDataRowCustomMetadata(data: $deletes) {
dataRowId
error
fields {
value
schemaId
}
}
}
"""
res = self._client.execute(
query, {"deletes": deletes})['deleteDataRowCustomMetadata']
failures = []
for dr in res:
dr['fields'] = [f['schemaId'] for f in dr['fields']]
failures.append(DataRowMetadataBatchResponse(**dr))
return failures
items = [self._validate_delete(m) for m in deletes]
return _batch_operations(_batch_delete,
items,
batch_size=self._batch_size)
def bulk_export(self, data_row_ids: List[str]) -> List[DataRowMetadata]:
""" Exports metadata for a list of data rows
>>> mdo.bulk_export([data_row.uid for data_row in data_rows])
Args:
data_row_ids: List of data row ids to fetch metadata for
Returns:
A list of DataRowMetadata.
There will be one DataRowMetadata for each data_row_id passed in.
This is true even if the data row does not have any metadata.
Data rows without metadata will have empty `fields`.
"""
if not len(data_row_ids):
raise ValueError("Empty list passed")
def _bulk_export(_data_row_ids: List[str]) -> List[DataRowMetadata]:
query = """query dataRowCustomMetadataPyApi($dataRowIds: [ID!]!) {
dataRowCustomMetadata(where: {dataRowIds : $dataRowIds}) {
dataRowId
fields {
value
schemaId
}
}
}
"""
return self.parse_metadata(
self._client.execute(
query,
{"dataRowIds": _data_row_ids})['dataRowCustomMetadata'])
return _batch_operations(_bulk_export,
data_row_ids,
batch_size=self._batch_size)
def _parse_upsert(
self, metadatum: DataRowMetadataField
) -> List[_UpsertDataRowMetadataInput]:
"""Format for metadata upserts to GQL"""
if metadatum.schema_id not in self.fields_by_id:
raise ValueError(
f"Schema Id `{metadatum.schema_id}` not found in ontology")
schema = self.fields_by_id[metadatum.schema_id]
if schema.kind == DataRowMetadataKind.datetime:
parsed = _validate_parse_datetime(metadatum)
elif schema.kind == DataRowMetadataKind.string:
parsed = _validate_parse_text(metadatum)
elif schema.kind == DataRowMetadataKind.number:
parsed = _validate_parse_number(metadatum)
elif schema.kind == DataRowMetadataKind.embedding:
parsed = _validate_parse_embedding(metadatum)
elif schema.kind == DataRowMetadataKind.enum:
parsed = _validate_enum_parse(schema, metadatum)
elif schema.kind == DataRowMetadataKind.option:
raise ValueError("An Option id should not be set as the Schema id")
else:
raise ValueError(f"Unknown type: {schema}")
return [_UpsertDataRowMetadataInput(**p) for p in parsed]
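# Dispatch sketch (illustrative; the placeholder id must resolve to a
# datetime schema in the ontology for that branch to be taken):
#   >>> mdo._parse_upsert(DataRowMetadataField(
#   ...     schema_id="<datetime-schema-id>", value="2021-01-20T01:02:03Z"))
# yields a single _UpsertDataRowMetadataInput whose value is normalized to an
# ISO-8601 string with a trailing "Z".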
def _validate_delete(self, delete: DeleteDataRowMetadata):
if not len(delete.fields):
raise ValueError(f"No fields specified for {delete.data_row_id}")
deletes = set()
for schema_id in delete.fields:
if schema_id not in self.fields_by_id:
raise ValueError(
f"Schema Id `{schema_id}` not found in ontology")
schema = self.fields_by_id[schema_id]
# handle users specifying enums by adding all option enums
if schema.kind == DataRowMetadataKind.enum:
deletes.update(o.uid for o in schema.options)
deletes.add(schema.uid)
return _DeleteBatchDataRowMetadata(
data_row_id=delete.data_row_id,
schema_ids=list(deletes)).dict(by_alias=True)
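# Deleting by a parent enum's schema id therefore removes the enum and all of
# its option values in one request (sketch; ids are placeholders):
#   >>> mdo.bulk_delete([DeleteDataRowMetadata(
#   ...     data_row_id="<data-row-id>", fields=["<enum-schema-id>"])])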
def _batch_items(iterable: List[Any], size: int) -> Generator[List[Any], None, None]:
length = len(iterable)
for ndx in range(0, length, size):
yield iterable[ndx:min(ndx + size, length)]
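# Chunking sketch:
#   >>> list(_batch_items([1, 2, 3, 4, 5], 2))
#   [[1, 2], [3, 4], [5]]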
def _batch_operations(
batch_function: _BatchFunction,
items: List,
batch_size: int = 100,
):
response = []
for batch in _batch_items(items, batch_size):
response += batch_function(batch)
return response
def _validate_parse_embedding(
field: DataRowMetadataField
) -> List[Dict[str, Union[SchemaId, Embedding]]]:
if isinstance(field.value, list):
if not (Embedding.min_items <= len(field.value) <= Embedding.max_items):
raise ValueError(
"Embedding length invalid. "
"Must have length within the interval "
f"[{Embedding.min_items},{Embedding.max_items}]. Found {len(field.value)}"
)
field.value = [float(x) for x in field.value]
else:
raise ValueError(
f"Expected a list for embedding. Found {type(field.value)}")
return [field.dict(by_alias=True)]
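# Sketch (the 25-character schema id is a placeholder; ints are coerced to
# floats, and any length other than 128 raises ValueError):
#   >>> _validate_parse_embedding(
#   ...     DataRowMetadataField(schema_id="c" * 25, value=[0] * 128))
#   [{'schemaId': 'ccccccccccccccccccccccccc', 'value': [0.0, 0.0, ...]}]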
def _validate_parse_number(
field: DataRowMetadataField
) -> List[Dict[str, Union[SchemaId, str, float, int]]]:
field.value = float(field.value)
return [field.dict(by_alias=True)]
def _validate_parse_datetime(
field: DataRowMetadataField) -> List[Dict[str, Union[SchemaId, str]]]:
if isinstance(field.value, str):
if field.value.endswith("Z"):
field.value = field.value[:-1]
field.value = datetime.fromisoformat(field.value)
elif not isinstance(field.value, datetime):
raise TypeError(
f"value for datetime fields must be either a string or datetime object. Found {type(field.value)}"
)
return [{
"schemaId": field.schema_id,
"value": field.value.isoformat() + "Z", # needs to be UTC
}]
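# Round-trip sketch (placeholder schema id; a trailing "Z" is stripped for
# parsing and re-appended on output so values are always stored as UTC):
#   >>> _validate_parse_datetime(DataRowMetadataField(
#   ...     schema_id="c" * 25, value="2021-01-20T01:02:03Z"))
#   [{'schemaId': 'ccccccccccccccccccccccccc', 'value': '2021-01-20T01:02:03Z'}]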
def _validate_parse_text(
field: DataRowMetadataField) -> List[Dict[str, Union[SchemaId, str]]]:
if not isinstance(field.value, str):
raise ValueError(
f"Expected a string type for the text field. Found {type(field.value)}"
)
if len(field.value) > String.max_length:
raise ValueError(
f"string fields cannot exceed {String.max_length} characters.")
return [field.dict(by_alias=True)]
def _validate_enum_parse(
schema: DataRowMetadataSchema,
field: DataRowMetadataField) -> List[Dict[str, Union[SchemaId, dict]]]:
if schema.options:
if field.value not in {o.uid for o in schema.options}:
raise ValueError(
f"Option `{field.value}` not found for {field.schema_id}")
else:
raise ValueError("Incorrectly specified enum schema")
return [{
"schemaId": field.schema_id,
"value": {}
}, {
"schemaId": field.value,
"value": {}
}]
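# Enum upserts emit two records, one for the parent enum schema and one for
# the selected option, both with empty object values, e.g. (illustrative):
#   [{"schemaId": "<enum-uid>", "value": {}},
#    {"schemaId": "<option-uid>", "value": {}}]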