import json
import logging
import time
from collections import namedtuple
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, List, Optional, Union
from urllib.parse import urlparse
import ndjson
import requests
from labelbox import utils
from labelbox.exceptions import (InvalidQueryError, LabelboxError,
ProcessingWaitTimeout, ResourceConflict,
ResourceNotFoundError)
from labelbox.orm import query
from labelbox.orm.db_object import DbObject, Deletable, Updateable
from labelbox.orm.model import Entity, Field, Relationship
from labelbox.pagination import PaginatedCollection
from labelbox.schema.consensus_settings import ConsensusSettings
from labelbox.schema.data_row import DataRow
from labelbox.schema.export_filters import ProjectExportFilters
from labelbox.schema.export_params import ProjectExportParams
from labelbox.schema.media_type import MediaType
from labelbox.schema.queue_mode import QueueMode
from labelbox.schema.resource_tag import ResourceTag
from labelbox.schema.task import Task
from labelbox.schema.user import User
from labelbox.schema.task_queue import TaskQueue
if TYPE_CHECKING:
from labelbox import BulkImportRequest
# Python < 3.7 lacks datetime.fromisoformat; monkey-patch it in from the
# backport package when the attribute is missing.
try:
    datetime.fromisoformat  # type: ignore[attr-defined]
except AttributeError:
    from backports.datetime_fromisoformat import MonkeyPatch
    MonkeyPatch.patch_fromisoformat()

# LBV1Converter lives in an optional extra ("labelbox[data]"); tolerate its
# absence so the rest of this module stays importable. label_generator()
# re-checks availability via _check_converter_import before using it.
try:
    from labelbox.data.serialization import LBV1Converter
except ImportError:
    pass

logger = logging.getLogger(__name__)
def _validate_datetime(string_date: str) -> bool:
"""helper function validate that datetime is as follows: YYYY-MM-DD for the export"""
if string_date:
for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
try:
datetime.strptime(string_date, fmt)
return True
except ValueError:
pass
raise ValueError(f"""Incorrect format for: {string_date}.
Format must be \"YYYY-MM-DD\" or \"YYYY-MM-DD hh:mm:ss\"""")
return True
class Project(DbObject, Updateable, Deletable):
    """ A Project is a container that includes a labeling frontend, an ontology,
    datasets and labels.

    Attributes:
        name (str)
        description (str)
        updated_at (datetime)
        created_at (datetime)
        setup_complete (datetime)
        last_activity_time (datetime)
        queue_mode (string)
        auto_audit_number_of_labels (int)
        auto_audit_percentage (float)

        datasets (Relationship): `ToMany` relationship to Dataset
        created_by (Relationship): `ToOne` relationship to User
        organization (Relationship): `ToOne` relationship to Organization
        labeling_frontend (Relationship): `ToOne` relationship to LabelingFrontend
        labeling_frontend_options (Relationship): `ToMany` relationship to LabelingFrontendOptions
        labeling_parameter_overrides (Relationship): `ToMany` relationship to LabelingParameterOverride
        webhooks (Relationship): `ToMany` relationship to Webhook
        benchmarks (Relationship): `ToMany` relationship to Benchmark
        ontology (Relationship): `ToOne` relationship to Ontology
        task_queues (Relationship): `ToMany` relationship to TaskQueue
    """

    name = Field.String("name")
    description = Field.String("description")
    updated_at = Field.DateTime("updated_at")
    created_at = Field.DateTime("created_at")
    setup_complete = Field.DateTime("setup_complete")
    last_activity_time = Field.DateTime("last_activity_time")
    queue_mode = Field.Enum(QueueMode, "queue_mode")
    auto_audit_number_of_labels = Field.Int("auto_audit_number_of_labels")
    auto_audit_percentage = Field.Float("auto_audit_percentage")
    # Bind data_type and allowedMediaTYpe using the GraphQL type MediaType
    media_type = Field.Enum(MediaType, "media_type", "allowedMediaType")

    # Relationships
    datasets = Relationship.ToMany("Dataset", True)
    created_by = Relationship.ToOne("User", False, "created_by")
    organization = Relationship.ToOne("Organization", False)
    labeling_frontend = Relationship.ToOne("LabelingFrontend")
    labeling_frontend_options = Relationship.ToMany(
        "LabelingFrontendOptions", False, "labeling_frontend_options")
    labeling_parameter_overrides = Relationship.ToMany(
        "LabelingParameterOverride", False, "labeling_parameter_overrides")
    webhooks = Relationship.ToMany("Webhook", False)
    benchmarks = Relationship.ToMany("Benchmark", False)
    ontology = Relationship.ToOne("Ontology", True)

    # Upper bound (seconds) that create_batch waits for newly added data
    # rows to finish server-side processing (1 hour).
    _wait_processing_max_seconds = 3600
def update(self, **kwargs):
    """ Updates this project with the specified attributes

    Args:
        kwargs: a dictionary containing attributes to be upserted

    Note that the queue_mode cannot be changed after a project has been created.

    Additionally, the quality setting cannot be changed after a project has been created. The quality mode
    for a project is inferred through the following attributes:

    Benchmark:
        auto_audit_number_of_labels = 1
        auto_audit_percentage = 1.0

    Consensus:
        auto_audit_number_of_labels > 1
        auto_audit_percentage <= 1.0

    Attempting to switch between benchmark and consensus modes is an invalid operation and will result
    in an error.
    """
    media_type = kwargs.get("media_type")
    if media_type:
        # Reject unsupported media types up front; otherwise pass the raw
        # enum value through to the server.
        if not MediaType.is_supported(media_type):
            raise TypeError(f"{media_type} is not a valid media type. Use"
                            f" any of {MediaType.get_supported_members()}"
                            " from MediaType. Example: MediaType.Image.")
        kwargs["media_type"] = media_type.value
    return super().update(**kwargs)
def members(self) -> PaginatedCollection:
    """ Fetch all current members for this project

    Returns:
        A `PaginatedCollection` of `ProjectMember`s
    """
    param = "projectId"
    query_str = """query ProjectMemberOverviewPyApi($%s: ID!) {
        project(where: {id : $%s}) { id members(skip: %%d first: %%d){ id user { %s } role { id name } }
        }
    }""" % (param, param, query.results_query_part(Entity.User))
    return PaginatedCollection(self.client, query_str, {param: str(self.uid)},
                               ["project", "members"], ProjectMember)
def labels(self, datasets=None, order_by=None) -> PaginatedCollection:
    """ Custom relationship expansion method to support limited filtering.

    Args:
        datasets (iterable of Dataset): Optional collection of Datasets
            whose Labels are sought. If not provided, all Labels in
            this Project are returned.
        order_by (None or (Field, Field.Order)): Ordering clause.
    """
    Label = Entity.Label

    # Optional dataset filter clause.
    where = ""
    if datasets is not None:
        dataset_ids = ", ".join(
            '"%s"' % dataset.uid for dataset in datasets)
        where = " where:{dataRow: {dataset: {id_in: [%s]}}}" % dataset_ids

    # Optional ordering clause (validated against the Label schema).
    order_by_str = ""
    if order_by is not None:
        query.check_order_by_clause(Label, order_by)
        order_by_str = "orderBy: %s_%s" % (order_by[0].graphql_name,
                                           order_by[1].name.upper())

    id_param = "projectId"
    query_str = """query GetProjectLabelsPyApi($%s: ID!)
        {project (where: {id: $%s})
        {labels (skip: %%d first: %%d %s %s) {%s}}}""" % (
        id_param, id_param, where, order_by_str,
        query.results_query_part(Label))
    return PaginatedCollection(self.client, query_str, {id_param: self.uid},
                               ["project", "labels"], Label)
def export_queued_data_rows(
        self,
        timeout_seconds=120,
        include_metadata: bool = False) -> List[Dict[str, str]]:
    """ Returns all data rows that are currently enqueued for this project.

    Args:
        timeout_seconds (float): Max waiting time, in seconds.
        include_metadata (bool): True to return related DataRow metadata
    Returns:
        Data row fields for all data rows in the queue as json
    Raises:
        LabelboxError: if the export fails or is unable to download within the specified time.
    """
    id_param = "projectId"
    metadata_param = "includeMetadataInput"
    query_str = """mutation GetQueuedDataRowsExportUrlPyApi($%s: ID!, $%s: Boolean!)
    {exportQueuedDataRows(data:{projectId: $%s , includeMetadataInput: $%s}) {downloadUrl createdAt status} }
    """ % (id_param, metadata_param, id_param, metadata_param)
    sleep_time = 2
    start_time = time.time()
    # Poll the mutation until the server reports COMPLETE, FAILED, or the
    # timeout elapses.
    while True:
        res = self.client.execute(query_str, {
            id_param: self.uid,
            metadata_param: include_metadata
        })
        res = res["exportQueuedDataRows"]
        if res["status"] == "COMPLETE":
            download_url = res["downloadUrl"]
            response = requests.get(download_url)
            response.raise_for_status()
            # Export payload is newline-delimited JSON.
            return ndjson.loads(response.text)
        elif res["status"] == "FAILED":
            raise LabelboxError("Data row export failed.")
        current_time = time.time()
        if current_time - start_time > timeout_seconds:
            raise LabelboxError(
                f"Unable to export data rows within {timeout_seconds} seconds."
            )
        logger.debug(
            "Project '%s' queued data row export, waiting for server...",
            self.uid)
        time.sleep(sleep_time)
def label_generator(self, timeout_seconds=600, **kwargs):
    """
    Download text and image annotations, or video annotations.

    For a mixture of text/image and video, use project.export_labels()

    Args:
        timeout_seconds (float): Max waiting time for the export, in seconds.
        kwargs: forwarded to export_labels() (e.g. start/end date filters).
    Returns:
        LabelGenerator for accessing labels
    Raises:
        TimeoutError: if the export did not complete within timeout_seconds.
        ValueError: if the project mixes video and text/image labels.
    """
    _check_converter_import()
    json_data = self.export_labels(download=True,
                                   timeout_seconds=timeout_seconds,
                                   **kwargs)
    # export_labels returns None on timeout. Check this BEFORE the list
    # assertion: previously the assert fired on None first, which made the
    # TimeoutError below unreachable.
    if json_data is None:
        raise TimeoutError(
            f"Unable to download labels in {timeout_seconds} seconds."
            "Please try again or contact support if the issue persists.")
    assert isinstance(
        json_data,
        List), "Unable to successfully get labels. Please try again"
    # Determine whether the (non-skipped) labels are video labels.
    is_video = [
        "frames" in row["Label"]
        for row in json_data
        if row["Label"] and not row["Skipped"]
    ]
    if any(is_video) and not all(is_video):
        raise ValueError(
            "Found mixed data types of video and text/image. "
            "Use project.export_labels() to export projects with mixed data types. "
        )
    if is_video and all(is_video):
        # Filter skipped labels to avoid inference errors. Filter the data
        # we already downloaded instead of re-exporting (the old code
        # issued a second export that dropped timeout_seconds and kwargs).
        json_data = [label for label in json_data if not label["Skipped"]]
        return LBV1Converter.deserialize_video(json_data, self.client)
    return LBV1Converter.deserialize(json_data)
def export_labels(self,
                  download=False,
                  timeout_seconds=1800,
                  **kwargs) -> Optional[Union[str, List[Dict[Any, Any]]]]:
    """ Calls the server-side Label exporting that generates a JSON
    payload, and returns the URL to that payload.

    Will only generate a new URL at a max frequency of 30 min.

    Args:
        download (bool): Returns the url if False
        timeout_seconds (float): Max waiting time, in seconds.
        start (str): Earliest date for labels, formatted "YYYY-MM-DD" or "YYYY-MM-DD hh:mm:ss"
        end (str): Latest date for labels, formatted "YYYY-MM-DD" or "YYYY-MM-DD hh:mm:ss"
        last_activity_start (str): Will include all labels that have had any updates to
            data rows, issues, comments, metadata, or reviews since this timestamp.
            formatted "YYYY-MM-DD" or "YYYY-MM-DD hh:mm:ss"
        last_activity_end (str): Will include all labels that do not have any updates to
            data rows, issues, comments, metadata, or reviews after this timestamp.
            formatted "YYYY-MM-DD" or "YYYY-MM-DD hh:mm:ss"

    Returns:
        URL of the data file with this Project's labels. If the server didn't
        generate during the `timeout_seconds` period, None is returned.
    """

    def _string_from_dict(dictionary: dict, value_with_quotes=False) -> str:
        """Returns a concatenated string of the dictionary's keys and values

        The string will be formatted as {key}: 'value' for each key. Value will be inclusive of
        quotations while key will not. This can be toggled with `value_with_quotes`"""

        quote = "\"" if value_with_quotes else ""
        # Falsy values are dropped entirely from the filter literal.
        return ",".join([
            f"""{c}: {quote}{dictionary.get(c)}{quote}"""
            for c in dictionary
            if dictionary.get(c)
        ])

    sleep_time = 2
    id_param = "projectId"
    filter_param = ""
    filter_param_dict = {}

    # Optional label-creation window; each bound is validated before being
    # embedded into the GraphQL filter literal.
    if "start" in kwargs or "end" in kwargs:
        created_at_dict = {
            "start": kwargs.get("start", ""),
            "end": kwargs.get("end", "")
        }
        [_validate_datetime(date) for date in created_at_dict.values()]
        filter_param_dict["labelCreatedAt"] = "{%s}" % _string_from_dict(
            created_at_dict, value_with_quotes=True)

    # Optional last-activity window.
    if "last_activity_start" in kwargs or "last_activity_end" in kwargs:
        last_activity_start = kwargs.get('last_activity_start')
        last_activity_end = kwargs.get('last_activity_end')
        if last_activity_start:
            _validate_datetime(str(last_activity_start))
        if last_activity_end:
            _validate_datetime(str(last_activity_end))

        filter_param_dict["lastActivityAt"] = "{%s}" % _string_from_dict(
            {
                "start": last_activity_start,
                "end": last_activity_end
            },
            value_with_quotes=True)

    if filter_param_dict:
        filter_param = """, filters: {%s }""" % (_string_from_dict(
            filter_param_dict, value_with_quotes=False))

    query_str = """mutation GetLabelExportUrlPyApi($%s: ID!)
        {exportLabels(data:{projectId: $%s%s}) {downloadUrl createdAt shouldPoll} }
    """ % (id_param, id_param, filter_param)

    start_time = time.time()

    # Poll until the server has a ready export URL, or give up and return
    # None once timeout_seconds is exceeded.
    while True:
        res = self.client.execute(query_str, {id_param: self.uid})
        res = res["exportLabels"]
        if not res["shouldPoll"] and res["downloadUrl"] is not None:
            url = res['downloadUrl']
            if not download:
                return url
            else:
                response = requests.get(url)
                response.raise_for_status()
                return response.json()

        current_time = time.time()
        if current_time - start_time > timeout_seconds:
            return None

        logger.debug("Project '%s' label export, waiting for server...",
                     self.uid)
        time.sleep(sleep_time)
def export_v2(self,
              task_name: Optional[str] = None,
              filters: Optional[ProjectExportFilters] = None,
              params: Optional[ProjectExportParams] = None) -> Task:
    """
    Creates a project export task with the given params and returns the task.

    For more information visit: https://docs.labelbox.com/docs/exports-v2#export-from-a-project-python-sdk

    >>>     task = project.export_v2(
    >>>         filters={
    >>>             "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
    >>>             "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"]
    >>>         },
    >>>         params={
    >>>             "performance_details": False,
    >>>             "label_details": True
    >>>         })
    >>>     task.wait_till_done()
    >>>     task.result
    """
    # Defaults: everything off / no filters unless explicitly requested.
    _params = params or ProjectExportParams({
        "attachments": False,
        "metadata_fields": False,
        "data_row_details": False,
        "project_details": False,
        "performance_details": False,
        "label_details": False,
        "media_type_override": None
    })

    _filters = filters or ProjectExportFilters({
        "last_activity_at": None,
        "label_created_at": None
    })

    def _get_timezone() -> str:
        # Falls back to UTC when the user has no timezone configured.
        timezone_query_str = """query CurrentUserPyApi { user { timezone } }"""
        tz_res = self.client.execute(timezone_query_str)
        return tz_res["user"]["timezone"] or "UTC"

    # NOTE: this local shadows the datetime.timezone import inside this
    # method; it holds the user's timezone name, fetched lazily below.
    timezone: Optional[str] = None

    mutation_name = "exportDataRowsInProject"
    create_task_query_str = """mutation exportDataRowsInProjectPyApi($input: ExportDataRowsInProjectInput!){
        %s(input: $input) {taskId} }
    """ % (mutation_name)

    # search_query is populated below from _filters and referenced (by
    # identity) inside query_params, so appends after this point still
    # reach the request payload.
    search_query: List[Dict[str, Collection[str]]] = []
    media_type_override = _params.get('media_type_override', None)
    query_params = {
        "input": {
            "taskName": task_name,
            "filters": {
                "projectId": self.uid,
                "searchQuery": {
                    "scope": None,
                    "query": search_query
                }
            },
            "params": {
                "mediaTypeOverride":
                    media_type_override.value
                    if media_type_override is not None else None,
                "includeAttachments":
                    _params.get('attachments', False),
                "includeMetadata":
                    _params.get('metadata_fields', False),
                "includeDataRowDetails":
                    _params.get('data_row_details', False),
                "includeProjectDetails":
                    _params.get('project_details', False),
                "includePerformanceDetails":
                    _params.get('performance_details', False),
                "includeLabelDetails":
                    _params.get('label_details', False)
            },
        }
    }

    # last_activity_at filter: (start, end) pair where either side may be
    # None; BETWEEN when both given, one-sided comparison otherwise.
    if "last_activity_at" in _filters and _filters[
            'last_activity_at'] is not None:
        if timezone is None:
            timezone = _get_timezone()
        values = _filters['last_activity_at']
        start, end = values
        if (start is not None and end is not None):
            [_validate_datetime(date) for date in values]
            search_query.append({
                "type": "data_row_last_activity_at",
                "value": {
                    "operator": "BETWEEN",
                    "timezone": timezone,
                    "value": {
                        "min": start,
                        "max": end
                    }
                }
            })
        elif (start is not None):
            _validate_datetime(start)
            search_query.append({
                "type": "data_row_last_activity_at",
                "value": {
                    "operator": "GREATER_THAN_OR_EQUAL",
                    "timezone": timezone,
                    "value": start
                }
            })
        elif (end is not None):
            _validate_datetime(end)
            search_query.append({
                "type": "data_row_last_activity_at",
                "value": {
                    "operator": "LESS_THAN_OR_EQUAL",
                    "timezone": timezone,
                    "value": end
                }
            })

    # label_created_at filter: same shape as above but without a timezone
    # field in the payload.
    if "label_created_at" in _filters and _filters[
            "label_created_at"] is not None:
        if timezone is None:
            timezone = _get_timezone()
        values = _filters['label_created_at']
        start, end = values
        if (start is not None and end is not None):
            [_validate_datetime(date) for date in values]
            search_query.append({
                "type": "labeled_at",
                "value": {
                    "operator": "BETWEEN",
                    "value": {
                        "min": start,
                        "max": end
                    }
                }
            })
        elif (start is not None):
            _validate_datetime(start)
            search_query.append({
                "type": "labeled_at",
                "value": {
                    "operator": "GREATER_THAN_OR_EQUAL",
                    "value": start
                }
            })
        elif (end is not None):
            _validate_datetime(end)
            search_query.append({
                "type": "labeled_at",
                "value": {
                    "operator": "LESS_THAN_OR_EQUAL",
                    "value": end
                }
            })

    res = self.client.execute(
        create_task_query_str,
        query_params,
    )
    res = res[mutation_name]
    task_id = res["taskId"]
    user: User = self.client.get_user()
    tasks: List[Task] = list(
        user.created_tasks(where=Entity.Task.uid == task_id))
    # Cache user in a private variable as the relationship can't be
    # resolved due to server-side limitations (see Task.created_by)
    # for more info.
    if len(tasks) != 1:
        raise ResourceNotFoundError(Entity.Task, task_id)
    task: Task = tasks[0]
    task._user = user
    return task
def export_issues(self, status=None) -> str:
    """ Calls the server-side Issues exporting that
    returns the URL to that payload.

    Args:
        status (string): valid values: Open, Resolved
    Returns:
        URL of the data file with this Project's issues.
    """
    # Validate before doing any network work.
    valid_statuses = {None, "Open", "Resolved"}
    if status not in valid_statuses:
        raise ValueError("status must be in {}. Found {}".format(
            valid_statuses, status))

    id_param = "projectId"
    status_param = "status"
    query_str = """query GetProjectIssuesExportPyApi($%s: ID!, $%s: IssueStatus) {
        project(where: { id: $%s }) {
            issueExportUrl(where: { status: $%s })
        }
    }""" % (id_param, status_param, id_param, status_param)

    res = self.client.execute(query_str, {
        id_param: self.uid,
        status_param: status
    })["project"]
    logger.debug("Project '%s' issues export, link generated", self.uid)
    return res.get('issueExportUrl')
def upsert_instructions(self, instructions_file: str) -> None:
    """
    * Uploads instructions to the UI. Running more than once will replace the instructions

    Args:
        instructions_file (str): Path to a local file.
        * Must be a pdf or html file

    Raises:
        ValueError:
            * project must be setup
            * instructions file must have a ".pdf" or ".html" extension
    """
    if self.setup_complete is None:
        raise ValueError(
            "Cannot attach instructions to a project that has not been set up."
        )

    frontend = self.labeling_frontend()
    if frontend.name != "Editor":
        # Plain string with lazy %-args (the original mixed an f-string
        # prefix with %-style logging arguments).
        logger.warning(
            "This function has only been tested to work with the Editor front end. Found %s",
            frontend.name)

    supported_instruction_formats = (".pdf", ".html")
    if not instructions_file.endswith(supported_instruction_formats):
        raise ValueError(
            f"instructions_file must be a pdf or html file. Found {instructions_file}"
        )

    instructions_url = self.client.upload_file(instructions_file)

    query_str = """mutation setprojectinsructionsPyApi($projectId: ID!, $instructions_url: String!) {
        setProjectInstructions(
            where: {id: $projectId},
            data: {instructionsUrl: $instructions_url}
        ) {
            id
            ontology {
                id
                options
            }
        }
    }"""

    self.client.execute(query_str, {
        'projectId': self.uid,
        'instructions_url': instructions_url
    })
def review_metrics(self, net_score) -> int:
    """ Returns this Project's review metrics.

    Args:
        net_score (None or Review.NetScore): Indicates desired metric.
    Returns:
        int, aggregation count of reviews for given `net_score`.
    """
    valid_scores = (None,) + tuple(Entity.Review.NetScore)
    if net_score not in valid_scores:
        raise InvalidQueryError(
            "Review metrics net score must be either None "
            "or one of Review.NetScore values")

    # "None" (the literal) aggregates across all net scores server-side.
    net_score_literal = "None" if net_score is None else net_score.name
    param = "projectId"
    query_str = """query ProjectReviewMetricsPyApi($%s: ID!){
        project(where: {id:$%s})
        {reviewMetrics {labelAggregate(netScore: %s) {count}}}
    }""" % (param, param, net_score_literal)
    res = self.client.execute(query_str, {param: self.uid})
    return res["project"]["reviewMetrics"]["labelAggregate"]["count"]
def setup_editor(self, ontology) -> None:
    """
    Sets up the project using the Pictor editor.

    Args:
        ontology (Ontology): The ontology to attach to the project
    """
    if self.labeling_frontend() is not None:
        raise ResourceConflict("Editor is already set up.")

    # Attach the standard "Editor" labeling frontend.
    editor = next(
        self.client.get_labeling_frontends(
            where=Entity.LabelingFrontend.name == "Editor"))
    self.labeling_frontend.connect(editor)

    # Create empty frontend customization options; the ontology supplies
    # the real tools/classifications.
    LFO = Entity.LabelingFrontendOptions
    empty_customization = json.dumps({"tools": [], "classifications": []})
    self.client._create(
        LFO, {
            LFO.project: self,
            LFO.labeling_frontend: editor,
            LFO.customization_options: empty_customization
        })

    query_str = """mutation ConnectOntologyPyApi($projectId: ID!, $ontologyId: ID!){
        project(where: {id: $projectId}) {connectOntology(ontologyId: $ontologyId) {id}}}"""
    self.client.execute(query_str, {
        'ontologyId': ontology.uid,
        'projectId': self.uid
    })

    # Mark setup as complete with a UTC timestamp.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    self.update(setup_complete=timestamp)
def setup(self, labeling_frontend, labeling_frontend_options) -> None:
    """ Finalizes the Project setup.

    Args:
        labeling_frontend (LabelingFrontend): Which UI to use to label the
            data.
        labeling_frontend_options (dict or str): Labeling frontend options,
            a.k.a. project ontology. If given a `dict` it will be converted
            to `str` using `json.dumps`.
    """
    if self.labeling_frontend() is not None:
        raise ResourceConflict("Editor is already set up.")

    # Accept either a pre-serialized string or a dict.
    options_str = (labeling_frontend_options if isinstance(
        labeling_frontend_options, str) else
                   json.dumps(labeling_frontend_options))

    self.labeling_frontend.connect(labeling_frontend)

    LFO = Entity.LabelingFrontendOptions
    self.client._create(
        LFO, {
            LFO.project: self,
            LFO.labeling_frontend: labeling_frontend,
            LFO.customization_options: options_str
        })

    # Mark setup as complete with a UTC timestamp.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    self.update(setup_complete=timestamp)
def create_batch(
    self,
    name: str,
    data_rows: Optional[List[Union[str, DataRow]]] = None,
    priority: int = 5,
    consensus_settings: Optional[Dict[str, float]] = None,
    global_keys: Optional[List[str]] = None,
):
    """Create a new batch for a project. One of `global_keys` or `data_rows` must be provided but not both.

    Args:
        name: a name for the batch, must be unique within a project
        data_rows: Either a list of `DataRows` or Data Row ids.
        global_keys: global keys for data rows to add to the batch.
        priority: An optional priority for the Data Rows in the Batch. 1 highest -> 5 lowest
        consensus_settings: An optional dictionary with consensus settings: {'number_of_labels': 3, 'coverage_percentage': 0.1}

    Raises:
        ValueError: if the project is not in batch mode, the batch is empty
            or over 100,000 rows, or `data_rows` contains invalid entries.
    """
    # @TODO: make this automatic?
    if self.queue_mode != QueueMode.Batch:
        raise ValueError("Project must be in batch mode")

    # Normalize data_rows to plain ids and derive the row count from
    # whichever input was supplied (merged the two separate
    # `data_rows is not None` branches of the original).
    dr_ids = []
    if data_rows is not None:
        for dr in data_rows:
            if isinstance(dr, Entity.DataRow):
                dr_ids.append(dr.uid)
            elif isinstance(dr, str):
                dr_ids.append(dr)
            else:
                raise ValueError(
                    "`data_rows` must be DataRow ids or DataRow objects")
        row_count = len(dr_ids)
    elif global_keys is not None:
        row_count = len(global_keys)
    else:
        row_count = 0

    if row_count > 100_000:
        raise ValueError(
            "Batch exceeds max size, break into smaller batches")
    if not row_count:
        raise ValueError("You need at least one data row in a batch")

    # Block until the rows have finished server-side processing (bounded
    # by _wait_processing_max_seconds).
    self._wait_until_data_rows_are_processed(
        dr_ids, global_keys, self._wait_processing_max_seconds)

    if consensus_settings:
        # Normalize to the camelCase field names the API expects.
        consensus_settings = ConsensusSettings(**consensus_settings).dict(
            by_alias=True)

    # Large batches must go through the async task endpoint.
    if len(dr_ids) >= 10_000:
        return self._create_batch_async(name, dr_ids, global_keys, priority,
                                        consensus_settings)
    else:
        return self._create_batch_sync(name, dr_ids, global_keys, priority,
                                       consensus_settings)
def _create_batch_sync(self, name, dr_ids, global_keys, priority,
                       consensus_settings):
    # Issues a single blocking createBatchV2 mutation and wraps the result
    # in a Batch entity (carrying any per-row failures).
    method = 'createBatchV2'
    query_str = """mutation %sPyApi($projectId: ID!, $batchInput: CreateBatchInput!) {
        project(where: {id: $projectId}) {
            %s(input: $batchInput) {
                batch {
                    %s
                }
                failedDataRowIds
            }
        }
    }
    """ % (method, method, query.results_query_part(Entity.Batch))
    batch_input = {
        "name": name,
        "dataRowIds": dr_ids,
        "globalKeys": global_keys,
        "priority": priority,
        "consensusSettings": consensus_settings
    }
    response = self.client.execute(query_str, {
        "projectId": self.uid,
        "batchInput": batch_input
    },
                                   timeout=180.0,
                                   experimental=True)["project"][method]
    batch = response['batch']
    # The server response omits size; derive it from the submitted ids.
    batch['size'] = len(dr_ids)
    return Entity.Batch(self.client,
                        self.uid,
                        batch,
                        failed_data_row_ids=response['failedDataRowIds'])
def _create_batch_async(self,
                        name: str,
                        dr_ids: Optional[List[str]] = None,
                        global_keys: Optional[List[str]] = None,
                        priority: int = 5,
                        consensus_settings: Optional[Dict[str,
                                                          float]] = None):
    # Three-step async batch creation:
    #   1) create an empty batch,
    #   2) kick off an async task adding the rows to it,
    #   3) wait for the task, then fetch and return the Batch entity.
    method = 'createEmptyBatch'
    create_empty_batch_mutation_str = """mutation %sPyApi($projectId: ID!, $input: CreateEmptyBatchInput!) {
        project(where: {id: $projectId}) {
            %s(input: $input) {
                id
            }
        }
    }
    """ % (method, method)

    params = {
        "projectId": self.uid,
        "input": {
            "name": name,
            "consensusSettings": consensus_settings
        }
    }

    res = self.client.execute(create_empty_batch_mutation_str,
                              params,
                              timeout=180.0,
                              experimental=True)["project"][method]
    batch_id = res['id']

    method = 'addDataRowsToBatchAsync'
    add_data_rows_mutation_str = """mutation %sPyApi($projectId: ID!, $input: AddDataRowsToBatchInput!) {
        project(where: {id: $projectId}) {
            %s(input: $input) {
                taskId
            }
        }
    }
    """ % (method, method)

    params = {
        "projectId": self.uid,
        "input": {
            "batchId": batch_id,
            "dataRowIds": dr_ids,
            "globalKeys": global_keys,
            "priority": priority,
        }
    }

    res = self.client.execute(add_data_rows_mutation_str,
                              params,
                              timeout=180.0,
                              experimental=True)["project"][method]

    task_id = res['taskId']

    # Block until the server finishes adding rows; surface task errors.
    task = self._wait_for_task(task_id)
    if task.status != "COMPLETE":
        raise LabelboxError(f"Batch was not created successfully: " +
                            json.dumps(task.errors))

    # obtain batch entity to return
    get_batch_str = """query %s($projectId: ID!, $batchId: ID!) {
                      project(where: {id: $projectId}) {
                         batches(where: {id: $batchId}) {
                            nodes {
                               %s
                            }
                         }
                      }
                }
                """ % ("getProjectBatchPyApi",
                       query.results_query_part(Entity.Batch))

    batch = self.client.execute(
        get_batch_str, {
            "projectId": self.uid,
            "batchId": batch_id
        },
        timeout=180.0,
        experimental=True)["project"]["batches"]["nodes"][0]

    # TODO async endpoints currently do not provide failed_data_row_ids in response
    return Entity.Batch(self.client, self.uid, batch)
def _update_queue_mode(self, mode: "QueueMode") -> "QueueMode":
    """
    Updates the queueing mode of this project.

    Deprecation notice: This method is deprecated. Going forward, projects must
    go through a migration to have the queue mode changed. Users should specify the
    queue mode for a project during creation if a non-default mode is desired.

    For more information, visit https://docs.labelbox.com/reference/migrating-to-workflows#upcoming-changes

    Args:
        mode: the specified queue mode

    Returns: the updated queueing mode of this project
    """
    logger.warning(
        "Updating the queue_mode for a project will soon no longer be supported."
    )

    # No-op when the project is already in the requested mode.
    if self.queue_mode == mode:
        return mode

    status_by_mode = {
        QueueMode.Batch: "ENABLED",
        QueueMode.Dataset: "DISABLED",
    }
    if mode not in status_by_mode:
        raise ValueError(
            "Must provide either `BATCH` or `DATASET` as a mode")
    status = status_by_mode[mode]

    query_str = """mutation %s($projectId: ID!, $status: TagSetStatusInput!) {
        project(where: {id: $projectId}) {
            setTagSetStatus(input: {tagSetStatus: $status}) {
                tagSetStatus
            }
        }
    }
    """ % "setTagSetStatusPyApi"

    self.client.execute(query_str, {
        'projectId': self.uid,
        'status': status
    })

    return mode
def get_queue_mode(self) -> "QueueMode":
    """
    Provides the queue mode used for this project.

    Deprecation notice: This method is deprecated and will be removed in
    a future version. To obtain the queue mode of a project, simply refer
    to the queue_mode attribute of a Project.

    For more information, visit https://docs.labelbox.com/reference/migrating-to-workflows#upcoming-changes

    Returns: the QueueMode for this project
    """
    logger.warning(
        "Obtaining the queue_mode for a project through this method will soon"
        " no longer be supported.")

    query_str = """query %s($projectId: ID!) {
        project(where: {id: $projectId}) {
            tagSetStatus
        }
    }
    """ % "GetTagSetStatusPyApi"

    status = self.client.execute(
        query_str, {'projectId': self.uid})["project"]["tagSetStatus"]

    mode_by_status = {
        "ENABLED": QueueMode.Batch,
        "DISABLED": QueueMode.Dataset,
    }
    if status not in mode_by_status:
        raise ValueError("Status not known")
    return mode_by_status[status]
def validate_labeling_parameter_overrides(self, data) -> None:
    """Validates a collection of labeling parameter overrides.

    Args:
        data (iterable): An iterable of tuples, each of the form
            (DataRow, priority<int>, number_of_labels<int>).

    Raises:
        TypeError: If a tuple does not have exactly 3 items, the first item
            is not a DataRow, or priority/number of labels is not an int.
        ValueError: If priority or number of labels is less than 1.
    """
    for idx, row in enumerate(data):
        if len(row) != 3:
            raise TypeError(
                f"Data must be a list of tuples containing a DataRow, priority (int), num_labels (int). Found {len(row)} items. Index: {idx}"
            )
        data_row, priority, num_labels = row
        if not isinstance(data_row, Entity.DataRow):
            # Fixed duplicated word in the original message ("should be be").
            raise TypeError(
                f"data_row should be of type DataRow. Found {type(data_row)}. Index: {idx}"
            )
        # Priority and number of labels share identical validation rules.
        for name, value in [["Priority", priority],
                            ["Number of labels", num_labels]]:
            if not isinstance(value, int):
                raise TypeError(
                    f"{name} must be an int. Found {type(value)} for data_row {data_row}. Index: {idx}"
                )
            if value < 1:
                raise ValueError(
                    f"{name} must be greater than 0 for data_row {data_row}. Index: {idx}"
                )
def set_labeling_parameter_overrides(self, data) -> bool:
    """Adds labeling parameter overrides to this project.

    See information on priority here:
        https://docs.labelbox.com/en/configure-editor/queue-system#reservation-system

        >>> project.set_labeling_parameter_overrides([
        >>>     (data_row_1, 2, 3), (data_row_2, 1, 4)])

    Args:
        data (iterable): An iterable of tuples. Each tuple must contain
            (DataRow, priority<int>, number_of_labels<int>) for the new override.

        Priority:
            * Data will be labeled in priority order.
                - A lower number priority is labeled first.
                - Minimum priority is 1.
            * Priority is not the queue position.
                - The position is determined by the relative priority.
            * Datarows with parameter overrides will appear before datarows without overrides.
            * The priority only effects items in the queue.

        Number of labels:
            * The number of times a data row should be labeled.
                - Creates duplicate data rows in a project (one for each number of labels).
            * New duplicated data rows will be added to the queue.
                - Already labeled duplicates will not be sent back to the queue.
            * The queue will never assign the same datarow to a single labeler more than once.
            * Setting this to 1 will result in the default behavior (no duplicates).

    Returns:
        bool, indicates if the operation was a success.
    """
    logger.warning(
        "LabelingParameterOverrides are deprecated for new projects, and will eventually be removed "
        "completely. Prefer to use batch based queuing with priority & consensus number of labels instead."
    )
    self.validate_labeling_parameter_overrides(data)

    # Build the GraphQL inline-object list; ':d' renders ints the same way
    # the original '%d' formatting did.
    override_entries = [
        '{{dataRow: {{id: "{0}"}}, priority: {1:d}, numLabels: {2:d} }}'.format(
            data_row.uid, priority, num_labels)
        for data_row, priority, num_labels in data
    ]
    data_str = ",\n".join(override_entries)

    query_str = """mutation SetLabelingParameterOverridesPyApi($projectId: ID!){
        project(where: { id: $projectId }) {setLabelingParameterOverrides
        (data: [%s]) {success}}} """ % data_str
    response = self.client.execute(query_str, {"projectId": self.uid})
    return response["project"]["setLabelingParameterOverrides"]["success"]
def unset_labeling_parameter_overrides(self, data_rows) -> bool:
    """Removes labeling parameter overrides to this project.

    * This will remove unlabeled duplicates in the queue.

    Args:
        data_rows (iterable): An iterable of DataRows.

    Returns:
        bool, indicates if the operation was a success.
    """
    row_entries = ",\n".join(
        '{dataRowId: "%s"}' % data_row.uid for data_row in data_rows)
    query_str = """mutation UnsetLabelingParameterOverridesPyApi($projectId: ID!){
        project(where: { id: $projectId}) {
        unsetLabelingParameterOverrides(data: [%s]) { success }}}""" % row_entries
    response = self.client.execute(query_str, {"projectId": self.uid})
    return response["project"]["unsetLabelingParameterOverrides"]["success"]
def upsert_review_queue(self, quota_factor) -> None:
    """Sets the proportion of total assets in a project to review.

    More information can be found here:
        https://docs.labelbox.com/en/quality-assurance/review-labels#configure-review-percentage

    Args:
        quota_factor (float): Which part (percentage) of the queue
            to reinitiate. Between 0 and 1.

    Raises:
        ValueError: If `quota_factor` falls outside of the range [0, 1].
    """
    if not 0. <= quota_factor <= 1.:
        raise ValueError("Quota factor must be in the range of [0,1]")
    id_param = "projectId"
    quota_param = "quotaFactor"
    query_str = """mutation UpsertReviewQueuePyApi($%s: ID!, $%s: Float!){
        upsertReviewQueue(where:{project: {id: $%s}}
                          data:{quotaFactor: $%s}) {id}}""" % (
        id_param, quota_param, id_param, quota_param)
    # The mutation's returned id is unused; this method deliberately
    # returns None (the original bound it to an unused local `res`).
    self.client.execute(query_str, {
        id_param: self.uid,
        quota_param: quota_factor
    })
def extend_reservations(self, queue_type) -> int:
    """Extends all the current reservations for the current user on the given
    queue type.

    Args:
        queue_type (str): Either "LabelingQueue" or "ReviewQueue"

    Returns:
        int, the number of reservations that were extended.
    """
    valid_queue_types = ("LabelingQueue", "ReviewQueue")
    if queue_type not in valid_queue_types:
        raise InvalidQueryError("Unsupported queue type: %s" % queue_type)

    # queue_type is interpolated directly into the query text (it is an
    # enum-like literal, not a GraphQL variable).
    query_str = """mutation ExtendReservationsPyApi($projectId: ID!){
        extendReservations(projectId:$projectId queueType:%s)}""" % queue_type
    response = self.client.execute(query_str, {"projectId": self.uid})
    return response["extendReservations"]
def enable_model_assisted_labeling(self, toggle: bool = True) -> bool:
    """Turns model assisted labeling either on or off based on input.

    Args:
        toggle (bool): True or False boolean

    Returns:
        True if toggled on or False if toggled off
    """
    query_str = """mutation toggle_model_assisted_labelingPyApi($project_id: ID!, $show: Boolean!) {
        project(where: {id: $project_id }) {
            showPredictionsToLabelers(show: $show) {
                id, showingPredictionsToLabelers
            }
        }
    }"""
    variables = {"project_id": self.uid, "show": toggle}
    response = self.client.execute(query_str, variables)
    prediction_state = response["project"]["showPredictionsToLabelers"]
    return prediction_state["showingPredictionsToLabelers"]
def bulk_import_requests(self) -> PaginatedCollection:
    """Returns bulk import request objects which are used in model-assisted labeling.
    These are returned with the oldest first, and most recent last.
    """
    # The literal %d placeholders are left for PaginatedCollection to fill
    # in with skip/first paging values.
    request_fields = query.results_query_part(Entity.BulkImportRequest)
    query_str = """query ListAllImportRequestsPyApi($project_id: ID!) {
        bulkImportRequests (
            where: { projectId: $project_id }
            skip: %d
            first: %d
        ) {
            """ + request_fields + """
        }
    }"""
    return PaginatedCollection(self.client, query_str,
                               {"project_id": str(self.uid)},
                               ["bulkImportRequests"],
                               Entity.BulkImportRequest)
def batches(self) -> PaginatedCollection:
    """Fetch all batches that belong to this project.

    Returns:
        A `PaginatedCollection` of `Batch`es
    """
    batch_fields = query.results_query_part(Entity.Batch)
    query_str = """query GetProjectBatchesPyApi($from: String, $first: PageSize, $projectId: ID!) {
        project(where: {id: $projectId}) {id
        batches(after: $from, first: $first) { nodes { %s } pageInfo { endCursor }}}}
    """ % batch_fields

    # Batch construction needs the owning project's id alongside node data.
    def _make_batch(client, node):
        return Entity.Batch(client, self.uid, node)

    return PaginatedCollection(
        self.client,
        query_str, {"projectId": self.uid}, ["project", "batches", "nodes"],
        _make_batch,
        cursor_path=["project", "batches", "pageInfo", "endCursor"],
        experimental=True)
def task_queues(self) -> List[TaskQueue]:
    """Fetch all task queues that belong to this project.

    Returns:
        A `List` of `TaskQueue`s
    """
    query_str = """query GetProjectTaskQueuesPyApi($projectId: ID!) {
        project(where: {id: $projectId}) {
            taskQueues {
                %s
            }
        }
    }
    """ % query.results_query_part(Entity.TaskQueue)
    response = self.client.execute(query_str, {"projectId": self.uid},
                                   timeout=180.0,
                                   experimental=True)
    return [
        Entity.TaskQueue(self.client, values)
        for values in response["project"]["taskQueues"]
    ]
def move_data_rows_to_task_queue(self, data_row_ids: List[str],
                                 task_queue_id: str):
    """
    Moves data rows to the specified task queue.

    Args:
        data_row_ids: a list of data row ids to be moved
        task_queue_id: the task queue id to be moved to, or None to specify the "Done" queue

    Returns:
        None if successful, or a raised error on failure

    Raises:
        LabelboxError: If the server-side move task does not complete.
    """
    method = "createBulkAddRowsToQueueTask"
    query_str = """mutation AddDataRowsToTaskQueueAsyncPyApi(
        $projectId: ID!
        $queueId: ID
        $dataRowIds: [ID!]!
      ) {
        project(where: { id: $projectId }) {
          %s(
            data: { queueId: $queueId, dataRowIds: $dataRowIds }
          ) {
            taskId
          }
        }
      }
    """ % method
    # The mutation only enqueues an async task; its completion is awaited below.
    task_id = self.client.execute(
        query_str, {
            "projectId": self.uid,
            "queueId": task_queue_id,
            "dataRowIds": data_row_ids
        },
        timeout=180.0,
        experimental=True)["project"][method]["taskId"]

    task = self._wait_for_task(task_id)
    if task.status != "COMPLETE":
        # Fixed: original concatenated a placeholder-free f-string with `+`
        # (flake8 F541); the message text is unchanged.
        raise LabelboxError(
            f"Data rows were not moved successfully: {json.dumps(task.errors)}"
        )
def _wait_for_task(self, task_id: str) -> Task:
    """Fetch the Task for `task_id` and block until it finishes."""
    pending_task = Task.get_task(self.client, task_id)
    pending_task.wait_till_done()
    return pending_task
def upload_annotations(
        self,
        name: str,
        annotations: Union[str, Path, Iterable[Dict]],
        validate: bool = False) -> 'BulkImportRequest':  # type: ignore
    """Uploads annotations to a new Editor project.

    Args:
        name (str): name of the BulkImportRequest job
        annotations (str or Path or Iterable):
            url that is publicly accessible by Labelbox containing an
            ndjson file
            OR local path to an ndjson file
            OR iterable of annotation rows
        validate (bool):
            Whether or not to validate the payload before uploading.

    Returns:
        BulkImportRequest
    """
    if isinstance(annotations, (str, Path)):

        def _looks_like_url(candidate: Union[str, Path]) -> bool:
            """True when `candidate` is a string with both a scheme and a netloc."""
            if isinstance(candidate, Path):
                return False
            parts = urlparse(candidate)
            return bool(parts.scheme) and bool(parts.netloc)

        if _looks_like_url(annotations):
            return Entity.BulkImportRequest.create_from_url(
                client=self.client,
                project_id=self.uid,
                name=name,
                url=str(annotations),
                validate=validate)

        # Not a URL: treat the value as a local ndjson file path.
        local_file = Path(annotations)
        if not local_file.exists():
            raise FileNotFoundError(
                f'{annotations} is not a valid url nor existing local file')
        return Entity.BulkImportRequest.create_from_local_file(
            client=self.client,
            project_id=self.uid,
            name=name,
            file=local_file,
            validate_file=validate,
        )

    if isinstance(annotations, Iterable):
        return Entity.BulkImportRequest.create_from_objects(
            client=self.client,
            project_id=self.uid,
            name=name,
            predictions=annotations,  # type: ignore
            validate=validate)

    raise ValueError(
        f'Invalid annotations given of type: {type(annotations)}')
def _wait_until_data_rows_are_processed(
        self,
        data_row_ids: Optional[List[str]] = None,
        global_keys: Optional[List[str]] = None,
        wait_processing_max_seconds: int = _wait_processing_max_seconds,
        sleep_interval=30):
    """Wait until all the specified data rows are processed.

    Polls the backend every `sleep_interval` seconds and raises
    ProcessingWaitTimeout once `wait_processing_max_seconds` has elapsed.
    """
    started_at = datetime.now()
    while True:
        elapsed_seconds = (datetime.now() - started_at).total_seconds()
        if elapsed_seconds >= wait_processing_max_seconds:
            raise ProcessingWaitTimeout(
                "Maximum wait time exceeded while waiting for data rows to be processed. Try creating a batch a bit later"
            )

        if self.__check_data_rows_have_been_processed(
                data_row_ids, global_keys):
            return

        logger.debug(
            'Some of the data rows are still being processed, waiting...')
        time.sleep(sleep_interval)
def __check_data_rows_have_been_processed(
        self,
        data_row_ids: Optional[List[str]] = None,
        global_keys: Optional[List[str]] = None):
    """Return the server-reported flag indicating whether every given data
    row (by id, or else by global key) has finished processing."""
    # Data row ids take precedence; fall back to global keys otherwise.
    if data_row_ids:
        param_name, values = "dataRowIds", data_row_ids
    else:
        param_name = "globalKeys"
        values = [] if global_keys is None else global_keys

    query_str = """query CheckAllDataRowsHaveBeenProcessedPyApi($%s: [ID!]) {
        queryAllDataRowsHaveBeenProcessed(%s:$%s) {
            allDataRowsHaveBeenProcessed
        }
    }""" % (param_name, param_name, param_name)
    response = self.client.execute(query_str, {param_name: values})
    return response["queryAllDataRowsHaveBeenProcessed"][
        "allDataRowsHaveBeenProcessed"]
class ProjectMember(DbObject):
    """Association object linking a User (with a Role) to a Project."""

    # The member's user account.
    user = Relationship.ToOne("User", cache=True)
    # The role the user holds within the project.
    role = Relationship.ToOne("Role", cache=True)
class LabelingParameterOverride(DbObject):
    """ Customizes the order of assets in the label queue.

    Attributes:
        priority (int): A prioritization score.
        number_of_labels (int): Number of times an asset should be labeled.
    """
    priority = Field.Int("priority")
    number_of_labels = Field.Int("number_of_labels")

    # The data row whose queue placement this override controls.
    data_row = Relationship.ToOne("DataRow", cache=True)
# Field names are whitespace-separated; the original string mixed in a stray
# comma after "seconds_per_label" (harmless to namedtuple, but inconsistent).
LabelerPerformance = namedtuple(
    "LabelerPerformance",
    "user count seconds_per_label total_time_labeling "
    "consensus average_benchmark_agreement last_activity_time")
LabelerPerformance.__doc__ = (
    "Named tuple containing info about a labeler's performance.")
def _check_converter_import():
if 'LBV1Converter' not in globals():
raise ImportError(
"Missing dependencies to import converter. "
"Use `pip install labelbox[data] --upgrade` to add missing dependencies. "
"or download raw json with project.export_labels()")