Import: Process Implementation
1. Overview
This document describes the ImportProcess implementation for asynchronous spreadsheet imports. It uses an ImportController class modeled on the existing FormController, so the workflow follows an established pattern and adds no runtime dependencies.
2. Status
Implementation Status: Planned (Phase B)
This is the recommended initial implementation approach. See Import Fluxnova Implementation for the future BPMN-based alternative.
3. Architecture
3.1. Design Principles
- Reuse FormController patterns - The existing FormController successfully manages multi-step workflows with validation and state persistence
- Separate business logic from orchestration - Step handlers contain reusable logic
- Use existing entities - Leverage ProcessInstance for state tracking
- Generic context reference - Single contextId field supports multiple import types
3.2. Component Diagram
┌─────────────────────┐
│   ImportResource    │  REST API
│    (Controller)     │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│    ImportProcess    │  Orchestrator Service
│    Orchestrator     │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│  ImportController   │  Workflow Controller (like FormController)
│                     │
│  - processInstance  │
│  - stepHandlers     │
│  - next()           │
└──────────┬──────────┘
           │
    ┌──────┴──────┐
    │             │
┌───▼───┐   ┌─────▼─────┐   ┌────────────┐
│Column │   │   Cell    │   │    Row     │
│Mapping│   │  Mapping  │   │ Processing │
│Handler│   │  Handler  │   │  Handler   │
└───────┘   └───────────┘   └────────────┘
4. Database Schema
4.1. ImportJob Entity
CREATE TABLE import_job (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
-- Import type discriminator
import_type VARCHAR(20) NOT NULL, -- EVENT_PARTICIPANT, MEMBERSHIP, RESULT
-- Current workflow state
current_step VARCHAR(20) NOT NULL, -- UPLOADED, COLUMN_MAPPING, etc.
-- File storage (FK to existing Attachment entity)
attachment_id BIGINT NOT NULL,
-- Organisation scope
organisation_id BIGINT NOT NULL,
-- Generic context reference (event, period, or race ID based on import_type)
context_id BIGINT NOT NULL,
-- Progress tracking
total_rows INT,
processed_rows INT DEFAULT 0,
success_count INT DEFAULT 0,
error_count INT DEFAULT 0,
-- Timestamps
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP,
completed_at TIMESTAMP,
expire_at TIMESTAMP,
-- Foreign keys
CONSTRAINT fk_import_attachment FOREIGN KEY (attachment_id) REFERENCES attachment(id),
CONSTRAINT fk_import_org FOREIGN KEY (organisation_id) REFERENCES organisation(id)
);
Design Decision: The context_id field is a generic BIGINT without referential integrity constraints. This allows extending to new import types without schema changes. The import_type discriminator indicates how to interpret the context:
| import_type | context_id references |
|---|---|
| EVENT_PARTICIPANT | event.id |
| MEMBERSHIP | membership_period.id |
| RESULT | race.id |
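Because context_id carries no foreign key, the check that the referenced row exists has to happen in application code. The following is a minimal sketch of one way to do that, not the planned implementation. The names ImportTypeSketch and ContextIdValidatorSketch are hypothetical, and the existence checks are plain predicates standing in for whatever repository lookups the real services provide.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.LongPredicate;

/** Hypothetical mirror of the import_type discriminator and the table each value points at. */
enum ImportTypeSketch {
    EVENT_PARTICIPANT("event"),
    MEMBERSHIP("membership_period"),
    RESULT("race");

    private final String contextTable;

    ImportTypeSketch(String contextTable) {
        this.contextTable = contextTable;
    }

    /** Name of the table whose id column context_id refers to. */
    String contextTable() {
        return contextTable;
    }
}

/** Application-level stand-in for the foreign key that context_id deliberately lacks. */
final class ContextIdValidatorSketch {
    private final Map<ImportTypeSketch, LongPredicate> existenceChecks =
            new EnumMap<>(ImportTypeSketch.class);

    /** Registers a lookup (assumed: something like a repository existence check) for one type. */
    ContextIdValidatorSketch register(ImportTypeSketch type, LongPredicate exists) {
        existenceChecks.put(type, exists);
        return this;
    }

    /** Throws if no check is registered for the type or the referenced row is missing. */
    void validate(ImportTypeSketch type, long contextId) {
        LongPredicate exists = existenceChecks.get(type);
        if (exists == null || !exists.test(contextId)) {
            throw new IllegalArgumentException(
                    "No " + type.contextTable() + " row with id " + contextId);
        }
    }
}
```

Adding a new import type then means adding an enum constant and registering one more check, which keeps the "no schema change" property described above.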
4.2. Supporting Tables
4.2.1. ImportColumnMapping
CREATE TABLE import_column_mapping (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
import_job_id BIGINT NOT NULL,
column_index INT NOT NULL,
source_header VARCHAR(255),
target_field VARCHAR(50),
is_required BOOLEAN DEFAULT FALSE,
status VARCHAR(20) NOT NULL, -- AUTO_MATCHED, MANUAL_MATCHED, UNMATCHED, IGNORED
confidence_score DOUBLE,
CONSTRAINT fk_colmap_job FOREIGN KEY (import_job_id)
REFERENCES import_job(id) ON DELETE CASCADE
);
4.2.2. ImportCellMapping
CREATE TABLE import_cell_mapping (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
import_job_id BIGINT NOT NULL,
target_field VARCHAR(50) NOT NULL,
source_value VARCHAR(255) NOT NULL,
normalized_value VARCHAR(255),
target_entity_id BIGINT,
status VARCHAR(20) NOT NULL,
occurrence_count INT DEFAULT 1,
CONSTRAINT fk_cellmap_job FOREIGN KEY (import_job_id)
REFERENCES import_job(id) ON DELETE CASCADE
);
4.2.3. ImportRowResult
CREATE TABLE import_row_result (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
import_job_id BIGINT NOT NULL,
row_number INT NOT NULL,
outcome VARCHAR(20) NOT NULL, -- CREATED, UPDATED, SKIPPED, ERROR
message VARCHAR(500),
created_entity_id BIGINT,
CONSTRAINT fk_rowres_job FOREIGN KEY (import_job_id)
REFERENCES import_job(id) ON DELETE CASCADE
);
5. Core Interfaces
5.1. ImportStepHandler
Step handlers contain the business logic shared with the future Fluxnova implementation:
public interface ImportStepHandler<I, O> {
/**
* Execute this step's logic.
* @param context Import context with job data and services
* @param input Input data for this step (e.g., submitted mappings)
* @return Result indicating success, needs input, or error
*/
StepResult<O> execute(ImportContext context, I input);
/**
* Check if this step can proceed without user input.
*/
boolean canProceed(ImportContext context);
/**
* Get pending items requiring user resolution.
*/
List<?> getPendingItems(ImportContext context);
}
5.2. StepResult
public class StepResult<T> {
public enum Outcome {
COMPLETED, // Step finished successfully
NEEDS_INPUT, // Waiting for user to provide mappings
ERROR // Fatal error occurred
}
private Outcome outcome;
private T output;
private String message;
private List<?> pendingItems;
// Static factory methods
public static <T> StepResult<T> completed(T output) { ... }
public static <T> StepResult<T> needsInput(List<?> pending) { ... }
public static <T> StepResult<T> error(String message) { ... }
}
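The factory method bodies are elided above. As an illustration only, they could be filled in along these lines. StepResultSketch is a hypothetical stand-in for StepResult, and making the fields final with a private constructor is an assumption of this sketch, not something the design prescribes.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, immutable variant of StepResult with the factory bodies written out. */
final class StepResultSketch<T> {
    enum Outcome { COMPLETED, NEEDS_INPUT, ERROR }

    private final Outcome outcome;
    private final T output;
    private final String message;
    private final List<?> pendingItems;

    private StepResultSketch(Outcome outcome, T output, String message, List<?> pendingItems) {
        this.outcome = outcome;
        this.output = output;
        this.message = message;
        this.pendingItems = pendingItems;
    }

    /** Step finished; carries the step's output and no pending items. */
    static <T> StepResultSketch<T> completed(T output) {
        return new StepResultSketch<>(Outcome.COMPLETED, output, null, Collections.emptyList());
    }

    /** Step is waiting on the user; carries a defensive copy of the unresolved items. */
    static <T> StepResultSketch<T> needsInput(List<?> pending) {
        return new StepResultSketch<>(Outcome.NEEDS_INPUT, null, null, List.copyOf(pending));
    }

    /** Step failed fatally; carries only the error message. */
    static <T> StepResultSketch<T> error(String message) {
        return new StepResultSketch<>(Outcome.ERROR, null, message, Collections.emptyList());
    }

    Outcome getOutcome() { return outcome; }
    T getOutput() { return output; }
    String getMessage() { return message; }
    List<?> getPendingItems() { return pendingItems; }
}
```

Returning an empty list rather than null for pendingItems lets callers such as the controller iterate without a null check.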
5.3. ImportContext
Carries job data and services through the processing pipeline:
public class ImportContext {
private ImportJob job;
private SpreadsheetReader reader;
private Organisation organisation;
// Cached reference data for FK resolution
private List<EventCategory> eventCategories;
private List<Country> countries;
private List<CustomListValue> customLists;
// Services (injected via orchestrator)
private EventParticipantServiceEx eventParticipantService;
private MembershipImportService membershipService;
private RaceResultServiceEx resultService;
// Utility methods
public Long getContextId() {
return job.getContextId();
}
public ImportType getImportType() {
return job.getImportType();
}
}
6. ImportController
The central workflow controller, modeled on FormController:
/**
* Manages an import job's lifecycle, modeled on FormController.
* Wraps ImportJob and orchestrates step-by-step processing.
*/
public class ImportController {
private final ImportJob job;
private final Map<ImportStep, ImportStepHandler<?, ?>> handlers;
private final ImportContext context;
private final ImportJobRepository jobRepository;
public ImportController(ImportJob job,
ImportContext context,
Map<ImportStep, ImportStepHandler<?, ?>> handlers,
ImportJobRepository jobRepository) {
this.job = job;
this.context = context;
this.handlers = handlers;
this.jobRepository = jobRepository;
}
/**
* Process submitted data and advance if valid.
* Similar to FormController.next()
*
* @param submission User-submitted mappings (may be null for initial call)
* @return DTO with current status and any pending items
*/
public ImportJobDTO next(ImportSubmissionDTO submission) {
// Ensure job is still active
if (job.getCurrentStep().isTerminal()) {
return errorDTO("Import already " + job.getCurrentStep());
}
// Get handler for current step
ImportStepHandler<?, ?> handler = handlers.get(job.getCurrentStep());
// Apply submitted data (mappings) if provided
if (submission != null) {
applySubmission(submission);
}
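// Execute the step. The submission has already been applied above, and the
// handlers take no direct input, so null is passed (a wildcard-typed handler
// cannot accept the DTO at compile time).
StepResult<?> result = handler.execute(context, null);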
if (result.getOutcome() == StepResult.Outcome.NEEDS_INPUT) {
// Stay on current step, return pending items
return statusDTO(result.getPendingItems());
}
if (result.getOutcome() == StepResult.Outcome.ERROR) {
job.setCurrentStep(ImportStep.FAILED);
jobRepository.save(job);
return errorDTO(result.getMessage());
}
// Advance to next step
return advanceToNextStep();
}
/**
* Advance through steps until one needs input or all complete.
* Similar to FormController.setNextStep()
*/
private ImportJobDTO advanceToNextStep() {
ImportStep nextStep = job.getCurrentStep().next();
while (!nextStep.isTerminal()) {
job.setCurrentStep(nextStep);
jobRepository.save(job);
ImportStepHandler<?, ?> handler = handlers.get(nextStep);
// Check if this step can proceed automatically
if (!handler.canProceed(context)) {
return statusDTO(handler.getPendingItems(context));
}
// Execute the step
StepResult<?> result = handler.execute(context, null);
if (result.getOutcome() == StepResult.Outcome.NEEDS_INPUT) {
return statusDTO(result.getPendingItems());
}
if (result.getOutcome() == StepResult.Outcome.ERROR) {
job.setCurrentStep(ImportStep.FAILED);
jobRepository.save(job);
return errorDTO(result.getMessage());
}
nextStep = nextStep.next();
}
// All steps completed
job.setCurrentStep(ImportStep.COMPLETED);
job.setCompletedAt(Instant.now());
job.setExpireAt(Instant.now().plus(7, ChronoUnit.DAYS));
jobRepository.save(job);
return completedDTO();
}
}
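The controller relies on ImportStep.next() and ImportStep.isTerminal(), which are not shown in this document. A minimal sketch of such an enum follows. The step names are taken from the states mentioned elsewhere in this document, but their ordering and the decision to throw from next() on a terminal state are assumptions.

```java
/** Hypothetical sketch of the ImportStep state enum used by ImportController. */
enum ImportStepSketch {
    UPLOADED, COLUMN_MAPPING, CELL_MAPPING, PROCESSING, COMPLETED, FAILED, CANCELLED;

    /** Terminal states accept no further submissions and are subject to purging. */
    boolean isTerminal() {
        return this == COMPLETED || this == FAILED || this == CANCELLED;
    }

    /** Happy-path successor; FAILED and CANCELLED are only ever set explicitly. */
    ImportStepSketch next() {
        switch (this) {
            case UPLOADED:
                return COLUMN_MAPPING;
            case COLUMN_MAPPING:
                return CELL_MAPPING;
            case CELL_MAPPING:
                return PROCESSING;
            case PROCESSING:
                return COMPLETED;
            default:
                throw new IllegalStateException("No step follows terminal state " + this);
        }
    }
}
```

With this ordering, the loop in advanceToNextStep() exits once PROCESSING.next() yields COMPLETED, which is the point where the controller stamps completedAt and expireAt.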
7. ImportProcessOrchestrator
The service that creates and manages ImportController instances:
@Service
public class ImportProcessOrchestrator implements ImportOrchestrator {
private final ImportJobRepository jobRepository;
private final AttachmentService attachmentService;
private final Map<ImportStep, ImportStepHandler<?, ?>> handlers;
@Override
public ImportJobDTO startImport(ImportStartRequest request) {
// Store file as attachment
Attachment attachment = attachmentService.store(
request.getFile(),
request.getOrganisationId()
);
// Create job with generic context_id
ImportJob job = new ImportJob();
job.setSourceFile(attachment);
job.setImportType(request.getImportType());
job.setContextId(request.getContextId()); // Generic reference
job.setCurrentStep(ImportStep.UPLOADED);
job.setOrganisation(request.getOrganisation());
job.setCreatedAt(Instant.now());
jobRepository.save(job);
// Create controller and start processing
ImportController controller = createController(job);
return controller.next(null); // Triggers column analysis
}
@Override
public ImportJobDTO getStatus(Long jobId) {
ImportJob job = findJob(jobId);
ImportContext context = buildContext(job);
// Return current status without advancing; terminal steps have no handler
ImportStepHandler<?, ?> handler = handlers.get(job.getCurrentStep());
List<?> pending = (handler != null) ? handler.getPendingItems(context) : List.of();
return statusDTO(job, pending);
}
@Override
public ImportJobDTO submitColumnMappings(Long jobId, List<ColumnMappingDTO> mappings) {
ImportJob job = findJob(jobId);
ImportSubmissionDTO submission = new ImportSubmissionDTO();
submission.setColumnMappings(mappings);
ImportController controller = createController(job);
return controller.next(submission);
}
@Override
public ImportJobDTO submitCellMappings(Long jobId, List<CellMappingDTO> mappings) {
ImportJob job = findJob(jobId);
ImportSubmissionDTO submission = new ImportSubmissionDTO();
submission.setCellMappings(mappings);
ImportController controller = createController(job);
return controller.next(submission);
}
@Override
public void cancel(Long jobId) {
ImportJob job = findJob(jobId);
if (job.getCurrentStep().isTerminal()) {
throw new IllegalStateException("Cannot cancel import in terminal state " + job.getCurrentStep());
}
job.setCurrentStep(ImportStep.CANCELLED);
job.setCompletedAt(Instant.now());
jobRepository.save(job);
}
private ImportController createController(ImportJob job) {
ImportContext context = buildContext(job);
return new ImportController(job, context, handlers, jobRepository);
}
}
8. API Design
8.1. Endpoints
| Method | Endpoint | Description |
|---|---|---|
| POST | | Upload file and create import job |
| GET | | Get current status and pending mappings |
| PUT | | Submit column mapping corrections |
| PUT | | Submit cell mapping corrections |
| GET | | Get paginated row results |
| DELETE | | Cancel import |
Note: The API uses ImportJob.id (Long) directly, not UUID. This aligns with the FormController pattern and simplifies the implementation.
8.2. Response Codes
| Code | Status | Meaning |
|---|---|---|
| 201 Created | Any | Import job created successfully |
| 200 OK | Any | Status returned successfully |
| 202 Accepted | After mapping submission | Mappings accepted, processing continues |
| 406 Not Acceptable | COLUMN_MAPPING, CELL_MAPPING | Required mappings still missing |
| 404 Not Found | - | Import job not found |
| 409 Conflict | COMPLETED, FAILED, CANCELLED | Cannot modify terminal state |
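For the two mapping submission endpoints, the table above reduces to a small decision function. The sketch below is illustrative only: the class name, the nested Step enum, and the requiredMappingsMissing flag are hypothetical, and the real resource class would presumably derive both inputs from the job and the DTO returned by the orchestrator.

```java
/** Hypothetical helper that picks the HTTP status for a mapping submission. */
final class MappingSubmissionStatusSketch {
    enum Step { UPLOADED, COLUMN_MAPPING, CELL_MAPPING, PROCESSING, COMPLETED, FAILED, CANCELLED }

    private MappingSubmissionStatusSketch() { }

    /**
     * @param stepBeforeSubmission    step the job was in when the request arrived
     * @param requiredMappingsMissing true if the job still reports unresolved required mappings
     * @return 409, 406 or 202, following the response code table
     */
    static int statusFor(Step stepBeforeSubmission, boolean requiredMappingsMissing) {
        boolean terminal = stepBeforeSubmission == Step.COMPLETED
                || stepBeforeSubmission == Step.FAILED
                || stepBeforeSubmission == Step.CANCELLED;
        if (terminal) {
            return 409; // Conflict: terminal jobs cannot be modified
        }
        if (requiredMappingsMissing) {
            return 406; // Not Acceptable: user must supply more mappings
        }
        return 202; // Accepted: processing continues
    }
}
```

Checking the terminal state first means a late submission to a cancelled job is reported as a conflict rather than as missing mappings.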
9. Step Handlers
9.1. ColumnMappingHandler
Analyzes spreadsheet headers and matches to dictionary:
@Component
public class ColumnMappingHandler implements ImportStepHandler<Void, List<ImportColumnMapping>> {
private final ImportColumnMappingRepository mappingRepository;
@Override
public StepResult<List<ImportColumnMapping>> execute(ImportContext context, Void input) {
ImportJob job = context.getJob();
SpreadsheetReader reader = context.getReader();
// Get header row
List<String> headers = reader.getHeaders();
// Match each header against dictionary
List<ImportColumnMapping> mappings = new ArrayList<>();
for (int i = 0; i < headers.size(); i++) {
ImportColumnMapping mapping = matchHeader(headers.get(i), i, job);
mappings.add(mapping);
}
// Save mappings
mappingRepository.saveAll(mappings);
// Check if all required columns matched
boolean allRequiredMapped = mappings.stream()
.filter(ImportColumnMapping::isRequired)
.allMatch(m -> m.getStatus() != MappingStatus.UNMATCHED);
if (allRequiredMapped) {
return StepResult.completed(mappings);
} else {
return StepResult.needsInput(getUnmatchedRequired(mappings));
}
}
@Override
public boolean canProceed(ImportContext context) {
// Check if mappings exist and all required are matched
List<ImportColumnMapping> mappings = getMappings(context.getJob());
return mappings.stream()
.filter(ImportColumnMapping::isRequired)
.allMatch(m -> m.getStatus() != MappingStatus.UNMATCHED);
}
@Override
public List<?> getPendingItems(ImportContext context) {
return getUnmatchedRequired(getMappings(context.getJob()));
}
}
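The matchHeader helper is not shown. One simple way to match headers against a dictionary is to normalize both sides and do an exact lookup, as sketched below. This is an assumption about the approach, not the actual matching algorithm: HeaderMatcherSketch and its alias dictionary are hypothetical, and a real implementation may use fuzzy matching to produce the confidence_score stored in import_column_mapping.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical exact-match header resolver based on normalized aliases. */
final class HeaderMatcherSketch {
    // normalized alias -> target field name
    private final Map<String, String> aliasToField = new HashMap<>();

    /** @param dictionary known header aliases mapped to target field names */
    HeaderMatcherSketch(Map<String, String> dictionary) {
        dictionary.forEach((alias, field) -> aliasToField.put(normalize(alias), field));
    }

    /** Lower-cases and strips everything except ASCII letters and digits. */
    static String normalize(String header) {
        if (header == null) {
            return "";
        }
        return header.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "");
    }

    /** Returns the target field for a spreadsheet header, or empty if nothing matches. */
    Optional<String> match(String sourceHeader) {
        return Optional.ofNullable(aliasToField.get(normalize(sourceHeader)));
    }
}
```

A hit would correspond to the AUTO_MATCHED status and an empty result to UNMATCHED. Note that this normalization drops non-ASCII characters, so headers in other scripts would need a different rule.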
9.2. CellMappingHandler
Resolves FK values to entity IDs:
@Component
public class CellMappingHandler implements ImportStepHandler<Void, List<ImportCellMapping>> {
@Override
public StepResult<List<ImportCellMapping>> execute(ImportContext context, Void input) {
ImportJob job = context.getJob();
// Get FK columns that need resolution
List<ImportColumnMapping> fkColumns = getForeignKeyColumns(job);
// Scan spreadsheet for unique values in FK columns
Map<String, Set<String>> uniqueValues = scanForUniqueValues(context, fkColumns);
// Try to resolve each value
List<ImportCellMapping> cellMappings = new ArrayList<>();
for (var entry : uniqueValues.entrySet()) {
String field = entry.getKey();
for (String value : entry.getValue()) {
ImportCellMapping mapping = resolveValue(context, field, value);
cellMappings.add(mapping);
}
}
// Save mappings
cellMappingRepository.saveAll(cellMappings);
// Check if all resolved
boolean allResolved = cellMappings.stream()
.allMatch(m -> m.getStatus() != MappingStatus.UNMATCHED);
if (allResolved) {
return StepResult.completed(cellMappings);
} else {
return StepResult.needsInput(getUnresolved(cellMappings));
}
}
}
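The scan for unique values can also produce the occurrence_count stored in import_cell_mapping. The sketch below shows that idea on plain maps. It is a simplification under stated assumptions: rows are modeled as field-to-value maps rather than being read through SpreadsheetReader, the class name is hypothetical, and unlike scanForUniqueValues above it returns counts rather than sets.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical scan that counts distinct raw values per foreign-key field. */
final class UniqueValueScanSketch {
    private UniqueValueScanSketch() { }

    /**
     * @param rows     spreadsheet rows, each modeled as target field -> raw cell text
     * @param fkFields fields whose values must be resolved to entity IDs
     * @return field -> (trimmed value -> number of rows containing it)
     */
    static Map<String, Map<String, Integer>> scan(List<Map<String, String>> rows,
                                                  Set<String> fkFields) {
        Map<String, Map<String, Integer>> counts = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            for (String field : fkFields) {
                String raw = row.get(field);
                if (raw == null || raw.trim().isEmpty()) {
                    continue; // blank cells need no resolution
                }
                counts.computeIfAbsent(field, f -> new LinkedHashMap<>())
                      .merge(raw.trim(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Resolving each distinct value once, instead of once per row, is what keeps the number of items a user has to map manageable on large files.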
9.3. RowProcessingHandler
Processes rows using existing import services:
@Component
public class RowProcessingHandler implements ImportStepHandler<Void, ImportJobDTO> {
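@Override
// NOTE: Spring's @Async supports only void or Future-typed return values. With a
// StepResult return type the proxied call returns null to the caller, so the async
// hand-off likely needs to wrap the call to execute() rather than annotate it.
@Async("importTaskExecutor")
public StepResult<ImportJobDTO> execute(ImportContext context, Void input) {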
ImportJob job = context.getJob();
SpreadsheetReader reader = context.getReader();
// Get resolved mappings
Map<Integer, String> columnMap = getColumnMap(job);
Map<String, Map<String, Long>> cellMap = getCellMap(job);
// Process each row
int rowNum = 1; // Skip header
while (reader.hasNext()) {
Row row = reader.next();
try {
ImportRowResult result = processRow(context, row, columnMap, cellMap);
result.setRowNumber(rowNum);
rowResultRepository.save(result);
if (result.getOutcome() == ImportOutcome.CREATED ||
result.getOutcome() == ImportOutcome.UPDATED) {
job.incrementSuccess();
} else {
job.incrementError();
}
} catch (UnmappedCellException e) {
// New unmapped value discovered - need to pause
savePendingCellMapping(job, e);
job.setCurrentStep(ImportStep.CELL_MAPPING);
jobRepository.save(job);
return StepResult.needsInput(List.of(e.getMapping()));
} catch (Exception e) {
saveErrorResult(job, rowNum, e);
job.incrementError();
}
job.incrementProcessed();
if (rowNum % 100 == 0) {
jobRepository.save(job); // Periodic progress save
}
rowNum++;
}
return StepResult.completed(toDTO(job));
}
}
10. Async Processing
10.1. Thread Pool Configuration
@Configuration
@EnableAsync
public class ImportAsyncConfig {
@Bean(name = "importTaskExecutor")
public Executor importTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("import-");
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.initialize();
return executor;
}
}
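To make the sizing behavior concrete, the same settings can be expressed with the plain JDK ThreadPoolExecutor, which Spring's ThreadPoolTaskExecutor wraps. The sketch below is an illustration, not a replacement for the bean above. The class name is hypothetical, and the 60 second keep-alive is an assumption chosen to match Spring's default.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical plain-JDK equivalent of the importTaskExecutor bean. */
final class ImportExecutorSketch {
    private ImportExecutorSketch() { }

    static ThreadPoolExecutor create() {
        AtomicInteger counter = new AtomicInteger();
        // Names threads import-1, import-2, ... like the configured prefix
        ThreadFactory named = r -> new Thread(r, "import-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                              // core pool size
                5,                              // maximum pool size
                60L, TimeUnit.SECONDS,          // idle time before extra threads are reclaimed
                new LinkedBlockingQueue<>(25),  // bounded queue of waiting imports
                named,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

With a bounded queue, ThreadPoolExecutor only creates threads beyond the core size once the queue is full. So if no import finishes in the meantime, the first 2 imports run immediately, the next 25 wait in the queue, and only then are threads 3 to 5 started. A further submission while all 5 threads are busy and the queue is full is executed on the submitting thread by CallerRunsPolicy, which for a request thread means the HTTP call blocks until that import finishes.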
11. Purging
11.1. Retention Rules
| Status | Retention | Rationale |
|---|---|---|
| COMPLETED, FAILED, CANCELLED | 7 days | Allow review and debugging |
| UPLOADED, COLUMN_MAPPING, CELL_MAPPING | 90 days | User may return to complete |
| PROCESSING | No auto-purge | Requires investigation |
11.2. Scheduled Cleanup
@Service
public class ImportJobCleanupService {
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
@Transactional
public void cleanupExpiredImports() {
Instant now = Instant.now();
// Terminal states: 7 days
deleteByStatusesAndDate(
Set.of(COMPLETED, FAILED, CANCELLED),
now.minus(7, ChronoUnit.DAYS)
);
// Abandoned: 90 days
deleteByStatusesAndDate(
Set.of(UPLOADED, COLUMN_MAPPING, CELL_MAPPING),
now.minus(90, ChronoUnit.DAYS)
);
}
}
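The retention rules can be captured as a pure function, which makes them easy to unit test separately from the scheduled job. The sketch below is hypothetical: the class and enum names are invented, and since this document does not say which timestamp the cutoff is compared against, the caller passes it in as referenceTime.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical pure-function form of the retention rules table. */
final class RetentionPolicySketch {
    enum Step { UPLOADED, COLUMN_MAPPING, CELL_MAPPING, PROCESSING, COMPLETED, FAILED, CANCELLED }

    private RetentionPolicySketch() { }

    /** Retention period for a step, or empty when the step is never auto-purged. */
    static Optional<Duration> retentionFor(Step step) {
        switch (step) {
            case COMPLETED:
            case FAILED:
            case CANCELLED:
                return Optional.of(Duration.ofDays(7));
            case UPLOADED:
            case COLUMN_MAPPING:
            case CELL_MAPPING:
                return Optional.of(Duration.ofDays(90));
            default:
                return Optional.empty(); // PROCESSING requires investigation
        }
    }

    /**
     * @param referenceTime timestamp the retention period counts from (assumed:
     *                      completion time for terminal jobs, last update otherwise)
     */
    static boolean isPurgeable(Step step, Instant referenceTime, Instant now) {
        return retentionFor(step)
                .map(retention -> referenceTime.plus(retention).isBefore(now))
                .orElse(false);
    }
}
```

For example, a job completed on 20 January is purgeable at the 2 AM run on 31 January, while an abandoned COLUMN_MAPPING job from the same day is kept, and a PROCESSING job is never selected regardless of age.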
12. Related Documentation
- File Import Architecture - XLSX/CSV parsing and column mapping
- Async Import Architecture - State machine and API design
- Import Fluxnova Implementation - Future BPMN-based implementation
- Event Participant Import - EP-specific columns and logic
- Membership Import - Membership-specific columns and logic
- Results Import - Results-specific columns and logic