nanopyx.methods.workflow
from ..__agent__ import Agent


class Workflow:
    """
    Workflow class that aggregates all the steps of the analysis and the corresponding functions.

    The Workflow class is designed to organize and execute a sequence of analysis steps in a workflow.
    It allows you to define a series of functions, their arguments, and their dependencies on the previous step's output.
    You can run the workflow sequentially and obtain the final result.

    Args:
        *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs),
            where:
            - fn: The step object to be executed; the workflow calls fn.run(...).
            - args (tuple): The arguments to be passed to the function.
            - kwargs (dict): The keyword arguments to be passed to the function.

    Methods:
        __init__(*args): Initialize the Workflow object with a list of analysis steps.
            - Each arg must be a tuple of three items (fn, args, kwargs).

        run(_force_run_type=None): Run the workflow sequentially.
            - _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

        calculate(_force_run_type=None): Calculate the final result of the workflow.
            - _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

    Example:
        # Define a workflow with three steps
        workflow = Workflow(
            (step1_function, (arg1,), {"kwarg1": value1}),
            (step2_function, ("PREV_RETURN_VALUE_0_0", arg2), {"kwarg2": value2}),
            (step3_function, ("PREV_RETURN_VALUE_1_0",), {})
        )

        # Run the workflow and get the final result
        result = workflow.calculate()

    Note:
        - Dependencies between steps are specified with "PREV_RETURN_VALUE_<step>_<item>" placeholders.
        - The result of each step is stored and can be accessed later.
    """

    def __init__(self, *args) -> None:
        """
        Initialize the Workflow object.

        Args:
            *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs).

        Returns:
            None
        """

        self._methods = []
        self._return_values = []

        for arg in args:
            if isinstance(arg, tuple) and len(arg) == 3:
                self._methods.append((arg[0], arg[1], arg[2]))
            else:
                raise TypeError("Each arg must be a tuple of 3 items (fn, args, kwargs)")

    def run(self, _force_run_type=None):
        """
        Run the workflow sequentially.

        Args:
            _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

        Returns:
            Tuple: A tuple containing the final result, the run type of the last step, and the execution time.

        Example:
            output, run_type, execution_time = workflow.run()

        Note:
            - The result of each step is stored in self._return_values and can be accessed later.
            - The run type of each step is determined using Agent.get_run_type() and can be overridden with _force_run_type.
        """

        # start from an empty list so placeholder indices refer to this run only
        self._return_values = []

        for method in self._methods:
            fn, args, kwargs = method

            # in the list args, substitute 'PREV_RETURN_VALUE' with the return value of the previous method
            sane_args = []
            for arg in args:
                if isinstance(arg, str) and "PREV_RETURN_VALUE" in arg:
                    indices = [int(i) for i in arg.split("_") if i.isdigit()]
                    return_value = self._return_values[indices[0]][indices[1]]
                    sane_args.append(return_value)
                else:
                    sane_args.append(arg)

            # in the dict kwargs, substitute 'PREV_RETURN_VALUE' with the return value of the previous method;
            # work on a copy so the stored step definition is not mutated between runs
            sane_kwargs = dict(kwargs)
            for key, value in kwargs.items():
                if isinstance(value, str) and "PREV_RETURN_VALUE" in value:
                    indices = [int(i) for i in value.split("_") if i.isdigit()]
                    return_value = self._return_values[indices[0]][indices[1]]
                    sane_kwargs[key] = return_value

            # Get run type from the Agent
            run_type = Agent.get_run_type(fn, sane_args, sane_kwargs)
            sane_kwargs["run_type"] = run_type

            if _force_run_type:
                sane_kwargs["run_type"] = _force_run_type

            # TODO maybe we need to warn the agent it's running and when it finishes
            return_value = fn.run(*sane_args, **sane_kwargs)

            # every step's output is stored as a tuple so it can be indexed by placeholders
            if isinstance(return_value, tuple):
                self._return_values.append(return_value)
            else:
                self._return_values.append((return_value,))

            Agent._inform(fn)

        return self._return_values[-1], fn._last_runtype, fn._last_time

    def calculate(self, _force_run_type=None):
        """
        Calculate the final result of the workflow.

        Args:
            _force_run_type (str, optional): Force a specific run type for all steps in the workflow.

        Returns:
            Any: The final result of the workflow.

        Example:
            result = workflow.calculate()

        Note:
            This method is a convenient way to obtain the final result without detailed information about each step.
        """
        output, _, _ = self.run(_force_run_type=_force_run_type)
        return output
Workflow class that aggregates all the steps of the analysis and the corresponding functions.
The Workflow class organizes and executes a sequence of analysis steps. Each step specifies a function, its arguments, and any dependencies on the output of earlier steps. The steps run sequentially, and the output of the last step is the final result.
Args:
    *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs), where:
        - fn: The step object to be executed; the workflow calls fn.run(...) on it.
        - args (tuple): The arguments to be passed to the function.
        - kwargs (dict): The keyword arguments to be passed to the function.
Methods:
    __init__(*args): Initialize the Workflow object with a list of analysis steps.
        - Each arg must be a tuple of three items (fn, args, kwargs).
    run(_force_run_type=None): Run the workflow sequentially.
        - _force_run_type (str, optional): Force a specific run type for all steps in the workflow.
    calculate(_force_run_type=None): Calculate the final result of the workflow.
        - _force_run_type (str, optional): Force a specific run type for all steps in the workflow.
Example:
    # Define a workflow with three steps
    workflow = Workflow(
        (step1_function, (arg1,), {"kwarg1": value1}),
        (step2_function, ("PREV_RETURN_VALUE_0_0", arg2), {"kwarg2": value2}),
        (step3_function, ("PREV_RETURN_VALUE_1_0",), {})
    )

    # Run the workflow and get the final result
    result = workflow.calculate()
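Note:
    - Dependencies between steps are specified with "PREV_RETURN_VALUE_<step>_<item>" placeholders, where <step> is the zero-based index of an earlier step and <item> is the index into that step's tuple of return values. Both indices are required: run() reads two numbers from the placeholder.
    - The result of each step is stored as a tuple and can be accessed later.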
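The placeholder lookup described in the note can be sketched in isolation. The helper name `resolve_placeholder` and the sample data are hypothetical; the parsing mirrors what `run()` does, and the explicit `ValueError` is an addition for clarity (the library code would fail with an `IndexError` on a placeholder that lacks the second index).

```python
def resolve_placeholder(value, return_values):
    """Return the stored output a placeholder points to, or `value` unchanged.

    Hypothetical helper: it mirrors the lookup performed inside run().
    """
    if not (isinstance(value, str) and "PREV_RETURN_VALUE" in value):
        return value  # ordinary argument, passed through untouched

    # Keep only the numeric fields: "PREV_RETURN_VALUE_1_0" -> [1, 0]
    indices = [int(part) for part in value.split("_") if part.isdigit()]
    if len(indices) != 2:
        # Added check (assumption): the original code does not validate this
        raise ValueError(f"expected PREV_RETURN_VALUE_<step>_<item>, got {value!r}")

    step, item = indices
    return return_values[step][item]


# Outputs of two earlier steps, each stored as a tuple
stored = [("image",), ("mask", 0.5)]

first = resolve_placeholder("PREV_RETURN_VALUE_0_0", stored)   # step 0, item 0
second = resolve_placeholder("PREV_RETURN_VALUE_1_1", stored)  # step 1, item 1
literal = resolve_placeholder(42, stored)                      # not a placeholder
```

Because the check is a substring match, any string argument that happens to contain "PREV_RETURN_VALUE" is treated as a placeholder.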
__init__(self, *args) -> None
Initialize the Workflow object.
Args:
    *args: Variable-length arguments. Each argument is expected to be a tuple of three items (fn, args, kwargs).
Returns:
    None
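As a minimal sketch of the check the constructor applies, the standalone function below (the name `collect_steps` is hypothetical) accepts only 3-tuples and raises `TypeError` otherwise. Note that only the outer shape is validated: the types of `fn`, `args`, and `kwargs` themselves are not inspected.

```python
def collect_steps(*args):
    """Hypothetical stand-in for the validation loop in Workflow.__init__."""
    steps = []
    for arg in args:
        # Only the container type and its length are checked
        if isinstance(arg, tuple) and len(arg) == 3:
            steps.append((arg[0], arg[1], arg[2]))
        else:
            raise TypeError("Each arg must be a tuple of 3 items (fn, args, kwargs)")
    return steps


accepted = collect_steps((print, ("hello",), {}))

try:
    collect_steps([print, ("hello",), {}])  # a list, not a tuple
    rejected = False
except TypeError:
    rejected = True
```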
run(self, _force_run_type=None)
Run the workflow sequentially.
Args:
    _force_run_type (str, optional): Force a specific run type for all steps in the workflow.
Returns:
    Tuple: A tuple containing the final result, the run type of the last step, and the execution time of the last step. The final result is itself a tuple, because a non-tuple return value is wrapped in a one-element tuple before it is stored.
Example:
    output, run_type, execution_time = workflow.run()
Note:
    - The result of each step is stored in self._return_values and can be accessed later.
    - The run type of each step is determined using Agent.get_run_type() and can be overridden with _force_run_type.
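The sequential loop can be illustrated without the library. In the sketch below, `FakeStep`, `resolve`, and `run_steps` are all hypothetical stand-ins: `FakeStep` only imitates the attributes that `run()` reads (`run`, `_last_runtype`, `_last_time`), and a fixed `run_type` string replaces the call to `Agent.get_run_type()`.

```python
import time


class FakeStep:
    """Hypothetical step object exposing run(), _last_runtype and _last_time."""

    def __init__(self, func):
        self._func = func
        self._last_runtype = None
        self._last_time = None

    def run(self, *args, run_type=None, **kwargs):
        start = time.perf_counter()
        result = self._func(*args, **kwargs)
        self._last_time = time.perf_counter() - start
        self._last_runtype = run_type
        return result


def resolve(value, return_values):
    """Swap a PREV_RETURN_VALUE_<step>_<item> placeholder for the stored output."""
    if isinstance(value, str) and "PREV_RETURN_VALUE" in value:
        step, item = [int(p) for p in value.split("_") if p.isdigit()]
        return return_values[step][item]
    return value


def run_steps(steps, run_type="example"):
    """Run (fn, args, kwargs) steps in order; run_type is a placeholder value."""
    return_values = []
    for fn, args, kwargs in steps:
        call_args = [resolve(a, return_values) for a in args]
        call_kwargs = {k: resolve(v, return_values) for k, v in kwargs.items()}
        result = fn.run(*call_args, run_type=run_type, **call_kwargs)
        # Store every output as a tuple so placeholders can index into it
        return_values.append(result if isinstance(result, tuple) else (result,))
    return return_values[-1], fn._last_runtype, fn._last_time


double = FakeStep(lambda x: x * 2)
add = FakeStep(lambda x, y=0: x + y)

output, run_type, elapsed = run_steps([
    (double, (3,), {}),                           # step 0 returns 6
    (add, ("PREV_RETURN_VALUE_0_0",), {"y": 4}),  # step 1 returns 6 + 4
])
```

The run type and execution time in the returned triple describe the last step only, not the workflow as a whole.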
calculate(self, _force_run_type=None)
Calculate the final result of the workflow.
Args:
    _force_run_type (str, optional): Force a specific run type for all steps in the workflow.
Returns:
    Tuple: The final result of the workflow, that is, the first element of what run() returns. It is always a tuple, even when the last step returns a single value.
Example:
    result = workflow.calculate()
Note:
    This method is a convenient way to obtain the final result without the run type and execution time reported by run().
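Since `run()` stores each step's output as a tuple, the value handed back by `calculate()` is a tuple as well. A caller that expects a bare value from a single-output step may want to unwrap it; the helper below is a hypothetical convenience, not part of the library.

```python
def unwrap_single(output):
    """Return the sole element of a 1-tuple; leave any other value unchanged."""
    if isinstance(output, tuple) and len(output) == 1:
        return output[0]
    return output


single = unwrap_single((10,))        # last step returned one value
several = unwrap_single((1.5, "a"))  # last step returned two values
```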