Import: Fluxnova Implementation
1. Overview
This document describes the Fluxnova implementation of asynchronous spreadsheet imports. It uses the Fluxnova BPMN engine (the FINOS fork of Camunda 7) to orchestrate the import workflow, which provides visual process modelling and built-in monitoring.
Implementation Status: Future. This implementation will be considered when the async-communication work begins, so that both features can share the Fluxnova runtime. See Import Process Implementation for the current approach.
2. Rationale for Future Implementation
2.1. When to Migrate
Consider migrating to Fluxnova when:
- Async-communication feature begins - Share the Fluxnova runtime investment
- Process monitoring becomes critical - Cockpit dashboard provides visibility
- Complex branching is needed - BPMN excels at conditional flows
- Timer-based workflows required - Native BPMN timer events
2.2. Migration Path
The ImportProcess implementation is designed for easy migration:
| Component | Migration Effort | Notes |
|---|---|---|
| ImportStepHandler implementations | None - reused as-is | Same interface, same logic |
| ImportOrchestrator interface | None - same contract | Just different implementation |
| Database entities | Additive - add BPMN tables | ImportJob unchanged |
| REST Controller | None - calls interface | Transparent switch |
| Configuration | Config change only | |
3. Architecture
3.1. Design Principles
- Reuse step handlers - Same ImportStepHandler implementations as ImportProcess
- BPMN-driven flow - Process definition controls transitions
- Message correlation - User input triggers process continuation
- Process variables - Gateway decisions based on handler results
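The first and last principles can be illustrated without the engine. The following is a minimal sketch, not the project's code: `StepResult`, `Outcome`, `ImportContext` and `ImportStepHandler` are reduced to stand-ins whose shapes are assumed from how they are used later in this document, `Outcome.COMPLETED` and `GatewayVariableAdapter` are hypothetical names, and a plain `Map` stands in for the engine's process variables.

```java
import java.util.Map;

// Stand-in types: shapes are assumptions inferred from usage in this document.
enum Outcome { COMPLETED, NEEDS_INPUT }   // COMPLETED is a hypothetical constant

final class StepResult {
    private final Outcome outcome;
    StepResult(Outcome outcome) { this.outcome = outcome; }
    Outcome getOutcome() { return outcome; }
}

final class ImportContext {
    final long jobId;
    ImportContext(long jobId) { this.jobId = jobId; }
}

interface ImportStepHandler {
    StepResult execute(ImportContext context);
}

// Adapter: runs an unchanged handler and publishes only a boolean for the gateway.
final class GatewayVariableAdapter {
    private final ImportStepHandler handler;
    private final String variableName;

    GatewayVariableAdapter(ImportStepHandler handler, String variableName) {
        this.handler = handler;
        this.variableName = variableName;
    }

    // 'variables' is a plain map standing in for the engine's process variables.
    void run(ImportContext context, Map<String, Object> variables) {
        StepResult result = handler.execute(context);
        variables.put(variableName, result.getOutcome() == Outcome.NEEDS_INPUT);
    }
}
```

The handler knows nothing about BPMN; only the adapter translates its result into a variable that a gateway condition can read. That separation is what would allow the same handlers to serve both orchestrators.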
3.2. Component Diagram
        ┌─────────────────────┐
        │   ImportResource    │  REST API
        │    (Controller)     │
        └──────────┬──────────┘
                   │
        ┌──────────▼──────────┐
        │   FluxnovaImport    │  Orchestrator Service
        │    Orchestrator     │
        └──────────┬──────────┘
                   │
        ┌──────────▼──────────┐
        │   RuntimeService    │  Fluxnova/Camunda API
        │(Message Correlation)│
        └──────────┬──────────┘
                   │
        ┌──────────▼──────────┐
        │    BPMN Process     │  spreadsheet-import.bpmn
        │     Definition      │
        └──────────┬──────────┘
                   │
     ┌─────────────┼──────────────┐
     │             │              │
 ┌───▼────┐  ┌─────▼─────┐  ┌─────▼──────┐
 │ Column │  │   Cell    │  │    Row     │
 │Mapping │  │  Mapping  │  │ Processing │
 │Delegate│  │ Delegate  │  │  Delegate  │
 └───┬────┘  └─────┬─────┘  └─────┬──────┘
     │             │              │
     └─────────────┼──────────────┘
                   │
         ┌─────────▼─────────┐
         │ ImportStepHandler │  Same handlers as ImportProcess!
         │  implementations  │
         └───────────────────┘
4. BPMN Process Definition
4.1. Process Overview
┌─────────┐ ┌─────────────┐ ┌───────────────┐
│ Start │────►│ Analyze │────►│ Columns │
│ Event │ │ Columns │ │ Resolved? │
└─────────┘ └─────────────┘ └───────┬───────┘
│
┌─────────────────────┴─────────────────────┐
│ No │ Yes
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Wait for │ │ Resolve │
│ Column │◄──┐ │ Cell │
│ Mappings │ │ │ Values │
└───────┬───────┘ │ └───────┬───────┘
│ │ │
▼ │ ▼
┌───────────────┐ │ ┌───────────────┐
│ Columns │───┘ No │ Cells │
│ Resolved? │ │ Resolved? │
└───────┬───────┘ └───────┬───────┘
│ Yes │
└───────────────────────┬───────────────────┘
│
┌───────────────────────┴─────────────────────┐
│ No │ Yes
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Wait for │ │ Process │
│ Cell │◄──┐ │ Rows │
│ Mappings │ │ └───────┬───────┘
└───────┬───────┘ │ │
│ │ ▼
▼ │ ┌───────────────┐
┌───────────────┐ │ │ End │
│ Cells │───┘ No │ Event │
│ Resolved? │ └───────────────┘
└───────┬───────┘
│ Yes
└────────────────────────────────────────────►
4.2. BPMN XML (Simplified)
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
                  xmlns:camunda="http://camunda.org/schema/1.0/bpmn">
  <bpmn:process id="spreadsheet-import" name="Spreadsheet Import" isExecutable="true">
    <!-- Start -->
    <bpmn:startEvent id="start" />
    <!-- Analyze Columns -->
    <bpmn:serviceTask id="analyzeColumns" name="Analyze Columns"
                      camunda:delegateExpression="${columnMappingDelegate}">
    </bpmn:serviceTask>
    <!-- Check if columns resolved -->
    <bpmn:exclusiveGateway id="columnsResolvedGateway" />
    <!-- Wait for column mappings (message catch) -->
    <bpmn:intermediateCatchEvent id="waitColumnMappings" name="Wait for Column Mappings">
      <bpmn:messageEventDefinition messageRef="ColumnMappingsSubmitted" />
    </bpmn:intermediateCatchEvent>
    <!-- Re-check after user input -->
    <bpmn:serviceTask id="recheckColumns" name="Recheck Columns"
                      camunda:delegateExpression="${columnMappingDelegate}">
    </bpmn:serviceTask>
    <!-- Resolve Cell Values -->
    <bpmn:serviceTask id="resolveCells" name="Resolve Cell Values"
                      camunda:delegateExpression="${cellMappingDelegate}">
    </bpmn:serviceTask>
    <!-- Check if cells resolved -->
    <bpmn:exclusiveGateway id="cellsResolvedGateway" />
    <!-- Wait for cell mappings (message catch) -->
    <bpmn:intermediateCatchEvent id="waitCellMappings" name="Wait for Cell Mappings">
      <bpmn:messageEventDefinition messageRef="CellMappingsSubmitted" />
    </bpmn:intermediateCatchEvent>
    <!-- Process Rows -->
    <bpmn:serviceTask id="processRows" name="Process Rows"
                      camunda:delegateExpression="${rowProcessingDelegate}">
    </bpmn:serviceTask>
    <!-- End -->
    <bpmn:endEvent id="end" />
    <!-- Sequence Flows -->
    <bpmn:sequenceFlow id="flow1" sourceRef="start" targetRef="analyzeColumns" />
    <bpmn:sequenceFlow id="flow2" sourceRef="analyzeColumns" targetRef="columnsResolvedGateway" />
    <bpmn:sequenceFlow id="flow3" sourceRef="columnsResolvedGateway" targetRef="waitColumnMappings">
      <bpmn:conditionExpression>${needsColumnMapping == true}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="flow4" sourceRef="columnsResolvedGateway" targetRef="resolveCells">
      <bpmn:conditionExpression>${needsColumnMapping == false}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="flow5" sourceRef="waitColumnMappings" targetRef="recheckColumns" />
    <bpmn:sequenceFlow id="flow6" sourceRef="recheckColumns" targetRef="columnsResolvedGateway" />
    <bpmn:sequenceFlow id="flow7" sourceRef="resolveCells" targetRef="cellsResolvedGateway" />
    <bpmn:sequenceFlow id="flow8" sourceRef="cellsResolvedGateway" targetRef="waitCellMappings">
      <bpmn:conditionExpression>${needsCellMapping == true}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="flow9" sourceRef="cellsResolvedGateway" targetRef="processRows">
      <bpmn:conditionExpression>${needsCellMapping == false}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="flow10" sourceRef="waitCellMappings" targetRef="resolveCells" />
    <bpmn:sequenceFlow id="flow11" sourceRef="processRows" targetRef="end" />
  </bpmn:process>
  <!-- Message Definitions -->
  <bpmn:message id="ColumnMappingsSubmitted" name="ColumnMappingsSubmitted" />
  <bpmn:message id="CellMappingsSubmitted" name="CellMappingsSubmitted" />
</bpmn:definitions>
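The control flow encoded by these sequence flows can be traced in plain Java, which may help when reviewing the loops around the two gateways. This is only a sketch with no engine involved: `ProcessFlowSketch` and its two input lists are hypothetical, while the activity ids and flow ids are the ones from the XML above. Each list holds the value that successive runs of the column or cell service task would store in `needsColumnMapping` or `needsCellMapping`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical walk-through of the sequence flows above; not engine code.
final class ProcessFlowSketch {

    // Each list must end with 'false', otherwise the walk runs out of answers
    // and the iterator throws NoSuchElementException.
    static List<String> trace(List<Boolean> columnAnswers, List<Boolean> cellAnswers) {
        List<String> visited = new ArrayList<>();
        Iterator<Boolean> columns = columnAnswers.iterator();
        Iterator<Boolean> cells = cellAnswers.iterator();

        visited.add("start");
        visited.add("analyzeColumns");            // flow1
        while (columns.next()) {                  // flow2, flow3: needsColumnMapping == true
            visited.add("waitColumnMappings");    // waits for ColumnMappingsSubmitted
            visited.add("recheckColumns");        // flow5, then flow6 back to the gateway
        }
        visited.add("resolveCells");              // flow4: needsColumnMapping == false
        while (cells.next()) {                    // flow7, flow8: needsCellMapping == true
            visited.add("waitCellMappings");      // waits for CellMappingsSubmitted
            visited.add("resolveCells");          // flow10 re-runs the cell task
        }
        visited.add("processRows");               // flow9: needsCellMapping == false
        visited.add("end");                       // flow11
        return visited;
    }
}
```

For example, `ProcessFlowSketch.trace(List.of(true, false), List.of(false))` visits `start`, `analyzeColumns`, `waitColumnMappings`, `recheckColumns`, `resolveCells`, `processRows` and `end`, in that order. This corresponds to an import that needs one round of column mappings and no cell mappings.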
5. JavaDelegate Implementations
5.1. ColumnMappingDelegate
Wraps the existing ColumnMappingHandler:
@Component("columnMappingDelegate")
public class ColumnMappingDelegate implements JavaDelegate {

    private final ColumnMappingHandler handler; // Same handler as ImportProcess!
    private final ImportContextFactory contextFactory;

    public ColumnMappingDelegate(ColumnMappingHandler handler, ImportContextFactory contextFactory) {
        this.handler = handler;
        this.contextFactory = contextFactory;
    }

    @Override
    public void execute(DelegateExecution execution) {
        Long jobId = (Long) execution.getVariable("jobId");
        ImportContext context = contextFactory.create(jobId);
        // Execute the same handler logic
        StepResult<?> result = handler.execute(context, null);
        // Set process variable for gateway decision
        execution.setVariable("needsColumnMapping",
                result.getOutcome() == StepResult.Outcome.NEEDS_INPUT);
        // Update ImportJob status (shared helper, not shown here)
        updateJobStatus(jobId, result);
    }
}
5.2. CellMappingDelegate
Wraps the existing CellMappingHandler:
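@Component("cellMappingDelegate")
public class CellMappingDelegate implements JavaDelegate {

    private final CellMappingHandler handler; // Same handler!
    private final ImportContextFactory contextFactory;

    public CellMappingDelegate(CellMappingHandler handler, ImportContextFactory contextFactory) {
        this.handler = handler;
        this.contextFactory = contextFactory;
    }

    @Override
    public void execute(DelegateExecution execution) {
        Long jobId = (Long) execution.getVariable("jobId");
        ImportContext context = contextFactory.create(jobId);
        StepResult<?> result = handler.execute(context, null);
        // Set process variable for gateway decision
        execution.setVariable("needsCellMapping",
                result.getOutcome() == StepResult.Outcome.NEEDS_INPUT);
        // Update ImportJob status (shared helper, not shown here)
        updateJobStatus(jobId, result);
    }
}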
5.3. RowProcessingDelegate
Wraps the existing RowProcessingHandler:
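@Component("rowProcessingDelegate")
public class RowProcessingDelegate implements JavaDelegate {

    private final RowProcessingHandler handler; // Same handler!
    private final ImportContextFactory contextFactory;

    public RowProcessingDelegate(RowProcessingHandler handler, ImportContextFactory contextFactory) {
        this.handler = handler;
        this.contextFactory = contextFactory;
    }

    @Override
    public void execute(DelegateExecution execution) {
        Long jobId = (Long) execution.getVariable("jobId");
        ImportContext context = contextFactory.create(jobId);
        StepResult<?> result = handler.execute(context, null);
        // Row processing may discover new unmapped values
        if (result.getOutcome() == StepResult.Outcome.NEEDS_INPUT) {
            // Throw BPMN error to redirect to cell mapping
            // (caught by the error boundary event described in section 8.2)
            throw new BpmnError("UNMAPPED_VALUE", "New unmapped value discovered");
        }
        // Update ImportJob status (shared helper, not shown here)
        updateJobStatus(jobId, result);
    }
}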
6. FluxnovaImportOrchestrator
The orchestrator service that interacts with the Fluxnova runtime:
@Service
@ConditionalOnProperty(name = "import.orchestrator", havingValue = "fluxnova")
public class FluxnovaImportOrchestrator implements ImportOrchestrator {

    private final RuntimeService runtimeService;
    private final ImportJobRepository jobRepository;
    private final AttachmentService attachmentService;

    public FluxnovaImportOrchestrator(RuntimeService runtimeService,
                                      ImportJobRepository jobRepository,
                                      AttachmentService attachmentService) {
        this.runtimeService = runtimeService;
        this.jobRepository = jobRepository;
        this.attachmentService = attachmentService;
    }

    // Helpers such as createJob, findJob, toDTO, applyColumnMappings and
    // applyCellMappings are omitted from this listing.

    @Override
    public ImportJobDTO startImport(ImportStartRequest request) {
        // Create job (same as ImportProcess)
        ImportJob job = createJob(request);
        // Start BPMN process
        runtimeService.startProcessInstanceByKey(
                "spreadsheet-import",
                String.valueOf(job.getId()), // Business key = job ID
                Map.of("jobId", job.getId())
        );
        return getStatus(job.getId());
    }

    @Override
    public ImportJobDTO getStatus(Long jobId) {
        ImportJob job = findJob(jobId);
        // Query Camunda for current activity
        ProcessInstance pi = runtimeService.createProcessInstanceQuery()
                .processInstanceBusinessKey(String.valueOf(jobId))
                .singleResult();
        if (pi == null) {
            // No active instance: the process has completed or was cancelled
            return toDTO(job);
        }
        // Get current activity to determine state
        String activityId = getCurrentActivityId(pi.getId());
        return toDTO(job, activityId);
    }

    @Override
    public ImportJobDTO submitColumnMappings(Long jobId, List<ColumnMappingDTO> mappings) {
        // Apply mappings to database
        applyColumnMappings(jobId, mappings);
        // Signal process to continue
        runtimeService.createMessageCorrelation("ColumnMappingsSubmitted")
                .processInstanceBusinessKey(String.valueOf(jobId))
                .correlateAll();
        return getStatus(jobId);
    }

    @Override
    public ImportJobDTO submitCellMappings(Long jobId, List<CellMappingDTO> mappings) {
        // Apply mappings to database
        applyCellMappings(jobId, mappings);
        // Signal process to continue
        runtimeService.createMessageCorrelation("CellMappingsSubmitted")
                .processInstanceBusinessKey(String.valueOf(jobId))
                .correlateAll();
        return getStatus(jobId);
    }

    @Override
    public void cancel(Long jobId) {
        // Delete the process instance
        ProcessInstance pi = runtimeService.createProcessInstanceQuery()
                .processInstanceBusinessKey(String.valueOf(jobId))
                .singleResult();
        if (pi != null) {
            runtimeService.deleteProcessInstance(pi.getId(), "User cancelled");
        }
        // Update job status
        ImportJob job = findJob(jobId);
        job.setCurrentStep(ImportStep.CANCELLED);
        job.setCompletedAt(Instant.now());
        jobRepository.save(job);
    }

    private String getCurrentActivityId(String processInstanceId) {
        return runtimeService.getActiveActivityIds(processInstanceId)
                .stream()
                .findFirst()
                .orElse(null);
    }
}
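The `toDTO(job, activityId)` overload is not shown above. One possible way to turn the active activity id into a client-facing state is sketched below. The activity ids come from the BPMN definition in section 4.2; the `ClientState` enum, its constants and `ActivityStateSketch` are hypothetical and stand in for whatever fields the real DTO carries.

```java
import java.util.Map;

// Hypothetical client-facing states; the real DTO is not shown in this document.
enum ClientState { RUNNING, AWAITING_COLUMN_MAPPINGS, AWAITING_CELL_MAPPINGS, FINISHED }

final class ActivityStateSketch {

    // Wait states keyed by the activity ids used in spreadsheet-import.bpmn.
    private static final Map<String, ClientState> WAIT_STATES = Map.of(
            "waitColumnMappings", ClientState.AWAITING_COLUMN_MAPPINGS,
            "waitCellMappings", ClientState.AWAITING_CELL_MAPPINGS);

    // activityId is null when no process instance is active for the job.
    static ClientState fromActivityId(String activityId) {
        if (activityId == null) {
            return ClientState.FINISHED;
        }
        // Any other active activity is a service task that is still executing.
        return WAIT_STATES.getOrDefault(activityId, ClientState.RUNNING);
    }
}
```

In this sketch `FINISHED` covers both completed and cancelled imports, so the `ImportJob` row would remain the place to tell the two apart.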
7. Configuration
7.1. Application Properties
# Enable Fluxnova orchestrator
import:
  orchestrator: fluxnova
# Fluxnova/Camunda configuration
camunda:
  bpm:
    database:
      schema-update: true
    history-level: audit
    admin-user:
      id: admin
      password: ${CAMUNDA_ADMIN_PASSWORD}
7.2. Bean Configuration
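@Configuration
public class ImportOrchestratorConfig {

    // Default: used when import.orchestrator is unset or set to "process-definition"
    @Bean
    @ConditionalOnProperty(name = "import.orchestrator",
            havingValue = "process-definition",
            matchIfMissing = true)
    public ImportOrchestrator importProcessOrchestrator(...) {
        return new ImportProcessOrchestrator(...);
    }

    // Register the Fluxnova orchestrator either here or via @Service on the class
    // (section 6), not both: two registrations under the same bean name would conflict.
    @Bean
    @ConditionalOnProperty(name = "import.orchestrator",
            havingValue = "fluxnova")
    public ImportOrchestrator fluxnovaImportOrchestrator(...) {
        return new FluxnovaImportOrchestrator(...);
    }
}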
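The selection rule expressed by the two conditions can be written out in plain Java. This is a sketch of the rule as configured above, not of Spring's implementation: `OrchestratorSelectionSketch` is a hypothetical name, the returned strings are the bean method names from the configuration class, and exact string comparison is a simplification.

```java
import java.util.Optional;

// Hypothetical model of the two @ConditionalOnProperty rules above.
final class OrchestratorSelectionSketch {

    // propertyValue is the value of import.orchestrator, or null if it is not set.
    static Optional<String> select(String propertyValue) {
        if (propertyValue == null || propertyValue.equals("process-definition")) {
            // matchIfMissing = true makes this the default
            return Optional.of("importProcessOrchestrator");
        }
        if (propertyValue.equals("fluxnova")) {
            return Optional.of("fluxnovaImportOrchestrator");
        }
        // Neither condition matches: no ImportOrchestrator bean is defined
        return Optional.empty();
    }
}
```

The last branch is worth noting: under this rule a misspelled value would leave the application without any `ImportOrchestrator` bean, so a component that requires one would presumably fail at startup rather than fall back to the default.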
8. Advantages of Fluxnova
8.1. Cockpit Monitoring
The Camunda Cockpit provides:
- Real-time view of running imports
- Historical process instances
- Variable inspection
- Manual intervention for stuck processes
8.2. Error Handling
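BPMN error boundary events let the process react to a BpmnError thrown by a delegate:
<!-- Declared under bpmn:definitions; errorCode must match the code passed to BpmnError -->
<bpmn:error id="UNMAPPED_VALUE" errorCode="UNMAPPED_VALUE" />
<!-- Declared inside the process -->
<bpmn:boundaryEvent id="unmappedError" attachedToRef="processRows">
  <bpmn:errorEventDefinition errorRef="UNMAPPED_VALUE" />
</bpmn:boundaryEvent>
<bpmn:sequenceFlow sourceRef="unmappedError" targetRef="resolveCells" />
When row processing discovers an unmapped value, RowProcessingDelegate throws the BpmnError, the boundary event catches it, and the process returns to cell mapping.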
8.3. Timer Events
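For future enhancements such as timeout handling, a timer boundary event can be attached to a wait state:
<bpmn:boundaryEvent id="mappingTimeout" attachedToRef="waitColumnMappings">
  <bpmn:timerEventDefinition>
    <bpmn:timeDuration>P7D</bpmn:timeDuration>
  </bpmn:timerEventDefinition>
</bpmn:boundaryEvent>
<bpmn:sequenceFlow sourceRef="mappingTimeout" targetRef="cancelDueToTimeout" />
The target cancelDueToTimeout is not part of the simplified process in section 4.2 and would have to be added along with the timer.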
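The `P7D` literal is an ISO 8601 duration of seven days. `java.time.Duration` parses the same notation for day- and time-based durations, so application code could derive a deadline to show to the user from the same value. The sketch below assumes a hypothetical `waitStartedAt` timestamp recorded when the process enters `waitColumnMappings`; `MappingTimeoutSketch` is likewise a made-up name.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper; mirrors the timer literal from the boundary event above.
final class MappingTimeoutSketch {

    // Same literal as the timeDuration element; Duration treats a day as 24 hours.
    static final Duration MAPPING_TIMEOUT = Duration.parse("P7D");

    // waitStartedAt: when the process entered waitColumnMappings (assumed input).
    static Instant deadline(Instant waitStartedAt) {
        return waitStartedAt.plus(MAPPING_TIMEOUT);
    }
}
```

Keeping the literal in one shared place, for instance a configuration property read by both the process definition and the application, would avoid the two values drifting apart. That wiring is outside the scope of this sketch.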
9. Comparison: ImportProcess vs Fluxnova
| Aspect | ImportProcess | Fluxnova |
|---|---|---|
| Runtime | None (pure Spring) | Fluxnova engine + database tables |
| Monitoring | Custom queries | Cockpit dashboard |
| Process visibility | Code inspection | Visual BPMN diagram |
| Error recovery | Manual restart | Compensation handlers |
| Timers | @Scheduled | Native BPMN timers |
| Learning curve | Low (FormController pattern) | Medium (BPMN concepts) |
| Operational overhead | Low | Medium |
10. Related Documentation
- Import Process Implementation - Current ImportController-based implementation
- File Import Architecture - XLSX/CSV parsing and column mapping
- Async Import Architecture - State machine and API design
- design-journal/2026-01/camunda-embedded-workflow-engine.adoc - Fluxnova integration patterns