Collections

The result of a terminated Query is a collection: an instance of either EntityCollection or EmptyCollection, depending on whether the query returned any entities. When printed, the collection is represented as:

EntityCollection[<Entity Type>]{<Number of Results>} or EmptyCollection[<Entity Type>]

An EntityCollection is a container of wrapped ftrack entity objects with the following properties:

  • It is an ordered container of entity objects.

  • It is immutable, meaning entities cannot be added or removed once it has been created.

  • It is iterable, so you can loop over its entities in the same way whether it holds one entity or many.

  • It only contains unique elements, ensuring there are no duplicate entities.
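
The four properties above can be illustrated with a small stand-in written in plain Python. MiniCollection is a hypothetical class invented for this sketch and is not part of trackteroid; it only mirrors the ordering, immutability, iterability, and uniqueness described in the list, using strings in place of entities.

```python
class MiniCollection:
    """Hypothetical stand-in: ordered, immutable, iterable, unique."""

    def __init__(self, items):
        # dict.fromkeys drops duplicates while keeping first-seen order;
        # storing a tuple means there is no way to add or remove items later.
        self._items = tuple(dict.fromkeys(items))

    def __iter__(self):
        return iter(self._items)

    def __len__(self):
        return len(self._items)

    def __repr__(self):
        return f"MiniCollection{{{len(self)}}}"


single = MiniCollection(["shot_010"])
several = MiniCollection(["shot_010", "shot_020", "shot_010"])

# The same loop works for one element or many.
for collection in (single, several):
    for item in collection:
        print(item)

print(several)        # MiniCollection{2}
print(list(several))  # ['shot_010', 'shot_020']
```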

An EmptyCollection is a placeholder for an EntityCollection that doesn’t contain any entities.

  • It is iterable, allowing for iteration even though it doesn’t have any entities.

  • It allows for any attribute access that you would typically perform on an EntityCollection, providing flexibility for operations or checks on the collection itself.

EntityCollection

Higher-Order Methods

The EntityCollection class provides higher-order methods that accept functions as arguments, aligning with the principles of functional programming.

apply

apply(predicate, attribute_name=None) applies a given predicate function to each element in the collection and assigns the generated value to the specified attribute. If no attribute name is provided, the value is directly assigned to the calling collection.

Example 1: Override the status of all tasks associated with an AssetVersion collection.

from trackteroid import (
    Query,
    AssetVersion,
    Status,
    Task
)
assetversion_collection = Query(AssetVersion).get_all(
    limit=5,
    projections=[Task, Task.Status, Task.Status.name]
)
print(
    assetversion_collection.Task,
    assetversion_collection.Task.Status.name
)
# output: EntityCollection[Task]{5} ['Done', 'Internal Approved', 'Not Started', 'Tweak', 'Final']

status_collection = Query(Status).by_name("In Progress").get_first(projections=["name"])
print(status_collection.name)
# output: ['In Progress']

assetversion_collection.Task.Status.apply(lambda avc: status_collection)
print(
    assetversion_collection.Task,
    assetversion_collection.Task.Status.name
)
# output: EntityCollection[Task]{5} ['In Progress']

Example 2: Extend the comment field of items within an AssetVersion collection.

from trackteroid import (
    Query,
    AssetVersion
)
assetversion_collection = Query(AssetVersion).get_all(
    limit=1, 
    projections=["comment"]
)
print(assetversion_collection.comment)
# output: ['submit version for review']

assetversion_collection.apply(lambda avc: avc.comment[0] + " (edited)", attribute_name="comment")
print(assetversion_collection.comment)
# output: ['submit version for review (edited)']

count

count(predicate) returns the number of elements for which a given predicate function returns True.

Example 1: Count the assets whose names contain “character” and those whose names contain “environment”.

from trackteroid import (
    Query,
    Asset
)

asset_collection = Query(Asset).by_name("%character%", "%environment%").get_all(
    limit=100,
    projections=["name"]
)
print(asset_collection)
# output: EntityCollection[Asset]{100}

print(
    asset_collection.count(lambda ac: "character" in ac.name[0]),
    asset_collection.count(lambda ac: "environment" in ac.name[0]),
)
# output: 65 18

filter

filter(predicate) is used to selectively filter a collection based on a given predicate function. The predicate function is applied to each element in the collection, and its return value, which is expected to be a boolean or coercible to a boolean, determines whether the element is included in the resulting collection. Elements for which the predicate returns True are added to the filtered collection, while those for which it returns False are excluded.

Example 1: Generate a new collection that only contains elements with “prop” in the name.

from trackteroid import (
    Query,
    Asset
)
asset_collection = Query(Asset).get_all(
    limit=100,
    projections=["name"]
)
print(asset_collection)
# output: EntityCollection[Asset]{100}

prop_asset_collection = asset_collection.filter(lambda ac: "prop" in ac.name[0])
print(prop_asset_collection)
# output: EntityCollection[Asset]{1}

Example 2: Generate a new collection that only contains the assets which have 10 or more versions associated.

from trackteroid import (
    Query,
    Asset,
    AssetVersion
)
asset_collection = Query(Asset).get_all(
    limit=100,
    projections=[AssetVersion]
)
print(asset_collection)
# output: EntityCollection[Asset]{100}

frequent_asset_collection = asset_collection.filter(lambda ac: len(ac.AssetVersion) >= 10)
print(frequent_asset_collection)
# output: EntityCollection[Asset]{11}

fold

fold(start_value, predicate) accumulates a single value: it starts with the initial value and applies the given function to the running result and each element in turn, from the first to the last element in the collection.

Example 1: Determine the total filesize of all components for one Asset.

from trackteroid import (
    Query,
    Asset,
    Component
)

asset_collection = Query(Asset).get_first(
    projections=[
        Component,
        Component.size
    ]
)

print(asset_collection.Component.size)
# output: [1572543, 1940586, 4921736, ...]

print(
    "Asset's components total size: {:.2f} MB".format(
        asset_collection.Component.fold(
            0, lambda current, cc: current + cc.size[0]
        ) / 1024**2
    )
)
# output: Asset's components total size: 77.43 MB
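
For readers who know Python's standard library, the accumulation described for fold resembles functools.reduce with an initial value. The sketch below runs on a plain list of hypothetical byte sizes rather than on a collection, so it says nothing about how trackteroid implements fold. Note that the argument order differs: reduce takes the function first and the initial value last.

```python
from functools import reduce

# Hypothetical component sizes in bytes, standing in for Component.size values.
sizes = [1572543, 1940586, 4921736]

# reduce(function, iterable, initial) walks the list from first to last,
# feeding the running total back in as the first argument each time.
total = reduce(lambda current, size: current + size, sizes, 0)

print(total)                        # 8434865
print(f"{total / 1024**2:.2f} MB")  # 8.04 MB
```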

group

group(predicate) returns a dictionary with keys given by the predicate. All entities from the original collection will be mapped to their corresponding key.

Example 1: Group the entities of an AssetVersion collection by the name of their asset.

from pprint import pprint

from trackteroid import (
    Query,
    AssetVersion,
    Asset
)

pprint(
    Query(AssetVersion).get_all(
        limit=10,
        projections=[Asset.name]
    ).group(lambda avc: avc.Asset.name[0])
)
# output:
# {'Animation': EntityCollection[AssetVersion]{3},
#  'bc0040_comp': EntityCollection[AssetVersion]{1},
#  'classic_console_01': EntityCollection[AssetVersion]{1},
#  'classic_nightstand_01': EntityCollection[AssetVersion]{1},
#  'coffee_table_01': EntityCollection[AssetVersion]{1},
#  'gothic_bed_01': EntityCollection[AssetVersion]{1},
#  'gothic_console_01': EntityCollection[AssetVersion]{1},
#  'wooden_table_03': EntityCollection[AssetVersion]{1}}

Example 2: Group the entities of an AssetVersion collection by the state name of their status.

from pprint import pprint

from trackteroid import (
    Query,
    AssetVersion,
    State
)

pprint(
    Query(AssetVersion).get_all(
        limit=10,
        projections=[State.name]
    ).group(lambda avc: avc.State.name[0])
)
# output:
# {'Done': EntityCollection[AssetVersion]{1},
#  'In Progress': EntityCollection[AssetVersion]{9}}
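
The grouping idea itself can be sketched in a few lines of plain Python. The group function and the sample tuples below are hypothetical and operate on ordinary lists; trackteroid's group returns collections as dictionary values, whereas this sketch returns lists.

```python
from collections import defaultdict


def group(items, key_func):
    """Map each key produced by key_func to the list of items that yield it."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)


# Hypothetical (asset name, version number) pairs.
versions = [("Animation", 1), ("bc0040_comp", 3), ("Animation", 2)]

by_asset = group(versions, lambda pair: pair[0])
print(by_asset)
# {'Animation': [('Animation', 1), ('Animation', 2)], 'bc0040_comp': [('bc0040_comp', 3)]}
```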

group_and_map

group_and_map(group_predicate, map_predicate) first groups the collection with the group predicate and then applies the map predicate to each collection in the resulting dictionary’s values.

Example 1: Provide a status -> last note overview for versions created by a given user.

from pprint import pprint

from trackteroid import (
    Query,
    AssetVersion,
    Note,
    Status
)

pprint(
    Query(AssetVersion).by_publisher(
        "leandra.rosa@example.com"
    ).get_all(
        limit=10,
        projections=[
            Status.name,
            Note.content,
            "version"
        ]
    ).group_and_map(
        lambda avc: f"{avc.Asset.name[0]}_v{str(avc.version[0]).zfill(3)}",
        lambda avc: f"{avc.Status.name[0]} - {avc.Note.content[0] or 'No notes provided yet.'}"
    )
)
# output:
# {'Animation_v001': 'Revise - I think we will still have him approach sooner. '
#                    'revised expression.',
#  'Animation_v002': "Revise - Isn't his left eyebrow too low? 2: I think we can "
#                    'improve his expression at the end of the shot. Make the '
#                    'expression a little more asymmetrical.',
#  'Animation_v003': 'Approved - Good job, Approved!',
#  'bc0040_layout_v001': 'Revise - Is this the approved camera? Seems way too '
#                        'wide. Scale is off as well',
#  'bc0040_layout_v002': 'Approved - Approved'}
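
The two-step behaviour (group first, then map each group) can be sketched in plain Python as follows. The function and the sample records are hypothetical stand-ins that work on lists of tuples, not on trackteroid collections.

```python
def group_and_map(items, group_func, map_func):
    """Group items by group_func, then replace each group with map_func(group)."""
    groups = {}
    for item in items:
        groups.setdefault(group_func(item), []).append(item)
    return {key: map_func(members) for key, members in groups.items()}


# Hypothetical (version label, status, note) records.
records = [
    ("Animation_v001", "Revise", "Approach sooner."),
    ("Animation_v002", "Approved", "Good job!"),
    ("Animation_v002", "Approved", "Ship it."),
]

overview = group_and_map(
    records,
    lambda record: record[0],
    # Each group is a list; summarise it with the status and first note.
    lambda members: f"{members[0][1]} - {members[0][2]}",
)
print(overview)
# {'Animation_v001': 'Revise - Approach sooner.', 'Animation_v002': 'Approved - Good job!'}
```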

map

map(predicate) generates a sequence of results by applying a given predicate function to each element in the collection. The predicate function is invoked for each element, and its return value is included in the generated sequence.

Example 1: Generate a formatted representation of the AssetVersion combining the version number and the Asset name.

from trackteroid import (
    Query,
    Asset,
    AssetVersion
)

print(
    Query(AssetVersion).get_all(
        limit=2,
        projections=[Asset.name, "version"]
    ).map(lambda avc: f"{avc.Asset.name[0]}:v{str(avc.version[0]).zfill(3)}")
)
# output: ['bc0040_comp:v003', 'classic_console_01:v001']

max

max(predicate) returns the first element yielding the largest value of the given function.

Example 1: Get the AssetVersion collection with the largest version number for one asset.

from trackteroid import (
    Query,
    AssetVersion
)

assetversion_collection = Query(AssetVersion).by_name("bc0050_comp").get_all(projections=["version"])
max_version_collection = assetversion_collection.max(lambda avc: avc.version)
print(
    assetversion_collection.version,
    max_version_collection,
    max_version_collection.version
)
# output: [1, 2, 3] EntityCollection[AssetVersion]{1} [3]

Example 2: Get the Asset collection that holds an AssetVersion with the largest version number.

from trackteroid import (
    Query,
    Asset,
    AssetVersion
)

asset_collection = Query(Asset).\
    by_name("bc0050_comp", "classic_console_01").\
    get_all(
    projections=[
        AssetVersion,
        AssetVersion.version
    ]
)
print(asset_collection.max(lambda ac: ac.AssetVersion.version).name)
# output: ['classic_console_01']

Attention

The result of max is always a single-element collection, even if multiple elements yield the same largest value. In that case, we follow Python’s max implementation by returning the first occurrence of this value.
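
Python's built-in max and min both return the first item encountered when several items tie. The short sketch below demonstrates this on hypothetical (asset name, version) tuples; it exercises only the built-ins, not trackteroid itself.

```python
# Hypothetical (asset name, highest version) pairs with ties at 3 and at 2.
candidates = [
    ("bc0050_comp", 3),
    ("classic_console_01", 3),
    ("Animation", 2),
    ("wooden_table_03", 2),
]

# With a key function, ties are resolved in favour of the earliest item.
largest = max(candidates, key=lambda pair: pair[1])
smallest = min(candidates, key=lambda pair: pair[1])

print(largest)   # ('bc0050_comp', 3)
print(smallest)  # ('Animation', 2)
```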

min

min(predicate) returns the first element yielding the smallest value of the given function.

Example 1: Get the AssetVersion collection with the smallest version number for one asset.

from trackteroid import (
    Query,
    AssetVersion
)

assetversion_collection = Query(AssetVersion).by_name("bc0050_comp").get_all(projections=["version"])
min_version_collection = assetversion_collection.min(lambda avc: avc.version)
print(
    assetversion_collection.version,
    min_version_collection,
    min_version_collection.version
)
# output: [1, 2, 3] EntityCollection[AssetVersion]{1} [1]

Example 2: Get the Asset collection that holds an AssetVersion with the smallest version number.

from trackteroid import (
    Query,
    Asset,
    AssetVersion
)

asset_collection = Query(Asset).\
    by_name("bc0050_comp", "classic_console_01").\
    get_all(
    projections=[
        AssetVersion,
        AssetVersion.version
    ]
)
print(asset_collection.min(lambda ac: ac.AssetVersion.version).name)
# output: ['bc0050_comp']

Attention

The result of min is always a single-element collection, even if multiple elements yield the same smallest value. In that case, we follow Python’s min implementation by returning the first occurrence of this value.

partition

partition(predicate) splits the collection into a tuple of two collections (each an EntityCollection or an EmptyCollection). The first item contains the elements for which the predicate returned True, and the second item contains the elements for which it returned False.

Example 1: Split the AssetVersion collection based on a potential “Done” state of the elements.

from trackteroid import (
    Query,
    AssetVersion,
    State
)

print(
    Query(AssetVersion).get_all(limit=100, projections=[State.name]).\
    partition(lambda avc: avc.State.name[0] == "Done")
)
# output: (EntityCollection[AssetVersion]{11}, EntityCollection[AssetVersion]{89})
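
As a plain-Python illustration of the same split, the hypothetical partition helper below returns two lists instead of two collections and keeps the original order within each half.

```python
def partition(items, predicate):
    """Return (matching, non_matching), preserving the original order."""
    matching, non_matching = [], []
    for item in items:
        (matching if predicate(item) else non_matching).append(item)
    return matching, non_matching


# Hypothetical state names.
states = ["Done", "In Progress", "Done", "Not Started"]

done, not_done = partition(states, lambda state: state == "Done")
print(done)      # ['Done', 'Done']
print(not_done)  # ['In Progress', 'Not Started']
```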

sort

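sort(predicate) returns a collection sorted by the value the predicate returns for each element. As the first example shows, passing reverse=True sorts in descending order.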

Example 1: Sort the AssetVersion collection based on version number of the individual elements.

from trackteroid import (
    Query,
    AssetVersion
)

asset_version_collection = Query(AssetVersion).get_all(limit=10, projections=["version"])

print(
    f"{asset_version_collection.version}\n"
    f"{asset_version_collection.sort(lambda avc: avc.version).version}\n"
    f"{asset_version_collection.sort(lambda avc: avc.version, reverse=True).version}"
)
# output:
# [3, 1, 1, 2, 3, 1, 1, 1, 1, 1]
# [1, 1, 1, 1, 1, 1, 1, 2, 3, 3]
# [3, 3, 2, 1, 1, 1, 1, 1, 1, 1]

Example 2: Sort the AssetVersion collection based on the asset name of the individual elements.

from trackteroid import (
    Query,
    AssetVersion,
    Asset
)

asset_version_collection = Query(AssetVersion).get_all(limit=5, projections=[Asset.name, "version"])
asset_version_collection_sorted = asset_version_collection.sort(lambda avc: avc.Asset.name)


print(
    list(
        zip(
            asset_version_collection.fold([], lambda x, y: x + y.asset.name),
            asset_version_collection.version
        )
    )
)
print(
    list(
        zip(
            asset_version_collection_sorted.fold([], lambda x, y: x + y.asset.name),
            asset_version_collection_sorted.version
        )
    )
)
# output:
# [('bc0040_comp', 3), ('classic_console_01', 1), ('classic_nightstand_01', 1), ('Animation', 2), ('Animation', 3)]
# [('Animation', 2), ('Animation', 3), ('bc0040_comp', 3), ('classic_console_01', 1), ('classic_nightstand_01', 1)]
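
The outputs above are consistent with how Python's built-in sorted behaves: uppercase letters order before lowercase ones, and elements with equal keys keep their original relative order, even with reverse=True. The sketch below shows this on hypothetical tuples and makes no claim about how sort is implemented in trackteroid.

```python
# Hypothetical (asset name, version number) pairs in query order.
versions = [
    ("bc0040_comp", 3),
    ("Animation", 2),
    ("classic_console_01", 1),
    ("Animation", 3),
]

by_name = sorted(versions, key=lambda pair: pair[0])
by_version_desc = sorted(versions, key=lambda pair: pair[1], reverse=True)

print(by_name)
# [('Animation', 2), ('Animation', 3), ('bc0040_comp', 3), ('classic_console_01', 1)]
print(by_version_desc)
# [('bc0040_comp', 3), ('Animation', 3), ('Animation', 2), ('classic_console_01', 1)]
```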

Set Operations

difference

difference(*collections) computes the difference between two or more collections.

from trackteroid import (
    Query,
    AssetVersion
)

collection_a = Query(AssetVersion).get_all(limit=3)
collection_b = Query(AssetVersion).get_all(limit=3, offset=1)
collection_c = Query(AssetVersion).get_all(limit=3, offset=2)

print(f"a = {collection_a.version}")
# output: 'a = [3, 6, 7]'
print(f"b = {collection_b.version}")
# output: 'b = [6, 7, 19]'
print(f"c = {collection_c.version}")
# output: 'c = [7, 19, 12]'

# difference
print(f"a - b = {collection_a.difference(collection_b).version}")
# output: 'a - b = [3]'
print(f"b - a = {collection_b.difference(collection_a).version}")
# output: 'b - a = [19]'
print(f"c - a - b = {collection_c.difference(collection_a, collection_b).version}")
# output: 'c - a - b = [12]'
print(f"b - c - a = {collection_b.difference(collection_c, collection_a).version}")
# output: 'b - c - a = EmptyCollection[AssetVersion]'

intersection

intersection(*collections) computes the intersection of two or more collections.

from trackteroid import (
    Query,
    AssetVersion
)

collection_a = Query(AssetVersion).get_all(limit=3)
collection_b = Query(AssetVersion).get_all(limit=3, offset=1)

print(f"a = {collection_a.version}")
# output: 'a = [3, 6, 7]'
print(f"b = {collection_b.version}")
# output: 'b = [6, 7, 19]'

# intersection
print(f"(a + b) - ((a - b) + (b - a)) = {collection_a.intersection(collection_b).version}")
# output: '(a + b) - ((a - b) + (b - a)) = [6, 7]'

symmetric_difference

symmetric_difference(*collections) computes the symmetric difference between collections.

from trackteroid import (
    Query,
    AssetVersion
)

collection_a = Query(AssetVersion).get_all(limit=3)
collection_b = Query(AssetVersion).get_all(limit=3, offset=1)

print(f"a = {collection_a.version}")
# output: 'a = [3, 6, 7]'
print(f"b = {collection_b.version}")
# output: 'b = [6, 7, 19]'

# symmetric_difference
print(f"(a - b) + (b - a) = {collection_a.symmetric_difference(collection_b).version}")
# output: '(a - b) + (b - a) = [3, 19]'

union

union(*collections) computes the union of two or more collections.

from trackteroid import (
    Query,
    AssetVersion
)

collection_a = Query(AssetVersion).get_all(limit=3)
collection_b = Query(AssetVersion).get_all(limit=3, offset=1)
collection_c = Query(AssetVersion).get_all(limit=3, offset=2)

print(f"a = {collection_a.version}")
# output: 'a = [3, 6, 7]'
print(f"b = {collection_b.version}")
# output: 'b = [6, 7, 19]'
print(f"c = {collection_c.version}")
# output: 'c = [7, 19, 12]'

# union
print(f"a + b = {collection_a.union(collection_b).version}")
# output: 'a + b = [3, 6, 7, 19]'
print(f"a + b + c = {collection_a.union(collection_b, collection_c).version}")
# output: 'a + b + c = [3, 6, 7, 19, 12]'
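
Because collections are ordered, these operations behave like set operations that preserve element order. The helpers below are hypothetical, plain-Python sketches of that idea on lists of integers (standing in for entities); they reproduce the example values shown in this section but are not trackteroid's implementation.

```python
def union(first, *others):
    merged = list(first)
    for other in others:
        merged.extend(other)
    # dict.fromkeys removes duplicates and keeps first-seen order.
    return list(dict.fromkeys(merged))


def difference(first, *others):
    excluded = set().union(*others)
    return [item for item in first if item not in excluded]


def intersection(first, *others):
    return [item for item in first if all(item in other for other in others)]


def symmetric_difference(first, second):
    return difference(first, second) + difference(second, first)


a, b, c = [3, 6, 7], [6, 7, 19], [7, 19, 12]

print(union(a, b, c))              # [3, 6, 7, 19, 12]
print(difference(c, a, b))         # [12]
print(difference(b, c, a))         # []
print(intersection(a, b))          # [6, 7]
print(symmetric_difference(a, b))  # [3, 19]
```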

Type Coercion

Certain entity types, such as Component and TypedContext subtypes, can be coerced to their respective base types. This allows for performing set operations between multiple types that inherit from the same base type. Additionally, when creating new entities, the type of the collection determines the type of entity that will be created.

To perform type coercion, you can use the constructor of the desired entity type.

from trackteroid import (
    Query,
    Folder,
    Shot,
    TypedContext
)

folder_collection = Query(Folder).get_all()
shot_collection = Query(Shot).get_all()

print(folder_collection, shot_collection)
# output:
# EntityCollection[Folder]{6} EntityCollection[Shot]{10}

print(
    TypedContext(folder_collection),
    TypedContext(shot_collection)
)
# output:
# EntityCollection[TypedContext]{6} EntityCollection[TypedContext]{10}

# can be used with set operations now
typedcontext_collection = TypedContext(folder_collection).union(TypedContext(shot_collection))
print(typedcontext_collection)
# output: EntityCollection[TypedContext]{16}

Attention

In certain cases, attributes like parent, ancestors, children, descendants, and components will undergo automatic type coercion, as these collections can contain entities of multiple types.

Important

Although Project is not a subtype of TypedContext, accessing the parent or ancestors attributes may include Project entities. The coercion performed on these attributes ensures that Project entities are considered, allowing for proper access and attribute fetching in a cohesive manner.

Type Filtering

In contrast to type coercion, filtering a collection for subtypes is straightforward: it uses the collection’s item getter (square-bracket access).

EmptyCollection

The concept of the EmptyCollection shares similarities with the optional type found in various programming languages. It serves as a mechanism to handle the absence of values or empty results.

Similar to optional types in other languages, the EmptyCollection provides a consistent interface and allows for operations and attribute access without the need for explicit checks for empty or null values. It acts as a container that represents the absence of a value or result.

By utilizing the EmptyCollection, developers can write cleaner and more concise code by treating empty results as a valid state without the need for verbose conditional statements. This promotes a more functional programming style, allowing for seamless chaining and composition of operations even in scenarios where the result might be empty.

Just as optional types in other languages offer methods to check for presence or to provide fallback values, the EmptyCollection offers a simple fallback mechanism: it always evaluates to False.

The following example demonstrates a straightforward fallback using the or operator when retrieving the final data.

from trackteroid import (
    Asset,
    Query
)

asset_collection = Query(Asset).by_name("DOESNT_EXIST").get_all()
print(asset_collection)
# output: EmptyCollection[Asset]

print(not asset_collection)
# output: True

print(asset_collection or "Oh vey... no results found.")
# output: Oh vey... no results found.
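
The behaviour described in this section (always falsy, iterable, tolerant of attribute access) matches the general null-object pattern. The class below is a hypothetical, minimal sketch of that pattern in plain Python; the name Nothing and its details are assumptions made for illustration and do not reflect how EmptyCollection is implemented.

```python
class Nothing:
    """Hypothetical null-object stand-in for an empty result."""

    def __bool__(self):
        # Always falsy, so `value or fallback` picks the fallback.
        return False

    def __iter__(self):
        # Iterating yields nothing instead of raising.
        return iter(())

    def __getattr__(self, name):
        # Only called for attributes that are not found normally; returning
        # the same placeholder keeps chained lookups from raising.
        return self

    def __repr__(self):
        return "Nothing"


empty = Nothing()

print(list(empty))                # []
print(empty.versions.user)        # Nothing
print(empty.versions.user or [])  # []
```

The design choice is that callers never need to test for an empty result before continuing a chain; a single or at the end supplies the fallback.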

The next example shows how to handle cases where the intermediate steps of querying, filtering, and retrieving data may produce an empty collection. Because an empty result evaluates to False, appending or [] ensures that the final result is either the desired data or an empty list, without explicit checks at each step.

from trackteroid import (
    Asset,
    Query
)

print(
    # The Query result could already be empty.
    # This is more likely when using criteria to filter results when querying.
    Query(Asset).get_first(
        projections=[
            "versions.is_published",
            "versions.user.username"
        ]
    ).
    # An asset could have no versions or at least no versions that have been marked as `is_published`.
    versions.filter(
        lambda avc: avc.is_published[0]
    ).
    user.username
    # Finally, we retrieve the username of the user associated with the filtered versions.
    # If at any point we encounter no results, we can gracefully handle it by providing an empty list as a fallback.
    or []
)