# Source code for renku.core.models.provenance.activity

# -*- coding: utf-8 -*-
#
# Copyright 2018-2021 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent an execution of a Plan."""

from datetime import datetime
from typing import List, Union
from uuid import uuid4

from marshmallow import EXCLUDE

from renku.core.management.command_builder import inject
from renku.core.management.interface.client_dispatcher import IClientDispatcher
from renku.core.metadata.database import Persistent
from renku.core.metadata.immutable import Immutable
from renku.core.models.calamus import JsonLDSchema, Nested, fields, oa, prov, renku
from renku.core.models.entity import Collection, CollectionSchema, Entity, EntitySchema
from renku.core.models.provenance.agent import Person, PersonSchema, SoftwareAgent, SoftwareAgentSchema
from renku.core.models.provenance.annotation import Annotation, AnnotationSchema
from renku.core.models.provenance.parameter import (
    ParameterValueSchema,
    PathParameterValue,
    PathParameterValueSchema,
    VariableParameterValue,
    VariableParameterValueSchema,
)
from renku.core.models.workflow.plan import Plan, PlanSchema

NON_EXISTING_ENTITY_CHECKSUM = "0" * 40


class Association:
    """Assign responsibility to an agent for an activity.

    Plain attribute holder for the responsible ``agent``, the association's
    own ``id`` and the associated ``plan``.
    """

    def __init__(self, *, agent: Union[Person, SoftwareAgent] = None, id: str, plan: Plan):
        # All arguments are keyword-only and stored exactly as given;
        # ``agent`` is the only optional one and defaults to ``None``.
        self.agent: Union[Person, SoftwareAgent] = agent
        self.id: str = id
        self.plan: Plan = plan

    @staticmethod
    def generate_id(activity_id: str) -> str:
        """Generate an Association identifier from the owning activity's id."""
        # TODO: Does it make sense to use plural name here?
        association_id = f"{activity_id}/association"
        return association_id


class Usage(Immutable):
    """Represent a dependent path.

    Carries the ``entity`` that was used and the usage's own ``id``.
    """

    __slots__ = ("entity",)

    entity: Union[Collection, Entity]
    id: str

    def __init__(self, *, entity: Union[Collection, Entity], id: str):
        # Attribute storage is delegated to the ``Immutable`` base class.
        attributes = {"entity": entity, "id": id}
        super().__init__(**attributes)

    @staticmethod
    def generate_id(activity_id: str) -> str:
        """Generate a Usage identifier under the given activity id."""
        # A fresh random hex token is drawn on every call.
        unique_token = uuid4().hex
        return f"{activity_id}/usages/{unique_token}"


class Generation(Immutable):
    """Represent an act of generating a path.

    Carries the ``entity`` that was generated and the generation's own ``id``.
    """

    __slots__ = ("entity",)

    entity: Union[Collection, Entity]
    id: str

    def __init__(self, *, entity: Union[Collection, Entity], id: str):
        # Attribute storage is delegated to the ``Immutable`` base class.
        attributes = {"entity": entity, "id": id}
        super().__init__(**attributes)

    @staticmethod
    def generate_id(activity_id: str) -> str:
        """Generate a Generation identifier under the given activity id."""
        # A fresh random hex token is drawn on every call.
        unique_token = uuid4().hex
        return f"{activity_id}/generations/{unique_token}"


# @total_ordering
class Activity(Persistent):
    """Represent an activity in the repository.

    An activity stores its agents, annotations, association, start/end times,
    generations, invalidations, parameter values and usages. Every list-valued
    attribute except ``agents`` falls back to an empty list when not given;
    ``agents`` is stored as passed (possibly ``None``).
    """

    def __init__(
        self,
        *,
        agents: List[Union[Person, SoftwareAgent]] = None,
        annotations: List[Annotation] = None,
        association: Association = None,
        ended_at_time: datetime = None,
        generations: List[Generation] = None,
        id: str,
        invalidations: List[Entity] = None,
        parameters: List[Union[PathParameterValue, VariableParameterValue]] = None,
        started_at_time: datetime = None,
        usages: List[Usage] = None,
    ):
        self.agents: List[Union[Person, SoftwareAgent]] = agents
        self.annotations: List[Annotation] = annotations or []
        self.association: Association = association
        self.ended_at_time: datetime = ended_at_time
        self.generations: List[Generation] = generations or []
        self.id: str = id
        self.invalidations: List[Entity] = invalidations or []
        self.parameters: List[Union[PathParameterValue, VariableParameterValue]] = parameters or []
        # self.project: Project = project
        self.started_at_time: datetime = started_at_time
        self.usages: List[Usage] = usages or []

        # TODO: _was_informed_by = attr.ib(kw_only=True)
        # TODO: influenced = attr.ib(kw_only=True)

    @classmethod
    @inject.autoparams()
    def from_plan(
        cls,
        plan: Plan,
        client_dispatcher: IClientDispatcher,
        started_at_time: datetime,
        ended_at_time: datetime,
        annotations: List[Annotation],
        commit=None,
        update_commits=False,
    ):
        """Convert a ``Plan`` to an ``Activity``.

        Args:
            plan: Plan whose ``inputs`` and ``outputs`` become usages and
                generations; each item's ``default_value`` is used as its path.
            client_dispatcher: Provides ``current_client``; presumably supplied
                by ``inject.autoparams()`` when the caller omits it.
            started_at_time: Stored as the activity's start time.
            ended_at_time: Stored as the activity's end time.
            annotations: Annotations attached to the new activity.
            commit: Commit at which entities are resolved (via its ``hexsha``)
                and from which the software agent is built. Defaults to the
                client's repository HEAD commit when falsy.
            update_commits: Accepted for interface compatibility; not used by
                this method.

        Returns:
            A new activity with a freshly generated id, an association to
            ``plan`` and an empty list of parameter values.
        """
        from renku.core.models.provenance.agent import SoftwareAgent

        client = client_dispatcher.current_client

        if not commit:
            commit = client.repo.head.commit

        # No parameter values are derived from the plan here.
        parameter_values = []

        activity_id = cls.generate_id()

        usages = [
            Usage(
                entity=Entity.from_revision(client, path=input_.default_value, revision=commit.hexsha),
                id=Usage.generate_id(activity_id),
            )
            for input_ in plan.inputs
        ]

        # Generation ids must come from ``Generation.generate_id`` so they are
        # placed under ``/generations/``; using ``Usage.generate_id`` here would
        # file them under ``/usages/``.
        generations = [
            Generation(
                entity=Entity.from_revision(client, path=output.default_value, revision=commit.hexsha),
                id=Generation.generate_id(activity_id),
            )
            for output in plan.outputs
        ]

        agent = SoftwareAgent.from_commit(commit)

        association = Association(agent=agent, id=Association.generate_id(activity_id), plan=plan)

        return cls(
            id=activity_id,
            association=association,
            agents=[agent],
            usages=usages,
            generations=generations,
            parameters=parameter_values,
            started_at_time=started_at_time,
            ended_at_time=ended_at_time,
            annotations=annotations,
        )

    @staticmethod
    def generate_id() -> str:
        """Generate an identifier for an activity."""
        # TODO: make id generation idempotent
        return f"/activities/{uuid4().hex}"
# Disabled ``Activity`` comparison methods (see the commented-out
# ``@total_ordering`` above the class); kept for reference only.
#
#     def __eq__(self, other):
#         """Implements total_ordering equality."""
#         if isinstance(other, str):
#             return self.id == other
#         assert isinstance(other, Activity), f"Not an activity: {type(other)}"
#         return self.id == other.id
#
#     def __lt__(self, other):
#         """Implement total_ordering less than."""
#         assert isinstance(other, Activity), f"Not an activity: {type(other)}"
#         return ((self.ended_at_time, self.id) < (other.ended_at_time, other.id))


class AssociationSchema(JsonLDSchema):
    """JSON-LD schema mapping ``Association`` to ``prov:Association``."""

    class Meta:
        """Meta class."""

        rdf_type = prov.Association
        model = Association
        # marshmallow ``EXCLUDE``: unknown fields are dropped on load.
        unknown = EXCLUDE

    # The agent may be either a software agent or a person.
    agent = Nested(prov.agent, [SoftwareAgentSchema, PersonSchema])
    id = fields.Id()
    plan = Nested(prov.hadPlan, PlanSchema)


class UsageSchema(JsonLDSchema):
    """JSON-LD schema mapping ``Usage`` to ``prov:Usage``."""

    class Meta:
        """Meta class."""

        rdf_type = prov.Usage
        model = Usage
        # marshmallow ``EXCLUDE``: unknown fields are dropped on load.
        unknown = EXCLUDE

    id = fields.Id()
    # TODO: DatasetSchema, DatasetFileSchema
    entity = Nested(prov.entity, [EntitySchema, CollectionSchema])


class GenerationSchema(JsonLDSchema):
    """JSON-LD schema mapping ``Generation`` to ``prov:Generation``."""

    class Meta:
        """Meta class."""

        rdf_type = prov.Generation
        model = Generation
        # marshmallow ``EXCLUDE``: unknown fields are dropped on load.
        unknown = EXCLUDE

    id = fields.Id()
    # TODO: DatasetSchema, DatasetFileSchema
    # Declared with ``reverse=True``: presumably the ``prov:qualifiedGeneration``
    # edge points from the entity to this generation -- confirm against calamus.
    entity = Nested(prov.qualifiedGeneration, [EntitySchema, CollectionSchema], reverse=True)


class ActivitySchema(JsonLDSchema):
    """JSON-LD schema mapping ``Activity`` to ``prov:Activity``."""

    class Meta:
        """Meta class."""

        rdf_type = prov.Activity
        model = Activity
        # marshmallow ``EXCLUDE``: unknown fields are dropped on load.
        unknown = EXCLUDE

    agents = Nested(prov.wasAssociatedWith, [PersonSchema, SoftwareAgentSchema], many=True)
    annotations = Nested(oa.hasTarget, AnnotationSchema, reverse=True, many=True)
    association = Nested(prov.qualifiedAssociation, AssociationSchema)
    ended_at_time = fields.DateTime(prov.endedAtTime, add_value_types=True)
    generations = Nested(prov.activity, GenerationSchema, reverse=True, many=True, missing=None)
    id = fields.Id()
    invalidations = Nested(prov.wasInvalidatedBy, EntitySchema, reverse=True, many=True, missing=None)
    # NOTE(review): ``Activity.__init__`` in this module accepts neither ``order``
    # nor ``path`` (declared further down); verify how these two fields are
    # handled when loading into the model.
    order = fields.Integer(renku.order)
    parameters = Nested(
        renku.parameter,
        [ParameterValueSchema, PathParameterValueSchema, VariableParameterValueSchema],
        many=True,
        missing=None,
    )
    path = fields.String(prov.atLocation)
    started_at_time = fields.DateTime(prov.startedAtTime, add_value_types=True)
    usages = Nested(prov.qualifiedUsage, UsageSchema, many=True)