Internals Reference

Servers

class mario.servers.Pipeline(name, concurrency=10, backend_class=<class 'mario.storage.inmemory.EphemeralStorageBackend'>)[source]

Pipeline server class

A pipeline must be defined only after you have already defined at least one Step.

handle_finished_job(job)[source]

called when a job just finished processing.

When overriding this method make sure to call super() first

initialize()[source]

initializes the backend. Subclasses can override this in order to define their own backends

on_finished(event)[source]

called when a job just finished processing. You can override this at will

on_started(event)[source]

called when a job just started processing.

Subclasses may override this method in order to take appropriate action.
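
As a sketch of how these hooks fit together, a pipeline subclass could look like the example below. The class name, the "example1" pipeline name and the choice of backend are illustrative assumptions, not part of the library:

EXAMPLE:

from mario.servers import Pipeline
from mario.storage import RedisStorageBackend


class MyPipeline(Pipeline):

    def handle_finished_job(self, job):
        # per the note above, call super() first when overriding this hook
        super(MyPipeline, self).handle_finished_job(job)
        # ...then take any custom action with the finished job

    def on_finished(self, event):
        # free to override at will, e.g. for logging
        print("job finished:", event)


# concurrency and backend_class mirror the constructor signature above
pipeline = MyPipeline("example1", concurrency=10,
                      backend_class=RedisStorageBackend)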

Clients

class mario.clients.PipelineClient(pull_connect_address)[source]

Pipeline client

Has the ability to push jobs to a pipeline server

connect()[source]

connects to the server

enqueue_job(data)[source]

pushes a job to the pipeline.

Note that the data must be a dictionary with the following
keys:
  • name - the pipeline name
  • instructions - a dictionary with instructions for the first step to execute
Parameters:data – the dictionary with the formatted payload.
Returns:the payload sent to the server, which contains the job id

EXAMPLE:

>>> from mario.clients import PipelineClient

>>> properly_formatted = {
...     "name": "example1",
...     "instructions": {
...          "size": 100",
...     },
... }
>>> client = PipelineClient('tcp://127.0.0.1:5050')
>>> client.connect()
>>> ok, payload_sent = client.enqueue_job(properly_formatted)

Storage Backends

class mario.storage.BaseStorageBackend(name, *args, **kw)[source]

base class for storage backends

connect()[source]

this method is called by the pipeline once it has started listening on zmq sockets, so this is also an appropriate time to open your own connection to a database in a backend subclass

consume_job_of_type(job_type)[source]

dequeues a job of the given type. Must return None when no job is ready.

Make sure to requeue the job in case it could not be handed to a worker immediately.

enqueue_job(job)[source]

adds the job to its appropriate queue

get_next_available_worker_for_type(job_type)[source]

randomly picks a worker that is currently available

initialize()[source]

backend-specific constructor. This method must be overridden by subclasses in order to set up database connections and the like

register_worker(worker)[source]

registers the worker as available. Must return a boolean: True if the worker was successfully registered, False otherwise

unregister_worker(worker)[source]

unregisters the worker completely, removing it from the roster
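
A minimal custom backend might take the following shape. Only the method names and their documented contracts come from the interface above; the dictionary-based bodies and the assumption that a job payload carries its queue name under "name" are illustrative:

EXAMPLE:

import random

from mario.storage import BaseStorageBackend


class DictStorageBackend(BaseStorageBackend):
    # illustrative sketch, not part of mario

    def initialize(self):
        # backend-specific constructor: set up connections/handles here
        self.queues = {}
        self.workers = {}

    def connect(self):
        # called once the pipeline is listening on its zmq sockets; a real
        # backend would open its database connection here
        pass

    def enqueue_job(self, job):
        # assumes the job payload carries its queue name under "name"
        self.queues.setdefault(job["name"], []).append(job)

    def consume_job_of_type(self, job_type):
        queue = self.queues.get(job_type)
        return queue.pop(0) if queue else None

    def register_worker(self, worker):
        self.workers[id(worker)] = worker
        return True

    def unregister_worker(self, worker):
        return self.workers.pop(id(worker), None) is not None

    def get_next_available_worker_for_type(self, job_type):
        available = list(self.workers.values())
        return random.choice(available) if available else None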

class mario.storage.EphemeralStorageBackend(name, *args, **kw)[source]

in-memory storage backend. It dies with the process and has no option for persistence whatsoever. Used only for testing purposes.

class mario.storage.RedisStorageBackend(name, *args, **kw)[source]

Redis Storage Backend

Utilities

class mario.util.CompressedPickle(*args, **kw)[source]

Serializes to and from zlib compressed pickle
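
Conceptually, the round-trip is equivalent to the standard-library sketch below; the class's own method names are not assumed here:

>>> import pickle, zlib
>>> payload = {"name": "example1", "instructions": {"size": 100}}
>>> blob = zlib.compress(pickle.dumps(payload))      # serialize
>>> pickle.loads(zlib.decompress(blob)) == payload   # deserialize
True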

mario.util.parse_port(address)[source]

parses the port from a zmq tcp address

Parameters:address – the address string
Returns:an int or None
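
For example (expected behaviour based on the description above, not taken from the project's test suite):

>>> from mario.util import parse_port
>>> parse_port('tcp://0.0.0.0:5050')
5050
>>> parse_port('not an address') is None
True
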
mario.util.read_internal_file(path)[source]

reads an internal file, mostly used for loading lua scripts

mario.util.sanitize_name(name)[source]

ensures that a job type or pipeline name is safe for storage and handling.

Parameters:name – the string
Returns:a safe string

Console

mario.console.servers.execute_command_forwarder()[source]

executes an instance of a subscriber/publisher forwarder for scaling communication between multiple minions and masters.

Parameters:
  • --subscriber – the address on which the forwarder's subscriber socket binds, to which master servers can connect.
  • --publisher – the address on which the forwarder's publisher socket binds, to which minion servers can connect.
$ mario forwarder \
    --subscriber=tcp://0.0.0.0:6000 \
    --publisher=tcp://0.0.0.0:6060 \
    --subscriber-hwm=1000 \
    --publisher-hwm=1000
mario.console.servers.execute_command_run_pipeline()[source]

executes an instance of the pipeline manager server.

Parameters:--sub-bind – address where the server will listen to announcements from Steps
$ mario pipeline \
    --sub-bind=tcp://0.0.0.0:6000 \
    --job-pull-bind=tcp://0.0.0.0:5050
mario.console.servers.execute_command_run_step()[source]

executes an instance of the step server.

Parameters:--pub-connect – address of the pipeline server's sub socket to which the step will publish its announcements
$ mario step \
    --pub-connect=tcp://127.0.0.1:6000
    # --push-connect=tcp://192.168.0.10:3000 # optional (can be used multiple times)
    # --pull-connect=tcp://192.168.0.10:5050 # optional (can be used multiple times)
    # --pull-bind=tcp://0.0.0.0:5050    # optional (can be used only once)
mario.console.servers.execute_command_streamer()[source]

executes an instance of pull/push streamer for scaling pipelines and/or steps

Parameters:
  • --pull – the address on which the streamer's pull socket binds, to which master servers can connect.
  • --push – the address on which the streamer's push socket binds, to which minion servers can connect.
$ mario streamer \
    --pull=tcp://0.0.0.0:5050 \
    --push=tcp://0.0.0.0:6060 \
    --pull-hwm=1000 \
    --push-hwm=1000
mario.console.clients.execute_command_enqueue()[source]

enqueues a job in a running pipeline server from the command line.

$ mario enqueue tcp://0.0.0.0:5050 <pipeline-name> [json instructions]