Reference: pipeline elements

A collection of modules that can be used as pipeline sources or steps.

Data sources

async snap.elements.source.read_array_from_txt(fname: str, size: int, columns: dict, delay: float = 0)

Data source reading from a text file or input stream. Watches the given file and reads new data in text-table format.

Parameters:
  • fname

    Path to the file to read, or “stdin” to read from standard input. The file should contain text data organized in space-separated columns, like:

    a1 b1 c1
    a2 b2 c2
    a3 b3 c3
    ...
    

  • size – Maximum number of rows to read.

  • columns – Dict in the format {"val_name": column_number}.

  • delay – Minimum delay between reading successive data chunks, in seconds.

Yields:

dict with the data organized by column, with column names as keys
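
To illustrate the shape of the yielded dict, here is a minimal stand-alone sketch that parses one chunk of space-separated text with a columns mapping. It does not call snap: parse_table is a hypothetical helper, and both the zero-based column numbers and the conversion to float are assumptions that the reference above does not confirm.

```python
def parse_table(text, columns):
    """Parse space-separated rows into a dict of lists keyed by column name.

    Hypothetical helper, not part of snap. Assumes zero-based column
    numbers and numeric (float) values.
    """
    result = {name: [] for name in columns}
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            continue  # skip blank lines
        for name, index in columns.items():
            result[name].append(float(fields[index]))
    return result


chunk = "1 10 100\n2 20 200\n3 30 300\n"
# Pick the first and third columns and give them names.
data = parse_table(chunk, {"time": 0, "rate": 2})
print(data)
```

Columns that are not listed in the mapping (the middle one here) are simply ignored in this sketch.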

async snap.elements.source.read_lines(fname: str, delay: float = 0)

Data source reading from a text file or input stream. Watches the given file and yields each new line as it appears.

Parameters:
  • fname – Path to the file to read, or “stdin” to read from standard input. The file is read line by line, and each line is yielded separately.

  • delay – Minimum delay between reading successive data chunks, in seconds.

Yields:

Each line as str

Timing elements

snap.elements.timing.every(seconds)

Can be either a source, or a step.

  • As a source: produces “True” values with the given minimum delay between them.

  • As a step: takes the data from the previous step and forwards it downstream with the given minimum delay.

Parameters:

seconds (float) – Minimum delay between iterations.

Input:

(if used as a step) anything

Output:

data unchanged
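
The "step" mode described above can be pictured as a throttle around an upstream async source. The following is a minimal sketch using only asyncio; throttle, numbers and collect are hypothetical names, and this is not the snap implementation, only one way to enforce a minimum delay between forwarded items.

```python
import asyncio
import time


async def throttle(source, seconds):
    """Forward items from an async iterable, at least `seconds` apart.

    Hypothetical helper illustrating the "step" mode; not the snap code.
    """
    last = None
    async for item in source:
        if last is not None:
            wait = seconds - (time.monotonic() - last)
            if wait > 0:
                await asyncio.sleep(wait)
        last = time.monotonic()
        yield item  # data is forwarded unchanged


async def numbers(n):
    # A fast upstream source that produces items without any delay.
    for i in range(n):
        yield i


async def collect():
    start = time.monotonic()
    items = [item async for item in throttle(numbers(3), 0.05)]
    return items, time.monotonic() - start


items, elapsed = asyncio.run(collect())
```

The first item passes through immediately; each later item waits only for the part of the delay that has not already elapsed.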

Steps for output

snap.elements.output.dump(prefix='DUMP', rewrite=False)

A processing step. Prints the incoming data to stdout with the given prefix.

Parameters:
  • prefix (str) – String prefix before each output

  • rewrite (bool) – If true, ends the line with \r (carriage return) instead of a newline, so the next output overwrites it

Input:

data to be printed as its string representation, repr(data)

Output:

data unchanged
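
The effect of the rewrite flag can be sketched with plain print calls. dump_line below is a hypothetical stand-in for the step, and the "prefix: repr(data)" layout is an assumption; only the use of repr(data), the prefix, and the carriage-return line ending come from the description above.

```python
import io


def dump_line(data, prefix="DUMP", rewrite=False, stream=None):
    """Write the prefix and repr(data) to a stream and return data unchanged.

    Hypothetical stand-in for the step; the "prefix: repr" layout is assumed.
    With stream=None the text goes to stdout.
    """
    end = "\r" if rewrite else "\n"
    print(f"{prefix}: {data!r}", end=end, file=stream)
    return data


buffer = io.StringIO()
dump_line({"x": 1}, stream=buffer)
dump_line(42, prefix="VALUE", rewrite=True, stream=buffer)
written = buffer.getvalue()
```

On a terminal, a line ending in a carriage return leaves the cursor at the start of the same line, so the next output is drawn over it.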

snap.elements.output.dump_to_file(fname)

A processing step. Save the incoming data to a text file

Parameters:

fname (str) – A file name. File is opened for writing.

Input:

data to be written as string representation repr(data)

Output:

data unchanged

Steps for real-time display

Wrappers around the tqdm progress bar that show the iteration count or data values.

snap.elements.tqdm.counter(**kwargs)

A monitoring step. Displays a tqdm progress bar, which is incremented with every data portion. For the full argument list, see the tqdm documentation.

Keyword Arguments:
  • desc (str) – Prefix for the progressbar description

  • total (int or float, optional) – The number of expected iterations (displays progressbar until this number is reached, becomes a ticker afterwards)

Input:

data (anything)

Output:

data unchanged

snap.elements.tqdm.meter(**kwargs)

A monitoring step. Displays a tqdm progress bar whose progress is set by the incoming data (must be float). For the full argument list, see the tqdm documentation.

Keyword Arguments:
  • desc (str) – Prefix for the progressbar description

  • total (int or float, optional) – Maximal value

Input:

data (float): A value to be displayed

Output:

data unchanged

Misc processing steps

snap.elements.misc.run_shell(cmd)

A processing step. Runs a shell command every time data arrives.

Parameters:

cmd (str) – A command to run. If cmd is a template, like echo {foo} {bar}, data must contain the keys “foo” and “bar”.

Input:

data (dict), used to build the command: run_cmd = cmd.format(**data)

Output:

(str) stdout of the executed command
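
The templating rule cmd.format(**data) can be tried on its own. In this sketch render_command and run_once are hypothetical helpers; the use of subprocess.run with shell=True is an assumption, since the reference does not say how snap launches the process.

```python
import subprocess


def render_command(cmd, data):
    """Fill the command template from the data dict, as in cmd.format(**data)."""
    return cmd.format(**data)


def run_once(cmd, data):
    """Run the rendered command in a shell and return its stdout as str.

    Hypothetical helper; how snap itself launches the process is not
    documented above.
    """
    completed = subprocess.run(
        render_command(cmd, data),
        shell=True,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout


command = render_command("echo {foo} {bar}", {"foo": "hello", "bar": 7})
print(command)
```

If data lacks a key named in the template, str.format raises KeyError. Extra keys in data are ignored.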

Input/output interfaces

Collection of IO interfaces

Interfaces provide similar functions:

  • send(data)

    A processing step that sends the given data object and returns it unchanged, so several senders or processing steps can be chained

  • recv()

    A data source that yields data objects as they are received

ZeroMQ interface

Wrapper around the pyzmq module

async snap.elements.io.zmq.recv(address: str)

Data source. Receives data from a zmq.PULL socket bound to the given address.

Parameters:

address – socket address to bind, in the format <protocol>://<address>:<port>, for example tcp://127.0.0.1:9000

Yields:

received message

snap.elements.io.zmq.send(address: str)

Processing step. Sends data to the given address via a zmq.PUSH socket.

Parameters:

address – socket address to connect, in the format <protocol>://<address>:<port>, for example tcp://127.0.0.1:9000

Input:

data (any pickle-serializable object) to be sent

Output:

data unchanged
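
Since the input must be pickle-serializable, it can help to check an object before putting it into the pipeline. The sketch below uses only the standard pickle module; is_pickle_serializable is a hypothetical helper, and that the sender serializes with the standard pickle module is inferred from the wording above rather than confirmed.

```python
import pickle


def is_pickle_serializable(obj):
    """Return True if pickle.dumps accepts obj (hypothetical pre-send check)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Plain containers of numbers and strings pickle without trouble.
ok = is_pickle_serializable({"id": 1, "values": [0.5, 1.5]})
# A lambda cannot be pickled by the standard pickle module.
bad = is_pickle_serializable(lambda x: x)
```

Dicts, lists, numbers and strings pass this check; lambdas, open files and sockets do not.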