Hyperf采坑记录
阻塞函数会对CSP编程带来哪些问题?
如果你用的swoole版本低于v4.5.4,并且没有使用hyperf提供的协程Guzzle客户端,那么对于应用程序并发能力的影响是极其大的。Hyperf的协程依托swoole开启数个worker进程,当某一个进程调用了某些trap函数,该进程将会把控制权让出直到有了返回结果,反映到现象层面就是这个进程在执行某些IO操作的期间不工作了。
案例:
客户端请求A服务,A服务中有一些数据需要B服务的支撑;
如果同时发出两个请求,不外乎应该出现下面两种情况:
- 两个耗时相同均在10秒左右的话,说明并未发生阻塞,该进程正常工作;
- 如果其中一个大幅超过10秒,说明存在排队处理现象,发生了阻塞;
我本地的swoole版本为4.5.2,SWOOLE_HOOK_CURL是在4.5.4才正式被包含进入SWOOLE_HOOK_ALL范畴,所以只要Flag不写成“SWOOLE_HOOK_ALL|SWOOLE_HOOK_CURL”,且使用原生Guzzle客户端就可以进行模拟了;
试验的结果:
#request A
curl http://127.0.0.1:9501 0.00s user 0.00s system 0% cpu 10.011 total
#request B
curl http://127.0.0.1:9501 0.00s user 0.00s system 0% cpu 19.347 total
在A请求发送期间,该进程并没有接受B请求,AB两个请求是串行执行的。
如果Flag新增CURL_HOOK,则该问题可以避免(前提是swool版本至少>v4.4LTS)。
#request A
curl http://127.0.0.1:9501 0.00s user 0.00s system 0% cpu 10.037 total
#request B
curl http://127.0.0.1:9501 0.00s user 0.00s system 0% cpu 10.011 total
解决:
1、在使用hyperf框架做业务,发生IO操作最好使用官方提供的协程客户端。如果发生上述阻塞问题的话会对并发产生极大影响,理论同一时刻同时处理的任务数量最高也就等于设置的进程数量。
2、swool版本尽量使用次新版,除了协程HOOK的问题,还会伴随很多特性与Fixed的利好。
服务层可以在构造函数中初始化其他平级Service类么?
可以,但是需要注意循环引用,在业务系统中业务层代码经常会出现Service层互相调用的情况,发生了会OOM(新版hyperf会设置一个最大递归深度500,超出500阈值会抛出一个CircularDependencyException的异常,不会直接显示OOM的异常信息)。譬如如下代码:
interface AInterface
{
public function aFoo(): bool;
}
interface BInterface
{
public function b1Foo(): bool;
public function b2Foo():bool;
}
class AService implements AInterface
{
protected $bService;
function __construct(BInterface $bService)
{
$this->bService = $bService;
}
public function aFoo(): bool
{
return $this->bService->b1Foo();
}
}
class BService implements BInterface
{
protected $aService;
function __construct(AInterface $aService)
{
$this->aService = $aService;
}
public function b1Foo(): bool
{
return true;
}
public function b2Foo():bool
{
$aRet = $this->aService->aFoo();
//do something..
return $aRet;
}
}
如果没有做TDD的话,这类错误实际上是比较难以发现的,只有产生请求后才会报出来,如果是个冷接口上线且测试覆盖度不够,上线后会偶现这个CircularDependencyException异常。
解决:
1、避免在构造类中实例化(make)或注入同层Service,按需加载需要的Service.
class AService implements AInterface
{
public function aFoo(): bool
{
$bService = ApplicationContext::getContainer()->get(BInterface::class);
return $bService->b1Foo();
}
}
class BService implements BInterface
{
public function b1Foo(): bool
{
return true;
}
public function b2Foo():bool
{
$aService = ApplicationContext::getContainer()->get(AInterface::class);
$aRet = $aService->AFoo();
//do something..
return $aRet;
}
}
当然即便是这样去做其实也避免不了两个函数间的互相调用,如果发生了两个函数间的调用那么异常信息则是“PHP Fatal error: Allowed memory size of xxx”,因为函数不是Di层面解决的问题,所以和上面的深度异常没有多大关系,直接抛出语言级别的OOM。
2、方案1其实是尽可能的避免循环调用,但其实终归是从根本上解决不了问题,但我们还有个办法,就是解决掉出问题的coding姿势。在PHP中我们还有一个特性可以用——trait。利用trait可以将需要公共业务逻辑剥离出来,赋予业务类想要的能力。同时需要团队约定,同层禁止相互调用。
3、坚持TDD,测试驱动开发有很多好处,尤其是在面向PHP CLI编程的情景下,不仅在对测试或是CI交付上有一定的质量把关,在开发速度与效率上也会提升很多(因为不用启动服务)。并且TDD的生命周期是伴随着项目长效的,每一个Case在需求没有变更的情况下都可以反复使用,虽然在开发阶段多写了一些代码,但是收益是非常高的。
为什么升级到2.0.x以上的版本,使用AOP切其他三方库会失效?
2.0.x(2020.5月)后AOP做了很多改动,这个三方库AOP失效实际上是引入了ProxyManager这个类,主要作用就是优化了扫描切入内容,由1.x的匹配模式,改为了”有目的“的进行匹配。匹配的内容实际上就是APP目录下的类以及ClassLoader中的已注册类。
ProxyManager.php
//这里的composerLoaserClassMap来自于ClassLoader中的spl_autoload_functions方法返回的已注册的类
public function __construct(
array $reflectionClassMap = [],
array $composerLoaderClassMap = [],
string $proxyDir = ''
) {
$this->classMap = $this->mergeClassMap($reflectionClassMap, $composerLoaderClassMap);
$this->proxyDir = $proxyDir;
$this->filesystem = new Filesystem();
$this->proxies = $this->generateProxyFiles($this->initProxiesByReflectionClassMap(
$this->classMap
));
}
问题其实已经显而易见了,由于2.0升级文档中没有明确标注需要composer dump-autoload -o ,所以很多同学在升级的过程中忽略了这个命令,导致很多三方库的aspect失效(这个问题在2.0–>2.1升级文档中已经补充说明了…)。
解决:
1、”composer dump-autoload -o”,填充好vendor/composer/autoload_classmap.php文件内容;
2、在composer.json文件中指明需要优化加载。PS:新的骨架项目已包含该项
"config": {
"optimize-autoloader": true
}
只有这样才能够将三方包纳入到匹配范围中,生成正确的代理类,实现切面。
如何对amqp消息进行“手动消费”,而非靠进程自启?
Hyperf官方提倡的是消费方式是使用注解的方式起自定义进程进行消费:
/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
*/
class DemoConsumer extends ConsumerMessage
{
public function consume($data): string
{
print_r($data);
return Result::ACK;
}
}
这种方式可控性比较差,尤其是在分布式部署环境下。我们消费MQ消息可能会遇到各种各样的场景,根据复杂程度无非就是简单消费和复杂消费,简单消费就是简单的数据存取,状态变更等,复杂消费就要分情况了,从资源的角度可以理解为以下两点:
- CPU密集型:消费的过程中会占用较多的CPU资源比如为了降低载荷做了削峰处理自产自消。
- IO密集型:在完成消费的同时,需要协调多个外部服务协同处理;
在复杂消费场景下使用直接面向C端的硬件资源来处理显然就不是很合适了,并且线上的资源可能还会分为很多组,比如灰度资源,环境可能也会比较复杂。Hyperf其实提供了一种禁用自定义进程消费模式:@Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1, enable=false)
但是这个只是为了避免联合开发时候造成的互相消费引起的开发不便而引入的特性,如果单独给相关消费服务单独分配硬件资源,就必须保证各个环境的代码差异化,CICD阶段会引入不必要的思维负担。
解决:
所以比较好的解决方式是统一团队标准准:不启动自定义进程,由Command来控制消费,指定消费的硬件资源就好了。
使用Command来消费其实也很简单,原理其实就是使用hyperf自带的消费器消费ConsumerMessage就好了。
/**
* 示例类:
* 消费消息
*
* Class DemoConsumer
* @package App\Amqp\Consumer
*/
class DemoConsumer extends ConsumerMessage
{
protected $exchange = 'amqptest.exchange';
protected $routingKey='amqptest.binding';
protected $queue='amqptest';
public function consumeMessage($data, AMQPMessage $message): string
{
$messageProperties = $message->get_properties();
echo "消费消息ID->{$data['id']}" . PHP_EOL;
echo "MessageID->{$messageProperties['message_id']}" . PHP_EOL;
return Result::ACK;
}
public function getExchangeBuilder(): ExchangeBuilder
{
return parent::getExchangeBuilder()->setDurable(true);
}
public function isEnable(): bool
{
return false;
}
}
Command:
/**
* 示例类:
*
* 这是一个使用 command 对 AMQP 进行消费的示例
*
* @Command
*/
class DemoCustomerCommand extends BaseCommand
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
parent::__construct('amqp:customer');
}
public function configure()
{
parent::configure();
$this->setDescription('Hyperf Demo Command');
}
public function handle()
{
$consumer = new Consumer(
$this->container,
$this->container->get(PoolFactory::class),
$this->container->get(StdoutLoggerInterface::class)
);
$consumer->consume(new DemoConsumer());
$this->line('Hello Hyperf!', 'info');
}
}
至此可以为消费端分配单独的硬件机器处理消费任务。值得一提的是MessageId很重要,如果在阿里云上开启了amqp的日志收集,MessageId是检查消费成功与否的唯一准确线索。
使用阿里云ACM,在执行command模式下运行为何获取不到ACM配置?
如果你用的是1.x版本,那么你触发了一个隐藏彩蛋。ACM的工作模式是自定义进程通过管道消息将配置分发给其他worker进程,其他worker进程接到消息后完成对本地config的覆盖任务,当然在进程启动的时候也会拉一把配置,这个启动监听的位置在[config-aliyun-acm/src/Listener/BootProcessListener.php]。
1.x版本分别监听了两个事件
- BeforeWorkerStart
- BeforeProcessHandle
这个两个事件和command没有半毛钱关系,启动Command也不会拉起自定义进程。
当时我们是这样处理的,写一个Command基类,在Command命令启动后主动的拉一把ACM,将自己的config内容进行覆盖处理,然后再执行子类的handle实现。$config = ApplicationContext::getContainer()->get(ConfigInterface::class); if ($config->get('aliyun_acm.enable', false)) { $acmConfig = ApplicationContext::getContainer()->get(ClientInterface::class)->pull(); foreach ($acmConfig as $key => $value) { $config->set($key, $value); } self::$config = $config; }
这样做实际上是比较丑陋的,虽然拉取ACM也是初始化一部分,但是放在构造里还是怪怪的,之后读了一下Command的源码,发现了一个隐藏彩蛋。。。
protected function execute(InputInterface $input, OutputInterface $output) { $callback = function () { try { $this->eventDispatcher && $this->eventDispatcher->dispatch(new Event\BeforeHandle($this)); call([$this, 'handle']); $this->eventDispatcher && $this->eventDispatcher->dispatch(new Event\AfterHandle($this)); } catch (\Throwable $exception) { .......... } finally { $this->eventDispatcher && $this->eventDispatcher->dispatch(new Event\AfterExecute($this)); } return 0; }; .......... return $callback(); }
实际上在执行handle方法的时候实际上在上下是埋了两个事件的,这两个事件只是没有启动而已,因为1.x版本的command中的eventDispatcher属性恒等于空。但有了这个也就好办多了,用框架自身的机制去处理业务需要实际上是最好的,因为无论怎么改框架都是需要考虑BC的,但如果直接魔改源码很可能后期框架升级后你的回滚成本与风险会被推到一个比较高的高度,这不是我们希望看到的。
于是就有了下面这个版本:
在自定义BaseCommand中给eventDispatcher成员赋值使其具备事件分发的能力:class BaseCommand extends HyperfCommand { public function __construct(string $name = null) { // 开启 Command 事件 $this->eventDispatcher = ApplicationContext::getContainer()->get(EventDispatcherInterface::class); parent::__construct($name); } public function handle() { return; } }
创建一个事件监听,监听command启动,拉取ACM配置到本地:
class CommandListener implements ListenerInterface { public function listen(): array { return [ BeforeHandle::class, ]; } public function process(object $event) { if ($event instanceof BeforeHandle) { // 初始化ACM CommandConfig::initConfig(); } } }
这样1.x的ACM问题就解决了,接下来说说2.x的ACM。
2.x的ACM官方主动的开启了该事件。。在Command命令中引入了一个新的options “–enable-event-dispatcher”
[command/src/EnableEventDispatcher.php]public function enableDispatcher(InputInterface $input) { if ($input->getOption('enable-event-dispatcher')) { $this->eventDispatcher = ApplicationContext::getContainer()->get(EventDispatcherInterface::class); } }
并且在ACM的启动监听类中也主动加入了Command事件的监听
[config-aliyun-acm/src/Listener/BootProcessListener.php]public function listen(): array { return [ BeforeWorkerStart::class, BeforeProcessHandle::class, BeforeHandle::class, ]; }
解决:
1.x的解决方案见上面代码。2.x解决方案ACM只需要在command命令后面新增 ”–enable-event-dispatcher“ options即可。