1
0
Fork 0
mirror of https://github.com/wallabag/wallabag.git synced 2025-07-12 16:58:37 +00:00

Send every imported item to the queue

Instead of queuing real Entry objects to process, we queue all the items to import from Pocket in a raw format.
Then, the worker retrieves that information, finds / creates the entry and saves it.
This commit is contained in:
Jeremy Benoist 2016-09-03 17:36:57 +02:00
parent 87c9995b6c
commit ef75e1220e
No known key found for this signature in database
GPG key ID: BCA73962457ACC3C
9 changed files with 226 additions and 159 deletions

View file

@ -219,24 +219,24 @@ lexik_maintenance:
old_sound_rabbit_mq: old_sound_rabbit_mq:
connections: connections:
default: default:
host: %rabbitmq_host% host: "%rabbitmq_host%"
port: %rabbitmq_port% port: "%rabbitmq_port%"
user: %rabbitmq_user% user: "%rabbitmq_user%"
password: %rabbitmq_password% password: "%rabbitmq_password%"
vhost: / vhost: /
lazy: false lazy: true
producers: producers:
wallabag: wallabag_pocket:
connection: default connection: default
exchange_options: exchange_options:
name: 'wallabag_exchange' name: 'wallabag.import.pocket'
type: topic type: topic
consumers: consumers:
entries: wallabag_pocket:
connection: default connection: default
exchange_options: exchange_options:
name: 'wallabag_exchange' name: 'wallabag.import.pocket'
type: topic type: topic
queue_options: queue_options:
name: 'wallabag_queue' name: 'wallabag.import.pocket'
callback: wallabag_import.consumer.entry callback: wallabag_import.consumer.pocket

View file

@ -41,11 +41,7 @@ parameters:
rss_limit: 50 rss_limit: 50
# pocket import
pocket_consumer_key: xxxxxxxx
# RabbitMQ processing # RabbitMQ processing
rabbitmq: false
rabbitmq_host: localhost rabbitmq_host: localhost
rabbitmq_port: 5672 rabbitmq_port: 5672
rabbitmq_user: guest rabbitmq_user: guest

View file

@ -1,39 +0,0 @@
<?php
namespace Wallabag\ImportBundle\Component\AMPQ;
use Doctrine\ORM\EntityManager;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Wallabag\CoreBundle\Helper\ContentProxy;
use Wallabag\CoreBundle\Repository\EntryRepository;
/**
 * RabbitMQ consumer that refreshes the content of an already stored entry.
 *
 * Each message body is a PHP-serialized array carrying at least the keys
 * `url` and `userId`, which are used to look the entry up.
 */
class EntryConsumer implements ConsumerInterface
{
    private $em;
    private $contentProxy;
    private $entryRepository;

    /**
     * @param EntityManager   $em              Used to persist and flush the updated entry
     * @param EntryRepository $entryRepository Used to find the entry by url and user id
     * @param ContentProxy    $contentProxy    Used to update the entry from its url
     */
    public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy)
    {
        $this->em = $em;
        $this->entryRepository = $entryRepository;
        $this->contentProxy = $contentProxy;
    }

    /**
     * {@inheritdoc}
     */
    public function execute(AMQPMessage $msg)
    {
        // NOTE(review): unserialize() on data coming from a queue can instantiate
        // arbitrary classes if the queue is ever writable by an untrusted party.
        // The format is kept because the matching producer uses serialize();
        // switching both sides to JSON would be safer.
        $storedEntry = unserialize($msg->body);

        // unserialize() returns false on a malformed payload; never index a non-array.
        if (!is_array($storedEntry) || !isset($storedEntry['url'], $storedEntry['userId'])) {
            return;
        }

        $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']);
        if (!$entry) {
            return;
        }

        $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl());
        if ($entry) {
            $this->em->persist($entry);
            $this->em->flush();
        }
    }
}

View file

@ -0,0 +1,63 @@
<?php
namespace Wallabag\ImportBundle\Consumer\AMPQ;
use Doctrine\ORM\EntityManager;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Wallabag\ImportBundle\Import\PocketImport;
use Wallabag\UserBundle\Repository\UserRepository;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
 * RabbitMQ consumer that turns one raw Pocket item into a stored entry.
 *
 * Each message body is a JSON object describing a single Pocket item, with an
 * extra `userId` key identifying the user the item must be imported for.
 */
class PocketConsumer implements ConsumerInterface
{
    private $em;
    private $userRepository;
    private $pocketImport;
    private $logger;

    /**
     * @param EntityManager        $em             Used to flush the entry created by the import service
     * @param UserRepository       $userRepository Used to load the user referenced by the message
     * @param PocketImport         $pocketImport   Service that parses a raw Pocket item
     * @param LoggerInterface|null $logger         Falls back to a NullLogger when omitted
     */
    public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null)
    {
        $this->em = $em;
        $this->userRepository = $userRepository;
        $this->pocketImport = $pocketImport;
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * {@inheritdoc}
     */
    public function execute(AMQPMessage $msg)
    {
        $storedEntry = json_decode($msg->body, true);

        // json_decode() yields null (or a scalar) for a malformed body; drop such messages
        // instead of indexing a non-array below.
        if (!is_array($storedEntry) || !isset($storedEntry['userId'])) {
            $this->logger->warning('Unable to decode message', ['body' => $msg->body]);

            return;
        }

        $user = $this->userRepository->find($storedEntry['userId']);

        // no user? Drop message
        if (null === $user) {
            $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);

            return;
        }

        $this->pocketImport->setUser($user);

        $entry = $this->pocketImport->parseEntry($storedEntry);

        if (null === $entry) {
            $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);

            return;
        }

        try {
            $this->em->flush();
            // EntityManager::clear() takes an entity class name, not an instance;
            // passing the object does not detach anything.
            $this->em->clear(\Wallabag\CoreBundle\Entity\Entry::class);
        } catch (\Exception $e) {
            $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);

            return;
        }
    }
}

View file

@ -10,12 +10,29 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType;
class PocketController extends Controller class PocketController extends Controller
{ {
/**
 * Build the Pocket import service bound to the current user.
 *
 * The RabbitMQ producer is attached only when the `rabbitmq` setting read
 * from craue_config is truthy.
 *
 * @return \Wallabag\ImportBundle\Import\PocketImport
 */
private function getPocketImportService()
{
    $importService = $this->get('wallabag_import.pocket.import');
    $importService->setUser($this->getUser());

    $rabbitMqEnabled = $this->get('craue_config')->get('rabbitmq');
    if (!$rabbitMqEnabled) {
        return $importService;
    }

    $producer = $this->get('old_sound_rabbit_mq.wallabag_pocket_producer');
    $importService->setRabbitmqProducer($producer);

    return $importService;
}
/** /**
* @Route("/pocket", name="import_pocket") * @Route("/pocket", name="import_pocket")
*/ */
public function indexAction() public function indexAction()
{ {
$pocket = $this->get('wallabag_import.pocket.import'); $pocket = $this->getPocketImportService();
$form = $this->createFormBuilder($pocket) $form = $this->createFormBuilder($pocket)
->add('mark_as_read', CheckboxType::class, [ ->add('mark_as_read', CheckboxType::class, [
'label' => 'import.form.mark_as_read_label', 'label' => 'import.form.mark_as_read_label',
@ -24,7 +41,7 @@ class PocketController extends Controller
->getForm(); ->getForm();
return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ return $this->render('WallabagImportBundle:Pocket:index.html.twig', [
'import' => $this->get('wallabag_import.pocket.import'), 'import' => $this->getPocketImportService(),
'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true,
'form' => $form->createView(), 'form' => $form->createView(),
]); ]);
@ -35,7 +52,7 @@ class PocketController extends Controller
*/ */
public function authAction(Request $request) public function authAction(Request $request)
{ {
$requestToken = $this->get('wallabag_import.pocket.import') $requestToken = $this->getPocketImportService()
->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL));
if (false === $requestToken) { if (false === $requestToken) {
@ -62,7 +79,7 @@ class PocketController extends Controller
public function callbackAction() public function callbackAction()
{ {
$message = 'flashes.import.notice.failed'; $message = 'flashes.import.notice.failed';
$pocket = $this->get('wallabag_import.pocket.import'); $pocket = $this->getPocketImportService();
$markAsRead = $this->get('session')->get('mark_as_read'); $markAsRead = $this->get('session')->get('mark_as_read');
$this->get('session')->remove('mark_as_read'); $this->get('session')->remove('mark_as_read');

View file

@ -3,12 +3,11 @@
namespace Wallabag\ImportBundle\Import; namespace Wallabag\ImportBundle\Import;
use OldSound\RabbitMqBundle\RabbitMq\Producer; use OldSound\RabbitMqBundle\RabbitMq\Producer;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use Doctrine\ORM\EntityManager; use Doctrine\ORM\EntityManager;
use GuzzleHttp\Client; use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException; use GuzzleHttp\Exception\RequestException;
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; use Symfony\Component\Security\Core\User\UserInterface;
use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Entity\Entry;
use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Helper\ContentProxy;
use Craue\ConfigBundle\Util\Config; use Craue\ConfigBundle\Util\Config;
@ -21,21 +20,39 @@ class PocketImport extends AbstractImport
private $skippedEntries = 0; private $skippedEntries = 0;
private $importedEntries = 0; private $importedEntries = 0;
private $markAsRead; private $markAsRead;
protected $accessToken;
private $producer; private $producer;
private $rabbitMQ; protected $accessToken;
public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig)
{ {
$this->user = $tokenStorage->getToken()->getUser();
$this->em = $em; $this->em = $em;
$this->contentProxy = $contentProxy; $this->contentProxy = $contentProxy;
$this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->consumerKey = $craueConfig->get('pocket_consumer_key');
$this->logger = new NullLogger(); $this->logger = new NullLogger();
$this->rabbitMQ = $craueConfig->get('rabbitmq'); }
/**
* Set RabbitMQ Producer to send each entry to a queue.
* This method should be called when user has enabled RabbitMQ.
*
* @param Producer $producer
*/
public function setRabbitmqProducer(Producer $producer)
{
$this->producer = $producer; $this->producer = $producer;
} }
/**
* Set current user.
* Could be the current *connected* user or one retrieved by the consumer.
*
* @param UserInterface $user
*/
public function setUser(UserInterface $user)
{
$this->user = $user;
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
@ -168,6 +185,12 @@ class PocketImport extends AbstractImport
$entries = $response->json(); $entries = $response->json();
if ($this->producer) {
$this->parseEntriesForProducer($entries['list']);
return true;
}
$this->parseEntries($entries['list']); $this->parseEntries($entries['list']);
return true; return true;
@ -197,88 +220,112 @@ class PocketImport extends AbstractImport
/** /**
* @see https://getpocket.com/developer/docs/v3/retrieve * @see https://getpocket.com/developer/docs/v3/retrieve
* *
* @param $entries * @param array $entries
*/ */
private function parseEntries($entries) private function parseEntries(array $entries)
{ {
$i = 1; $i = 1;
foreach ($entries as &$pocketEntry) { foreach ($entries as $pocketEntry) {
$url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; $entry = $this->parseEntry($pocketEntry);
$existingEntry = $this->em if (null === $entry) {
->getRepository('WallabagCoreBundle:Entry')
->findByUrlAndUserId($url, $this->user->getId());
if (false !== $existingEntry) {
++$this->skippedEntries;
continue; continue;
} }
$entry = new Entry($this->user);
if (!$this->rabbitMQ) {
$entry = $this->fetchContent($entry, $url);
// jump to next entry in case of problem while getting content
if (false === $entry) {
++$this->skippedEntries;
continue;
}
}
// 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
if ($pocketEntry['status'] == 1 || $this->markAsRead) {
$entry->setArchived(true);
}
// 0 or 1 - 1 If the item is starred
if ($pocketEntry['favorite'] == 1) {
$entry->setStarred(true);
}
$title = 'Untitled';
if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
$title = $pocketEntry['resolved_title'];
} elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
$title = $pocketEntry['given_title'];
}
$entry->setTitle($title);
$entry->setUrl($url);
// 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
$entry->setPreviewPicture($pocketEntry['images'][1]['src']);
}
if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
$this->contentProxy->assignTagsToEntry(
$entry,
array_keys($pocketEntry['tags'])
);
}
$pocketEntry['url'] = $url;
$pocketEntry['userId'] = $this->user->getId();
$this->em->persist($entry);
++$this->importedEntries;
// flush every 20 entries // flush every 20 entries
if (($i % 20) === 0) { if (($i % 20) === 0) {
$this->em->flush(); $this->em->flush();
$this->em->clear($entry);
} }
++$i; ++$i;
} }
$this->em->flush(); $this->em->flush();
}
if ($this->rabbitMQ) { public function parseEntry(array $pocketEntry)
foreach ($entries as $entry) { {
$this->producer->publish(serialize($entry)); $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
$existingEntry = $this->em
->getRepository('WallabagCoreBundle:Entry')
->findByUrlAndUserId($url, $this->user->getId());
if (false !== $existingEntry) {
++$this->skippedEntries;
return;
}
$entry = new Entry($this->user);
$entry = $this->fetchContent($entry, $url);
// jump to next entry in case of problem while getting content
if (false === $entry) {
++$this->skippedEntries;
return;
}
// 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
if ($pocketEntry['status'] == 1 || $this->markAsRead) {
$entry->setArchived(true);
}
// 0 or 1 - 1 If the item is starred
if ($pocketEntry['favorite'] == 1) {
$entry->setStarred(true);
}
$title = 'Untitled';
if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
$title = $pocketEntry['resolved_title'];
} elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
$title = $pocketEntry['given_title'];
}
$entry->setTitle($title);
$entry->setUrl($url);
// 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
$entry->setPreviewPicture($pocketEntry['images'][1]['src']);
}
if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
$this->contentProxy->assignTagsToEntry(
$entry,
array_keys($pocketEntry['tags'])
);
}
$this->em->persist($entry);
++$this->importedEntries;
return $entry;
}
/**
* Faster parse entries for Producer.
* We skip the checks at this point; they'll be done by the consumer.
*
* @param array $entries
*/
public function parseEntriesForProducer($entries)
{
foreach ($entries as $pocketEntry) {
// set userId for the producer (it won't know which user is connected)
$pocketEntry['userId'] = $this->user->getId();
if ($this->markAsRead) {
$pocketEntry['status'] = 1;
} }
++$this->importedEntries;
$this->producer->publish(json_encode($pocketEntry));
} }
} }
} }

View file

@ -1,10 +1,11 @@
services: services:
wallabag_import.consumer.entry: wallabag_import.consumer.pocket:
class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer
arguments: arguments:
- "@doctrine.orm.entity_manager" - "@doctrine.orm.entity_manager"
- "@wallabag_core.entry_repository" - "@wallabag_user.user_repository"
- "@wallabag_core.content_proxy" - "@wallabag_import.pocket.import"
- "@logger"
wallabag_import.chain: wallabag_import.chain:
class: Wallabag\ImportBundle\Import\ImportChain class: Wallabag\ImportBundle\Import\ImportChain
@ -21,11 +22,9 @@ services:
wallabag_import.pocket.import: wallabag_import.pocket.import:
class: Wallabag\ImportBundle\Import\PocketImport class: Wallabag\ImportBundle\Import\PocketImport
arguments: arguments:
- "@security.token_storage"
- "@doctrine.orm.entity_manager" - "@doctrine.orm.entity_manager"
- "@wallabag_core.content_proxy" - "@wallabag_core.content_proxy"
- "@craue_config" - "@craue_config"
- "@old_sound_rabbit_mq.wallabag_producer"
calls: calls:
- [ setClient, [ "@wallabag_import.pocket.client" ] ] - [ setClient, [ "@wallabag_import.pocket.client" ] ]
- [ setLogger, [ "@logger" ]] - [ setLogger, [ "@logger" ]]

View file

@ -14,3 +14,9 @@ services:
- "@router" - "@router"
tags: tags:
- { name: kernel.event_subscriber } - { name: kernel.event_subscriber }
wallabag_user.user_repository:
class: Wallabag\UserBundle\Repository\UserRepository
factory: [ "@doctrine.orm.default_entity_manager", getRepository ]
arguments:
- WallabagUserBundle:User

View file

@ -27,32 +27,15 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase
protected $em; protected $em;
protected $contentProxy; protected $contentProxy;
protected $logHandler; protected $logHandler;
protected $producer;
private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false) private function getPocketImport($consumerKey = 'ConsumerKey')
{ {
$this->user = new User(); $this->user = new User();
$this->tokenStorage = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface')
->disableOriginalConstructor()
->getMock();
$token = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\TokenInterface')
->disableOriginalConstructor()
->getMock();
$this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy') $this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy')
->disableOriginalConstructor() ->disableOriginalConstructor()
->getMock(); ->getMock();
$token->expects($this->once())
->method('getUser')
->willReturn($this->user);
$this->tokenStorage->expects($this->once())
->method('getToken')
->willReturn($token);
$this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager') $this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager')
->disableOriginalConstructor() ->disableOriginalConstructor()
->getMock(); ->getMock();
@ -66,17 +49,12 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase
->with('pocket_consumer_key') ->with('pocket_consumer_key')
->willReturn($consumerKey); ->willReturn($consumerKey);
$this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer')
->disableOriginalConstructor()
->getMock();
$pocket = new PocketImportMock( $pocket = new PocketImportMock(
$this->tokenStorage,
$this->em, $this->em,
$this->contentProxy, $this->contentProxy,
$config, $config
$this->producer
); );
$pocket->setUser($this->user);
$this->logHandler = new TestHandler(); $this->logHandler = new TestHandler();
$logger = new Logger('test', [$this->logHandler]); $logger = new Logger('test', [$this->logHandler]);