Monday 10 October 2011

Eventual consistency with transactions

In my last post I described the motivation for the new NDB$EPOCH conflict detection function in MySQL Cluster. This function detects when a row has been concurrently updated on two asynchronously replicating MySQL Cluster databases, and takes steps to keep the databases in alignment.

With NDB$EPOCH, conflicts are detected and handled at row granularity rather than column granularity, since the epoch metadata used to detect conflicts is maintained per row. Dealing with conflicts on a row-by-row basis has implications for schema and application design. The NDB$EPOCH_TRANS function extends NDB$EPOCH, giving stronger consistency guarantees and reducing the impact on applications and schemas.
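
A rough sketch of the row-level check may help (a much simplified illustration with invented names, not the actual Ndb implementation): each row effectively carries the epoch of its last modification on the primary, and the primary tracks the highest of its own epochs that the secondary has demonstrably applied and replicated back. A secondary write to a row whose epoch is newer than that bound must have been made against stale data :

  # Hypothetical sketch of an NDB$EPOCH-style row conflict check.
  # row_last_modified_epoch : epoch of the row's last primary modification.
  # max_replicated_epoch    : highest primary epoch known to have been
  #                           applied by the secondary and reflected back.
  def secondary_op_in_conflict(row_last_modified_epoch, max_replicated_epoch):
      # The secondary's write was made against a row state which cannot
      # yet include this primary change, so the two writes were concurrent.
      return row_last_modified_epoch > max_replicated_epoch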

Concurrency control in a single synchronous system

MySQL Cluster is a relational system. Data is stored in tables with defined schemas of typed columns. As with any relational system, real-world concepts can be modelled in a number of ways, with different trade-offs. One such consideration is the level of normalisation applied to a data model. Transactions and concurrency control ensure that data modelled using multiple tables, rows and columns appears to any external observer to move instantaneously between stable, self-consistent states. This is a powerful simplification, and eases the complexity burden on application writers. Each transaction provides the illusion of serialised access to the database. Multiple transactions can execute in parallel, so long as they do not interfere by accessing the same data. Where transactions do interfere, some real serialisation can occur. In practice, applications depend on the serialisation and atomicity guarantees given by transactions, often in ways not fully made explicit or understood by the application designers.

Concurrency control in independent, asynchronously replicated systems

Asynchronously replicating writes between two independent systems erodes the guarantees given by single-system concurrency control. Each system maintains its transactional guarantees in parallel, and incorporates modifications from the other system asynchronously, at some time after they were originally committed. Where the same row is modified on both systems concurrently, two versions of the same row are produced, and there is no longer a single history of values for the given row. This can cause replicas to diverge. Note that the window of 'concurrency', or 'potential conflict', is related to the time taken for a committed update to be applied on all replicas. This is similar, or equivalent, to the commit delay experienced by a synchronous 2-phase commit system.
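
To make the divergence concrete, here is a trivial sketch (a plain illustration, not NDB code) of two sites each committing a local write to the same row within the propagation window, then blindly applying the remote write when it arrives :

  # Two asynchronously replicating sites, starting from the same state.
  site_a = {"row1": "v1"}
  site_b = {"row1": "v1"}

  # Concurrent local writes, committed within the propagation window.
  site_a["row1"] = "update from A"
  site_b["row1"] = "update from B"

  # Asynchronous replication later applies the remote write at each site,
  # so each site sees the opposite ordering of the two writes.
  site_a["row1"] = "update from B"
  site_b["row1"] = "update from A"

  assert site_a["row1"] != site_b["row1"]   # the replicas have diverged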

Such concurrent writes can be detected using some form of conflict detection, for example by comparing the per-row epoch metadata described above. On detecting a conflict, steps can then be taken to avoid divergence, and to resolve any unwanted effects of the concurrent writes.

Replica divergence, external effects and cascading impacts

Divergence can be avoided if conflicting writes can be merged in some way. Some write conflicts may be equivalent, associative or otherwise mergeable, especially if the operations are replicated rather than their resulting states. However, merging requires specific schema and application knowledge to determine how to merge conflicting writes.

More generally, divergence can be avoided by rejecting one or both conflicting writes. This is the approach we have taken, with handling of rejected writes delegated to the application, where the knowledge exists to handle them via the exceptions table mechanism.

However write conflicts are handled, it is important to consider :
  1. Cascading impacts on dependent operations
    Operations based on the results of conflicting operations may themselves require handling to avoid divergence.
  2. Real world / other system effects based on conflicting writes
    Maintaining database consistency does not guarantee that real world effects have been correctly compensated.

A database system does not exist in a vacuum. Operations are performed to reflect real-world or external-system events. When the effects of an operation are later reverted, the real-world effects may also require some compensating actions. These external compensating actions are beyond the scope of any DBMS and are application specific. In a real application of this technology, this is probably the most important part of the design.

Any particular conflict originates between two concurrent operations, but once a conflicting operation is committed, other operations can read its results, and commit their own, expanding the impact of the original conflict. Conflicts are discovered asynchronously, some time after the original operations are committed, so there can be a large number of subsequent operations in the replication pipeline which depend on the conflicting operations at the point they are discovered. All of the invalidated subsequent operations must be handled.

Row-based conflict detection and data shearing

Using row-based conflict detection and re-alignment can counteract data divergence so that rows become consistent eventually, but this comes at the cost of eroding the atomicity of committed transactions. For example, a committed transaction which writes to three rows may, after conflict handling, have none, one, two or all three row changes reverted.

Within a single system, the two potentially visible states were :
  1. Before transaction (All rows at version 1 : Av1, Bv1, Cv1)
  2. After transaction (All rows at version 2 : Av2, Bv2, Cv2)

With row-based conflict detection, ignoring the row variants we're actually conflicting with, there could be :

Before transaction : (All rows at version 1 : Av1, Bv1, Cv1)

After transaction
  1. Av2, Bv2, Cv2 (All rows at version 2)
  2. Av2, Bv2, Cv1 (Cv2 reverted)
  3. Av2, Bv1, Cv2 (Bv2 reverted)
  4. Av2, Bv1, Cv1 (Bv2, Cv2 reverted)
  5. Av1, Bv2, Cv2 (Av2 reverted)
  6. Av1, Bv2, Cv1 (Av2, Cv2 reverted)
  7. Av1, Bv1, Cv2 (Av2, Bv2 reverted)
  8. Av1, Bv1, Cv1 (Av2, Bv2, Cv2 reverted)

Depending on the concept that the distinct rows A, B and C represented, this can vastly increase the complexity of understanding the data. If A, B and C model entirely separate entities which just happened to be transactionally updated together, then there may be no problem if they fare differently in conflict detection. If they model portions of the state of a larger entity, then reasoning about the state of that entity becomes complex.

This potential chopping up of changes committed in a transaction can be described as shearing of the data model represented by the schema. In practice, the potential for shearing between rows implies that for tables with conflicts handled on a row basis, cross row consistency is not available. This in turn implies that the schema must be modified to ensure that data items which cannot tolerate relative shear are placed in the same row so that they share the same fate and remain self-consistent. This single-row limit to consistency is native and natural to some NoSQL / key-value / wide column store products, but is a weakening of the normal guarantees in a transactional system.

Requiring that schemas and applications using conflict detection can tolerate shear between any two rows is quite a heavy burden to place on applications, especially those not written with eventual consistency in mind. Is there some way to support optimistic conflict detection without breaking up committed transactions, and shearing rows?

Transaction-based conflict detection

One way to avoid inter-row shearing is to perform conflict detection on a row-by-row basis, but on discovering a conflict, take action on a transaction basis. More concretely, when a row conflict is discovered, any other rows written as part of the same transaction should also be considered in-conflict by implication. This reduces the set of stable states back to the original case - all rows at version 1 or all rows at version 2.

Where a row is found to be in-conflict with some replicated row operation, a further replicated row operation on the same row should also be found to be in-conflict, until the conflict condition has been cleared. This property is implicitly implemented in the existing row-based conflict detection functions.

When the scope of a conflict is extended to include all row modifications in a transaction, this implies that all following replicated row operations which affect the same rows must also be in conflict by implication. To avoid row shearing, these implied-in-conflict rows must implicate the other rows in their transactions, and those rows may in turn implicate other rows. The overall effect is that a single row conflict must cause its transaction, and all dependent transactions, to be considered to be in conflict.

From our database-centric point of view, transactions can only become dependent on each other through the data they access in the database. If transaction X updates rows A and B, and transaction Y then reads row B and updates row C, then we can say that transaction Y has a read-write dependency on transaction X via row B. We cannot tell whether there is some other out-of-band communication between transactions.

By tracking this transaction 'footprint' information, and looking for row overlaps, we can determine transaction dependencies. This is how the new NDB$EPOCH_TRANS function provides transactional conflict detection.
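
As an illustration of the idea (a simplified sketch with invented names, using only write footprints, not the engine implementation), the row sets of replicated transactions can be scanned in commit order to build a dependency graph, which is then traversed to find every transaction implicated by an initial conflict :

  from collections import defaultdict

  # Row footprints of replicated transactions, in binlog (commit) order :
  # transaction -> list of (table, primary key) rows written.
  transactions = {
      "X": [("t1", "A"), ("t1", "B")],
      "Y": [("t1", "B"), ("t1", "C")],   # overlaps X on row B : Y depends on X
      "Z": [("t1", "C")],                # overlaps Y on row C : Z depends on Y
  }

  last_writer = {}               # (table, pk) -> last transaction writing that row
  dependents = defaultdict(set)  # transaction -> transactions which depend on it

  for tx, rows in transactions.items():
      for row in rows:
          if row in last_writer and last_writer[row] != tx:
              dependents[last_writer[row]].add(tx)
          last_writer[row] = tx

  def in_conflict_closure(root):
      # All transactions implicated, directly or transitively, by a
      # conflict detected on the root transaction.
      implicated, stack = set(), [root]
      while stack:
          tx = stack.pop()
          if tx not in implicated:
              implicated.add(tx)
              stack.extend(dependents[tx])
      return implicated

  print(in_conflict_closure("X"))   # {'X', 'Y', 'Z'} : the whole dependent chain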

NDB$EPOCH_TRANS conflict detection function

The NDB$EPOCH_TRANS conflict function uses the same mechanism as the NDB$EPOCH function to detect concurrent updates to the same row across two clusters. However, once a row conflict has been detected in an operation which is part of a replicated transaction, all other operations in that replicated transaction are considered to be in conflict. Furthermore, any transactions found to be dependent on that transaction are also considered in conflict. Once the full set of in-conflict transactions has been determined, the set of affected rows is handled in the same way as in NDB$EPOCH.

Specifically :
  • The replicated operations are not applied
  • The exceptions table(s) are populated with the affected primary keys
  • The affected row epochs are updated
  • Realignment Binlog events are generated to (eventually) realign the Secondary cluster

As with NDB$EPOCH, NDB$EPOCH_TRANS is asymmetric, so the Primary Cluster always wins when a conflict is detected. This allows applications needing pessimistic properties to obtain them by accessing the Primary Cluster, while applications which can handle the relaxed consistency of optimism can access either Cluster. With NDB$EPOCH_TRANS, transactions committed on the Secondary Cluster are guaranteed to be atomic, whether or not they are later found to be in conflict. Each committed transaction will either be unaffected by conflict detection, or be completely reverted. There will be no row shear.

This slightly stronger optimistic consistency guarantee may ease the implementation of relaxed consistency / eventually consistent applications. For example, where some concept is modelled by a number of different rows in different tables, any transactional modification will either be atomically applied, or not applied at all, so the relationships between the rows affected by a transaction will be preserved. The need to flatten a schema into single-row entities is reduced, although careful design is still required to get a good understanding of transaction boundaries, and the behaviour of the overall system when transactions are reverted.

Transaction dependency tracking

NDB$EPOCH_TRANS is built into the MySQL Cluster Storage Engine. It runs in the normal MySQL Slave SQL thread, as part of the table handler calls made when applying a replicated Binlog. The NDB$EPOCH_TRANS code in the Ndb storage engine tracks transaction dependencies based on the primary keys accessed by row events in the Binlog, and their transaction ids. If two row events have the same table and primary key values, then they affect the same row. If two events affect the same row, and are in different transactions, then the second transaction depends on the first. In this way, a transaction dependency graph is built by the MySQL Cluster Storage Engine as row events are applied by the Slave from a replicated Binlog. This graph is then used to find dependent transactions when a conflict is detected.

A Binlog only contains WRITE_ROW, UPDATE_ROW and DELETE_ROW events. This means that we only detect dependencies between transactions which write the same rows. We do not currently track dependencies between writers and readers. For example :

Transaction A : {Write row X, Write row Y}
Transaction B : {Read row Y, Write row Z}

Binlog : {{Tx A : Wr X, Wr Y}, {Tx B : Wr Z}}

In this example, the dependency of Transaction B on Transaction A is not recorded in the Binlog, and so the Slave is not aware of it. This would result in the write to row Z not being considered in conflict, when it should be.
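
In terms of the earlier dependency tracking sketch, the footprints recoverable from the Binlog would be :

  # Footprints as seen in the Binlog (writes only; the read of row Y by
  # Transaction B leaves no trace) :
  binlog_transactions = {
      "A": [("t1", "X"), ("t1", "Y")],   # Write row X, Write row Y
      "B": [("t1", "Z")],                # Write row Z
  }
  # There is no row overlap between A and B, so no A -> B dependency edge is
  # built, and a conflict on Transaction A would not implicate Transaction B's
  # write to row Z, even though it should.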

A future improvement is to add selective tracking of reads to the Binlog, so that Write -> Read dependencies will implicate reading transactions when a conflict is discovered.

There's more to come

Another long dry post, best consumed with your favourite drink in hand. As I mentioned last time, these functions are pushed, and available in the latest releases of MySQL Cluster. I'd be happy to hear from anyone who wants to try them out and give feedback. I've been deliberately light with implementation details thus far, as I'm saving those for yet another posting. I think that some of the implementation details are interesting from a replication point of view, even if you're not interested in these particular conflict detection algorithms. You may disagree :)

2 comments:

Mark Callaghan said...

I didn't understand -- "the minimum conflict window size is approximately 200 milliseconds". Does this mean that a row changed at the same time isn't guaranteed to be reported as a conflict? I think that section of http://dev.mysql.com/doc/refman/5.1/en/mysql-cluster-replication-conflict-resolution.html needs more details.

Frazer Clement said...

I agree that it isn't clear at the moment.

By 'minimum conflict window', I mean the minimum time which must pass between updates to the same row on different clusters for them *not* to be considered conflicting.

If less than the minimum conflict window of time passes between conflicting updates, then they *will* be considered in-conflict.

This non-negligible minimum time would be easy for a naive application to beat in a race, so should be considered when designing an application based on the NDB$EPOCH functions.

Does that make any more sense? :)
