Import: Process Implementation

1. Overview

This document describes the ImportProcess implementation for asynchronous spreadsheet imports. It uses an ImportController class modeled on the existing FormController, so it reuses familiar patterns and adds no runtime dependencies.

2. Status

Implementation Status: Planned (Phase B)

This is the recommended initial implementation approach. See Import Fluxnova Implementation for the future BPMN-based alternative.

3. Architecture

3.1. Design Principles

  1. Reuse FormController patterns - The existing FormController successfully manages multi-step workflows with validation and state persistence

  2. Separate business logic from orchestration - Step handlers contain reusable logic

  3. Use existing entities - Leverage ProcessInstance for state tracking

  4. Generic context reference - Single contextId field supports multiple import types

3.2. Component Diagram

┌─────────────────────┐
│   ImportResource    │  REST API
│   (Controller)      │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│ ImportProcess       │  Orchestrator Service
│ Orchestrator        │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│  ImportController   │  Workflow Controller (like FormController)
│                     │
│  - processInstance  │
│  - stepHandlers     │
│  - next()           │
└──────────┬──────────┘
           │
    ┌──────┴──────┬───────────────┐
    │             │               │
┌───▼───┐   ┌─────▼─────┐   ┌─────▼──────┐
│Column │   │   Cell    │   │    Row     │
│Mapping│   │  Mapping  │   │ Processing │
│Handler│   │  Handler  │   │  Handler   │
└───────┘   └───────────┘   └────────────┘

4. Database Schema

4.1. ImportJob Entity

CREATE TABLE import_job (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,

    -- Import type discriminator
    import_type VARCHAR(20) NOT NULL,  -- EVENT_PARTICIPANT, MEMBERSHIP, RESULT

    -- Current workflow state
    current_step VARCHAR(20) NOT NULL,  -- UPLOADED, COLUMN_MAPPING, etc.

    -- File storage (FK to existing Attachment entity)
    attachment_id BIGINT NOT NULL,

    -- Organisation scope
    organisation_id BIGINT NOT NULL,

    -- Generic context reference (event, period, or race ID based on import_type)
    context_id BIGINT NOT NULL,

    -- Progress tracking
    total_rows INT,
    processed_rows INT DEFAULT 0,
    success_count INT DEFAULT 0,
    error_count INT DEFAULT 0,

    -- Timestamps
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP,
    completed_at TIMESTAMP,
    expire_at TIMESTAMP,

    -- Foreign keys
    CONSTRAINT fk_import_attachment FOREIGN KEY (attachment_id) REFERENCES attachment(id),
    CONSTRAINT fk_import_org FOREIGN KEY (organisation_id) REFERENCES organisation(id)
);

Design Decision: The context_id field is a generic BIGINT without referential integrity constraints. This allows extending to new import types without schema changes. The import_type discriminator indicates how to interpret the context:

import_type          context_id references
-----------------    ---------------------
EVENT_PARTICIPANT    event.id
MEMBERSHIP           membership_period.id
RESULT               race.id
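
Because the database does not enforce this reference, the application has to check context_id itself. The lookup implied by the table above could be sketched as follows. The ContextTargets class and its tableFor method are hypothetical names used only for illustration; the three import types and their target tables come from the table.

```java
import java.util.Map;

// Import types as listed in the import_type column comment.
enum ImportType { EVENT_PARTICIPANT, MEMBERSHIP, RESULT }

// Hypothetical helper: maps each import type to the table its context_id points at.
final class ContextTargets {

    private static final Map<ImportType, String> TARGET_TABLE = Map.of(
        ImportType.EVENT_PARTICIPANT, "event",
        ImportType.MEMBERSHIP, "membership_period",
        ImportType.RESULT, "race"
    );

    private ContextTargets() {}

    /** Returns the table whose id column the given import type's context_id refers to. */
    static String tableFor(ImportType type) {
        String table = TARGET_TABLE.get(type);
        if (table == null) {
            throw new IllegalArgumentException("No context target registered for " + type);
        }
        return table;
    }
}
```

A new import type then needs only a new enum constant and map entry, with no schema change.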

4.2. Supporting Tables

4.2.1. ImportColumnMapping

CREATE TABLE import_column_mapping (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    import_job_id BIGINT NOT NULL,
    column_index INT NOT NULL,
    source_header VARCHAR(255),
    target_field VARCHAR(50),
    is_required BOOLEAN DEFAULT FALSE,
    status VARCHAR(20) NOT NULL,  -- AUTO_MATCHED, MANUAL_MATCHED, UNMATCHED, IGNORED
    confidence_score DOUBLE,

    CONSTRAINT fk_colmap_job FOREIGN KEY (import_job_id)
        REFERENCES import_job(id) ON DELETE CASCADE
);

4.2.2. ImportCellMapping

CREATE TABLE import_cell_mapping (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    import_job_id BIGINT NOT NULL,
    target_field VARCHAR(50) NOT NULL,
    source_value VARCHAR(255) NOT NULL,
    normalized_value VARCHAR(255),
    target_entity_id BIGINT,
    status VARCHAR(20) NOT NULL,
    occurrence_count INT DEFAULT 1,

    CONSTRAINT fk_cellmap_job FOREIGN KEY (import_job_id)
        REFERENCES import_job(id) ON DELETE CASCADE
);

4.2.3. ImportRowResult

CREATE TABLE import_row_result (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    import_job_id BIGINT NOT NULL,
    row_number INT NOT NULL,
    outcome VARCHAR(20) NOT NULL,  -- CREATED, UPDATED, SKIPPED, ERROR
    message VARCHAR(500),
    created_entity_id BIGINT,

    CONSTRAINT fk_rowres_job FOREIGN KEY (import_job_id)
        REFERENCES import_job(id) ON DELETE CASCADE
);

5. Core Interfaces

5.1. ImportStepHandler

Step handlers contain the business logic shared with the future Fluxnova implementation:

public interface ImportStepHandler<I, O> {

    /**
     * Execute this step's logic.
     * @param context Import context with job data and services
     * @param input Input data for this step (e.g., submitted mappings)
     * @return Result indicating success, needs input, or error
     */
    StepResult<O> execute(ImportContext context, I input);

    /**
     * Check if this step can proceed without user input.
     */
    boolean canProceed(ImportContext context);

    /**
     * Get pending items requiring user resolution.
     */
    List<?> getPendingItems(ImportContext context);
}

5.2. StepResult

public class StepResult<T> {

    public enum Outcome {
        COMPLETED,    // Step finished successfully
        NEEDS_INPUT,  // Waiting for user to provide mappings
        ERROR         // Fatal error occurred
    }

    private Outcome outcome;
    private T output;
    private String message;
    private List<?> pendingItems;

    // Static factory methods
    public static <T> StepResult<T> completed(T output) { ... }
    public static <T> StepResult<T> needsInput(List<?> pending) { ... }
    public static <T> StepResult<T> error(String message) { ... }
}
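
The factory bodies are elided above. One possible way to fill them in is sketched below, assuming immutable fields and the getter names that ImportController calls (getOutcome, getMessage, getPendingItems). The private constructor and the getOutput getter are assumptions, not part of the listing above.

```java
import java.util.List;

final class StepResult<T> {

    enum Outcome { COMPLETED, NEEDS_INPUT, ERROR }

    private final Outcome outcome;
    private final T output;
    private final String message;
    private final List<?> pendingItems;

    private StepResult(Outcome outcome, T output, String message, List<?> pendingItems) {
        this.outcome = outcome;
        this.output = output;
        this.message = message;
        this.pendingItems = pendingItems;
    }

    /** Step finished; carries the step output and no pending items. */
    static <T> StepResult<T> completed(T output) {
        return new StepResult<>(Outcome.COMPLETED, output, null, List.of());
    }

    /** Step is waiting for the user; carries a defensive copy of the pending items. */
    static <T> StepResult<T> needsInput(List<?> pending) {
        return new StepResult<>(Outcome.NEEDS_INPUT, null, null, List.copyOf(pending));
    }

    /** Step failed; carries only the error message. */
    static <T> StepResult<T> error(String message) {
        return new StepResult<>(Outcome.ERROR, null, message, List.of());
    }

    Outcome getOutcome() { return outcome; }
    T getOutput() { return output; }
    String getMessage() { return message; }
    List<?> getPendingItems() { return pendingItems; }
}
```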

5.3. ImportContext

Carries job data and services through the processing pipeline:

public class ImportContext {

    private ImportJob job;
    private SpreadsheetReader reader;
    private Organisation organisation;

    // Cached reference data for FK resolution
    private List<EventCategory> eventCategories;
    private List<Country> countries;
    private List<CustomListValue> customLists;

    // Services (injected via orchestrator)
    private EventParticipantServiceEx eventParticipantService;
    private MembershipImportService membershipService;
    private RaceResultServiceEx resultService;

    // Utility methods
    public Long getContextId() {
        return job.getContextId();
    }

    public ImportType getImportType() {
        return job.getImportType();
    }
}

6. ImportController

The central workflow controller, modeled on FormController:

/**
 * Manages an import job's lifecycle, modeled on FormController.
 * Wraps ImportJob and orchestrates step-by-step processing.
 */
public class ImportController {

    private final ImportJob job;
    private final Map<ImportStep, ImportStepHandler<?, ?>> handlers;
    private final ImportContext context;
    private final ImportJobRepository jobRepository;

    public ImportController(ImportJob job,
                           ImportContext context,
                           Map<ImportStep, ImportStepHandler<?, ?>> handlers,
                           ImportJobRepository jobRepository) {
        this.job = job;
        this.context = context;
        this.handlers = handlers;
        this.jobRepository = jobRepository;
    }

    /**
     * Process submitted data and advance if valid.
     * Similar to FormController.next()
     *
     * @param submission User-submitted mappings (may be null for initial call)
     * @return DTO with current status and any pending items
     */
    public ImportJobDTO next(ImportSubmissionDTO submission) {
        // Ensure job is still active
        if (job.getCurrentStep().isTerminal()) {
            return errorDTO("Import already " + job.getCurrentStep());
        }

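        // Apply submitted data (mappings) if provided
        if (submission != null) {
            applySubmission(submission);
        }

        // Get handler for current step; a step without a handler (for example,
        // if UPLOADED has none) has nothing to execute, so move straight on
        ImportStepHandler<?, ?> handler = handlers.get(job.getCurrentStep());
        if (handler == null) {
            return advanceToNextStep();
        }

        // Execute the step. The submission was already applied above, so the handler
        // reads it from the job state. Passing it through the wildcard-typed handler
        // would not compile, and the handlers declare a Void input.
        StepResult<?> result = handler.execute(context, null);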

        if (result.getOutcome() == StepResult.Outcome.NEEDS_INPUT) {
            // Stay on current step, return pending items
            return statusDTO(result.getPendingItems());
        }

        if (result.getOutcome() == StepResult.Outcome.ERROR) {
            job.setCurrentStep(ImportStep.FAILED);
            jobRepository.save(job);
            return errorDTO(result.getMessage());
        }

        // Advance to next step
        return advanceToNextStep();
    }

    /**
     * Advance through steps until one needs input or all complete.
     * Similar to FormController.setNextStep()
     */
    private ImportJobDTO advanceToNextStep() {
        ImportStep nextStep = job.getCurrentStep().next();

        while (!nextStep.isTerminal()) {
            job.setCurrentStep(nextStep);
            jobRepository.save(job);

            ImportStepHandler<?, ?> handler = handlers.get(nextStep);

            // Check if this step can proceed automatically
            if (!handler.canProceed(context)) {
                return statusDTO(handler.getPendingItems(context));
            }

            // Execute the step
            StepResult<?> result = handler.execute(context, null);

            if (result.getOutcome() == StepResult.Outcome.NEEDS_INPUT) {
                return statusDTO(result.getPendingItems());
            }

            if (result.getOutcome() == StepResult.Outcome.ERROR) {
                job.setCurrentStep(ImportStep.FAILED);
                jobRepository.save(job);
                return errorDTO(result.getMessage());
            }

            nextStep = nextStep.next();
        }

        // All steps completed
        job.setCurrentStep(ImportStep.COMPLETED);
        job.setCompletedAt(Instant.now());
        job.setExpireAt(Instant.now().plus(7, ChronoUnit.DAYS));
        jobRepository.save(job);

        return completedDTO();
    }
}
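
The controller relies on ImportStep.next() and ImportStep.isTerminal(), which this document does not define. A minimal sketch of such an enum follows. The step names are collected from the schema comment, the response-code table and the retention rules; their linear ordering is an assumption.

```java
// Hypothetical ImportStep enum: names from this document, ordering assumed.
enum ImportStep {
    UPLOADED, COLUMN_MAPPING, CELL_MAPPING, PROCESSING, COMPLETED, FAILED, CANCELLED;

    /** COMPLETED, FAILED and CANCELLED end the workflow. */
    boolean isTerminal() {
        return this == COMPLETED || this == FAILED || this == CANCELLED;
    }

    /** Next step on the happy path; terminal steps have no successor. */
    ImportStep next() {
        switch (this) {
            case UPLOADED: return COLUMN_MAPPING;
            case COLUMN_MAPPING: return CELL_MAPPING;
            case CELL_MAPPING: return PROCESSING;
            case PROCESSING: return COMPLETED;
            default: throw new IllegalStateException(this + " is terminal");
        }
    }
}
```

With this ordering, the while loop in advanceToNextStep() stops once next() returns COMPLETED. FAILED and CANCELLED are reached only through explicit setCurrentStep calls.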

7. ImportProcessOrchestrator

The service that creates and manages ImportController instances:

@Service
public class ImportProcessOrchestrator implements ImportOrchestrator {

    private final ImportJobRepository jobRepository;
    private final AttachmentService attachmentService;
    private final Map<ImportStep, ImportStepHandler<?, ?>> handlers;

    @Override
    public ImportJobDTO startImport(ImportStartRequest request) {
        // Store file as attachment
        Attachment attachment = attachmentService.store(
            request.getFile(),
            request.getOrganisationId()
        );

        // Create job with generic context_id
        ImportJob job = new ImportJob();
        job.setSourceFile(attachment);
        job.setImportType(request.getImportType());
        job.setContextId(request.getContextId());  // Generic reference
        job.setCurrentStep(ImportStep.UPLOADED);
        job.setOrganisation(request.getOrganisation());
        job.setCreatedAt(Instant.now());
        jobRepository.save(job);

        // Create controller and start processing
        ImportController controller = createController(job);
        return controller.next(null);  // Triggers column analysis
    }

    @Override
    public ImportJobDTO getStatus(Long jobId) {
        ImportJob job = findJob(jobId);
        ImportContext context = buildContext(job);

        // Return current status without advancing; a step without a handler
        // (such as a terminal one) has no pending items
        ImportStepHandler<?, ?> handler = handlers.get(job.getCurrentStep());
        List<?> pending = (handler != null) ? handler.getPendingItems(context) : List.of();
        return statusDTO(job, pending);
    }

    @Override
    public ImportJobDTO submitColumnMappings(Long jobId, List<ColumnMappingDTO> mappings) {
        ImportJob job = findJob(jobId);

        ImportSubmissionDTO submission = new ImportSubmissionDTO();
        submission.setColumnMappings(mappings);

        ImportController controller = createController(job);
        return controller.next(submission);
    }

    @Override
    public ImportJobDTO submitCellMappings(Long jobId, List<CellMappingDTO> mappings) {
        ImportJob job = findJob(jobId);

        ImportSubmissionDTO submission = new ImportSubmissionDTO();
        submission.setCellMappings(mappings);

        ImportController controller = createController(job);
        return controller.next(submission);
    }

    @Override
    public void cancel(Long jobId) {
        ImportJob job = findJob(jobId);

        if (job.getCurrentStep().isTerminal()) {
            throw new IllegalStateException(
                "Cannot cancel import in terminal state " + job.getCurrentStep());
        }

        job.setCurrentStep(ImportStep.CANCELLED);
        job.setCompletedAt(Instant.now());
        jobRepository.save(job);
    }

    private ImportController createController(ImportJob job) {
        ImportContext context = buildContext(job);
        return new ImportController(job, context, handlers, jobRepository);
    }
}

8. API Design

8.1. Endpoints

Method   Endpoint                            Description
------   ---------------------------------   ---------------------------------------
POST     /api/imports                        Upload file and create import job
GET      /api/imports/{id}                   Get current status and pending mappings
PUT      /api/imports/{id}/column-mappings   Submit column mapping corrections
PUT      /api/imports/{id}/cell-mappings     Submit cell mapping corrections
GET      /api/imports/{id}/results           Get paginated row results
DELETE   /api/imports/{id}                   Cancel import

Note: The API uses ImportJob.id (Long) directly, not UUID. This aligns with the FormController pattern and simplifies the implementation.

8.2. Response Codes

Code                 Job status                     Meaning
------------------   ----------------------------   ---------------------------------------
201 Created          Any                            Import job created successfully
200 OK               Any                            Status returned successfully
202 Accepted         After mapping submission       Mappings accepted, processing continues
406 Not Acceptable   COLUMN_MAPPING, CELL_MAPPING   Required mappings still missing
404 Not Found        -                              Import job not found
409 Conflict         COMPLETED, FAILED, CANCELLED   Cannot modify terminal state
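
For a mapping submission, three of these codes can apply, so their precedence matters. The sketch below assumes the terminal-state check comes first, mirroring the order of checks in ImportController.next(). The ImportResponseCodes class and its method are hypothetical.

```java
// Hypothetical helper: picks the response code for a PUT .../column-mappings
// or PUT .../cell-mappings request, following the table above.
final class ImportResponseCodes {

    static final int ACCEPTED = 202;
    static final int NOT_ACCEPTABLE = 406;
    static final int CONFLICT = 409;

    private ImportResponseCodes() {}

    /**
     * @param jobWasTerminal       job was COMPLETED, FAILED or CANCELLED before the request
     * @param mappingsStillMissing the step still reports pending items after the submission
     */
    static int forMappingSubmission(boolean jobWasTerminal, boolean mappingsStillMissing) {
        if (jobWasTerminal) {
            return CONFLICT;        // terminal jobs cannot be modified
        }
        if (mappingsStillMissing) {
            return NOT_ACCEPTABLE;  // job stays on its current mapping step
        }
        return ACCEPTED;            // processing continues
    }
}
```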

9. Step Handlers

9.1. ColumnMappingHandler

Analyzes the spreadsheet headers and matches each one against the field dictionary:

@Component
public class ColumnMappingHandler implements ImportStepHandler<Void, List<ImportColumnMapping>> {

    private final ImportColumnMappingRepository mappingRepository;

    @Override
    public StepResult<List<ImportColumnMapping>> execute(ImportContext context, Void input) {
        ImportJob job = context.getJob();
        SpreadsheetReader reader = context.getReader();

        // Get header row
        List<String> headers = reader.getHeaders();

        // Match each header against dictionary
        List<ImportColumnMapping> mappings = new ArrayList<>();
        for (int i = 0; i < headers.size(); i++) {
            ImportColumnMapping mapping = matchHeader(headers.get(i), i, job);
            mappings.add(mapping);
        }

        // Save mappings
        mappingRepository.saveAll(mappings);

        // Check if all required columns matched
        boolean allRequiredMapped = mappings.stream()
            .filter(ImportColumnMapping::isRequired)
            .allMatch(m -> m.getStatus() != MappingStatus.UNMATCHED);

        if (allRequiredMapped) {
            return StepResult.completed(mappings);
        } else {
            return StepResult.needsInput(getUnmatchedRequired(mappings));
        }
    }

    @Override
    public boolean canProceed(ImportContext context) {
        // Check if mappings exist and all required are matched
        List<ImportColumnMapping> mappings = getMappings(context.getJob());
        return mappings.stream()
            .filter(ImportColumnMapping::isRequired)
            .allMatch(m -> m.getStatus() != MappingStatus.UNMATCHED);
    }

    @Override
    public List<?> getPendingItems(ImportContext context) {
        return getUnmatchedRequired(getMappings(context.getJob()));
    }
}
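
The matchHeader helper is not shown above. As an illustration only, an exact-match strategy over normalized headers could look like the sketch below. The HeaderMatcher class, the alias dictionary and the normalization rule are all assumptions; the actual dictionary lookup may differ, for example by using fuzzy matching to produce the confidence_score.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical matcher: looks up a normalized header in an alias dictionary.
final class HeaderMatcher {

    // Normalized header alias -> target field name (contents are illustrative).
    private final Map<String, String> aliases;

    HeaderMatcher(Map<String, String> aliases) {
        this.aliases = Map.copyOf(aliases);
    }

    /** Lower-cases the header and drops everything except letters and digits. */
    static String normalize(String header) {
        if (header == null) {
            return "";
        }
        return header.toLowerCase(Locale.ROOT).replaceAll("[^\\p{L}\\p{Nd}]", "");
    }

    /** Returns the target field for a header, or empty if the column stays UNMATCHED. */
    Optional<String> match(String header) {
        return Optional.ofNullable(aliases.get(normalize(header)));
    }
}
```

With this rule, "First Name", "first_name" and "FIRSTNAME" all normalize to the same dictionary key.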

9.2. CellMappingHandler

Resolves foreign-key (FK) cell values to entity IDs:

@Component
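public class CellMappingHandler implements ImportStepHandler<Void, List<ImportCellMapping>> {

    // Persists the resolved mappings (type name assumed by analogy
    // with ImportColumnMappingRepository)
    private final ImportCellMappingRepository cellMappingRepository;

    @Override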
    public StepResult<List<ImportCellMapping>> execute(ImportContext context, Void input) {
        ImportJob job = context.getJob();

        // Get FK columns that need resolution
        List<ImportColumnMapping> fkColumns = getForeignKeyColumns(job);

        // Scan spreadsheet for unique values in FK columns
        Map<String, Set<String>> uniqueValues = scanForUniqueValues(context, fkColumns);

        // Try to resolve each value
        List<ImportCellMapping> cellMappings = new ArrayList<>();
        for (var entry : uniqueValues.entrySet()) {
            String field = entry.getKey();
            for (String value : entry.getValue()) {
                ImportCellMapping mapping = resolveValue(context, field, value);
                cellMappings.add(mapping);
            }
        }

        // Save mappings
        cellMappingRepository.saveAll(cellMappings);

        // Check if all resolved
        boolean allResolved = cellMappings.stream()
            .allMatch(m -> m.getStatus() != MappingStatus.UNMATCHED);

        if (allResolved) {
            return StepResult.completed(cellMappings);
        } else {
            return StepResult.needsInput(getUnresolved(cellMappings));
        }
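    }

    // The interface also requires the two methods below. getMappings(...) is an
    // elided helper, as in ColumnMappingHandler.
    @Override
    public boolean canProceed(ImportContext context) {
        // Proceed only when no cell mapping is still unresolved
        return getUnresolved(getMappings(context.getJob())).isEmpty();
    }

    @Override
    public List<?> getPendingItems(ImportContext context) {
        return getUnresolved(getMappings(context.getJob()));
    }
}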
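
The scan for unique values can also supply the occurrence_count column of import_cell_mapping. A minimal sketch follows, assuming each row is available as a map from target field to raw cell value. That row model and the UniqueValueScanner name are assumptions; the real scanForUniqueValues reads from the SpreadsheetReader.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical scanner: counts distinct raw values per FK field.
final class UniqueValueScanner {

    private UniqueValueScanner() {}

    /**
     * Returns field -> (trimmed value -> number of rows containing it).
     * Blank cells are skipped because they need no mapping.
     */
    static Map<String, Map<String, Integer>> scan(List<Map<String, String>> rows,
                                                  Set<String> fkFields) {
        Map<String, Map<String, Integer>> counts = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            for (String field : fkFields) {
                String raw = row.get(field);
                if (raw == null || raw.isBlank()) {
                    continue;
                }
                counts.computeIfAbsent(field, f -> new LinkedHashMap<>())
                      .merge(raw.trim(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Resolving each distinct value once, rather than once per row, keeps the number of lookups and of user prompts proportional to the number of distinct values.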

9.3. RowProcessingHandler

Processes rows using existing import services:

@Component
public class RowProcessingHandler implements ImportStepHandler<Void, ImportJobDTO> {

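    // Not annotated with @Async: Spring's @Async supports only void or Future-style
    // return types, so the StepResult would not reach the caller. Run this step on
    // the "importTaskExecutor" pool from the calling side instead.
    @Override
    public StepResult<ImportJobDTO> execute(ImportContext context, Void input) {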
        ImportJob job = context.getJob();
        SpreadsheetReader reader = context.getReader();

        // Get resolved mappings
        Map<Integer, String> columnMap = getColumnMap(job);
        Map<String, Map<String, Long>> cellMap = getCellMap(job);

        // Process each row
        int rowNum = 1;  // Skip header
        while (reader.hasNext()) {
            Row row = reader.next();

            try {
                ImportRowResult result = processRow(context, row, columnMap, cellMap);
                result.setRowNumber(rowNum);
                rowResultRepository.save(result);

                if (result.getOutcome() == ImportOutcome.CREATED ||
                    result.getOutcome() == ImportOutcome.UPDATED) {
                    job.incrementSuccess();
                } else if (result.getOutcome() == ImportOutcome.ERROR) {
                    job.incrementError();  // SKIPPED rows count as neither success nor error
                }
            } catch (UnmappedCellException e) {
                // New unmapped value discovered - need to pause
                savePendingCellMapping(job, e);
                job.setCurrentStep(ImportStep.CELL_MAPPING);
                jobRepository.save(job);
                return StepResult.needsInput(List.of(e.getMapping()));
            } catch (Exception e) {
                saveErrorResult(job, rowNum, e);
                job.incrementError();
            }

            job.incrementProcessed();
            if (rowNum % 100 == 0) {
                jobRepository.save(job);  // Periodic progress save
            }
            rowNum++;
        }

        return StepResult.completed(toDTO(job));
    }
}

10. Async Processing

10.1. Thread Pool Configuration

@Configuration
@EnableAsync
public class ImportAsyncConfig {

    @Bean(name = "importTaskExecutor")
    public Executor importTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("import-");
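        // CallerRunsPolicy is a nested class of java.util.concurrent.ThreadPoolExecutor
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());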
        executor.initialize();
        return executor;
    }
}
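
With CallerRunsPolicy, a task submitted while all threads are busy and the queue is full is not dropped. It runs on the submitting thread, which slows the submitter down instead of losing the import. The sketch below shows this with a plain java.util.concurrent.ThreadPoolExecutor shrunk to one thread and one queue slot. The CallerRunsDemo class is illustrative only and is not part of the import module.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {

    private CallerRunsDemo() {}

    /** Returns the name of the thread that ran the task submitted to a saturated pool. */
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();

        Runnable blocker = () -> {
            try {
                release.await();  // hold the worker until the overflow task has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        pool.execute(blocker);  // occupies the single worker thread
        pool.execute(blocker);  // fills the single queue slot
        // Pool and queue are full: the policy runs this task on the calling thread
        pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overflowThread.get();
    }
}
```

In the configuration above, the same thing happens once 5 threads are busy and 25 tasks are queued: the thread that submits the next import runs it itself.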

11. Purging

11.1. Retention Rules

Status                                   Retention       Rationale
--------------------------------------   -------------   ---------------------------
COMPLETED, FAILED, CANCELLED             7 days          Allow review and debugging
UPLOADED, COLUMN_MAPPING, CELL_MAPPING   90 days         User may return to complete
PROCESSING                               No auto-purge   Requires investigation

11.2. Scheduled Cleanup

@Service
public class ImportJobCleanupService {

    @Scheduled(cron = "0 0 2 * * ?")  // Daily at 2 AM
    @Transactional
    public void cleanupExpiredImports() {
        Instant now = Instant.now();

        // Terminal states: 7 days
        deleteByStatusesAndDate(
            Set.of(COMPLETED, FAILED, CANCELLED),
            now.minus(7, ChronoUnit.DAYS)
        );

        // Abandoned: 90 days
        deleteByStatusesAndDate(
            Set.of(UPLOADED, COLUMN_MAPPING, CELL_MAPPING),
            now.minus(90, ChronoUnit.DAYS)
        );
    }
}
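
The retention rules can also be expressed as a pure function, which makes them easy to unit-test separately from the scheduled job. This is a sketch under assumptions: the RetentionRules class is hypothetical, steps are passed as plain strings, and the reference timestamp the retention period counts from (for example completed_at or updated_at) is left to the caller because this document does not specify it.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper encoding the retention rules from section 11.1.
final class RetentionRules {

    private RetentionRules() {}

    /** Retention period for a step, or empty if jobs in that step are never auto-purged. */
    static Optional<Duration> retentionFor(String step) {
        switch (step) {
            case "COMPLETED":
            case "FAILED":
            case "CANCELLED":
                return Optional.of(Duration.ofDays(7));
            case "UPLOADED":
            case "COLUMN_MAPPING":
            case "CELL_MAPPING":
                return Optional.of(Duration.ofDays(90));
            default:
                return Optional.empty();  // PROCESSING requires investigation
        }
    }

    /** True if the retention period, counted from referenceTime, has elapsed at 'now'. */
    static boolean isPurgeable(String step, Instant referenceTime, Instant now) {
        return retentionFor(step)
            .map(retention -> referenceTime.plus(retention).isBefore(now))
            .orElse(false);
    }
}
```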