From 69fede99a4bd1ff427d578e49f084c6d8e30c9ec Mon Sep 17 00:00:00 2001 From: craig mcclendon Date: Mon, 11 Nov 2024 13:29:54 -0600 Subject: [PATCH] add property to turn on userRequestRetryVersionConflictsInterceptor (#752) --- .../uhn/fhir/jpa/starter/AppProperties.java | 13 +- .../jpa/starter/common/StarterJpaConfig.java | 5 + src/main/resources/application.yaml | 2 + .../ParallelUpdatesVersionConflictTest.java | 164 ++++++++++++++++++ 4 files changed, 181 insertions(+), 3 deletions(-) create mode 100644 src/test/java/ca/uhn/fhir/jpa/starter/ParallelUpdatesVersionConflictTest.java diff --git a/src/main/java/ca/uhn/fhir/jpa/starter/AppProperties.java b/src/main/java/ca/uhn/fhir/jpa/starter/AppProperties.java index cb06546..67d751c 100644 --- a/src/main/java/ca/uhn/fhir/jpa/starter/AppProperties.java +++ b/src/main/java/ca/uhn/fhir/jpa/starter/AppProperties.java @@ -106,8 +106,9 @@ public class AppProperties { private final List custom_provider_classes = new ArrayList<>(); private Boolean upliftedRefchains_enabled = false; - private List search_prefetch_thresholds = new ArrayList<>(); + private boolean userRequestRetryVersionConflictsInterceptorEnabled = false; + private List search_prefetch_thresholds = new ArrayList<>(); public List getCustomInterceptorClasses() { return custom_interceptor_classes; @@ -671,6 +672,14 @@ public Cors getCors() { this.upliftedRefchains_enabled = upliftedRefchains_enabled; } + public Boolean getUserRequestRetryVersionConflictsInterceptorEnabled() { + return userRequestRetryVersionConflictsInterceptorEnabled; + } + + public void setUserRequestRetryVersionConflictsInterceptorEnabled(Boolean userRequestRetryVersionConflictsInterceptorEnabled) { + this.userRequestRetryVersionConflictsInterceptorEnabled = userRequestRetryVersionConflictsInterceptorEnabled; + } + public static class Cors { private Boolean allow_Credentials = true; private List allowed_origin = List.of("*"); @@ -690,8 +699,6 @@ public Cors getCors() { public void setAllow_Credentials(Boolean allow_Credentials) { this.allow_Credentials = allow_Credentials; } - - } public static class Logger { diff --git a/src/main/java/ca/uhn/fhir/jpa/starter/common/StarterJpaConfig.java b/src/main/java/ca/uhn/fhir/jpa/starter/common/StarterJpaConfig.java index 3166288..f6a7c9a 100644 --- a/src/main/java/ca/uhn/fhir/jpa/starter/common/StarterJpaConfig.java +++ b/src/main/java/ca/uhn/fhir/jpa/starter/common/StarterJpaConfig.java @@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.dao.search.IHSearchSortHelper; import ca.uhn.fhir.jpa.delete.ThreadSafeResourceDeleterSvc; import ca.uhn.fhir.jpa.graphql.GraphQLProvider; import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor; +import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor; import ca.uhn.fhir.jpa.interceptor.validation.RepositoryValidatingInterceptor; import ca.uhn.fhir.jpa.ips.provider.IpsOperationProvider; import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; @@ -457,6 +458,10 @@ public class StarterJpaConfig { fhirServer.registerProvider(theIpsOperationProvider.get()); } + if (appProperties.getUserRequestRetryVersionConflictsInterceptorEnabled() ) { + fhirServer.registerInterceptor(new UserRequestRetryVersionConflictsInterceptor()); + } + // register custom providers registerCustomProviders(fhirServer, appContext, appProperties.getCustomProviderClasses()); diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 5bb8326..71eee07 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -214,6 +214,8 @@ hapi: narrative_enabled: false mdm_enabled: false mdm_rules_json_location: "mdm-rules.json" + ## see: https://hapifhir.io/hapi-fhir/docs/interceptors/built_in_server_interceptors.html#jpa-server-retry-on-version-conflicts + # userRequestRetryVersionConflictsInterceptorEnabled : false # local_base_urls: # - https://hapi.fhir.org/baseR4 logical_urls: diff --git a/src/test/java/ca/uhn/fhir/jpa/starter/ParallelUpdatesVersionConflictTest.java b/src/test/java/ca/uhn/fhir/jpa/starter/ParallelUpdatesVersionConflictTest.java new file mode 100644 index 0000000..29dd059 --- /dev/null +++ b/src/test/java/ca/uhn/fhir/jpa/starter/ParallelUpdatesVersionConflictTest.java @@ -0,0 +1,164 @@ +package ca.uhn.fhir.jpa.starter; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; +import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent; +import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Bundle.BundleType; +import org.hl7.fhir.r4.model.Bundle.HTTPVerb; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.server.LocalServerPort; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.client.api.IGenericClient; +import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {Application.class}, properties = { + "spring.datasource.url=jdbc:h2:mem:dbr4", + "hapi.fhir.fhir_version=r4", + "hapi.fhir.userRequestRetryVersionConflictsInterceptorEnabled=true" +}) + +/** + * This class tests running parallel updates to a single resource with and without setting the 'X-Retry-On-Version-Conflict' header + * to ensure we get the expected behavior of detecting conflicts + */ +public class ParallelUpdatesVersionConflictTest { + + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ParallelUpdatesVersionConflictTest.class); + + @LocalServerPort + private int port; + + private IGenericClient client; + private FhirContext ctx; + + @BeforeEach + void setUp() { + ctx = FhirContext.forR4(); + ctx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER); + ctx.getRestfulClientFactory().setSocketTimeout(1200 * 1000); + String ourServerBase = "http://localhost:" + port + "/fhir/"; + client = ctx.newRestfulGenericClient(ourServerBase); + } + + @Test + void testParallelResourceUpdateBundle() throws Throwable { + //send 10 bundles with updates to the patient in parallel, except the header to deconflict them + Patient pat = new Patient(); + String patId = client.create().resource(pat).execute().getId().getIdPart(); + launchThreads(patId, true, "X-Retry-On-Version-Conflict"); + } + + @Test + void testParallelResourceUpdateNoBundle() throws Throwable { + //send 10 resource puts to the patient in parallel, except the header to deconflict them + Patient pat = new Patient(); + String patId = client.create().resource(pat).execute().getId().getIdPart(); + launchThreads(patId, false, "X-Retry-On-Version-Conflict"); + } + + @Test + void testParallelResourceUpdateBundleExpectConflict() { + //send 10 bundles with updates to the patient in parallel, expect a ResourceVersionConflictException since we are not setting the retry header + Patient pat = new Patient(); + String patId = client.create().resource(pat).execute().getId().getIdPart(); + ResourceVersionConflictException exception = assertThrows(ResourceVersionConflictException.class, () -> + launchThreads(patId, true, "someotherheader")); + } + + @Test + void testParallelResourceUpdateNoBundleExpectConflict() { + //send 10 resource puts to the patient in parallel, expect a ResourceVersionConflictException since we are not setting the retry header + Patient pat = new Patient(); + String patId = client.create().resource(pat).execute().getId().getIdPart(); + ResourceVersionConflictException exception = assertThrows(ResourceVersionConflictException.class, () -> + launchThreads(patId, false, "someotherheader")); + } + + private void launchThreads(String patientId, boolean useBundles, String headerName) throws Throwable { + int threadCnt = 10; + ExecutorService execSvc = Executors.newFixedThreadPool(threadCnt); + + //launch a bunch of threads at the same time that update the same patient + List> callables = new ArrayList<>(); + for (int i = 0; i < threadCnt; i++) { + final int cnt = i; + Callable callable = new Callable<>() { + @Override + public Integer call() throws Exception { + Patient pat = new Patient(); + //make sure to change something so the server doesnt short circuit on a no-op + pat.addName().setFamily("fam-" + cnt); + pat.setId(patientId); + + if( useBundles) { + Bundle b = new Bundle(); + b.setType(BundleType.TRANSACTION); + BundleEntryComponent bec = b.addEntry(); + bec.setResource(pat); + //bec.setFullUrl("Patient/" + patId); + Bundle.BundleEntryRequestComponent req = bec.getRequest(); + req.setUrl("Patient/" + patientId); + req.setMethod(HTTPVerb.PUT); + bec.setRequest(req); + + Bundle returnBundle = client.transaction().withBundle(b) + .withAdditionalHeader(headerName, "retry; max-retries=10") + .execute(); + + String statusString = returnBundle.getEntryFirstRep().getResponse().getStatus(); + ourLog.trace("statusString->{}", statusString); + try { + return Integer.parseInt(statusString.substring(0,3)); + }catch(NumberFormatException nfe) { + return 500; + } + } + else { + MethodOutcome outcome = client.update().resource(pat).withId(patientId) + .withAdditionalHeader(headerName, "retry; max-retries=10") + .execute(); + ourLog.trace("updated patient: " + outcome.getResponseStatusCode()); + return outcome.getResponseStatusCode(); + } + } + }; + callables.add(callable); + } + + List> futures = new ArrayList<>(); + + //launch them all at once + for (Callable callable : callables) { + futures.add(execSvc.submit(callable)); + } + + //wait for calls to complete + for (Future future : futures) { + try { + Integer httpResponseCode = future.get(); + Assertions.assertEquals(200, httpResponseCode); + } catch (InterruptedException | ExecutionException e) { + //throw the ResourceVersionConflictException back up so we can test it + throw e.getCause(); + } + } + } +}