Manitou-Mail logo title

Source file: mdx/script/manitou-mdx

#!/usr/bin/perl

# manitou-mdx v-1.3.1
# Copyright (C) 2004-2014 Daniel Verite

# This file is part of Manitou-Mail (see http://www.manitou-mail.org)

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.

#####################################################################
# manitou-mdx: mail-database exchanger
# This program basically does the following:
#
# 1) polls directories where incoming mail files are dropped
# (by an external delivery agent): parse them, run plugins
# and apply filter rules, and import them into the database.
# 2) polls the database for new outgoing mail to pass
# on to a local delivery agent like sendmail.
#
#####################################################################

use strict;

use Encode;
use DBI;
use DBD::Pg;
use MIME::Head;
use MIME::Entity;
use MIME::Words qw(:all);
use MIME::Parser;
use MIME::Body;
use Mail::Address;
use Mail::Internet;
use POSIX qw(strftime mktime tmpnam getuid);
use File::stat;
use IO::Handle;
use IPC::Open3;
use URI::Escape;
use Getopt::Long;
use File::Temp qw(tempfile tempdir);
use File::Basename qw(basename dirname);
use Fcntl qw(:seek);
use Time::HiRes qw(gettimeofday tv_interval);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError) ;

use Manitou::Filters;
use Manitou::Jobs;
use Manitou::MailFormat;
use Manitou::Words qw(load_stopwords index_words flush_word_vectors
		     clear_word_vectors last_flush_time);
use Manitou::Tags qw(action_tag);
use Manitou::Attachments qw(flatten_and_insert_attach
			    detach_text_attachments
			    attach_parts create_html_part has_attachments);
use Manitou::Encoding qw(encode_dbtxt decode_dbtxt header_decode);
use Manitou::Config qw(getconf getconf_bool add_mbox readconf
		       set_common_conf mailboxes);
use Manitou::Database qw(db_connect);
use Manitou::Log qw(notice_log error_log init_log debug_log warning_log);
use Data::Dumper;

# File-level state shared by all the subroutines below.

# Taken from the MANITOU_MDX_DEBUG environment variable.
my $DEBUG=$ENV{'MANITOU_MDX_DEBUG'};

my $dbh;			# database handle, set from db_connect()
my $global_end;			# set to 1 by the signal handler to leave the main loop
my $mail_id;			# currently processed
my $verbosity;			# set by the --verbosity command line option

# hash of mail_id that we failed to send and do not want to retry
my %hsend_blocked;

# Plugins, grouped by the stage at which they are run
my %preprocess_plugins;		# mbox => array of plugins in the order of execution
my %postprocess_plugins;
my %mimeprocess_plugins;
my %outgoing_plugins;
my @maintenance_plugins;

my %loaded_plugins;

# Command line options that are needed outside of main_multi()
my %options;


# header -> mail_addresses.addr_type
my %hAdrTypes=( "From" => 1,
		"To" => 2,
		"Cc" => 3,
		"ReplyTo" => 4,
		"Bcc" => 5
	      );


# Do not buffer the standard output streams
STDOUT->autoflush(1);
STDERR->autoflush(1);

# Program entry point
main_multi();

# Store the current time under $key in the runtime_info table, to tell
# other programs that we are currently running (or when something
# last happened). The row is updated if it exists, inserted otherwise.
# AutoCommit is forced on for the duration of the call.
# Returns the timestamp that has been written.
sub update_runtime_timestamp {
  my ($key) = @_;
  local $dbh->{AutoCommit} = 1;
  my $now = time;
  my $update = $dbh->prepare("UPDATE runtime_info SET rt_value=? WHERE rt_key=?");
  $update->execute($now, $key);
  unless ($update->rows) {
    # no row for this key yet: create it
    my $insert = $dbh->prepare("INSERT INTO runtime_info(rt_key,rt_value) VALUES (?,?)");
    $insert->execute($key, $now);
    $insert->finish;
  }
  $update->finish;
  return $now;
}

# Increment the 'nb_errors' counter kept in the runtime_info table,
# creating the row with a value of 1 if it does not exist yet.
# AutoCommit is forced on for the duration of the call.
sub update_runtime_errcount {
  local $dbh->{AutoCommit}=1;
  my $updated = $dbh->do("UPDATE runtime_info SET rt_value=to_char(to_number(rt_value,'9999999999')+1, '9999999999') WHERE rt_key='nb_errors'");
  # DBI's do() returns the true string "0E0" when the statement succeeds
  # without touching any row, so the result has to be compared
  # numerically to know that the counter row is missing.
  if (!$updated || $updated == 0) {
    $dbh->do("INSERT INTO runtime_info(rt_key,rt_value) VALUES ('nb_errors','1')");
  }
}


# Write the queued word vectors to the database inside one transaction,
# then clear the in-memory vectors. Progress is printed on stdout when
# $verbosity is set.
sub do_flush_word_vectors {
  $dbh->begin_work;
  if ($verbosity) {
    print "Flushing vectors...";
  }
  Manitou::Words::flush_word_vectors($dbh);
  Manitou::Words::clear_word_vectors();
  $dbh->commit;
  if ($verbosity) {
    print "done\n";
  }
}

# List the mail files found in the sub-directories of a spool directory.
# Arguments: the path of the spool directory, and a directory handle
# already opened on it.
# Each sub-directory name is URI-unescaped, decoded from utf-8, and has
# its '/' replaced by '->' to form a tag name.
# Returns a hash: "subdir/mail-N-N-N.received" => tag name.
sub list_pretagged_files {
  my ($spooldir, $spool_dh) = @_;
  my %tag_of;
  foreach my $entry (readdir($spool_dh)) {
    next if ($entry eq "." or $entry eq "..");
    my $tag = uri_unescape($entry);
    $tag = Encode::decode("utf-8", $tag, Encode::FB_PERLQQ);
    $tag =~ s/\//->/g;
    # entries that cannot be opened as directories are silently skipped
    opendir(my $sub_dh, $spooldir."/".$entry) or next;
    foreach my $file (readdir($sub_dh)) {
      next unless ($file =~ /^mail-(\d+\-\d+\-\d+)\.received$/);
      $tag_of{$entry."/".$file} = $tag;
    }
    closedir $sub_dh;
  }
  return %tag_of;
}

# Import the mail files waiting in one spool.
# mbox: identity from the configuration file
# origin: {type=>tempfile|dot-received|maildir , location=>'/some/path' }
#
# For each file (at most 20 per call): run the preprocess plugins,
# import the message, rename the file according to the result
# (dot-received spools only), run the postprocess plugins and the
# optional postprocess_mailfile_cmd, then delete the file when applicable.
# Returns the number of listed files that have not been imported by
# this call.
sub import_mailfiles {
  my ($mbox, $origin)=@_;
  my ($done, $ret);

  my @files;
  my $dir;
  my %pretagged_files;

  if ($origin->{type} eq "dot-received") {
    $dir=$origin->{location};
    my $dh;
    if (!opendir($dh, $dir)) {
      error_log("Unable to access mailfiles_directory $dir: $!");
      return 0;
    }

    if (getconf("mailfiles_directory_flattened_folders", $mbox)) {
      %pretagged_files = list_pretagged_files($dir, $dh);
      @files = keys %pretagged_files;
    }
    else {
      @files=grep (/^mail-(\d+\-\d+\-\d+)\.received$/, readdir($dh));
    }
    closedir $dh;
  }
  elsif ($origin->{type} eq "maildir") {
    $dir=$origin->{location}."/new";
    my $dh;
    if (!opendir($dh, $dir)) {
      error_log("Unable to access directory $dir: $!");
      return 0;
    }
    @files=grep { -f "$dir/$_" } readdir($dh);
    closedir $dh;
  }
  elsif ($origin->{type} eq "tempfile") {
    @files = (basename($origin->{location}));
    $dir = dirname($origin->{location});
  }
  else {
    error_log("Unknown type in import for identity '$mbox'");
    return 0;			# nothing to import
  }

  my $import_count=0;
  foreach my $rel_fname (sort @files) {
    # we don't want to import too much files in one go to avoid
    # having maintenance plugins and the sending of outgoing mail
    # being delayed
    last if ($global_end || $import_count>=20);
    my $fname = "$dir/$rel_fname";
    my $st=stat($fname);
    my $proc_filename = $fname;
    my $basename = $fname;
    my $do_process=1;
    if ($origin->{type} eq "dot-received") {
      # take ownership of the file by renaming it with our pid
      $proc_filename =~ s/(.*)\.received$/$1.$$.processing/;
      $basename=$1;
      if (!$st || !rename($fname, $proc_filename)) {
        $do_process=0;
        error_log("Cannot rename $fname");
      }
    }
    elsif (!$st) {
      # the file may have vanished since the directory was read; skip it
      # instead of dying on the undefined stat result below
      $do_process=0;
      error_log("Cannot stat $fname: $!");
    }
    if (getconf_bool("auto_db_reconnect") && !$dbh->ping()) {
      db_reconnect();
    }
    if ($do_process) {
      my $t0 = [gettimeofday];

      $import_count++;
      my $str_file_date=strftime("%Y-%m-%d %H:%M:%S", localtime ($st->mtime));
      my %plugins_ctxt;
      $plugins_ctxt{filename}=$proc_filename;
      $plugins_ctxt{filesize} = $st->size;
      $plugins_ctxt{dbh}=$dbh;
      $plugins_ctxt{stage}="preprocess";
      $plugins_ctxt{status}=0;	# can be updated by plugins
      $plugins_ctxt{notice_log} = \&notice_log;
      $plugins_ctxt{error_log} = \&error_log;

      # Initial tags: those configured for the identity, plus the tag
      # derived from the sub-directory for flattened folders. The
      # configured tags must be assigned only once, before the push,
      # otherwise the folder tag is lost.
      my $l=getconf("tags_incoming", $mbox);
      @{$plugins_ctxt{tags}}=@{$l} if ($l);
      if (exists $pretagged_files{$rel_fname}) {
        push @{$plugins_ctxt{tags}}, $pretagged_files{$rel_fname};
      }
      for my $preproc_plugin (@{$preprocess_plugins{$mbox}}) {
        $preproc_plugin->process(\%plugins_ctxt);
      }
      my %ctxt;
      my $fh;
      if (open($fh, '<', $proc_filename)) {
        $ctxt{'filename'} = $fname;
        $ctxt{'filesize'} = $st->size;
        $ctxt{'proc_filename'} = $proc_filename;
        $ctxt{'str_date'} = $str_file_date;
        if (defined($plugins_ctxt{'tags'})) {
          @{$ctxt{'tags'}}=@{$plugins_ctxt{'tags'}};
        }
        if ($plugins_ctxt{'status'}) {
          $ctxt{'status'}=$plugins_ctxt{'status'};
        }
        $ret = import_message($mbox, $fh, \%ctxt);
        close($fh);
      }
      else {
        $ret=0;
      }
      # $ret: >0 imported, 0 error, -1 discarded
      $done = $proc_filename;
      if ($ret>0) {
        $done = "$basename.processed";
      } elsif ($ret==0) {
        $done = "$basename.error";
      } elsif ($ret==-1) {
        $done = "$basename.discarded";
      }
      my $done_ok=1;
      if ($origin->{type} eq "dot-received") {
        if (!rename($proc_filename, $done)) {
          error_log("Rename failed: $proc_filename => $done");
          $done_ok=0;
        }
      }
      if ($done_ok) {
        if ($ret>0) {
          $plugins_ctxt{filename}=$done;
          $plugins_ctxt{stage}="postprocess";
          if (defined($ctxt{'tags'})) {
            @{$plugins_ctxt{'tags'}}=@{$ctxt{'tags'}};
          }
          for my $plugin (@{$postprocess_plugins{$mbox}}) {
            $plugin->process(\%plugins_ctxt);
          }
          my $msg=sprintf("Imported: %s in %0.2fs", $fname, tv_interval($t0));
          notice_log($msg);
          update_runtime_timestamp("last_import");
        }
        elsif ($ret==0) {
          error_log("Import failed: $fname");
          update_runtime_timestamp("last_error");
          update_runtime_errcount();
        }
        elsif ($ret==-1) {
          error_log("Discarded: $fname");
        }
        my $cmd=getconf("postprocess_mailfile_cmd", $mbox);
        if (defined $cmd) {
          my $result="imported";
          if ($ret==0) { $result="error"; }
          elsif ($ret==-1) { $result="discarded"; }
          # with "sh -c", the extra arguments are seen by the command
          # as $0, $1 and $2
          system("/bin/sh", "-c", $cmd, $result, $done, $mbox);
          if (($?>>8)!=0) {
            warning_log("Execution of postprocess_mailfile_cmd failed (\`$cmd\`, exit code=".($?>>8).")");
          }
        }
        if ($origin->{type} eq "maildir") {
          if (!unlink($proc_filename)) {
            error_log("Failed to delete file $proc_filename: $!");
          }
        }
        if ($ret!=0 && $origin->{type} eq "dot-received" &&
            getconf_bool("delete_processed_mail_files", $mbox)) {
          if (!unlink($done)) {
            error_log("Failed to delete file $done: $!");
          }
        }
      }
    }

    # Word indexing: flush the word vectors to the db if necessary
    my $widx_size=Manitou::Words::queue_size();
    if ($widx_size > 0) {
      if ($widx_size >= getconf("flush_word_index_max_queued") ||
          time-Manitou::Words::last_flush_time() >= getconf("flush_word_index_interval")) {
        do_flush_word_vectors();
      }
    }
  }
  # returns the number of files that are still candidates to being imported
  # in most cases that will be 0
  return @files-$import_count;
}

# Return the numerically smallest of the arguments
# (undef when called without argument).
sub min {
  my ($lowest, @others) = @_;
  for my $candidate (@others) {
    $lowest = $candidate if ($candidate < $lowest);
  }
  return $lowest;
}

# Tell whether $c is one of the entries of the 'exclude_contents'
# configuration parameter, read as a comma-separated list.
# Whitespace around the commas and at both ends of the list is ignored.
# Returns 1 if $c is listed, 0 otherwise.
sub is_excluded {
  my $c=shift;
  my $r=getconf("exclude_contents");
  return 0 unless (defined $r);
  $r =~ s/^\s+//;
  $r =~ s/\s+$//;
  # the whitespace around a comma is optional ("a,b" and "a , b" are
  # both lists of two entries)
  foreach my $entry (split /\s*,\s*/, $r) {
    return 1 if ($entry eq $c);
  }
  return 0;
}


# Open a new database connection and make it the current one.
# The handle returned by db_connect() has to be stored into $dbh,
# otherwise the rest of the program keeps using the dead connection.
sub db_reconnect {
  my $new_dbh = db_connect();
  if ($new_dbh) {
    $dbh = $new_dbh;
    notice_log("Successful database reconnect");
  }
  else {
    # NOTE(review): db_connect() may well die by itself on failure;
    # this branch is only a safety net if it returns a false value.
    error_log("Database reconnect failed");
  }
}


# Copy the next message of a mailbox into a newly created temporary file.
# Argument: a reference to the filehandle opened on the mailbox, which
# may already be at end of file.
# The message may start with its ^From_ line, or with the line that
# follows it when the ^From_ line has already been consumed as the end
# marker of the previous message. The ^From_ line itself is not copied.
# Returns the name of the temporary file, or undef when there is
# nothing left to extract.
sub extract_mbox_tmpfile {
  my ($handle_ref)=@_;
  my $in=${$handle_ref};
  my ($out,$tmp_path);
  my $lineno=0;
  my $prev_blank=0;		# true when the previous line was empty
  while (<$in>) {
    ++$lineno;
    if (/^From /) {
      # leading separator of the message: skip it
      next if ($lineno==1);
      # empty line followed by ^From_ means end of mail
      last if ($prev_blank);
    }
    unless (defined $out) {
      # the temporary file is created only when there is a line to write
      ($out,$tmp_path)=tempfile();
      unless ($out) {
        error_log("Could not create temporary file when parsing mailbox:$!");
        return undef;
      }
    }
    print $out $_;
    $prev_blank=($_ eq "\n")?1:0;
  }
  close $out if (defined $out);
  return $tmp_path;
}

# Program entry point.
# Parses the command line, reads the configuration, connects to the
# database, initializes identities and plugins, then runs in one of
# these modes:
#  - --mboxfile: import the messages of an mbox file
#  - file names as arguments: import each file as one message
#  - --import-list: import the files listed (one per line) in a file
#  - otherwise, daemon mode: periodically import from the configured
#    spools, send outgoing mail, run maintenance plugins and jobs,
#    until $global_end is set by a signal.
# With --fork, the process forks before doing the work and the parent
# exits after optionally writing the child's pid into --pidfile.
sub main_multi {
  my $mailbox_file;
  my $conf_file;
  my $global_tag;
  my $global_mbox;
  my $option_fork;
  my $option_pidfile;
  my ($option_import_list, $option_import_basedir);
  my $mbox_skip=0;
  my $rc = GetOptions("mboxfile:s" => \$mailbox_file,
                      "status:i" => \$options{'status'},
                      "conf:s" => \$conf_file,
                      "tag:s" => \$global_tag,
                      "fork" => \$option_fork,
                      "import-list:s" => \$option_import_list,
                      "import-basedir:s" => \$option_import_basedir,
                      "pidfile:s" => \$option_pidfile,
                      "mailbox:s" => \$global_mbox,
                      "skip:i" => \$mbox_skip,
                      "verbosity" => \$verbosity);

  if (!$rc) {
    print STDERR "Usage: $0 [--conf=config_file] [[--mboxfile=path [--skip=# of msgs]] [--status=import_status] [--tag=tagname] ]\n";
    print STDERR "Usage: $0 [--conf=config_file] --import-list=path [--import-basedir=directory] [--status=import_status] [--tag=tagname] [--mailbox=mailbox_name]\n";
    print STDERR "Usage: $0 [--conf=config_file] --fork [--pidfile=/path/to/pidfile]\n";
    exit 1;
  }

  if (!defined $conf_file && -e "/etc/manitou-mdx.conf") {
    $conf_file="/etc/manitou-mdx.conf";	# default config file
  }
  if (defined $conf_file) {
    my %err;
    if (!readconf($conf_file, \%err)) {
      print STDERR "Error in config file: ", $err{msg}, "\n";
      exit 1;
    }
  }
  init_log();
  init_temp_dir();


  if (defined $global_mbox && $global_mbox ne "") {
    # let init_mailboxes create the mailbox if necessary
    add_mbox($global_mbox);
  }

  # If $option_fork is set, we connect to the database only to test if the
  # connection succeeds, so that we can exit with a useful error message
  # instead of forking.
  $dbh = db_connect();

  init_identities();
  init_plugins();
  load_stopwords($dbh);

  # Build the list of directories where mailfiles are to be looked for
  # A directory points to a mailbox (possibly an anonymous mailbox)
  my %mailfiles_dirs;
  my %maildirs;
  my %spool_mboxes;
  for my $m (mailboxes()) {
    my $dir=getconf("mailfiles_directory", $m);
    if (defined $dir) {
      $mailfiles_dirs{$m}= {type=>"dot-received", location=>$dir};
    }
    $dir=getconf("spool_maildir", $m);
    if (defined $dir) {
      $maildirs{$m} = {type=>"maildir", location=>$dir};
    }
    $dir=getconf("spool_mailbox", $m);
    if (defined $dir) {
      $spool_mboxes{$m} = {type=>"mailbox", location=>$dir, last_size=>0, last_mtime=>0};
    }
  }

  if ($option_fork) {
    $dbh->disconnect;
    # Do as many error checks we can before closing stderr
    if (!%mailfiles_dirs && !%maildirs && !%spool_mboxes) {
      print STDERR "No mailspool defined. In daemon mode, the configuration should define spool_maildir or spool_mailbox or mailfiles_directory for at least one identity.\n";
      exit 1;
    }

    my $f=fork();
    if (!defined $f) {
      # fork() returns undef on failure: do not go on as if we were
      # the child process
      print STDERR "Cannot fork: $!\n";
      exit 1;
    }
    if ($f>0) {
      if (defined $option_pidfile) {
        open(my $pid_fh, '>', $option_pidfile) or die "Cannot open $option_pidfile: $!\n";
        print $pid_fh "$f\n";
        close($pid_fh);
      }
      exit(0);			# exit of parent process
    }
    else {
      # reopen
      open(STDIN, "</dev/null");
      open(STDOUT, ">/dev/null");
      open(STDERR, ">/dev/null");
      # reconnect
      $dbh = db_connect();
    }
  }


  if (defined($mailbox_file)) {
    # Import from an mbox file: each message is copied into a temporary
    # file, which is imported when the next ^From_ line or the end of
    # the mbox is reached. The first $mbox_skip messages are ignored.
    my $mail_cnt=0;
    my $filename;
    open(my $mbox_fh, '<', $mailbox_file) or die "$mailbox_file: $!\n";
    my $end=0;
    umask(077);
    my $fh_tmp;
    while (!$end) {
      $_=<$mbox_fh>;
      if (!defined($_)) { $end=1; }
      if ($end || /^From /) {
        if ($mail_cnt>$mbox_skip) {
          # import mail
          if (defined $fh_tmp) {
            close $fh_tmp;
            $fh_tmp=undef;
          }
          my $st=stat($filename) or die "$filename: $!\n";
          open(my $mail_fh, '<', $filename) or die "$filename: $!\n";
          print "\rImporting $mail_cnt";
          my %ctxt;
          $ctxt{'filename'}=$filename;
          $ctxt{'str_date'}=undef;
          $ctxt{'proc_filename'}=$filename;
          $ctxt{'filesize'}=$st->size;
          if (defined $global_tag && $global_tag ne "") {
            push @{$ctxt{'tags'}}, $global_tag;
          }
          import_message($global_mbox, $mail_fh, \%ctxt);
          close($mail_fh);
          unlink($filename);
          if (Manitou::Words::queue_size() > 0) {
            if (time-Manitou::Words::last_flush_time() >=
                getconf("flush_word_index_interval")) {
              do_flush_word_vectors();
            }
          }
        }
        if (!$end) {
          if (++$mail_cnt > $mbox_skip) {
            ($fh_tmp,$filename) = tempfile();
            die $! if (!defined $fh_tmp);
          }
        }
      }
      else {
        if (defined $fh_tmp) {
          print $fh_tmp $_ or die "$filename: $!\n";
        }
      }
    }
    close ($mbox_fh);
    print "\n" if ($mail_cnt>$mbox_skip);
  }
  elsif (@ARGV) {
    # Each remaining argument is a file containing one message
    foreach my $fname (@ARGV) {
      my $st=stat($fname) or die "$fname: $!";
      my $str_file_date=strftime("%Y-%m-%d %H:%M:%S", localtime($st->mtime));
      open(my $fh, '<', $fname) or die "$fname: $!\n";
      my %ctxt;
      $ctxt{'filename'}=$fname;
      $ctxt{'filesize'} = $st->size;
      $ctxt{'proc_filename'}=$fname;
      $ctxt{'str_date'}=$str_file_date;
      if (defined $global_tag && $global_tag ne "") {
        push @{$ctxt{'tags'}}, $global_tag;
      }
      my $l=getconf("tags_incoming", $global_mbox);
      if ($l) {
        push @{$ctxt{tags}}, @{$l};
      }
      import_message($global_mbox, $fh, \%ctxt);
      close($fh);
    }
  }
  elsif (defined $option_import_list) {
    # The list file contains one mail file name per line, optionally
    # relative to --import-basedir
    open(my $fhl, '<', $option_import_list) or die "$option_import_list: $!";
    while (my $fname=<$fhl>) {
      chomp $fname;
      my $orig_fname=$fname;
      if (defined $option_import_basedir) {
        $fname = "$option_import_basedir/$fname";
      }
      my $st=stat($fname) or die "$fname: $!";
      my $str_file_date=strftime("%Y-%m-%d %H:%M:%S", localtime($st->mtime));
      open(my $fh, "<", $fname) or die "$fname: $!\n";
      my %ctxt;
      $ctxt{'filename'}=$fname;
      $ctxt{'filesize'} = $st->size;
      $ctxt{'proc_filename'}=$fname;
      $ctxt{'str_date'}=$str_file_date;
      if (defined $global_tag && $global_tag ne "") {
        push @{$ctxt{'tags'}}, $global_tag;
      }
      my $l=getconf("tags_incoming", $global_mbox);
      if ($l) {
        push @{$ctxt{tags}}, @{$l};
      }
      import_message($global_mbox, $fh, \%ctxt);
      print "$orig_fname\n" if ($verbosity);
      close($fh);

      if (Manitou::Words::queue_size() >= getconf("flush_word_index_max_queued")) {
        do_flush_word_vectors();
      }
    }
    close $fhl;
  }
  else {
    # daemon mode
    $SIG{'TERM'} = 'sigterm';
    $SIG{'INT'} = 'sigterm';
    LogSuccess("starting in daemon mode");
    $dbh->begin_work;
    Manitou::Words::flush_jobs_queue($dbh);
    $dbh->commit;

    # make sure that the spools are checked at the first iteration
    my $last_checked_incoming=time-getconf("incoming_check_interval")-1;

    $global_end=0;
    $mail_id=0;
    my $last_alive;
    my $alive_interv=getconf('alive_interval');
    # remember when the first heartbeat was written, so that the main
    # loop does not immediately write it a second time
    $last_alive = update_runtime_timestamp("last_alive") if ($alive_interv);

    my $in_interv=getconf("incoming_check_interval");
    my $out_interv=getconf("outgoing_check_interval");
    my $no_send=(getconf_bool("no_send"));
    my $still_to_go;

    # Start by sending any pending outgoing mail
    my $last_checked_outgoing=time;
    send_mails() unless $no_send;

    # Main loop
    while (!$global_end) {
      if ($still_to_go>0 || time >= $last_checked_incoming+$in_interv) {
        # import
        $last_checked_incoming=time;
        $still_to_go=0;
        for my $mbox (keys %mailfiles_dirs) {
          $still_to_go += import_mailfiles($mbox, $mailfiles_dirs{$mbox});
        }
        for my $mbox (keys %maildirs) {
          $still_to_go += import_mailfiles($mbox, $maildirs{$mbox});
        }
        for my $mbox (keys %spool_mboxes) {
          my $o=$spool_mboxes{$mbox};
          my $stmb=stat($o->{location});
          if ($stmb) {
            # the spool is fetched only when its size or modification
            # time has changed since the last check
            if ($stmb->mtime!=$o->{last_mtime} || $stmb->size!=$o->{last_size}) {
              $o->{last_mtime}=$stmb->mtime;
              $o->{last_size}=$stmb->size;
              my $cmd=getconf("movemail_command", $mbox);
              $cmd="movemail" if (!defined $cmd);
              my $tmpdir=getconf("tmpdir");
              my $dest_inbox=$tmpdir."/inbox-".$mbox;
              if ($cmd) {
                $cmd="$cmd \"" . getconf("spool_mailbox", $mbox) . "\" \"$dest_inbox\"";
                my $output=`$cmd`;
                if (($?>>8)!=0) {
                  error_log("movemail error: (\`$cmd\`, exit code=".($?>>8)."): $output");
                }
                else {
                  my $fh;
                  if (open($fh, '<', $dest_inbox)) {
                    # split the fetched mbox into one temporary file
                    # per message and import them one by one
                    while(1) {
                      my ($tmpname) = extract_mbox_tmpfile(\$fh);
                      last if (!$tmpname);
                      my $htmp = {type=>"tempfile", location=>$tmpname};
                      import_mailfiles($mbox, $htmp);
                      unlink($tmpname);
                    }
                    close($fh);
                    unlink($dest_inbox);
                  }
                }
              }
            }
          }
        }
      }
      if (!$no_send) {
        # send
        if (time >= $last_checked_outgoing+$out_interv) {
          $last_checked_outgoing=time;
          if (getconf_bool("auto_db_reconnect") && !$dbh->ping()) {
            db_reconnect();
          }
          send_mails();
        }
      }
      else {
        $last_checked_outgoing=time;
      }
      if ($alive_interv && time >= $last_alive+$alive_interv) {
        # confirm that we're running
        $last_alive = update_runtime_timestamp("last_alive");
      }

      # compute the minimum lapse of time before we'll need to
      # do something, and then sleep that amount of time
      my $t=time;
      my ($mplugin,$maint_when) = check_maintenance_schedule($t);

      while ($maint_when && $maint_when<=$t) {
        run_maint_plugin($mplugin);
        ($mplugin,$maint_when) = check_maintenance_schedule($t);
      }
      my $nx_act=min($last_checked_incoming+$in_interv,
                     $last_checked_outgoing+$out_interv);
      $nx_act = min($nx_act, $last_alive+$alive_interv) if ($alive_interv);
      $nx_act = min($nx_act, $maint_when) if ($maint_when);

      my $widx_size=Manitou::Words::queue_size();
      if ($widx_size >0) {
        my $when_widx = Manitou::Words::last_flush_time() + getconf("flush_word_index_interval");
        if ($when_widx <= $t || $widx_size >= getconf("flush_word_index_max_queued")) {
          do_flush_word_vectors();
        }
        else {
          $nx_act=min($nx_act, $when_widx);
        }
      }
      Manitou::Jobs::check_end_jobs($dbh);
      Manitou::Jobs::process_jobs_queue($dbh);
      check_new_jobs();

      # sleep unless there's something better to do right now
      if (!($nx_act<=$t || $still_to_go>0)) {
        # wait on the database socket so that a notification
        # interrupts the sleep
        my $bits;
        vec($bits, $dbh->{pg_socket}, 1)=1;
        my $found = select($bits, undef, undef, $nx_act-$t);
        if ($found) {
          while (my $notif = $dbh->pg_notifies) {
            my ($notif_name, $pid, $payload) = @$notif;
            process_notification($notif_name, $payload);
          }
        }
      }
    }
  }

  if (Manitou::Words::queue_size() > 0) {
    do_flush_word_vectors();
  }
  $dbh->disconnect;
}

# Look for a job in jobs_queue that has not been started (status null
# or 0). Only the first row returned is considered, and only jobs of
# type 'import_mailbox' are handled here: the job is marked as started
# (status=1), run, and removed from the queue when job_import_mailbox()
# returns a true value.
sub check_new_jobs {
  my $sthj = $dbh->prepare("SELECT job_id,job_type,job_args,status FROM jobs_queue WHERE coalesce(status,0)=0");
  $sthj->execute;
  my $job=$sthj->fetchrow_hashref;
  # only one row is fetched: release the statement so that it does not
  # stay active while the job runs
  $sthj->finish;
  if ($job) {
    if ($job->{job_type} eq 'import_mailbox') {
      $dbh->do("UPDATE jobs_queue SET status=1 WHERE job_id=?", {}, $job->{job_id});
      if (job_import_mailbox($job)) {
        $dbh->do("DELETE FROM jobs_queue WHERE job_id=?",{}, $job->{job_id});
      }
    }
  }
}

# Called for each notification received from the database: print its
# name and payload on stdout, then look for new jobs.
sub process_notification {
  my $name = shift;
  my $payload = shift;
  print "notification: " . $name . ", payload: " . $payload . "\n";
  check_new_jobs();
}

# Run a mailbox import job.
# Input: h{job_id,job_type,job_args}, where job_args must be the
# numeric import_id.
# Returns 1 if the import has completed, 0 otherwise (including when
# job_args is not a number). The result is explicit because the caller
# uses it to decide whether the job can be removed from the queue; it
# must not depend on what the logging functions return.
sub job_import_mailbox {
  my $row=shift;
  my $import_id=$row->{job_args};
  return 0 unless (defined $import_id && $import_id =~ /^\d+$/);

  notice_log("Starting mailbox import job #$row->{job_id}");
  if (import_database_mailbox($import_id)) {
    notice_log("Mailbox import job #$row->{job_id} (import_id=$row->{job_args}) complete");
    return 1;
  }
  error_log("Mailbox import job #$row->{job_id} (import_id=$row->{job_args}) did not complete successfully");
  return 0;
}

# Import the messages that have been uploaded into the import_message
# table for the given import_id.
# Each message is written into a temporary file and imported with
# import_message(); its row in import_message is then updated with
# status=1 and the new mail_id, or status=2 on failure. The completion
# ratio is stored into import_mbox and signaled with NOTIFY.
# The import stops if the import_mbox row disappears or gets status=2.
# Returns 1 when all the pending messages have been processed, 0 when
# the import has been aborted or a temporary file could not be used.
sub import_database_mailbox {
  my $import_id=shift;
  my $result=1;
  my $end=0;
  my $previous_completion;

  my $s1=$dbh->prepare("SELECT tag_id,mail_status,apply_filters,auto_purge FROM import_mbox WHERE import_id=?");
  $s1->execute($import_id);
  my $import = $s1->fetchrow_hashref;
  $s1->finish;

  my $s2=$dbh->prepare("SELECT count(*),sum(case when status=1 then 1 else 0 end) FROM import_message WHERE import_id=?");
  $s2->execute($import_id);
  my ($cnt_total, $cnt_done)=$s2->fetchrow_array;
  $s2->finish;

  # We retrieve only one message per query from import_message. This was meant to start
  # the import while the next messages were still being uploaded. Actually ATM this is not
  # the case, the UI won't start the import until the entire mbox has been uploaded
  # and we need them (at least the count) to send a proper progress report anyway.
  my $sth=$dbh->prepare("SELECT mail_number, encoded_mail FROM import_message WHERE import_id=? AND coalesce(status,0)=0 ORDER BY mail_number LIMIT 1");

  my $sth2=$dbh->prepare("SELECT status FROM import_mbox where import_id=?");
  my $su = $dbh->prepare("UPDATE import_message SET status=?,mail_id=? WHERE mail_number=? AND import_id=?");

  $dbh->do("UPDATE import_mbox SET status=1 WHERE import_id=?", {} , $import_id);

  while (!$end && $result) {
    $sth2->execute($import_id);
    my ($import_status) = $sth2->fetchrow_array;
    if ($sth2->rows==0 || $import_status==2) {
      # The import has been aborted
      $end=1;
      $result=0;
      last;
    }

    $sth->execute($import_id);
    my @r=$sth->fetchrow_array;
    if (!@r) {
      # no more pending message
      $end=1;
      last;
    }

    my ($fh,$filename) = tempfile();
    if (!defined $fh) {
      error_log("Unable to create temp file: $!");
      $result=0;
      last;
    }
    print $fh $r[1];
    close($fh);

    my $st=stat($filename);
    if (!$st || !open($fh, '<', $filename)) {
      error_log("Unable to open $filename: $!");
      unlink($filename);	# do not leave the temporary file behind
      $result=0;
      last;
    }

    my %ctxt;
    $ctxt{'filename'}=$filename;
    $ctxt{'str_date'}=undef;
    $ctxt{'proc_filename'}=$filename;
    $ctxt{'filesize'}=$st->size;
    $ctxt{'skip_filters'}=1 if ($import->{apply_filters} eq "N");
    $ctxt{tag_id}=$import->{tag_id} if ($import->{tag_id}>0);
    $ctxt{status}=$import->{mail_status} if ($import->{mail_status});
    my $mail_id=import_message(undef, $fh, \%ctxt); # mbox is undef
    if ($mail_id>0) {
      $su->execute(1, $mail_id, $r[0], $import_id);
      if ($cnt_total>0) {
        my $c=sprintf("%0.2f", (++$cnt_done)/$cnt_total);
        if ($c ne $previous_completion) {
          $previous_completion=$c;
          $dbh->do("UPDATE import_mbox SET completion=? WHERE import_id=?", {}, $c, $import_id);
          $dbh->do("NOTIFY mbox_import_progress");  # TODO: payload
        }
      }
    }
    else {
      # The statement has four placeholders: pass an explicit NULL for
      # mail_id, so that the message is flagged as failed (status=2)
      # and is not selected again by the next iteration.
      $su->execute(2, undef, $r[0], $import_id);
    }
    close($fh);
    unlink($filename);
    $sth->finish;
  }

  if ($result==1) {
    $dbh->begin_work;
    $dbh->do("DELETE FROM import_message WHERE import_id=?", {}, $import_id);
    if ($import->{auto_purge} eq "Y") {
      $dbh->do("DELETE FROM import_mbox WHERE import_id=?", {}, $import_id);
    }
    else {
      $dbh->do("UPDATE import_mbox SET status=3 WHERE import_id=? AND status=1", {} , $import_id);
    }
    $dbh->do("NOTIFY mbox_import_progress");
    $dbh->commit;
  }
  return $result;
}

# Returns the timestamp at which the maintenance plugin $p should be run
# after the timestamp $now. The result is always >$now.
# $p->{frequency_type} is either:
#  "interval": $p->{frequency} is a number of minutes between two runs
#  "pit" (point in time): $p->{frequency} is "HH:MM" (once a day, local
#        time) or "*:MM" (once an hour).
# For "pit", when $now already falls on the scheduled minute, the
# result is the next occurrence (one day or one hour later), not $now.
# Dies on any other frequency type.
sub next_maint_run {
  my ($p,$now)=@_;
  if ($p->{frequency_type} eq "interval") {
    return $now+$p->{frequency}*60;
  }
  elsif ($p->{frequency_type} eq "pit") {
    my ($h,$mn)=split /:/, $p->{frequency};
    my ($sec,$min,$hour)=localtime($now);
    my $delay;			# in minutes
    if ($h eq "*") {
      # the % operator yields a non-negative result for a positive
      # right operand, so this wraps around the hour
      $delay=($mn-$min) % 60;
      $delay=60 if ($delay==0);
    }
    else {
      # same computation on the minutes of the day
      $delay=(($h-$hour)*60+($mn-$min)) % 1440;
      $delay=1440 if ($delay==0);
    }
    return $now+$delay*60;
  }
  else {
    die $p->{frequency_type} . ": unsupported frequency type\n";
  }
}

# Find the maintenance plugin whose next run comes first.
# A plugin that has no next_run yet gets one from next_maint_run().
# Returns ($plugin,$when) where $plugin is a reference to the plugin
# and $when its next_run timestamp rounded down to the minute.
# Returns 0 when there is no maintenance plugin.
sub check_maintenance_schedule {
  my ($now)=@_;
  my $earliest=0;		# 0 means "none found yet"
  my $chosen;
  foreach my $plugin (@maintenance_plugins) {
    # the plugin has never been run nor scheduled
    $plugin->{next_run} = next_maint_run($plugin,$now) unless ($plugin->{next_run});
    next unless ($plugin->{next_run}<$earliest || $earliest==0);
    $earliest=$plugin->{next_run};
    $chosen=$plugin;
  }
  return 0 unless ($chosen);
  # round off the timestamp to an integral number of minutes
  return ($chosen, $earliest-($earliest%60));
}

# Run a maintenance plugin with a context holding the database handle,
# the stage name and the logging functions, then compute the time of
# its next run.
sub run_maint_plugin {
  my ($plugin)=@_;
  my %ctxt=(
    'dbh' => $dbh,
    'stage' => "maintenance",
    'notice_log' => \&notice_log,
    'error_log' => \&error_log,
  );
  notice_log("Running maintenance plugin: $plugin->{name}");
  $plugin->process(\%ctxt);
  $plugin->{next_run}=next_maint_run($plugin, time);
}

# Prepare the directory used for temporary files.
# When 'tmpdir' is not configured, a directory is created with
# File::Temp (removed at exit) and stored as the 'tmpdir' parameter.
# When it is configured, the directory is created if missing, then its
# permissions for "others" and its owner are checked. A failed check is
# fatal unless the 'security_checks' parameter is set to "no".
sub init_temp_dir {
  my $dir = getconf('tmpdir');
  if (!defined $dir) {
    $dir=File::Temp::tempdir(CLEANUP=>1);
    die "Could not create a temporary directory" if (!$dir);
    set_common_conf('tmpdir', $dir);
  }
  else {
    unless (-d $dir) {
      mkdir($dir, 0700) or die "Cannot create $dir: $!\nPlease set the 'tmpdir' configuration parameter to a usable directory for temporary files";
    }
    my $dir_st=stat($dir);
    # only the permission bits of "others" are tested here
    if (($dir_st->mode & 7) != 0 && getconf('security_checks') ne "no") {
      die "Security check failed: $dir has unsecure permissions: use 0700 permissions.\n";
    }
    if ($dir_st->uid != getuid()) {
      my @pw_me=getpwuid(getuid());
      my @pw_dir=getpwuid($dir_st->uid);
      my $msg=sprintf("Security problem: $dir belongs to user '%s' while we are running as user '%s'\n", $pw_dir[0], $pw_me[0]);
      if (getconf('security_checks') ne "no") {
        die $msg;
      }
    }
  }
}

# Alias of error_log(): passes all its arguments through.
sub LogError {
  return error_log(@_);
}

# Alias of notice_log(): passes all its arguments through.
sub LogSuccess {
  return notice_log(@_);
}

# Remove the non-paired occurrences of symbols from a phrase
# and returns that phrase.
# $symboles is a two-character string: the opening symbol followed by
# the closing symbol, for example "()" or "[]".
# A closing symbol with no opening symbol before it is removed, as well
# as any opening symbol that is never closed.
# The phrase is scanned once, character by character, so the symbols
# may be any characters, including those that are special in regular
# expressions.
sub enforce_pairs {
  my ($phrase,$symboles) = @_;
  # last character: ')' or similar closing element
  my $closing = chop($symboles);
  # what remains: '(' or similar opening element
  my $opening = $symboles;

  my @open_stack;		# positions of the currently open symbols
  my %drop;			# positions of the characters to remove
  my $len = length($phrase);
  for my $i (0 .. $len-1) {
    my $ch = substr($phrase, $i, 1);
    if ($ch eq $opening) {
      push @open_stack, $i;
    }
    elsif ($ch eq $closing) {
      if (@open_stack) {
        # closing, pop the last opened element
        pop @open_stack;
      }
      else {
        # no '(' can be popped: this ')' has to be removed
        $drop{$i}=1;
      }
    }
  }
  # the '(' that have no matching ')' are removed too
  $drop{$_}=1 for (@open_stack);

  return $phrase unless (%drop);
  return join('', map { $drop{$_} ? () : substr($phrase, $_, 1) } (0 .. $len-1));
}


# Insert the text and HTML bodies of a mail into the body table.
# $btext and $bhtml are references to the contents.
sub insert_body {
  my ($mail_id, $btext, $bhtml) = @_;
  my $insert = $dbh->prepare("INSERT INTO body(mail_id,bodytext,bodyhtml) VALUES (?,?,?)");

  my $db_text = encode_dbtxt($$btext);
  my $db_html = encode_dbtxt($$bhtml);
  # the DBD character escaping does not allow for '\0' characters
  # we just remove them
  for my $content ($db_text, $db_html) {
    $content =~ s/\000//g;
  }

  $insert->bind_param(1, $mail_id);
  $insert->bind_param(2, $db_text);
  $insert->bind_param(3, $db_html);
  $insert->execute;

  $insert->finish;
}

# Insert the header lines of a mail into the header table.
# $btext is a reference to the text of the header.
sub insert_header {
  my ($mail_id, $btext) = @_;
  my $insert = $dbh->prepare("INSERT INTO header(mail_id,lines) VALUES (?,?)");
  my $db_lines = encode_dbtxt($$btext);
  $insert->bind_param(1, $mail_id);
  $insert->bind_param(2, $db_lines);
  $insert->execute;
  $insert->finish;
}

# Return the argument unchanged, or an empty string if it is undefined.
sub convnull {
  my ($str) = @_;
  return (defined $str) ? $str : "";
}

# Signal handler: log the event and set $global_end, which the
# processing loops test to know when to stop.
sub sigterm {
  LogError("SIGTERM caught. Exiting...");
  $global_end = 1;
}

# Import the message and deal with database errors.
# The work is done by import_message_2(), called with the same
# arguments inside an eval. If it dies, the error is logged, the
# transaction is rolled back and the words indexed for the current
# mail are cleared.
# Returns -1 without doing anything if the context has status -1,
# 0 if the import died, or the value returned by import_message_2().
sub import_message {
  my ($mbox_name, $mail_handle, $mail_ctxt) = @_;
  # short-circuit the processing if the message is to be discarded
  if ($mail_ctxt->{status}==-1) {
    return -1;
  }

  my $new_id = eval { import_message_2(@_) };
  return $new_id unless ($@);

  # the import died: tell database errors apart from the others
  my $msg;
  if ($dbh->err > 0) {
    $msg = "Database error: $@";
  }
  else {
    $msg = "Import error: $@";
  }
  error_log($msg);
  $dbh->rollback();
  if ($mail_id>0) {
    Manitou::Words::clear_last_indexed_mail($dbh, $mail_id);
  }
  return 0;
}

# Parse one message read from $mail_handle and store it into the database.
#
# Arguments:
#   $mbox_name   - name of the mailbox the message is imported for
#   $mail_handle - file handle on the message; it is rewound after the
#                  first two bytes have been read, and a gzip'ed message
#                  (magic bytes 1f 8b) is uncompressed on the fly
#   $mail_ctxt   - hash ref with the import context; the keys read here
#                  are filename, str_date, tags, status, skip_filters,
#                  proc_filename and tag_id. mail_id, thread_id and
#                  mailbox_address are set before the filters are applied.
#
# The database work is done inside one transaction that is committed at
# the end. Errors are raised with die() and are meant to be handled by the
# caller (see import_message), which also reads the $mail_id variable that
# is assigned here (it is not local to this function).
#
# Return values
# >0: OK (the mail_id given to the message)
# 0: error
# -1: mail discarded
sub import_message_2 {
  my ($mbox_name, $mail_handle, $mail_ctxt) = @_;
  my ($mail_filename, $mail_str_date)=($mail_ctxt->{'filename'}, $mail_ctxt->{'str_date'});
  my ($body_text, $body_html, $top, $attachments);
  my $parser = MIME::Parser->new;
  my $gzfh; # handle for Gunzip
  $mail_id=0;

  # Mime Parser configuration
  $parser->output_dir(getconf("tmpdir"));
#  $parser->decode_headers(1);
  $parser->output_to_core(20000);
  $parser->parse_nested_messages(0);

  # check if it's gzip'ed
  my $header2;
  my $rd2=read($mail_handle, $header2, 2);
  seek $mail_handle, 0, SEEK_SET;
  if ($rd2==2 && $header2 eq "\x1f\x8b") {
    # Assign the function-level $gzfh (no new "my" here): a redeclared
    # variable would make the close() calls below dead code.
    $gzfh = IO::Uncompress::Gunzip->new($mail_handle)
      or die "IO::Uncompress::Gunzip failed: $GunzipError";
    $top = $parser->read($gzfh);
  }
  else {
    $top = $parser->read($mail_handle);
  }

  # Discard the mail if it can't be parsed
  if (!$top) {
    error_log("Malformed mail message");
    close $gzfh if (defined $gzfh);
    return 0;
  }
#  $top->head->unfold;

  $dbh->begin_work;

  # Get a new mail id from the sequence
  $mail_id = get_sequence_nextval("seq_mail_id");

  # Run the mimeprocess plugins of the mailbox; they may change the tags
  my %pl_ctxt;
  if (defined($mimeprocess_plugins{$mbox_name})) {
    $pl_ctxt{'filename'}=$mail_filename;
    $pl_ctxt{'stage'}="mimeprocess";
    $pl_ctxt{'mimeobj'}=$top;
    $pl_ctxt{'mail_id'}=$mail_id;
    $pl_ctxt{'dbh'}=$dbh;
    if ($mail_ctxt->{'tags'}) {
      @{$pl_ctxt{'tags'}} = @{$mail_ctxt->{'tags'}};
    }
    for my $plugin (@{$mimeprocess_plugins{$mbox_name}}) {
      $plugin->process(\%pl_ctxt);
    }
    if (defined $pl_ctxt{'tags'}) {
      @{$mail_ctxt->{'tags'}} = @{$pl_ctxt{'tags'}};
    }
  }

  $attachments = 0;
  if ($top->effective_type && $top->effective_type ne "text/plain") {
    $body_text = "";
  }
  else {
    $body_text=$top->bodyhandle->as_string unless (!$top->bodyhandle);
    my $charset = $top->head->mime_attr("content-type.charset") || 'iso-8859-1';
    $charset='iso-8859-1' if (!Encode::resolve_alias($charset));
    $body_text = Encode::decode($charset, $body_text, Encode::FB_PERLQQ);
  }

  my $thread_id = get_thread_id ($top);

  $mail_ctxt->{mail_id} = $mail_id;
  $mail_ctxt->{thread_id} = $thread_id;
  $mail_ctxt->{mailbox_address} = $mbox_name;

  my $action;
  $action = apply_filters($mail_ctxt, $top, \$body_text, 'I') unless ($mail_ctxt->{skip_filters});

  if ($action ne "discard") {

    # Combine the status given on the command line with the one
    # set in the context
    my $status=$options{'status'}+0;
    if ($mail_ctxt->{status}) {
      $status |= $mail_ctxt->{status};
    }
    #if ($action eq "trash") { $status |= 16+32; }

    insert_mail($mbox_name,
                $mail_id,
                $attachments,
                $mail_str_date,
                $top,
                $thread_id,
                $status,
                $mail_ctxt);

    if (getconf("store_filenames", $mbox_name)) {
      store_filename($mail_id, $mail_filename);
    }
    if (getconf_bool("store_raw_mail", $mbox_name)) {
      store_raw_mail($mail_id, $mail_ctxt->{proc_filename});
    }

    # Insert attachments before the body since detach_text_attachments
    # does affect $body_text
    if ($top->effective_type && $top->effective_type ne "text/plain") {
      if ($top->effective_type eq "text/html") {
        my $charset = $top->head->mime_attr("content-type.charset") || 'iso-8859-1';
        $charset ='iso-8859-1' if (!Encode::resolve_alias($charset));
        # Same guard as for text/plain: leave $body_html undefined when
        # the entity has no body instead of calling a method on undef
        if ($top->bodyhandle) {
          $body_html = Encode::decode($charset, $top->bodyhandle->as_string);
        }
      }
      else {
        if (getconf_bool("detach_text_plain", $mbox_name)) {
          $attachments += detach_text_attachments($dbh, $top, $mail_id, \$body_text, \$body_html);
        }
        else {
          $attachments += flatten_and_insert_attach($dbh, $top, $mail_id);
        }
      }
    }

    # insert_mail() was called with $attachments==0: set the flag now
    if ($attachments>0) {
      my $sthua=$dbh->prepare("UPDATE mail SET flags=flags|1 WHERE mail_id=?");
      $sthua->execute($mail_id);
    }

    insert_body($mail_id, \$body_text, \$body_html);

    # Build the header to store: MIME encoded words are decoded and
    # folded lines are unfolded
    my $safe_header;
    foreach (decode_mimewords($top->head->as_string)) {
      my @t=@{$_};
      if (!defined($t[1])) {	# us-ascii substring
        $t[0] =~ s/\r?\n(\s)/$1/sog if defined ($t[0]);
        $safe_header .= $t[0];
      }
      else {			# other charset: convert it to perl internal format
        my $tu;
        eval {
          $tu=Encode::decode($t[1], $t[0]);
        };
        if ($@) {
          # if the decode fails (typically if the charset is unknown)
          # we fall down to using the string as is
          $tu=$t[0];
        }
        $tu =~ s/\r?\n(\s)/$1/sog if (defined($tu));
        $safe_header .= $tu;
      }
    }

    insert_header($mail_id, \$safe_header);

    if (getconf_bool('index_words', $mbox_name)) {
      my $idx_header = Manitou::Words::header_contents_to_ftidx($safe_header);
      my $ref_body=\$body_text;
      my $truncated_body;

      if ($top->head->get("From") =~ /^MAILER-DAEMON@/) {
        # Some SMTP servers (e.g. qmail) may send bounces that regurgitate the
        # entire original message in one big destructured piece,
        # possibly with encoded attachments that can't be recognized
        # as such for lack of a proper MIME structure.
        # As a workaround to avoid the vast pollution of the word index
        # that it may incur, we limit the indexing at 500 lines of text.
        my $nbl=500;
        my $pos=0;
        while (($nbl--)>0 && $pos>=0) {
          $pos=index($body_text, "\n", $pos);
          $pos++ unless ($pos==-1);
        }
        if ($nbl<0 && $pos>0) {
          $truncated_body=substr($body_text, 0, $pos);
          $ref_body=\$truncated_body;
        }
      }

      my $other_words;
      if (defined $body_html && getconf_bool('index_words_html_parts', $mbox_name)) {
        $other_words = Manitou::Words::html_to_text(\$body_html);
      }

      my %extractors = Manitou::Attachments::text_extractors($mbox_name);

      if (scalar %extractors || getconf_bool('index_words_html_parts', $mbox_name)) {
        # If there are no extractors defined, we may still extract words
        # with our internal html-to-plaintext converter
        Manitou::Attachments::launch_text_extractors($dbh, $mail_id,
                                                     \%extractors,
                                                     \$other_words);
      }

      index_words($dbh, $mail_id, $ref_body, \$idx_header, \$other_words);

      my $sthj=$dbh->prepare("INSERT INTO jobs_queue(mail_id,job_type) VALUES(?,?)") or die $dbh->errstr;
      $sthj->execute($mail_id, "widx");
      # end of code block to mutualize
    }

    if ($mail_id>0 && defined $mail_ctxt->{'tags'}) {
      for my $t (@{$mail_ctxt->{'tags'}}) {
        Manitou::Tags::insert_tag($dbh, $mail_id, $t);
      }
    }

    if ($mail_id>0 && defined $mail_ctxt->{tag_id}) {
      Manitou::Tags::action_tag($dbh, $mail_id, $mail_ctxt->{tag_id});
    }
  }
  else {
    # discarded
    $mail_id=-1;
  }
  $dbh->commit;
  $top->purge;
  close $gzfh if (defined $gzfh);
  return $mail_id;
}

# Find the thread of the first message in the database whose message-id
# is one of the given references.
# Input: reference to a list (<msgid1>,<msgid2>,<msgid3>...)
# Output: thread_id or undef
# Side effect: when a referenced message is found, its thread_id is
# (re)written, a new one being taken from seq_thread_id if it had none.
sub get_references {
  my ($thread_msg_ids)=@_;
  my ($thread, $ref_mail_id);
  my $sth=$dbh->prepare("SELECT mail_id,thread_id FROM mail WHERE message_id=?");

  # Lookup the thread_id
  for my $msgid (@{$thread_msg_ids}) {
    # entries that can't be parsed as a Message-Id are ignored
    next unless ($msgid =~ /\<(.*)\>/);
    $sth->execute($1);
    my @found=$sth->fetchrow_array;
    next unless (@found);
    # first message found with a message-id that matches one of
    # our references: stop here
    $ref_mail_id=$found[0];
    $thread=$found[1] if (defined($found[1]));
    last;
  }

  if ($ref_mail_id) {
    # If at least one of the database messages is referred to by
    # the incoming mail, then give them the same thread_id
    $thread ||= get_sequence_nextval("seq_thread_id");
    my $sthu=$dbh->prepare("UPDATE mail set thread_id=? WHERE mail_id=?");
    $sthu->execute($thread,$ref_mail_id);
  }

  $sth->finish;
  return $thread;
}

# Return the next value of the database sequence named $seq,
# or 1 if the query yields no (or a false) value.
sub get_sequence_nextval {
  my ($seq) = @_;
  my $sth = $dbh->prepare("SELECT nextval('".$seq."')");
  $sth->execute;
  my ($fetched) = $sth->fetchrow_array;
  $sth->finish;
  return $fetched || 1;
}


# Record in the files table the name of the file a message was read from.
sub store_filename {
  my ($mail_id, $filename) = @_;
  my $sth = $dbh->prepare ("INSERT INTO files(mail_id,filename) VALUES (?,?)");
  $sth->bind_param(1, $mail_id);
  $sth->bind_param(2, $filename);
  $sth->execute;
  $sth->finish;
}

# Import the mail file as a large object (lo_import) and reference
# the resulting object in the raw_mail table.
sub store_raw_mail {
  my ($mail_id, $filename) = @_;
  my $sth = $dbh->prepare ("INSERT INTO raw_mail(mail_id,mail_text) VALUES (?,?)");
  my $large_object = $dbh->func($filename, 'lo_import');
  $sth->bind_param(1, $mail_id);
  $sth->bind_param(2, $large_object);
  $sth->execute;
  $sth->finish;
}

# Register in the addresses table the email addresses found in the
# headers listed in %hAdrTypes, and collect them for the caller.
#
# Arguments:
#   $mail    - the parsed message (an object with a head() method)
#   $mail_id - id of the message (not used here)
#
# Returns a reference to a list of hashes with the keys:
#   email   - the address as found in the header
#   addr_id - its id in the addresses table (created if necessary)
#   pos     - 0-based position of the address among the addresses
#             kept for this header
#   prio    - addresses.recv_pri, undef for a newly created address
#   type    - the value of %hAdrTypes for the header
#
# Side effect: last_recv_from and nb_recv_from are updated for
# the addresses of the From header.
sub insert_addresses {
  my ($mail,$mail_id)=@_;
  my @haddr;

  my $sth = $dbh->prepare("SELECT addr_id,recv_pri FROM addresses WHERE email_addr=?") or die $dbh->errstr;

  my $sth_insert_ad = $dbh->prepare("INSERT INTO addresses(addr_id,email_addr,name) VALUES (?,?,?)") or die $dbh->errstr;

  # Prepared on first use and shared by all the From addresses
  my $sth_upd;

  for my $adrtype (keys %hAdrTypes) {
    my @addrs;
    # Best effort: a header that can't be parsed yields no address
    eval {
      no warnings 'all';
      @addrs=Mail::Address->parse(header_decode($mail->head->get($adrtype)));
    };
    # The position restarts at 0 for each header and grows with each
    # address kept (it used to be reset for every address, so that
    # all positions were 0)
    my $pos=0;
    for my $addr (@addrs) {
      next unless ($addr->address);
      my $la=lc($addr->address);
      my $ln=lc($addr->name);
      $sth->execute($la) or die $sth->errstr;
      my ($id,$addr_pri)=$sth->fetchrow_array;
      if (!$id) {
        $id=get_sequence_nextval("seq_addr_id");
        $sth_insert_ad->execute($id,
                                substr($la,0,300),
                                substr($ln,0,300))
          or die $sth_insert_ad->errstr;
      }
      # update addresses.last_recv_from
      if ($adrtype eq "From") {
        $sth_upd ||= $dbh->prepare("UPDATE addresses SET last_recv_from=now(),nb_recv_from=1+coalesce(nb_recv_from,0) WHERE addr_id=?");
        $sth_upd->execute ($id);
      }
      push @haddr, { "email"=>$addr->address,
                     "addr_id"=>$id,
                     "pos"=>$pos,
                     "prio"=>$addr_pri,
                     "type"=> $hAdrTypes{$adrtype} };
      $pos++;
    }
  }
  return \@haddr;
}

# Look up the message-ids of the 'In-Reply-To' and 'References' fields
# in mail.message_id (through get_references), and return the db
# thread id if it exists, or undef.
sub get_thread_id {
  my ($mailobj) = @_;
  my @candidates;

  # In-Reply-To comes first, reduced to its <message-id> part
  my $replied = $mailobj->head->get('In-Reply-To');
  if ($replied =~ /.*\<(.*)\>/) {
    push @candidates, "<$1>";
  }

  # then each blank-separated word of References
  my $refs = $mailobj->head->get('References');
  chomp $refs;
  push @candidates, split(/\s+/, $refs);

  return scalar(get_references(\@candidates));
}

# Insert a message into the mail table, along with its addresses
# (addresses and mail_addresses tables).
#
# Arguments:
#   $mbox_name   - mailbox name, used for configuration lookups
#   $mail_id     - id of the new message
#   $attachments - the flags column is set to 1 if >0, else to 0
#   $file_date   - reception date; the sender's date is used instead when
#                  it's undefined or when preferred_datetime is "sender"
#   $mailobj     - the parsed message (an object with a head() method)
#   $thread_id   - thread of the message, or undef
#   $status      - value for the status column
#   $ctxt        - hash ref; priority, identity_id and filesize are read
#
# When the message replies to a message found in the database and the
# auto_tag_thread configuration entry is "1", the tags of the replied-to
# message are applied to the new one.
# A "new_message" notification is sent if $status is 0.
#
# Dies if the INSERT statements fail. Returns $thread_id.
sub insert_mail {
  my ($mbox_name, $mail_id, $attachments,$file_date, $mailobj, $thread_id,$status, $ctxt) = @_;
  my ($rc, $sth);
  my $priority;

  my $from = convnull(header_decode($mailobj->head->get('From')));
  my $to = convnull(header_decode($mailobj->head->get('To')));
  my $subject = convnull(header_decode($mailobj->head->get('Subject')));
  my $sender_date = Manitou::MailFormat::reformat_sender_date($mailobj->head->get('Date'));

  my $query=q{
      INSERT INTO mail (mail_id,
                        sender,
                        sender_fullname,
                        recipients,
                        subject,
                        msg_date,
                        sender_date,
                        status,
                        flags,
                        message_id,
                        thread_id,
                        in_reply_to,
                        priority,
                        identity_id,
                        raw_size)
        VALUES (?, ?, ?, ?, ?, ?::timestamptz, ?::timestamptz,?,?,?,?,?,?,?,?)};

  $sth = $dbh->prepare($query);

  chomp ($from);
  chomp ($to);
  chomp ($subject);

  $from = enforce_pairs ($from, "()");
  $to = enforce_pairs ($to, "()");

  $from = enforce_pairs ($from, "<>");
  $to = enforce_pairs ($to, "<>");

  my @adr_from;
  my @adr_to;
  # Best effort: unparsable headers leave the lists empty
  eval {
    @adr_from=Mail::Address->parse($from);
    @adr_to=Mail::Address->parse($to);
  };
  # The parameters are bound in the order of the columns of the query
  my $bp = 0;	# parameters counter
  $sth->bind_param (++$bp, $mail_id);

  # sender and sender_fullname come from the first From address
  if (@adr_from) {
    my $sender = encode_dbtxt(substr($adr_from[0]->address,0,200));
    $sth->bind_param(++$bp, $sender);
    my $fullname = encode_dbtxt(substr($adr_from[0]->name,0,200));
    $sth->bind_param(++$bp, $fullname);
  }
  else {
    $sth->bind_param(++$bp, undef);
    $sth->bind_param(++$bp, undef);
  }
  # To
  if (@adr_to) {
    $sth->bind_param(++$bp, $to);
  }
  else {
    $sth->bind_param(++$bp, undef);
  }

  # Subject
  # clean it before: control characters are removed, except the newline
  # that is replaced by a space and the tab that is kept
  $subject =~ tr/\x00-\x08//d;
  $subject =~ tr/\x0b-\x1F//d;
  $subject =~ tr/\x0a/ /;
  $subject = substr($subject,0,1000);
  $sth->bind_param(++$bp, encode_dbtxt($subject));

  # Receive Date
  if ((getconf('preferred_datetime', $mbox_name) eq "sender") ||
      !defined($file_date)) {
    $file_date = $sender_date;		# the same as the sender's date
  }
  $sth->bind_param (++$bp, $file_date);

  # Sender_Date
  $sth->bind_param (++$bp, $sender_date);

  # status
  $sth->bind_param (++$bp, $status);

  # flags
  $sth->bind_param (++$bp, $attachments>0?1:0);

  my $haddr=insert_addresses($mailobj,$mail_id);

  $priority = $ctxt->{priority};
  # Adds the (optional) priorities of From addresses
  foreach my $adr (@$haddr) {
    $priority += $adr->{prio} if (defined $adr->{prio} && $adr->{type} eq $hAdrTypes{"From"});
  }

  my $msg_id=$mailobj->head->get('Message-Id');
  if ($msg_id =~ /\<(.*)\>/) {
    $msg_id=$1;
  }

  my $in_reply_to;
  if ($mailobj->head->get('In-Reply-To') =~ /.*\<(.*)\>/) {
    $in_reply_to=$1;
  }

  $sth->bind_param(++$bp, substr(encode_dbtxt($msg_id),0,100));

  $sth->bind_param (++$bp, $thread_id);

  my $in_reply_to_id;
  if (defined($in_reply_to)) {
    # Search for a message to which this message would reply
    my $sthm=$dbh->prepare("SELECT max(mail_id) FROM mail where message_id=?");
    $sthm->execute($in_reply_to);
    ($in_reply_to_id)=$sthm->fetchrow_array;
    $sthm->finish;
    # max() yields a row even when no message matches: test the value,
    # not the presence of a row
    if (defined $in_reply_to_id) {
      my $stht = $dbh->prepare("SELECT value FROM config WHERE conf_key='auto_tag_thread'");
      $stht->execute;
      my @rt=$stht->fetchrow_array;
      $stht->finish;
      if (@rt && $rt[0] eq "1") {
        # Apply to the new mail the same tags as the message it replies to
        # NOTE(review): this runs before the row of the new mail is
        # inserted below - confirm that mail_tags accepts it.
        my $sthtag=$dbh->prepare("SELECT tag FROM mail_tags WHERE mail_id=?");
        $sthtag->execute($in_reply_to_id);
        my @rttag;
        while (@rttag=$sthtag->fetchrow_array) {
          # fully qualified, as the other calls to action_tag in this file
          Manitou::Tags::action_tag($dbh, $mail_id, $rttag[0]);
        }
      }
    }
  }

  $sth->bind_param(++$bp, $in_reply_to_id);

  $sth->bind_param(++$bp, $priority);

  # The identity set in the context takes precedence over
  # the one of the configuration
  my $identity_id = $ctxt->{identity_id};
  if (!defined $identity_id) {
    $identity_id=getconf("identity_id", $mbox_name);
  }
  $sth->bind_param(++$bp, $identity_id);

  $sth->bind_param(++$bp, $ctxt->{filesize});

  $sth->execute or die("Can't execute statement: $DBI::errstr");

  $sth->finish;

  my $sth_ma_insert = $dbh->prepare("INSERT INTO mail_addresses(mail_id,addr_type,addr_pos,addr_id) VALUES (?,?,?,?)") or die $dbh->errstr;
  foreach my $adr (@$haddr) {
    $sth_ma_insert->execute($mail_id, $adr->{type}, $adr->{pos}, $adr->{addr_id}) or die $sth_ma_insert->errstr;
  }

  if ($status==0) {
    $dbh->do("NOTIFY new_message");
  }

  return $thread_id;
}


# Evaluate the filter expressions of the database against a message
# and apply the actions of those that match.
#
# Arguments:
#   $ctxt      - hash ref; tags, status, priority and identity_id may be
#                modified by the actions; mailbox_address and
#                proc_filename are read
#   $mime_obj  - the message, passed to the expression evaluator;
#                its headers may be modified by the actions
#   $pbody     - reference to the body (not used here)
#   $direction - 'I' for incoming, 'O' for outgoing
#
# Returns "trash", "discard", or undef when no action asked for one of them.
sub apply_filters {
  my ($ctxt, $mime_obj, $pbody, $direction) = @_;
  my (%exprs, %actions, %all_exprs);
  my $final_action;
  my @sorted_keys; # list of expr_id sorted by apply_order

  # Load the expressions, by id and by name
  my $sth = $dbh->prepare("SELECT expr_id,name,expression,direction FROM filter_expr ORDER BY apply_order");
  $sth->execute;
  while (my ($id, $name, $expression, $expr_dir) = $sth->fetchrow_array) {
    my %h = (expr_id => $id,
             name => scalar(decode_dbtxt($name)),
             expr => scalar(decode_dbtxt($expression)),
             direction => $expr_dir);
    $exprs{$id}=\%h;
    push @sorted_keys, $id;
    $all_exprs{$h{name}}=\%h;
  }
  $sth->finish;

  # Load the actions, as an ordered list of [type,arg] per expression
  $sth=$dbh->prepare("SELECT expr_id,action_type,action_arg FROM filter_action ORDER BY expr_id,action_order");
  $sth->execute;
  while (my ($id, $type, $arg) = $sth->fetchrow_array) {
    push @{$actions{$id}}, [ $type,decode_dbtxt($arg) ];
  }
  $sth->finish;

 FILTER:
  for my $n (@sorted_keys) {
    my $e=$exprs{$n};
    # Only the expressions that have actions and apply to our
    # direction (or both) are evaluated
    next FILTER unless (defined $actions{$n});
    next FILTER unless ($e->{direction} eq "B" || $e->{direction} eq $direction);

    my $res;
    my $evaluated=Manitou::Filters::process_filter_mimeobj(
      $e->{expr}, $mime_obj, \$res, $ctxt, $dbh, \%all_exprs, $n);
    if (!$evaluated) {
      print STDERR "filter ERROR: filter_expr=", $e->{expr}, " result=$res\n";
      next FILTER;
    }
    next FILTER unless ($res);

    if (getconf_bool("log_filter_hits", $ctxt->{mailbox_address})) {
      Manitou::Filters::log_filter_hit($dbh, $ctxt, $n);
    }

    # apply the actions
    for my $action (@{$actions{$e->{expr_id}}}) {
      my $action_type=$action->[0];
      my $action_arg=$action->[1];
      if ($DEBUG > 3) {
        debug_log("applying action '$action_type' with arg '$action_arg'");
      }

      if ($action_type eq "tag") {
        push @{$ctxt->{tags}}, $action_arg;
      }
      elsif ($action_type eq "status") {
        # the argument is a '+' separated list of status letters
        for my $letter (split(/\+/, $action_arg)) {
          if ($letter eq 'T') {  # obsolete since 1.2.0, superseded by the discard action
            $ctxt->{status} |= 0x10+0x20; # trashed+processed
            $final_action = "trash";
          }
          elsif ($letter eq 'R') { # read
            $ctxt->{status} |= 0x1;
          }
          elsif ($letter eq 'P' || $letter eq 'A') { # processed/archived
            $ctxt->{status} |= 0x20;
          }
          elsif ($letter eq 'D') { # deleted - obsolete since 1.2.0, superseded by the discard action
            # does not override an action that is already decided
            $final_action = "discard" if (!defined($final_action));
          }
        }
      }
      elsif ($action_type eq "discard") {
        if ($action_arg eq "trash") {
          $ctxt->{status} |= 0x10+0x20;
          $final_action = "trash";
        }
        elsif ($action_arg eq "delete") {
          $final_action = "discard";
        }
      }
      elsif ($action_type eq "priority") {
        # "+=N" adds to the priority, "=N" sets it
        if (substr($action_arg,0,2) eq "+=") {
          $ctxt->{priority} += int(substr($action_arg,2));
        }
        elsif (substr($action_arg,0,1) eq "=") {
          $ctxt->{priority} = int(substr($action_arg,1));
        }
      }
      elsif ($action_type eq "redirect" && $action_arg ne "") {
        redirect({"mailfile"=>$ctxt->{proc_filename},    # we redirect the original mailfile
                  "From"=>$ctxt->{mailbox_address},  # the sender
                  "To"=>$action_arg});		# the recipient
      }
      elsif ($action_type eq "set header") {
        # the argument is "name:contents"
        my ($htag, $hcontents) = split(/:/, $action_arg, 2);
        $mime_obj->set($htag, $hcontents) if (length($hcontents)>0);
      }
      elsif ($action_type eq "remove header") {
        $mime_obj->delete($action_arg) if (length($action_arg)>0);
      }
      elsif ($action_type eq "set identity") {
        $ctxt->{identity_id} = Manitou::Config::get_identity_id($dbh, $action_arg);
        if (!defined $ctxt->{identity_id}) {
          warning_log(sprintf("Identity '%s' not found for set identity action", $action_arg));
        }
      }
      elsif ($action_type eq "stop") {
        # neither the remaining actions nor the remaining
        # expressions are processed
        last FILTER;
      }
    }
  }
  return $final_action;
}

# Redirects a message (filter action) by piping it to the command
# configured as 'local_delivery_agent' for the sender.
# Args (in a hash ref):
# From => our sender address
# To => redirection address
# mailfile => optional path to a file
# mimeobj => pre-constructed MIME::Entity object if no mailfile given
#
# The From and To headers are replaced and " (by way of <From>)" is
# appended to the subject, if any.
# Returns 1 when the message has been handed to the delivery agent,
# 0 otherwise (the reason is written to STDERR).
sub redirect {
  my $args=shift;
  my $fh;

  my $cmd=getconf("local_delivery_agent", $args->{From});
  if (!defined($cmd)) {
    print STDERR "redirect: unable to pass the mail to a local delivery agent.\nCheck your configuration file for the 'local_delivery_agent' entry\n";
    return 0;
  }
  my $top;
  if (defined $args->{mailfile}) {
    # 3-arg open: the file name is never interpreted as a mode or a command
    if (!open($fh, '<', $args->{mailfile})) {
      print STDERR "Unable to open ".$args->{mailfile}.": $!\n";
      return 0;
    }
    $top = Mail::Internet->new($fh); # MIME::Entity->new($fh) doesn't work here!
  }
  elsif (defined $args->{mimeobj}) {
    $top = $args->{mimeobj}->dup();
  }
  else {
    print STDERR "Filter: redirect action cancelled since no mimeobj and no mailfile\n";
    return 0;
  }
  $top->head->combine("From");
  $top->head->replace("From", $args->{From});
  $top->head->combine("To");
  $top->head->replace("To", $args->{To});
  my $subject = $top->head->get("Subject");
  if (defined $subject) {
    $top->head->replace("Subject", $subject . " (by way of <".$args->{From}.">)");
  }

  $cmd =~ s/\$FROM\$/$args->{From}/g;

  # As in send_one_mail: a delivery agent that exits before reading
  # its input must not kill us with a SIGPIPE
  local $SIG{'PIPE'} = 'IGNORE';

  my $mfh;
  if (!open($mfh, '|-', $cmd)) {
    print STDERR "Error while passing redirected mail to the local delivery agent (\`$cmd\`): $!\n";
    close($fh) if (defined $fh);
    return 0;
  }
  $top->print($mfh);
  # close() waits for the command: it fails if the data couldn't be
  # written or if the command exited with a non-zero status
  my $delivered = close($mfh);
  my ($close_err, $close_status) = ($!, $?);
  close($fh) if (defined $fh);
  if (!$delivered) {
    print STDERR "Local delivery agent error on redirected mail (\`$cmd\`, exit code=".($close_status>>8)."): $close_err\n";
    return 0;
  }
  return 1;
}

# Build an outgoing message from the database and pipe it to the command
# configured as 'local_delivery_agent' for the sender.
#
# Arguments:
#   $from    - sender address, also used for configuration lookups
#   $subject - subject of the message
#   $mail_id - id of the message in the database
#
# The body, header lines and attachments are read from the database;
# the outgoing plugins and the outgoing ('O') filters are run, and the
# words are indexed if index_words is set. The database work is done in
# one transaction that is committed whether the delivery succeeded or not.
#
# Returns 1 if the delivery agent exited with a zero status without
# writing to its standard error, in which case the SENT and ARCHIVED
# status bits are set. Returns 0 otherwise.
sub send_one_mail {
  my ($from, $subject, $mail_id) = @_;
  my $decl_charset = getconf("preferred_charset", $from) || 'iso-8859-1';
  my @charsets = split(/\s+/, $decl_charset);

  my $cmd = getconf("local_delivery_agent", $from);
  if (!defined($cmd)) {
    print STDERR "Unable to pass the mail to a local delivery agent.\nCheck your configuration file for the 'local_delivery_agent' entry\n";
    return 0;
  }
  $cmd =~ s/\$FROM\$/$from/g;

  $dbh->begin_work;

  my $sthb = $dbh->prepare("SELECT bodytext,bodyhtml FROM body WHERE mail_id=?") || die "Can't prepare statement: $DBI::errstr";
  $sthb->execute($mail_id) || die "Can't execute statement: $DBI::errstr";
  my ($db_body, $html_body) = $sthb->fetchrow_array;
  $sthb->finish;

  $db_body = decode_dbtxt($db_body);
  $html_body = decode_dbtxt($html_body);

  my ($body,$body_charset) = Manitou::MailFormat::encode_text_body($db_body, @charsets);

  my $top;
  my $text_part;
  my $html_part;
  my %mime_args = (From => $from, Encoding => '-SUGGEST', Subject => $subject,
                   'X-Mailer' => undef);
  if (!defined $html_body) {
    # text only: the body is the top-level entity
    $mime_args{Charset} = $body_charset;
    $mime_args{Data} = $body;
    $top = MIME::Entity->build(%mime_args);
    $text_part = \$top;
  }
  else {
    my $dual_part;
    # multipart/alternative with text and HTML. TODO: also handle html-only
    if (has_attachments($dbh, $mail_id)) {
      # the alternative parts go into a sub-part of multipart/mixed
      $mime_args{'Type'} = 'multipart/mixed';
      $top = MIME::Entity->build(%mime_args);
      $dual_part = MIME::Entity->build('Type'=>'multipart/alternative');
      $top->add_part($dual_part);
    }
    else {
      $mime_args{'Type'} = 'multipart/alternative';
      $top = MIME::Entity->build(%mime_args);
      $dual_part = $top;
    }

    my $p = MIME::Entity->build('Charset' => $body_charset,
                                'Encoding' => '-SUGGEST',
                                'Data' => $body,
                                'X-Mailer' => undef);
    $text_part = $p;
    $dual_part->add_part($p);

    $html_part = create_html_part($dbh, $mail_id, \$html_body);
    $dual_part->add_part($html_part);
  }

# format=flowed is unsupported at the moment
#   if (defined $format_flowed) {
#     my $ct = $text_part->head->get("Content-Type");
#     if ($ct =~ /^text\/plain/) {
#       chomp $ct;
#       $ct.="; format=flowed";
#       $text_part->head->replace("Content-Type", $ct);
#     }
#   }

  my $header_lines;
  my $sthd = $dbh->prepare ("SELECT lines FROM header WHERE mail_id=?") || die "Can't prepare statement: $DBI::errstr";
  $sthd->execute($mail_id) || die "Can't execute statement: $DBI::errstr";
  ($header_lines) = $sthd->fetchrow_array;
  $sthd->finish;
  $header_lines = decode_dbtxt($header_lines);

  Manitou::MailFormat::encode_header($top, $header_lines, @charsets);

  # That shouldn't happen, but if the sender is not set, we try to get
  # it from the "From:" header
  if (!defined $from || $from eq '') {
    my $v = $top->head->get("From");
    if (defined $v) {
      my @addr_from=Mail::Address->parse($v);
      $from = $addr_from[0]->address if (@addr_from);
    }
  }

  # Add the configured Bcc to the one of the message, if any
  my $bcc=getconf("outgoing_bcc", $from);
  if (defined($bcc)) {
    my $oldbcc=$top->head->get("Bcc");
    if (defined($oldbcc)) {
      $bcc = "$oldbcc, $bcc";
    }
    $top->head->replace("Bcc", $bcc);
  }

  attach_parts($dbh, $mail_id, $top, getconf("tmpdir"));

  # Run the outgoing plugins of the sender, then the common ones
  my @opl;
  @opl=@{$outgoing_plugins{$from}} if (defined $outgoing_plugins{$from});
  push @opl, @{$outgoing_plugins{"common"}} if (defined $outgoing_plugins{"common"});
  if (@opl) {
    my %pl_ctxt = (stage => "outgoing",
                   mimeobj => $top,
                   mail_id => $mail_id,
                   dbh => $dbh);
    for my $plugin (@opl) {
      $plugin->process(\%pl_ctxt);
    }
  }

  if (getconf_bool('index_words', $from)) {
    my $other_words;
    # Extract words from the "html body"
    if (defined $html_body && getconf_bool('index_words_html_parts', $from)) {
      $other_words = Manitou::Words::html_to_text(\$html_body);
    }

    # Extract words from other attachments
    my %extractors = Manitou::Attachments::text_extractors($from);

    if (scalar %extractors || getconf_bool('index_words_html_parts', $from)) {
      # If there are no extractors defined, we may still extract words
      # with our internal html-to-plaintext converter
      Manitou::Attachments::launch_text_extractors($dbh, $mail_id,
                                                   \%extractors,
                                                   \$other_words);
    }

    my $headerx = Manitou::Words::header_contents_to_ftidx($header_lines);
    index_words($dbh, $mail_id, \$db_body, \$headerx, \$other_words);

    my $sthj=$dbh->prepare("INSERT INTO jobs_queue(mail_id,job_type) VALUES(?,?)") or die $dbh->errstr;
    $sthj->execute($mail_id, "widx");
  }

  my %fctxt;			# context for filters
  $fctxt{mail_id}=$mail_id;
  $fctxt{tags}=();
  $fctxt{priority}=undef;
  my $faction = apply_filters(\%fctxt, $top, \$db_body, "O");
  # Apply filter actions to the database
  for my $tag (@{$fctxt{'tags'}}) {
    Manitou::Tags::insert_tag($dbh, $mail_id, $tag);
  }
  # TODO: other filter actions (status, priority)

  # Pipe the message to the delivery agent
  my $ret=0;
  my $pid;
  my $in=IO::Handle->new();
  my $out=IO::Handle->new();
  my $err=IO::Handle->new();
  # A write to an agent that has exited must fail instead of killing us.
  # The previous handler is restored when the function returns.
  local $SIG{'PIPE'} = 'IGNORE';
  eval {
    $pid = open3($in, $out, $err, $cmd);
    die $! if ($pid==0);
    $top->print($in) or die $!;
    close($in);
    $top->purge;
    waitpid($pid, 0);
  };
  if ($@) {
    my $failure=$@;		# saved, since the cleanup below uses eval
    # Release what the failed attempt left behind: the pipes, the child
    # process if it was started (it's not reaped at this point), and the
    # temporary files of the message
    close($in);
    close($out);
    close($err);
    waitpid($pid, 0) if ($pid);
    eval { $top->purge; };
    error_log("Error while passing outgoing mail to the local delivery agent (\`$cmd\`): $failure");
  }
  else {
    # NOTE(review): only the first line of the agent's stderr is read,
    # and after it has exited - an agent writing a lot of output
    # could presumably block; confirm.
    my $e=<$err>;
    close($err);
    close($out);
    if ($e ne "" || ($?>>8)!=0) {
      error_log("Local delivery agent error: (\`$cmd\`, exit code=".($?>>8)."): $e");
    }
    else {
      # Set the SENT and ARCHIVED status bits
      my $sths = $dbh->prepare("UPDATE mail SET status=status|(256+32) WHERE mail_id=?");
      $sths->execute($mail_id);
      $sths->finish;
      $ret=1; # OK
    }
  }

  $dbh->commit;
  return $ret;
}

# Send the outgoing messages, which are those with a status of 129
# in mail_status. A message that can't be sent is blocked (recorded in
# %hsend_blocked) and skipped by the next calls, until another message
# gets through, at which point all blocked messages are released.
sub send_mails {
  my $sth_queue = $dbh->prepare("SELECT mail_id FROM mail_status WHERE status=129");
  $sth_queue->execute;
  while (my @queued = $sth_queue->fetchrow_array) {
    my $mail_id=$queued[0];
    next if (exists $hsend_blocked{$mail_id});

    # The status is checked again in the mail table
    my $sth = $dbh->prepare ("SELECT sender,subject FROM mail WHERE mail_id=? AND status=129");
    $sth->execute($mail_id);
    while (my @mail = $sth->fetchrow_array) {
      my ($sender, $subject) = ($mail[0], $mail[1]);
      if (!send_one_mail($sender, $subject, $mail_id)) {
        error_log("Outgoing message #$mail_id NOT sent");
        $hsend_blocked{$mail_id}=time;
        next;
      }

      update_runtime_timestamp("last_sent");
      notice_log("Sent outgoing message #$mail_id");
      my $nb_blocked = scalar (keys %hsend_blocked);
      if ($nb_blocked>0) {
        # If messages were blocked due to previous errors
        # and one has got through, we retry them all
        notice_log("Starting to send $nb_blocked outgoing frozen message(s) due to recent success");
        %hsend_blocked=();
      }
    }
    $sth->finish;
  }
  $sth_queue->finish;
}


# Load and instantiate the plugins declared in the configuration.
# For each mailbox and each kind of plugin, every declaration is parsed
# as "name" or "name(args)", the module Manitou/Plugins/name.pm is loaded
# once, and its init() function is called with $dbh followed by the
# arguments. The resulting object is registered in the list of its kind.
# The declarations of maintenance plugins start with a frequency.
# A configuration error terminates the program (die or exit 1).
sub init_plugins {
  my $plugins_dir = getconf("plugins_directory");
  if (defined $plugins_dir) {
    unshift(@INC, $plugins_dir);
  }

  my @plugin_types = ("incoming_preprocess_plugins",
                      "incoming_mimeprocess_plugins",
                      "incoming_postprocess_plugins",
                      "outgoing_plugins",
                      "maintenance_plugins");

  # Where the plugins that are registered per mailbox go
  my %registry_for = ("incoming_preprocess_plugins" => \%preprocess_plugins,
                      "incoming_mimeprocess_plugins" => \%mimeprocess_plugins,
                      "incoming_postprocess_plugins" => \%postprocess_plugins,
                      "outgoing_plugins" => \%outgoing_plugins);

  for my $mbox (mailboxes()) {
    for my $pl_type (@plugin_types) {
      my $plist = getconf($pl_type, $mbox, 1);
      # $p is an alias: the configuration entries get trimmed
      # (and stripped of their frequency) in place
      for my $p (@{$plist}) {
        $p =~ s/^\s+//;	# trim leading blanks
        $p =~ s/\s+$//;	# trim trailing blanks
        my ($plugin, $args);
        my ($freq,$freq_type);

        if ($pl_type eq "maintenance_plugins") {
          die "Configuration error: maintenance_plugins are only allowed in the [common] section ($mbox)\n"
            if ($mbox ne "common");
          # maintenance_plugins have a frequency at the start of their
          # declaration, expressed as
          # HH:MN => every day at given time
          # *:MN => every hour at given minute
          # X mn or Xmn => every X minutes
          # X h or Xh => every X hours
          if ($p =~ /^([0-9]+)\s*(mn|h)\s+(.*)$/) {
            # intervals are kept in minutes
            $freq_type="interval";
            $freq=$1;
            $freq=$freq*60 if ($2 eq "h");
            $p=$3;
          }
          elsif ($p =~ /^([0-9]{1,2}|\*)\:([0-9]{1,2})\s+(.*)$/) {
            $freq_type="pit"; # point in time
            $freq="$1:$2";
            $p=$3;
            if ($1 ne "*" && $1>=24) {
              die "maintenance_plugins: incorrect hour $1.\nHour must be between 0 and 23.\n";
            }
            if ($2>=60) {
              die "maintenance_plugins: incorrect minutes $2.\nMinutes must be between 0 and 59.\n";
            }
          }
          else {
            die "maintenance_plugins: unrecognized frequency at start of declaration.\nAccepted syntax is Xmn or Xh where X is a number, or HH:MM, or *:MM.\nExamples:\nmaintenance_plugins = 2h plugin1 \\\n 10mn plugin2 \\\n 07:00 plugin3\n";
          }
        }
        elsif ($mbox eq "common" && $pl_type ne "outgoing_plugins") {
          die "Configuration error: $pl_type are only allowed in mailboxes sections, not in the [common] section\n";
        }

        # Split the declaration into a name and optional arguments
        if ($p =~ /^([a-zA-Z_0-9]+)\s*\((.*)\)$/) { # has args
          ($plugin, $args) = ($1, $2);
        }
        elsif ($p =~ /^[a-zA-Z_0-9]+$/) {	# no args
          $plugin=$p;
        }
        else {
          print STDERR "init_plugins: unrecognized plugin declaration: $p\n";
          exit 1;
        }

        unless ($loaded_plugins{$plugin}) {
          require "Manitou/Plugins/$plugin.pm";
          $loaded_plugins{$plugin}=1;
        }

        # The arguments are perl code coming from the configuration:
        # the call is built as a string and evaluated
        my $evplugin = "Manitou::Plugins::$plugin"
          . (defined($args) ? '::init($dbh,' .$args.')' : '::init($dbh)');
        my $pl=eval $evplugin;
        if ($@ || !defined($pl)) {
          print STDERR "Error in initializing plugin $plugin for mailbox $mbox: $@\n";
          exit 1;
        }
        $pl->{name}=$plugin;
        $pl->{type}=$pl_type;

        if ($pl_type eq "maintenance_plugins") {
          $pl->{frequency}=$freq;
          $pl->{frequency_type}=$freq_type;
          push @maintenance_plugins, $pl;
        }
        else {
          push @{$registry_for{$pl_type}->{$mbox}}, $pl;
        }
      }
    }
  }
}

# Assign its identity_id field to each mbox_conf entry (through add_mbox),
# for all the mailboxes except 'common'.
# If entries are missing in the identities table, insert them.
sub init_identities {
  my $sth=$dbh->prepare("SELECT identity_id FROM identities WHERE lower(email_addr)=?");
  my $sthcr;	# prepared when the first identity has to be created
  for my $mbox (mailboxes()) {
    next if ($mbox eq 'common');
    # The query compares against lower(email_addr), so the name must be
    # lowercased too. Otherwise a mailbox name with uppercase characters
    # would never be found, and a new identity inserted on each call.
    my $lookup=lc($mbox);
    $sth->execute($lookup);
    my @r=$sth->fetchrow_array;
    if (!@r) {
      # Create the identity if it doesn't exist
      $sthcr ||= $dbh->prepare("INSERT INTO identities(email_addr) VALUES(?)");
      $sthcr->execute($mbox);
      # and read back the identity_id it has been given
      $sth->execute($lookup);
      @r=$sth->fetchrow_array;
    }
    add_mbox($mbox, $r[0]);
  }
  $sth->finish;
}

__END__

=head1 NAME

 manitou-mdx - Manitou-Mail Mail-Database eXchanger daemon

=head1 SYNOPSIS

  manitou-mdx [--conf=/path/to/config] [--fork] [--pidfile=/path/to/pidfile]
  manitou-mdx [--conf=/path/to/config] --mboxfile=/path/to/mbox [--status=status] [--tag=tag] [--mailbox=mailbox] [--skip=number] [-v]
  manitou-mdx [--conf=/path/to/config] --import-list=path [--import-basedir=directory] [--status=import_status] [--tag=tagname] [--mailbox=mailbox_name]

=head1 DESCRIPTION

When invoked without argument or with --fork, run continuously to import into the database the messages that appear in the spool directories as defined in the configuration file.

=over

=item B<fork>:
 Create a new process to run in background and quit if no error occurred.

=item B<pidfile>:
 Write into I<pidfile> the process ID of the background process.

=item B<mboxfile>:
 Import a mailbox file (mbox format) and quit. When this option is used, manitou-mdx won't try to import any files from the spool directories defined in the configuration file or to send any outgoing message.

=item B<status>:
  When importing a mailbox, set the specified status (a numeric value) to each imported message.

=item B<tag>:
  When importing a mailbox, assign the specified tag (a name) to each imported message.

=item B<skip>:
 When importing a mailbox, skip the specified number of messages before starting to import. This is useful to import in several steps.

=item B<mailbox>:
 When importing a mailbox, assign the imported messages to the specified mailbox. The mailbox name is generally a destination email address.

=item B<import-list>:
 Import mail files that are listed in the file, one line per file name. When specified, --import-basedir is a directory name that is prepended to all file names read from this file.

=back

=head1 CONFIGURATION

=over

The default configuration file is /etc/manitou-mdx.conf

=back

=cut

HTML source code generated by GNU Source-Highlight plus some custom post-processing

List of all available source files