SR_Shovel

Copy Messages Between Brokers

Manual section:8
Date: October 2017
Version: 2.17.10a2
Manual group:Metpx-Sarracenia Suite

SYNOPSIS

sr_shovel foreground|start|stop|restart|reload|status configfile sr_shovel cleanup|declare|setup configfile

DESCRIPTION

sr_shovel copies messages on one broker (given by the broker option) to another (given by the post_broker option.) subject to filtering by (exchange, subtopic, and optionally, accept/reject.)

The topic_prefix option must to be set to:

There is no default. On startup, the sr_shovel component takes two argument: an action start|stop|restart|reload|status... (self explanatory.) and a configuration file described below.

The actions cleanup, declare, setup can be used to manage resources on the rabbitmq server. The resources are either queues or exchanges. declare creates the resources. setup creates and additionnaly does the bindings of queues.

sr_shovel is an sr_subscribe with the following presets:

no-download True
suppress_duplicates off

CONFIGURATION

In general, the options for this component are described by the sr_subscribe(7) page which should be read first. It fully explains the option configuration language, and how to find the option settings.

Consuming Options

This program consumes sr_post(7) or sr_report(7) messages. One needs to set the options to connect to the broker to receive messages from:

broker amqp{s}://<user>:<pw>@<brokerhost>[:port]/<vhost>

Setting the bindings on the queue :

  • exchange <name> (default: xpublic)
  • topic_prefix <amqp pattern> (default: varies -- developer option)
  • subtopic <amqp pattern> (subtopic need to be set)

Using regular expression filtering messages :

  • accept <regexp pattern> (optional)
  • reject <regexp pattern> (optional)
  • accept_unmatch <boolean> (default: False)

Running a plugin on selected messages :

  • on_message <script_name> (optional)

POSTING OPTIONS

There is no required option for posting messages. By default, the program publishes the selected consumed message with its exchange onto the current cluster, with the feeder account.

The user can overwrite the defaults with options :

  • post_broker amqp{s}://<user>:<pw>@<post_brokerhost>[:port]/<vhost>
  • post_exchange <name> (default: None)
  • post_exchange_split <number> (default: 0)
  • on_post <script_name> (optional)

The post_broker option sets the credential informations to connect to the output RabbitMQ server. The default is the value of the feeder option in default.conf.

The post_exchange option sets a new exchange for the selected messages. The default is to publish under the exchange it was consumed.

The post_exchange_split is documented in sr_subscribe.

Before a message is published, a user can set to trigger a script. The option on_post would be used to do such a setup. The message is only published if the script returns True.

QUEUE Save/Restore

If a queue builds up on a broker because a subscriber is unable to process messages, overall broker performance will suffer, so leaving the queue lying around is a problem. As an administrator, one could keep a configuration like this around:

% more ~/tools/save.conf
broker amqp://tfeed@localhost/
topic_prefix v02.post
exchange xpublic

post_rate_limit 50
on_post post_rate_limit
post_broker amqp://tfeed@localhost/

The configuration relies on the use of an administrator or feeder account. note the queue which has messages in it, in this case q_tsub.sr_subscribe.t.99524171.43129428. Invoke the shovel in save mode to consumer messages from the queue and save them to disk:

% cd ~/tools
% sr_shovel -save -queue q_tsub.sr_subscribe.t.99524171.43129428 foreground save.conf


2017-03-18 13:07:27,786 [INFO] sr_shovel start
2017-03-18 13:07:27,786 [INFO] sr_sarra run
2017-03-18 13:07:27,786 [INFO] AMQP  broker(localhost) user(tfeed) vhost(/)
2017-03-18 13:07:27,788 [WARNING] non standard queue name q_tsub.sr_subscribe.t.99524171.43129428
2017-03-18 13:07:27,788 [INFO] Binding queue q_tsub.sr_subscribe.t.99524171.43129428 with key v02.post.# from exchange xpublic on broker amqp://tfeed@localhost/
2017-03-18 13:07:27,790 [INFO] report_back to tfeed@localhost, exchange: xreport
2017-03-18 13:07:27,792 [INFO] sr_shovel saving to /home/peter/.cache/sarra/shovel/save/sr_shovel_save_0000.save for future restore
2017-03-18 13:07:27,794 [INFO] sr_shovel saving 1 message topic: v02.post.observations.swob-ml.20170318.CPSL.2017-03-18-1600-CPSL-AUTO-swob.xml
2017-03-18 13:07:27,795 [INFO] sr_shovel saving 2 message topic: v02.post.hydrometric.doc.hydrometric_StationList.csv
        .
        .
        .
2017-03-18 13:07:27,901 [INFO] sr_shovel saving 188 message topic: v02.post.hydrometric.csv.ON.hourly.ON_hourly_hydrometric.csv
2017-03-18 13:07:27,902 [INFO] sr_shovel saving 189 message topic: v02.post.hydrometric.csv.BC.hourly.BC_hourly_hydrometric.csv

^C2017-03-18 13:11:27,261 [INFO] signal stop
2017-03-18 13:11:27,261 [INFO] sr_shovel stop


% wc -l /home/peter/.cache/sarra/shovel/save/sr_shovel_save_0000.save
189 /home/peter/.cache/sarra/shovel/save/sr_shovel_save_0000.save
%

The messages are written to a file in the caching directory for future use, with the name of the file being based on the configuration name used. the file is in json format, one message per line (lines are very long.) and so filtering with other tools is possible to modify the list of saved messages. Note that a single save file per configuration is automatically set, so to save multiple queues, one would need one configurations file per queue to be saved. Once the subscriber is back in service, one can return the messages saved to a file into the same queue:

% sr_shovel -restore_to_queue q_tsub.sr_subscribe.t.99524171.43129428 foreground save.conf

2017-03-18 13:15:33,610 [INFO] sr_shovel start
2017-03-18 13:15:33,611 [INFO] sr_sarra run
2017-03-18 13:15:33,611 [INFO] AMQP  broker(localhost) user(tfeed) vhost(/)
2017-03-18 13:15:33,613 [INFO] Binding queue q_tfeed.sr_shovel.save with key v02.post.# from exchange xpublic on broker amqp://tfeed@localhost/
2017-03-18 13:15:33,615 [INFO] report_back to tfeed@localhost, exchange: xreport
2017-03-18 13:15:33,618 [INFO] sr_shovel restoring 189 messages from save /home/peter/.cache/sarra/shovel/save/sr_shovel_save_0000.save
2017-03-18 13:15:33,620 [INFO] sr_shovel restoring message 1 of 189: topic: v02.post.observations.swob-ml.20170318.CPSL.2017-03-18-1600-CPSL-AUTO-swob.xml
2017-03-18 13:15:33,620 [INFO] msg_log received: 20170318165818.878 http://localhost:8000/ observations/swob-ml/20170318/CPSL/2017-03-18-1600-CPSL-AUTO-swob.xml topic=v02.post.observations.swob-ml.20170318.CPSL.2017-03-18-1600-CPSL-AUTO-swob.xml lag=1034.74 sundew_extension=DMS:WXO_RENAMED_SWOB:MSC:XML::20170318165818 source=metpx mtime=20170318165818.878 sum=d,66f7249bd5cd68b89a5ad480f4ea1196 to_clusters=DD,DDI.CMC,DDI.EDM,DDI.CMC,CMC,SCIENCE,EDM parts=1,5354,1,0,0 toolong=1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ß from_cluster=DD atime=20170318165818.878 filename=2017-03-18-1600-CPSL-AUTO-swob.xml
   .
   .
   .
2017-03-18 13:15:33,825 [INFO] post_log notice=20170318165832.323 http://localhost:8000/hydrometric/csv/BC/hourly/BC_hourly_hydrometric.csv headers={'sundew_extension': 'BC:HYDRO:CSV:DEV::20170318165829', 'toolong': '1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ßñç1234567890ß', 'filename': 'BC_hourly_hydrometric.csv', 'to_clusters': 'DD,DDI.CMC,DDI.EDM,DDI.CMC,CMC,SCIENCE,EDM', 'sum': 'd,a22b2df5e316646031008654b29c4ac3', 'parts': '1,12270407,1,0,0', 'source': 'metpx', 'from_cluster': 'DD', 'atime': '20170318165832.323', 'mtime': '20170318165832.323'}
2017-03-18 13:15:33,826 [INFO] sr_shovel restore complete deleting save file: /home/peter/.cache/sarra/shovel/save/sr_shovel_save_0000.save


2017-03-18 13:19:26,991 [INFO] signal stop
2017-03-18 13:19:26,991 [INFO] sr_shovel stop
%

All the messages saved are returned to the named return_to_queue. Note that the use of the post_rate_limit plugin prevents the queue from being flooded with hundreds of messages per second. The rate limit to use will need to be tuned in practice.

by default the file name for the save file is chosen to be in ~/.cache/sarra/shovel/<config>_<instance>.save. To Choose a different destination, save_file option is available:

sr_shovel -save_file `pwd`/here -restore_to_queue q_tsub.sr_subscribe.t.99524171.43129428 ./save.conf foreground

will create the save files in the current directory named here_000x.save where x is the instance number (0 for foreground.)

SEE ALSO

sr_subscribe(1) - sarra downloader.

sr_report(7) - the format of report messages.

sr_report(1) - process report messages.

sr_post(1) - post announcemensts of specific files.

sr_post(7) - The format of announcement messages.

sr_sarra(1) - Subscribe, Acquire, and ReAdvertise tool.

sr_watch(1) - the directory watching daemon.

http://metpx.sf.net/ - sr_subscribe is a component of MetPX-Sarracenia, the AMQP based data pump.