Pimcore import workflows: building automated data feeds
A PIM system stands or falls on the quality and freshness of its data. In enterprise environments this means synchronizing millions of records daily from ERP systems, suppliers, and marketplaces. In this article we cover production-grade import architecture: delta imports that only process changed data, dependency resolution for complex object relations, and patterns for resumable bulk operations that scale to millions of records.
Delta imports with content hashing
Full imports are unacceptable at large dataset sizes. A delta import strategy with content hashing detects changes without comparing the full dataset:
namespace App\Import\Service;

use Pimcore\Db;
use Pimcore\Model\DataObject\Product;

class DeltaImportService
{
    private const HASH_TABLE = 'import_content_hashes';

    // Hash of the record currently being processed; persisted via commitHash()
    private ?array $pendingHash = null;

    public function shouldProcess(array $record): bool
    {
        $identifier = $record['sku'];
        $newHash = $this->computeHash($record);

        $db = Db::get();
        $existingHash = $db->fetchOne(
            'SELECT content_hash FROM ' . self::HASH_TABLE . ' WHERE identifier = ?',
            [$identifier]
        );

        // Unchanged content: skip all further processing for this record
        if ($existingHash === $newHash) {
            return false;
        }

        $this->pendingHash = ['identifier' => $identifier, 'hash' => $newHash];
        return true;
    }

    // Persist the pending hash once the record has been imported successfully,
    // so a failed import does not mark the record as up to date
    public function commitHash(): void
    {
        if ($this->pendingHash === null) {
            return;
        }

        Db::get()->executeStatement(
            'INSERT INTO ' . self::HASH_TABLE . ' (identifier, content_hash)
             VALUES (?, ?)
             ON DUPLICATE KEY UPDATE content_hash = VALUES(content_hash)',
            [$this->pendingHash['identifier'], $this->pendingHash['hash']]
        );

        $this->pendingHash = null;
    }

    private function computeHash(array $record): string
    {
        // Normalize before hashing so key order, casing, and surrounding
        // whitespace do not cause false positives
        ksort($record);
        array_walk_recursive($record, function (&$value) {
            if (is_string($value)) {
                $value = trim(mb_strtolower($value));
            }
        });

        // xxh3 requires PHP >= 8.1
        return hash('xxh3', serialize($record));
    }

    public function markDeleted(array $processedIdentifiers): int
    {
        $db = Db::get();
        $allIdentifiers = $db->fetchFirstColumn(
            'SELECT identifier FROM ' . self::HASH_TABLE
        );

        // Everything we know about but did not see in this feed was removed
        // at the source: unpublish rather than hard-delete
        $deletedIdentifiers = array_diff($allIdentifiers, $processedIdentifiers);

        $deletedCount = 0;
        foreach ($deletedIdentifiers as $sku) {
            $product = Product::getBySku($sku, ['limit' => 1, 'unpublished' => true]);
            if ($product) {
                $product->setPublished(false);
                $product->save();
                $deletedCount++;
            }
        }

        return $deletedCount;
    }
}

The xxh3 hash is extremely fast (on the order of 10 GB/s) and is sufficient for change detection. For feeds of 500,000 records this typically cuts processing time by 95%, because only ~5% of the records actually change between runs.
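A minimal sketch of how the service plugs into a feed loop. The readFeed() generator and $importer are hypothetical stand-ins for your feed reader and mapping logic; the backing table needs only a unique identifier column plus a content_hash column (see the schema sketch at the end of this article).

$delta = new DeltaImportService();
$seenSkus = [];

// Hypothetical: readFeed() yields associative arrays with at least an 'sku' key
foreach (readFeed('supplier_feed.csv') as $record) {
    $seenSkus[] = $record['sku'];

    // Skip records whose content hash is unchanged since the last run
    if (!$delta->shouldProcess($record)) {
        continue;
    }

    $importer->import($record); // your actual create/update logic
    $delta->commitHash();       // persist the hash only after success
}

// Unpublish products that disappeared from the feed entirely
$deletedCount = $delta->markDeleted($seenSkus);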
Dependency resolution for complex relations
Products depend on categories, manufacturers, and other objects. Naive imports fail when a dependency does not exist yet. Deferred processing over multiple passes, which effectively handles records in topological order, solves this:
namespace App\Import\Resolver;

use Pimcore\Model\DataObject;
use Pimcore\Model\DataObject\Product;

class DependencyResolver
{
    // Objects created during this run, keyed by SKU, so dependency checks
    // can be answered from memory instead of the database
    private array $createdObjects = [];
    private int $maxPasses = 3;

    public function process(array $records, callable $processor): ImportResult
    {
        $result = new ImportResult();
        $remaining = [];

        for ($pass = 0; $pass < $this->maxPasses; $pass++) {
            $remaining = [];

            foreach ($records as $record) {
                try {
                    $dependencies = $this->extractDependencies($record);

                    if ($this->allDependenciesMet($dependencies)) {
                        $object = $processor($record);
                        $this->createdObjects[$record['sku']] = $object->getId();
                        $result->success++;
                    } else {
                        // Defer: the dependency may be created later this pass
                        $remaining[] = $record;
                    }
                } catch (DependencyException $e) {
                    $remaining[] = $record;
                }
            }

            // Stop when everything succeeded, or when a pass made no
            // progress (unresolvable dependencies)
            if (empty($remaining) || count($remaining) === count($records)) {
                break;
            }

            $records = $remaining;
        }

        $result->failed = count($remaining);
        $result->failedRecords = $remaining;

        return $result;
    }

    private function extractDependencies(array $record): array
    {
        $deps = [];

        if (!empty($record['category_path'])) {
            $deps[] = ['type' => 'category', 'path' => $record['category_path']];
        }

        if (!empty($record['parent_sku'])) {
            $deps[] = ['type' => 'product', 'sku' => $record['parent_sku']];
        }

        foreach ($record['related_skus'] ?? [] as $relatedSku) {
            // Related products are optional: they never block creation and
            // can be linked in a later pass
            $deps[] = ['type' => 'product', 'sku' => $relatedSku, 'optional' => true];
        }

        return $deps;
    }

    private function allDependenciesMet(array $dependencies): bool
    {
        foreach ($dependencies as $dep) {
            if ($dep['optional'] ?? false) {
                continue;
            }
            if (!$this->dependencyExists($dep)) {
                return false;
            }
        }

        return true;
    }

    private function dependencyExists(array $dep): bool
    {
        if ($dep['type'] === 'product') {
            return isset($this->createdObjects[$dep['sku']])
                || Product::getBySku($dep['sku'], ['limit' => 1]) !== null;
        }

        return DataObject::getByPath($dep['path']) !== null;
    }
}

This multi-pass approach handles circular dependencies (product A references B, B references A) by marking such relations as optional, so both objects can be created first and the links filled in on a later pass.
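A sketch of how the resolver could be driven, assuming the ImportResult fields used above and a hypothetical createOrUpdateProduct() helper that maps a record onto a Product and saves it:

$resolver = new DependencyResolver();

// Only records that passed the delta check need dependency resolution
$result = $resolver->process($changedRecords, function (array $record) {
    return createOrUpdateProduct($record); // hypothetical mapping helper
});

// Records whose required dependencies were still missing after all passes:
// log them for manual follow-up instead of failing the entire import
foreach ($result->failedRecords as $record) {
    $logger->warning('Unresolved dependencies for SKU ' . $record['sku']);
}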
Performance: bypassing Pimcore internals
During bulk imports, Pimcore's convenience features become the bottleneck. Versioning, search indexing, and event dispatching cost more time than the actual data operations:
namespace App\Import\Service;

use Pimcore\Db;
use Pimcore\Model\DataObject;
use Pimcore\Model\Version;

class BulkImportService
{
    // Listeners removed in disableSearchIndexing(), restored afterwards
    private array $originalListeners = [];

    public function importBatch(array $records): void
    {
        // Versioning writes a full serialized copy of every object on each
        // save; during bulk imports this is pure overhead
        Version::disable();
        DataObject::setHideUnpublished(false);
        $this->disableSearchIndexing();

        $db = Db::get();
        // Disable SQL logging so the logger does not keep every query
        // of the import in memory
        $db->getConfiguration()->setSQLLogger(null);

        $db->beginTransaction();
        try {
            foreach (array_chunk($records, 100) as $chunk) {
                foreach ($chunk as $record) {
                    // createOrUpdateProduct(): record-to-object mapping,
                    // omitted here for brevity
                    $product = $this->createOrUpdateProduct($record);
                    $product->save([
                        'versionNote' => 'Bulk import',
                        'isAutoSave' => true
                    ]);
                }

                // Release Pimcore's internal object caches between chunks
                \Pimcore::collectGarbage();
            }

            $db->commit();
        } catch (\Throwable $e) {
            $db->rollBack();
            throw $e;
        } finally {
            Version::enable();
            $this->enableSearchIndexing();
        }

        // Reindex once, asynchronously, instead of per object
        $this->dispatchSearchIndexJob();
    }

    private function disableSearchIndexing(): void
    {
        $dispatcher = \Pimcore::getEventDispatcher();
        $this->originalListeners = $dispatcher->getListeners('pimcore.dataobject.postUpdate');

        foreach ($this->originalListeners as $listener) {
            // isSearchIndexListener() inspects the listener's class;
            // implementation omitted for brevity
            if ($this->isSearchIndexListener($listener)) {
                $dispatcher->removeListener('pimcore.dataobject.postUpdate', $listener);
            }
        }
    }

    private function enableSearchIndexing(): void
    {
        $dispatcher = \Pimcore::getEventDispatcher();

        foreach ($this->originalListeners as $listener) {
            if ($this->isSearchIndexListener($listener)) {
                $dispatcher->addListener('pimcore.dataobject.postUpdate', $listener);
            }
        }
    }
}

Benchmarks show a 10-20x speedup when versioning and search indexing are disabled. For 100,000 products: from 8 hours down to 25 minutes.
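One possible implementation of dispatchSearchIndexJob() triggers Pimcore's reindex command in a detached background process so the import itself returns immediately; a sketch using Symfony Process (the exact console command depends on your Pimcore version and search backend):

use Symfony\Component\Process\Process;

private function dispatchSearchIndexJob(): void
{
    // Rebuild the search index in one sweep, in the background; swap in
    // whatever reindex command matches your setup
    $process = Process::fromShellCommandline(
        'bin/console pimcore:search-backend-reindex > /dev/null 2>&1 &'
    );
    $process->setWorkingDirectory(PIMCORE_PROJECT_ROOT);
    $process->run();
}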
Resumable imports with checkpointing
Long-running imports can fail due to memory limits, timeouts, or external factors. Checkpointing makes an import resumable from the last successful point:
namespace App\Import\Service;

use Pimcore\Db;
use Symfony\Component\Lock\LockFactory;

class ResumableImportService
{
    private const CHECKPOINT_TABLE = 'import_checkpoints';

    public function __construct(
        private LockFactory $lockFactory,
        private string $importId
    ) {}

    public function run(iterable $records, callable $processor): ImportReport
    {
        // One-hour TTL, refreshed after every batch below, so a crashed
        // process releases the lock automatically
        $lock = $this->lockFactory->createLock('import_' . $this->importId, 3600);

        if (!$lock->acquire()) {
            throw new ImportLockedException('Import already running');
        }

        try {
            $checkpoint = $this->loadCheckpoint();

            $report = new ImportReport($this->importId);
            $report->resumedFrom = $checkpoint['processed_count'] ?? 0;

            $currentIndex = 0;
            $batchBuffer = [];

            foreach ($records as $record) {
                $currentIndex++;

                // Skip records handled in a previous run; this assumes the
                // feed is delivered in a stable order between runs
                if ($currentIndex <= $report->resumedFrom) {
                    continue;
                }

                $batchBuffer[] = $record;

                if (count($batchBuffer) >= 100) {
                    $this->processBatch($batchBuffer, $processor, $report);
                    $this->saveCheckpoint($currentIndex, $report);
                    $batchBuffer = [];
                    $lock->refresh(); // extend the TTL while progress is made
                }
            }

            if (!empty($batchBuffer)) {
                $this->processBatch($batchBuffer, $processor, $report);
            }

            $this->clearCheckpoint();
            $report->completed = true;

            return $report;
        } finally {
            $lock->release();
        }
    }

    private function loadCheckpoint(): ?array
    {
        return Db::get()->fetchAssociative(
            'SELECT * FROM ' . self::CHECKPOINT_TABLE . ' WHERE import_id = ?',
            [$this->importId]
        ) ?: null;
    }

    private function clearCheckpoint(): void
    {
        // Delete the row, or flag it as completed instead if you want to
        // keep the table as an import history
        Db::get()->executeStatement(
            'DELETE FROM ' . self::CHECKPOINT_TABLE . ' WHERE import_id = ?',
            [$this->importId]
        );
    }

    // Minimal version: run the processor per record and track the outcome
    private function processBatch(array $batch, callable $processor, ImportReport $report): void
    {
        foreach ($batch as $record) {
            try {
                $processor($record);
                $report->successCount++;
            } catch (\Throwable $e) {
                $report->errorCount++;
            }
            $report->lastProcessedSku = $record['sku'] ?? null;
        }
    }

    private function saveCheckpoint(int $processedCount, ImportReport $report): void
    {
        $db = Db::get();
        $db->executeStatement(
            'INSERT INTO ' . self::CHECKPOINT_TABLE . '
                (import_id, processed_count, success_count, error_count, last_sku, updated_at)
             VALUES (?, ?, ?, ?, ?, NOW())
             ON DUPLICATE KEY UPDATE
                processed_count = VALUES(processed_count),
                success_count = VALUES(success_count),
                error_count = VALUES(error_count),
                last_sku = VALUES(last_sku),
                updated_at = NOW()',
            [
                $this->importId,
                $processedCount,
                $report->successCount,
                $report->errorCount,
                $report->lastProcessedSku
            ]
        );
    }
}

The lock from Symfony's Lock component prevents race conditions when parallel cron executions overlap. If completed runs are flagged rather than deleted, the checkpoint table also doubles as an audit log of import history.
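For completeness, a possible schema for the two supporting tables, derived from the queries above (column names follow the code; types and lengths are assumptions, adjust to your data):

-- Content hashes for delta detection; xxh3 yields 16 hex characters
CREATE TABLE import_content_hashes (
    identifier   VARCHAR(190) NOT NULL PRIMARY KEY,
    content_hash CHAR(16)     NOT NULL
);

-- One row per import; doubles as an audit log if completed runs are kept
CREATE TABLE import_checkpoints (
    import_id       VARCHAR(64)  NOT NULL PRIMARY KEY,
    processed_count INT UNSIGNED NOT NULL DEFAULT 0,
    success_count   INT UNSIGNED NOT NULL DEFAULT 0,
    error_count     INT UNSIGNED NOT NULL DEFAULT 0,
    last_sku        VARCHAR(190) NULL,
    updated_at      DATETIME     NOT NULL
);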
Complex import requirements?
We build enterprise-grade import pipelines that scale to millions of records with full data integrity.
Get in touch