Sarracenia Programming Guide

Working with Plugins

FIXME: Missing sections are highlighted by FIXME. What is here should be accurate!

Revision Record

version: 2.18.05b4
date:June 2018

Audience

Readers of this manual should be comfortable with light scripting in python version 3. Sarracenia includes a number of points where processing can be customized by small snippets of user provided code, known as plugins. The plugins themselves are expected to be concise, and an elementary knowledge of python should suffice to build new plugins in a copy/paste manner, with many samples being available to read.

Plugin Script Ideas

Examples of things that would be fun to do with plugins.

  • Common Alerting Protocol or CAP, is an XML format that provides a warnings for many types of events, indicating the area of coverage. There is a 'polygon' field in the warning, that the source could add to messages using an on_post plugin. Subscribers would have access to the 'polygon' header through use of an on_message plugin, enabling them determine whether the alert affected an area of interest without downloading the entire warning.
  • A source that applies compression to products before posting, could add a header such as 'uncompressed_size' and 'uncompressed_sum' to allow subscribers, with an on_message plugin to compare a file that has been locally uncompressed to an upstream file offerred in compressed form.

Introduction

A Sarracenia data pump is a web server with notifications for subscribers to know, quickly, when new data has arrived. To find out what data is already available on a pump, view the tree with a web browser. For simple immediate needs, one can download data using the browser itself, or a standard tool such as wget. The usual intent is for sr_subscribe to automatically download the data wanted to a directory on a subscriber machine where other software can process it.

Often, the purpose of automated downloading is to have other code ingest the files and perform further processing. Rather than having a separate process have to look at a file in a directory, One can insert customized processing at various points:

Examples are available using the list command:

blacklab% sr_subscribe list

packaged plugins: ( /home/peter/src/sarracenia/sarra/plugins )
      bad_plugin1.py       bad_plugin2.py       bad_plugin3.py                cp.py     destfn_sample.py
      download_cp.py       download_dd.py      download_scp.py     download_wget.py          file_age.py
       file_check.py          file_log.py       file_rxpipe.py        file_total.py           harness.py
         hb_cache.py            hb_log.py         hb_memory.py          hb_pulse.py         html_page.py
         line_log.py         line_mode.py               log.py         msg_2http.py        msg_2local.py
   msg_2localfile.py     msg_auditflow.py     msg_by_source.py       msg_by_user.py         msg_delay.py
       msg_delete.py      msg_download.py          msg_dump.py        msg_fdelay.py msg_filter_wmo2msc.py
 msg_from_cluster.py     msg_hour_tree.py           msg_log.py     msg_print_lag.py   msg_rename4jicc.py
   msg_rename_dmf.py msg_rename_whatfn.py       msg_renamer.py msg_replace_new_dir.py          msg_save.py
     msg_skip_old.py        msg_speedo.py msg_sundew_pxroute.py    msg_test_retry.py   msg_to_clusters.py
        msg_total.py        part_check.py  part_clamav_scan.py        poll_pulse.py       poll_script.py
   post_hour_tree.py          post_log.py    post_long_flow.py     post_override.py   post_rate_limit.py
       post_total.py         watch_log.py              wget.py

configuration examples: ( /home/peter/src/sarracenia/sarra/examples/subscribe )
            all.conf     all_but_cap.conf            amis.conf            aqhi.conf             cap.conf
     cclean_f91.conf       cdnld_f21.conf       cfile_f44.conf        citypage.conf       clean_f90.conf
           cmml.conf cscn22_bulletins.conf         ftp_f70.conf            gdps.conf         ninjo-a.conf
          q_f71.conf           radar.conf            rdps.conf            swob.conf           t_f30.conf
     u_sftp_f60.conf
user plugins: ( /home/peter/.config/sarra/plugins )
        destfn_am.py         destfn_nz.py       msg_tarpush.py              wget.py

general: ( /home/peter/.config/sarra )
          admin.conf     credentials.conf         default.conf

user configurations: ( /home/peter/.config/sarra/subscribe )
     cclean_f91.conf       cdnld_f21.conf       cfile_f44.conf       clean_f90.conf         ftp_f70.conf
          q_f71.conf           t_f30.conf      u_sftp_f60.conf

blacklab%

The packages plugins are shown in the first grouping of available ones. Many of them have arguments which are documented by listing them. For example:

blacklab% sr_subscribe list msg_log.py
#!/usr/bin/python3

"""
  the default on_msg handler for subscribers.  Prints a simple notice.

"""

class Msg_Log(object):  # Mandatory: declare a class, with a capital letter in it.

    def __init__(self,parent):  # Mandatory: declare a constructor.
        parent.logger.debug("msg_log initialized")

    def on_message(self,parent):  # Mandatory: Declare an function to be called when messages are accepted.
        msg = parent.msg
        parent.logger.info("msg_log received: %s %s%s topic=%s lag=%g %s" % \
           tuple( msg.notice.split()[0:3] + [ msg.topic, msg.get_elapse(), msg.hdrstr ] ) )
        return True

msg_log = Msg_Log(self)   # Mandatory: Declare a variable of the class.

self.on_message = msg_log.on_message # Mandatory: assign the function to the entry point.

blacklab%

To modify it, copy it from the examples directory installed as part of the package to the editable preference one:

blacklab% sr_subscribe add msg_log.py

And then modify it for the purpose:

blacklab% sr_subscribe edit msg_log.py

The msg_log.py plugin above is a single entry one. For single entry point plugins, consult bad_plugins1, 2, and 3 to identify the mandatory elements. as one would imagine, all the plugins that begin with msg_ are for on_msg events. Similarly, file_ ones provide examples of on_file, etc... for the other types of single entry plugins.

If a plugin doesn't have such a prefix, there is a second form of plugins called simply a plugin, where a group of routines to implement an overall function. Examine The log.py and wget.py routines for examples of this format.

One can also see which plugins are active in a configuration by looking at the messages on startup:

blacklab% sr_subscribe foreground clean_f90
2018-01-08 01:21:34,763 [INFO] sr_subscribe clean_f90 start
2018-01-08 01:21:34,763 [INFO] log settings start for sr_subscribe (version: 2.18.01a3):
2018-01-08 01:21:34,763 [INFO]  inflight=.tmp events=create|delete|link|modify use_pika=False
2018-01-08 01:21:34,763 [INFO]  suppress_duplicates=False retry_mode=True retry_ttl=900000
2018-01-08 01:21:34,763 [INFO]  expire=900000 reset=False message_ttl=None prefetch=1 accept_unmatch=False delete=False
2018-01-08 01:21:34,763 [INFO]  heartbeat=300 default_mode=000 default_mode_dir=775 default_mode_log=600 discard=False durable=True
2018-01-08 01:21:34,763 [INFO]  preserve_mode=True preserve_time=True realpath=False base_dir=None follow_symlinks=False
2018-01-08 01:21:34,763 [INFO]  mirror=True flatten=/ realpath=False strip=0 base_dir=None report_back=True
2018-01-08 01:21:34,763 [INFO]  Plugins configured:
2018-01-08 01:21:34,763 [INFO]      do_download:
2018-01-08 01:21:34,763 [INFO]      on_message: Msg_FDelay Msg_2LocalFile Msg_AuditFlow Msg_Delete
2018-01-08 01:21:34,764 [INFO]      on_part:
2018-01-08 01:21:34,764 [INFO]      on_file: File_Log
2018-01-08 01:21:34,764 [INFO]      on_post: Post_Log
2018-01-08 01:21:34,764 [INFO]      on_heartbeat: Hb_Log Hb_Memory Hb_Pulse
2018-01-08 01:21:34,764 [INFO] log_settings end.
2018-01-08 01:21:34,764 [INFO] sr_subscribe run
2018-01-08 01:21:34,764 [INFO] AMQP  broker(localhost) user(tsub) vhost(/)
2018-01-08 01:21:34,766 [INFO] Binding queue q_tsub.sr_subscribe.clean_f90.39474678.00703117 with key v02.post.# from exchange xpublic on broker amqp://tsub@localhost/
2018-01-08 01:21:34,768 [INFO] reading from to tsub@localhost, exchange: xpublic
2018-01-08 01:21:34,769 [INFO] report_back to tsub@localhost, exchange: xs_tsub
^C2018-01-08 01:21:35,353 [INFO] signal stop
2018-01-08 01:21:35,353 [INFO] sr_subscribe stop
blacklab%

Plugin Script Basics

An example, of the plugin format, one can configure use of file_noop.py in a configuration like so:

plugin file_noop

The content of the file to be placed (on Linux) in ~/.config/sarra/plugins would be: .. code:: python

# MUST: declare a class with Upper case characters in it.

class File_Noop(object):
def __init__(self,parent):
parent.declare_option( 'file_string' ) # declare options to avoid 'unknown option' messages being logged.
def on_start(self,parent):
if not hasattr(parent,'file_string'): # set default values here, if necessary.
parent.file_string='hello world'
def on_file(self,parent):
parent.logger.info("file_noop: I have no effect but adding a log line with %s in it" % parent.file_string ) return True

self.plugin = 'File_Noop' # MUST: set the value of the plugin variable to the name of the class.

There is an initialization portion which runs when the component is started, a perform section which is to be invoked on the appropriate event. Setting the plugin requires the magic last two lines in the sample plugin, where the last line needs to reflect the type of plugin (on_file for an on_file plugin, on_message, for an on_message one, etc...)

The only argument the script receives is parent, which has all of option settings from configuration files and command line as attributes. For example, if a setting like:

msg_speedo_interval 10

is set in a configuration file, then the plugin script will see parent.msg_speedo_interval as a variable set to '10' (the string, not the number) By convention when inventing new configuration settings, the name of the plugin is used as a prefix (In this example, msg_speedo)

In addition to the command line options, there is also a logger available as shown in the sample above. The logger is a python3 logger object, as documented here: https://docs.python.org/3/library/logging.html. To allow users to tune the verbosity of logs, use priority specific method to classify messages:

logger.debug - spelunking in progress... a lot of detail.
logger.info - informative messages that are not essential
logger.warn - a difficulty that is likely problematic, but the component still functions to some degree.
logger.error - The component failed to do something.

In the above message, logger.info is used, indicating an informative message. Another useful attribute available in parent, is 'msg', which has all the attributes of the message being processed. All of the headers from the message, as defined in the sr_post(1) <sr_post.1.rst> configuration file, are available to the plugin, such as the message checksum as parent.msg.headers.sum. Consult the Variables Available section for an exhaustive list. Generally, it is best to output only debug log messages in the __init__ routine for a plugin, because it is executed every time an sr status command is run, which can rapidly become unwieldy.

Should one of these scripts return False, the processing of the message/file will stop there and another message will be consumed from the broker.

Better File Reception

For example, rather than using the file system, sr_subscribe could indicates when each file is ready by writing to a named pipe:

blacklab% sr_subscribe edit dd_swob.conf

broker amqp://anonymous@dd.weather.gc.ca
subtopic observations.swob-ml.#

file_rxpipe_name /local/home/peter/test/rxpipe

on_file file_rxpipe
directory /tmp
mirror True
accept .*
# rxpipe is a builtin on_file script which writes the name of the file received to
# a pipe named '.rxpipe' in the current working directory.

With the on_file option, one can specify a processing option such as rxpipe. With rxpipe, every time a file transfer has completed and is ready for post-processing, its name is written to the linux pipe (named .rxpipe) in the current working directory. So the code for post-processing becomes:

do_something <.rxpipe

No filtering out of working files by the user is required, and ingestion of partial files is completely avoided.

In the case where a large number of sr_subscribe instances are working On the same configuration, there is slight probability that notifications may corrupt one another in the named pipe. We should probably verify whether this probability is negligeable or not.

Advanced File Reception

While the on_file directive specifies the name of an action to perform on receipt of a file, those actions are not fixed, but simply small scripts provided with the package, and customizable by end users. The rxpipe module is just an example provided with sarracenia:

class File_RxPipe(object):

    def __init__(self,parent):
        parent.declare_option( 'file_rxpipe_name' ):

    def on_start(self,parent):
        if not hasattr(parent,'file_rxpipe_name'):
            parent.logger.error("Missing file_rxpipe_name parameter")
            return
        self.rxpipe = open( parent.file_rxpipe_name[0], "w" )

    def on_file(self, parent):
        self.rxpipe.write( parent.msg.new_file + "\n" )
        self.rxpipe.flush()
        return None

self.plugin = 'File_RxPipe'

With this fragment of python, when sr_subscribe is first called, it ensures that a pipe named npipe is opened in the specified directory by executing the __init__ function within the declared RxPipe python class. Then, whenever a file reception is completed, the assignment of self.on_file ensures that the rx.on_file function is called.

The rxpipe.on_file function just writes the name of the file dowloaded to the named pipe. The use of the named pipe renders data reception asynchronous from data processing. as shown in the previous example, one can then start a single task do_something which processes the list of files fed as standard input to it, from a named pipe.

In the examples above, file reception and processing are kept entirely separate. If there is a problem with processing, the file reception directories will fill up, potentially growing to an unwieldy size and causing many practical difficulties. When a plugin such as on_file is used, the processing of each file downloaded is run before proceeding to the next file.

If the code in the on_file script is changed to do actual processing work, then rather than being independent, the processing could provide back pressure to the data delivery mechanism. If the processing gets stuck, then the sr_subscriber will stop downloading, and the queue will be on the server, rather than creating a huge local directory on the client. Different models apply in different situations.

An additional point is that if the processing of files is invoked in each instance, providing very easy parallel processing built into sr_subscribe.

Using Credentials in Plugins

To implement support of additional protocols, one would write a _do_download script. the scripts would access the credentials value in the script with the code :

  • ok, details = parent.credentials.get(msg.urlcred)
  • if details : url = details.url

The details options are element of the details class (hardcoded):

  • print(details.ssh_keyfile)
  • print(details.passive)
  • print(details.binary)
  • print(details.tls)
  • print(details.prot_p)

For the credential that defines protocol for download (upload), the connection, once opened, is kept opened. It is reset (closed and reopened) only when the number of downloads (uploads) reaches the number given by the batch option (default 100)

All download (upload) operations uses a buffer. The size, in bytes, of the buffer used is given by the bufsize option (default 8192)

Sending vs. Subscribing

FIXME: new_file vs. remote_file if you are using

Variables Available

Without peering into the python source code of sarracenia, it is hard to know what values are available to plugin scripts. As a cheat to save developers from having to understand the source code, a diagnostic plugin might be helpful.

If one sets on_message msg_dump in a configuration, the entire list of available variables can be displayed in a log file.

Make the above file an on_file (or other trigger) script in a configuration, start up a receiver (and if it is a busy one, then stop it immediately, as it creates very large report messages for every message received.) Essentially the entire program state is available to plugins.

A sample output is shown (reformatted for legibility) is given below. For every field xx listed, a plugin script can access it as parent.xx (e.g. parent.queue_name ):

peter@idefix:~/test$ sr_subscribe foreground dd.conf
^C to stop it immediately after the first message.
peter@idefix:~/test$ tail -f ~/.cache/sarra/log/sr_subscribe_dd_0001.log

# the following is reformatted to look reasonable on a page.
2016-01-14 17:13:01,649 [INFO] {
'kbytes_ps': 0,
'queue_name': None,
'flatten': '/',
'exchange': 'xpublic',
'discard': False,
'report_back': True,
'source': None,
'pidfile': '/local/home/peter/.cache/sarra/.sr_subscribe_dd_0001.pid',
'event': 'IN_CLOSE_WRITE|IN_ATTRIB|IN_MOVED_TO|IN_MOVE_SELF',
'basic_name': 'sr_subscribe_dd',
'cluster_aliases': [],
'expire': None,
'currentRegexp': re.compile('.*'),
'handler': <logging.handlers.TimedRotatingFileHandler
object at 0x7f4fcdc4d780>,
'accept_unmatch': False,
'reconnect': False,
'isrunning': False,
'on_line': None,
'masks': [('.*/grib2/.*', '/local/home/peter/test/dd', None, re.compile('.*/grib2/.*'), False),
('.*grib2.tar.*', '/local/home/peter/test/dd', None, re.compile('.*grib2.tar.*'), False),
('.*', '/local/home/peter/test/dd', None, re.compile('.*'), True)],
'logrotate': 5,
'pid': 14079,
'consumer': <sarra.sr_consumer.sr_consumer object at 0x7f4fcdc489b0>,
'post_document_root': None,
'manager': None,
'publisher': <sarra.sr_amqp.Publisher object at 0x7f4fcdbdae48>,
'post_broker': ParseResult(scheme='amqp',
netloc='guest:guest@localhost',
path='/',
params='',
query='',
fragment=''),
'currentPattern': '.*',
'partflg': '1',
'notify_only': False,
'program_dir': 'subscribe',
'on_part': None,
'to_clusters': None,
'site_data_dir': '/usr/share/ubuntu/sarra',
'source_from_exchange': False,
'new_url': ParseResult(scheme='file', netloc='',
path='/local/home/peter/test/dd/bulletins/alphanumeric/20160114/SA/CYVT/22/SACN62_CYVT_142200___11878',
params='', query='', fragment=''),
'sumflg': 'd',
'user_log_dir': '/local/home/peter/.cache/sarra/log',
'topic_prefix': 'v02.post',
'on_post': None,
'do_poll': None,
'message_ttl': None,
'user_scripts_dir': '/local/home/peter/.config/sarra/scripts',
'appname': 'sarra',
'debug': False,
'chmod': 775,
'destination': None,
'subtopic': None,
'events': 'IN_CLOSE_WRITE|IN_DELETE',
'document_root': '/local/home/peter/test/dd',
'inplace': True,
'last_nbr_instances': 6,
'config_name': 'dd',
'instance_str': 'sr_subscribe dd 0001',
'randomize': False,
'vip': None,
'parts': '1',
'inflight': '.tmp',
'cache_url': {},
'queue_share': True,
'overwrite': True,
'appauthor': 'science.gc.ca',
'no': 1,
'url': None,
'bindings': [('xpublic', 'v02.post.#')],
'blocksize': 0,
'cluster': None,
'rename': None,
'user_config_dir': '/local/home/peter/.config/sarra',
'users': {},
'currentDir': '/local/home/peter/test/dd',
'instance': 1,
'sleep': 0,
'user_cache_dir': '/local/home/peter/.cache/sarra',
'report_clusters': {},
'strip': 0,
'msg': <sarra.sr_message.sr_message object at 0x7f4fcdc54518>,
'site_config_dir': '/etc/xdg/xdg-ubuntu/sarra',
'user_args': ['--no', '1'],
'program_name': 'sr_subscribe',
'on_file': <bound method Transformer.perform of <sarra.sr_config.Transformer object at 0x7f4fcdc48908>>,
'cwd': '/local/home/peter/test',
'nbr_instances': 6,
'credentials': <sarra.sr_credentials.sr_credentials object at 0x7f4fcdc911d0>,
'on_message': None,
'currentFileOption': None,
'user_config': 'dd.conf',
'lpath': '/local/home/peter/.cache/sarra/log/sr_subscribe_dd_0001.log',
'bufsize': 8192,
'do_download': None,
'post_exchange': None,
'report_exchange': 'xlog',
'new_path': '/local/home/peter/test/dd/bulletins/alphanumeric/20160114/SA/CYVT/22/SACN62_CYVT_142200___11878',
'instance_name': 'sr_subscribe_dd_0001',
'statefile': '/local/home/peter/.cache/sarra/.sr_subscribe_dd.state',
'use_pattern': True,
'admin': None,
'gateway_for': [],
'interface': None,
'logpath': '/local/home/peter/.cache/sarra/log/sr_subscribe_dd_0001.log',
'recompute_chksum': False,
'user_queue_dir': '/local/home/peter/.cache/sarra/queue',
'mirror': True,
'broker': ParseResult(scheme='amqp', netloc='anonymous:anonymous@dd.weather.gc.ca', path='/', params='', query='', fragment=''),
'durable': False,
'logger': <logging.RootLogger object at 0x7f4fcdc48a20>,
'user_data_dir': '/local/home/peter/.local/share/sarra',
'flow': None}

No thought has yet been given to plug_in compatatibility across versions. Unclear how much of this state will vary over time. Similar to program configuration settings, all of the fields involved in processing individual messages are available in the parent.msg object. A similar dump to the above is here (e.g of a python scripts can use parent.msg.partsr , and/or parent.msg.header.parts in their code.):

2016-01-14 17:13:01,649 [INFO] message =
{'partstr': '1,78,1,0,0',
'suffix': '.78.1.0.0.d.Part',
'subtopic': 'alphanumeric.20160617.CA.CWAO.12',
'in_partfile': False,
'notice': '20160617120454.820 http://dd2.weather.gc.ca/ bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919',
'checksum': 'ab1ba0020e91119fb024a2c115ccd908',
'pub_exchange': None,
'local_checksum': None,
'chunksize': 78,
'time': '20160617120454.820',
'path': 'bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919',
'report_exchange': 'xs_anonymous',
'part_ext': 'Part',
'topic_prefix': 'v02.post',
'current_block': 0,
'tbegin': 1466165094.82,
'remainder': 0,
'to_clusters': ['DD', 'DDI.CMC', 'DDI.EDM'],
'local_offset': 0,
'mtype': 'post',
 'user': 'anonymous',
 'bufsize': 8192, 'new_url':
 ParseResult(scheme='file', netloc='', path='/home/peter/test/dd/bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919', params='', query='', fragment=''), 'exchange': 'xpublic', 'url': ParseResult(scheme='http', netloc='dd2.weather.gc.ca', path='/bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919', params='', query='', fragment=''),
'onfly_checksum': 'ab1ba0020e91119fb024a2c115ccd908',
 'host': 'blacklab',
 'filesize': 78,
 'block_count': 1,
'sumalgo': <sarra.sr_util.checksum_d object at 0x7f77554234e0>,
'headers': {
     'sum': 'd,ab1ba0020e91119fb024a2c115ccd908',
     'parts': '1,78,1,0,0',
     'filename': 'CACN00_CWAO_171133__WAR_00919',
     'to_clusters': 'DD,DDI.CMC,DDI.EDM',
     'source': 'metpx',
     'rename': '/home/peter/test/dd/bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919',
     'from_cluster': 'DD'},
'hdrstr': 'parts=1,78,1,0,0 sum=d,ab1ba0020e91119fb024a2c115ccd908 from_cluster=DD source=metpx to_clusters=DD,DDI.CMC,DDI.EDM rename=/home/peter/test/dd/bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919 message=Downloaded ',
 'report_notice': '20160617120454.820 http://dd2.weather.gc.ca/ bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919 201 blacklab anonymous 3.591402',
 'version': 'v02',
 'parent': <sarra.sr_subscribe.sr_subscribe object at 0x7f775682b4a8>,
 'logger': <logging.RootLogger object at 0x7f77563359e8>,
 'length': 78,
 'topic': 'v02.post.bulletins.alphanumeric.20160617.CA.CWAO.12',
 'inplace': True,
 'urlcred': 'http://dd2.weather.gc.ca/',
 'sumstr': 'd,ab1ba0020e91119fb024a2c115ccd908',
 'report_topic': 'v02.report.bulletins.alphanumeric.20160617.CA.CWAO.12',
 'publisher': None,
 'code': 201,
 'urlstr': 'http://dd2.weather.gc.ca/bulletins/alphanumeric/20160617/CA/CWAO/12/CACN00_CWAO_171133__WAR_00919',
 'lastchunk': True,
 'sumflg': 'd',
 'offset': 0,
 'partflg': '1',
 'report_publisher': <sarra.sr_amqp.Publisher object at 0x7f77551c7518>}

Debugging on_ Scripts

When initially developing a plugin script, it can be painful to run it in the complete framework. Attempting to run even the above trivial plugin:

blacklab% python noop.py
Traceback (most recent call last):
  File "noop.py", line 25, in <module>
    filenoop  = File_Noop(self)
NameError: name 'self' is not defined
blacklab%

To do basic syntax work, one can add some debugging scaffolding. Taking the above code just add:

class File_Noop(object):
      def __init__(self,parent):
          parent.declare_option( 'file_string' )

      def on_start(self,parent):
          if not hasattr(parent,'file_string'): # prior to 2.18.1a4, include on_start code in __init__
             parent.file_string='hello world'

      def on_file(self,parent):
          logger = parent.logger

          logger.info("file_noop: I have no effect but adding a log line with %s in it" % parent.file_string )

          return True

# after > 2.18.4
self.plugin = 'File_Noop'

# prior to sarra 2.18.4
#file_noop=File_Noop(self)
#self.on_file=file_noop.on_file

## DEBUGGING CODE START

class TestLogger:
    def silence(self,str):
        pass

    def __init__(self):
        self.debug   = self.silence
        self.error   = print
        self.info    = self.silence
        self.warning = print


class TestParent(object):
    def __init__(self):
        self.logger=TestLogger()
        pass

testparent=TestParent()

filenoop  = File_Noop(testparent)
testparent.on_file = filenoop.on_file

So now it can be invoked with:

blacklab% python noop.py
blacklab%

Which confirms that there are at least no syntax errors. One will need to add more scaffolding depending on the complexity of the plugin. One can append an invocation of the plugin to the test script, like so:

self.on_file(self)

and then the routine will run. the more complex the plugin, the more needs to be added to the debugging scaffolding. Once that sort of basic testing is completed, just remove the scaffolding.

For more complicated tests, just add more testing code:

cat >fifo_test.py <<EOT
#!/usr/bin/python3

"""
when a file is downloaded, write the name of it to a named pipe called .rxpipe
at the root of the file reception tree.

"""
import os,stat,time

class Transformer(object):

    def __init__(self):
        pass

    def on_file(self,parent):
        msg    = parent.msg

        # writing filename in pipe
        f = open('/users/dor/aspy/mjg/mon_fifo','w')
        f.write(parent.new_file)
        f.flush()
        f.close()

        # resume process as usual ?
        return True

transformer=Transformer()
#self.on_file = transformer.on_file

"""
for testing outside of a sr_ component plugin environment,
we comment out the normal activiation line of the script above
and insert a little wrapper, so that it can be invoked
at the command line:
       python3  fifo_test.py

"""
class TestLogger():
    def silence(self,str):
        pass

    def __init__(self):
        self.debug   = print
        self.error   = print
        self.info    = print
        self.warning = print

class TestMessage() :
    def __init__(self):
        self.headers = {}

class TestParent(object):
    def __init__(self):
        self.new_file = "a string"
        self.msg = TestMessage()
        self.logger = TestLogger()
        pass

testparent=TestParent()

transformer.on_file(testparent)

The part after the #self.on_file line is only a test harness. One creates a calling object with the fields needed to test the fields the plugin will use in the TestParent and TestMessage classes. Also consult the harness.py plugin available to include the above code for plugin testing.

File Notification Without Downloading

If the data pump exists in a large shared environment, such as a Supercomputing Centre with a site file system. In that case, the file might be available without downloading. So just obtaining the file notification and transforming it into a local file is sufficient:

blacklab% sr_subscribe edit dd_swob.conf

broker amqp://anonymous@dd.weather.gc.ca
subtopic observations.swob-ml.#
document_root /data/web/dd_root
on_message do_something

accept .*
# do_something will catenate document_root with the path in
# the notification to obtain the full local path.

on_message is a scripting hook, exactly like on_file, that allows specific processing to be done on receipt of a message. A message will usually correspond to a file, but for large files, there will be one message per part. One can use the parent.msg.partstr to find out which part you have (See sr_post.1 for details on partstr encoding.

Ensure the on_message plugin returns 'False' to prevent downloading.

FIXME: perhaps show a way of checking the parts header to with an if statement in order to act on only the first part message for long files.

FIXME: is .py needed on on_ triggers?

do_scripts

In the case where large files are being downloaded, and one wants to do it quickly, the sarracenia's built-in methods are inherently a bit limited by the speed of python for low-level operations. While built-in methods are reasonably efficient and low overhead, it could be argued that when large files are to be downloaded, using an efficient, dedicated downloader written in a low level language like C is more effective. These examples are included with every installation of sarracenia, and can be modified to be used with other tools.

Here is an example of implementing conditional use of a more efficient download method. Start with an on_message script that evaluates the condition to determine whether to invoke the custom downloader:

#!/usr/bin/python3

"""
  Trigger an alternate method for downloading bigger files.

  This is a means of invoking a more efficienty binary downloader when it makes sense to do so in place
  of the built-in downloader, typically for larger files.   Set the msg_download_threshold to the
  maximum size of the file to download using built in methods.  Default: 10M (ten megabytes)

  if a file larger than 10M is advertised, then the URL scheme is replaced 'http' -> 'download'

  This means the do_download plugin (download_wget) will be invoked for that file.

  example, if you see a file url with an sftp protocol, and larger than 10 megabytes, the trigger the substition:

  msg_download_threshold 10M
  msg_download_protocol  'sftp'

  on_message msg_download

  do_download download_wget

  APPLICATION NOTES:

    - The built-in downloading logic is pretty good in almost all cases. It is rarely adviseable to use
      this functionality from a performance perspective.

    - Ideal use case:  LAN/Short-haul link with very high bandwidth, where lare peak i/o rates are achievable
      given adequate disk and network connections, such as a high speed LAN. Even there, it should only
      be used for larger files.

    - Where there is a large tree with many small files, it is very likely that the built-in downloader
      is more efficient than forking any other downloader because, in addition to saving the fork/exec/reap
      overhead, the built-in downloader preserves connections to be used for multiple downloads, so
      connection establishement, log-in etc.. is not needed for every file.  It is actually going
      to be about as efficienty as possible for the small file case, as those overheads dominate
      the transfer time.

    - As a file gets bigger, the transfer time will eventually dominate the setup-time, and at that
      point, it can make sense to switch to a forking download.  Need experience with cases to pick
      a good threshold for that.  Made it a setting defaulting to 10M for now.

    - the native downloader does partitioning of files for passage through multiple pumps and is preferable
      for that case to avoid the 'capybara through an anaconda' syndrome.  In cases 'dataless' transfers,
      where the data does not traverse any pump, this is not a consideration.

    - For most long-haul use cases, the bounding constraint is the bandwidth on the link so again
      the built-in downloader is likely close to optimal. Partitioning of the file enables portions of it
      to be delivered and for post-processing tasks, such as anti-virus to overlap with the file transfer.
      when using alternate schemes wihout partitioning, one must await until the complet file has arrived.


"""

import os,stat,time
import calendar

class DOWNLOAD_REWRITE(object):

      import urllib.parse

      def __init__(self,parent):
          if not hasattr( parent, "msg_download_threshold" ):
             parent.msg_download_threshold = [ "10M" ]

          if not hasattr( parent, "msg_download_protocol" ):
             parent.msg_download_protocol = [ "http" ]

      def on_message(self,parent):
          logger = parent.logger
          msg    = parent.msg

          if type(parent.msg_download_threshold) is list:
             parent.msg_download_threshold = parent.chunksize_from_str( parent.msg_download_threshold[0] )

          if msg.headers['sum'][0] == 'L' or msg.headers['sum'][0] == 'R' : return True

          parts = msg.partstr.split(',')
          if parts[0] == '1':
              sz=int(parts[1])
          else:
              sz=int(parts[1])*int(parts[2])

          logger.debug("msg_download sz: %d, threshold: %d download: %s to %s, " % ( \
                sz, parent.msg_download_threshold, parent.msg.urlstr, msg.new_file ) )
          if sz >= parent.msg_download_threshold :
              for p in parent.msg_download_protocol :
                  parent.msg.urlstr = msg.urlstr.replace(p,"download")

              parent.msg.url = urllib.parse.urlparse(msg.urlstr)
              logger.info("msg_download triggering alternate method for: %s to %s, " % (parent.msg.urlstr, msg.new_file))

          return True

download_rewrite = DOWNLOAD_REWRITE(self)
self.on_message = download_rewrite.on_message

So one "invents" a new URL scheme that refers to the alternate downloader. In this case, URLs which are to be downloaded using an alternate tool get the their 'http:' replaced by 'download:'. In the example above, posts where the file is bigger than a threshold value (10 megabytes by default) will be marked for download with an alternate method by having their URL altered.

This on_message msg_download plugin needs to be coupled with the use of a do_download plugin. When the alternate schema is encountered, the component will invoke that plugin. Example of that plugin:

#!/usr/bin/python3

"""
  Example use of do_download option.

  Custom downloading method to work with the message_download on_message plugin.
  See that plugin for more detailed information.

  This downloader will be invoked when an unknown protocol scheme is specified as a URL (we use 'wget')
  the script replaces 'download' by 'http' in the protocol, and then spawns a wget binary to perform
  an efficient download.

  note that because this involves a for exec to launch a binary, it would be best to only launch this sort
  of download for larger files. the message_download implements this threshold behaviour.

  Caveats:
     This downloader just uses the name that wget will set for a file on download,
     no options about local file naming are implemented.

  If you have python >= 3.5, replace 'subprocess.call' by subprocess.run, and the stout and stderr will do the right thing.
  for 'call' also need to change result == 0 to result.returncode == 0 .

  I didn't find a simple way to do the 'right thing' in < 3.5 API.

"""

import os,stat,time
import calendar

class WGET_DOWNLOAD(object):

   def __init__(self,parent):
      if not hasattr(parent,'download_wget_command'):
         parent.download_wget_command= [ '/usr/bin/wget' ]

   def perform(self,parent):
      logger = parent.logger
      msg    = parent.msg

      import subprocess

      msg.urlstr = msg.urlstr.replace("download:","http:")
      cmd = parent.download_wget_command[0].split() + [ msg.urlstr ]
      logger.info("download_wget invoking: %s " % cmd )
      result =  subprocess.call( cmd )

      if result == 0:  # Success!
         if parent.reportback:
            msg.report_publish(201,'Downloaded')
         return True

      if parent.reportback:
         msg.report_publish(499,'wget download failed')
      return False

wget_download = WGET_DOWNLOAD(self)
self.do_download = wget_download.perform

Why Doesn't Import Work?

There is an issue where the place in the code where plugins are read is different from where the plugin routines are executed, and so class level imports do not work as expected

#!/usr/bin/python3

import os,sys,stat,time,datetime,string,socket
from ftplib import FTP

class Renamer(object):
    def __init__(self):
        pass

    def perform(self,parent):
        infile = parent.local_file
        Path = os.path.dirname(infile)
        Filename = os.path.basename(infile)

        # FTP upload
        def uploadFile(ftp, upfile):
            ftp.storbinary('STOR ' + upfile, open(upfile, 'rb'), 1024)
            ftp.sendcmd('SITE CHMOD 666 ' + upfile)

        # ftp = FTP('hoho.haha.ec.gc.ca')
        ftp = FTP('127.272.44.184')
        logon = ftp.login('px', 'pwgoeshere')
        path = ftp.cwd('/apps/px/rxq/ont2/')
        os.chdir( Path )
        uploadFile(ftp, Filename)
        ftp.quit()

renamer=Renamer()
self.on_file = renamer.perform

When the code is run, this happens:

2018-05-23 20:57:31,958 [ERROR] sr_subscribe/run going badly, so sleeping for 0.01 Type: <class 'NameError'>, Value: name 'FTP' is not defined,  ...
2018-05-23 20:57:32,091 [INFO] file_log downloaded to: /apps/urp/sr_data/TYX_N0S:NOAAPORT2:CMC:RADAR_US:BIN:20180523205529
2018-05-23 20:57:32,092 [INFO] confirmed added to the retry process 20180523205531.8 http://ddi1.cmc.ec.gc.ca/ 20180523/UCAR-UNIDATA/RADAR_US/NEXRAD3/N0S/20/TYX_N0S:NOAAPORT2:CMC:RADAR_US:BIN:20180523205529

2018-05-23 20:57:32,092 [ERROR] sr_subscribe/run going badly, so sleeping for 0.02 Type: <class 'NameError'>, Value: name 'FTP' is not defined,  ...
2018-05-23 20:57:32,799 [INFO] file_log downloaded to: /apps/urp/sr_data/CXX_N0V:NOAAPORT2:CMC:RADAR_US:BIN:20180523205533
2018-05-23 20:57:32,799 [INFO] confirmed added to the retry process 20180523205535.46 http://ddi2.cmc.ec.gc.ca/ 20180523/UCAR-UNIDATA/RADAR_US/NEXRAD3/N0V/20/CXX_N0V:NOAAPORT2:CMC:RADAR_US:BIN:20180523205533
2018-05-23 20:57:32,799 [ERROR] sr_subscribe/run going badly, so sleeping for 0.04 Type: <class 'NameError'>, Value: name 'FTP' is not defined,  ...

The solution is to move the import inside the perform routine as the first line, like so:

.
.
.

def perform(self,parent):
    from ftplib import FTP
    infile = parent.local_file
    Path = os.path.dirname(infile)
.
.
.

Polling

FIXME Sample polling.

Checksum Plugins

FIXME

Accessing Messages without Sarracenia

FIXME, link to amqplib, or java bindings, and a pointer to the sr_post and sr_report section 7 man pages.