import logging
import time
from labelbox.exceptions import ResourceNotFoundError
from labelbox.orm.db_object import DbObject
from labelbox.orm.model import Field, Relationship
logger = logging.getLogger(__name__)
[docs]class Task(DbObject):
""" Represents a server-side process that might take a longer time to process.
Allows the Task state to be updated and checked on the client side.
Attributes:
updated_at (datetime)
created_at (datetime)
name (str)
status (str)
completion_percentage (float)
created_by (Relationship): `ToOne` relationship to User
organization (Relationship): `ToOne` relationship to Organization
"""
updated_at = Field.DateTime("updated_at")
created_at = Field.DateTime("created_at")
name = Field.String("name")
status = Field.String("status")
completion_percentage = Field.Float("completion_percentage")
# Relationships
created_by = Relationship.ToOne("User", False, "created_by")
organization = Relationship.ToOne("Organization")
[docs] def refresh(self):
""" Refreshes Task data from the server. """
tasks = list(self._user.created_tasks(where=Task.uid == self.uid))
if len(tasks) != 1:
raise ResourceNotFoundError(Task, self.uid)
for field in self.fields():
setattr(self, field.name, getattr(tasks[0], field.name))
[docs] def wait_till_done(self, timeout_seconds=300):
""" Waits until the task is completed. Periodically queries the server
to update the task attributes.
Args:
timeout_seconds (float): Maximum time this method can block, in seconds. Defaults to one minute.
"""
check_frequency = 2 # frequency of checking, in seconds
while True:
if self.status != "IN_PROGRESS":
return
sleep_time_seconds = min(check_frequency, timeout_seconds)
logger.debug("Task.wait_till_done sleeping for %.2f seconds" %
sleep_time_seconds)
if sleep_time_seconds <= 0:
break
timeout_seconds -= check_frequency
time.sleep(sleep_time_seconds)
self.refresh()