

In document Hbase in Action (Page 192-200)

Alternative HBase clients


6.5.2 Changing TwitBase passwords

Let’s create an application to randomize the passwords of all users in the system. This kind of thing would be useful if your TwitBase deployment suffered a security breach. You’d like your application to scan through all users in the users table, retrieve each user’s password, and generate a new password based on the old one. You also want the application to notify each user of the security breach and tell them how to recover their account. You’ll do all this by chaining successive actions using async’s Deferred and Callback objects. The workflow as a Callback chain is illustrated in figure 6.3.

Wiring a Callback instance onto a Deferred instance chains successive steps together. This is done using the addCallback family of methods provided by the Deferred class. Callbacks can also be attached to handle error cases, as you see in step 4b. Async calls these Errbacks, a term consistent with the terminology used in Twisted Python. The final result of a Callback chain is retrieved by calling join() on the associated Deferred instance. If the Deferred has finished processing, calling join(long timeout) returns the result immediately. If the Deferred’s Callback chain is still processing, the current thread blocks until either the Deferred completes or the timeout, in milliseconds, expires.
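Async’s Deferred is its own class, but the JDK’s CompletableFuture follows a similar model, which makes it a convenient way to try the idea without HBase on the classpath. The sketch below is an analogy, not async’s API: thenApply() plays the role of addCallback(), exceptionally() stands in for an Errback, and get(timeout, unit) approximates join(long timeout). The class name and the joinWithTimeout() helper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy only: CompletableFuture stands in for async's Deferred.
final class CallbackChainSketch {

  // A background "request", two callbacks that transform its result,
  // and an error handler that plays the Errback's role.
  static CompletableFuture<String> chain(boolean fail) {
    return CompletableFuture
        .supplyAsync(() -> {
          if (fail) {
            throw new IllegalStateException("put failed");
          }
          return 21;
        })
        .thenApply(n -> n * 2)               // Integer -> Integer
        .thenApply(n -> "result=" + n)       // Integer -> String
        .exceptionally(err -> {              // runs only if an earlier step failed
          Throwable cause = (err instanceof CompletionException && err.getCause() != null)
              ? err.getCause() : err;
          return "error=" + cause.getMessage();
        });
  }

  // Hypothetical join(timeout) analogue: returns the result if the chain
  // finishes within timeoutMillis, otherwise reports that it timed out.
  static String joinWithTimeout(CompletableFuture<String> f, long timeoutMillis) {
    try {
      return f.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timed out";                    // chain still processing
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();    // preserve the interrupt for callers
      return "interrupted";
    } catch (ExecutionException e) {
      return "failed: " + e.getCause();
    }
  }
}
```

One difference worth noting: each thenApply() call returns a new stage, whereas async’s addCallback() returns the same Deferred with an updated type.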

With your newfound understanding of the async data-processing pipeline and a rough idea of the pipeline you want to build, let’s start building it.

THE ASYNCHRONOUS HBASE CLIENT

Your primary entry point into asynchbase is the HBaseClient class. It manages your interactions with the HBase cluster, and its responsibilities combine those of HTablePool and HTableInterface from the stock client.

Your application needs only a single instance of HBaseClient. As with an HTableInterface, you need to make sure you close it when you’re finished using it. Using the stock client, you would do this:

HTablePool pool = new HTablePool(...);
HTableInterface myTable = pool.getTable("myTable");
// application code
myTable.close();
pool.closeTablePool("myTable");

In asynchbase, you have the following:

final HBaseClient client = new HBaseClient("localhost");
// application code
client.shutdown().joinUninterruptibly();

This snippet creates an instance of HBaseClient that talks to the HBase cluster whose ZooKeeper quorum is at localhost. It then closes that instance and blocks the current thread until the shutdown() method completes. Waiting on shutdown() to complete is necessary to ensure that all pending RPC requests are completed and the thread pool is properly disposed of before the application exits. shutdown() returns an instance of the Deferred class, which represents an operation executing on another thread. Waiting is accomplished by calling one of the join family of methods on the Deferred instance. Here you call joinUninterruptibly() because you want to ensure that the client resources are cleaned up when you’re finished. Note that if your thread is interrupted while waiting on joinUninterruptibly(), it will still be marked as interrupted.

Figure 6.3 Building a data-processing pipeline with Callbacks. Each step takes output from the previous one, processes it, and sends it to the next, until a final result is reached. (1) Scan over all rows in the users table; a KeyValue is produced for each user (for example, rowkey TheRealMT, info:password abc123). (2) Calculate a new password based on the old and send a Put to HBase. (3) Interpret the Boolean Put response as either a success or a failure (an UpdateFailedException). (4a/4b) Format a String message based on the response result. (5) Send the notification message and return true as the Boolean notification status.

Asynchbase: an alternative Java HBase client
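The wait-but-remember-the-interrupt behavior described above is a common JDK idiom. The sketch below shows it for a plain java.util.concurrent.Future; the class UninterruptibleWait and its getUninterruptibly() method are hypothetical, and asynchbase’s own implementation may differ in detail. It keeps waiting when interrupted, records that the interrupt happened, and restores the thread’s interrupt flag before returning.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper illustrating an interrupt-preserving blocking wait.
final class UninterruptibleWait {

  static <T> T getUninterruptibly(Future<T> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();          // blocks until the result is ready
        } catch (InterruptedException e) {
          interrupted = true;           // remember it, then keep waiting
        }
      }
    } finally {
      if (interrupted) {
        // Re-set the flag so callers still see the thread as interrupted.
        Thread.currentThread().interrupt();
      }
    }
  }
}
```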

You’ll use this client instance to create a Scanner. The asynchbase Scanner is similar to the ResultScanner with which you’re already familiar. Create a Scanner against the users table, and limit its results to the info:password column:

final Scanner scanner = client.newScanner("users");
scanner.setFamily("info");
scanner.setQualifier("password");

Use this Scanner instance to walk the rows in your table by calling nextRows(). Like the other asynchronous operations in this library, nextRows() returns a Deferred instance. As with the stock scanner, you can request a specific number of results per page by passing a number to nextRows(). To help emphasize the asynchronous nature of the application, let’s limit the scan results to a single row per page.

Each returned row consists of a list of its cells. These cells are represented by instances of the KeyValue class. In order to walk the page of returned rows, you loop over a list of lists of KeyValue instances:

ArrayList<ArrayList<KeyValue>> rows = null;
while ((rows = scanner.nextRows(1).joinUninterruptibly()) != null) {
  for (ArrayList<KeyValue> row : rows) {
    // ...
  }
}

Like the call to shutdown(), this code blocks the current thread until each page of results is available before consuming it. Scanning rows asynchronously doesn’t make a lot of sense when you’re interested in maintaining row order. By joining on each Deferred instance, you realize the scan results into the rows variable. Parsing the results is similar to consuming KeyValue objects in the stock client. This is step 1 from the state diagram, illustrated in figure 6.4.
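The paging loop can be tried without HBase using a stand-in. In the sketch below, PagedScanner is a hypothetical in-memory class whose nextRows(n) mimics the contract described above: it returns a future page of at most n rows and, once the table is exhausted, a future that yields null. CompletableFuture.join() plays the role of joinUninterruptibly() here, and each row is a list of String "cells" instead of KeyValue objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a paging scanner over an in-memory table.
final class PagedScanner {
  private final List<List<String>> table;   // each row is a list of cells
  private int cursor = 0;

  PagedScanner(List<List<String>> table) {
    this.table = table;
  }

  // Returns up to n rows, delivered on another thread, or null when done.
  CompletableFuture<List<List<String>>> nextRows(int n) {
    if (cursor >= table.size()) {
      return CompletableFuture.completedFuture(null);   // end of scan
    }
    int end = Math.min(cursor + n, table.size());
    List<List<String>> page = new ArrayList<>(table.subList(cursor, end));
    cursor = end;
    return CompletableFuture.supplyAsync(() -> page);
  }

  // Same loop shape as the scanner code above: block on each page so rows
  // are consumed in order, and stop when a page comes back as null.
  static List<String> firstCells(PagedScanner scanner, int pageSize) {
    List<String> out = new ArrayList<>();
    List<List<String>> rows;
    while ((rows = scanner.nextRows(pageSize).join()) != null) {
      for (List<String> row : rows) {
        out.add(row.get(0));
      }
    }
    return out;
  }
}
```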

The code looks like this:

KeyValue kv = row.get(0);
byte[] expected = kv.value();
String userId = new String(kv.key());
PutRequest put = new PutRequest(
    "users".getBytes(), kv.key(), kv.family(),
    kv.qualifier(), mkNewPassword(expected));

Don’t do this in a real application!

Limiting your scanner to a single row per request will significantly hinder your application performance. The only reason we do so here is to maximize the opportunity for failure scenarios to trigger. You’ll see what we’re talking about later in the section.

Figure 6.4 Step 1 is to scan over all rows in the users table. A KeyValue is produced for each user (for example, rowkey TheRealMT with info:password abc123), from which the user ID and current password are retrieved.

The scanner was limited to returning the info:password column, so you know there will be only a single KeyValue per result row. You take that KeyValue and pull out the bits relevant to you. For this example, the old password is used to seed the new password, so pass it into the mkNewPassword() method. Create a new Put instance, which asynchbase calls a PutRequest, to update the user’s password. The last step is to construct a Callback chain and attach it to the PutRequest invocation.
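The chapter doesn’t show the body of mkNewPassword(). Purely as an illustration, here is one hypothetical way to derive a new password from the old one: hash the old bytes together with fresh random salt from SecureRandom using SHA-256, then hex-encode the digest. Mixing in randomness means the new value depends on the seed but can’t be recomputed from the seed alone. This is not the book’s implementation; PasswordSketch is an invented name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;

// Hypothetical mkNewPassword(): one possible seed-based derivation.
final class PasswordSketch {
  private static final SecureRandom RANDOM = new SecureRandom();

  static byte[] mkNewPassword(byte[] seed) {
    byte[] salt = new byte[16];
    RANDOM.nextBytes(salt);                       // fresh randomness per call
    try {
      MessageDigest sha = MessageDigest.getInstance("SHA-256");
      sha.update(seed);                           // old password as the seed
      sha.update(salt);
      byte[] digest = sha.digest();               // 32 bytes
      StringBuilder hex = new StringBuilder(digest.length * 2);
      for (byte b : digest) {
        hex.append(String.format("%02x", b));     // printable hex, 64 chars
      }
      return hex.toString().getBytes(StandardCharsets.UTF_8);
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is a required algorithm on every Java SE platform.
      throw new IllegalStateException(e);
    }
  }
}
```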

Thus far you’ve implemented all of step 1 and most of step 2 from figure 6.3. Before you start chaining Callbacks, let’s write a couple of methods to help you watch the asynchronous application at work.

DEVELOPING AN ASYNCHRONOUS APPLICATION

Developing and debugging asynchronous applications can be tricky, so you’ll set yourself up for success. The first thing you want is to print debugging statements with their associated thread. For this, you’ll use the logging library SLF4J, the same logging library used by asynchbase. The following line gives you what you need:

static final Logger LOG = LoggerFactory.getLogger(AsyncUsersTool.class);

To help explore the asynchronous nature of this code, it’s useful to introduce simulated latency into the system. The method latency() will occasionally delay processing by forcing the thread to sleep:

static void latency() throws Exception {
  if (System.currentTimeMillis() % 2 == 0) {
    LOG.info("a thread is napping...");
    Thread.sleep(1000);
  }
}

You can do something similar for failures, introducing them slightly less frequently, with the entropy() method:

static boolean entropy(Boolean val) {
  if (System.currentTimeMillis() % 3 == 0) {
    LOG.info("entropy strikes!");
    return false;
  }
  return (val == null) ? Boolean.TRUE : val;
}

You’ll call latency() at the beginning and end of each Callback to slow things down a little. Call entropy() on the result produced by step 2 so you can exercise the error handling provided by step 4b. Now it’s time to implement Callbacks for each of the remaining steps.
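Because latency() and entropy() key off System.currentTimeMillis(), their behavior can’t be reproduced on demand. One possible refinement, sketched below with a hypothetical FaultInjector class, is to pass the clock in as a java.util.function.LongSupplier: production code would supply System::currentTimeMillis, and a test supplies fixed values to force each branch. The rules (nap on even values, fail on multiples of 3) are the same as in the methods above.

```java
import java.util.function.LongSupplier;

// Hypothetical variant of latency()/entropy() with an injectable clock.
final class FaultInjector {
  private final LongSupplier clock;

  FaultInjector(LongSupplier clock) {
    this.clock = clock;
  }

  // Same rule as latency(): nap when the clock value is even.
  boolean shouldNap() {
    return clock.getAsLong() % 2 == 0;
  }

  // Sleeps for one second when shouldNap() says so.
  void latency() throws InterruptedException {
    if (shouldNap()) {
      Thread.sleep(1000);
    }
  }

  // Same rule as entropy(): fail when the clock value is a multiple of 3,
  // otherwise pass the value through, treating null as success.
  boolean entropy(Boolean val) {
    if (clock.getAsLong() % 3 == 0) {
      return false;
    }
    return (val == null) ? true : val;
  }
}
```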

CHAINING SUCCESSIVE ACTIONS USING CALLBACKS

Step 3 in the data pipeline is to interpret the response generated by the PutRequest sent to HBase. This step is summarized in figure 6.5.

You’ll do this by implementing async’s Callback interface. The implementation receives a Boolean from the HBase response and generates an UpdateResult instance, an object specific to your application. The UpdateResult class is simple, just a package for data:

static final class UpdateResult {
  public String userId;
  public boolean success;
}

Step 3 can also throw an UpdateFailedException when the PutRequest fails or when entropy() strikes. Async looks for Exceptions, either thrown by or returned by Deferred and Callback instances, to trigger the error-handling callback chain. You implement your own exception so you can package a little context along with the exception. It looks like this:

static final class UpdateFailedException extends Exception {
  public UpdateResult result;

  public UpdateFailedException(UpdateResult r) {
    this.result = r;
  }
}
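In async, a Callback’s call() method is declared to throw Exception, so it can throw a checked exception such as UpdateFailedException directly. For comparison, the sketch below shows the same idea with CompletableFuture, whose lambdas can’t throw checked exceptions: the failure is wrapped in a CompletionException and unwrapped downstream to recover the user ID. The class and method names are hypothetical; the point is that the exception object carries the context the error handler needs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration: an exception that carries context downstream.
final class ContextualFailure {
  static final class UpdateResult {
    String userId;
    boolean success;
  }

  static final class UpdateFailedException extends Exception {
    final UpdateResult result;
    UpdateFailedException(UpdateResult r) { this.result = r; }
  }

  // Step-3-style stage: Boolean in, UpdateResult out, or a failure.
  static CompletableFuture<UpdateResult> interpret(String userId, boolean putOk) {
    return CompletableFuture.completedFuture(putOk).thenApply(ok -> {
      UpdateResult r = new UpdateResult();
      r.userId = userId;
      r.success = ok;
      if (!ok) {
        // Checked exceptions must be wrapped inside a lambda.
        throw new CompletionException(new UpdateFailedException(r));
      }
      return r;
    });
  }

  // Downstream handler recovers the user ID from whichever object arrived.
  static String describe(CompletableFuture<UpdateResult> f) {
    return f.handle((r, err) -> {
      if (err == null) {
        return "updated " + r.userId;
      }
      Throwable cause = (err instanceof CompletionException) ? err.getCause() : err;
      if (cause instanceof UpdateFailedException) {
        return "unchanged " + ((UpdateFailedException) cause).result.userId;
      }
      return "unexpected " + cause;
    }).join();
  }
}
```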

Now you can implement your Callback to handle step 3. The responsibility of this class is to translate the asynchbase response into an application-specific datatype. You’ll call it InterpretResponse. It has a constructor to pass in the user ID; that way you know which user you were processing when you received this response. The meat of the code is in the UpdateResult call(Boolean response) method. This method starts and ends by calling latency(). It also takes the response received from HBase and subjects it to entropy(). This is purely for the purposes of your understanding. The real work is in taking the response and either constructing an UpdateResult instance or throwing an UpdateFailedException. Either way, there’s not much work involved. You can imagine performing an arbitrarily complex operation in your real working code:

static final class InterpretResponse
    implements Callback<UpdateResult, Boolean> {
  private String userId;

  InterpretResponse(String userId) {
    this.userId = userId;
  }

  public UpdateResult call(Boolean response) throws Exception {
    latency();
    UpdateResult r = new UpdateResult();
    r.userId = this.userId;
    r.success = entropy(response);   // occasionally turn success into failure
    if (!r.success)
      throw new UpdateFailedException(r);
    latency();
    return r;
  }
}

Figure 6.5 Step 3 is to interpret the Put response as either a success or failure.

InterpretResponse is the most complex Callback in this example, so if you’re still following, you should be in good shape. This Callback has either performed its transformation or detected an error and bailed. Either way, the decision of which Callback to invoke next is left up to async. This is an important concept when thinking about these data-processing pipelines. Each step in the chain is ignorant of the others. Notice that InterpretResponse implements Callback<UpdateResult, Boolean>. Those generic types correspond to the return type and the argument type of the call() method. The only thing that links step 3 to step 4 is the contract between them in the form of type signatures.
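The idea that only type signatures link one step to the next can be shown in a few lines of plain Java. The hypothetical Step<R, T> interface below mirrors the shape of the Callback<R, T> implementations in this section (result type R, argument type T, and R call(T arg) throws Exception); it is not async’s interface. Its then() method composes two steps, and the compiler accepts the pairing only when one step’s result type matches the next step’s argument type.

```java
// Hypothetical Callback-shaped interface: R is the result type, T the argument.
interface Step<R, T> {
  R call(T arg) throws Exception;

  // Composes this step with the next. It compiles only if this step's
  // result type R is exactly the next step's argument type.
  default <R2> Step<R2, T> then(Step<R2, R> next) {
    return arg -> next.call(this.call(arg));
  }
}

final class StepDemo {
  static int run(boolean putSucceeded) throws Exception {
    // Boolean -> String, like formatting a message from a response.
    Step<String, Boolean> format =
        ok -> ok ? "password changed" : "password unchanged";
    // String -> Integer, standing in for any later step.
    Step<Integer, String> measure = String::length;
    Step<Integer, Boolean> pipeline = format.then(measure);
    // format.then(format) would not compile: format expects a Boolean, not a String.
    return pipeline.call(putSucceeded);
  }
}
```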

For the next step, you’ll implement the successful case first: step 4a from the state diagram. For context, figure 6.6 illustrates both steps 4a and 4b.

This step takes the UpdateResult produced in step 3 and converts it into a String message, perhaps to send to the user via email or to update a log somewhere. Thus, step 4a is implemented by a Callback<String, UpdateResult>. Call this one ResultToMessage:

static final class ResultToMessage
    implements Callback<String, UpdateResult> {
  public String call(UpdateResult r) throws Exception {
    latency();
    String fmt = "password change for user %s successful.";
    latency();
    return String.format(fmt, r.userId);
  }
}

Again, you’re calling latency() at the beginning and end of the call() method. Otherwise, there’s nothing fancy here. Construction of the message is simple, and it looks appropriate for the user. There’s also nothing here that throws an Exception, so you won’t consider an Errback chain for this step.

Figure 6.6 Steps 4a and 4b format a message to send based on response result: 4a formats a message from the UpdateResult, 4b formats an error from the UpdateFailedException, and both produce a String message.

The Errback defining step 4b is similar to the Callback in 4a. It’s also implemented as a Callback, this time parameterized on String and UpdateFailedException. The processing work is almost identical, except it retrieves the user ID context from the Exception instead of an UpdateResult:

static final class FailureToMessage
    implements Callback<String, UpdateFailedException> {
  public String call(UpdateFailedException e) throws Exception {
    latency();
    String fmt = "%s, your password is unchanged!";
    latency();
    return String.format(fmt, e.result.userId);
  }
}

Both ResultToMessage and FailureToMessage produce a String as their output. That means they can be chained to the same Callback instance for the final step, step 5. Step 5 is handled by SendMessage and is an implementation of Callback<Boolean, String>; see figure 6.7.

SendMessage should either succeed, in which case it returns true, or throw a SendMessageFailedException. There’s nothing special about the failure Exception; it’s application-specific for the sake of clarity in the example. SendMessage looks like this:

static final class SendMessage
    implements Callback<Boolean, String> {
  public Boolean call(String s) throws Exception {
    latency();
    // entropy() returns false when it strikes; treat that as a failed send
    if (!entropy(null))
      throw new SendMessageFailedException();
    LOG.info(s);
    latency();
    return Boolean.TRUE;
  }
}

Again, you have a little latency() and entropy() to keep things interesting. Either the message is delivered or an Exception is thrown. In this case, there is no Errback to chain into the data pipeline, so that error needs to be handled in client code. With the processing pipeline implemented, let’s return to the code consuming the scanner.

WIRING UP THE DATA PIPELINE

When you last saw your user application, it was reading rows off the scanner and building PutRequest objects to initiate the processing pipeline, essentially step 1 from the state diagram. The last thing to do is to send those Puts off to HBase and pass the response down the Callback chain, as shown in figure 6.8.

Figure 6.7 Step 5 sends the notification message, turning the String message into a Boolean notification status.

This is where you finally get your hands on a Deferred instance! For this example, you use the method Deferred<Boolean> HBaseClient.compareAndSet(PutRequest put, byte[] expected) instead of put() to simplify explanation of the Callback chain. compareAndSet() is the atomic, conditional version of put(): it applies the Put only if the cell’s current value equals expected. It returns a Deferred instance that, once join()ed, yields a Boolean indicating whether the update was applied. That’s the entry point for chaining Callbacks. The chaining looks like this:

Deferred<Boolean> d = client.compareAndSet(put, expected)
    .addCallback(new InterpretResponse(userId))
    .addCallbacks(new ResultToMessage(), new FailureToMessage())
    .addCallback(new SendMessage());

Each consecutive call to addCallback() returns the same Deferred instance but with its type updated to correspond to the return type of the attached Callback. So, executing step 2 returns a Deferred<Boolean>, and after chaining the Callback for step 3 it becomes a Deferred<UpdateResult>. Chaining on steps 4a and 4b is done using addCallbacks(), with an s. This returns a Deferred<String>, which is typed by the return type of the success case. The error case in async is always typed by an Exception, so it need not be specified in the Deferred. Step 5 turns it into a Deferred<Boolean>, the type ultimately consumed by the application.
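To see the type change at each step, the sketch below rebuilds the same chain with CompletableFuture and names every intermediate stage with its type. It is an analogy, not asynchbase code: step 2’s compareAndSet() is approximated by ConcurrentHashMap.replace(key, expected, update), which atomically writes only if the current value equals expected (String values are used because byte[] arrays compare by identity). handle() covers steps 4a and 4b together, since both branches must return a String. Unlike async, each CompletableFuture method returns a new stage rather than the same instance with an updated type. PasswordPipeline and its fields are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Analogy of the password pipeline; all names here are hypothetical.
final class PasswordPipeline {
  static final class UpdateResult {
    final String userId;
    UpdateResult(String userId) { this.userId = userId; }
  }

  final ConcurrentMap<String, String> passwords = new ConcurrentHashMap<>();
  final StringBuilder outbox = new StringBuilder();   // "sent" notifications

  CompletableFuture<Boolean> run(String userId, String expected, String update) {
    // Step 2: atomic compare-and-set, performed on another thread.
    CompletableFuture<Boolean> step2 =
        CompletableFuture.supplyAsync(() -> passwords.replace(userId, expected, update));

    // Step 3: Boolean -> UpdateResult, or fail the chain.
    CompletableFuture<UpdateResult> step3 = step2.thenApply(ok -> {
      if (!ok) {
        throw new IllegalStateException(userId);   // user ID as context
      }
      return new UpdateResult(userId);
    });

    // Steps 4a and 4b: both branches produce a String message.
    CompletableFuture<String> step4 = step3.handle((r, err) -> {
      if (err == null) {
        return String.format("password change for user %s successful.", r.userId);
      }
      Throwable cause = (err instanceof CompletionException) ? err.getCause() : err;
      return String.format("%s, your password is unchanged!", cause.getMessage());
    });

    // Step 5: String -> Boolean, the type the application finally joins on.
    return step4.thenApply(msg -> {
      synchronized (outbox) {
        outbox.append(msg).append('\n');
      }
      return Boolean.TRUE;
    });
  }
}
```

Running the same update twice shows why compare-and-set matters: the second attempt finds the password already changed, so the write is refused and the chain takes the failure branch instead of silently overwriting.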

Each row in the scan result has a corresponding Deferred<Boolean> whose execution you want to see completed. The only way to see the results of the full Callback chain for each row is to collect the final Deferred<Boolean> instances and join() on them. This is the same code as before, just with the extra bookkeeping of collecting the Deferred<Boolean> instances:

ArrayList<Deferred<Boolean>> workers = new ArrayList<Deferred<Boolean>>();
while ((rows = scanner.nextRows(1).joinUninterruptibly()) != null) {
  for (ArrayList<KeyValue> row : rows) {
    // ...
    Deferred<Boolean> d = ...;
    workers.add(d);
  }
}

Figure 6.8 Step 2 calculates a new password based on the old and sends a Put to HBase. Executing step 2 yields the Boolean Put response; steps 3, 4a and 4b, and 5 are then appended in turn.

Notice that your list of workers preserves the order in which the rows were produced. This isn’t particularly useful for this example because you’ve been careful to keep track of necessary context as you go using the custom UpdateResult and UpdateFailedException classes. You could just as easily accumulate state at this level, for instance, by creating a Map of user ID to Deferred<Boolean> results. Because you’re not interested in any specific result, you can join on all the Deferred instances as a group. The last step is to invoke join() and accumulate the results of your processing:

Deferred<ArrayList<Boolean>> d = Deferred.group(workers);
try {
  d.join();
} catch (DeferredGroupException e) {
  // the group wraps the underlying failure; getCause() exposes it
  LOG.info(e.getCause().getMessage());
}

Meanwhile, the Callback chains for all the rows are executing in the background as HBase responses arrive. When you call join(), async waits for every processing chain to finish and gives you back their results. If any component of a chain threw or returned an instance of Exception along the way, join() throws instead, for you to catch here: the Deferred that groups the individual Deferred instances wraps the failure in a DeferredGroupException. Unpack it with a call to getCause() to see the underlying error.
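The group-and-join pattern, including the Map of user ID to results mentioned earlier, can be sketched with the JDK. In this analogy, CompletableFuture.allOf() stands in for Deferred.group() and CompletionException for DeferredGroupException. Unlike Deferred.group(), allOf() yields no list of results, so per-user outcomes are read back from a LinkedHashMap keyed by user ID. GroupJoin and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical analogy of grouping per-user workers and joining once.
final class GroupJoin {

  // One background worker per user; users in 'failing' simulate a failed step 5.
  static Map<String, CompletableFuture<Boolean>> startAll(List<String> users,
                                                          List<String> failing) {
    Map<String, CompletableFuture<Boolean>> workers = new LinkedHashMap<>();
    for (String user : users) {
      workers.put(user, CompletableFuture.supplyAsync(() -> {
        if (failing.contains(user)) {
          throw new IllegalStateException("could not notify " + user);
        }
        return Boolean.TRUE;
      }));
    }
    return workers;
  }

  // Waits for every worker; reports one failure's message, or "all ok".
  static String joinAll(Map<String, CompletableFuture<Boolean>> workers) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(workers.values().toArray(new CompletableFuture[0]));
    try {
      all.join();
      return "all ok";
    } catch (CompletionException e) {
      return e.getCause().getMessage();   // unwrap, like getCause() above
    }
  }

  // Keyed by user ID, so per-user outcomes stay inspectable after the join.
  static List<String> failedUsers(Map<String, CompletableFuture<Boolean>> workers) {
    List<String> failed = new ArrayList<>();
    for (Map.Entry<String, CompletableFuture<Boolean>> e : workers.entrySet()) {
      if (e.getValue().isCompletedExceptionally()) {
        failed.add(e.getKey());
      }
    }
    return failed;
  }
}
```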

To round it all out, let’s give the command-line application a meaningful name. Rename the file src/main/java/HBaseIA/App.java to something useful like AsyncUsersTool.java, move it into an appropriate package path, and update the class name and package as well. Your final AsyncUsersTool will look like this.

Listing 6.4 Complete asynchbase client to TwitBase: AsyncUsersTool

package HBaseIA.TwitBase;

// ... imports omitted

static final byte[] TABLE_NAME = "users".getBytes();
static final byte[] INFO_FAM = "info".getBytes();
static final byte[] PASSWORD_COL = "password".getBytes();
static final byte[] EMAIL_COL = "email".getBytes();

public static final String usage =
  "usertool action ...\n" +
  " help - print this message and exit.\n" +
  " update - update passwords for all installed users.\n";

// Synchronize writing to stdout among threads
static final Object lock = new Object();

static void println(String msg) {
  synchronized (lock) {
    System.out.println(msg);
  }
}

// Generate new password based on old
static byte[] mkNewPassword(byte[] seed) {
  // ...
}

// Add reality to sample app
static void latency() throws Exception {
  if (System.currentTimeMillis() % 2 == 0) {
    println("a thread is napping...");
    Thread.sleep(1000);
  }
}
