Doing Many Things at Once

Since the first version of Unix back some three decades ago, the fork() system call has been the normal way to get many things to happen at once. Forking is a very nice (some say "elegant") model of concurrent execution: individual processes have entirely separate address spaces, with little chance of interference from other tasks, at the cost of a lot of overhead for interprocess communication.

Starting some dozen years ago, user-space thread libraries have popped up. Threads can simplify inter-task communication, but can also make it easier for threads to stomp on each other’s data and control flow. The Perl developers have taken different stabs at incorporating interfaces to thread libraries, with some relative success.

But even while that was going on, there were also independent efforts by the Perl user community to try to wrestle with concurrent tasks. The earliest of those that I recall was simply using select() to watch for the “first ready” I/O (or a timeout) simultaneously, and then dispatching to the proper handling routine. This was later wrapped into IO::Select. In parallel with that, the Event module was created, using an XS-level interface to handle filehandle readiness and signals in an orderly way.

Somewhere in here, a group of developers loosely coordinated by Rocco Caputo started developing and expanding the POE environment, which has become a powerful means to manage concurrent tasks within a process, or even multiple processes. See http://poe.perl.org for details.

At the lowest level of POE, a “kernel” dispatches events to a session’s handler. These events are nearly always generated as a result of other handlers having been run, although “system events” and “signal events” happen as well. So, you define a session (similar to a thread in threading or a process in Unix), set up its handlers mapped to events, and then let the kernel sort out the events.
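As a rough illustration of that dispatch cycle (separate from the listing that follows), here is a minimal sketch of a single session whose handlers generate further events. The event name tick, the heap key count, and the @trace array are assumptions made up for this example.

```perl
use strict;
use warnings;
use POE;    # loads POE::Kernel and POE::Session; exports KERNEL, HEAP, $poe_kernel

my @trace;  # filled in by the handlers so the order can be inspected afterwards

POE::Session->create(
  inline_states => {
    # The kernel sends _start as the session is created.
    _start => sub {
      $_[HEAP]{count} = 0;
      $_[KERNEL]->yield("tick");    # queue the first "tick" event
    },
    # Each tick records itself and queues another, up to three in total.
    tick => sub {
      my ($kernel, $heap) = @_[KERNEL, HEAP];
      push @trace, "tick " . ++$heap->{count};
      $kernel->yield("tick") if $heap->{count} < 3;
    },
  },
);

$poe_kernel->run;    # returns once no session has anything left to do
print "$_\n" for @trace;
```

Once the third tick declines to yield again, the session has no pending events, so the kernel stops it and run returns.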

Starting from this lowest level, many “components” and “wheels” have been written to encapsulate common patterns, such as watching a file grow, watching a directory for incoming items, or managing connections to a TCP port. On top of this you’ll also find ready-to-use code for HTTP servers, IRC connections, and HTTP Web page fetches.

POE definitely has some interesting uses, especially for networking code, and it fits well in the “Perl as glue” model. POE also plays nicely with existing Event loops, as well as Tk, Gtk, and Curses programs.

In looking at POE, I decided to tackle a familiar task: checking Web links. A naive Web link checker will fetch a page, wait for it to return, note the success, perhaps look at the page for further links to follow, and then move on to the next page until it’s done. But that means the program is spending most of its time waiting for I/O (or a DNS lookup). Instead, it could go fetch a different page, but that’s where it gets messy.

In the past, I’ve used LWP::Parallel and LWP::UA and forking and pre-forking to tackle this problem, and each approach has its own advantages and disadvantages. So, I decided to try this with POE as well.

The design is simple. Starting at a top-level URL, fetch that page. If the page is available, and has HTML links, recursively scan those links, until there’s no more to do. To keep it fast, concurrently scan as many as ten links at a time, so that the I/O wait or DNS wait time of one fetch overlaps the CPU-intensive work of another. After a couple of hours of reading the docs (and bugging Rocco online), I have Listing One.

Listing One: A POE-based concurrent link checker – Part 1

1 #!/usr/bin/perl -w
2 use strict;
3 $|++;
5 use constant POCO_HTTP => "ua";
6 use POE qw(Component::Client::HTTP);
8 my $TOP = "http://directory.google.comm/Top/Computers/Programming/Languages/Perl/";
10 POE::Component::Client::HTTP->spawn(Alias => POCO_HTTP, Timeout => 30);
11 POE::Component::My::Master->spawn(UA => POCO_HTTP, TODO => [$TOP]);
12 $poe_kernel->run;
13 exit 0;
15 BEGIN {
16   package POE::Component::My::Master;
17   use POE::Session; # for constants
19   sub spawn {
20     my $class = shift;
21     POE::Session->create
22       (package_states =>
23        [$class => [qw(_start ready done)]],
24        heap => {KIDMAX => 10, KIDS => 0, @_});
25   }
27   sub _start {
28     my $heap = $_[HEAP];
29     for (@{$heap->{TODO}}) {
30       $heap->{DONE}{$_ = make_canonical($_)} = 1;
31     }
32     $_[KERNEL]->yield("ready", "initial");
33   }
35   sub ready {
36     ## warn "ready because $_[ARG0]\n";
37     my $heap = $_[HEAP];
38     my $kernel = $_[KERNEL];
39     return if $heap->{KIDS} >= $heap->{KIDMAX};
40     return unless my $url = shift @{$heap->{TODO}};
41     ## warn "doing: $url\n";
42     $heap->{KIDS}++;
43     POE::Component::My::Checker->spawn
44       (UA => $heap->{UA},
45        URL => $url,
46        POSTBACK => $_[SESSION]->postback("done", $url),
47       );
48     $kernel->yield("ready", "looping");
49   }
51   sub done {
52     my $heap = $_[HEAP];
53     my ($request,$response) = @_[ARG0,ARG1];
55     my ($url) = @$request;
56     my @links = @{$response->[0]};
58     for (@links) {
59       $_ = make_canonical($_);
60       push @{$heap->{TODO}}, $_
61         unless $heap->{DONE}{$_}++;
62     }
64     $heap->{KIDS}--;
65     $_[KERNEL]->yield("ready", "child done");
66   }
68   sub make_canonical { # not a POE
69     require URI;
70     my $uri = URI->new(shift);
71     $uri->fragment(undef); # toss fragment
72     $uri->canonical->as_string; # return value
73   }
75 } # end / POE::Component::My::Master
77 BEGIN {
78   package POE::Component::My::Checker;
79   use POE::Session;
81   sub spawn {
82     my $class = shift;
83     POE::Session->create
84       (package_states =>
85        [$class => [qw(_start response)]],
86        heap => {@_});
87   }
89   sub _start {
90     require HTTP::Request::Common;
91     my $heap = $_[HEAP];
92     my $url = $heap->{URL};
93     my $request = HTTP::Request::Common::GET($url);
94     $_[KERNEL]->post($heap->{UA}, 'request', 'response', $request);
95   }
97   sub response {
98     my $url = $_[HEAP]{URL};
99     my ($request_packet, $response_packet) = @_[ARG0, ARG1];
100     my ($request, $request_tag) = @$request_packet;
101     my ($response) = @$response_packet;
103     my @links;
105     if ($response->is_success) {
106       if ($response->base =~ m{^\Q$TOP}) {
107         if ($response->content_type eq "text/html") {
108           require HTML::SimpleLinkExtor;
109           my $e = HTML::SimpleLinkExtor->new ($response->base);
110           $e->parse($response->content);
111           @links = grep m{^http:}, $e->links;
112           ## warn "parsed: $url\n";
113         } else {
114           ## warn "not HTML: $url\n";
115         }
116       } else {
117         warn "valid: $url\n";
118       }
119     } else {
120       warn "BAD (", $response->code, "): $url\n";
121     }
123     $_[HEAP]{POSTBACK}(\@links);
124   }
125 } # end / POE::Component::My::Checker

Lines 1 through 3 start nearly every program I write, turning on warnings, enabling compiler restrictions, and disabling the normal block-oriented buffering of STDOUT.

Line 5 defines a constant for the name of the POE session representing the user agent. We need this as a constant to enable the individual checker sessions to post the page fetch requests.

Line 6 pulls in POE, and also brings in POE::Component::Client::HTTP. The POE module has a non-standard import routine: the names in its import list are not symbols to be imported, as in most other modules, but additional modules under the POE:: namespace to load.

Line 8 defines the constant for the top-level URL to be checked. This is a very simple checker that looks at the URL given in $TOP, descends through all HTML pages below that page, and verifies (without examining the contents) any URLs that are outside this tree. The URL given here is the Google view of the DMOZ entry for Perl, altered ever-so-slightly so that people won’t run this program without first reading this description. I think you’ll note that the change is obvious.

Lines 10 and 11 set up the two top-level sessions. First, the user agent session is created, given the session name in POCO_HTTP, and a timeout of thirty seconds. Then, the master session is spawned, placing two values into the session’s “heap.” The role of the master session is to manage the task queue and the number of child checker sessions.

Once the two sessions are spawned, we “boot” the POE kernel in line 12. This call to run will not return as long as there are sessions with pending or potential events, as determined by the POE kernel. And once we’re done, we exit in line 13. (How appropriate.)

There. All done. Oh wait, how does the master session do its job? That’s next, beginning in line 15. I’ve set up a BEGIN block from lines 15 to 75 to define the class, similar to how a use would bring it in. Line 16 establishes the package name of this module, and line 17 defines the constants needed for a POE session.

Lines 19 to 25 are invoked as a class method, as we did in line 11. The classname gets shifted off in line 20, and the new session is created by calling POE::Session’s create method. create has many, many parameters, but we need only two: the “states” that we’d like to register and some initial values for the “heap”.

In this case, we’ll register _start, ready, and done as our three interesting events, mapping to subroutines of the same name. (Events can be mapped to arbitrary subroutines, or even method calls instead, but we’ll keep it simple here.)

The “heap” is a set of local named parameters belonging to each session. By inserting values into the heap as the session is spawned, we’ve effectively given named arguments directly into the session. As an alternate design, we could arrange to pass items into the _start event instead, but we’d just end up storing those into the heap anyway, so this skips that copying step. Note that UA and TODO come from line 11, but KIDMAX and KIDS are defined here. We’ll see how all four of these items are used in a minute.

Lines 27 to 33 define the subroutine for the _start event. The POE kernel always sends this event as the session is being spawned. The parameters for an event handler are given as offsets into the @_ array, defined by a list of names. The value of HEAP gives us the proper item of @_ representing the heap hashref, which we copy into $heap in line 28.
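To make the two ways of handing values to a session concrete, here is a small sketch (not taken from the listing) that seeds the heap at creation time and also passes one value to _start through create’s args parameter. The @seen array and the sample values are assumptions for illustration only.

```perl
use strict;
use warnings;
use POE;    # exports HEAP, ARG0, and friends, plus $poe_kernel

my @seen;   # records what the handler found, for inspection afterwards

POE::Session->create(
  inline_states => {
    _start => sub {
      # HEAP and ARG0 are just numeric offsets into @_, so an array
      # slice pulls out the pieces we care about in a single step.
      my ($heap, $first) = @_[HEAP, ARG0];
      push @seen, "heap KIDMAX=$heap->{KIDMAX}", "ARG0=$first";
    },
  },
  heap => { KIDMAX => 10 },        # stored directly in the session's heap
  args => [ "top-level URL" ],     # delivered to _start as ARG0
);

$poe_kernel->run;
print "$_\n" for @seen;
```

Seeding the heap saves the copy that an args-based design would need inside _start, which is the trade-off the listing takes.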

The element TODO of the heap gives an arrayref of URLs that are left to be checked. Initially, it’s just the top-level URL from line 8 (via line 11 and line 24). For each item in this list, we’ll make it “canonical” by passing it through the make_canonical subroutine (defined later), and then create a hash with that key and a value of 1. The hash is used to ensure that we look at a given URL only once. The hash is kept in the heap hash under a key of DONE.

Once we have a “todo” list, and a “done” hash, it’s time to start spawning the children sessions to check the URLs. We’ll do that by a request to yield to the ready event. This instructs the kernel that it can now call that subroutine the next time it’s looking for something to do. The parameter initial is passed to the subroutine as one of the user parameters, strictly for debugging and tracing. Had this routine not yielded, the kernel would have recognized that there were no further events, and exited immediately.

The ready event triggers the same-named subroutine, defined in lines 35 to 49. Line 36 pulls out the debugging tag (the line is commented out in this version, but you can uncomment it while you’re playing to see when and where ready gets called).

The heap and the kernel are pulled out of the subroutine argument list in lines 37 and 38. Lines 39 and 40 decide if we have any chance to launch a new checker session, and if we have anything to do in that case. We can’t exceed KIDMAX number of sessions, and we won’t have anything to do if the TODO list is empty.

If we make it to line 41, it’s time to probe a new URL. Again, the tracing message is commented out. Spawning the new session consists of counting it (line 42), launching the session (lines 43 to 47), and then looping back with another event to this same handler (line 48). Without the loop, we’d never have multiple checker sessions running.
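The throttling pattern in ready can be tried in isolation. The sketch below is a simplified stand-in, not the listing’s code: instead of spawning a checker session, it yields a done event straight away to represent a finished child. The job numbers, the $peak counter, and the KIDMAX of 2 are all assumptions chosen to keep the run small.

```perl
use strict;
use warnings;
use POE;

my $peak = 0;     # highest number of simultaneously "active" jobs seen
my @finished;     # jobs in the order they completed

POE::Session->create(
  inline_states => {
    _start => sub { $_[KERNEL]->yield("ready") },
    ready => sub {
      my ($kernel, $heap) = @_[KERNEL, HEAP];
      return if $heap->{KIDS} >= $heap->{KIDMAX};                # at the limit
      return unless defined(my $job = shift @{$heap->{TODO}});   # queue empty
      $heap->{KIDS}++;
      $peak = $heap->{KIDS} if $heap->{KIDS} > $peak;
      $kernel->yield(done => $job);   # stand-in for a child doing real work
      $kernel->yield("ready");        # loop: try to start another job
    },
    done => sub {
      my ($kernel, $heap, $job) = @_[KERNEL, HEAP, ARG0];
      push @finished, $job;
      $heap->{KIDS}--;
      $kernel->yield("ready");        # a slot opened up
    },
  },
  heap => { KIDMAX => 2, KIDS => 0, TODO => [1 .. 5] },
);

$poe_kernel->run;
print "peak=$peak finished=@finished\n";
```

Every job eventually completes, and the active count never climbs past KIDMAX, because ready simply returns whenever the limit is reached and relies on done to wake it again.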

The checker session is launched like the master session, by calling the class method spawn, passing it initial values for the heap. The UA is simply a copy of the master session’s UA heap variable, while the URL is the URL to check. The third heap value is POSTBACK, a postback coderef. When the child session is finished, it will invoke this coderef. This coderef will invoke the done event, passing it two parameters: an array ref consisting of all the parameters given during the creation of the postback (here, the $url value), and a second array ref giving all of the parameters given in the postback’s invocation. Yes, it’s a method call that returns a coderef, that when invoked, invokes yet a different subroutine. Powerful and useful, but perhaps a bit of work to get your head around. Hopefully, the upcoming example will make it clear.
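Here is a minimal sketch of a postback on its own, assuming a single session that both creates and invokes the coderef (in the listing, the master creates it and the checker invokes it). The URL, the link names, and the @report array are made up for the example.

```perl
use strict;
use warnings;
use POE;

my @report;   # what the "done" handler received

POE::Session->create(
  inline_states => {
    _start => sub {
      # Arguments given here arrive later as the arrayref in ARG0.
      my $postback = $_[SESSION]->postback("done", "http://example.com/");

      # Arguments given at invocation arrive as the arrayref in ARG1.
      $postback->(["link-a", "link-b"]);
    },
    done => sub {
      my ($creation_args, $call_args) = @_[ARG0, ARG1];
      push @report, "url=$creation_args->[0]",
                    "links=@{ $call_args->[0] }";
    },
  },
);

$poe_kernel->run;
print "$_\n" for @report;
```

Note that the invocation passes one arrayref of links, so the handler finds that arrayref in the first slot of ARG1, which is the same shape line 56 of the listing unpacks.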

In fact, let’s skip over the description of done until we see how it would have been triggered. So, skipping down to lines 77 to 125, we see the spawn method defined starting in line 81. Again, we’re expecting two events, called _start and response, and these are both mapped to their same-named subroutines. The heap is created from the incoming parameters, as defined earlier in lines 44 to 46.

When the _start event is triggered by the kernel, we’ll pull in the HTTP::Request::Common module to create the LWP request that fetches the given URL. As this is a require, it reads the module only the first time. The $request object in line 93 is then “posted” to the user agent session in line 94, using its request event (the second argument of the post call). The third argument names the event, response, that the user agent session will send back to the checker session when the fetch is done. The final argument is the LWP request object itself.

After posting the request to the user agent session, the checker session has no active events, and will remain idle. The user agent will fetch the page, and then post a response event, which triggers the subroutine starting in line 97. The first two “user” arguments to this process are the request object packet, and the response object packet. Peering inside those, we can extract the request and response LWP objects, into $request and $response in lines 100 and 101 respectively.

Line 103 defines the “return value” from this event, which will be 0 or more links to additionally check.

Starting in line 105, we decide what to do with and report about this response. If it’s a successful fetch (line 105) and the URL was inside the tree of interest (line 106) and it’s an HTML page (line 107), then we need to parse the HTML to pull out any further links worth checking. Again, leveraging off yet another CPAN module, we’ll grab HTML::SimpleLinkExtor to do the job. The “base url” for relative links is passed while creating the extractor object in line 109. Line 110 looks at the response content for links of interest. Line 111 grabs those links as a list into @links, being careful to grab only HTTP links, and not HTTPS or email or FTP links. Again, a debugging trace line has been commented out in line 112.

If the page wasn’t HTML, we note that in line 114. If it was outside the tree of interest, we end up in line 117. These are the links that we wanted only to verify that they could be fetched, and so we note that.

Finally, if we end up in line 120, we’ve got a bad link, and we report the error. Some of the reported errors are actually “redirects”, but if they’re always a redirect, maybe we should change our link to point at the redirected page. At that point, I was getting distracted from the main purpose of this example, which is to show how to make POE work, so I’m leaving that as an exercise for the reader.
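The decision tree in lines 105 to 121 can be exercised without touching the network. The sketch below is an assumption-laden restatement, not the listing’s code: classify and fake_response are hypothetical helpers, $TOP points at a made-up example.com tree, and the responses are built by hand with HTTP::Response rather than fetched.

```perl
use strict;
use warnings;
use HTTP::Request;
use HTTP::Response;

my $TOP = "http://example.com/perl/";   # hypothetical tree of interest

# Hypothetical helper: returns a label for the branch the listing would take.
sub classify {
  my $response = shift;
  return "BAD (" . $response->code . ")" unless $response->is_success;
  return "valid"    unless $response->base =~ m{^\Q$TOP};      # outside the tree
  return "not HTML" unless $response->content_type eq "text/html";
  return "parse";                                              # scan for links
}

# Hypothetical helper: builds a response as if $url had just been fetched.
sub fake_response {
  my ($url, $code, $type) = @_;
  my $response = HTTP::Response->new($code, "message", ["Content-Type" => $type]);
  $response->request(HTTP::Request->new(GET => $url));  # lets base() find the URL
  return $response;
}

my @results = map { classify(fake_response(@$_)) } (
  ["http://example.com/perl/a.html",  200, "text/html"],
  ["http://example.com/perl/a.png",   200, "image/png"],
  ["http://elsewhere.example/",       200, "text/html"],
  ["http://example.com/perl/gone",    404, "text/html"],
);
print "$_\n" for @results;
```

Flattening the nested if/else into early returns is only a presentation choice for the sketch; the order of the tests matches the listing.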

And now for that postback. The coderef contained in $_[HEAP]{POSTBACK} is called, passing it the arrayref pointing to the 0 or more links to be followed as a result of examining this page. This postback is like a post call, but against the session that created the postback (in this case, the master session), triggering an event in that session (in this case, “done”), and passing it two user parameters.

Let’s skip up to done to see how that works. First, we pull out the heap, request list, and response list in lines 52 and 53. The request list is the list created in line 46, which is just the URL. The response list is the list of links to be visited, created down in line 123. These are extracted in lines 55 and 56.

For each of the links that we found for this URL, we’ll force them to be “canonical” (described in a moment), and then add them to the todo list (line 60) unless we’ve already seen them before (line 61).
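The “unless already seen” test in lines 60 and 61 is the usual Perl hash idiom: the post-increment returns the old count (false the first time) and marks the key as seen in the same expression. A small standalone sketch, with made-up URLs:

```perl
use strict;
use warnings;

my %done = ("http://example.com/" => 1);   # the starting page is already known
my @todo;

for my $link (qw(
  http://example.com/a
  http://example.com/
  http://example.com/a
  http://example.com/b
)) {
  # $done{$link}++ is false only on the first sighting of $link.
  push @todo, $link unless $done{$link}++;
}

print "$_\n" for @todo;
```

Only the first sighting of each new URL reaches @todo, so a page linked from many places is still fetched just once.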

Because a postback call would happen only when the child was done (or nearly done, with only cleanup to do on the way out), we now have one fewer “active” kid, noted in line 64. And since that might mean that it’s time to fire up another URL fetch, we’ll again note the ready event in line 65, this time annotating it with “child done” for the reason.

Now, for that make_canonical routine, defined in lines 68 to 73. Given a string representing a URL, we want to clean it up, and remove any “fragment identifier” (the part after the #). We’ll do this again with a CPAN module (part of LWP).
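To see the effect, here is the same routine lifted out of the listing and run against two made-up spellings of one page. The URLs are assumptions; the URI calls are the ones the listing uses.

```perl
use strict;
use warnings;
use URI;

sub make_canonical {
  my $uri = URI->new(shift);
  $uri->fragment(undef);               # toss the part after the #
  return $uri->canonical->as_string;   # normalize case, default port, escapes
}

my $first  = make_canonical("HTTP://Example.COM:80/docs/index.html#top");
my $second = make_canonical("http://example.com/docs/index.html#bottom");

print "$first\n$second\n";
```

Both calls produce the same string, so the DONE hash treats the two spellings as one page.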

And that’s all there is to it. It’s a toy program, but fully functional, and if you run it as-is (with the one modification noted above), you’ll discover exactly how many of the DMOZ entries in the Perl section are still valid.

To be practical, we’d probably want a lot more control over deciding whether a URL is to be followed if it contains HTML, or just probed, or ignored, and probably some summary report at the end. But that’s not the point of this demonstration.

Randal L. Schwartz is the chief Perl guru at Stonehenge Consulting and can be reached at merlyn@stonehenge.com. Code listings for this column can be found at http://www.stonehenge.com/merlyn/LinuxMag/ and at http://www.linux-mag.com/downloads/2002-10/perl.
