############################################################################
#
# Copyright (c) 2004 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
import binascii
import logging
import sys
import weakref
import traceback
from zope.interface import implementer
from transaction.weakset import WeakSet
from transaction.interfaces import TransactionFailedError
from transaction import interfaces
from transaction._compat import reraise
from transaction._compat import get_thread_ident
from transaction._compat import native_
from transaction._compat import bytes_
from transaction._compat import StringIO
_marker = object()
_TB_BUFFER = None #unittests may hook
def _makeTracebackBuffer(): #pragma NO COVER
if _TB_BUFFER is not None:
return _TB_BUFFER
return StringIO()
_LOGGER = None #unittests may hook
def _makeLogger(): #pragma NO COVER
if _LOGGER is not None:
return _LOGGER
return logging.getLogger("txn.%d" % get_thread_ident())
# The point of this is to avoid hiding exceptions (which the builtin
# hasattr() does).
def myhasattr(obj, attr):
return getattr(obj, attr, _marker) is not _marker
class Status:
# ACTIVE is the initial state.
ACTIVE = "Active"
COMMITTING = "Committing"
COMMITTED = "Committed"
DOOMED = "Doomed"
# commit() or commit(True) raised an exception. All further attempts
# to commit or join this transaction will raise TransactionFailedError.
COMMITFAILED = "Commit failed"
@implementer(interfaces.ITransaction,
interfaces.ITransactionDeprecated)
[docs]class Transaction(object):
# Assign an index to each savepoint so we can invalidate later savepoints
# on rollback. The first index assigned is 1, and it goes up by 1 each
# time.
_savepoint_index = 0
# If savepoints are used, keep a weak key dict of them. This maps a
# savepoint to its index (see above).
_savepoint2index = None
# Meta data. ._extension is also metadata, but is initialized to an
# emtpy dict in __init__.
user = ""
description = ""
def __init__(self, synchronizers=None, manager=None):
self.status = Status.ACTIVE
# List of resource managers, e.g. MultiObjectResourceAdapters.
self._resources = []
# Weak set of synchronizer objects to call.
if synchronizers is None:
synchronizers = WeakSet()
self._synchronizers = synchronizers
self._manager = manager
# _adapters: Connection/_p_jar -> MultiObjectResourceAdapter[Sub]
self._adapters = {}
self._voted = {} # id(Connection) -> boolean, True if voted
# _voted and other dictionaries use the id() of the resource
# manager as a key, because we can't guess whether the actual
# resource managers will be safe to use as dict keys.
# The user, description, and _extension attributes are accessed
# directly by storages, leading underscore notwithstanding.
self._extension = {}
self.log = _makeLogger()
self.log.debug("new transaction")
# If a commit fails, the traceback is saved in _failure_traceback.
# If another attempt is made to commit, TransactionFailedError is
# raised, incorporating this traceback.
self._failure_traceback = None
# List of (hook, args, kws) tuples added by addBeforeCommitHook().
self._before_commit = []
# List of (hook, args, kws) tuples added by addAfterCommitHook().
self._after_commit = []
[docs] def isDoomed(self):
""" See ITransaction.
"""
return self.status is Status.DOOMED
[docs] def doom(self):
""" See ITransaction.
"""
if self.status is not Status.DOOMED:
if self.status is not Status.ACTIVE:
# should not doom transactions in the middle,
# or after, a commit
raise ValueError('non-doomable')
self.status = Status.DOOMED
# Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered
# a commit/savepoint failure.
def _prior_operation_failed(self):
assert self._failure_traceback is not None
raise TransactionFailedError("An operation previously failed, "
"with traceback:\n\n%s" %
self._failure_traceback.getvalue())
[docs] def join(self, resource):
""" See ITransaction.
"""
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return
if (self.status is not Status.ACTIVE and
self.status is not Status.DOOMED):
# TODO: Should it be possible to join a committing transaction?
# I think some users want it.
raise ValueError("expected txn status %r or %r, but it's %r" % (
Status.ACTIVE, Status.DOOMED, self.status))
# TODO: the prepare check is a bit of a hack, perhaps it would
# be better to use interfaces. If this is a ZODB4-style
# resource manager, it needs to be adapted, too.
if myhasattr(resource, "prepare"):
# TODO: deprecate 3.6
resource = DataManagerAdapter(resource)
self._resources.append(resource)
if self._savepoint2index:
# A data manager has joined a transaction *after* a savepoint
# was created. A couple of things are different in this case:
#
# 1. We need to add its savepoint to all previous savepoints.
# so that if they are rolled back, we roll this one back too.
#
# 2. We don't actually need to ask the data manager for a
# savepoint: because it's just joining, we can just abort it to
# roll back to the current state, so we simply use an
# AbortSavepoint.
datamanager_savepoint = AbortSavepoint(resource, self)
for transaction_savepoint in self._savepoint2index.keys():
transaction_savepoint._savepoints.append(
datamanager_savepoint)
def _unjoin(self, resource):
# Leave a transaction because a savepoint was rolled back on a resource
# that joined later.
# Don't use remove. We don't want to assume anything about __eq__.
self._resources = [r for r in self._resources if r is not resource]
[docs] def savepoint(self, optimistic=False):
""" See ITransaction.
"""
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return, it raises
try:
savepoint = Savepoint(self, optimistic, *self._resources)
except:
self._cleanup(self._resources)
self._saveAndRaiseCommitishError() # reraises!
if self._savepoint2index is None:
self._savepoint2index = weakref.WeakKeyDictionary()
self._savepoint_index += 1
self._savepoint2index[savepoint] = self._savepoint_index
return savepoint
# Remove and invalidate all savepoints we know about with an index
# larger than `savepoint`'s. This is what's needed when a rollback
# _to_ `savepoint` is done.
def _remove_and_invalidate_after(self, savepoint):
savepoint2index = self._savepoint2index
index = savepoint2index[savepoint]
# use list(items()) to make copy to avoid mutating while iterating
for savepoint, i in list(savepoint2index.items()):
if i > index:
savepoint.transaction = None # invalidate
del savepoint2index[savepoint]
# Invalidate and forget about all savepoints.
def _invalidate_all_savepoints(self):
for savepoint in self._savepoint2index.keys():
savepoint.transaction = None # invalidate
self._savepoint2index.clear()
[docs] def register(self, obj):
""" See ITransaction.
"""
# The old way of registering transaction participants.
#
# register() is passed either a persisent object or a
# resource manager like the ones defined in ZODB.DB.
# If it is passed a persistent object, that object should
# be stored when the transaction commits. For other
# objects, the object implements the standard two-phase
# commit protocol.
manager = getattr(obj, "_p_jar", obj)
if manager is None:
raise ValueError("Register with no manager")
adapter = self._adapters.get(manager)
if adapter is None:
adapter = MultiObjectResourceAdapter(manager)
adapter.objects.append(obj)
self._adapters[manager] = adapter
self.join(adapter)
else:
# TODO: comment out this expensive assert later
# Use id() to guard against proxies.
assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj)
[docs] def commit(self):
""" See ITransaction.
"""
if self.status is Status.DOOMED:
raise interfaces.DoomedTransaction(
'transaction doomed, cannot commit')
if self._savepoint2index:
self._invalidate_all_savepoints()
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return
self._callBeforeCommitHooks()
self._synchronizers.map(lambda s: s.beforeCompletion(self))
self.status = Status.COMMITTING
try:
self._commitResources()
self.status = Status.COMMITTED
except:
t = None
v = None
tb = None
try:
t, v, tb = self._saveAndGetCommitishError()
self._callAfterCommitHooks(status=False)
reraise(t, v, tb)
finally:
del t, v, tb
else:
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self._callAfterCommitHooks(status=True)
self.log.debug("commit")
def _saveAndGetCommitishError(self):
self.status = Status.COMMITFAILED
# Save the traceback for TransactionFailedError.
ft = self._failure_traceback = _makeTracebackBuffer()
t = None
v = None
tb = None
try:
t, v, tb = sys.exc_info()
# Record how we got into commit().
traceback.print_stack(sys._getframe(1), None, ft)
# Append the stack entries from here down to the exception.
traceback.print_tb(tb, None, ft)
# Append the exception type and value.
ft.writelines(traceback.format_exception_only(t, v))
return t, v, tb
finally:
del t, v, tb
def _saveAndRaiseCommitishError(self):
t = None
v = None
tb = None
try:
t, v, tb = self._saveAndGetCommitishError()
reraise(t, v, tb)
finally:
del t, v, tb
[docs] def getBeforeCommitHooks(self):
""" See ITransaction.
"""
return iter(self._before_commit)
[docs] def addBeforeCommitHook(self, hook, args=(), kws=None):
""" See ITransaction.
"""
if kws is None:
kws = {}
self._before_commit.append((hook, tuple(args), kws))
def _callBeforeCommitHooks(self):
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addBeforeCommitHook() may
# add additional hooks while hooks are running, and iterating over a
# growing list is well-defined in Python.
for hook, args, kws in self._before_commit:
hook(*args, **kws)
self._before_commit = []
[docs] def getAfterCommitHooks(self):
""" See ITransaction.
"""
return iter(self._after_commit)
[docs] def addAfterCommitHook(self, hook, args=(), kws=None):
""" See ITransaction.
"""
if kws is None:
kws = {}
self._after_commit.append((hook, tuple(args), kws))
def _callAfterCommitHooks(self, status=True):
# Avoid to abort anything at the end if no hooks are registred.
if not self._after_commit:
return
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addAterCommitHook() may
# add additional hooks while hooks are running, and iterating over a
# growing list is well-defined in Python.
for hook, args, kws in self._after_commit:
# The first argument passed to the hook is a Boolean value,
# true if the commit succeeded, or false if the commit aborted.
try:
hook(status, *args, **kws)
except:
# We need to catch the exceptions if we want all hooks
# to be called
self.log.error("Error in after commit hook exec in %s ",
hook, exc_info=sys.exc_info())
# The transaction is already committed. It must not have
# further effects after the commit.
for rm in self._resources:
try:
rm.abort(self)
except:
# XXX should we take further actions here ?
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
self._after_commit = []
self._before_commit = []
def _commitResources(self):
# Execute the two-phase commit protocol.
L = list(self._resources)
L.sort(key=rm_key)
try:
for rm in L:
rm.tpc_begin(self)
for rm in L:
rm.commit(self)
self.log.debug("commit %r" % rm)
for rm in L:
rm.tpc_vote(self)
self._voted[id(rm)] = True
try:
for rm in L:
rm.tpc_finish(self)
except:
# TODO: do we need to make this warning stronger?
# TODO: It would be nice if the system could be configured
# to stop committing transactions at this point.
self.log.critical("A storage error occurred during the second "
"phase of the two-phase commit. Resources "
"may be in an inconsistent state.")
raise
except:
# If an error occurs committing a transaction, we try
# to revert the changes in each of the resource managers.
t, v, tb = sys.exc_info()
try:
try:
self._cleanup(L)
finally:
self._synchronizers.map(lambda s: s.afterCompletion(self))
reraise(t, v, tb)
finally:
del t, v, tb
def _cleanup(self, L):
# Called when an exception occurs during tpc_vote or tpc_finish.
for rm in L:
if id(rm) not in self._voted:
try:
rm.abort(self)
except Exception:
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L:
try:
rm.tpc_abort(self)
except Exception:
self.log.error("Error in tpc_abort() on manager %s",
rm, exc_info=sys.exc_info())
[docs] def abort(self):
""" See ITransaction.
"""
if self._savepoint2index:
self._invalidate_all_savepoints()
self._synchronizers.map(lambda s: s.beforeCompletion(self))
try:
t = None
v = None
tb = None
for rm in self._resources:
try:
rm.abort(self)
except:
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info())
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("abort")
if tb is not None:
reraise(t, v, tb)
finally:
del t, v, tb
[docs] def note(self, text):
""" See ITransaction.
"""
text = text.strip()
if self.description:
self.description += "\n" + text
else:
self.description = text
[docs] def setUser(self, user_name, path="/"):
""" See ITransaction.
"""
self.user = "%s %s" % (path, user_name)
[docs] def setExtendedInfo(self, name, value):
""" See ITransaction.
"""
self._extension[name] = value
# TODO: We need a better name for the adapters.
class MultiObjectResourceAdapter(object):
"""Adapt the old-style register() call to the new-style join().
With join(), a resource mananger like a Connection registers with
the transaction manager. With register(), an individual object
is passed to register().
"""
def __init__(self, jar):
self.manager = jar
self.objects = []
self.ncommitted = 0
def __repr__(self):
return "<%s for %s at %s>" % (self.__class__.__name__,
self.manager, id(self))
def sortKey(self):
return self.manager.sortKey()
def tpc_begin(self, txn):
self.manager.tpc_begin(txn)
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
def tpc_abort(self, txn):
self.manager.tpc_abort(txn)
def commit(self, txn):
for o in self.objects:
self.manager.commit(o, txn)
self.ncommitted += 1
def tpc_vote(self, txn):
self.manager.tpc_vote(txn)
def abort(self, txn):
t = None
v = None
tb = None
try:
for o in self.objects:
try:
self.manager.abort(o, txn)
except:
# Capture the first exception and re-raise it after
# aborting all the other objects.
if tb is None:
t, v, tb = sys.exc_info()
txn.log.error("Failed to abort object: %s",
object_hint(o), exc_info=sys.exc_info())
if tb is not None:
reraise(t, v, tb)
finally:
del t, v, tb
def rm_key(rm):
func = getattr(rm, 'sortKey', None)
if func is not None:
return func()
def object_hint(o):
"""Return a string describing the object.
This function does not raise an exception.
"""
# We should always be able to get __class__.
klass = o.__class__.__name__
# oid would be great, but maybe this isn't a persistent object.
oid = getattr(o, "_p_oid", _marker)
if oid is not _marker:
oid = oid_repr(oid)
else:
oid = 'None'
return "%s oid=%s" % (klass, oid)
def oid_repr(oid):
if isinstance(oid, str) and len(oid) == 8:
# Convert to hex and strip leading zeroes.
as_hex = native_(
binascii.hexlify(bytes_(oid, 'ascii')), 'ascii').lstrip('0')
# Ensure two characters per input byte.
if len(as_hex) & 1:
as_hex = '0' + as_hex
elif as_hex == '':
as_hex = '00'
return '0x' + as_hex
else:
return repr(oid)
# TODO: deprecate for 3.6.
class DataManagerAdapter(object):
"""Adapt zodb 4-style data managers to zodb3 style
Adapt transaction.interfaces.IDataManager to
ZODB.interfaces.IPureDatamanager
"""
# Note that it is pretty important that this does not have a _p_jar
# attribute. This object will be registered with a zodb3 TM, which
# will then try to get a _p_jar from it, using it as the default.
# (Objects without a _p_jar are their own data managers.)
def __init__(self, datamanager):
self._datamanager = datamanager
# TODO: I'm not sure why commit() doesn't do anything
def commit(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate commit step
pass
def abort(self, transaction):
self._datamanager.abort(transaction)
def tpc_begin(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate tpc_begin step
pass
def tpc_abort(self, transaction):
self._datamanager.abort(transaction)
def tpc_finish(self, transaction):
self._datamanager.commit(transaction)
def tpc_vote(self, transaction):
self._datamanager.prepare(transaction)
def sortKey(self):
return self._datamanager.sortKey()
@implementer(interfaces.ISavepoint)
[docs]class Savepoint:
"""Transaction savepoint.
Transaction savepoints coordinate savepoints for data managers
participating in a transaction.
"""
valid = property(lambda self: self.transaction is not None)
def __init__(self, transaction, optimistic, *resources):
self.transaction = transaction
self._savepoints = savepoints = []
for datamanager in resources:
try:
savepoint = datamanager.savepoint
except AttributeError:
if not optimistic:
raise TypeError("Savepoints unsupported", datamanager)
savepoint = NoRollbackSavepoint(datamanager)
else:
savepoint = savepoint()
savepoints.append(savepoint)
[docs] def rollback(self):
""" See ISavepoint.
"""
transaction = self.transaction
if transaction is None:
raise interfaces.InvalidSavepointRollbackError(
'invalidated by a later savepoint')
transaction._remove_and_invalidate_after(self)
try:
for savepoint in self._savepoints:
savepoint.rollback()
except:
# Mark the transaction as failed.
transaction._saveAndRaiseCommitishError() # reraises!
class AbortSavepoint:
def __init__(self, datamanager, transaction):
self.datamanager = datamanager
self.transaction = transaction
def rollback(self):
self.datamanager.abort(self.transaction)
self.transaction._unjoin(self.datamanager)
class NoRollbackSavepoint:
def __init__(self, datamanager):
self.datamanager = datamanager
def rollback(self):
raise TypeError("Savepoints unsupported", self.datamanager)