
Data Reduction, Part 2

Last month, we left off part way through a data reduction effort with ten million Apache log records in a single MySQL table that was taking up far too much disk space and memory. We analyzed the data, found ways to normalize the schema to reduce the space required, and created the new tables. Now let's finish the job by creating a script that can intelligently move data from the old table into the new ones.

The Plan

Migrating data from the old table to the normalized set of tables should be simple and shouldn't place an undue burden on the database server. We should never do anything that locks the original table for more than a second or so; otherwise, hits couldn't be logged in a timely fashion, and Apache processes would block, waiting for the database.

The method we’ll use is to migrate entries in chronological order, from oldest to newest, and we’ll do so in small batches, pausing between each batch to reduce the overall impact of converting the data. It will take a non-trivial amount of time, but it should be a smooth transition. To reduce the number of queries sent to MySQL, we’ll try to cache frequently used data in memory along the way.
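In outline, the processing loop looks something like this sketch. (Here, fetch_batch() and migrate_row() are hypothetical stand-ins for the real work; the actual code appears in Listing One.)

# Rough shape of the migration: process one time window per batch,
# advance the window, and pause between batches to keep the load down.
while (my @batch = fetch_batch($hightime))
{
    for my $row (@batch)
    {
        migrate_row($row);                          # normalize and insert
        $hightime = $row->{time_stamp} + $interval; # advance the window
    }
    sleep $delay;                                   # throttle the load
}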

The code is included with this article as Listing One. Let’s walk through the code and see how it works.

The Code

Lines 1-10 tell Perl to enable warnings, unbuffer standard output, turn on strict checking, and pull in the Hits::DBI package. Hits::DBI is a simple subclass of DBIx::DWIW, a handy module that makes using Perl's DBI even easier than it already is. The relevant code in Hits::DBI looks like this:

package Hits::DBI;

use DBIx::DWIW;

sub connect
{
    my $conn = DBIx::DWIW->Connect(
        User => 'apache',
        Pass => 'pa55word',
        DB   => 'jeremy_zawodny_com',
        Host => 'localhost',
    ) or die "$!";
    return $conn;
}

The DB attribute contains the name of the database we'll connect to. Here, jeremy_zawodny_com contains the new tables.
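To give a sense of why DBIx::DWIW is convenient, here's a quick illustration (not taken from the script itself) using the same methods that appear throughout Listing One:

use Hits::DBI;

my $conn = Hits::DBI->connect;

# Scalar() runs a query and returns a single value; Hashes() returns
# a list of hash references, one per row. Placeholders work as usual.
my $count = $conn->Scalar("SELECT COUNT(*) FROM hit");
my @rows  = $conn->Hashes("SELECT id, name FROM agent WHERE id < ?", 10);

There's no prepare/execute/fetch boilerplate to write at all.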

From there, we set up some variables that control what the script does. The $old_tbl table is where we're pulling records from. $interval specifies the time range (in seconds) we'll use when selecting records; at 1,440 seconds, that's 24 minutes' worth of traffic. (Initial test runs of the code showed that using a full hour was a bit too heavy.) $delay specifies how long the code sleeps at the end of the processing loop before it requests another batch of records to process.

Lines 18-20 define several global caches that we'll use to increase the performance of the conversion. There's a cache for each of the lookup tables we created last month: agent, referer, and uri.

Lines 24-26 bootstrap things by finding the timestamp of the oldest record in the table. From there, we’ll walk forward in time using $interval.

Line 28 is a sanity check to make sure the script never touches data that's less than one day old. This means the script can safely be run once a day via cron, and it will always leave the latest 24 hours' worth of data in the original table.
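For example, a crontab entry along these lines would handle it (the script name, path, and schedule here are hypothetical):

# Run the migration nightly at 3:05 AM, appending its output to a log.
5 3 * * * $HOME/apache/migrate.pl >> $HOME/apache/migrate.log 2>&1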

Lines 30-38 set up the SQL queries we'll use to fetch a batch of hits and then insert them into the hit table. On line 42, we initialize the $total counter to zero; it's used to keep track of how many records we migrate.

The main loop in the code begins on line 44 and continues to line 90. It starts by fetching a batch of records. The records come back as a list of hash refs stored in @r. For each record, we extract the agent, referer, and URI, and convert them to numeric identifiers using the agent(), referer(), and uri() functions. (We’ll discuss those shortly.)
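Each hash ref in @r has one key per column in the SELECT list, so a record looks roughly like this (the values shown here are made up for illustration):

# A single record from the batch, as returned by Hashes().
{
    agent       => 'Mozilla/4.0 (compatible; MSIE 6.0)',
    bytes_sent  => 10452,
    referer     => 'http://www.google.com/search?q=mysql',
    remote_host => '10.0.0.1',
    request_uri => '/mysql/index.html',
    status      => 200,
    time_stamp  => 1053061800,
}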

Starting at line 62, we use the accumulated data to add a new record to the hit table. Lines 69-77 then delete the just-migrated row from the old table, matching on time_stamp, request_uri, and remote_host; any failed delete is logged and counted in $error. We also increment $total, the running count of migrated rows. After each row is processed, we use the record's timestamp to calculate $hightime, which bounds the results returned by the SQL query that fetches the next batch of rows. At the end of the loop (line 87), we pause for a few seconds before starting the next batch.

The rest of the code is composed of the agent(), referer(), and uri() functions. All three functions perform nearly identical tasks and probably should be combined to reduce the amount of code. (But doing so would add some complexity to the code and make it a bit harder to understand at first glance.)
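For the curious, here's a sketch of what a combined version might look like. This isn't what the script actually uses; it's just one way the duplication might be factored out, built from the same DBIx::DWIW calls the three functions already make:

# Generic replacement for agent(), referer(), and uri(). Each lookup
# table gets its own cache, keyed by table name.
my %cache;

sub lookup_id
{
    my ($table, $name) = @_;

    # cache?
    return $cache{$table}{$name} if defined $cache{$table}{$name};

    # exists in db?
    my $id = $conn->Scalar("SELECT id FROM $table WHERE name = ?", $name);
    return $cache{$table}{$name} = $id if defined $id;

    # add to db...
    $conn->Execute("INSERT INTO $table VALUES (NULL, ?)", $name)
        or die "hm. $table.";
    return $cache{$table}{$name} = $conn->LastInsertID;
}

A call like lookup_id('uri', $r->{request_uri}) would then replace uri($r->{request_uri}), and likewise for the other two.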

The uri() function, for example, is passed a URI and is tasked with returning the URI's id from the uri table. If the URI doesn't exist, the function adds it and returns the newly created id. First, we check the global cache; if the URI is found there, we return its id immediately. Otherwise, we check the database table, inserting a new record and returning the new id as needed. Here's the code (with line numbers) that performs those steps:

146    return $uri_id{$uri} if defined $uri_id{$uri};

149    my $id = $conn->Scalar("SELECT id FROM uri WHERE name = ?", $uri);
150    $uri_id{$uri} = $id;
151    return $id if defined $id;

154    if ($conn->Execute("INSERT INTO uri VALUES (NULL, ?)", $uri)) ...

Functions agent() and referer() are similar to uri().

You might notice that the global caches are never cleared. In testing this code, memory usage grew to roughly 26 MB and then stabilized; there wasn't sufficient diversity in any of the lookup tables for the caching to become a problem. If there were, you could simply empty the caches (%agent_id, %referer_id, and %uri_id) every time $total grows by another 100,000 or so (if not $total % 100000).
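Inside the main loop, that check might look like this (untested, and the threshold is arbitrary):

# Flush the lookup caches whenever $total crosses a multiple of
# 100,000 rows, to put a ceiling on the script's memory use.
if ($total and not $total % 100000)
{
    %agent_id   = ();
    %referer_id = ();
    %uri_id     = ();
}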




Listing One


1   #!/usr/local/bin/perl -w
2
3   $|=1;
4
5   use strict;
6   use lib "$ENV{HOME}/apache";
7   use Hits::DBI;
8
9   my $dbg     = $ENV{DEBUG};
10  my $conn    = Hits::DBI->connect or die;
11  my $now     = time;
12  my $old_tbl = 'apache.access_jeremy_zawodny_com';
13  my $interval= 1440;
14  my $delay   = 3;
15
16  # Caches
17
18  my %agent_id;
19  my %referer_id;
20  my %uri_id;
21
22  # Find oldest records
23
24  my $sql = "SELECT MIN(time_stamp) FROM $old_tbl";
25  my $oldest = $conn->Scalar($sql);
26  my $hightime = $oldest + $interval;
27
28  exit if $hightime >= (time - 1440*60);
29
30  my $get_batch_sql = qq[
31      SELECT agent, bytes_sent, referer, remote_host, request_uri,
32             status, time_stamp
33      FROM   $old_tbl
34      WHERE  time_stamp <= ?
35      ORDER BY time_stamp ASC
36  ];
37
38  my $hit_sql = qq[INSERT INTO hit VALUES (?, ?, ?, ?, ?, ?, ?)];
39
40  # We'll be sure to nuke based on time_stamp, request_uri, remote_host
41
42  my $total = 0;
43
44  while (my @r = $conn->Hashes($get_batch_sql, $hightime))
45  {
46      my $error = 0;
47      my $count = 0;
48      for my $r (@r)
49      {
50          print "[$r->{time_stamp}] $r->{request_uri}\n" if $dbg;
51          my $agent_id = agent($r->{agent});
52          print "($agent_id): $r->{agent}\n" if $dbg;
53
54          # referer
55          my $referer_id = referer($r->{referer});
56          print "($referer_id): $r->{referer}\n" if $dbg;
57
58          # uri
59          my $uri_id = uri($r->{request_uri});
60          print "($uri_id): $r->{request_uri}\n" if $dbg;
61
62          # hit
63          if (not $conn->Execute($hit_sql, $r->{time_stamp}, $r->{remote_host},
64              $r->{status}, $r->{bytes_sent}, $agent_id, $referer_id, $uri_id))
65          {
66              die "hm. hit.";
67          }
68
69          # delete
70          my $where = "time_stamp = ? and request_uri = ? and remote_host = ?";
71          my $del_sql = "DELETE FROM $old_tbl WHERE $where";
72          my $sel_sql = "SELECT * FROM $old_tbl WHERE $where";
73          if (not $conn->Execute($del_sql, $r->{time_stamp},
                $r->{request_uri}, $r->{remote_host}))
74          {
75              warn "possible error: [$r->{time_stamp}] [$r->{request_uri}] [$r->{remote_host}]\n";
76              $error++;
77          }
78          $count++;
79          $total++;
80          $hightime = $r->{time_stamp} + $interval;
81      }
82      if ($error)
83      {
84          die "exiting with $error errors\n";
85      }
86      print "$total ($count)";
87      sleep $delay;
88      print "\n";
89      last if $hightime >= (time - 1440*60);
90  }
91
92  exit;
93
94  sub agent
95  {
96      # cache?
97      my $agent = shift;
98      return $agent_id{$agent} if defined $agent_id{$agent};
99
100     # exists in db?
101     my $id = $conn->Scalar("SELECT id FROM agent WHERE name = ?", $agent);
102     $agent_id{$agent} = $id;
103     return $id if defined $id;
104
105     # add to db...
106     if ($conn->Execute("INSERT INTO agent VALUES (NULL, ?)", $agent))
107     {
108         $id = $conn->LastInsertID;
109         $agent_id{$agent} = $id;
110         return $id;
111     }
112     else
113     {
114         die "hm. agent.";
115     }
116 }
117
118 sub referer
119 {
120     # cache?
121     my $referer = shift;
122     return $referer_id{$referer} if defined $referer_id{$referer};
123
124     # exists in db?
125     my $id = $conn->Scalar("SELECT id FROM referer WHERE name = ?", $referer);
126     $referer_id{$referer} = $id;
127     return $id if defined $id;
128
129     # add to db...
130     if ($conn->Execute("INSERT INTO referer VALUES (NULL, ?)", $referer))
131     {
132         $id = $conn->LastInsertID;
133         $referer_id{$referer} = $id;
134         return $id;
135     }
136     else
137     {
138         die "hm. referer.";
139     }
140 }
141
142 sub uri
143 {
144     # cache?
145     my $uri = shift;
146     return $uri_id{$uri} if defined $uri_id{$uri};
147
148     # exists in db?
149     my $id = $conn->Scalar("SELECT id FROM uri WHERE name = ?", $uri);
150     $uri_id{$uri} = $id;
151     return $id if defined $id;
152
153     # add to db...
154     if ($conn->Execute("INSERT INTO uri VALUES (NULL, ?)", $uri))
155     {
156         $id = $conn->LastInsertID;
157         $uri_id{$uri} = $id;
158         return $id;
159     }
160     else
161     {
162         die "hm. uri.";
163     }
164 }
165
166 __END__



Jeremy Zawodny (Jeremy@Zawodny.com) plays with MySQL by day and spends his spare time flying gliders in California and writing a MySQL book for O’Reilly & Associates.
