nanopyx.methods.workflow

  1from ..__agent__ import Agent
  2
  3
  class Workflow:
      """
      Chain several analysis steps and run them one after another.

      Each step is a tuple ``(fn, args, kwargs)``. ``fn`` must expose a ``run``
      method (called as ``fn.run(*args, **kwargs)``) and the attributes
      ``_last_runtype`` and ``_last_time``, which are read from the last step
      once the workflow has finished.

      A string argument containing ``"PREV_RETURN_VALUE"`` is a placeholder for
      the output of an earlier step. It must carry two integers separated by
      underscores, ``"PREV_RETURN_VALUE_<step>_<output>"``: ``<step>`` is the
      zero-based position of the step in the workflow and ``<output>`` is the
      zero-based position inside that step's return values. A step that returns
      something other than a tuple is stored as a one-element tuple, so its
      only output has index ``0``.

      Args:
          *args: One tuple ``(fn, args, kwargs)`` per step, where
              - fn: object whose ``run`` method performs the step.
              - args (tuple): positional arguments for ``fn.run``.
              - kwargs (dict): keyword arguments for ``fn.run``.

      Example:
          workflow = Workflow(
              (step1_function, (arg1,), {"kwarg1": value1}),
              (step2_function, ("PREV_RETURN_VALUE_0_0", arg2), {"kwarg2": value2}),
              (step3_function, ("PREV_RETURN_VALUE_1_0",), {}),
          )
          result = workflow.calculate()
      """

      def __init__(self, *args) -> None:
          """
          Store the step definitions.

          Args:
              *args: One tuple of three items (fn, args, kwargs) per step.

          Raises:
              TypeError: If any step is not a tuple of exactly three items.
          """
          self._methods = []
          self._return_values = []

          for arg in args:
              if not isinstance(arg, tuple) or len(arg) != 3:
                  raise TypeError("Each arg must be a tuple of 3 items (fn, args, kwargs)")
              self._methods.append((arg[0], arg[1], arg[2]))

      def _resolve_placeholder(self, value):
          """
          Replace a "PREV_RETURN_VALUE_<step>_<output>" string by the stored output.

          Any value that is not a string containing "PREV_RETURN_VALUE" is
          returned unchanged.

          Raises:
              IndexError: If the placeholder lacks the two indices, or if they
                  point outside the outputs collected so far.
          """
          if not (isinstance(value, str) and "PREV_RETURN_VALUE" in value):
              return value

          indices = [int(part) for part in value.split("_") if part.isdigit()]
          if len(indices) < 2:
              raise IndexError(
                  f"Placeholder {value!r} must look like 'PREV_RETURN_VALUE_<step>_<output>'"
              )
          step, output = indices[0], indices[1]
          return self._return_values[step][output]

      def run(self, _force_run_type=None):
          """
          Run every step in order and return the outcome of the last one.

          Args:
              _force_run_type (str, optional): When truthy, passed to every step
                  as ``run_type`` instead of the value from ``Agent.get_run_type``.

          Returns:
              tuple: ``(output, run_type, execution_time)`` where ``output`` is the
              tuple of return values of the last step and the other two items are
              the last step's ``_last_runtype`` and ``_last_time`` attributes.

          Raises:
              IndexError: If the workflow has no steps or a placeholder is invalid.

          Example:
              output, run_type, execution_time = workflow.run()
          """
          if not self._methods:
              raise IndexError("Cannot run a Workflow that has no steps")

          # Start from a clean slate: placeholder indices are step positions, so
          # outputs left over from an earlier run would shadow the current ones.
          self._return_values = []

          for fn, args, kwargs in self._methods:
              # Build fresh containers so the step definitions (and the caller's
              # own dict) are never modified and the workflow can be re-run.
              sane_args = [self._resolve_placeholder(arg) for arg in args]
              sane_kwargs = {
                  key: self._resolve_placeholder(value) for key, value in kwargs.items()
              }

              # The Agent is always consulted, even when the run type is forced.
              run_type = Agent.get_run_type(fn, sane_args, sane_kwargs)
              sane_kwargs["run_type"] = _force_run_type if _force_run_type else run_type

              # TODO maybe we need to warn the agent its running and when it finishes
              return_value = fn.run(*sane_args, **sane_kwargs)

              # Outputs are always stored as tuples so placeholders can index them.
              if not isinstance(return_value, tuple):
                  return_value = (return_value,)
              self._return_values.append(return_value)

              Agent._inform(fn)

          return self._return_values[-1], fn._last_runtype, fn._last_time

      def calculate(self, _force_run_type=None):
          """
          Run the workflow and return only the output of the last step.

          Args:
              _force_run_type (str, optional): Forwarded unchanged to ``run``.

          Returns:
              tuple: The return values of the last step (first item of ``run()``).

          Example:
              result = workflow.calculate()
          """
          output, _run_type, _elapsed = self.run(_force_run_type=_force_run_type)
          return output
class Workflow:
    """
    Chain several analysis steps and run them one after another.

    Each step is a tuple ``(fn, args, kwargs)``. ``fn`` must expose a ``run``
    method (called as ``fn.run(*args, **kwargs)``) and the attributes
    ``_last_runtype`` and ``_last_time``, which are read from the last step
    once the workflow has finished.

    A string argument containing ``"PREV_RETURN_VALUE"`` is a placeholder for
    the output of an earlier step. It must carry two integers separated by
    underscores, ``"PREV_RETURN_VALUE_<step>_<output>"``: ``<step>`` is the
    zero-based position of the step in the workflow and ``<output>`` is the
    zero-based position inside that step's return values. A step that returns
    something other than a tuple is stored as a one-element tuple, so its
    only output has index ``0``.

    Args:
        *args: One tuple ``(fn, args, kwargs)`` per step, where
            - fn: object whose ``run`` method performs the step.
            - args (tuple): positional arguments for ``fn.run``.
            - kwargs (dict): keyword arguments for ``fn.run``.

    Example:
        workflow = Workflow(
            (step1_function, (arg1,), {"kwarg1": value1}),
            (step2_function, ("PREV_RETURN_VALUE_0_0", arg2), {"kwarg2": value2}),
            (step3_function, ("PREV_RETURN_VALUE_1_0",), {}),
        )
        result = workflow.calculate()
    """

    def __init__(self, *args) -> None:
        """
        Store the step definitions.

        Args:
            *args: One tuple of three items (fn, args, kwargs) per step.

        Raises:
            TypeError: If any step is not a tuple of exactly three items.
        """
        self._methods = []
        self._return_values = []

        for arg in args:
            if not isinstance(arg, tuple) or len(arg) != 3:
                raise TypeError("Each arg must be a tuple of 3 items (fn, args, kwargs)")
            self._methods.append((arg[0], arg[1], arg[2]))

    def _resolve_placeholder(self, value):
        """
        Replace a "PREV_RETURN_VALUE_<step>_<output>" string by the stored output.

        Any value that is not a string containing "PREV_RETURN_VALUE" is
        returned unchanged.

        Raises:
            IndexError: If the placeholder lacks the two indices, or if they
                point outside the outputs collected so far.
        """
        if not (isinstance(value, str) and "PREV_RETURN_VALUE" in value):
            return value

        indices = [int(part) for part in value.split("_") if part.isdigit()]
        if len(indices) < 2:
            raise IndexError(
                f"Placeholder {value!r} must look like 'PREV_RETURN_VALUE_<step>_<output>'"
            )
        step, output = indices[0], indices[1]
        return self._return_values[step][output]

    def run(self, _force_run_type=None):
        """
        Run every step in order and return the outcome of the last one.

        Args:
            _force_run_type (str, optional): When truthy, passed to every step
                as ``run_type`` instead of the value from ``Agent.get_run_type``.

        Returns:
            tuple: ``(output, run_type, execution_time)`` where ``output`` is the
            tuple of return values of the last step and the other two items are
            the last step's ``_last_runtype`` and ``_last_time`` attributes.

        Raises:
            IndexError: If the workflow has no steps or a placeholder is invalid.

        Example:
            output, run_type, execution_time = workflow.run()
        """
        if not self._methods:
            raise IndexError("Cannot run a Workflow that has no steps")

        # Start from a clean slate: placeholder indices are step positions, so
        # outputs left over from an earlier run would shadow the current ones.
        self._return_values = []

        for fn, args, kwargs in self._methods:
            # Build fresh containers so the step definitions (and the caller's
            # own dict) are never modified and the workflow can be re-run.
            sane_args = [self._resolve_placeholder(arg) for arg in args]
            sane_kwargs = {
                key: self._resolve_placeholder(value) for key, value in kwargs.items()
            }

            # The Agent is always consulted, even when the run type is forced.
            run_type = Agent.get_run_type(fn, sane_args, sane_kwargs)
            sane_kwargs["run_type"] = _force_run_type if _force_run_type else run_type

            # TODO maybe we need to warn the agent its running and when it finishes
            return_value = fn.run(*sane_args, **sane_kwargs)

            # Outputs are always stored as tuples so placeholders can index them.
            if not isinstance(return_value, tuple):
                return_value = (return_value,)
            self._return_values.append(return_value)

            Agent._inform(fn)

        return self._return_values[-1], fn._last_runtype, fn._last_time

    def calculate(self, _force_run_type=None):
        """
        Run the workflow and return only the output of the last step.

        Args:
            _force_run_type (str, optional): Forwarded unchanged to ``run``.

        Returns:
            tuple: The return values of the last step (first item of ``run()``).

        Example:
            result = workflow.calculate()
        """
        output, _run_type, _elapsed = self.run(_force_run_type=_force_run_type)
        return output

Workflow class that aggregates all the steps of the analysis and the corresponding functions.

The Workflow class is designed to organize and execute a sequence of analysis steps in a workflow. It allows you to define a series of functions, their arguments, and their dependencies on the previous step's output. You can run the workflow sequentially and obtain the final result.

Args: *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs), where: - fn (callable): The function to be executed in this step. - args (tuple): The arguments to be passed to the function. - kwargs (dict): The keyword arguments to be passed to the function.

Methods: __init__(*args): Initialize the Workflow object with a list of analysis steps. - Each arg must be a tuple of three items (fn, args, kwargs).

run(_force_run_type=None): Run the workflow sequentially.
    - _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

calculate(_force_run_type=None): Calculate the final result of the workflow.
    - _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

Example: # Define a workflow with three steps workflow = Workflow( (step1_function, (arg1,), {"kwarg1": value1}), (step2_function, ("PREV_RETURN_VALUE_0_0", arg2), {"kwarg2": value2}), (step3_function, ("PREV_RETURN_VALUE_1_0",), {}) )

# Run the workflow and get the final result
result = workflow.calculate()

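Note: - The Workflow class allows you to specify dependencies between steps using "PREV_RETURN_VALUE_<step>_<output>" placeholders, where <step> is the zero-based position of an earlier step and <output> is the zero-based position within that step's return values. - The result of each step is stored and can be accessed later.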

Workflow(*args)
46    def __init__(self, *args) -> None:
47        """
48        Initialize the Workflow object.
49
50        Args:
51            *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs).
52
53        Returns:
54            None
55        """
56
57        self._methods = []
58        self._return_values = []
59
60        for arg in args:
61            if isinstance(arg, tuple) and len(arg) == 3:
62                self._methods.append((arg[0], arg[1], arg[2]))
63            else:
64                raise TypeError("Each arg must be a tuple of 3 items (fn, args, kwargs)")

Initialize the Workflow object.

Args: *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs).

Returns: None

def run(self, _force_run_type=None):
    """
    Run every step in order and return the outcome of the last one.

    String arguments containing "PREV_RETURN_VALUE" are placeholders of the
    form "PREV_RETURN_VALUE_<step>_<output>": they are replaced by item
    <output> of the values returned by step number <step> (both zero-based).

    Args:
        _force_run_type (str, optional): When truthy, passed to every step as
            ``run_type`` instead of the value from ``Agent.get_run_type``.

    Returns:
        tuple: ``(output, run_type, execution_time)`` where ``output`` is the
        tuple of return values of the last step and the other two items are
        the last step's ``_last_runtype`` and ``_last_time`` attributes.

    Raises:
        IndexError: If the workflow has no steps or a placeholder is invalid.

    Example:
        output, run_type, execution_time = workflow.run()

    Note:
        - The result of each step is stored in self._return_values and can be accessed later.
        - Step definitions are never modified, so the workflow can be run again.
    """
    if not self._methods:
        raise IndexError("Cannot run a Workflow that has no steps")

    # Start from a clean slate: placeholder indices are step positions, so
    # outputs left over from an earlier run would shadow the current ones.
    self._return_values = []

    def resolve(value):
        # Swap a placeholder string for the stored output; pass the rest through.
        if not (isinstance(value, str) and "PREV_RETURN_VALUE" in value):
            return value
        indices = [int(part) for part in value.split("_") if part.isdigit()]
        if len(indices) < 2:
            raise IndexError(
                f"Placeholder {value!r} must look like 'PREV_RETURN_VALUE_<step>_<output>'"
            )
        return self._return_values[indices[0]][indices[1]]

    for fn, args, kwargs in self._methods:
        # Build fresh containers so the stored kwargs dict (which is the
        # caller's own object) is never mutated.
        sane_args = [resolve(arg) for arg in args]
        sane_kwargs = {key: resolve(value) for key, value in kwargs.items()}

        # The Agent is always consulted, even when the run type is forced.
        run_type = Agent.get_run_type(fn, sane_args, sane_kwargs)
        sane_kwargs["run_type"] = _force_run_type if _force_run_type else run_type

        # TODO maybe we need to warn the agent its running and when it finishes
        return_value = fn.run(*sane_args, **sane_kwargs)

        # Outputs are always stored as tuples so placeholders can index them.
        if not isinstance(return_value, tuple):
            return_value = (return_value,)
        self._return_values.append(return_value)

        Agent._inform(fn)

    return self._return_values[-1], fn._last_runtype, fn._last_time

Run the workflow sequentially.

Args: _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

Returns: Tuple: A tuple containing the final result, the run type of the last step, and the execution time.

Example: output, run_type, execution_time = workflow.run()

Note: - The result of each step is stored in self._return_values and can be accessed later. - The run type of each step is determined using Agent.get_run_type() and can be overridden with _force_run_type.

def calculate(self, _force_run_type=None):
    """
    Run the workflow and hand back only its output.

    Args:
        _force_run_type (str, optional): Forwarded unchanged to ``run``.

    Returns:
        The first of the three items returned by ``run``; the other two
        (run type and execution time) are discarded.

    Example:
        result = workflow.calculate()
    """
    result, _run_type, _elapsed = self.run(_force_run_type=_force_run_type)
    return result

Calculate the final result of the workflow.

Args: _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

Returns: Any: The final result of the workflow.

Example: result = workflow.calculate()

Note: This method is a convenient way to obtain the final result without detailed information about each step.