Renku Workflow¶
Renku uses PROV-O and its own Renku ontology to represent workflows.
Plans¶
Represent run templates.
-
class
renku.core.models.workflow.plan.AbstractPlan(*, description: Optional[str] = None, id: str, invalidated_at: Optional[datetime.datetime] = None, keywords: Optional[List[str]] = None, name: Optional[str] = None, derived_from: Optional[str] = None)[source]¶ Abstract base class for all plans.
-
assign_new_id()[source]¶ Assign a new UUID.
This is required only when there is another plan which is exactly the same except the parameters’ list.
-
find_parameter(parameter: renku.core.models.workflow.parameter.CommandParameterBase) → bool[source]¶ Find if a parameter exists on this plan.
-
find_parameter_workflow(parameter: renku.core.models.workflow.parameter.CommandParameterBase) → renku.core.models.workflow.plan.Plan[source]¶ Return the workflow a parameter belongs to.
-
resolve_direct_reference(reference: str) → renku.core.models.workflow.parameter.CommandParameterBase[source]¶ Resolve a direct parameter reference.
-
resolve_mapping_path(mapping_path: str) → Tuple[renku.core.models.workflow.parameter.CommandParameterBase, renku.core.models.workflow.plan.Plan][source]¶ Resolve a mapping path to its reference parameter.
-
-
class
renku.core.models.workflow.plan.Plan(*, parameters: Optional[List[renku.core.models.workflow.parameter.CommandParameter]] = None, command: Optional[str] = None, description: Optional[str] = None, id: str, inputs: Optional[List[renku.core.models.workflow.parameter.CommandInput]] = None, invalidated_at: Optional[datetime.datetime] = None, keywords: Optional[List[str]] = None, name: Optional[str] = None, derived_from: Optional[str] = None, outputs: Optional[List[renku.core.models.workflow.parameter.CommandOutput]] = None, success_codes: Optional[List[int]] = None)[source]¶ Represent a renku run execution template.
-
derive() → renku.core.models.workflow.plan.Plan[source]¶ Create a new
Planthat is derived from self.
-
find_parameter(parameter: renku.core.models.workflow.parameter.CommandParameterBase) → bool[source]¶ Find if a parameter exists on this plan.
-
find_parameter_workflow(parameter: renku.core.models.workflow.parameter.CommandParameterBase) → renku.core.models.workflow.plan.Plan[source]¶ Return the workflow a parameter belongs to.
-
is_similar_to(other: renku.core.models.workflow.plan.Plan) → bool[source]¶ Return true if plan has the same inputs/outputs/arguments as another plan.
-
property
keywords_csv¶ Comma-separated list of keywords associated with workflow.
-
resolve_direct_reference(reference: str) → renku.core.models.workflow.parameter.CommandParameterBase[source]¶ Resolve a direct parameter reference.
-
resolve_mapping_path(mapping_path: str) → Tuple[renku.core.models.workflow.parameter.CommandParameterBase, renku.core.models.workflow.plan.Plan][source]¶ Resolve a mapping path to its reference parameter.
-
-
class
renku.core.models.workflow.plan.PlanDetailsJson(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)[source]¶ Serialize a plan to a response object.
-
class
renku.core.models.workflow.plan.PlanSchema(*args, commit=None, client=None, **kwargs)[source]¶ Plan schema.
-
class
Meta[source]¶ Meta class.
-
model¶ alias of
renku.core.models.workflow.plan.Plan
-
-
class
Represent a group of run templates.
-
class
renku.core.models.workflow.composite_plan.CompositePlan(*, description: Optional[str] = None, id: str, invalidated_at: Optional[datetime.datetime] = None, keywords: Optional[List[str]] = None, name: str, derived_from: Optional[str] = None, plans: Optional[List[Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan]]] = None, mappings: Optional[List[renku.core.models.workflow.parameter.ParameterMapping]] = None, links: Optional[List[renku.core.models.workflow.parameter.ParameterLink]] = None)[source]¶ A plan containing child plans.
-
add_link(source: renku.core.models.workflow.parameter.CommandParameterBase, sinks: List[renku.core.models.workflow.parameter.CommandParameterBase]) → None[source]¶ Validate and add a ParameterLink.
-
add_mapping(mapping: renku.core.models.workflow.parameter.ParameterMapping) → None[source]¶ Add a mapping to this run.
-
derive() → renku.core.models.workflow.composite_plan.CompositePlan[source]¶ Create a new
CompositePlanthat is derived from self.
-
find_link_by_target(target: renku.core.models.workflow.parameter.CommandInput)[source]¶ Find a link on this or a child workflow that has target as a sink.
-
find_parameter(parameter: renku.core.models.workflow.parameter.CommandParameterBase)[source]¶ Check if a parameter exists on this run or one of its children.
-
find_parameter_workflow(parameter: renku.core.models.workflow.parameter.CommandParameterBase) → Optional[Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan]][source]¶ Return the workflow a parameter belongs to.
-
resolve_direct_reference(reference: str) → renku.core.models.workflow.parameter.CommandParameterBase[source]¶ Resolve a direct parameter reference.
-
resolve_mapping_path(mapping_path: str) → Tuple[renku.core.models.workflow.parameter.CommandParameterBase, Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan]][source]¶ Resolve a mapping path to its reference parameter.
-
set_links_from_strings(link_strings: List[str]) → None[source]¶ Set links between parameters of child steps.
-
set_mapping_defaults(default_strings: List[str]) → None[source]¶ Set default value based on a default specification string.
-
Parameters¶
Classes to represent inputs/outputs/parameters in a Plan.
-
class
renku.core.models.workflow.parameter.CommandInput(*, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, mapped_to: Optional[renku.core.models.workflow.parameter.MappedIOStream] = None, name: Optional[str] = None, position: Optional[int] = None, prefix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]¶ An input to a command.
-
class
renku.core.models.workflow.parameter.CommandInputSchema(*args, commit=None, client=None, **kwargs)[source]¶ CommandInput schema.
-
class
renku.core.models.workflow.parameter.CommandOutput(*, create_folder: bool = False, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, mapped_to: Optional[renku.core.models.workflow.parameter.MappedIOStream] = None, name: Optional[str] = None, position: Optional[int] = None, prefix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]¶ An output from a command.
-
class
renku.core.models.workflow.parameter.CommandOutputSchema(*args, commit=None, client=None, **kwargs)[source]¶ CommandOutput schema.
-
class
renku.core.models.workflow.parameter.CommandParameter(*, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, name: Optional[str] = None, position: Optional[int] = None, prefix: Optional[str] = None)[source]¶ An argument to a command that is neither input nor output.
-
class
renku.core.models.workflow.parameter.CommandParameterBase(*, default_value: Any, description: str, id: str, name: str, position: Optional[int] = None, prefix: Optional[str] = None)[source]¶ Represents a parameter for a Plan.
-
property
actual_value¶ Get the actual value to be used for execution.
-
property
actual_value_set¶ Whether the actual_value on this parameter has been set at least once.
-
property
role¶ Return a unique role for this parameter within its Plan.
-
property
-
class
renku.core.models.workflow.parameter.CommandParameterBaseSchema(*args, commit=None, client=None, **kwargs)[source]¶ CommandParameterBase schema.
-
class
Meta[source]¶ Meta class.
-
model¶ alias of
renku.core.models.workflow.parameter.CommandParameterBase
-
-
class
-
class
renku.core.models.workflow.parameter.CommandParameterSchema(*args, commit=None, client=None, **kwargs)[source]¶ CommandParameter schema.
-
class
Meta[source]¶ Meta class.
-
model¶ alias of
renku.core.models.workflow.parameter.CommandParameter
-
-
class
-
class
renku.core.models.workflow.parameter.MappedIOStream(*, id: Optional[str] = None, stream_type: str)[source]¶ Represents an IO stream (stdin, stdout, stderr).
-
class
renku.core.models.workflow.parameter.MappedIOStreamSchema(*args, commit=None, client=None, **kwargs)[source]¶ MappedIOStream schema.
-
class
Meta[source]¶ Meta class.
-
model¶ alias of
renku.core.models.workflow.parameter.MappedIOStream
-
-
class
-
class
renku.core.models.workflow.parameter.ParameterLink(source: renku.core.models.workflow.parameter.CommandParameterBase, sinks: List[renku.core.models.workflow.parameter.CommandParameterBase], id: str)[source]¶ A link between a source and one or more sink parameters.
-
class
renku.core.models.workflow.parameter.ParameterLinkSchema(*args, commit=None, client=None, **kwargs)[source]¶ ParameterLink schema.
-
class
renku.core.models.workflow.parameter.ParameterMapping(*, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, name: Optional[str] = None, mapped_parameters: Optional[List[renku.core.models.workflow.parameter.CommandParameterBase]] = None, **kwargs)[source]¶ A mapping of child parameter(s) to a parent CompositePlan.
-
property
actual_value¶ Get the actual value to be used for execution.
-
static
generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None) → str[source]¶ Generate an id for CommandOutput.
-
property
leaf_parameters¶ Return leaf (non-Mapping) parameters contained by this Mapping.
-
property
Renku Workflow Logic¶
Execution Graph¶
Build an execution graph for a workflow.
-
class
renku.core.management.workflow.concrete_execution_graph.ExecutionGraph(workflow: Union[renku.core.models.workflow.plan.Plan, renku.core.models.workflow.composite_plan.CompositePlan], virtual_links: bool = False)[source]¶ Represents an execution graph for one or more workflow steps.
-
calculate_concrete_execution_graph(virtual_links: bool = False)[source]¶ Create an execution DAG between Plans showing dependencies between them.
Resolve ParameterLink’s involving ParameterMapping’s to the underlying actual parameters and potentially also virtual links determined by parameter values.
-
property
cycles¶ Get potential cycles in execution graph.
-
property
workflow_graph¶ Return a subgraph with only workflows and their dependencies.
-
Value Resolution¶
Resolution of Worklow execution values precedence.
-
renku.core.management.workflow.value_resolution.apply_composite_run_values(workflow: renku.core.models.workflow.composite_plan.CompositePlan, values: Optional[Dict[str, Any]] = None) → None[source]¶ Applies values and default_values to a nested workflow.
-
renku.core.management.workflow.value_resolution.apply_parameter_defaults(mapping: renku.core.models.workflow.parameter.ParameterMapping) → None[source]¶ Apply default values to a mapping and contained params if they’re not set already.
-
renku.core.management.workflow.value_resolution.apply_parameter_links(workflow: renku.core.models.workflow.composite_plan.CompositePlan) → None[source]¶ Apply values from parameter links.
-
renku.core.management.workflow.value_resolution.apply_parameters_values(workflow: renku.core.models.workflow.composite_plan.CompositePlan, values: Dict[str, str]) → None[source]¶ Apply values to mappings of a CompositePlan.
-
renku.core.management.workflow.value_resolution.apply_run_values(workflow: Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan], values: Optional[Dict[str, Any]] = None) → None[source]¶ Applies values and default_values to a potentially nested workflow.
Order of precedence is as follows (from lowest to highest): - Default value on a parameter - Default value on a mapping to the parameter - Value passed to a mapping to the parameter - Value passed to the parameter - Value propagated to a parameter from the source of a ParameterLink
-
renku.core.management.workflow.value_resolution.apply_single_run_values(workflow: renku.core.models.workflow.plan.Plan, values: Optional[Dict[str, Any]] = None) → None[source]¶ Applies values and default_values to a workflow.
Plan Factory¶
Used to create Plan objects based on command line arguments
Represent a PlanFactory for tracking workflows.
-
class
renku.core.management.workflow.plan_factory.PlanFactory(command_line: str, explicit_inputs: Optional[List[str]] = None, explicit_outputs: Optional[List[str]] = None, directory: Optional[str] = None, working_dir: Optional[str] = None, no_input_detection: bool = False, no_output_detection: bool = False, success_codes: Optional[List[int]] = None, stdin: Optional[str] = None, stdout: Optional[str] = None, stderr: Optional[str] = None)[source]¶ Factory for creating a plan from a command line call.
-
add_command_input(default_value: Any, prefix: Optional[str] = None, position: Optional[int] = None, postfix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]¶ Create a CommandInput.
-
add_command_output(default_value: Any, prefix: Optional[str] = None, position: Optional[int] = None, postfix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]¶ Create a CommandOutput.
-
add_command_output_from_input(input: renku.core.models.workflow.parameter.CommandInput)[source]¶ Create a CommandOutput from an input.
-
add_command_output_from_parameter(parameter: renku.core.models.workflow.parameter.CommandParameter)[source]¶ Create a CommandOutput from a parameter.
-
add_command_parameter(default_value: Any, prefix: Optional[str] = None, position: Optional[int] = None, name: Optional[str] = None)[source]¶ Create a CommandParameter.
-
add_outputs(candidates: Set[str])[source]¶ Yield detected output and changed command input parameter.
-
get_stream_mapping_for_value(value: Any)[source]¶ Return a stream mapping if value is a path mapped to a stream.
-
guess_type(value: str, ignore_filenames: Optional[Set[str]] = None) → Tuple[Any, str][source]¶ Return new value and CWL parameter type.
-
is_existing_path(candidate, ignore=None)[source]¶ Return a path instance if it exists in current directory.
-
to_plan(name: Optional[str] = None, description: str = <class 'NoneType'>, keywords: Optional[List[str]] = None) → renku.core.models.workflow.plan.Plan[source]¶ Return an instance of
Planbased on this factory.
-
-
renku.core.management.workflow.plan_factory.add_indirect_parameter(working_dir, name, value)[source]¶ Add a parameter to indirect parameters.
-
renku.core.management.workflow.plan_factory.delete_indirect_files_list(working_dir)[source]¶ Remove indirect inputs, outputs, and parameters list.
-
renku.core.management.workflow.plan_factory.get_indirect_inputs_path(client_path)[source]¶ Return path to file that contains indirect inputs list.
-
renku.core.management.workflow.plan_factory.get_indirect_outputs_path(client_path)[source]¶ Return path to file that contains indirect outputs list.
Renku Workflow Conversion¶
Renku allows conversion of tracked workflows to runnable workflows in supported tools (Currently CWL)