modify(' - '.self::THRESHOLD_MINUTES . ' minutes');

        // Single DQL bulk UPDATE: every client still in OG_LIVE whose updatedAt is
        // older than $threshold is switched to DISCONNECTED. execute() returns the
        // number of affected rows, which is reported to the user below.
        // ($threshold is assigned before this point — presumably from the modify()
        // call above; confirm against the start of the method.)
        $startQueryTime = microtime(true);
        $query = $this->entityManager->createQuery(
            'UPDATE App\Entity\Client c SET c.status = :status WHERE c.status = :currentStatus AND c.updatedAt < :threshold'
        );
        $query->setParameter('status', ClientStatus::DISCONNECTED);
        $query->setParameter('currentStatus', ClientStatus::OG_LIVE);
        $query->setParameter('threshold', $threshold);
        $updatedCount = $query->execute();
        $queryTime = microtime(true) - $startQueryTime;

        // Load the clients to announce over Mercure and time the whole dispatch.
        // NOTE(review): this SELECT matches every DISCONNECTED client older than the
        // threshold, not only the rows changed by the UPDATE above (which sets only
        // the status column) — confirm that re-publishing clients disconnected by
        // earlier runs is intended.
        $startMercureTime = microtime(true);
        $clients = $this->entityManager->createQueryBuilder()
            ->select('c')
            ->from(Client::class, 'c')
            ->where('c.status = :status')
            ->andWhere('c.updatedAt < :threshold')
            ->setParameter('status', ClientStatus::DISCONNECTED)
            ->setParameter('threshold', $threshold)
            ->getQuery()
            ->getResult();
        $this->dispatchMercureEvent($clients);
        $mercureTime = microtime(true) - $startMercureTime;

        $io->success("Updated $updatedCount clients to DISCONNECTED status.");
        $io->note("Query time: " . round($queryTime, 3) . "s");
        $io->note("Mercure dispatch time: " . round($mercureTime, 3) . "s");

        return Command::SUCCESS;
    }

    /**
     * Publishes the status of the given clients on the "clients" Mercure topic.
     *
     * The clients are split into chunks of at most $chunkSize entries and one
     * Update is published per chunk. Each payload is a JSON array of objects
     * with an "@id" ("/clients/<uuid>") and a "status" entry. Nothing is
     * published when $clients is empty.
     *
     * @param array $clients   Objects exposing getUuid() and getStatus().
     * @param int   $chunkSize Maximum number of clients per published update.
     *
     * @throws \JsonException When a chunk cannot be encoded as JSON.
     */
    private function dispatchMercureEvent(array $clients, int $chunkSize = 10000): void
    {
        foreach (array_chunk($clients, $chunkSize) as $chunk) {
            $data = [];
            foreach ($chunk as $client) {
                $data[] = [
                    '@id' => '/clients/' . $client->getUuid(),
                    'status' => $client->getStatus(),
                ];
            }

            // Fail loudly on an encoding error: without the flag json_encode()
            // returns false, which must never reach Update as its payload.
            $update = new Update(
                'clients',
                json_encode($data, JSON_THROW_ON_ERROR)
            );

            $this->hub->publish($update);
        }
    }
}