Source code for labelbox.schema.task

import logging
import time

from labelbox.exceptions import ResourceNotFoundError
from labelbox.orm.db_object import DbObject
from labelbox.orm.model import Field, Relationship

logger = logging.getLogger(__name__)


class Task(DbObject):
    """ Represents a server-side process that might take a longer time to
    process. Allows the Task state to be updated and checked on the client
    side.

    Attributes:
        updated_at (datetime)
        created_at (datetime)
        name (str)
        status (str)
        completion_percentage (float)

        created_by (Relationship): `ToOne` relationship to User
        organization (Relationship): `ToOne` relationship to Organization
    """
    updated_at = Field.DateTime("updated_at")
    created_at = Field.DateTime("created_at")
    name = Field.String("name")
    status = Field.String("status")
    completion_percentage = Field.Float("completion_percentage")

    # Relationships
    created_by = Relationship.ToOne("User", False, "created_by")
    organization = Relationship.ToOne("Organization")

    def refresh(self):
        """ Refreshes Task data from the server.

        Looks this Task up by `uid` among the tasks created by `self._user`
        and copies every field value from the fetched Task onto this object.

        Raises:
            ResourceNotFoundError: If the lookup does not yield exactly one
                Task.
        """
        # NOTE(review): `_user` is not assigned anywhere in this class;
        # presumably it is attached by whatever code creates the Task.
        # Confirm against the caller.
        tasks = list(self._user.created_tasks(where=Task.uid == self.uid))
        if len(tasks) != 1:
            raise ResourceNotFoundError(Task, self.uid)

        fetched = tasks[0]
        for field in self.fields():
            setattr(self, field.name, getattr(fetched, field.name))

    def wait_till_done(self, timeout_seconds=300):
        """ Waits until the task is no longer in progress, or until the
        timeout budget is used up. Periodically queries the server to
        update the task attributes.

        Returns silently in both cases; check `status` afterwards to tell a
        finished task from a timeout.

        Args:
            timeout_seconds (float): Maximum time this method may spend
                sleeping between checks, in seconds. Defaults to 300
                (five minutes). Time spent inside `refresh()` calls is not
                counted against this budget.
        """
        check_frequency = 2  # frequency of checking, in seconds

        while self.status == "IN_PROGRESS":
            # Never sleep longer than what is left of the budget.
            sleep_time_seconds = min(check_frequency, timeout_seconds)
            if sleep_time_seconds <= 0:
                # Budget exhausted: give up without sleeping (and without
                # logging a sleep that is not going to happen).
                return

            # Lazy %-args: the message is only formatted if DEBUG is enabled.
            logger.debug("Task.wait_till_done sleeping for %.2f seconds",
                         sleep_time_seconds)
            timeout_seconds -= check_frequency
            time.sleep(sleep_time_seconds)
            self.refresh()