/var/www/hkosl.com/b2b2c/webadmin/libraies/monolog/monolog/src/Monolog/Handler/AmqpHandler.php


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
<?php

/*
 * This file is part of the Monolog package.
 *
 * (c) Jordi Boggiano <j.boggiano@seld.be>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Monolog\Handler;

use 
Monolog\Logger;
use 
Monolog\Formatter\JsonFormatter;
use 
PhpAmqpLib\Message\AMQPMessage;
use 
PhpAmqpLib\Channel\AMQPChannel;
use 
AMQPExchange;

class 
AmqpHandler extends AbstractProcessingHandler
{
    
/**
     * @var AMQPExchange|AMQPChannel $exchange
     */
    
protected $exchange;

    
/**
     * @var string
     */
    
protected $exchangeName;

    
/**
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string                   $exchangeName
     * @param int                      $level
     * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
     */
    
public function __construct($exchange$exchangeName 'log'$level Logger::DEBUG$bubble true)
    {
        if (
$exchange instanceof AMQPExchange) {
            
$exchange->setName($exchangeName);
        } elseif (
$exchange instanceof AMQPChannel) {
            
$this->exchangeName $exchangeName;
        } else {
            throw new \
InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        }
        
$this->exchange $exchange;

        
parent::__construct($level$bubble);
    }

    
/**
     * {@inheritDoc}
     */
    
protected function write(array $record)
    {
        
$data $record["formatted"];
        
$routingKey $this->getRoutingKey($record);

        if (
$this->exchange instanceof AMQPExchange) {
            
$this->exchange->publish(
                
$data,
                
$routingKey,
                
0,
                array(
                    
'delivery_mode' => 2,
                    
'content_type' => 'application/json',
                )
            );
        } else {
            
$this->exchange->basic_publish(
                
$this->createAmqpMessage($data),
                
$this->exchangeName,
                
$routingKey
            
);
        }
    }

    
/**
     * {@inheritDoc}
     */
    
public function handleBatch(array $records)
    {
        if (
$this->exchange instanceof AMQPExchange) {
            
parent::handleBatch($records);

            return;
        }

        foreach (
$records as $record) {
            if (!
$this->isHandling($record)) {
                continue;
            }

            
$record $this->processRecord($record);
            
$data $this->getFormatter()->format($record);

            
$this->exchange->batch_basic_publish(
                
$this->createAmqpMessage($data),
                
$this->exchangeName,
                
$this->getRoutingKey($record)
            );
        }

        
$this->exchange->publish_batch();
    }

    
/**
     * Gets the routing key for the AMQP exchange
     *
     * @param  array  $record
     * @return string
     */
    
protected function getRoutingKey(array $record)
    {
        
$routingKey sprintf(
            
'%s.%s',
            
// TODO 2.0 remove substr call
            
substr($record['level_name'], 04),
            
$record['channel']
        );

        return 
strtolower($routingKey);
    }

    
/**
     * @param  string      $data
     * @return AMQPMessage
     */
    
private function createAmqpMessage($data)
    {
        return new 
AMQPMessage(
            (string) 
$data,
            array(
                
'delivery_mode' => 2,
                
'content_type' => 'application/json',
            )
        );
    }

    
/**
     * {@inheritDoc}
     */
    
protected function getDefaultFormatter()
    {
        return new 
JsonFormatter(JsonFormatter::BATCH_MODE_JSONfalse);
    }
}