
A Forking Parallel Link Checker

In last month’s column, I presented a framework to allow many parallel tasks to be performed efficiently, in anticipation of using that framework as-is for this month’s program: a parallel web-site link checker. Well, wouldn’t you know it? After writing the rest of the code, I found that I had left out some of the needed hooks. And, while I was on a boat for Perl Whirl 2000 (the first Perl conference on the high seas), I thought of more things I could add to the process manager for the forked-off processes. So, after much gnashing of teeth, I used all of my skills of random feature creep and cut-n-paste, urgh, I mean, code reuse to create the monster in Listing One (pg. 94).

At a massive 655 lines of code, this program exceeds anything that this column could describe in one chunk. So rather than my usual line-by-line description, let me hit just the highlights.

First, let’s look at how many features I added and bugs I fixed in the parallel “let your kids do all the work” process manager from last month, starting at line 458. One of the more glaringly obvious changes is the breaking of the code into two in-line modules, defined by the blocks starting in lines 460 and 503. Each block starts with its own package directive, in anticipation of being split out into real modules. At this point, we’re ready to give birth to a real module, but I’ve still included it all in this one file, hence the enclosure in a BEGIN block, which simulates a use rather completely.
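
If you haven’t seen the trick before, here’s a minimal sketch of the idiom, with a made-up My::Demo package standing in for the real modules:

use strict;

## Wrapping the package in a BEGIN block means it is compiled
## before the main-line code below runs, much as a real
## "use My::Demo" would arrange. (My::Demo is invented here.)
BEGIN {
  package My::Demo;
  sub greet { print "hello from My::Demo\n" }
}

My::Demo::greet(); # callable just as if it had come from a module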

Now, for my next trick, I was a bit irritated about getting duplicate tasks in the previous version, and it finally occurred to me while cruising the Alaska Inside Passage (with no usable net link, I might add) exactly what the problem was and how to solve it. The result is the concept of a sweep, referenced in lines 479 and 482. Each sweep passes over all the tasks that are not active. How do we know which tasks are active? We ask the kid manager for the list, and subtract those out of the entire task list. So as long as a kid is working on a task, we don’t hand it out again. No more duplicates! (Yes, it seems obvious in hindsight, but such is the manner of programming.)
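
Stripped of the surrounding machinery, a sweep is just hash subtraction; here’s a minimal sketch with invented task names:

use strict;

my %tasks = map { $_ => 1 } qw(url-a url-b url-c url-d); # every known task
my @active = qw(url-b url-d); # tasks the kids are already holding

my %sweep = %tasks; # start the sweep from the full task list
delete @sweep{@active}; # hash-slice delete subtracts the active set
print "hand out: $_\n" for sort keys %sweep; # only url-a and url-c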

Another major change is the concept of a timeout, as implemented beginning in line 576. As each kid is handed a task, the start time is noted, and if $timeout is defined (passed as the Timeout parameter to the event loop runner), we kill off any process that hasn’t talked to us within that time. If a kid times out, and there’s a callback provided as the TimeoutTask parameter, then we call that with the same parameters we would have sent the kid. This is handy to re-queue the task, or maybe just abort.
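
In outline, the check looks something like the following sketch, where the %kids structure (pid to task and start time) and the task names are invented stand-ins for the real bookkeeping:

use strict;

my %kids = (101 => ["slow-task", time - 300], 102 => ["fresh-task", time]);
my $timeout_task = sub { warn "would re-queue or abandon $_[0]\n" };
my $timeout = 120;

my $oldest = time - $timeout; # anything started before this is overdue
for my $pid (keys %kids) {
  my ($task, $started) = @{ $kids{$pid} };
  next if $started > $oldest; # still within its allowance
  $timeout_task->($task) if $timeout_task; # let the caller decide what now
  ## kill 15, $pid; # the real manager also terminates the stuck kid
  delete $kids{$pid};
}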

And finally, a LoopBottom callback parameter is provided, to allow the caller to grab control temporarily at the bottom of each pass through the event loop, to perform other timed activities or gather statistics.

OK, by the time I had done all this, I realized I could have simply used POE or Event, both found in the CPAN. But working through all the reasons why those features exist was a good exercise for me. Sometimes leverage is the right thing, and in hindsight, I might have appreciated it here.

But let’s move from looking at what I did wrong last month to what I did right this month. The link checker is heavily configurable, and even though I never intend my programs to be “ready to run” like the popular downloads out there, I still intend to use this program in daily production (which is why I kept adding features until I was happy).

Line 28 begins the configuration section, and since that’s the part you are probably most interested in if you just wanna use this program, I’ll give that a bit more detail.

Line 30 is the “memory” of this program, as a Storable database. This file allows the link checker to start and stop at an arbitrary place in a scan, and to remember sites that are hard to reach or up intermittently, so that we don’t get a false positive on “this URL is bad”.
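
If you haven’t used Storable before, the whole persistence trick is a pair of calls; here’s a minimal sketch (the filename and data are invented):

use strict;
use Storable qw(store retrieve);

my $db = "/tmp/linkcheck-demo.db"; # made-up path for this sketch
my %seen = ("http://www.example.com/" => time); # pretend scan state

store [\%seen], $db or die "cannot store to $db";
my $restore = retrieve $db or die "cannot retrieve from $db";
my %again = %{ $restore->[0] };
print scalar(keys %again), " url(s) remembered\n";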

Line 31 is the verbosity level. I can run this program from cron with $VERBOSE set to 0, and I’ll see a report only when the scan is finished. Level 1 produces enough output to watch the screen roll by on a fast modem. Level 2 is detailed output for debugging, and generally won’t be used unless you’re under the hood.

Lines 33 to 36 control how often links are checked and how soon bad links are reported. A “good” link is one that has been verified as alive, and can be checked less often if you desire. A “ghost” link is one that is temporarily down, but whose remembered URLs we still follow. This is handy if one of your major index pages is messed up or not responding, but you still want to test the rest of the site.

Line 38 is a true/false value to chase HTTP redirects. If false, we just note that we had a redirect. If true, we treat it as if it’s a page with one link on it to the new page and go test that link.

Line 39 is essential in this slow-laggy “world wide wait” Web we’ve got. If a page isn’t coming back within 60 seconds, we’ll abort. Twice this value is also the absolute time we’ll wait for a kid, in case the kid gets hung up in DNS or something. Since a hard timeout costs us a kill and another fork, we try not to trigger it too often. Nothing is more frustrating than watching all five or ten of your kids in their respective DNS waits though, so you probably don’t want to make the values too high.

Line 40 is an upper bound on how much HTML content we’re willing to fetch. Non-HTML content is aborted at the first 8K, so this applies only to very huge web pages (the kind that most browsers would likely have given up on as well). Again, going higher means more pages are parseable, but why bother.

Line 42 is how many of your kids you’ll have working for you. Too high, and you could hammer a site. Too low, and your scan will take too long. I suggest a minimum of 2 (so that a long DNS won’t kill the scan) and a maximum of 20 (because my guess is that other processes will be the limiting factor at that point; let me know if I’m talking through my hat here).

Line 43 defines how often we save the database while we’re running. Since the database has a lot of useful state info, we want to save often, but not too often, since that would mean excessive overhead.

Lines 45 and 46 define the “top level” URLs. All the other URLs we’ll be scanning must be reachable (recursively) from this list, so if you have two or more disjoint web trees, you should put some link from each one into this list.

Lines 48 to 74 define what to do with each URL, using a subroutine (rather than a regular expression like some of the other link checkers I’ve seen, which annoys me in terms of flexibility). The subroutine will be called with a URI::URL object, so method calls to extract portions will be immediately usable. The subroutine needs to return one of four coded values, as described in lines 49 to 52. Obviously, everything here encodes my desires about how much or little of the web to scan. Making a mistake here turns you into a poor version of Scooter, the AltaVista web robot, parsing the entire web for links, so be careful.
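
To show the shape without my site-specific rules, here’s an invented classifier for a hypothetical example.com site, returning the same four coded values:

use strict;
use URI;

sub MY_PARSE { # hypothetical; not the PARSE in the listing
  my $url = shift; # URI object (absolute)
  return 0 unless $url->scheme eq "http"; # xref only, don't fetch
  return -1 if $url->path =~ /\.(?:gif|jpe?g)$/i; # ignore images outright
  return 2 if $url->host eq "www.example.com"; # my site: fetch and parse
  return 1; # everyone else: merely verify existence
}

print MY_PARSE(URI->new("http://www.example.com/about.html")), "\n"; # 2
print MY_PARSE(URI->new("http://elsewhere.example.org/")), "\n"; # 1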

Lines 76 to 99 define a transformation performed on each raw URL before we process it further (including calling the above subroutine to determine what to do with it). My Web server has three alias names, linked in various ways throughout the Net, but I don’t want the index and checking to fetch each page three times, so lines 83 to 87 clean that up. And on my site, any URL beginning with /cgi/go/ is rewritten to an external URL, so it’s sensible to perform that transformation here rather than getting the Web server to perform it repeatedly. And finally, a URL that ends in index.html can be considered equivalent to the same URL without it, so I remove those. Again, all of these transformations assume an intimate knowledge of the pages which I am scanning, so there’s no general version of this routine possible, except one that returns a call of canonical on the URI object.
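
Just to show the shape of such a routine, here’s an invented normalizer for a hypothetical example.com with similar host aliases:

use strict;
use URI;

sub MY_HACK_URL { # hypothetical; not the HACK_URL in the listing
  my $url = shift->canonical; # lowercase host, drop default port, etc.
  if ($url->scheme eq "http") {
    $url->host("www.example.com") # fold the aliases onto one name
      if $url->host =~ /^(?:w3|web)\.example\.com$/;
    $url->path($1) # trailing index.html names the same page
      if $url->path =~ m{^(.*/)index\.html$}s;
  }
  $url->canonical;
}

print MY_HACK_URL(URI->new("HTTP://Web.Example.Com:80/a/index.html")), "\n";
## prints http://www.example.com/a/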

And now for the fun part. The ParseLink class, defined in lines 103 to 149, subclasses HTML::Parser so that we can get at the a/href and img/src links of the document, including the line number on which each link occurred. We’ll use this to scan documents and provide cross-referencing for them. Lines 139 and 140 should probably be extended to include all attributes that can contain URLs, but I was lazy and included only the ones that my site uses.

Lines 151 to 160 define the “user agent” used for all the fetches. This is the equivalent of the browser-end of the browser-to-server connection, and can include a defined user agent, which I’ve set in line 153.

Lines 162 to 177 manage the persistent data: the data from the previous pass (if any) in %OLD, and the pass being generated in %NEW. The data looks like the pseudocode in lines 179 to 190.

Lines 193 to 210 set up the current pass, based on the scan state in memory. If there are actions in progress (as flagged by a leading “=” in the status field), then we will use those in our @TODO list. Otherwise (and initially), we will look to the @CHECK array to provide the starting points.

The majority of the program execution time is spent in the subroutine invocation starting in line 212. The run_queue subroutine forks off the kids, executes any requested tasks (which can, in turn, submit further tasks), and then finishes up as needed. The four callback subroutines give a personality and destiny to the child tasks.

Once run_queue is complete, it’s time to dump the results of a complete pass, noting which URLs had shown up missing, in the code between lines 234 and 257. A simple “Schwartzian Transform” (named after me, but not by me, and that’s a long story) in lines 236 to 242 sorts them in “worst” to “best” order, so that I can concentrate on the links that have not been seen for the longest time (or perhaps ever).
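
For those who haven’t met the idiom, here it is in isolation, sorting some invented “last known good” timestamps so the oldest comes first:

use strict;

my %good = ( # invented "last known good" timestamps
  "http://a.example.com/" => 100,
  "http://b.example.com/" => 900,
  "http://c.example.com/" => 400,
);

my @oldest_first =
  map  { $_->[0] }              # 3. keep just the URL again
  sort { $a->[1] <=> $b->[1] }  # 2. compare the precomputed keys
  map  { [$_, $good{$_}] }      # 1. compute each sort key exactly once
  keys %good;

print "$_\n" for @oldest_first; # a, then c, then b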

Starting in line 262, we manage all the links that have been found, including recording them into the database and queuing up new URLs to visit if needed, as defined by PARSE in the configuration. There are a lot of comparisons and hash manipulations to get right, but once you walk through the code slowly, you should see that there’s really nothing tricky here.

Starting in line 309, we see the queueing routine, which queues up a child task for each needed URL. An HTTP::Request object is passed to the kid, along with an indication of whether this is a PARSE or a PING, needed by the kid, as you’ll see in a moment.

And that moment has arrived, because line 321 begins the task performed by each kid. The input is the PARSE/PING flag and the HTTP::Request object. The fetch callback defined starting in line 327 gets called with each chunk (here, 8,192 characters) read from the remote server. Because we can abort a PING very early, and a PARSE as soon as we know it’s not text/html, we’ll die from this callback in those cases. And if for some reason someone hands us a huge text/html file, we can also abort after reading a selected number of bytes.
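
The abort-by-dying trick deserves a standalone illustration; in this sketch (URL and size cap invented), dying inside the per-chunk callback ends the transfer early, and we keep whatever content arrived:

use strict;
use LWP::UserAgent;
use HTTP::Request;

my $ua = LWP::UserAgent->new;
my $maxsize = 16_384; # invented cap for this sketch
my $content = "";

my $res = $ua->simple_request(
  HTTP::Request->new(GET => "http://www.example.com/"),
  sub { # called for each chunk as it arrives
    my ($data, $response, $protocol) = @_;
    die "not HTML\n" # aborts the transfer mid-fetch
      unless $response->content_type eq "text/html";
    $content .= $data;
    die "too big\n" if length($content) > $maxsize;
  },
  8192, # preferred chunk size in bytes
);
$res->content($content); # keep whatever made it across
print $res->code, " after ", length($content), " bytes\n";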

Once the request is finished, the result is returned, which leaps magically from the child to the parent to end up as the calling parameters of result_task, defined next. Of course it’s magic, because any sufficiently trivial magic is indistinguishable from technology, which is what we have. And there’s lots of stuff to do here with the response, but again, it’s straightforward code, so read it line by line, perhaps twice.
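
The “magic” is really just Storable’s freeze and thaw plus a four-byte length prefix, so the reader knows exactly how many bytes make up one message; here it is in miniature (the task values are invented):

use strict;
use Storable qw(freeze thaw);

my $message = freeze(["http://www.example.com/", "PING", "more args"]);
my $wire = pack("L", length $message) . $message; # length, then payload

## ...and on the other side of the pipe:
my $length = unpack "L", substr($wire, 0, 4);
my $thawed = thaw(substr($wire, 4, $length)) or die "Cannot thaw";
print "task $thawed->[0] was a $thawed->[1]\n";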

Line 422 begins the callback for a timeout child task. I considered having it re-queue, and may still do that in a future modification of this program, but for now, I just punt.

Starting in line 429, we rip through the links of a given page to fix relative links to make them absolute, calling add_link as appropriate.

And finally, the last utility routine, dump_relative, handles the output in the cross-reference listing, making it easier to read the source and destination of a link relative to each other.

Whew! Let’s see, just 14,792 or so pages before I finish writing War and Peace here. I’ll call that good for now; hopefully you’ve seen some interesting techniques here, and now have a practical program for your web manager’s toolkit. So, until next time, be sure all your links are good links! Enjoy.




Listing One: Web Site Link Checker

1 #!/usr/bin/perl -w
2 use strict;
3 $|++;
4
5 ## Copyright (c) 1996,97,98,99,2000 by Randal L. Schwartz
6 ## This program is free software; you can redistribute it
7 ## and/or modify it under the same terms as Perl itself.
8
9 sub __stamp {
10 my $message = shift;
11 my(@now) = localtime;
12 my $stamp = sprintf "[%d] [%02d@%02d:%02d:%02d] ",
13 $$, @now[3,2,1,0];
14 $message =~ s/^/$stamp/gm;
15 $message;
16 }
17
18 $SIG{__WARN__} = sub { warn __stamp(shift) };
19 $SIG{__DIE__} = sub { die __stamp(shift) };
20
21 use URI;
22 use LWP;
23 use Storable;
24 use Carp;
25
26 use constant DAYS => 24 * 60 * 60; # for configs below
27
28 ## begin configure
29
30 my $DATABASE = "/home/merlyn/.psldata";
31 my $VERBOSE = 1; #0 = quiet, 1 = noise, 2 = lots of noise
32
33 my $RECHECK = 0.1 * DAYS; # seconds between rechecking any URL
34 my $RECHECK_GOOD = 1 * DAYS; # seconds between rechecking good URLs
35 my $FOLLOW_GHOST = 7 * DAYS; # seconds before tossing bad URLs links
36 my $REPORT = 0 * DAYS; # seconds before bad enough to report
37
38 my $FOLLOW_REDIRECT = 1; # follow a redirect as if it were a link
39 my $TIMEOUT = 60; # timeout on fetch (hard timeout is twice this)
40 my $MAXSIZE = 1048576; # max size for fetch (undef if fetch all)
41
42 my $KIDMAX = 5; # how many kids to feed
43 my $SAVE_INTERVAL = 300; # how often in seconds to checkpoint
44
45 my @CHECK = # list of initial starting points
46 qw(http://www.stonehenge.com/);
47
48 sub PARSE {
49 ## return 2 to parse if HTML
50 ## return 1 to merely verify existence
51 ## return 0 to not even verify existence, but still xref
52 ## return -1 to ignore entirely
53 my $url = shift; # URI::URL object (absolute)
54 for ($url->scheme) {
55 return 0 unless /^ftp$/ or /^gopher$/ or /^http$/;
56 }
57 for ($url->host) {
58 return 0 if /amazon\.com$/; # they are a mess for redirects
59 return 0 if /validator\.w3\.org$/; # ditto
60 if (/\.stonehenge\.com$/) {
61 for ($url->path_query) {
62 return -1 if /\/\?[DMNS]=[AD]$/; # silly mod_index
63 }
64 for ($url->path) {
65 return 0 if /^\/(cgi|fors|-)\// or /col\d\d/;
66 return -1 if /^\/merlyn\/Pictures\/.*\.jpg$/is;
67 return 1 if /^\/CPAN/; # don't verify contents of CPAN mirror
68 return 0 if /refindex/; # too expensive to parse
69 }
70 return 2; # default stonehenge.com
71 }
72 return 1; # ping the world
73 }
74 }
75
76 sub HACK_URL {
77 my $url = shift; # URI object
78 {
79 $url = $url->canonical;
80 warn "scheme = ".($url->scheme).", host = ".($url->host)."\n"
81 if $VERBOSE > 1;
82 if ($url->scheme eq "http") {
83 if ($url->host =~ /^(w3|web)\.stonehenge\.com$/i) {
84 warn "rewriting ".($url->host)." to www.stonehenge.com\n"
85 if $VERBOSE > 1;
86 $url->host("www.stonehenge.com");
87 }
88 if ($url->host eq "www.stonehenge.com") {
89 ($url = URI->new("$1")), redo
90 if $url->path_query =~ /^\/cgi\/go\/(.*)/s;
91 $url->path("$1") if $url->path =~ /^(.*\/)index\.html$/s;
92 { my $x = $url->path;
93 $url->path($x) if $x =~ s/\/{2,}/\//g;
94 }
95 }
96 }
97 }
98 $url->canonical;
99 }
100
101 ## end configure (no user-servicable parts below this line)
102
103 BEGIN {
104 package ParseLink;
105 use HTML::Parser;
106 use vars qw(@ISA);
107
108 @ISA = qw(HTML::Parser);
109
110 sub new_line {
111 my $self = shift;
112 $self->{Line}++;
113 }
114
115 sub parse {
116 my $self = shift;
117
118 my $content = shift;
119
120 return $self->SUPER::parse(undef)
121 unless defined $content;
122
123 for ($content =~ /(.+|\n)/g) {
124 $self->SUPER::parse($_);
125 $self->new_line() if $_ eq "\n";
126 }
127
128 $self;
129 }
130
131
132 ## $self->{Links} = {
133 ## "url" => { "line" => "count", "line" => "count" ... }, ...
134 ## };
135 sub start { # called by parse
136 my $self = shift;
137 my ($tag, $attr) = @_;
138 my $link;
139 $link = $attr->{href} if $tag eq "a";
140 $link = $attr->{src} if $tag eq "img";
141 if (defined $link) {
142 $self->{Links}{$link}{$self->{Line} + 1}++;
143 }
144 }
145
146 sub get_links { # $instance->get_links()
147 shift->{Links};
148 }
149 } # end of ParseLink
150
151 BEGIN {
152 my $AGENT = LWP::UserAgent->new;
153 $AGENT->agent("pslinky/0.15 " . $AGENT->agent);
154 $AGENT->env_proxy;
155 $AGENT->timeout($TIMEOUT);
156
157 sub fetch {
158 $AGENT->simple_request(@_);
159 }
160 }
161
162 ## the persistent data:
163 my (%OLD, %NEW);
164 my $SAVE_WANTED = $SAVE_INTERVAL ? time + $SAVE_INTERVAL : 0;
165
166 if (-r $DATABASE) {
167 my $restore = retrieve $DATABASE or die "Cannot retrieve from $DATABASE\n";
168 %OLD = %{$restore->[0]};
169 %NEW = %{$restore->[1]};
170 warn "database restored\n" if $VERBOSE;
171 }
172
173 for (qw(HUP INT QUIT ALRM TERM)) {
174 $SIG{$_} = sub { $SAVE_WANTED = -1 };
175 }
176
177 alarm(shift) if @ARGV;
178
179 ## $NEW{"some url"} = {
180 ## From => {
181 ## { "url" => { "line" => 1, "line" => 1, ... } },
182 ## { "url" => { "line" => 1, "line" => 1, ... } },
183 ## },
184 ## To => [like From]
185 ## Base => "base", ## if base != url
186 ## Status => "Whatever" or "= [PARSE|PING] [HEAD|GET]",
187 ## Checked => time, ## when did we last look?
188 ## Good => time, ## when was it good (if ever)
189 ## LastModified=> time, ## when it was good, when was it last modified?
190 ## }
191
192
193 {
194 my @TODO = grep {
195 defined $NEW{$_}{Status} and $NEW{$_}{Status} =~ /^= /;
196 } keys %NEW;
197 if (@TODO) {
198 for (@TODO) {
199 queue($_);
200 }
201 } else {
202 ## prime the pump
203 %OLD = %NEW;
204 %NEW = ();
205 for (0..$#CHECK) {
206 my $url = HACK_URL(URI->new($CHECK[$_]));
207 add_link("REQUESTED:", $_, $url);
208 }
209 }
210 }
211
212 My::KidManager::run_queue
213 (
214 Trace => $VERBOSE > 1,
215 KidMax => $KIDMAX,
216 Timeout => $TIMEOUT,
217 KidTask => \&kid_task,
218 ResultTask => \&result_task,
219 TimeoutTask => \&timeout_task,
220 LoopBottom => sub {
221 if ($SAVE_WANTED) {
222 if (time > $SAVE_WANTED) { # time is always > -1
223 warn "dumping data to $DATABASE...\n" if $VERBOSE;
224 store [\%OLD, \%NEW], $DATABASE;
225 exit 0 if $SAVE_WANTED < 0;
226 $SAVE_WANTED = time + $SAVE_INTERVAL if $SAVE_INTERVAL;
227 }
228 }
229 },
230 );
231
232 warn "dumping data to $DATABASE...\n" if $VERBOSE;
233 store [\%OLD, \%NEW], $DATABASE;
234 print "\nBEGIN REPORT at ".localtime()."\n\n";
235 for my $url (
236 map { $_->[0] }
237 sort { $b->[1] <=> $a->[1] }
238 map {
239 /^requested:/i ? () :
240 ($NEW{$_}{Checked} <= $NEW{$_}{Good} + $REPORT) ? () :
241 [$_, $NEW{$_}{Good}];
242 } keys %NEW) {
243 my $entry = $NEW{$url}; # href
244 my $status = $entry->{Status};
245 my $base = $entry->{Base};
246 print "$url";
247 print " (base $base)" if defined $base;
248 print ":\n status: $status\n";
249 for (qw(Checked Good LastModified)) {
250 if (my $stamp = $entry->{$_}) {
251 print " $_ => ".localtime($stamp)."\n";
252 }
253 }
254 dump_relative($url, "from", $entry->{From});
255 dump_relative($url, "to", $entry->{To});
256 }
257 print "\nEND REPORT\n\n";
258 exit 0;
259
260 ## subroutines
261
262 sub add_link {
263 my ($from,$line,$url) = @_;
264
265 confess "not URL: $url" unless ref $url;
266
267 my $parse = PARSE($url);
268 return if $parse < 0;
269 $NEW{$url}{From}{$from}{$line}++;
270 $NEW{$from}{To}{$url}{$line}++;
271 return if exists $NEW{$url}{Status};
272 for (qw(Checked Good LastModified)) {
273 $NEW{$url}{$_} = $OLD{$url}{$_} || 0;
274 }
275 if ($parse >= 2) {
276 warn "Parsing $url\n" if $VERBOSE;
277 if (time < $NEW{$url}{Checked} + $RECHECK or
278 time < $NEW{$url}{Good} + $RECHECK_GOOD and
279 $NEW{$url}{LastModified} > 0) {
280 warn "... too early to recheck\n" if $VERBOSE;
281 $NEW{$url}{Status} = $OLD{$url}{Status};
282 my $base;
283 if ($OLD{$url}{Base}) {
284 $NEW{$url}{Base} = $base = $OLD{$url}{Base};
285 }
286 follow_links($url, $OLD{$url}{To} || {}, $base);
287 } else {
288 ## $NEW{$url}{Status} = "= PARSE HEAD";
289 $NEW{$url}{Status} = "= PARSE GET";
290 queue($url);
291 }
292 } elsif ($parse >= 1) {
293 warn "Pinging $url\n" if $VERBOSE;
294 if (time < $NEW{$url}{Checked} + $RECHECK or
295 time < $NEW{$url}{Good} + $RECHECK_GOOD) {
296 warn "... too early to recheck\n" if $VERBOSE;
297 $NEW{$url}{Status} = $OLD{$url}{Status};
298 } else {
299 $NEW{$url}{Status} = "= PING HEAD";
300 queue($url);
301 }
302 } else { # $parse <= 0
303 warn "Skipping $url\n" if $VERBOSE;
304 $NEW{$url}{Status} = "Skipped";
305 $NEW{$url}{Checked} = 0; # we no longer check this
306 }
307 }
308
309 sub queue {
310 my $url = shift;
311
312 my $status = $NEW{$url}{Status};
313 warn "Queueing $url for $status\n" if $VERBOSE > 1;
314 confess "bad status for $url: $status"
315 unless my($kind,$method) = $status =~ /^= (\S+) (\S+)/;
316 my $req = HTTP::Request->new($method => "$url");
317 $req->if_modified_since($NEW{$url}{LastModified});
318 My::TaskManager::add_task($url, $kind, $req);
319 }
320
321 sub kid_task {
322 my ($url, $kind, $req) = @_;
323 warn "child ", $req->method, " for ", "$url\n" if $VERBOSE > 1;
324 my $content;
325 my $content_type;
326 my $res = fetch($req,
327 sub {
328 my ($data, $response, $protocol) = @_;
329 unless ($content_type) {
330 if ($content_type = $response->content_type) {
331 if ($kind eq "PING") {
332 warn "aborting $url for ping\n"; # if $VERBOSE > 1;
333 die "ping only";
334 }
335 if ($content_type ne "text/html") {
336 warn "aborting $url for $content_type\n"; # if $VERBOSE > 1;
337 die "content type is $content_type";
338 }
339 }
340 }
341 $content .= $data;
342 if ($MAXSIZE and length $content > $MAXSIZE) {
343 warn "aborting $url for content length\n"; # if $VERBOSE > 1;
344 die "content length is ", length $content;
345 }
346 }, 8192);
347 $res->content($content); # stuff what we got
348 return $res;
349 }
350
351 sub result_task {
352 my ($url, $res) = @_;
353 warn "results for $url:\n" if $VERBOSE;
354 my $status = $NEW{$url}{Status};
355 confess "bad status $status"
356 unless my($kind,$method) = $status =~ /^= (\S+) (\S+)/;
357 my $links = $OLD{$url}{To} || {};
358 my $base;
359 if ($res->is_success) {
360 my $content_type = $res->content_type;
361 warn "... successful fetch\n" if $VERBOSE;
362 if ("$kind $method" eq "PARSE HEAD" and $content_type eq "text/html") {
363 warn "... requeue for PARSE GET\n" if $VERBOSE;
364 $NEW{$url}{Status} = "= PARSE GET";
365 queue($url);
366 return;
367 }
368 $base = $res->base->as_string;
369 $NEW{$url}{Checked} = $NEW{$url}{Good} = time;
370 $NEW{$url}{Base} = $base if $base ne $url;
371 $NEW{$url}{LastModified} = $res->last_modified || $res->date;
372 unless ($content_type eq "text/html") {
373 warn "... not HTML\n" if $VERBOSE;
374 $NEW{$url}{Status} = "Verified (content = ".($res->content_type).")";
375 return;
376 }
377 if ($kind eq "PARSE") {
378 $NEW{$url}{Status} = "Verified and parsed";
379 warn "... parsing\n" if $VERBOSE;
380 my $p = ParseLink->new;
381 $p->parse($res->content);
382 $p->eof;
383 $links = $p->get_links;
384 } else {
385 $NEW{$url}{Status} = "Verified (contents not examined)";
386 warn "... good ping\n" if $VERBOSE;
387 }
388 follow_links($url, $links, $base);
389 return;
390 } elsif ($res->code == 304) {
391 warn "... not modified\n" if $VERBOSE;
392 $NEW{$url}{Status} = $OLD{$url}{Status};
393 $NEW{$url}{Checked} = $NEW{$url}{Good} = time;
394 } elsif ($res->is_redirect) {
395 my $location = $res->header("Location");
396 warn "... redirect to $location\n" if $VERBOSE;
397 add_link($url, -1, HACK_URL(URI->new_abs($location, $url)))
398 if $FOLLOW_REDIRECT;
399 $NEW{$url}{Status} = "Redirect (status = ".($res->code).") to $location";
400 $NEW{$url}{Checked} = time;
401 return;
402 } else {
403 warn "... not verified\n" if $VERBOSE;
404 if ("$kind $method" eq "PING HEAD") {
405 warn "... requeue for PING GET\n" if $VERBOSE;
406 $NEW{$url}{Status} = "= PING GET";
407 queue($url);
408 return;
409 }
410 $NEW{$url}{Status} = "NOT Verified (status = ".($res->code).")";
411 $NEW{$url}{Checked} = time;
412 return if time > $NEW{$url}{Good} + $FOLLOW_GHOST;
413 warn "... but following ghost links\n" if $VERBOSE;
414 if (exists $OLD{$url}{Base}) {
415 $NEW{$url}{Base} = $base = $OLD{$url}{Base};
416 }
417 follow_links($url, $links, $base);
418 return;
419 }
420 }
421
422 sub timeout_task {
423 my ($url, $kind, $req) = @_;
424 warn "$url timed out\n" if $VERBOSE;
425 $NEW{$url}{Status} = "NOT Verified (Timed out)";
426 $NEW{$url}{Checked} = time;
427 }
428
429 sub follow_links {
430 my ($url, $links, $base) = @_;
431
432 for my $link (sort keys %$links) {
433 my $abs = $link;
434 if ($base) {
435 $abs = URI->new_abs($link,$base);
436 } else {
437 $abs = URI->new($link);
438 }
439 $abs->fragment(undef); # blow away any fragment
440 $abs = HACK_URL($abs);
441 warn "... link $abs ($link)\n" if $VERBOSE > 1;
442 for my $line (sort keys %{$links->{$link}}) {
443 add_link($url, $line, $abs);
444 }
445 }
446 }
447
448 sub dump_relative {
449 my ($url,$label,$urls) = @_;
450 for my $other_url (sort keys %$urls) {
451 my $relative = URI->new($other_url)->rel($url) || ".";
452 print " $label $relative at ";
453 print join " ", sort { $a <=> $b } keys %{$urls->{$other_url}};
454 print "\n";
455 }
456 }
457
458 ### forking task manager from here down
459
460 BEGIN { # task manager
461 package My::TaskManager;
462
463 my %tasks;
464
465 my %inactive_tasks;
466
467 sub add_task { # external entry point
468 my $key = shift;
469 $tasks{$key} = [@_];
470 }
471
472 sub remove_task {
473 delete $tasks{+shift};
474 }
475
476 sub next_inactive_task {
477 my $task;
478
479 ## use current sweep:
480 ($task) = each %inactive_tasks and return $task;
481
482 ## compute new sweep:
483 %inactive_tasks = ();
484 $inactive_tasks{$_} = 1 for keys %tasks;
485 delete @inactive_tasks{My::KidManager::active_tasks()};
486 ($task) = each %inactive_tasks and return $task;
487
488 ## nothing to do
489 return undef;
490 }
491
492 sub total_task_count {
493 return scalar keys %tasks;
494 }
495
496 sub task_parameters {
497 my $key = shift;
498
499 return $tasks{$key};
500 }
501 }
502
503 BEGIN { # kid manager
504 package My::KidManager;
505
506 use Storable qw(freeze thaw);
507 use POSIX qw(WNOHANG);
508 use IO::Select;
509 use IO::Pipe;
510
511 my %kids;
512 my $kid_max = 5;
513 my $kid_task;
514 my $result_task;
515 my $timeout_task;
516 my $loop_bottom = sub { };
517 my $trace = 0;
518 my $timeout;
519
520 sub run_queue { # external entry point
521 {
522 my %parms = @_;
523 $kid_max = delete $parms{KidMax} if exists $parms{KidMax};
524 $kid_task = delete $parms{KidTask} if exists $parms{KidTask};
525 $result_task = delete $parms{ResultTask} if exists $parms{ResultTask};
526 $timeout_task = delete $parms{TimeoutTask} if exists $parms{TimeoutTask};
527 $loop_bottom = delete $parms{LoopBottom} if exists $parms{LoopBottom};
528 $trace = delete $parms{Trace} if exists $parms{Trace};
529 $timeout = delete $parms{Timeout} if exists $parms{Timeout};
530 die "unknown parameters for run_queue: ", join " ", keys %parms
531 if keys %parms;
532 }
533
534 {
535 warn "to go: ", My::TaskManager::total_task_count(), "\n" if $trace;
536 warn "active tasks: ", join " ", active_tasks(), "\n" if $trace;
537 ## reap kids
538 while ((my $kid = waitpid(-1, WNOHANG)) > 0) {
539 warn "$kid reaped\n" if $trace;
540 delete $kids{$kid};
541 }
542 ## verify live kids
543 for my $kid (keys %kids) {
544 next if kill 0, $kid;
545 warn "*** $kid found missing ***\n"; # shouldn't happen normally
546 delete $kids{$kid};
547 }
548 ## launch kids
549 if (keys %kids < $kid_max and
550 my $task = My::TaskManager::next_inactive_task() and
551 my $kid = create_kid()) {
552 send_to_kid($kid, $task);
553 }
554 ## see if any ready results
555 READY:
556 for my $ready (IO::Select->new(map $_->[1], values %kids)->can_read(1)) {
557 my ($kid) = grep $kids{$_}[1] == $ready, keys %kids;
558 {
559 last unless read($ready, my $length, 4) == 4;
560 $length = unpack "L", $length;
561 last unless read($ready, my $message, $length) == $length;
562 $message = thaw($message) or die "Cannot thaw";
563 My::TaskManager::remove_task($message->[0]);
564 $result_task->(@$message);
565 if (my $task = My::TaskManager::next_inactive_task()) {
566 send_to_kid($kid, $task);
567 } else { # close it down
568 $kids{$kid}[0]->close;
569 }
570 next READY;
571 }
572 ## something broken with this kid...
573 kill 15, $kid;
574 delete $kids{$kid}; # forget about it
575 }
576 ## timeout kids
577 if (defined $timeout) {
578 my $oldest = time - $timeout;
579 for my $kid (keys %kids) {
580 next unless defined $kids{$kid}[2];
581 next unless defined $kids{$kid}[3];
582 next if $kids{$kid}[3] > $oldest;
583 if (my $task = $kids{$kid}[2]) {
584 my $param_ref = My::TaskManager::task_parameters($task);
585 My::TaskManager::remove_task($task);
586 warn "timeout for $kid on $task\n" if $trace;
587 $timeout_task->($task, @$param_ref);
588 }
589 kill 15, $kid;
590 delete $kids{$kid};
591 }
592 }
593 $loop_bottom->();
594 redo if %kids or My::TaskManager::total_task_count();
595 }
596 }
597
598 sub create_kid {
599 my $to_kid = IO::Pipe->new;
600 my $from_kid = IO::Pipe->new;
601 defined(my $kid = fork) or return; # if can't fork, try to make do
602 unless ($kid) { # I'm the kid
603 $to_kid->reader;
604 $from_kid->writer;
605 $from_kid->autoflush(1);
606 $SIG{$_} = 'DEFAULT' for grep !/^__/, keys %SIG; # very important!
607 do_kid($to_kid, $from_kid);
608 exit 0; # should not be reached
609 }
610 $from_kid->reader;
611 $to_kid->writer;
612 $to_kid->autoflush(1);
613 $kids{$kid} = [$to_kid, $from_kid];
614 $kid;
615 }
616
617 sub send_to_kid {
618 my ($kid, $task) = @_;
619
620 {
621 ## if we get a SIGPIPE here, no biggy, we'll requeue request later
622 local $SIG{PIPE} = 'IGNORE';
623 my $param_ref = My::TaskManager::task_parameters($task);
624 my $message = freeze([$task, @$param_ref]);
625 print {$kids{$kid}[0]} pack("L", length($message)), $message;
626 $kids{$kid}[2] = $task; # show as active
627 $kids{$kid}[3] = time; # for hard timeouts
628 }
629 }
630
631 sub active_tasks {
632 ## return count or list
633 return grep defined($_), map { $kids{$_}[2] } keys %kids;
634 }
635
636 sub do_kid {
637 my($input, $output) = @_;
638 warn "kid launched\n" if $trace;
639 {
640 last unless read($input, my $length, 4) == 4;
641 $length = unpack "L", $length;
642 last unless read($input, my $message, $length) == $length;
643 $message = thaw($message) or die "Cannot thaw";
644 my ($key, @values) = @$message;
645 my @results = $kid_task->($key, @values);
646 $message = freeze([$key, @results]);
647 print $output pack("L", length($message)), $message;
648 redo;
649 }
650 warn "kid ending\n" if $trace;
651 exit 0;
652 }
653
654 }
655 sub URI::mailto::host { ""; } # workaround bug in LWP



Randal L. Schwartz is the chief Perl guru at Stonehenge Consulting and co-author of Learning Perl and Programming Perl. He can be reached at merlyn@stonehenge.com.
