Import: Fluxnova Implementation

1. Overview

This document describes the Fluxnova implementation for asynchronous spreadsheet imports. This approach uses the Fluxnova BPMN engine (FINOS Camunda 7 fork) to orchestrate the import workflow, providing visual process modelling and built-in monitoring.

Implementation Status: Future

This implementation will be considered when the async-communication work begins, allowing both features to share the Fluxnova runtime. See Import Process Implementation for the current approach.

2. Rationale for Future Implementation

2.1. When to Migrate

Consider migrating to Fluxnova when:

  1. Async-communication feature begins - Share the Fluxnova runtime investment

  2. Process monitoring becomes critical - Cockpit dashboard provides visibility

  3. Complex branching is needed - BPMN excels at conditional flows

  4. Timer-based workflows required - Native BPMN timer events

2.2. Migration Path

The ImportProcess implementation is designed for easy migration:

Component Migration Effort Notes

ImportStepHandler implementations

None - reused as-is

Same interface, same logic

ImportOrchestrator interface

None - same contract

Just different implementation

Database entities

Additive - add BPMN tables

ImportJob unchanged

REST Controller

None - calls interface

Transparent switch

Configuration

Config change only

import.orchestrator: fluxnova

3. Architecture

3.1. Design Principles

  1. Reuse step handlers - Same ImportStepHandler implementations as ImportProcess

  2. BPMN-driven flow - Process definition controls transitions

  3. Message correlation - User input triggers process continuation

  4. Process variables - Gateway decisions based on handler results

3.2. Component Diagram

┌─────────────────────┐
│   ImportResource    │  REST API
│   (Controller)      │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│ FluxnovaImport      │  Orchestrator Service
│ Orchestrator        │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│   RuntimeService    │  Fluxnova/Camunda API
│ (Message Correlation)│
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│  BPMN Process       │  spreadsheet-import.bpmn
│  Definition         │
└──────────┬──────────┘
           │
    ┌──────┴──────┐
    │             │
┌───▼───┐   ┌─────▼─────┐   ┌────────────┐
│Column │   │   Cell    │   │    Row     │
│Mapping│   │  Mapping  │   │ Processing │
│Delegate│  │  Delegate │   │  Delegate  │
└───┬───┘   └─────┬─────┘   └──────┬─────┘
    │             │                │
    └─────────────┴────────────────┘
                  │
         ┌────────▼────────┐
         │ ImportStepHandler│  Same handlers as ImportProcess!
         │ implementations  │
         └─────────────────┘

4. BPMN Process Definition

4.1. Process Overview

┌─────────┐     ┌─────────────┐     ┌───────────────┐
│  Start  │────►│   Analyze   │────►│   Columns     │
│  Event  │     │   Columns   │     │   Resolved?   │
└─────────┘     └─────────────┘     └───────┬───────┘
                                            │
                      ┌─────────────────────┴─────────────────────┐
                      │ No                                        │ Yes
                      ▼                                           ▼
              ┌───────────────┐                           ┌───────────────┐
              │ Wait for      │                           │   Resolve     │
              │ Column        │◄──┐                       │   Cell        │
              │ Mappings      │   │                       │   Values      │
              └───────┬───────┘   │                       └───────┬───────┘
                      │           │                               │
                      ▼           │                               ▼
              ┌───────────────┐   │                       ┌───────────────┐
              │   Columns     │───┘ No                    │    Cells      │
              │   Resolved?   │                           │   Resolved?   │
              └───────┬───────┘                           └───────┬───────┘
                      │ Yes                                       │
                      └───────────────────────┬───────────────────┘
                                              │
                      ┌───────────────────────┴─────────────────────┐
                      │ No                                          │ Yes
                      ▼                                             ▼
              ┌───────────────┐                             ┌───────────────┐
              │ Wait for      │                             │   Process     │
              │ Cell          │◄──┐                         │   Rows        │
              │ Mappings      │   │                         └───────┬───────┘
              └───────┬───────┘   │                                 │
                      │           │                                 ▼
                      ▼           │                         ┌───────────────┐
              ┌───────────────┐   │                         │     End       │
              │    Cells      │───┘ No                      │    Event      │
              │   Resolved?   │                             └───────────────┘
              └───────┬───────┘
                      │ Yes
                      └────────────────────────────────────────────►

4.2. BPMN XML (Simplified)

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
                  xmlns:camunda="http://camunda.org/schema/1.0/bpmn">

  <bpmn:process id="spreadsheet-import" name="Spreadsheet Import" isExecutable="true">

    <!-- Start -->
    <bpmn:startEvent id="start" />

    <!-- Analyze Columns -->
    <bpmn:serviceTask id="analyzeColumns" name="Analyze Columns"
                      camunda:delegateExpression="${columnMappingDelegate}">
    </bpmn:serviceTask>

    <!-- Check if columns resolved -->
    <bpmn:exclusiveGateway id="columnsResolvedGateway" />

    <!-- Wait for column mappings (message catch) -->
    <bpmn:intermediateCatchEvent id="waitColumnMappings" name="Wait for Column Mappings">
      <bpmn:messageEventDefinition messageRef="ColumnMappingsSubmitted" />
    </bpmn:intermediateCatchEvent>

    <!-- Re-check after user input -->
    <bpmn:serviceTask id="recheckColumns" name="Recheck Columns"
                      camunda:delegateExpression="${columnMappingDelegate}">
    </bpmn:serviceTask>

    <!-- Resolve Cell Values -->
    <bpmn:serviceTask id="resolveCells" name="Resolve Cell Values"
                      camunda:delegateExpression="${cellMappingDelegate}">
    </bpmn:serviceTask>

    <!-- Check if cells resolved -->
    <bpmn:exclusiveGateway id="cellsResolvedGateway" />

    <!-- Wait for cell mappings (message catch) -->
    <bpmn:intermediateCatchEvent id="waitCellMappings" name="Wait for Cell Mappings">
      <bpmn:messageEventDefinition messageRef="CellMappingsSubmitted" />
    </bpmn:intermediateCatchEvent>

    <!-- Process Rows -->
    <bpmn:serviceTask id="processRows" name="Process Rows"
                      camunda:delegateExpression="${rowProcessingDelegate}">
    </bpmn:serviceTask>

    <!-- End -->
    <bpmn:endEvent id="end" />

    <!-- Sequence Flows -->
    <bpmn:sequenceFlow id="flow1" sourceRef="start" targetRef="analyzeColumns" />
    <bpmn:sequenceFlow id="flow2" sourceRef="analyzeColumns" targetRef="columnsResolvedGateway" />

    <bpmn:sequenceFlow id="flow3" sourceRef="columnsResolvedGateway" targetRef="waitColumnMappings">
      <bpmn:conditionExpression>${needsColumnMapping == true}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="flow4" sourceRef="columnsResolvedGateway" targetRef="resolveCells">
      <bpmn:conditionExpression>${needsColumnMapping == false}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>

    <bpmn:sequenceFlow id="flow5" sourceRef="waitColumnMappings" targetRef="recheckColumns" />
    <bpmn:sequenceFlow id="flow6" sourceRef="recheckColumns" targetRef="columnsResolvedGateway" />

    <bpmn:sequenceFlow id="flow7" sourceRef="resolveCells" targetRef="cellsResolvedGateway" />

    <bpmn:sequenceFlow id="flow8" sourceRef="cellsResolvedGateway" targetRef="waitCellMappings">
      <bpmn:conditionExpression>${needsCellMapping == true}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="flow9" sourceRef="cellsResolvedGateway" targetRef="processRows">
      <bpmn:conditionExpression>${needsCellMapping == false}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>

    <bpmn:sequenceFlow id="flow10" sourceRef="waitCellMappings" targetRef="resolveCells" />
    <bpmn:sequenceFlow id="flow11" sourceRef="processRows" targetRef="end" />

  </bpmn:process>

  <!-- Message Definitions -->
  <bpmn:message id="ColumnMappingsSubmitted" name="ColumnMappingsSubmitted" />
  <bpmn:message id="CellMappingsSubmitted" name="CellMappingsSubmitted" />

</bpmn:definitions>

5. JavaDelegate Implementations

5.1. ColumnMappingDelegate

Wraps the existing ColumnMappingHandler:

@Component("columnMappingDelegate")
public class ColumnMappingDelegate implements JavaDelegate {

    private final ColumnMappingHandler handler;  // Same handler as ImportProcess!
    private final ImportContextFactory contextFactory;

    @Override
    public void execute(DelegateExecution execution) {
        Long jobId = (Long) execution.getVariable("jobId");
        ImportContext context = contextFactory.create(jobId);

        // Execute the same handler logic
        StepResult<?> result = handler.execute(context, null);

        // Set process variable for gateway decision
        execution.setVariable("needsColumnMapping",
            result.getOutcome() == StepResult.Outcome.NEEDS_INPUT);

        // Update ImportJob status
        updateJobStatus(jobId, result);
    }
}

5.2. CellMappingDelegate

Wraps the existing CellMappingHandler:

@Component("cellMappingDelegate")
public class CellMappingDelegate implements JavaDelegate {

    private final CellMappingHandler handler;  // Same handler!

    @Override
    public void execute(DelegateExecution execution) {
        Long jobId = (Long) execution.getVariable("jobId");
        ImportContext context = contextFactory.create(jobId);

        StepResult<?> result = handler.execute(context, null);

        execution.setVariable("needsCellMapping",
            result.getOutcome() == StepResult.Outcome.NEEDS_INPUT);

        updateJobStatus(jobId, result);
    }
}

5.3. RowProcessingDelegate

Wraps the existing RowProcessingHandler:

@Component("rowProcessingDelegate")
public class RowProcessingDelegate implements JavaDelegate {

    private final RowProcessingHandler handler;  // Same handler!

    @Override
    public void execute(DelegateExecution execution) {
        Long jobId = (Long) execution.getVariable("jobId");
        ImportContext context = contextFactory.create(jobId);

        StepResult<?> result = handler.execute(context, null);

        // Row processing may discover new unmapped values
        if (result.getOutcome() == StepResult.Outcome.NEEDS_INPUT) {
            // Throw BPMN error to redirect to cell mapping
            throw new BpmnError("UNMAPPED_VALUE", "New unmapped value discovered");
        }

        updateJobStatus(jobId, result);
    }
}

6. FluxnovaImportOrchestrator

The orchestrator service that interacts with the Fluxnova runtime:

@Service
@ConditionalOnProperty(name = "import.orchestrator", havingValue = "fluxnova")
public class FluxnovaImportOrchestrator implements ImportOrchestrator {

    private final RuntimeService runtimeService;
    private final ImportJobRepository jobRepository;
    private final AttachmentService attachmentService;

    @Override
    public ImportJobDTO startImport(ImportStartRequest request) {
        // Create job (same as ImportProcess)
        ImportJob job = createJob(request);

        // Start BPMN process
        runtimeService.startProcessInstanceByKey(
            "spreadsheet-import",
            String.valueOf(job.getId()),  // Business key = job ID
            Map.of("jobId", job.getId())
        );

        return getStatus(job.getId());
    }

    @Override
    public ImportJobDTO getStatus(Long jobId) {
        ImportJob job = findJob(jobId);

        // Query Camunda for current activity
        ProcessInstance pi = runtimeService.createProcessInstanceQuery()
            .processInstanceBusinessKey(String.valueOf(jobId))
            .singleResult();

        if (pi == null) {
            // Process completed
            return toDTO(job);
        }

        // Get current activity to determine state
        String activityId = getCurrentActivityId(pi.getId());
        return toDTO(job, activityId);
    }

    @Override
    public ImportJobDTO submitColumnMappings(Long jobId, List<ColumnMappingDTO> mappings) {
        // Apply mappings to database
        applyColumnMappings(jobId, mappings);

        // Signal process to continue
        runtimeService.createMessageCorrelation("ColumnMappingsSubmitted")
            .processInstanceBusinessKey(String.valueOf(jobId))
            .correlateAll();

        return getStatus(jobId);
    }

    @Override
    public ImportJobDTO submitCellMappings(Long jobId, List<CellMappingDTO> mappings) {
        // Apply mappings to database
        applyCellMappings(jobId, mappings);

        // Signal process to continue
        runtimeService.createMessageCorrelation("CellMappingsSubmitted")
            .processInstanceBusinessKey(String.valueOf(jobId))
            .correlateAll();

        return getStatus(jobId);
    }

    @Override
    public void cancel(Long jobId) {
        // Delete the process instance
        ProcessInstance pi = runtimeService.createProcessInstanceQuery()
            .processInstanceBusinessKey(String.valueOf(jobId))
            .singleResult();

        if (pi != null) {
            runtimeService.deleteProcessInstance(pi.getId(), "User cancelled");
        }

        // Update job status
        ImportJob job = findJob(jobId);
        job.setCurrentStep(ImportStep.CANCELLED);
        job.setCompletedAt(Instant.now());
        jobRepository.save(job);
    }

    private String getCurrentActivityId(String processInstanceId) {
        return runtimeService.getActiveActivityIds(processInstanceId)
            .stream()
            .findFirst()
            .orElse(null);
    }
}

7. Configuration

7.1. Application Properties

# Enable Fluxnova orchestrator
import:
  orchestrator: fluxnova

# Fluxnova/Camunda configuration
camunda:
  bpm:
    database:
      schema-update: true
    history-level: audit
    admin-user:
      id: admin
      password: ${CAMUNDA_ADMIN_PASSWORD}

7.2. Bean Configuration

@Configuration
public class ImportOrchestratorConfig {

    @Bean
    @ConditionalOnProperty(name = "import.orchestrator",
                          havingValue = "process-definition",
                          matchIfMissing = true)
    public ImportOrchestrator importProcessOrchestrator(...) {
        return new ImportProcessOrchestrator(...);
    }

    @Bean
    @ConditionalOnProperty(name = "import.orchestrator",
                          havingValue = "fluxnova")
    public ImportOrchestrator fluxnovaImportOrchestrator(...) {
        return new FluxnovaImportOrchestrator(...);
    }
}

8. Advantages of Fluxnova

8.1. Cockpit Monitoring

The Camunda Cockpit provides:

  • Real-time view of running imports

  • Historical process instances

  • Variable inspection

  • Manual intervention for stuck processes

8.2. Error Handling

BPMN error boundary events enable:

<bpmn:boundaryEvent id="unmappedError" attachedToRef="processRows">
  <bpmn:errorEventDefinition errorRef="UNMAPPED_VALUE" />
</bpmn:boundaryEvent>
<bpmn:sequenceFlow sourceRef="unmappedError" targetRef="resolveCells" />

When row processing discovers an unmapped value, the process automatically redirects to cell mapping.

8.3. Timer Events

For future enhancements like timeout handling:

<bpmn:boundaryEvent id="mappingTimeout" attachedToRef="waitColumnMappings">
  <bpmn:timerEventDefinition>
    <bpmn:timeDuration>P7D</bpmn:timeDuration>
  </bpmn:timerEventDefinition>
</bpmn:boundaryEvent>
<bpmn:sequenceFlow sourceRef="mappingTimeout" targetRef="cancelDueToTimeout" />

9. Comparison: ImportProcess vs Fluxnova

Aspect ImportProcess Fluxnova

Runtime

None (pure Spring)

Fluxnova engine + database tables

Monitoring

Custom queries

Cockpit dashboard

Process visibility

Code inspection

Visual BPMN diagram

Error recovery

Manual restart

Compensation handlers

Timers

@Scheduled

Native BPMN timers

Learning curve

Low (FormController pattern)

Medium (BPMN concepts)

Operational overhead

Low

Medium