Reference: pipeline elements
Collection of useful modules which can be used as sources/steps
Data sources
- async snap.elements.source.read_array_from_txt(fname: str, size: int, columns: dict, delay: float = 0)
Data source from a text file/input stream. Watch the given file and read the new data in the text table format
- Parameters:
fname –
Path to file to read or “stdin” for reading from standard input. File should contain text data organized in space separated columns, like:
a1 b1 c1 a2 b2 c2 a3 b3 c3 ...
size – Maximal number of rows to read.
columns – Dict of format
"val_name": column_numberdelay – Minimal delay between reading each data chunk, in seconds.
- Yields:
dict with data organized by the columns with column names as key
- async snap.elements.source.read_lines(fname: str, delay: float = 0)
Data source from a text file/input stream. Watch the given file and read the new data in the text table format
- Parameters:
fname – Path to file to read or “stdin” for reading from standard input. File is read line by line and each line is yielded separately.
delay – Minimal delay between reading each data chunk, in seconds.
- Yields:
Each line as str
Timing elements
- snap.elements.timing.every(seconds)
Can be either a source, or a step.
As a source: produce “True” values with given minimum delay.
As a step: get the data from previous step and forward it downstream with given minimum delay.
- Parameters:
seconds (float) – Minimal delay between iterations.
- Input:
(if source) anything
- Output:
data unchanged
Steps for output
- snap.elements.output.dump(prefix='DUMP', rewrite=False)
A processing step. Print the incoming data to stdout with given prefix
- Parameters:
prefix (str) – String prefix before each output
rewrite (bool) – If true, will end this line with r, so it will be overwritten
- Input:
data: to be written as string representation
repr(data)- Output:
data unchanged
- snap.elements.output.dump_to_file(fname)
A processing step. Save the incoming data to a text file
- Parameters:
fname (str) – A file name. File is opened for writing.
- Input:
data to be written as string representation
repr(data)- Output:
data unchanged
Steps for real-time display
Wrappers of the tqdm progress-bar to show the iterations count or data values
- snap.elements.tqdm.counter(**kwargs)
A monitoring step. Display a tqdm progress bar, which is incremented with every data portion. For full arguments list see tqdm documentation
- Keyword Arguments:
desc (str) – Prefix for the progressbar description
total (int or float, optional) – The number of expected iterations (displays progressbar until this number is reached, becomes a ticker afterwards)
- Input:
data (anything)
- Output:
data unchanged
- snap.elements.tqdm.meter(**kwargs)
A monitoring step. Display a tqdm progress bar, where the progress is set by the incoming data (must be float) For full arguments list see tqdm documentation
- Keyword Arguments:
desc (str) – Prefix for the progressbar description
total (int or float, optional) – Maximal value
- Input:
data (float): A value to be displayed
- Output:
data unchanged
Misc processing steps
- snap.elements.misc.run_shell(cmd)
A processing step. Run shell command every time the data arrives.
- Parameters:
cmd (str) – A command to run. If cmd is a template, like
echo {foo} {bar}, data should contain keys “foo” and “bar”- Input:
data(dict) will be used to define the command:
run_cmd = cmd.format(**data)- Output:
(str) stdout of the executed command
Input/output interfaces
Collection of IO interfaces
Interfaces provide similar functions:
send(data)A processing step, which sends the given data object, and also returns the data unchanged, so one can chain several senders/processing steps
recv()A data source, which yields the data objects once they are received
ZeroMQ interface
Wrapper around pyzmq module
- async snap.elements.io.zmq.recv(address: str)
Data source. Receive data from zmq.PULL socket on given address.
- Parameters:
address – socket address to bind, in format
<protocol>/<address>:<port>, for exampletcp:/127.0.0.1:9000- Yields:
received message
- snap.elements.io.zmq.send(address: str)
Processing step. Send data to given addresses via zmq.PUSH socket
- Parameters:
address – socket address to connect, in format
<protocol>/<address>:<port>, for exampletcp:/127.0.0.1:9000- Input:
data(any pickle-serializable object) to be sent
- Output:
data unchanged