How to create beautiful pipelines on Elixir with Opus

Use case

At Quiqup, our business model requires our drivers to perform different kinds of work. For example, depending on who is ordering, a driver might only need to collect pre-ordered food from a restaurant, or might need to walk into a store and pick many items. Sometimes they also need to pay at the venue on behalf of the customer, and if so, we need to let them know. Other times, they don’t. These are just a taste of the many decisions our drivers and support team have to make in a single step of each delivery.

Faced with such a complex situation and so many moving parts, we decided to create pipeline modules we could refer to in order to understand how each decision process works, without going deep into the implementation.

This is where the with statement shines. We came up with something like:
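As a rough, hypothetical illustration of this style (not Quiqup’s actual code), a with chain for one delivery step might look like the sketch below. Apart from create_next_stage? and create_stage, which the article mentions, every function name and data shape is invented for the example, and it needs nothing beyond plain Elixir.

```elixir
# Hypothetical with-based flow. Only the names create_next_stage? and
# create_stage come from the article; everything else is illustrative.
defmodule HypotheticalStageFlow do
  def run(params) do
    with {:ok, order} <- validate_order(params),
         {:ok, courier} <- assign_courier(params),
         # Tag the boolean so a false result can be recognised in `else`
         {:next_stage, true} <- {:next_stage, create_next_stage?(order)},
         {:ok, stage} <- create_stage(order, courier) do
      {:ok, stage}
    else
      {:next_stage, false} -> {:error, :no_next_stage}
      {:error, reason} -> {:error, reason}
    end
  end

  defp validate_order(%{order: %{id: _} = order}), do: {:ok, order}
  defp validate_order(_params), do: {:error, :invalid_order}

  defp assign_courier(%{courier: courier}) when not is_nil(courier), do: {:ok, courier}
  defp assign_courier(_params), do: {:error, :no_courier}

  # A stage is only created while the order still has stages left
  defp create_next_stage?(order), do: Map.get(order, :stages_left, 0) > 0

  defp create_stage(order, courier), do: {:ok, %{order_id: order.id, courier: courier}}
end
```

Note how the boolean from create_next_stage?/1 has to be tagged so the else block can tell it apart from other failures; every new rule adds another clause to with and, often, another branch to else.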

Not the prettiest of sights, we know. Though it is not impossible to read, we can imagine how painful it would be to maintain and extend as our business requirements get more complex. Not to mention the Pandora’s box that functions like create_next_stage? and create_stage may open as more and more decisions are added to the mix.

Opus to the rescue

In brief, the Opus equivalent of that code would be something like:
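A hypothetical Opus version of a similar flow might look like this. Stage and function names are illustrative, and the sketch assumes a recent 0.x release of Opus, pulled in with Mix.install so the file runs as a standalone script (in a Mix project, add opus to your deps instead).

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical pipeline; names and data shapes are illustrative only.
defmodule HypotheticalStagePipeline do
  use Opus.Pipeline

  check :valid_order?, error_message: :invalid_order
  step :assign_courier
  check :create_next_stage?, error_message: :no_next_stage
  step :create_stage

  # check stages return a boolean
  def valid_order?(%{order: %{id: _}}), do: true
  def valid_order?(_data), do: false

  # step stages return the data for the next stage
  def assign_courier(%{order: order} = data), do: Map.put(data, :courier, order[:courier])

  def create_next_stage?(%{order: order}), do: Map.get(order, :stages_left, 0) > 0

  def create_stage(%{order: order, courier: courier}), do: %{order_id: order.id, courier: courier}
end
```

Each business rule now lives in its own small, named function, and the list of stages at the top reads as a summary of the process.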

Before digging into each line of the code above, let’s understand the basics.

How Opus works

Note: this article was written for Opus version 0.5.1.

The core of Opus is Opus.Pipeline: you use it in a module to turn that module into a pipeline. By pipeline, we mean that the initial input you provide passes through the different stages of the pipeline, and the output of each stage becomes the input of the next one.

Take a look at the image below extracted from the official docs:

Also, it’s worth mentioning that a pipeline is not a process and it does not hold state.

By default, a pipeline does not let any exception raised inside it escape. Instead, it only ever returns either {:ok, any()} or {:error, %Opus.PipelineError{}}. You can change this behaviour with the raise option, letting Opus propagate either all exceptions or only specific ones, for the entire pipeline or for individual stages. More details below.

When an {:error, %Opus.PipelineError{}} is returned, it means that Opus stopped at a certain stage and didn't run the following ones.

To run the pipeline with its initial input, call the call/1 function that Opus defines on your module when you use Opus.Pipeline in it:
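A minimal sketch of calling a pipeline and handling both possible results, assuming a recent Opus 0.x release; the pipeline itself is hypothetical. Because an exception raised inside a stage is caught by default, invalid input comes back as an error tuple rather than a crash.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical pipeline used only to show how call/1 results look.
defmodule HypotheticalTotalPipeline do
  use Opus.Pipeline

  step :parse_amount
  step :add_fee

  # String.to_integer/1 raises ArgumentError for non-numeric strings; by
  # default Opus catches it and call/1 returns {:error, %Opus.PipelineError{}}.
  def parse_amount(%{amount: amount} = data), do: %{data | amount: String.to_integer(amount)}

  def add_fee(%{amount: amount} = data), do: Map.put(data, :total, amount + 5)
end

# call/1 always returns a tagged tuple, so callers can pattern match on it.
for input <- [%{amount: "10"}, %{amount: "ten"}] do
  case HypotheticalTotalPipeline.call(input) do
    {:ok, %{total: total}} -> IO.puts("total: #{total}")
    {:error, %Opus.PipelineError{}} -> IO.puts("pipeline halted for #{inspect(input)}")
  end
end
```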

Opus then runs the first stage, which in our example is a check stage. Each stage type works in its own way (more details below).

The stages

In general, each stage receives the name of the function to execute as an atom (e.g. :validate_order). These functions must be implemented with an arity of 1, and their single argument is the current pipeline data. Using a map or a struct as the pipeline data is convenient.

Here is how each stage type works (check out all the details in the official documentation):

check

The function must return a boolean. If it returns true, Opus continues and calls the next stage with the same pipeline data this stage received. If it returns false, Opus halts the pipeline and returns {:error, %Opus.PipelineError{}}.

You can pass an error_message: atom | String.t option to this stage, either as an atom you can pattern match on (e.g. check :valid_params?, error_message: :invalid_initial_input) or as a human-readable message (e.g. check :valid_params?, error_message: "Invalid input. Expected :order to be given."). The given value is returned inside the %Opus.PipelineError{}.
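Here is a hypothetical check stage using the error_message option quoted above, assuming a recent Opus 0.x release; the pipeline and its data are made up for illustration.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical pipeline guarding its input with a check stage.
defmodule HypotheticalCheckPipeline do
  use Opus.Pipeline

  # Halts with {:error, %Opus.PipelineError{}} when valid_params?/1 returns
  # false; the error_message value is carried inside that error struct.
  check :valid_params?, error_message: "Invalid input. Expected :order to be given."
  step :count_items

  def valid_params?(params), do: Map.has_key?(params, :order)

  # Only reached when the check passed, so :order is present here.
  def count_items(%{order: order} = params), do: Map.put(params, :item_count, length(order.items))
end
```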

step

The function can perform any action, and its return value becomes the pipeline data received by the next stage. You can return the data you received with more data added to it, or an entirely new map, in which case the initial input is lost.

This stage is also useful for simply adding new data to the pipeline without doing anything else. In our example, step :assign_courier does exactly that, like this:

This is useful for decoupling how later stages access a piece of data, or for adding a flag that helps you make decisions in the next stages, like:
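A hypothetical sketch of both ideas, assuming a recent Opus 0.x release: a step that only adds data, a step that stores a flag, and a later step that uses the if option to read that flag. The payment fields are invented for the example.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical pipeline; the :payment field and its values are illustrative.
defmodule HypotheticalCourierPipeline do
  use Opus.Pipeline

  step :assign_courier
  step :flag_payment
  # Runs only when the flag stored by :flag_payment is true
  step :add_payment_instructions, if: :pay_at_venue?

  # Only adds data: later stages read data.courier instead of digging into the order
  def assign_courier(%{order: order} = data), do: Map.put(data, :courier, order.courier)

  # Stores a flag so later stages don't need to know how payment is modelled
  def flag_payment(%{order: order} = data),
    do: Map.put(data, :pay_at_venue?, order.payment == :courier_pays)

  def pay_at_venue?(%{pay_at_venue?: flag}), do: flag

  def add_payment_instructions(data),
    do: Map.put(data, :instructions, "Pay at the venue on behalf of the customer")
end
```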

link

Receives another Opus pipeline module. This is useful when you want to perform something more complex at some point in the pipeline and prefer to use another pipeline for it. Opus calls that module’s call/1 function, and the result of its last stage becomes the new pipeline data for the current module.

This was important for us because we needed to perform a different kind of persistence depending on the slug of the next order stage. So, instead of a single, common step :persist_stage, we came up with something like:

This call?/1 function (admittedly, a potentially confusing name) was implemented so that we could use the if option Opus gives us (more details below). It lets us decide whether to run that specific persistence based on the current pipeline data.
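A hypothetical sketch of this pattern, assuming a recent Opus 0.x release and that link accepts the if option like the other stages: each persistence pipeline exposes call?/1, and the parent links it only when that function returns true for the current data. Module names and slugs are illustrative.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical persistence pipelines, one per order slug. Each exposes call?/1
# so the parent pipeline can decide whether to link it.
defmodule HypotheticalFoodPersistence do
  use Opus.Pipeline

  step :persist

  def call?(%{slug: slug}), do: slug == "food"
  def persist(data), do: Map.put(data, :persisted_as, :food_stage)
end

defmodule HypotheticalShoppingPersistence do
  use Opus.Pipeline

  step :persist

  def call?(%{slug: slug}), do: slug == "shopping"
  def persist(data), do: Map.put(data, :persisted_as, :shopping_stage)
end

defmodule HypotheticalNextStagePipeline do
  use Opus.Pipeline

  # Each link runs the other pipeline; its result becomes this pipeline's data.
  link HypotheticalFoodPersistence, if: &HypotheticalFoodPersistence.call?/1
  link HypotheticalShoppingPersistence, if: &HypotheticalShoppingPersistence.call?/1
end
```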

tee

This works like step, except that if anything goes wrong, Opus won’t halt the pipeline or return an {:error, _}. Its return value is also ignored, so the next stage receives the same data the tee stage was given. This is useful when you want to perform something that might fail but don’t want to treat the failure as an error. Instead, you can log it somewhere and move on.

In our case, we wanted to call an external service, and because nothing is available 100% of the time, we didn’t want to halt our pipeline or return an error for something we could try again later on.
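A hypothetical sketch of a tee stage, assuming a recent Opus 0.x release. The notification function stands in for an external call and always reports failure, yet the pipeline still completes and the next stage receives the data the tee was given.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical pipeline with a side-effect stage that may fail.
defmodule HypotheticalNotifyPipeline do
  use Opus.Pipeline

  step :build_task
  tee :notify_external_service
  step :mark_ready

  def build_task(%{order_id: id}), do: %{order_id: id, status: :created}

  # Hypothetical stand-in for an unavailable external service. A tee's
  # return value is discarded, so this does not halt the pipeline.
  def notify_external_service(_task), do: {:error, :service_unavailable}

  # Receives the same data the tee stage received
  def mark_ready(task), do: %{task | status: :ready}
end
```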

Stage options

Opus gives you some very handy options for each stage, such as running an anonymous function instead of a named one, or only running a certain stage when a condition is true (as we saw above).

Check out the official documentation for more info on this, too.

Putting it all together, here is what you can do:

We used:

  • with: accepts an anonymous function to execute, so you don’t need to define a function with the name given as the stage’s argument;
  • error_message: Opus builds its own default message for the {:error, %Opus.PipelineError{}} it returns, and this option lets you define your own message for a stage;
  • raise: Opus won’t let exceptions escape unless you allow it. Set it to true (the default is false) and Opus won’t catch any exception raised in that stage, or pass a list of exceptions and Opus will let only those through. This option can also be passed to the use macro (use Opus.Pipeline, raise: true) to apply it to the whole pipeline;
  • instrument?: set it to false to disable instrumentation (more below) for the stage (the default is true);
  • if: only runs the stage when the given condition returns true;
  • retry_times: the number of times to retry a failing stage (must be an integer), which is very useful when you call something external that might be unavailable;
  • retry_backoff: the delays between retries. It works together with retry_times and accepts either an atom naming a function or an anonymous function. That function must return an Enumerable.t with as many delays (in milliseconds) as the number given to retry_times. Opus uses ElixirRetry under the hood, so you can use helpers such as lin_backoff/2 and cap/2.
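The sketch below combines these options in one hypothetical pipeline, assuming a recent Opus 0.x release. Stage names and data are invented, and retry_backoff returns a plain list of delays (a list is an Enumerable.t) instead of using the ElixirRetry helpers.

```elixir
Mix.install([{:opus, "~> 0.6"}])

defmodule HypotheticalOptionsPipeline do
  # raise: can also be set module-wide, e.g. `use Opus.Pipeline, raise: true`
  use Opus.Pipeline

  # with: run an anonymous function instead of a named one
  step :normalize, with: fn data -> Map.update!(data, :slug, &String.downcase/1) end

  # error_message: custom value placed inside %Opus.PipelineError{}
  check :known_slug?, error_message: :unknown_slug

  # raise: let ArgumentError propagate instead of becoming {:error, _}
  step :parse_quantity, raise: [ArgumentError]

  # if: only run this stage for shopping orders
  step :add_shopping_list, if: &(&1.slug == "shopping")

  # retry_times/retry_backoff: retry up to 3 times, waiting 10, 20 and 40 ms
  step :fetch_venue, retry_times: 3, retry_backoff: fn -> [10, 20, 40] end

  # instrument?: false turns instrumentation off for this stage only
  step :finish, instrument?: false

  def known_slug?(%{slug: slug}), do: slug in ["food", "shopping"]

  # String.to_integer/1 raises ArgumentError for non-numeric input
  def parse_quantity(%{quantity: quantity} = data),
    do: %{data | quantity: String.to_integer(quantity)}

  def add_shopping_list(data), do: Map.put(data, :shopping_list?, true)

  # Hypothetical stand-in for a call to an external service
  def fetch_venue(data), do: Map.put(data, :venue, "Main St store")

  def finish(data), do: Map.put(data, :done, true)
end
```

With this pipeline, an unknown slug returns {:error, %Opus.PipelineError{}}, while a non-numeric quantity raises ArgumentError because of the raise option.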

Calling a specific stage

For testing purposes, it is very helpful to run only specific stages, or the entire pipeline except for certain stages. In our case, for example, we split the pipeline into two or more blocks and test each part on its own. Thankfully, Opus gives us that power:
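A minimal sketch, assuming a recent Opus 0.x release in which the generated call/2 accepts :only and :except lists of stage names (check the docs for the version you use). The arithmetic pipeline is hypothetical, and the pattern matches double as assertions.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical two-stage pipeline used to show stage filtering.
defmodule HypotheticalArithmeticPipeline do
  use Opus.Pipeline

  step :add_one, with: &(&1 + 1)
  step :double, with: &(&1 * 2)
end

# Full run: (1 + 1) * 2
{:ok, 4} = HypotheticalArithmeticPipeline.call(1)
# Every stage except :double, so only :add_one runs
{:ok, 2} = HypotheticalArithmeticPipeline.call(1, except: [:double])
# Only :double runs
{:ok, 2} = HypotheticalArithmeticPipeline.call(1, only: [:double])
```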

Instrumentation

This is probably one of the most amazing features of Opus. It allows us to define instrumentation inside the pipeline module itself or in a completely decoupled module.

There are five hooks, pipeline_started, before_stage, stage_skipped, stage_completed and pipeline_completed, and you can "listen" to them inside the module, like this:

or create a separate instrumentation module and register it in your config/*.exs, like this:

config :opus, :instrumentation, [PipelineInstrumentation]

You can also disable instrumentation for a single stage, as mentioned in the options above, or for the entire module, like:
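A hypothetical sketch of these approaches, assuming a recent Opus 0.x release. The in-module hooks only print whatever metrics they receive, so no particular keys are assumed. PipelineInstrumentation assumes that a module registered through the config line above exposes an instrument/3 function taking the event, its context and its metrics; check the Opus docs for the exact shape in your version.

```elixir
Mix.install([{:opus, "~> 0.6"}])

defmodule HypotheticalInstrumentedPipeline do
  use Opus.Pipeline

  step :add_one, with: &(&1 + 1)

  # Called before each stage of this pipeline; metrics are only printed
  instrument :before_stage, fn metrics -> IO.inspect(metrics, label: "before_stage") end

  # Called once the whole pipeline has finished
  instrument :pipeline_completed, fn metrics -> IO.inspect(metrics, label: "pipeline_completed") end
end

defmodule HypotheticalQuietPipeline do
  # Turns instrumentation off for every stage of this module
  use Opus.Pipeline, instrument?: false

  step :add_one, with: &(&1 + 1)
end

# Assumed shape of a module registered via `config :opus, :instrumentation, [...]`:
# a single catch-all instrument/3 that just prints the event name.
defmodule PipelineInstrumentation do
  def instrument(event, _context, _metrics), do: IO.puts("opus event: #{event}")
end
```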

Check out the documentation for more useful examples of instrumentation.

Graph

Opus implements a pipeline flow visualization with the help of Graphviz. As mentioned above, we employ complex business logic to create the tasks our beloved couriers must carry out, and this feature is a very helpful addition to the internal documentation of our products.

Run Opus.Graph.generate(:your_app) and that’s when all the magic happens. This command returns a status tuple with the generated image.

This is what it looks like:

Check out the documentation to see another example.

What’s next?

Opus 0.6.0 is on its way to being published, and it adds a new stage called skip. This is how it will work:

Instead of receiving a function name, it receives an if: :function option, which makes skip more of a flow-control construct than a stage in itself. If that function returns true, the pipeline halts, but Opus returns {:ok, :skipped} instead of an error.

Use skip when you want to check something but don’t want to return an error when the check stops the pipeline. Otherwise, the next stage receives the current pipeline data, just as it does after a check stage.
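A hypothetical sketch of skip, assuming Opus 0.6.0 or later and the semantics described above; the order data and stage names are invented.

```elixir
Mix.install([{:opus, "~> 0.6"}])

# Hypothetical cancellation pipeline with an early, non-error exit.
defmodule HypotheticalCancelOrderPipeline do
  use Opus.Pipeline

  # :assert_cancellable is only a label; the decision comes from the if: function.
  # When already_cancelled?/1 returns true, call/1 returns {:ok, :skipped}.
  skip :assert_cancellable, if: :already_cancelled?
  step :cancel

  def already_cancelled?(%{status: status}), do: status == :cancelled

  def cancel(order), do: %{order | status: :cancelled}
end
```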

If you want to play with it, you can try to install Opus referencing the master branch.

Conclusion

Opus helped us to write readable, decoupled and maintainable pipeline modules. I can’t think of any other way we could expand our logic and implement them so easily and cleanly.

It would be great to see other use cases and thoughts around Opus, so if you’ve played with it at some point, do share some code with us!

Happy code and long live the Alchemy o/