import json
import logging
import time
from pathlib import Path
from typing import Any
from typing import BinaryIO
from typing import Dict
from typing import Iterable
from typing import Set
from typing import Tuple
from typing import Union
import backoff
import ndjson
import requests
from labelbox import utils
import labelbox.exceptions
from labelbox.orm import query
from labelbox.orm.db_object import DbObject
from labelbox.orm.model import Field
from labelbox.orm.model import Relationship
from labelbox.schema.enums import BulkImportRequestState
NDJSON_MIME_TYPE = "application/x-ndjson"
logger = logging.getLogger(__name__)
def _make_file_name(project_id: str, name: str) -> str:
return f"{project_id}__{name}.ndjson"
# TODO(gszpak): move it to client.py
def _make_request_data(project_id: str, name: str, content_length: int,
file_name: str) -> dict:
query_str = """mutation createBulkImportRequestFromFilePyApi(
$projectId: ID!, $name: String!, $file: Upload!, $contentLength: Int!) {
createBulkImportRequest(data: {
projectId: $projectId,
name: $name,
filePayload: {
file: $file,
contentLength: $contentLength
}
}) {
%s
}
}
""" % query.results_query_part(BulkImportRequest)
variables = {
"projectId": project_id,
"name": name,
"file": None,
"contentLength": content_length
}
operations = json.dumps({"variables": variables, "query": query_str})
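    # Per the GraphQL multipart request spec, "map" ties the uploaded file
    # part (keyed by file_name) to the `file` variable of the mutation above.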
return {
"operations": operations,
"map": (None, json.dumps({file_name: ["variables.file"]}))
}
# TODO(gszpak): move it to client.py
def _send_create_file_command(
client, request_data: dict, file_name: str,
file_data: Tuple[str, Union[bytes, BinaryIO], str]) -> dict:
response = requests.post(
client.endpoint,
headers={"authorization": "Bearer %s" % client.api_key},
data=request_data,
files={file_name: file_data})
try:
response_json = response.json()
except ValueError:
raise labelbox.exceptions.LabelboxError(
"Failed to parse response as JSON: %s" % response.text)
response_data = response_json.get("data", None)
if response_data is None:
raise labelbox.exceptions.LabelboxError(
"Failed to upload, message: %s" % response_json.get("errors", None))
if not response_data.get("createBulkImportRequest", None):
        raise labelbox.exceptions.LabelboxError(
            "Failed to create BulkImportRequest, message: %s" %
            (response_json.get("errors", None) or
             response_data.get("error", None)))
return response_data
class BulkImportRequest(DbObject):
"""Represents the import job when importing annotations.
Attributes:
name (str)
state (Enum): FAILED, RUNNING, or FINISHED (Refers to the whole import job)
input_file_url (str): URL to your web-hosted NDJSON file
error_file_url (str): NDJSON that contains error messages for failed annotations
status_file_url (str): NDJSON that contains status for each annotation
created_at (datetime): UTC timestamp for date BulkImportRequest was created
project (Relationship): `ToOne` relationship to Project
created_by (Relationship): `ToOne` relationship to User
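
    Example:
        A minimal workflow sketch; `client`, the project id, and the URL
        below are placeholders, not values defined in this module:

            bulk_import_request = BulkImportRequest.create_from_url(
                client, "<project_id>", "my-import",
                "https://example.com/predictions.ndjson")
            bulk_import_request.wait_until_done()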
"""
name = Field.String("name")
state = Field.Enum(BulkImportRequestState, "state")
input_file_url = Field.String("input_file_url")
error_file_url = Field.String("error_file_url")
status_file_url = Field.String("status_file_url")
created_at = Field.DateTime("created_at")
project = Relationship.ToOne("Project")
created_by = Relationship.ToOne("User", False, "created_by")
    def refresh(self) -> None:
"""Synchronizes values of all fields with the database.
"""
query_str, params = query.get_single(BulkImportRequest, self.uid)
res = self.client.execute(query_str, params)
res = res[utils.camel_case(BulkImportRequest.type_name())]
self._set_field_values(res)
    def wait_until_done(self, sleep_time_seconds: int = 30) -> None:
"""Blocks import job until certain conditions are met.
Blocks until the BulkImportRequest.state changes either to
`BulkImportRequestState.FINISHED` or `BulkImportRequestState.FAILED`,
periodically refreshing object's state.
Args:
sleep_time_seconds (str): a time to block between subsequent API calls
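
        Example:
            A hypothetical polling call; assumes `bulk_import_request` was
            returned by one of the `create_from_*` classmethods:

                bulk_import_request.wait_until_done(sleep_time_seconds=10)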
"""
while self.state == BulkImportRequestState.RUNNING:
logger.info(f"Sleeping for {sleep_time_seconds} seconds...")
time.sleep(sleep_time_seconds)
self.__exponential_backoff_refresh()
@backoff.on_exception(
backoff.expo,
(labelbox.exceptions.ApiLimitError, labelbox.exceptions.TimeoutError,
labelbox.exceptions.NetworkError),
max_tries=10,
jitter=None)
def __exponential_backoff_refresh(self) -> None:
self.refresh()
@classmethod
def from_name(cls, client, project_id: str,
name: str) -> 'BulkImportRequest':
""" Fetches existing BulkImportRequest.
Args:
client (Client): a Labelbox client
project_id (str): BulkImportRequest's project id
name (str): name of BulkImportRequest
Returns:
BulkImportRequest object
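
        Example:
            A sketch with placeholder ids:

                bulk_import_request = BulkImportRequest.from_name(
                    client, project_id="<project_id>", name="my-import")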
"""
query_str = """query getBulkImportRequestPyApi(
$projectId: ID!, $name: String!) {
bulkImportRequest(where: {
projectId: $projectId,
name: $name
}) {
%s
}
}
""" % query.results_query_part(cls)
params = {"projectId": project_id, "name": name}
response = client.execute(query_str, params=params)
return cls(client, response['bulkImportRequest'])
@classmethod
def create_from_url(cls, client, project_id: str, name: str,
url: str) -> 'BulkImportRequest':
"""
Creates a BulkImportRequest from a publicly accessible URL
to an ndjson file with predictions.
Args:
client (Client): a Labelbox client
project_id (str): id of project for which predictions will be imported
name (str): name of BulkImportRequest
url (str): publicly accessible URL pointing to ndjson file containing predictions
Returns:
BulkImportRequest object
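
        Example:
            A sketch; the URL is a placeholder and must point to a publicly
            readable ndjson file:

                bulk_import_request = BulkImportRequest.create_from_url(
                    client, "<project_id>", "url-import",
                    "https://example.com/predictions.ndjson")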
"""
query_str = """mutation createBulkImportRequestPyApi(
$projectId: ID!, $name: String!, $fileUrl: String!) {
createBulkImportRequest(data: {
projectId: $projectId,
name: $name,
fileUrl: $fileUrl
}) {
%s
}
}
""" % query.results_query_part(cls)
params = {"projectId": project_id, "name": name, "fileUrl": url}
bulk_import_request_response = client.execute(query_str, params=params)
return cls(client,
bulk_import_request_response["createBulkImportRequest"])
@classmethod
def create_from_objects(cls, client, project_id: str, name: str,
predictions: Iterable[dict]) -> 'BulkImportRequest':
"""
Creates a `BulkImportRequest` from an iterable of dictionaries.
Conforms to JSON predictions format, e.g.:
``{
"uuid": "9fd9a92e-2560-4e77-81d4-b2e955800092",
"schemaId": "ckappz7d700gn0zbocmqkwd9i",
"dataRow": {
"id": "ck1s02fqxm8fi0757f0e6qtdc"
},
"bbox": {
"top": 48,
"left": 58,
"height": 865,
"width": 1512
}
}``
Args:
client (Client): a Labelbox client
project_id (str): id of project for which predictions will be imported
name (str): name of BulkImportRequest
predictions (Iterable[dict]): iterable of dictionaries representing predictions
Returns:
BulkImportRequest object
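
        Example:
            A sketch with placeholder ids; every prediction must carry a
            `uuid` that is unique within the import:

                predictions = [{
                    "uuid": "9fd9a92e-2560-4e77-81d4-b2e955800092",
                    "schemaId": "<schema_id>",
                    "dataRow": {"id": "<data_row_id>"},
                    "bbox": {"top": 48, "left": 58, "height": 865, "width": 1512}
                }]
                bulk_import_request = BulkImportRequest.create_from_objects(
                    client, "<project_id>", "objects-import", predictions)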
"""
_validate_ndjson(predictions)
data_str = ndjson.dumps(predictions)
if not data_str:
raise ValueError('annotations cannot be empty')
data = data_str.encode('utf-8')
file_name = _make_file_name(project_id, name)
        # content length must be the byte length of the encoded payload
        request_data = _make_request_data(project_id, name, len(data),
                                          file_name)
file_data = (file_name, data, NDJSON_MIME_TYPE)
response_data = _send_create_file_command(client,
request_data=request_data,
file_name=file_name,
file_data=file_data)
return cls(client, response_data["createBulkImportRequest"])
@classmethod
def create_from_local_file(cls,
client,
project_id: str,
name: str,
file: Path,
validate_file=True) -> 'BulkImportRequest':
"""
Creates a BulkImportRequest from a local ndjson file with predictions.
Args:
client (Client): a Labelbox client
project_id (str): id of project for which predictions will be imported
name (str): name of BulkImportRequest
file (Path): local ndjson file with predictions
            validate_file (bool): whether to validate that `file` is
                a well-formed ndjson file before uploading
Returns:
BulkImportRequest object
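
        Example:
            A sketch assuming predictions were written to a local file
            (the path is a placeholder):

                bulk_import_request = BulkImportRequest.create_from_local_file(
                    client, "<project_id>", "file-import",
                    Path("/tmp/predictions.ndjson"))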
"""
file_name = _make_file_name(project_id, name)
content_length = file.stat().st_size
request_data = _make_request_data(project_id, name, content_length,
file_name)
with file.open('rb') as f:
if validate_file:
reader = ndjson.reader(f)
# ensure that the underlying json load call is valid
# https://github.com/rhgrant10/ndjson/blob/ff2f03c56b21f28f7271b27da35ca4a8bf9a05d0/ndjson/api.py#L53
# by iterating through the file so we only store
# each line in memory rather than the entire file
try:
_validate_ndjson(reader)
except ValueError:
raise ValueError(f"{file} is not a valid ndjson file")
else:
f.seek(0)
file_data = (file.name, f, NDJSON_MIME_TYPE)
response_data = _send_create_file_command(client, request_data,
file_name, file_data)
return cls(client, response_data["createBulkImportRequest"])
def _validate_ndjson(lines: Iterable[Dict[str, Any]]) -> None:
"""Validate individual ndjson lines.
- verifies that uuids are unique
"""
uuids: Set[str] = set()
for line in lines:
uuid = line['uuid']
if uuid in uuids:
raise labelbox.exceptions.UuidError(
f'{uuid} already used in this import job, '
'must be unique for the project.')
uuids.add(uuid)