[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...


[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
GitHub user adamjshook opened a pull request:

    https://github.com/apache/accumulo/pull/254

    [ACCUMULO-4506] Add a timeout to the replication task

    This addresses an issue where a replication task will get stuck for a
    substantial amount of time.  Adding a timeout will abandon the task,
    allowing another worker (or the same worker) to pick it up.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adamjshook/accumulo 1.7

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/accumulo/pull/254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #254
   
----

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---

[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    Not too sure how to go about adding a unit or integration test for this timeout.  I tested the timeout by launching two clusters and replicating data with a short timeout.  Replication was halted on the tablet server and re-scheduled at a later time.  Increasing the timeout allowed replication to finish as expected.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838502
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    --- End diff --
   
    Any reason that we need to start an extra thread here?
   
    Guava has a `MoreExecutors` class with a special Executor that uses the same thread. Would that work here?



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838618
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    --- End diff --
   
    This will be implicitly retried by the framework, right? We should give some indication of that in the message.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838774
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replstats");
    --- End diff --
   
    nit: whole words instead of `replstats` here, please.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838793
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    +              finalStatus = status;
                 } finally {
                   span.stop();
    +              executor.shutdownNow();
                 }
               }
     
    -          log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    --- End diff --
   
    What's the reasoning behind dropping this debug msg?



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113839682
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    --- End diff --
   
    Wasn't aware of that special Executor -- that'll work here, no need for another thread.  Thanks!



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113839731
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    +              finalStatus = status;
                 } finally {
                   span.stop();
    +              executor.shutdownNow();
                 }
               }
     
    -          log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    --- End diff --
   
    It's moved up to the new line 311 inside the try/catch.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113839773
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    --- End diff --
   
    Correct, I'll add that to the message.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    @joshelser Applied your feedback to the commit.  I'll have to run it through my test later, though -- away from my PC at the moment.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    Thanks, @adamjshook.
   
    > Not too sure how to go about adding a unit or integration test for this timeout
   
    Would it help to re-factor this code to make it more unit-testable? I think creating a "good" integration test for this change would be hard -- I'd be happy with a unit test, likely supported by a mocking framework.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    @joshelser I think so.  I was looking at `AccumuloReplicaSystemTest` to see how I could fit in something that triggers the timeout, but the `replicate` call takes a `ReplicaSystemHelper` as an argument and I wasn't sure how to create an instance of one.  Even then, we could only assert that the `Status` didn't change, and it could stay unchanged for a number of reasons.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    > the replicate call takes in the ReplicaSystemHelper as an argument and I wasn't too sure how to go about creating an instance of that
   
    Nothing special, just instantiate it. It requires a `ClientContext` which you can also mock (the implementation just uses that to get at a `Connector`).
   
    It may make your life easier to break up `_replicate(..)` further - I think that would be good if it makes your testing even easier.
   
    ```java
    } else {
      span = Trace.start("WAL replication");
      try {
        finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
      } finally {
        span.stop();
      }
    }
    ```
   
    This section is most of what you changed. What if you lift this into its own method, and then use that as the entry point to test? Something like `replicateLogsWithTimeout()`.
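A minimal sketch of the extraction suggested above, assuming the replication work can be passed in as a `Callable`. The method name `replicateLogsWithTimeout` comes from the comment; the `TimeoutReplication` class and the generic `S` standing in for Accumulo's `Status` are hypothetical. Because the work is a parameter, a test can hand it a fast task, a hung task, or a failing one without a peer cluster:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper; S stands in for the replication Status type. */
final class TimeoutReplication {
  private TimeoutReplication() {}

  /**
   * Runs the replication work on a separate thread and waits at most timeoutMillis for it.
   * On timeout or failure, returns the unchanged status so the framework can retry later.
   */
  static <S> S replicateLogsWithTimeout(Callable<S> work, S currentStatus, long timeoutMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<S> future = executor.submit(work);
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return currentStatus;
    } catch (ExecutionException | TimeoutException e) {
      return currentStatus; // no progress recorded; the work will be rescheduled
    } finally {
      executor.shutdownNow(); // interrupts the worker if it is still running
    }
  }
}
```

Returning `currentStatus` unchanged on timeout or failure mirrors the diff under review: no progress is recorded, so the replication framework picks the work up again later.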



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    Thanks for the pointers.  I'll head down this route and get back to you soon.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    I've been reading up on `MoreExecutors#sameThreadExecutor`, and I don't think we can use it here: it runs the task to completion before returning any `Future`, so the timeout logic would not kick in until the task is finished. The task has to run in a separate thread while the calling thread waits for the timeout to trigger.
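To illustrate the point, here is a small self-contained sketch using only `java.util.concurrent` rather than Guava (class and method names are hypothetical). An `Executor` that runs tasks on the calling thread finishes the work inside `execute`, so the later `get(timeout)` returns immediately and can never time out; handing the same task to another thread does let the timeout fire:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SameThreadTimeoutDemo {
  private SameThreadTimeoutDemo() {}

  /** Runs every task on the calling thread, similar in spirit to a same-thread executor. */
  static Executor sameThread() {
    return Runnable::run;
  }

  /** Runs each task on a fresh daemon thread so the caller is free to wait with a timeout. */
  static Executor separateThread() {
    return task -> {
      Thread t = new Thread(task);
      t.setDaemon(true);
      t.start();
    };
  }

  /** Returns true if get(timeout) gave up before a task taking taskMillis finished. */
  static boolean timeoutFires(Executor executor, long taskMillis, long timeoutMillis) {
    FutureTask<String> future = new FutureTask<>(() -> {
      Thread.sleep(taskMillis); // stands in for slow replication work
      return "done";
    });
    executor.execute(future); // with sameThread(), the whole task runs inside this call
    try {
      future.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return false;
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the worker that is still running
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With `sameThread()`, a 300 ms task and a 50 ms timeout still report no timeout, because the task was already complete before `get` was ever called.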
   
    Additionally, I'm having a bit of trouble getting a unit test I am pleased with.  I've made one that runs, but it has a race condition: either the task never runs at all, or it runs and then throws and logs a `Connection refused` error because no server is running to receive the work. You can take a look at the commit [here](https://github.com/adamjshook/accumulo/commit/e0c9ca76982aaa327dba0084e82b339383dfd09c).
   
    Either way the test passes since the `Status` returned is the same, but it isn't a very good test since the behavior isn't really defined.  I think we are a bit too high-level here for a unit test.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    > I've been reading up on MoreExecutors#sameThreadExecutor, and I don't think we can use this here since it runs the task prior to getting any Future. The timeout logic would not kick in until the task is finished.
    > The task has to be run in a separate thread while the main thread idles to trigger the timeout.
   
    Ahh, bummer. Thought I could be tricky and help save us yet another thread ;)
   
    > Either way the test passes since the Status returned is the same, but it isn't a very good test since the behavior isn't really defined. I think we are a bit too high-level here for a unit test.
   
    I would be happy with a test that is not "human-level". That is, if you can write a test showing that your interrupt/retry logic works, I would +1 that.
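One possible shape for such a test, sketched with plain JDK types. The names are hypothetical, and the wrapper is a minimal copy rather than the actual `AccumuloReplicaSystem` code. Latches give the test something concrete to assert: that the work really started, that the old status came back, and that the worker thread was interrupted, which avoids the "task never ran" versus "connection refused" ambiguity described earlier in the thread. It assumes the worker thread starts well within the 500 ms timeout:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutInterruptCheck {
  private TimeoutInterruptCheck() {}

  // Minimal copy of a timeout wrapper; S stands in for the replication Status (hypothetical).
  static <S> S runWithTimeout(Callable<S> work, S current, long timeoutMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      return executor.submit(work).get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return current;
    } catch (ExecutionException | TimeoutException e) {
      return current;
    } finally {
      executor.shutdownNow();
    }
  }

  /** True if the work started, timed out, left the status unchanged, and was interrupted. */
  static boolean timeoutInterruptsWork() {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch interrupted = new CountDownLatch(1);
    Callable<String> hungWork = () -> {
      started.countDown();
      try {
        new CountDownLatch(1).await(); // never released: simulates a stuck RPC
        return "advanced";
      } catch (InterruptedException e) {
        interrupted.countDown(); // shutdownNow() reached the worker thread
        throw e;
      }
    };
    String result = runWithTimeout(hungWork, "initial", 500L);
    try {
      boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
      return sawInterrupt && started.getCount() == 0 && "initial".equals(result);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```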



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114136601
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, task will be retried by the framework");
    +              finalStatus = status;
                 } finally {
                   span.stop();
    +              executor.shutdownNow();
    --- End diff --
   
    Could add a comment here explaining that `shutdownNow` is called instead of `shutdown` because it interrupts the running thread.
   
    If you had an atomic boolean for interrupting, could set it here.
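A quick sketch of the difference such a comment would document (hypothetical class name, JDK types only). `shutdown()` only stops the executor from accepting new tasks and lets running work continue, while `shutdownNow()` also attempts to stop running work, which `Executors.newSingleThreadExecutor()` does by interrupting the worker thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownComparison {
  private ShutdownComparison() {}

  /** Returns true if the running task observed an interrupt within waitMillis of shutting down. */
  static boolean interruptsRunningTask(boolean useShutdownNow, long waitMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch interrupted = new CountDownLatch(1);
    executor.submit(() -> {
      started.countDown();
      try {
        Thread.sleep(waitMillis * 4); // long-running work
      } catch (InterruptedException e) {
        interrupted.countDown(); // only reached if someone interrupts this thread
      }
    });
    try {
      started.await();
      if (useShutdownNow) {
        executor.shutdownNow(); // interrupts the worker thread
      } else {
        executor.shutdown(); // stops accepting tasks; running work continues
      }
      return interrupted.await(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow(); // clean up the sleeping worker in either case
    }
  }
}
```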



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114136260
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -416,6 +444,12 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
         Status lastStatus = status, currentStatus = status;
         final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
         while (true) {
    +      // Check if the thread has been interrupted prior to replicating data
    +      if (Thread.interrupted()) {
    --- End diff --
   
    In the past I have found that other libraries (like Hadoop) eat interrupts, so the interrupt may never be seen.  I am not sure, but would it make sense to have an additional atomic boolean that you set and check?  I had to do this to reliably interrupt scans in Accumulo. If there were an `AtomicBoolean`, you could do something like `if (Thread.interrupted() || stopped.get())`.
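A minimal sketch of the suggested pattern, assuming a library call that catches and discards `InterruptedException` (simulated here by the hypothetical `interruptSwallowingCall`). The loop checks both `Thread.interrupted()` and an `AtomicBoolean` stop flag; whoever cancels the work sets the flag before calling `shutdownNow()`, so the loop exits even when the interrupt itself is lost:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class StopFlagDemo {
  private StopFlagDemo() {}

  /** Simulates a library call that catches InterruptedException and drops it. */
  static void interruptSwallowingCall() {
    try {
      Thread.sleep(20);
    } catch (InterruptedException e) {
      // Swallowed: the interrupt status is now cleared and the caller never sees it.
    }
  }

  /** Replication-style loop that checks both the interrupt status and an explicit stop flag. */
  static int replicateUntilStopped(AtomicBoolean stopped) {
    int batches = 0;
    while (true) {
      if (Thread.interrupted() || stopped.get()) {
        return batches; // give up; the caller keeps the previous status
      }
      interruptSwallowingCall();
      batches++;
    }
  }

  /** Returns true if the worker exits once the flag is set, even if its interrupt is eaten. */
  static boolean stopsDespiteSwallowedInterrupt(long timeoutMillis) {
    AtomicBoolean stopped = new AtomicBoolean(false);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<Integer> work = () -> replicateUntilStopped(stopped);
    Future<Integer> future = executor.submit(work);
    try {
      Thread.sleep(100); // let a few "batches" run
      stopped.set(true); // set the flag first...
      executor.shutdownNow(); // ...then interrupt, which interruptSwallowingCall() may discard
      future.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (ExecutionException | TimeoutException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```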


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

joshelser
In reply to this post by joshelser
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114137768
 
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
   
    Should this client's timeout be set to some function of the replication timeout?
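One possible answer, sketched under assumptions: track a deadline from the replication timeout and cap each RPC's client timeout at whatever budget remains, so a single `replicateLog` call cannot outlive the overall timeout. `ReplicationDeadline` and `clientTimeoutMillis` are hypothetical names, and wiring the result into the Thrift client is not shown:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: tracks how much of the replication timeout is left. */
final class ReplicationDeadline {
  private final long deadlineNanos;

  ReplicationDeadline(long replicationTimeoutMillis) {
    this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(replicationTimeoutMillis);
  }

  /** Milliseconds left before the replication timeout, never negative. */
  long remainingMillis() {
    return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()));
  }

  /** Caps a per-RPC timeout so one call cannot outlive the remaining replication budget. */
  static long clientTimeoutMillis(long defaultRpcTimeoutMillis, long remainingMillis) {
    return Math.max(0L, Math.min(defaultRpcTimeoutMillis, remainingMillis));
  }
}
```

A result of 0 could be treated as "budget exhausted, skip the call and let the framework retry", which would match how the timeout path already handles a stuck task.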



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

joshelser
In reply to this post by joshelser
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
 
    @joshelser Back at you.  I think I managed to scrape together a test that I am happy with.

