Telephone +44(0)1524 64544
Email: info@shadowcat.co.uk

IO::Pipeline

An experiment in functional crack pipes

Sun Apr 4 22:00:00 2010

Digg! submit to reddit Delicious RSS Feed Readers

Since it's Easter Sunday, and being a non-denominational mst that meant there wasn't really anything I should be doing and (almost) everything was closed, I decided to have another documentation day and get another module shipped. The result, IO::Pipeline 0.009002, should be appearing on a CPAN mirror near you about the time this post hits the aggregators. Since I'm pretty much all written out for the day having done that, here's the good bits inline for your reading pleasure:

NAME

IO::Pipeline - map and grep for filehandles, unix pipe style

SYNOPSIS

  my $source = <<'END';
  2010-03-21 16:15:30 1NtNoI-000658-6V Completed
  2010-03-21 16:17:29 1NtNlx-00062B-0R Completed
  2010-03-21 16:20:37 1NtNtF-0006AE-G6 Completed
  2010-03-21 16:28:37 no host name found for IP address 218.108.42.254
  2010-03-21 16:28:51 H=(ZTZUWWCRQY) [218.108.42.254] F=<pansiesyd75@setupper.com> rejected RCPT <inline@trout.me.uk>: rejected because 218.108.42.254 is in a black list at zen.spamhaus.org 
  2010-03-21 16:28:51 unexpected disconnection while reading SMTP command from (ZTZUWWCRQY) [218.108.42.254] (error: Connection reset by peer)
  2010-03-21 16:35:57 no host name found for IP address 123.122.231.66
  2010-03-21 16:35:59 H=(LFMTSDM) [123.122.231.66] F=<belladonnai6@buybuildanichestore.com> rejected RCPT <tal@fyrestorm.co.uk>: rejected because 123.122.231.66 is in a black list at zen.spamhaus.org
  END 
  
  open my $in, '<', \$source
    or die "Failed to create filehandle from scalar: $!";
  
  my $out;
  
  $in
    | pmap { [ /^(\S+) (\S+) (.*)$/ ] }
    | pgrep { $_->[2] =~ /rejected|Completed/ }
    | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }
    | pmap { join(' ', @$_)."\n" }
    | psink { $out .= $_ };
  
  print $out;

will print:

  2010-03-21 16:15:30 Completed
  2010-03-21 16:17:29 Completed
  2010-03-21 16:20:37 Completed
  2010-03-21 16:28:51 Rejected
  2010-03-21 16:35:59 Rejected

DESCRIPTION

IO::Pipeline was born of the idea that I really like writing map/grep type expressions in perl, but writing:

  map { ... } <$fh>;

does a slurp of the filehandle, and when processing big log files I tend to Not Want That To Happen. Plus, map restricts us to right-to-left processing and I've always been fond of the shell metaphor of connecting commands together left-to-read in a pipeline.

So, this module was born.

  use IO::Pipeline;

will export three functions - pmap, pgrep and psink. The first two are the meat of the module, the last one is a means to test by sending results somewhere other than a filehandle (or to chain IO::Pipeline output on to ... well, anywhere else, really).

pmap and pgrep both return pipeline objects (currently of class IO::Pipeline, but this is considered an implementation detail, not a feature - so please don't write code that relies on it) that provide an overloaded '|' operator.

  my $mapper = pmap { "[header] ".$_ };
  my $filter = pgrep { /ALERT/ };

When you use | to chain two pipeline objects together, you get another pipeline object:

  my $combined = $mapper | $filter;

Although since we're going left to right, you probably want to do the grep first:

  my $combined = $filter | $mapper;

(but it's all the same to IO::Pipeline, of course)

When you use | with a filehandle on one side, that sets the start or finish of the pipeline, so:

  my $combined_with_input = $readable_fh | $combined;
  my $combined_with_output = $combined | $writeable_fh;

and if you don't want a real filehandle for the second option, you can use psink:

  my $output = '';
  
  my $combined_with_output = $combined | psink { $output .= $_ };

Once both an input and an output have been provided, IO::Pipeline runs the full pipeline, reading from the input and pushing one line at a time down the pipe to the output until the input filehandle is exhausted.

Non-completed pipeline objects are completely re-usable though - so you can (and are expected to) do things like:

  my $combined_to_stoud = $combined | \*STDOUT;
  
  foreach my $file (@files_to_process) {
  
    open my $in, '<', $file
      or die "Couldn't open ${file}: $!";
  
    $in | $combined_to_stdout;
  }

EXPORTED FUNCTIONS

pmap

  my $mapper = pmap { <return zero or more new lines based on $_> };

A pipeline part built with pmap gets invoked for each line on the pipeline, with the line in both $_ and $_[0].

It may, as with perl's map operator, return zero or more elements. If it returns nothing at all, IO::Pipeline will go back to the start of the pipe chain and read another line to restart processing with. If it returns one or more lines, each one is fed in turn into the rest of the pipe chain.

Most of the time, you probably just want to modify the line somehow and then return it (note that $_ is a copy of the input line so this is safe):

  my $fix_teh = pmap { s/teh/the/g; $_; };

Note that you still need to actively return $_ for the pipe to continue (again, as with perl's map operator).

pgrep

  my $filter = pgrep { <return true or false to keep or throw away $_> };

A pipeline part built with pgrep gets invoked for each line on the pipeline, with the line in both $_ and $_[0].

If it returns a true value, the line is passed on to the next stage of the pipeline. If it returns a false value, the line is thrown away and IO::Pipeline will go back to the start of the pipe chain and read another line to restart processing with.

The upshot of this is that any pgrep can be turned trivially into a pmap:

  my $filter = pgrep { /ALERT/ };

is precisely equivalent to:

  my $filter = pmap { /ALERT/ ? ($_) : () };

but the pgrep form is rather clearer.

psink

  my $output = '';
  
  my $sink = psink { $output .= $_ };

A pipe sink is an alternative to an output filehandle as the last element of a pipeline. Where in the case of a normal filehandle a line would be printed to the handle, given a sink IO::Pipeline will call the code block provided. So:

  $pipeline | \*STDOUT;

and

  $pipeline | psink { print STDOUT $_; }

will have exactly the same end result.

If you're looking for the source version of this, there isn't one built in because IO::Handle::Util already provides an io_from_getline construct that does that, along with a bunch more things that you may find very useful.

DECONSTRUCTING THE SYNOPSIS

Start with an input filehandle:

  $in

Next, we split the line up - so

  2010-03-21 16:15:30 1NtNoI-000658-6V Completed

becomes

  [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]

using a regexp in list context so that all the match values fall out into a new anonymous array reference:

    | pmap { [ /^(\S+) (\S+) (.*)$/ ] }

Now we've separated out the message, we want to throw away anything that isn't either a 'rejected' or 'Completed' line, so we test the last element of the split line for that:

    | pgrep { $_->[2] =~ /rejected|Completed/ }

Now we know which is which, we want to turn

  [ '2010-03-21', '16:15:30', '1NtNoI-000658-6V Completed' ]

into

  [ '2010-03-21', '16:15:30', 'Completed' ]

and similarly for rejected lines. Since we know both lines are one or the other, we can simply test for 'rejected' in the line -

  $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed'

and then we construct a new array reference consisting of the first two elements of the original array

  @{$_}[0, 1]

plus the new value for the third element:

    | pmap { [ @{$_}[0, 1], $_->[2] =~ /rejected/ ? 'Rejected' : 'Completed' ] }

This done, we can now reassemble the line using join (remembering to add a newline since IO::Pipeline doesn't in case you didn't want one)

    | pmap { join(' ', @$_)."\n" }

and then in lieu of sending it somewhere else, since this is just a demonstration code fragment, add a sink that appends things onto the end of a variable so that we can examine the results:

    | psink { $out .= $_ };

SUPPORT

Right now, your best routes are probably (a) to come ask questions on #perl on irc.freenode.net or #perl-help on irc.perl.org (I'm on there with nick mst if nobody else around at the time manages to help you first) or (b) to email me directly at the address given in AUTHOR above. You're also welcome to use rt.cpan.org to report bugs (which you can do without a login by mailing bugs-IO-Pipeline at that domain), but please cc my email address as well on grounds of me being a Bad Person and thereby not always spotting tickets.

SOURCE CODE

This code lives in git.shadowcat.co.uk and can be viewed via gitweb using

  http://git.shadowcat.co.uk/gitweb/gitweb.cgi?p=p5sagit/IO-Pipeline.git;a=summary

or checked out via git-daemon using

  git://git.shadowcat.co.uk/p5sagit/IO-Pipeline.git

MST IS AN IDIOT

... and that's why the release you'll see on the link above is 0.009002 rather than 001 - see the Changes file or play spot the difference yourself to identify the ridiculously stupid mistake that resulted in my shipping two dists in the space of not many more minutes.

Oh, and:

HAPPY EASTER!

-- mst, out.