Source code for labelbox.schema.data_row_metadata

# type: ignore
from copy import deepcopy
from datetime import datetime
from enum import Enum
from itertools import chain
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    List,
    Optional,
    Type,
    Union,
    overload,
)

from pydantic import (
    BaseModel,
    BeforeValidator,
    ConfigDict,
    Field,
    StringConstraints,
    conlist,
    model_serializer,
)
from typing_extensions import Annotated

from labelbox.schema.identifiable import GlobalKey, UniqueId
from labelbox.schema.identifiables import DataRowIdentifiers, UniqueIds
from labelbox.schema.ontology import SchemaId
from labelbox.utils import (
    _CamelCaseMixin,
    format_iso_datetime,
    format_iso_from_string,
)

Name = Annotated[
    str,
    BeforeValidator(lambda x: str.strip(str(x))),
    Field(min_length=1, max_length=100),
]
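
# Illustrative behavior (not part of the library): `Name` strips surrounding
# whitespace before enforcing the 1-100 character bounds, so validating
# "  tag  " yields "tag". A hedged sketch using pydantic v2's `TypeAdapter`:
#
#   from pydantic import TypeAdapter
#   TypeAdapter(Name).validate_python("  tag  ")  # -> "tag"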


class DataRowMetadataKind(Enum):
    number = "CustomMetadataNumber"
    datetime = "CustomMetadataDateTime"
    enum = "CustomMetadataEnum"
    string = "CustomMetadataString"
    option = "CustomMetadataEnumOption"
    embedding = "CustomMetadataEmbedding"

# Metadata schema
class DataRowMetadataSchema(BaseModel):
    uid: SchemaId
    name: Name
    reserved: bool
    kind: DataRowMetadataKind
    options: Optional[List["DataRowMetadataSchema"]] = None
    parent: Optional[SchemaId] = None

DataRowMetadataSchema.model_rebuild()

Embedding: Type[List[float]] = conlist(float, min_length=128, max_length=128)
String: Type[str] = Field(max_length=4096)


# Metadata base class
class DataRowMetadataField(_CamelCaseMixin):
    # One of `schema_id` or `name` must be provided. If `schema_id` is not
    # provided, it is inferred from `name`.
    # `schema_id` is aliased to the JSON key name for pydantic v2 support.
    schema_id: Optional[SchemaId] = None
    name: Optional[str] = None
    # `value` is of type `Any` so that we do not improperly coerce the value
    # to the wrong type. Additional validation is performed before upload
    # using the schema information.
    value: Any

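
# Illustrative usage (not part of the library): a field may be addressed by
# `schema_id` or by `name`; when only `name` is given, the schema id is
# resolved against the ontology before upload. The ids and names below are
# hypothetical placeholders:
#
#   by_id = DataRowMetadataField(schema_id="<schema-id>", value="my-message")
#   by_name = DataRowMetadataField(name="tag", value="my-message")
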
class DataRowMetadata(_CamelCaseMixin):
    global_key: Optional[str] = None
    data_row_id: Optional[str] = None
    fields: List[DataRowMetadataField]

class DeleteDataRowMetadata(_CamelCaseMixin):
    data_row_id: Union[str, UniqueId, GlobalKey] = None
    fields: List[SchemaId]

class DataRowMetadataBatchResponse(_CamelCaseMixin):
    global_key: Optional[str] = None
    data_row_id: Optional[str] = None
    error: Optional[str] = None
    fields: List[Union[DataRowMetadataField, SchemaId]]

# --- Batch GraphQL Objects ---
# Don't want to crowd the namespace with internals


# Bulk upsert values
class _UpsertDataRowMetadataInput(_CamelCaseMixin):
    schema_id: str
    value: Any


# Batch of upsert values for a datarow
class _UpsertBatchDataRowMetadata(_CamelCaseMixin):
    global_key: Optional[str] = None
    data_row_id: Optional[str] = None
    fields: List[_UpsertDataRowMetadataInput]


class _DeleteBatchDataRowMetadata(_CamelCaseMixin):
    data_row_identifier: Union[UniqueId, GlobalKey]
    schema_ids: List[SchemaId]

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_serializer(mode="wrap")
    def model_serializer(self, handler):
        res = handler(self)
        if "data_row_identifier" in res.keys():
            key = "data_row_identifier"
            id_type_key = "id_type"
        else:
            key = "dataRowIdentifier"
            id_type_key = "idType"
        data_row_identifier = res.pop(key)
        res[key] = {
            "id": data_row_identifier.key,
            id_type_key: data_row_identifier.id_type,
        }
        return res


_BatchInputs = Union[
    List[_UpsertBatchDataRowMetadata], List[_DeleteBatchDataRowMetadata]
]
_BatchFunction = Callable[[_BatchInputs], List[DataRowMetadataBatchResponse]]


class _UpsertCustomMetadataSchemaEnumOptionInput(_CamelCaseMixin):
    id: Optional[SchemaId] = None
    name: Annotated[
        str,
        StringConstraints(strip_whitespace=True, min_length=1, max_length=100),
    ]
    kind: str


class _UpsertCustomMetadataSchemaInput(_CamelCaseMixin):
    id: Optional[SchemaId] = None
    name: Annotated[
        str,
        StringConstraints(strip_whitespace=True, min_length=1, max_length=100),
    ]
    kind: str
    options: Optional[List[_UpsertCustomMetadataSchemaEnumOptionInput]] = None

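
# Illustrative serialization (not part of the library): the custom
# `model_serializer` on `_DeleteBatchDataRowMetadata` nests the identifier
# under the aliased key. The ids below are hypothetical placeholders:
#
#   _DeleteBatchDataRowMetadata(
#       data_row_identifier=UniqueId("<data-row-id>"),
#       schema_ids=["<schema-id>"],
#   ).model_dump(by_alias=True)
#   # -> {"dataRowIdentifier": {"id": "<data-row-id>", "idType": <id type>},
#   #     "schemaIds": ["<schema-id>"]}
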
class DataRowMetadataOntology:
    """Ontology for data row metadata

    Metadata provides additional context for data rows. Metadata is broken
    into two classes: reserved and custom. Reserved fields are defined by
    Labelbox and used for creating specific experiences in the platform.

    >>> mdo = client.get_data_row_metadata_ontology()

    """

    def __init__(self, client):
        self._client = client
        self._batch_size = 50  # used for uploads and deletes
        self._raw_ontology = self._get_ontology()
        self._build_ontology()

    def _build_ontology(self):
        # all fields
        self.fields = self._parse_ontology(self._raw_ontology)
        self.fields_by_id = self._make_id_index(self.fields)
        # reserved fields
        self.reserved_fields: List[DataRowMetadataSchema] = [
            f for f in self.fields if f.reserved
        ]
        self.reserved_by_id = self._make_id_index(self.reserved_fields)
        self.reserved_by_name: Dict[
            str, Union[DataRowMetadataSchema, Dict[str, DataRowMetadataSchema]]
        ] = self._make_name_index(self.reserved_fields)
        self.reserved_by_name_normalized: Dict[str, DataRowMetadataSchema] = (
            self._make_normalized_name_index(self.reserved_fields)
        )
        # custom fields
        self.custom_fields: List[DataRowMetadataSchema] = [
            f for f in self.fields if not f.reserved
        ]
        self.custom_by_id = self._make_id_index(self.custom_fields)
        self.custom_by_name: Dict[
            str, Union[DataRowMetadataSchema, Dict[str, DataRowMetadataSchema]]
        ] = self._make_name_index(self.custom_fields)
        self.custom_by_name_normalized: Dict[str, DataRowMetadataSchema] = (
            self._make_normalized_name_index(self.custom_fields)
        )

    @staticmethod
    def _lookup_in_index_by_name(reserved_index, custom_index, name):
        # search through reserved names first
        if name in reserved_index:
            return reserved_index[name]
        elif name in custom_index:
            return custom_index[name]
        else:
            raise KeyError(f"There is no metadata with name '{name}'")

    def get_by_name(
        self, name: str
    ) -> Union[DataRowMetadataSchema, Dict[str, DataRowMetadataSchema]]:
        """Get metadata by name

        >>> mdo.get_by_name(name)

        Args:
            name (str): Name of metadata schema

        Returns:
            Metadata schema as `DataRowMetadataSchema` or, in the case of
            Enum metadata, a dict of option name to `DataRowMetadataSchema`

        Raises:
            KeyError: When the provided name is present in neither the
                reserved nor the custom metadata list
        """
        return self._lookup_in_index_by_name(
            self.reserved_by_name, self.custom_by_name, name
        )

    def _get_by_name_normalized(self, name: str) -> DataRowMetadataSchema:
        """Get metadata by name. For options, it provides the option schema
        instead of the list of options.
        """
        # using `normalized` indices to find options by name as well
        return self._lookup_in_index_by_name(
            self.reserved_by_name_normalized,
            self.custom_by_name_normalized,
            name,
        )

    @staticmethod
    def _make_name_index(
        fields: List[DataRowMetadataSchema],
    ) -> Dict[
        str, Union[DataRowMetadataSchema, Dict[str, DataRowMetadataSchema]]
    ]:
        index = {}
        for f in fields:
            if f.options:
                index[f.name] = {}
                for o in f.options:
                    index[f.name][o.name] = o
            else:
                index[f.name] = f
        return index

    @staticmethod
    def _make_normalized_name_index(
        fields: List[DataRowMetadataSchema],
    ) -> Dict[str, DataRowMetadataSchema]:
        index = {}
        for f in fields:
            index[f.name] = f
        return index

    @staticmethod
    def _make_id_index(
        fields: List[DataRowMetadataSchema],
    ) -> Dict[SchemaId, DataRowMetadataSchema]:
        index = {}
        for f in fields:
            index[f.uid] = f
            if f.options:
                for o in f.options:
                    index[o.uid] = o
        return index

    def _get_ontology(self) -> List[Dict[str, Any]]:
        query = """query GetMetadataOntologyBetaPyApi {
            customMetadataOntology {
                id
                name
                kind
                reserved
                options {
                    id
                    kind
                    name
                    reserved
                }
            }
        }"""
        return self._client.execute(query)["customMetadataOntology"]

    @staticmethod
    def _parse_ontology(raw_ontology) -> List[DataRowMetadataSchema]:
        fields = []
        copy = deepcopy(raw_ontology)
        for schema in copy:
            schema["uid"] = schema["id"]
            options = None
            if schema.get("options"):
                options = []
                for option in schema["options"]:
                    option["uid"] = option["id"]
                    options.append(
                        DataRowMetadataSchema(
                            **{**option, **{"parent": schema["uid"]}}
                        )
                    )
            schema["options"] = options
            fields.append(DataRowMetadataSchema(**schema))
        return fields

    def refresh_ontology(self):
        """Update the `DataRowMetadataOntology` instance with the latest
        metadata ontology schemas.
        """
        self._raw_ontology = self._get_ontology()
        self._build_ontology()

    def create_schema(
        self, name: str, kind: DataRowMetadataKind, options: List[str] = None
    ) -> DataRowMetadataSchema:
        """Create metadata schema

        >>> mdo.create_schema(name, kind, options)

        Args:
            name (str): Name of metadata schema
            kind (DataRowMetadataKind): Kind of metadata schema as `DataRowMetadataKind`
            options (List[str]): List of Enum options

        Returns:
            Created metadata schema as `DataRowMetadataSchema`

        Raises:
            ValueError: When `kind` is not a `DataRowMetadataKind`, or when
                Enum options are provided for a non-Enum kind
        """
        if not isinstance(kind, DataRowMetadataKind):
            raise ValueError(f"kind '{kind}' must be a `DataRowMetadataKind`")
        upsert_schema = _UpsertCustomMetadataSchemaInput(
            name=name, kind=kind.value
        )
        if options:
            if kind != DataRowMetadataKind.enum:
                raise ValueError(
                    f"Kind '{kind}' must be an Enum, if Enum options are provided"
                )
            upsert_enum_options = [
                _UpsertCustomMetadataSchemaEnumOptionInput(
                    name=o, kind=DataRowMetadataKind.option.value
                )
                for o in options
            ]
            upsert_schema.options = upsert_enum_options

        return self._upsert_schema(upsert_schema)

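    # Illustrative sketch (not part of the library): creating a custom Enum
    # schema with two options. The schema and option names are hypothetical:
    #
    #   schema = mdo.create_schema(
    #       name="tag",
    #       kind=DataRowMetadataKind.enum,
    #       options=["option-1", "option-2"],
    #   )
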
    def update_schema(self, name: str, new_name: str) -> DataRowMetadataSchema:
        """Update metadata schema

        >>> mdo.update_schema(name, new_name)

        Args:
            name (str): Current name of metadata schema
            new_name (str): New name of metadata schema

        Returns:
            Updated metadata schema as `DataRowMetadataSchema`

        Raises:
            KeyError: When provided name is not a valid custom metadata
        """
        schema = self._validate_custom_schema_by_name(name)
        upsert_schema = _UpsertCustomMetadataSchemaInput(
            id=schema.uid, name=new_name, kind=schema.kind.value
        )
        if schema.options:
            upsert_enum_options = [
                _UpsertCustomMetadataSchemaEnumOptionInput(
                    id=o.uid,
                    name=o.name,
                    kind=DataRowMetadataKind.option.value,
                )
                for o in schema.options
            ]
            upsert_schema.options = upsert_enum_options

        return self._upsert_schema(upsert_schema)

    def update_enum_option(
        self, name: str, option: str, new_option: str
    ) -> DataRowMetadataSchema:
        """Update Enum metadata schema option

        >>> mdo.update_enum_option(name, option, new_option)

        Args:
            name (str): Name of metadata schema to update
            option (str): Name of Enum option to update
            new_option (str): New name of Enum option

        Returns:
            Updated metadata schema as `DataRowMetadataSchema`

        Raises:
            KeyError: When provided name is not a valid custom metadata
        """
        schema = self._validate_custom_schema_by_name(name)
        if schema.kind != DataRowMetadataKind.enum:
            raise ValueError(
                "Updating Enum option is only supported for Enum metadata schema"
            )
        valid_options: List[str] = [o.name for o in schema.options]
        if option not in valid_options:
            raise ValueError(
                f"Enum option '{option}' is not a valid option for Enum '{name}', "
                f"valid options are: {valid_options}"
            )

        upsert_schema = _UpsertCustomMetadataSchemaInput(
            id=schema.uid, name=schema.name, kind=schema.kind.value
        )
        upsert_enum_options = []
        for o in schema.options:
            enum_option = _UpsertCustomMetadataSchemaEnumOptionInput(
                id=o.uid, name=o.name, kind=o.kind.value
            )
            if enum_option.name == option:
                enum_option.name = new_option
            upsert_enum_options.append(enum_option)
        upsert_schema.options = upsert_enum_options

        return self._upsert_schema(upsert_schema)

    def delete_schema(self, name: str) -> bool:
        """Delete metadata schema

        >>> mdo.delete_schema(name)

        Args:
            name: Name of metadata schema to delete

        Returns:
            True if deletion is successful, False if unsuccessful

        Raises:
            KeyError: When provided name is not a valid custom metadata
        """
        schema = self._validate_custom_schema_by_name(name)
        query = """mutation DeleteCustomMetadataSchemaPyApi($where: WhereUniqueIdInput!) {
            deleteCustomMetadataSchema(schema: $where){
                success
            }
        }"""
        res = self._client.execute(query, {"where": {"id": schema.uid}})[
            "deleteCustomMetadataSchema"
        ]
        self.refresh_ontology()

        return res["success"]

    def parse_metadata(
        self, unparsed: List[Dict[str, List[Union[str, Dict]]]]
    ) -> List[DataRowMetadata]:
        """Parse metadata responses

        >>> mdo.parse_metadata([metadata])

        Args:
            unparsed: An unparsed metadata export

        Returns:
            metadata: List of `DataRowMetadata`
        """
        parsed = []
        if isinstance(unparsed, dict):
            raise ValueError("Pass a list of dictionaries")

        for dr in unparsed:
            fields = []
            if "fields" in dr:
                fields = self.parse_metadata_fields(dr["fields"])
            parsed.append(
                DataRowMetadata(
                    data_row_id=dr["dataRowId"],
                    global_key=dr["globalKey"],
                    fields=fields,
                )
            )
        return parsed

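    # Illustrative input shape for `parse_metadata` (ids and values are
    # hypothetical placeholders):
    #
    #   unparsed = [{
    #       "dataRowId": "<data-row-id>",
    #       "globalKey": "<global-key>",
    #       "fields": [{"schemaId": "<schema-id>", "value": "my-message"}],
    #   }]
    #   parsed = mdo.parse_metadata(unparsed)  # -> List[DataRowMetadata]
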
    def parse_metadata_fields(
        self, unparsed: List[Dict[str, Dict]]
    ) -> List[DataRowMetadataField]:
        """Parse metadata fields as a list of `DataRowMetadataField`

        >>> mdo.parse_metadata_fields([metadata_fields])

        Args:
            unparsed: An unparsed list of metadata represented as a dict
                containing 'schemaId' and 'value'

        Returns:
            metadata: List of `DataRowMetadataField`
        """
        parsed = []
        if isinstance(unparsed, dict):
            raise ValueError("Pass a list of dictionaries")

        for f in unparsed:
            if f["schemaId"] not in self.fields_by_id:
                # Fetch latest metadata ontology if metadata can't be found
                self.refresh_ontology()
                if f["schemaId"] not in self.fields_by_id:
                    raise ValueError(
                        f"Schema Id `{f['schemaId']}` not found in ontology"
                    )

            schema = self.fields_by_id[f["schemaId"]]
            if schema.kind == DataRowMetadataKind.enum:
                continue
            elif schema.kind == DataRowMetadataKind.option:
                field = DataRowMetadataField(
                    schema_id=schema.parent, value=schema.uid
                )
            elif schema.kind == DataRowMetadataKind.datetime:
                field = DataRowMetadataField(
                    schema_id=schema.uid,
                    value=format_iso_from_string(f["value"]),
                )
            else:
                field = DataRowMetadataField(
                    schema_id=schema.uid, value=f["value"]
                )
            field.name = schema.name
            parsed.append(field)
        return parsed

    def bulk_upsert(
        self, metadata: List[DataRowMetadata]
    ) -> List[DataRowMetadataBatchResponse]:
        """Upsert metadata to a list of data rows

        You may specify a data row by either data_row_id or global_key

        >>> metadata = DataRowMetadata(
        >>>     data_row_id="datarow-id",  # Alternatively, set global_key="global-key"
        >>>     fields=[
        >>>         DataRowMetadataField(schema_id="schema-id", value="my-message"),
        >>>         ...
        >>>     ]
        >>> )
        >>> mdo.bulk_upsert([metadata])

        Args:
            metadata: List of DataRow Metadata to upsert

        Returns:
            list of unsuccessful upserts.
            An empty list means the upload was successful.
        """

        if not len(metadata):
            raise ValueError("Empty list passed")

        def _batch_upsert(
            upserts: List[_UpsertBatchDataRowMetadata],
        ) -> List[DataRowMetadataBatchResponse]:
            query = """mutation UpsertDataRowMetadataBetaPyApi($metadata: [DataRowCustomMetadataBatchUpsertInput!]!) {
                upsertDataRowCustomMetadata(data: $metadata){
                    globalKey
                    dataRowId
                    error
                    fields {
                        value
                        schemaId
                    }
                }
            }"""
            res = self._client.execute(query, {"metadata": upserts})[
                "upsertDataRowCustomMetadata"
            ]
            return [
                DataRowMetadataBatchResponse(
                    global_key=r["globalKey"],
                    data_row_id=r["dataRowId"],
                    error=r["error"],
                    fields=self.parse_metadata([r])[0].fields,
                )
                for r in res
            ]

        items = []
        for m in metadata:
            items.append(
                _UpsertBatchDataRowMetadata(
                    global_key=m.global_key,
                    data_row_id=m.data_row_id,
                    fields=list(
                        chain.from_iterable(
                            self._parse_upsert(f, m.data_row_id)
                            for f in m.fields
                        )
                    ),
                ).model_dump(by_alias=True)
            )
        res = _batch_operations(_batch_upsert, items, self._batch_size)
        return res

    def bulk_delete(
        self, deletes: List[DeleteDataRowMetadata]
    ) -> List[DataRowMetadataBatchResponse]:
        """Delete metadata from a datarow by specifying the fields you want to remove

        >>> delete = DeleteDataRowMetadata(
        >>>     data_row_id=UniqueId("datarow-id"),
        >>>     fields=[
        >>>         "schema-id-1",
        >>>         "schema-id-2",
        >>>         ...
        >>>     ]
        >>> )
        >>> mdo.bulk_delete([delete])

        >>> delete = DeleteDataRowMetadata(
        >>>     data_row_id=GlobalKey("global-key"),
        >>>     fields=[
        >>>         "schema-id-1",
        >>>         "schema-id-2",
        >>>         ...
        >>>     ]
        >>> )
        >>> mdo.bulk_delete([delete])

        >>> delete = DeleteDataRowMetadata(
        >>>     data_row_id="datarow-id",
        >>>     fields=[
        >>>         "schema-id-1",
        >>>         "schema-id-2",
        >>>         ...
        >>>     ]
        >>> )
        >>> mdo.bulk_delete([delete])

        Args:
            deletes: Data row and schema ids to delete.
                For the data row, we support UniqueId, str, and GlobalKey.
                If you pass a str, we will assume it is a UniqueId.
                Do not pass a mix of data row ids and global keys in the same list.

        Returns:
            list of unsuccessful deletions.
            An empty list means all data rows were successfully deleted.
        """

        if not len(deletes):
            raise ValueError("The 'deletes' list cannot be empty.")

        for i, delete in enumerate(deletes):
            if isinstance(delete.data_row_id, str):
                deletes[i] = DeleteDataRowMetadata(
                    data_row_id=UniqueId(delete.data_row_id),
                    fields=delete.fields,
                )
            elif isinstance(delete.data_row_id, (UniqueId, GlobalKey)):
                continue
            else:
                raise ValueError(
                    f"Invalid data row identifier type '{type(delete.data_row_id)}' "
                    f"for '{delete.data_row_id}'"
                )

        def _batch_delete(
            deletes: List[_DeleteBatchDataRowMetadata],
        ) -> List[DataRowMetadataBatchResponse]:
            query = """mutation DeleteDataRowMetadataBetaPyApi($deletes: [DataRowIdentifierCustomMetadataBatchDeleteInput!]) {
                deleteDataRowCustomMetadata(dataRowIdentifiers: $deletes) {
                    dataRowId
                    error
                    fields {
                        value
                        schemaId
                    }
                }
            }"""
            res = self._client.execute(query, {"deletes": deletes})[
                "deleteDataRowCustomMetadata"
            ]
            failures = []
            for dr in res:
                dr["fields"] = [f["schemaId"] for f in dr["fields"]]
                failures.append(DataRowMetadataBatchResponse(**dr))
            return failures

        items = [self._validate_delete(m) for m in deletes]
        return _batch_operations(
            _batch_delete, items, batch_size=self._batch_size
        )

    @overload
    def bulk_export(self, data_row_ids: List[str]) -> List[DataRowMetadata]:
        pass

    @overload
    def bulk_export(
        self, data_row_ids: DataRowIdentifiers
    ) -> List[DataRowMetadata]:
        pass

    def bulk_export(self, data_row_ids) -> List[DataRowMetadata]:
        """Exports metadata for a list of data rows

        >>> mdo.bulk_export([data_row.uid for data_row in data_rows])

        Args:
            data_row_ids: List of data rows to fetch metadata for. This can
                be a list of strings or a DataRowIdentifiers object.
                DataRowIdentifiers objects are lists of ids or global keys.
                A DataRowIdentifiers object can be a UniqueIds or GlobalKeys class.

        Returns:
            A list of DataRowMetadata. There will be one DataRowMetadata for
            each data_row_id passed in. This is true even if the data row does
            not have any metadata. Data rows without metadata will have empty
            `fields`.
        """
        if not len(data_row_ids):
            raise ValueError("Empty list passed")

        if (
            isinstance(data_row_ids, list)
            and len(data_row_ids) > 0
            and isinstance(data_row_ids[0], str)
        ):
            data_row_ids = UniqueIds(data_row_ids)

        def _bulk_export(
            _data_row_ids: DataRowIdentifiers,
        ) -> List[DataRowMetadata]:
            query = """query dataRowCustomMetadataPyApi($dataRowIdentifiers: DataRowCustomMetadataDataRowIdentifiersInput) {
                dataRowCustomMetadata(where: {dataRowIdentifiers : $dataRowIdentifiers}) {
                    dataRowId
                    globalKey
                    fields {
                        value
                        schemaId
                    }
                }
            }"""
            return self.parse_metadata(
                self._client.execute(
                    query,
                    {
                        "dataRowIdentifiers": {
                            "ids": [id for id in _data_row_ids],
                            "idType": _data_row_ids.id_type,
                        }
                    },
                )["dataRowCustomMetadata"]
            )

        return _batch_operations(
            _bulk_export, data_row_ids, batch_size=self._batch_size
        )

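    # Illustrative sketch (not part of the library): exporting by global keys
    # rather than data row ids, assuming a `GlobalKeys` class is available
    # from labelbox.schema.identifiables. The keys are hypothetical:
    #
    #   from labelbox.schema.identifiables import GlobalKeys
    #   metadata = mdo.bulk_export(GlobalKeys(["key-1", "key-2"]))
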
    def parse_upsert_metadata(self, metadata_fields) -> List[Dict[str, Any]]:
        """Converts either `DataRowMetadataField` or a dictionary representation
        of `DataRowMetadataField` into a validated, flattened dictionary of
        metadata fields that are used to create data row metadata. Used
        internally in `Dataset.create_data_rows()`

        Args:
            metadata_fields: List of `DataRowMetadataField` or a dictionary
                representation of `DataRowMetadataField`

        Returns:
            List of dictionaries representing a flattened view of metadata fields
        """

        def _convert_metadata_field(metadata_field):
            if isinstance(metadata_field, DataRowMetadataField):
                return metadata_field
            elif isinstance(metadata_field, dict):
                if "value" not in metadata_field:
                    raise ValueError(
                        f"Custom metadata field '{metadata_field}' must have a 'value' key"
                    )
                if (
                    "schema_id" not in metadata_field
                    and "name" not in metadata_field
                ):
                    raise ValueError(
                        f"Custom metadata field '{metadata_field}' must have either a 'schema_id' or a 'name' key"
                    )
                return DataRowMetadataField(
                    schema_id=metadata_field.get("schema_id"),
                    name=metadata_field.get("name"),
                    value=metadata_field["value"],
                )
            else:
                raise ValueError(
                    f"Metadata field '{metadata_field}' is neither a 'DataRowMetadataField' nor a dictionary"
                )

        # Convert all metadata fields to DataRowMetadataField type
        metadata_fields = [_convert_metadata_field(m) for m in metadata_fields]
        parsed_metadata = list(
            chain.from_iterable(self._parse_upsert(m) for m in metadata_fields)
        )
        return [m.model_dump(by_alias=True) for m in parsed_metadata]

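    # Illustrative input (not part of the library): dictionary representations
    # are accepted alongside `DataRowMetadataField` instances. The name and
    # value below are hypothetical:
    #
    #   mdo.parse_upsert_metadata([
    #       {"name": "tag", "value": "my-message"},
    #       DataRowMetadataField(name="tag", value="my-message"),
    #   ])
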
    def _upsert_schema(
        self, upsert_schema: _UpsertCustomMetadataSchemaInput
    ) -> DataRowMetadataSchema:
        query = """mutation UpsertCustomMetadataSchemaPyApi($data: UpsertCustomMetadataSchemaInput!) {
            upsertCustomMetadataSchema(data: $data){
                id
                name
                kind
                options {
                    id
                    name
                    kind
                }
            }
        }"""
        res = self._client.execute(
            query, {"data": upsert_schema.model_dump(exclude_none=True)}
        )["upsertCustomMetadataSchema"]
        self.refresh_ontology()
        return _parse_metadata_schema(res)

    def _load_option_by_name(self, metadatum: DataRowMetadataField):
        is_value_a_valid_schema_id = metadatum.value in self.fields_by_id
        if not is_value_a_valid_schema_id:
            metadatum_by_name = self.get_by_name(metadatum.name)
            if metadatum.value not in metadatum_by_name:
                raise KeyError(
                    f"There is no enum option by name '{metadatum.value}' "
                    f"for enum name '{metadatum.name}'"
                )
            metadatum.value = metadatum_by_name[metadatum.value].uid

    def _load_schema_id_by_name(self, metadatum: DataRowMetadataField):
        """Loads the schema id by name for a metadata field, including the
        option's schema id.
        """
        if metadatum.name is None:
            return

        if metadatum.schema_id is None:
            schema = self._get_by_name_normalized(metadatum.name)
            metadatum.schema_id = schema.uid
            if schema.options:
                self._load_option_by_name(metadatum)

    def _parse_upsert(
        self, metadatum: DataRowMetadataField, data_row_id: Optional[str] = None
    ) -> List[_UpsertDataRowMetadataInput]:
        """Format for metadata upserts to GQL"""

        self._load_schema_id_by_name(metadatum)

        if metadatum.schema_id not in self.fields_by_id:
            # Fetch latest metadata ontology if metadata can't be found
            self.refresh_ontology()
            if metadatum.schema_id not in self.fields_by_id:
                raise ValueError(
                    f"Schema Id `{metadatum.schema_id}` not found in ontology"
                )

        schema = self.fields_by_id[metadatum.schema_id]
        try:
            if schema.kind == DataRowMetadataKind.datetime:
                parsed = _validate_parse_datetime(metadatum)
            elif schema.kind == DataRowMetadataKind.string:
                parsed = _validate_parse_text(metadatum)
            elif schema.kind == DataRowMetadataKind.number:
                parsed = _validate_parse_number(metadatum)
            elif schema.kind == DataRowMetadataKind.embedding:
                parsed = _validate_parse_embedding(metadatum)
            elif schema.kind == DataRowMetadataKind.enum:
                parsed = _validate_enum_parse(schema, metadatum)
            elif schema.kind == DataRowMetadataKind.option:
                raise ValueError(
                    "An Option id should not be set as the Schema id"
                )
            else:
                raise ValueError(f"Unknown type: {schema}")
        except ValueError as e:
            error_str = f"Could not validate metadata [{metadatum}]"
            if data_row_id:
                error_str += f", data_row_id='{data_row_id}'"
            raise ValueError(f"{error_str}.\nReason: {e}")

        return [_UpsertDataRowMetadataInput(**p) for p in parsed]

    def _validate_delete(self, delete: DeleteDataRowMetadata):
        if not len(delete.fields):
            raise ValueError(f"No fields specified for {delete.data_row_id}")

        deletes = set()
        for schema_id in delete.fields:
            if schema_id not in self.fields_by_id:
                # Fetch latest metadata ontology if metadata can't be found
                self.refresh_ontology()
                if schema_id not in self.fields_by_id:
                    raise ValueError(
                        f"Schema Id `{schema_id}` not found in ontology"
                    )

            schema = self.fields_by_id[schema_id]
            # handle users specifying enums by adding all option enums
            if schema.kind == DataRowMetadataKind.enum:
                for o in schema.options:
                    deletes.add(o.uid)
            deletes.add(schema.uid)

        return _DeleteBatchDataRowMetadata(
            data_row_identifier=delete.data_row_id,
            schema_ids=list(delete.fields),
        ).model_dump(by_alias=True)

    def _validate_custom_schema_by_name(
        self, name: str
    ) -> DataRowMetadataSchema:
        if name not in self.custom_by_name_normalized:
            # Fetch latest metadata ontology if metadata can't be found
            self.refresh_ontology()
            if name not in self.custom_by_name_normalized:
                raise KeyError(f"'{name}' is not a valid custom metadata")

        return self.custom_by_name_normalized[name]

def _batch_items(iterable: List[Any], size: int) -> Generator[Any, None, None]:
    for ndx in range(0, len(iterable), size):
        yield iterable[ndx : min(ndx + size, len(iterable))]


def _batch_operations(
    batch_function: _BatchFunction,
    items: List,
    batch_size: int = 100,
):
    response = []
    for batch in _batch_items(items, batch_size):
        response += batch_function(batch)
    return response


def _validate_parse_embedding(
    field: DataRowMetadataField,
) -> List[Dict[str, Union[SchemaId, Embedding]]]:
    if isinstance(field.value, list):
        # `Embedding` is a pydantic v2 `conlist`, i.e. an `Annotated` type
        # whose length constraint lives in `__metadata__` as an
        # `annotated_types.Len` (the original `Embedding.min_items` /
        # `Embedding.max_items` attributes do not exist on that type).
        embedding_len = Embedding.__metadata__[0]
        if not (
            embedding_len.min_length
            <= len(field.value)
            <= embedding_len.max_length
        ):
            raise ValueError(
                "Embedding length invalid. "
                "Must have length within the interval "
                f"[{embedding_len.min_length},{embedding_len.max_length}]. "
                f"Found {len(field.value)}"
            )
        field.value = [float(x) for x in field.value]
    else:
        raise ValueError(
            f"Expected a list for embedding. Found {type(field.value)}"
        )
    return [field.model_dump(by_alias=True)]


def _validate_parse_number(
    field: DataRowMetadataField,
) -> List[Dict[str, Union[SchemaId, str, float, int]]]:
    field.value = float(field.value)
    return [field.model_dump(by_alias=True)]


def _validate_parse_datetime(
    field: DataRowMetadataField,
) -> List[Dict[str, Union[SchemaId, str]]]:
    if isinstance(field.value, str):
        field.value = format_iso_from_string(field.value)
    elif not isinstance(field.value, datetime):
        raise TypeError(
            f"Value for datetime fields must be either a string or datetime object. Found {type(field.value)}"
        )

    return [
        {"schemaId": field.schema_id, "value": format_iso_datetime(field.value)}
    ]


def _validate_parse_text(
    field: DataRowMetadataField,
) -> List[Dict[str, Union[SchemaId, str]]]:
    if not isinstance(field.value, str):
        raise ValueError(
            f"Expected a string type for the text field. Found {type(field.value)}"
        )

    # `String` is a pydantic `FieldInfo`; its constraint objects live in the
    # `metadata` list (the original message accessed `String.metadata.max_length`,
    # which would fail at runtime).
    max_length = String.metadata[0].max_length
    if len(field.value) > max_length:
        raise ValueError(
            f"String fields cannot exceed {max_length} characters."
        )

    return [field.model_dump(by_alias=True)]


def _validate_enum_parse(
    schema: DataRowMetadataSchema, field: DataRowMetadataField
) -> List[Dict[str, Union[SchemaId, dict]]]:
    if schema.options:
        if field.value not in {o.uid for o in schema.options}:
            raise ValueError(
                f"Option `{field.value}` not found for {field.schema_id}"
            )
    else:
        raise ValueError("Incorrectly specified enum schema")

    return [
        {"schemaId": field.schema_id, "value": {}},
        {"schemaId": field.value, "value": {}},
    ]


def _parse_metadata_schema(
    unparsed: Dict[str, Union[str, List]],
) -> DataRowMetadataSchema:
    uid = unparsed["id"]
    name = unparsed["name"]
    kind = DataRowMetadataKind(unparsed["kind"])
    options = [
        DataRowMetadataSchema(
            uid=o["id"],
            name=o["name"],
            reserved=False,
            kind=DataRowMetadataKind.option,
            parent=uid,
        )
        for o in unparsed["options"]
    ]
    return DataRowMetadataSchema(
        uid=uid, name=name, reserved=False, kind=kind, options=options or None
    )
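

# Illustrative end-to-end sketch (not part of the library). The API key,
# schema name, and data row id are hypothetical placeholders:
#
#   import labelbox as lb
#
#   client = lb.Client(api_key="<api-key>")
#   mdo = client.get_data_row_metadata_ontology()
#   field = DataRowMetadataField(name="tag", value="my-message")
#   metadata = DataRowMetadata(data_row_id="<data-row-id>", fields=[field])
#   failures = mdo.bulk_upsert([metadata])  # empty list on success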