Renku Workflow

Renku uses PROV-O and its own Renku ontology to represent workflows.

Plans

Represent run templates.

class renku.core.models.workflow.plan.AbstractPlan(*, description: Optional[str] = None, id: str, invalidated_at: Optional[datetime.datetime] = None, keywords: Optional[List[str]] = None, name: Optional[str] = None, derived_from: Optional[str] = None)[source]

Abstract base class for all plans.

assign_new_id()[source]

Assign a new UUID.

This is required only when there is another plan which is exactly the same except the parameters’ list.

find_parameter(parameter: renku.core.models.workflow.parameter.CommandParameterBase)bool[source]

Find if a parameter exists on this plan.

find_parameter_workflow(parameter: renku.core.models.workflow.parameter.CommandParameterBase)renku.core.models.workflow.plan.Plan[source]

Return the workflow a parameter belongs to.

static generate_id(uuid: Optional[str] = None)str[source]

Generate an identifier for Plan.

resolve_direct_reference(reference: str)renku.core.models.workflow.parameter.CommandParameterBase[source]

Resolve a direct parameter reference.

resolve_mapping_path(mapping_path: str)Tuple[renku.core.models.workflow.parameter.CommandParameterBase, renku.core.models.workflow.plan.Plan][source]

Resolve a mapping path to its reference parameter.

static validate_name(name: str)[source]

Check a name for invalid characters.

class renku.core.models.workflow.plan.Plan(*, parameters: Optional[List[renku.core.models.workflow.parameter.CommandParameter]] = None, command: Optional[str] = None, description: Optional[str] = None, id: str, inputs: Optional[List[renku.core.models.workflow.parameter.CommandInput]] = None, invalidated_at: Optional[datetime.datetime] = None, keywords: Optional[List[str]] = None, name: Optional[str] = None, derived_from: Optional[str] = None, outputs: Optional[List[renku.core.models.workflow.parameter.CommandOutput]] = None, success_codes: Optional[List[int]] = None)[source]

Represent a renku run execution template.

derive()renku.core.models.workflow.plan.Plan[source]

Create a new Plan that is derived from self.

find_parameter(parameter: renku.core.models.workflow.parameter.CommandParameterBase)bool[source]

Find if a parameter exists on this plan.

find_parameter_workflow(parameter: renku.core.models.workflow.parameter.CommandParameterBase)renku.core.models.workflow.plan.Plan[source]

Return the workflow a parameter belongs to.

is_similar_to(other: renku.core.models.workflow.plan.Plan)bool[source]

Return true if plan has the same inputs/outputs/arguments as another plan.

property keywords_csv

Comma-separated list of keywords associated with workflow.

resolve_direct_reference(reference: str)renku.core.models.workflow.parameter.CommandParameterBase[source]

Resolve a direct parameter reference.

resolve_mapping_path(mapping_path: str)Tuple[renku.core.models.workflow.parameter.CommandParameterBase, renku.core.models.workflow.plan.Plan][source]

Resolve a mapping path to its reference parameter.

set_parameters_from_strings(params_strings: List[str])None[source]

Set parameters by parsing parameters strings.

to_argv()List[Any][source]

Convert a Plan into argv list.

class renku.core.models.workflow.plan.PlanDetailsJson(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)[source]

Serialize a plan to a response object.

class renku.core.models.workflow.plan.PlanSchema(*args, commit=None, client=None, **kwargs)[source]

Plan schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.plan.Plan

Represent a group of run templates.

class renku.core.models.workflow.composite_plan.CompositePlan(*, description: Optional[str] = None, id: str, invalidated_at: Optional[datetime.datetime] = None, keywords: Optional[List[str]] = None, name: str, derived_from: Optional[str] = None, plans: Optional[List[Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan]]] = None, mappings: Optional[List[renku.core.models.workflow.parameter.ParameterMapping]] = None, links: Optional[List[renku.core.models.workflow.parameter.ParameterLink]] = None)[source]

A plan containing child plans.

Validate and add a ParameterLink.

add_mapping(mapping: renku.core.models.workflow.parameter.ParameterMapping)None[source]

Add a mapping to this run.

derive()renku.core.models.workflow.composite_plan.CompositePlan[source]

Create a new CompositePlan that is derived from self.

Find a link on this or a child workflow that has target as a sink.

find_parameter(parameter: renku.core.models.workflow.parameter.CommandParameterBase)[source]

Check if a parameter exists on this run or one of its children.

find_parameter_workflow(parameter: renku.core.models.workflow.parameter.CommandParameterBase)Optional[Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan]][source]

Return the workflow a parameter belongs to.

static generate_id(uuid: Optional[str] = None)str[source]

Generate an identifier for Plan.

map_all_inputs()None[source]

Map all unmapped inputs from child steps to the parent.

map_all_outputs()None[source]

Map all unmapped outputs from child steps to the parent.

map_all_parameters()None[source]

Map all unmapped parameters from child steps to the parent.

resolve_direct_reference(reference: str)renku.core.models.workflow.parameter.CommandParameterBase[source]

Resolve a direct parameter reference.

resolve_mapping_path(mapping_path: str)Tuple[renku.core.models.workflow.parameter.CommandParameterBase, Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan]][source]

Resolve a mapping path to its reference parameter.

Set links between parameters of child steps.

set_mapping_defaults(default_strings: List[str])None[source]

Set default value based on a default specification string.

set_mapping_descriptions(mapping_descriptions: List[str])None[source]

Set descriptions for mappings.

set_mappings_from_strings(mapping_strings: List[str])None[source]

Set mappings by parsing mapping strings.

class renku.core.models.workflow.composite_plan.CompositePlanSchema(*args, commit=None, client=None, **kwargs)[source]

Plan schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.composite_plan.CompositePlan

Parameters

Classes to represent inputs/outputs/parameters in a Plan.

class renku.core.models.workflow.parameter.CommandInput(*, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, mapped_to: Optional[renku.core.models.workflow.parameter.MappedIOStream] = None, name: Optional[str] = None, position: Optional[int] = None, prefix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]

An input to a command.

static generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None)str[source]

Generate an id for CommandInput.

to_stream_representation()str[source]

Input stream representation.

class renku.core.models.workflow.parameter.CommandInputSchema(*args, commit=None, client=None, **kwargs)[source]

CommandInput schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.CommandInput

class renku.core.models.workflow.parameter.CommandOutput(*, create_folder: bool = False, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, mapped_to: Optional[renku.core.models.workflow.parameter.MappedIOStream] = None, name: Optional[str] = None, position: Optional[int] = None, prefix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]

An output from a command.

static generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None)str[source]

Generate an id for CommandOutput.

to_stream_representation()str[source]

Input stream representation.

class renku.core.models.workflow.parameter.CommandOutputSchema(*args, commit=None, client=None, **kwargs)[source]

CommandOutput schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.CommandOutput

class renku.core.models.workflow.parameter.CommandParameter(*, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, name: Optional[str] = None, position: Optional[int] = None, prefix: Optional[str] = None)[source]

An argument to a command that is neither input nor output.

static generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None)str[source]

Generate an id for CommandParameter.

class renku.core.models.workflow.parameter.CommandParameterBase(*, default_value: Any, description: str, id: str, name: str, position: Optional[int] = None, prefix: Optional[str] = None)[source]

Represents a parameter for a Plan.

property actual_value

Get the actual value to be used for execution.

property actual_value_set

Whether the actual_value on this parameter has been set at least once.

property role

Return a unique role for this parameter within its Plan.

to_argv()List[Any][source]

String representation (sames as cmd argument).

class renku.core.models.workflow.parameter.CommandParameterBaseSchema(*args, commit=None, client=None, **kwargs)[source]

CommandParameterBase schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.CommandParameterBase

class renku.core.models.workflow.parameter.CommandParameterSchema(*args, commit=None, client=None, **kwargs)[source]

CommandParameter schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.CommandParameter

class renku.core.models.workflow.parameter.MappedIOStream(*, id: Optional[str] = None, stream_type: str)[source]

Represents an IO stream (stdin, stdout, stderr).

static generate_id(stream_type: str)str[source]

Generate an id for parameters.

class renku.core.models.workflow.parameter.MappedIOStreamSchema(*args, commit=None, client=None, **kwargs)[source]

MappedIOStream schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.MappedIOStream

A link between a source and one or more sink parameters.

apply()[source]

Apply source value to sinks.

static generate_id(plan_id: str)str[source]

Generate an id for parameters.

class renku.core.models.workflow.parameter.ParameterLinkSchema(*args, commit=None, client=None, **kwargs)[source]

ParameterLink schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.ParameterLink

class renku.core.models.workflow.parameter.ParameterMapping(*, default_value: Optional[Any] = None, description: Optional[str] = None, id: str, name: Optional[str] = None, mapped_parameters: Optional[List[renku.core.models.workflow.parameter.CommandParameterBase]] = None, **kwargs)[source]

A mapping of child parameter(s) to a parent CompositePlan.

property actual_value

Get the actual value to be used for execution.

static generate_id(plan_id: str, position: Optional[int] = None, postfix: Optional[str] = None)str[source]

Generate an id for CommandOutput.

property leaf_parameters

Return leaf (non-Mapping) parameters contained by this Mapping.

to_stream_representation()str[source]

Input stream representation.

class renku.core.models.workflow.parameter.ParameterMappingSchema(*args, commit=None, client=None, **kwargs)[source]

ParameterMapping schema.

class Meta[source]

Meta class.

model

alias of renku.core.models.workflow.parameter.ParameterMapping

Renku Workflow Logic

Execution Graph

Build an execution graph for a workflow.

class renku.core.management.workflow.concrete_execution_graph.ExecutionGraph(workflow: Union[renku.core.models.workflow.plan.Plan, renku.core.models.workflow.composite_plan.CompositePlan], virtual_links: bool = False)[source]

Represents an execution graph for one or more workflow steps.

calculate_concrete_execution_graph(virtual_links: bool = False)[source]

Create an execution DAG between Plans showing dependencies between them.

Resolve ParameterLink’s involving ParameterMapping’s to the underlying actual parameters and potentially also virtual links determined by parameter values.

property cycles

Get potential cycles in execution graph.

property workflow_graph

Return a subgraph with only workflows and their dependencies.

Value Resolution

Resolution of Worklow execution values precedence.

renku.core.management.workflow.value_resolution.apply_composite_run_values(workflow: renku.core.models.workflow.composite_plan.CompositePlan, values: Optional[Dict[str, Any]] = None)None[source]

Applies values and default_values to a nested workflow.

renku.core.management.workflow.value_resolution.apply_parameter_defaults(mapping: renku.core.models.workflow.parameter.ParameterMapping)None[source]

Apply default values to a mapping and contained params if they’re not set already.

Apply values from parameter links.

renku.core.management.workflow.value_resolution.apply_parameters_values(workflow: renku.core.models.workflow.composite_plan.CompositePlan, values: Dict[str, str])None[source]

Apply values to mappings of a CompositePlan.

renku.core.management.workflow.value_resolution.apply_run_values(workflow: Union[renku.core.models.workflow.composite_plan.CompositePlan, renku.core.models.workflow.plan.Plan], values: Optional[Dict[str, Any]] = None)None[source]

Applies values and default_values to a potentially nested workflow.

Order of precedence is as follows (from lowest to highest): - Default value on a parameter - Default value on a mapping to the parameter - Value passed to a mapping to the parameter - Value passed to the parameter - Value propagated to a parameter from the source of a ParameterLink

renku.core.management.workflow.value_resolution.apply_single_run_values(workflow: renku.core.models.workflow.plan.Plan, values: Optional[Dict[str, Any]] = None)None[source]

Applies values and default_values to a workflow.

Plan Factory

Used to create Plan objects based on command line arguments

Represent a PlanFactory for tracking workflows.

class renku.core.management.workflow.plan_factory.PlanFactory(command_line: str, explicit_inputs: Optional[List[str]] = None, explicit_outputs: Optional[List[str]] = None, directory: Optional[str] = None, working_dir: Optional[str] = None, no_input_detection: bool = False, no_output_detection: bool = False, success_codes: Optional[List[int]] = None, stdin: Optional[str] = None, stdout: Optional[str] = None, stderr: Optional[str] = None)[source]

Factory for creating a plan from a command line call.

add_command_input(default_value: Any, prefix: Optional[str] = None, position: Optional[int] = None, postfix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]

Create a CommandInput.

add_command_output(default_value: Any, prefix: Optional[str] = None, position: Optional[int] = None, postfix: Optional[str] = None, encoding_format: Optional[List[str]] = None)[source]

Create a CommandOutput.

add_command_output_from_input(input: renku.core.models.workflow.parameter.CommandInput)[source]

Create a CommandOutput from an input.

add_command_output_from_parameter(parameter: renku.core.models.workflow.parameter.CommandParameter)[source]

Create a CommandOutput from a parameter.

add_command_parameter(default_value: Any, prefix: Optional[str] = None, position: Optional[int] = None, name: Optional[str] = None)[source]

Create a CommandParameter.

add_explicit_inputs()[source]

Add explicit inputs .

add_indirect_inputs()[source]

Read indirect inputs list and add them to explicit inputs.

add_indirect_outputs()[source]

Read indirect outputs list and add them to explicit outputs.

add_inputs_and_parameters(*arguments)[source]

Yield command input parameters.

add_outputs(candidates: Set[str])[source]

Yield detected output and changed command input parameter.

get_stream_mapping_for_value(value: Any)[source]

Return a stream mapping if value is a path mapped to a stream.

guess_type(value: str, ignore_filenames: Optional[Set[str]] = None)Tuple[Any, str][source]

Return new value and CWL parameter type.

is_existing_path(candidate, ignore=None)[source]

Return a path instance if it exists in current directory.

iter_input_files(basedir)[source]

Yield tuples with input id and path.

split_command_and_args()[source]

Return tuple with command and args from command line arguments.

to_plan(name: Optional[str] = None, description: str = <class 'NoneType'>, keywords: Optional[List[str]] = None)renku.core.models.workflow.plan.Plan[source]

Return an instance of Plan based on this factory.

watch(client_dispatcher: renku.core.management.interface.client_dispatcher.IClientDispatcher, no_output=False)[source]

Watch a Renku repository for changes to detect outputs.

renku.core.management.workflow.plan_factory.add_indirect_parameter(working_dir, name, value)[source]

Add a parameter to indirect parameters.

renku.core.management.workflow.plan_factory.delete_indirect_files_list(working_dir)[source]

Remove indirect inputs, outputs, and parameters list.

renku.core.management.workflow.plan_factory.get_indirect_inputs_path(client_path)[source]

Return path to file that contains indirect inputs list.

renku.core.management.workflow.plan_factory.get_indirect_outputs_path(client_path)[source]

Return path to file that contains indirect outputs list.

renku.core.management.workflow.plan_factory.get_indirect_parameters_path(client_path)[source]

Return path to file that contains indirect parameters list.

renku.core.management.workflow.plan_factory.read_indirect_parameters(working_dir)[source]

Read and return indirect parameters.

Renku Workflow Conversion

Renku allows conversion of tracked workflows to runnable workflows in supported tools (Currently CWL)

CWL

Converter for workflows to cwl.

class renku.core.management.workflow.converters.cwl.CWLConverter[source]

Converts a Run to cwl file(s).

static convert(run, basedir, path=None)[source]

Convert the workflow to one ore more .cwl files.