Skip to content

Integration Testing

Part of: MPAC SmartPOS Cloud Platform - Product RequirementsVersion: 2.0 Last Updated: 2026-01-28


Overview

This document defines integration testing strategies for the MPAC platform, covering end-to-end API flows, service-to-service communication, and database migration validation. Integration tests verify that multiple components work correctly together, including order-to-payment flows, authentication chains, and data consistency across services. These tests use real database connections (test databases) and mock external dependencies like payment providers.

Table of Contents


API Integration Tests

Purpose: Validate complete business flows across multiple endpoints and services.

Payment Flow Integration Test

python
# Example: Complete payment flow integration test
import pytest
from httpx import AsyncClient

@pytest.mark.integration
async def test_complete_payment_flow():
    """
    Test the full payment lifecycle:
    1. Create order
    2. Generate bill
    3. Create payment
    4. Confirm payment
    5. Verify order and bill status updates
    """
    # Create order
    order = await create_order(store_id, items)
    assert order.status == "open"

    # Generate bill from order
    bill = await create_bill(order.id)
    assert bill.status == "unpaid"
    assert bill.total_amount == order.total_amount

    # Create payment (with mocked PGW)
    payment = await create_payment(bill.id, amount)
    assert payment.status == "pending"

    # Confirm payment (simulates PGW webhook)
    result = await confirm_payment(payment.id)
    assert result.status == "succeeded"

    # Verify bill marked as paid
    bill = await get_bill(bill.id)
    assert bill.status == "paid"
    assert bill.paid_at is not None

    # Verify order completed
    order = await get_order(order.id)
    assert order.status == "completed"

Authentication Chain Test

python
@pytest.mark.integration
async def test_device_authentication_and_order_creation():
    """
    Test device authentication flow and subsequent API calls:
    1. Device authenticates with private_key_jwt
    2. Device creates order using obtained token
    3. Token expiry validation
    """
    # Device authentication
    auth_response = await authenticate_device(
        device_id="DEV123",
        client_assertion=generate_jwt_assertion()
    )
    assert auth_response.access_token is not None
    assert auth_response.expires_in == 120  # 2 minutes

    # Use token to create order
    async with AsyncClient() as client:
        client.headers["Authorization"] = f"Bearer {auth_response.access_token}"
        order_response = await client.post("/orders", json={
            "store_id": "store123",
            "items": [{"product_id": "prod1", "quantity": 2}]
        })
        assert order_response.status_code == 201

    # Verify token expiry enforcement
    await asyncio.sleep(121)  # Wait for token to expire
    async with AsyncClient() as client:
        client.headers["Authorization"] = f"Bearer {auth_response.access_token}"
        expired_response = await client.post("/orders", json={})
        assert expired_response.status_code == 401

Multi-Tenant Isolation Test

python
@pytest.mark.integration
async def test_merchant_data_isolation():
    """
    Verify that merchants cannot access each other's data.
    """
    # Create data for merchant A
    merchant_a_token = await get_merchant_token(merchant_id="MERCH_A")
    order_a = await create_order_as(merchant_a_token, store_id="STORE_A")

    # Try to access merchant A's data as merchant B
    merchant_b_token = await get_merchant_token(merchant_id="MERCH_B")
    response = await get_order_as(merchant_b_token, order_id=order_a.id)

    # Should return 403 Forbidden or 404 Not Found
    assert response.status_code in [403, 404]

Database Migration Tests

Purpose: Ensure database schema changes are safe and reversible without data loss.

Migration Forward/Backward Test

bash
# Test forward and backward migrations
cd mpac-smartpos/svc-portal

# Start from clean state
alembic downgrade base

# Apply all migrations forward
alembic upgrade head

# Verify schema is correct
uv run python scripts/verify_schema.py

# Test rollback (one version)
alembic downgrade -1

# Verify schema still valid
uv run python scripts/verify_schema.py

# Re-apply forward
alembic upgrade head

# Verify data integrity after migration cycle
uv run pytest tests/integration/test_migrations.py

Migration Data Integrity Test

python
@pytest.mark.integration
async def test_migration_preserves_data():
    """
    Verify that migrations preserve existing data.
    """
    # Insert test data in old schema
    await insert_test_data_old_schema()

    # Run migration
    alembic.upgrade("head")

    # Verify data still accessible and correct in new schema
    data = await fetch_test_data_new_schema()
    assert data.count == original_count
    assert data.checksums_match()

    # Test rollback preserves data
    alembic.downgrade("-1")
    old_data = await fetch_test_data_old_schema()
    assert old_data.count == original_count

Migration Performance Test

python
@pytest.mark.integration
@pytest.mark.slow
async def test_migration_performance_large_dataset():
    """
    Verify migrations complete in reasonable time with large datasets.
    """
    # Create 1M test records
    await create_large_dataset(record_count=1_000_000)

    # Time migration
    start = time.time()
    alembic.upgrade("head")
    duration = time.time() - start

    # Should complete in under 5 minutes for 1M records
    assert duration < 300, f"Migration took {duration}s, expected <300s"

Service Integration Patterns

Purpose: Define common patterns for testing service-to-service communication.

NATS Messaging Integration

go
// Test NATS message flow between services
func TestOrderCreatedEventFlow(t *testing.T) {
    // Setup NATS test server
    natsServer := natstest.RunServer(t)
    defer natsServer.Shutdown()

    // Setup subscriber (e.g., analytics service)
    subscriber := setupTestSubscriber(t, natsServer.ClientURL())

    // Publish order.created event
    publisher := setupPublisher(t, natsServer.ClientURL())
    err := publisher.PublishOrderCreated(Order{
        ID:      "order123",
        StoreID: "store456",
    })
    require.NoError(t, err)

    // Verify subscriber received and processed event
    select {
    case event := <-subscriber.Events:
        assert.Equal(t, "order123", event.OrderID)
        assert.Equal(t, "order.created", event.Type)
    case <-time.After(5 * time.Second):
        t.Fatal("Did not receive event within timeout")
    }
}

Redis Cache Integration

python
@pytest.mark.integration
async def test_cache_consistency():
    """
    Verify cache invalidation works correctly.
    """
    # Create data
    order = await create_order(store_id="store123")

    # Verify cached
    cached_order = await cache.get(f"order:{order.id}")
    assert cached_order is not None

    # Update data
    await update_order(order.id, status="completed")

    # Verify cache invalidated
    cached_order = await cache.get(f"order:{order.id}")
    assert cached_order is None

    # Verify fresh fetch works
    fresh_order = await get_order(order.id)
    assert fresh_order.status == "completed"

WebSocket Integration

typescript
// Test WebSocket communication
describe('WebSocket Order Updates', () => {
  it('should receive real-time order updates', async () => {
    const ws = new WebSocket('ws://localhost:8001/ws/orders')
    const messages: any[] = []

    ws.onmessage = (event) => {
      messages.push(JSON.parse(event.data))
    }

    await waitForWebSocketConnection(ws)

    // Create order via REST API
    const order = await createOrder({ storeId: 'store123' })

    // Update order status
    await updateOrderStatus(order.id, 'completed')

    // Verify WebSocket received update
    await waitFor(() => {
      const updateMsg = messages.find(
        (m) => m.type === 'order.updated' && m.orderId === order.id
      )
      expect(updateMsg).toBeDefined()
      expect(updateMsg.status).toBe('completed')
    })
  })
})

Test Environment Setup

Docker Compose for Integration Tests

yaml
# docker-compose.test.yml
version: '3.8'
services:
  postgres-test:
    image: postgres:15
    environment:
      POSTGRES_DB: mpac_test
      POSTGRES_USER: test
      POSTGRES_PASSWORD: test
    ports:
      - "5433:5432"

  redis-test:
    image: redis:7-alpine
    ports:
      - "6380:6379"

  nats-test:
    image: nats:2.10
    ports:
      - "4223:4222"

Running Integration Tests

bash
# Start test infrastructure
docker compose -f docker-compose.test.yml up -d

# Run Python integration tests
cd mpac-smartpos/svc-portal
uv run pytest tests/integration/ -v

# Run Go integration tests
cd mpac-smartpos/svc-smarttab
go test ./tests/integration/... -v

# Cleanup
docker compose -f docker-compose.test.yml down -v


MPAC — MP-Solution Advanced Cloud Service