Devops's Blog

Запись лога в кафку



Будем считывать поток данных с лога postfix maillog и фильтровать его
1. Первая строка - берем номер очереди и значение хедера messageId , который будет добавлять при отправке письма
2. Вторая строка - берем номер очереди и кому письмо и его статус
3. Третья строка - берем номер очереди и статус не доставки

Внесем настройки в postfix для логирования messageId и чтоб отчет о не доставке шел на root

добавим в main.cf


header_checks = regexp:/etc/postfix/header_checks
ecipient_canonical_maps = hash:/etc/postfix/canonical
[root@mail postfix]# cat /etc/postfix/header_checks
/^messageId/    WARN
/^Subject/      WARN
[root@mail postfix]# cat canonical
test1@domen.ru  root@domen.ru
test2@domen.ru  root@domen.ru
test3@domen.ru  root@domen.ru

Так можно сделать тестовую отправку


[root@mail user]# echo "Это тестовое письмо 2" | mail -s "Проверка отправки почты 4" -s "$(echo -e "messageId 8e3d1471-0fe8-4801-ad07-11111111")" -S smtp="domen.ru:25" -r test1@domen.ru, test@list.ru

пишем php скрипт


<?php
ini_set('max_execution_time', 0);
ini_set('memory_limit', '2048M');
error_reporting(E_ALL);
ini_set("log_errors", true);
ini_set('display_errors', true);
ini_set("error_log", "/var/log/mail_status_".date("d-m-Y"));
ini_set('default_charset', 'UTF-8');

class mail_status
{
        private $line;
        private $message_1;
        private $message_2;
        private $message_3;
        private $rk,$topic;

         private  function logfile($log_str) {
               $fd = fopen("/var/log/mail_status_".date("d-m-Y"), 'a') or die("не удалось создать файл");
               fwrite($fd, $log_str."\n");
               fclose($fd);
         }
        public function kafka_conn()
        {
               $conf = new RdKafka\Conf();
               $conf->set('security.protocol', 'ssl');
               $conf->set('ssl.ca.location', '/etc/postfix/mail_status/ssl/kafka.client.truststore.pem');
               $conf->set('ssl.certificate.location', '/etc/postfix/mail_status/ssl/client.keystore.pem');
               $conf->set('ssl.key.location', '/etc/postfix/mail_status/ssl/client.keystore.key');
               #$conf->set('log_level', (string) LOG_DEBUG);
               #$conf->set('debug', 'all');
               $this->rk = new RdKafka\Producer($conf);
               $this->rk->addBrokers("kafka-1.local:9093,kafka-2.local:9093,kafka-3.local:9093");
               $this->topic = $this->rk->newTopic("MAIL_STATUS");

        }

        public function read_real_file($cmd)
        {
                $descriptorspec = array(
                  0 => array("pipe", "r"),
                  1 => array("pipe", "w"),
                  2 => array("pipe", "w")
                );

               flush();
               $process = proc_open($cmd, $descriptorspec, $pipes, realpath('./'), array());
               if (is_resource($process)) {
                      while ($this->line = fgets($pipes[1])) {
                             $pos_1 = strpos($this->line, "messageId");
                             $pos_2 = strpos($this->line, "warning:");
                             $pos_3 = strpos($this->line, "header");
                             $pos_4 = strpos($this->line, "Subject:");
                             $pos_5 = strpos($this->line, "to=<");
                             $pos_6 = strpos($this->line, "status=");
                             $pos_7 = strpos($this->line, "sender");
                             $pos_8 = strpos($this->line, "non-delivery");
                             $pos_9 = strpos($this->line, "root@domen.ru");
                             if($pos_1==true && $pos_2==true && $pos_3==true && $pos_4==true) {$this->parse_1();}
                             if($pos_5==true && $pos_6==true && $pos_9==false) {$this->parse_2();}
                             if($pos_7==true && $pos_8==true) {$this->parse_3();}
                             flush();
                            }

                                        }

        }
        private function parse_1()
        {
                $arv=explode(" ",$this->line);
                for($i=0;$i<count($arv);$i++)
                {
                        if($arv[$i]=="messageId")
                        {
                                $this->message_1=substr($arv[5], 0, -1)." ".$arv[$i+1];
                                $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, $this->message_1);
                                $this->rk->flush(10000);
                                $this->logfile($this->message_1);
                                #echo $this->message_1."\n";
                                break;
                        }

                }

        }

        private function parse_2()
        {
                $arv=explode(" ",$this->line);
                for($i=0;$i<count($arv);$i++)
                {
                        if(preg_match("/to=/i", $arv[$i]))
                        {
                                $email=$arv[$i];
                                $email=str_replace('to=<', '', $email);
                                $email=str_replace('>,', '', $email);
                        }
                        if(preg_match("/status=/i", $arv[$i]))
                        {
                                $status=str_replace('status=', '', $arv[$i]);
                                $this->message_2=substr($arv[5], 0, -1)." ".$email." ".$status;
                                $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, $this->message_2);
                                $this->rk->flush(10000);
                                $this->logfile($this->message_2);
                                #echo $this->message_2."\n";
                                break;
                        }

                }

        }

        private function parse_3()
        {
                $arv=explode(" ",$this->line);
                for($i=0;$i<count($arv);$i++)
                {
                        if($arv[$i]=="sender")
                        {
                                $this->message_3=substr($arv[5], 0, -1)." ".$arv[$i+1];
                                $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, $this->message_3);
                                $this->rk->flush(10000);
                                $this->logfile($this->message_3);
                                #echo $this->message_3."\n";
                                break;
                        }

                }

        }


}

$mail_status = new mail_status();
$mail_status->kafka_conn();
$mail_status->read_real_file("tail -f /var/log/maillog");

?>

обернем скрипт в сервис


[root@mail postfix]# cat /usr/lib/systemd/system/mail_status_log.service
[Unit]
Description=mail_log

[Service]
ExecStart=/bin/php /etc/postfix/mail_status/mail_status.php

[Install]
WantedBy=multi-user.target

[root@mail postfix]# systemctl status mail_status_log.service
● mail_status_log.service - mail_log
   Loaded: loaded (/usr/lib/systemd/system/mail_status_log.service; enabled; vendor preset: disabled)
   Active: active (running) since Sun 2024-02-04 00:07:12 MSK; 19h ago
 Main PID: 3178051 (php)
    Tasks: 10 (limit: 76110)
   Memory: 13.0M
   CGroup: /system.slice/mail_status_log.service
           ├─3178051 /bin/php /etc/postfix/mail_status/mail_status.php
           └─3178058 tail -f /var/log/maillog

Читаем кафку должно получиться что то вроде этого


[root@kafka-1 user]# /opt/kafka/bin/kafka-console-consumer.sh -bootstrap-server kafka-1.local:9092 --topic MAIL_STATUS --from-beginning
407C4103DD69 8e3d1471-0fe8-4801-ad07-11111111
407C4103DD69 test@list.ru sent
54BB31008473 test@local.ru bounced
54BB31008473 non-delivery