Building automated import/export workflows in Pimcore
A PIM system stands or falls with the quality and timeliness of its data. In enterprise environments, this means synchronizing millions of records daily from ERP systems, suppliers and marketplaces. In this article, we cover production-grade import architecture: delta imports that only process changed data, dependency resolution for complex object relationships, and patterns for resumable bulk operations that scale to millions of records.
Delta imports with content hashing
Reprocessing the full feed on every run is impractical for large datasets. A delta import strategy with content hashing detects changes by comparing one stored hash per record instead of comparing every field:
namespace App\Import\Service;

use Pimcore\Model\DataObject\Product;
use Pimcore\Db;

class DeltaImportService
{
    private const HASH_TABLE = 'import_content_hashes';

    // Hash of the last record accepted by shouldProcess(). It still has to be
    // written to HASH_TABLE once the record has been saved (not shown here).
    private ?array $pendingHash = null;

    public function shouldProcess(array $record): bool
    {
        $identifier = $record['sku'];
        $newHash = $this->computeHash($record);

        $db = Db::get();
        // fetchOne() returns false when no hash is stored yet, so new records pass.
        $existingHash = $db->fetchOne(
            'SELECT content_hash FROM ' . self::HASH_TABLE . ' WHERE identifier = ?',
            [$identifier]
        );

        if ($existingHash === $newHash) {
            return false;
        }

        $this->pendingHash = ['identifier' => $identifier, 'hash' => $newHash];

        return true;
    }

    private function computeHash(array $record): string
    {
        // Sorts top-level keys only; nested arrays keep the order the feed delivers.
        ksort($record);

        // Normalizing case and whitespace means case-only edits are not treated as changes.
        array_walk_recursive($record, function (&$value) {
            if (is_string($value)) {
                $value = trim(mb_strtolower($value));
            }
        });

        // The 'xxh3' algorithm requires PHP 8.1 or later.
        return hash('xxh3', serialize($record));
    }

    public function markDeleted(array $processedIdentifiers): int
    {
        $db = Db::get();
        $allIdentifiers = $db->fetchFirstColumn(
            'SELECT identifier FROM ' . self::HASH_TABLE
        );

        // Identifiers known from earlier runs that are missing from the current feed.
        $deletedIdentifiers = array_diff($allIdentifiers, $processedIdentifiers);
        $deletedCount = 0;

        foreach ($deletedIdentifiers as $sku) {
            $product = Product::getBySku($sku, ['limit' => 1, 'unpublished' => true]);
            if ($product) {
                // Unpublish instead of deleting, so the object itself is kept.
                $product->setPublished(false);
                $product->save();
                $deletedCount++;
            }
        }

        return $deletedCount;
    }
}

xxh3 is a non-cryptographic hash, available in PHP's hash() since PHP 8.1, that runs at gigabytes per second and is sufficient for change detection. For a feed of 500,000 records in which only about 5% change between runs, roughly 95% of the records can be skipped, which cuts processing time accordingly.
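The computeHash() method sorts only the top-level keys, so a feed that reorders keys inside nested attributes would produce a different hash for unchanged data. The following is a minimal sketch of one way to make the hash independent of key order at every level. The function names sortKeysRecursive() and stableRecordHash() are hypothetical and not part of Pimcore, and the sketch assumes PHP 8.1 or later.

```php
<?php

// Hypothetical helper: sorts associative arrays by key at every level and
// leaves list-style arrays (keys 0..n-1) in their original order.
function sortKeysRecursive(array $data): array
{
    foreach ($data as $key => $value) {
        if (is_array($value)) {
            $data[$key] = sortKeysRecursive($value);
        }
    }

    if (!array_is_list($data)) {
        ksort($data);
    }

    return $data;
}

// Hypothetical helper: hashes a record so that key order does not matter.
function stableRecordHash(array $record): string
{
    return hash('xxh3', serialize(sortKeysRecursive($record)));
}

$a = ['sku' => 'A-1', 'attributes' => ['color' => 'red', 'size' => 'M']];
$b = ['attributes' => ['size' => 'M', 'color' => 'red'], 'sku' => 'A-1'];
$c = ['sku' => 'A-1', 'attributes' => ['color' => 'blue', 'size' => 'M']];

var_dump(stableRecordHash($a) === stableRecordHash($b)); // bool(true)
var_dump(stableRecordHash($a) === stableRecordHash($c)); // bool(false)
```

List-style arrays are left in place on purpose, because the order of values such as image galleries is usually meaningful.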
Dependency resolution for complex relationships
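Products depend on categories, manufacturers and other objects. Naive imports fail when a record arrives before the objects it references exist. A multi-pass approach with deferred processing solves this: records whose dependencies are not yet met are retried in a later pass.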
namespace App\Import\Resolver;

use Pimcore\Model\DataObject;

class DependencyResolver
{
    private array $deferredQueue = [];
    private array $createdObjects = [];
    private int $maxPasses = 3;

    public function process(array $records, callable $processor): ImportResult
    {
        $result = new ImportResult();
        $remaining = $records;

        for ($pass = 0; $pass < $this->maxPasses; $pass++) {
            $remaining = [];

            foreach ($records as $record) {
                try {
                    $dependencies = $this->extractDependencies($record);
                    if ($this->allDependenciesMet($dependencies)) {
                        $object = $processor($record);
                        $this->createdObjects[$record['sku']] = $object->getId();
                        $result->success++;
                    } else {
                        // Defer: a required dependency does not exist yet.
                        $remaining[] = $record;
                    }
                } catch (DependencyException $e) {
                    // The processor detected a missing dependency itself; retry next pass.
                    $remaining[] = $record;
                }
            }

            // Stop when everything is imported or a pass made no progress.
            if (empty($remaining) || count($remaining) === count($records)) {
                break;
            }

            $records = $remaining;
        }

        $result->failed = count($remaining);
        $result->failedRecords = $remaining;

        return $result;
    }

    private function extractDependencies(array $record): array
    {
        $deps = [];

        if (!empty($record['category_path'])) {
            $deps[] = ['type' => 'category', 'path' => $record['category_path']];
        }
        if (!empty($record['parent_sku'])) {
            $deps[] = ['type' => 'product', 'sku' => $record['parent_sku']];
        }
        // Related products are optional: they never block creation.
        foreach ($record['related_skus'] ?? [] as $relatedSku) {
            $deps[] = ['type' => 'product', 'sku' => $relatedSku, 'optional' => true];
        }

        return $deps;
    }

    private function allDependenciesMet(array $dependencies): bool
    {
        foreach ($dependencies as $dep) {
            if ($dep['optional'] ?? false) {
                continue;
            }
            // dependencyExists() is not shown; it looks the object up by path or SKU.
            if (!$this->dependencyExists($dep)) {
                return false;
            }
        }

        return true;
    }
}

This multi-pass approach resolves dependencies regardless of the order of records in the feed. Circular references (product A references B, B references A) are handled by treating those relationships as optional: they do not block creation and can be linked in a later pass once both objects exist.
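To make the pass-by-pass behavior concrete, here is a minimal in-memory sketch of the same deferral loop, without any Pimcore calls. The function resolveInPasses() and the record shape are hypothetical; marking a SKU as created stands in for saving the object.

```php
<?php

/**
 * Hypothetical in-memory version of the multi-pass loop.
 * Each record is ['sku' => string, 'parent_sku' => ?string].
 * Returns [SKUs in creation order, records that never resolved].
 */
function resolveInPasses(array $records, int $maxPasses = 3): array
{
    $created = [];
    $remaining = $records;

    for ($pass = 0; $pass < $maxPasses && $remaining !== []; $pass++) {
        $deferred = [];

        foreach ($remaining as $record) {
            $parent = $record['parent_sku'] ?? null;
            if ($parent === null || isset($created[$parent])) {
                $created[$record['sku']] = true; // stands in for saving the object
            } else {
                $deferred[] = $record;           // retry in the next pass
            }
        }

        $madeProgress = count($deferred) < count($remaining);
        $remaining = $deferred;
        if (!$madeProgress) {
            break; // nothing resolved in this pass, further passes cannot help
        }
    }

    return [array_map('strval', array_keys($created)), $remaining];
}

$feed = [
    ['sku' => 'child', 'parent_sku' => 'parent'],
    ['sku' => 'parent', 'parent_sku' => null],
    ['sku' => 'orphan', 'parent_sku' => 'missing'],
];

[$created, $failed] = resolveInPasses($feed);

print_r($created);                      // parent, then child
print_r(array_column($failed, 'sku'));  // orphan
```

The child record arrives before its parent, is deferred in the first pass and created in the second. The orphan never resolves and ends up in the failed list, which mirrors $result->failedRecords.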
Performance: bypassing Pimcore internals
During bulk imports, Pimcore's convenience features become bottlenecks. Versioning, search indexing and event dispatching cost more time than the actual data operations:
namespace App\Import\Service;

use Pimcore\Model\DataObject\Product;
use Pimcore\Model\DataObject\Service;
use Pimcore\Model\Version;
use Pimcore\Db;

class BulkImportService
{
    // Listeners registered before the import; used to restore search indexing.
    private array $originalListeners = [];

    public function importBatch(array $records): void
    {
        Version::disable();
        \Pimcore\Model\DataObject::setHideUnpublished(false);
        $this->disableSearchIndexing();

        $db = Db::get();
        // Disable query logging once, before the loop. setSQLLogger() is deprecated
        // since Doctrine DBAL 3.2 and removed in DBAL 4, so drop this call there.
        $db->getConfiguration()->setSQLLogger(null);
        $db->beginTransaction();

        try {
            foreach (array_chunk($records, 100) as $chunk) {
                foreach ($chunk as $record) {
                    // createOrUpdateProduct() is not shown; it maps the record onto a Product.
                    $product = $this->createOrUpdateProduct($record);
                    // versionNote only takes effect when versioning is enabled.
                    $product->save([
                        'versionNote' => 'Bulk import',
                        'isAutoSave' => true
                    ]);
                }
                // Free memory after every chunk of 100 objects.
                \Pimcore::collectGarbage();
            }
            $db->commit();
        } catch (\Throwable $e) {
            $db->rollBack();
            throw $e;
        } finally {
            Version::enable();
            $this->enableSearchIndexing();
        }

        // Rebuild the search index once, after all records are stored.
        $this->dispatchSearchIndexJob();
    }

    private function disableSearchIndexing(): void
    {
        $dispatcher = \Pimcore::getEventDispatcher();
        $this->originalListeners = $dispatcher->getListeners('pimcore.dataobject.postUpdate');

        foreach ($this->originalListeners as $listener) {
            if ($this->isSearchIndexListener($listener)) {
                $dispatcher->removeListener('pimcore.dataobject.postUpdate', $listener);
            }
        }
    }
}

Benchmarks show a 10-20x speedup when versioning and search indexing are disabled. For 100,000 products, import time dropped from 8 hours to 25 minutes.
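The listing removes search index listeners but does not show how they are put back. The sketch below illustrates the remove, run, restore pattern in isolation. TinyDispatcher is a hypothetical stand-in so the example runs on its own; only its method names (addListener, removeListener, getListeners) mirror Symfony's event dispatcher, and withoutListeners() is likewise an assumed helper name.

```php
<?php

// Hypothetical stand-in for an event dispatcher, so the sketch is runnable.
final class TinyDispatcher
{
    /** @var array<string, list<callable>> */
    private array $listeners = [];

    public function addListener(string $event, callable $listener): void
    {
        $this->listeners[$event][] = $listener;
    }

    public function removeListener(string $event, callable $listener): void
    {
        $this->listeners[$event] = array_values(array_filter(
            $this->listeners[$event] ?? [],
            static fn (callable $l): bool => $l !== $listener
        ));
    }

    public function getListeners(string $event): array
    {
        return $this->listeners[$event] ?? [];
    }

    public function dispatch(string $event): void
    {
        foreach ($this->getListeners($event) as $listener) {
            $listener();
        }
    }
}

// Removes the matching listeners, runs $work, and restores them even if $work throws.
function withoutListeners(TinyDispatcher $d, string $event, callable $shouldRemove, callable $work): void
{
    $removed = array_values(array_filter($d->getListeners($event), $shouldRemove));
    foreach ($removed as $listener) {
        $d->removeListener($event, $listener);
    }

    try {
        $work();
    } finally {
        foreach ($removed as $listener) {
            $d->addListener($event, $listener);
        }
    }
}

$calls = 0;
$indexer = function () use (&$calls): void {
    $calls++; // stands in for an expensive search index update
};

$dispatcher = new TinyDispatcher();
$dispatcher->addListener('postUpdate', $indexer);

withoutListeners(
    $dispatcher,
    'postUpdate',
    static fn (callable $l): bool => $l === $indexer,
    static fn () => $dispatcher->dispatch('postUpdate') // indexer is not called here
);

$callsDuringImport = $calls;
$dispatcher->dispatch('postUpdate'); // indexer is active again
```

With a real Symfony dispatcher, listener priorities would also need to be recorded before removal and passed back to addListener(), since re-adding without a priority changes the execution order.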
Resumable imports with checkpointing
Long imports can fail due to memory limits, timeouts or external factors. Checkpointing makes imports resumable from the last successful point:
namespace App\Import\Service;

use Pimcore\Db;
use Symfony\Component\Lock\LockFactory;

class ResumableImportService
{
    private const CHECKPOINT_TABLE = 'import_checkpoints';

    public function __construct(
        private LockFactory $lockFactory,
        private string $importId
    ) {}

    public function run(iterable $records, callable $processor): ImportReport
    {
        // The lock expires after 3600 seconds unless it is refreshed.
        $lock = $this->lockFactory->createLock('import_' . $this->importId, 3600);
        // acquire() is non-blocking here and returns false if another run holds the lock.
        if (!$lock->acquire()) {
            throw new ImportLockedException('Import already running');
        }

        try {
            $checkpoint = $this->loadCheckpoint();
            $report = new ImportReport($this->importId);
            $report->resumedFrom = $checkpoint['processed_count'] ?? 0;

            $currentIndex = 0;
            $batchBuffer = [];

            foreach ($records as $record) {
                $currentIndex++;
                // Skip records handled by an earlier run. This relies on the feed
                // delivering records in the same order on every run.
                if ($currentIndex <= $report->resumedFrom) {
                    continue;
                }

                $batchBuffer[] = $record;

                if (count($batchBuffer) >= 100) {
                    $this->processBatch($batchBuffer, $processor, $report);
                    $this->saveCheckpoint($currentIndex, $report);
                    $batchBuffer = [];
                    $lock->refresh();
                }
            }

            // Process the trailing partial batch.
            if (!empty($batchBuffer)) {
                $this->processBatch($batchBuffer, $processor, $report);
            }

            $this->clearCheckpoint();
            $report->completed = true;

            return $report;
        } finally {
            $lock->release();
        }
    }

    private function saveCheckpoint(int $processedCount, ImportReport $report): void
    {
        $db = Db::get();
        // Upsert one row per import. VALUES() in ON DUPLICATE KEY UPDATE is
        // deprecated as of MySQL 8.0.20 but still supported.
        $db->executeStatement(
            'INSERT INTO ' . self::CHECKPOINT_TABLE . '
                (import_id, processed_count, success_count, error_count, last_sku, updated_at)
            VALUES (?, ?, ?, ?, ?, NOW())
            ON DUPLICATE KEY UPDATE
                processed_count = VALUES(processed_count),
                success_count = VALUES(success_count),
                error_count = VALUES(error_count),
                last_sku = VALUES(last_sku),
                updated_at = NOW()',
            [
                $this->importId,
                $processedCount,
                $report->successCount,
                $report->errorCount,
                $report->lastProcessedSku
            ]
        );
    }
}

The Symfony Lock component prevents race conditions when cron executions overlap: a second run fails fast instead of processing the same import in parallel. The checkpoint table can also serve as an audit log of import history, provided clearCheckpoint() marks rows as completed instead of deleting them.
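The resume behavior is easiest to see with a simulated crash. The following in-memory sketch is a simplified illustration, not the service above: runResumable() is a hypothetical function, the $checkpoint variable stands in for the checkpoint table, and the batch size is 2 to keep the trace short.

```php
<?php

/**
 * Hypothetical in-memory version of the checkpoint loop.
 * $checkpoint holds the number of records confirmed by earlier runs.
 */
function runResumable(iterable $records, callable $processor, int &$checkpoint, int $batchSize = 2): void
{
    $index = 0;
    $batch = [];

    foreach ($records as $record) {
        $index++;
        if ($index <= $checkpoint) {
            continue; // confirmed by an earlier run
        }

        $batch[] = $record;
        if (count($batch) >= $batchSize) {
            foreach ($batch as $item) {
                $processor($item);
            }
            $checkpoint = $index; // advance only after the whole batch succeeded
            $batch = [];
        }
    }

    foreach ($batch as $item) {
        $processor($item); // trailing partial batch
    }
    $checkpoint = 0; // import finished, clear the checkpoint
}

$processed = [];
$failOnce = true;
$processor = function (int $record) use (&$processed, &$failOnce): void {
    if ($record === 4 && $failOnce) {
        $failOnce = false;
        throw new RuntimeException('simulated crash');
    }
    $processed[] = $record;
};

$checkpoint = 0;
try {
    runResumable([1, 2, 3, 4, 5], $processor, $checkpoint);
} catch (RuntimeException $e) {
    // The batch containing record 4 was never confirmed.
}
$checkpointAfterCrash = $checkpoint; // 2

runResumable([1, 2, 3, 4, 5], $processor, $checkpoint);
print_r($processed); // 1, 2, 3, 3, 4, 5
```

Record 3 is processed twice because its batch failed before the checkpoint advanced. Checkpointing per batch therefore gives at-least-once processing, so the processor should be idempotent (for example, an update keyed by SKU).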