# maxdb.py
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Support for the MaxDB database.
TODO: More module docs! MaxDB support is currently experimental.
Overview
--------
The ``maxdb`` dialect is **experimental** and has only been tested on 7.6.03.007
and 7.6.00.037. Of these, **only 7.6.03.007 will work** with SQLAlchemy's ORM.
The earlier version has severe ``LEFT JOIN`` limitations and will return
incorrect results from even very simple ORM queries.
Only the native Python DB-API is currently supported. ODBC driver support
is a future enhancement.
Connecting
----------
The username is case-sensitive. If you usually connect to the
database with sqlcli and other tools in lower case, you likely need to
use upper case for DB-API.
Implementation Notes
--------------------
Also check the DatabaseNotes page on the wiki for detailed information.
With the 7.6.00.37 driver and Python 2.5, it seems that all DB-API
generated exceptions are broken and can cause Python to crash.
For 'somecol.in_([])' to work, the IN operator's generation must be changed
to cast 'NULL' to a numeric, i.e. NUM(NULL). The DB-API doesn't accept a
bind parameter there, so that particular generation must inline the NULL value,
which depends on [ticket:807].
The DB-API is very picky about where bind params may be used in queries.
Bind params for some functions (e.g. MOD) need type information supplied.
The dialect does not yet do this automatically.
Max will occasionally raise 'bad sql, compile again' exceptions for
perfectly valid SQL. The dialect does not currently handle these; more
research is needed.
MaxDB 7.5 and SAP DB <= 7.4 reportedly do not support schemas. A very
slightly different version of this dialect would be required to support
those versions, and can easily be added if there is demand. Some other
required components such as a Max-aware 'old oracle style' join compiler
(thetas with (+) outer indicators) are already done and available for
integration -- email the devel list if you're interested in working on
this.
"""
import datetime, itertools, re
from sqlalchemy import exc, schema, sql, util
from sqlalchemy.sql import operators as sql_operators, expression as sql_expr
from sqlalchemy.sql import compiler, visitors
from sqlalchemy.engine import base as engine_base, default
from sqlalchemy import types as sqltypes
# Public API of this module: the MaxDB-specific column type classes
# defined below.
__all__ = [
'MaxString', 'MaxUnicode', 'MaxChar', 'MaxText', 'MaxInteger',
'MaxSmallInteger', 'MaxNumeric', 'MaxFloat', 'MaxTimestamp',
'MaxDate', 'MaxTime', 'MaxBoolean', 'MaxBlob',
]
class _StringType(sqltypes.String):
    """Shared base for the MaxDB character types.

    Subclasses set ``_type`` to the SQL keyword emitted in DDL.  The
    optional ``encoding`` (for example ``'unicode'``) is upper-cased and
    appended to the column specification.
    """

    _type = None

    def __init__(self, length=None, encoding=None, **kw):
        super(_StringType, self).__init__(length=length, **kw)
        self.encoding = encoding

    def get_col_spec(self):
        """Return the DDL fragment for this type.

        A type without a length is rendered as ``LONG``; otherwise as
        ``<_type>(<length>)``.
        """
        if self.length is None:
            parts = ['LONG']
        else:
            parts = ['%s(%s)' % (self._type, self.length)]
        if self.encoding is not None:
            parts.append(self.encoding.upper())
        return ' '.join(parts)

    def bind_processor(self, dialect):
        """Return a bind-value converter, or None for 'unicode' columns.

        For every other encoding, unicode values are encoded with
        ``dialect.encoding`` and anything else is passed through.
        """
        if self.encoding == 'unicode':
            return None

        def process(value):
            if isinstance(value, unicode):
                return value.encode(dialect.encoding)
            return value
        return process

    def result_processor(self, dialect):
        """Return a converter for values fetched from the cursor."""
        def process(value):
            # some sort of LONG: objects exposing read() are drained
            # (repeatedly, if a read yields another reader) until a plain
            # value, a string or None is left.
            while (value is not None
                   and not isinstance(value, (unicode, str))
                   and hasattr(value, 'read')):
                value = value.read(value.remainingLength())

            # Byte strings are decoded only when unicode conversion was
            # requested on the type or on the dialect.
            if isinstance(value, str) and (self.convert_unicode or
                                           dialect.convert_unicode):
                return value.decode(dialect.encoding)

            # None, unicode, undecoded str and unexpected types are
            # returned as-is.
            return value
        return process
class MaxString(_StringType):
    """Variable-length character type, rendered as ``VARCHAR``."""

    _type = 'VARCHAR'

    def __init__(self, *a, **kw):
        # Pure pass-through to the shared string constructor.
        super(MaxString, self).__init__(*a, **kw)
class MaxUnicode(_StringType):
    """``VARCHAR`` type whose encoding is fixed to ``'unicode'``."""

    _type = 'VARCHAR'

    def __init__(self, length=None, **kw):
        # NOTE(review): extra keyword arguments are accepted but not
        # forwarded to the base constructor -- confirm this is intended.
        super(MaxUnicode, self).__init__(encoding='unicode', length=length)
class MaxChar(_StringType):
    """Character type rendered with the ``CHAR`` keyword."""

    _type = 'CHAR'
class MaxText(_StringType):
    """Text type that is always rendered as ``LONG``."""

    _type = 'LONG'

    def __init__(self, *a, **kw):
        # Pure pass-through to the shared string constructor.
        super(MaxText, self).__init__(*a, **kw)

    def get_col_spec(self):
        """Return ``LONG``, optionally followed by an encoding keyword.

        An explicit ``encoding`` is appended exactly as given; otherwise
        ``UNICODE`` is appended when ``convert_unicode`` is set.
        """
        if self.encoding is not None:
            return ' '.join(('LONG', self.encoding))
        if self.convert_unicode:
            return ' '.join(('LONG', 'UNICODE'))
        return 'LONG'
class MaxInteger(sqltypes.Integer):
    """Integer type rendered as ``INTEGER``."""

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'INTEGER'
class MaxSmallInteger(MaxInteger):
    """Integer type rendered as ``SMALLINT``."""

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'SMALLINT'
class MaxNumeric(sqltypes.Numeric):
    """The FIXED (also NUMERIC, DECIMAL) data type."""

    def __init__(self, precision=None, scale=None, **kw):
        # Default to Decimal results unless the caller chose otherwise.
        if 'asdecimal' not in kw:
            kw['asdecimal'] = True
        super(MaxNumeric, self).__init__(precision=precision, scale=scale,
                                         **kw)

    def bind_processor(self, dialect):
        """No bind-side conversion is applied for this type."""
        return None

    def get_col_spec(self):
        """Return the DDL fragment for this type.

        Without a (truthy) precision the column is rendered as
        ``INTEGER``; the scale is only emitted alongside a precision.
        """
        if not self.precision:
            return 'INTEGER'
        if self.scale:
            return 'FIXED(%s, %s)' % (self.precision, self.scale)
        return 'FIXED(%s)' % self.precision
class MaxFloat(sqltypes.Float):
    """The FLOAT data type."""

    def get_col_spec(self):
        """Return ``FLOAT``, with the precision when one is set."""
        if self.precision is not None:
            return 'FLOAT(%s)' % (self.precision,)
        return 'FLOAT'
class MaxTimestamp(sqltypes.DateTime):
    """The TIMESTAMP data type.

    Values are exchanged with the DB-API as strings whose layout is
    selected by ``dialect.datetimeformat`` (``'internal'`` or ``'iso'``).
    """

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'TIMESTAMP'

    def bind_processor(self, dialect):
        """Return a callable turning a datetime into a bind string.

        ``None`` and strings pass through untouched.  Any other
        ``datetimeformat`` raises ``exc.InvalidRequestError``.
        """
        def process(value):
            if value is None:
                return None
            if isinstance(value, basestring):
                return value

            fmt = dialect.datetimeformat
            if fmt == 'internal':
                layout = "%Y%m%d%H%M%S"
            elif fmt == 'iso':
                layout = "%Y-%m-%d %H:%M:%S."
            else:
                raise exc.InvalidRequestError(
                    "datetimeformat '%s' is not supported." % (fmt,))

            # Values lacking a microsecond attribute get six zeros.
            micros = getattr(value, 'microsecond', 0)
            return value.strftime(layout + ("%06u" % micros))
        return process

    def result_processor(self, dialect):
        """Return a callable parsing a timestamp string into a datetime.

        ``None`` passes through.  Any other ``datetimeformat`` raises
        ``exc.InvalidRequestError``.
        """
        def process(value):
            if value is None:
                return None

            fmt = dialect.datetimeformat
            # (start, stop) slices for year, month, day, hour, minute,
            # second and the trailing fractional digits.
            if fmt == 'internal':
                fields = ((0, 4), (4, 6), (6, 8),
                          (8, 10), (10, 12), (12, 14),
                          (14, None))
            elif fmt == 'iso':
                fields = ((0, 4), (5, 7), (8, 10),
                          (11, 13), (14, 16), (17, 19),
                          (20, None))
            else:
                raise exc.InvalidRequestError(
                    "datetimeformat '%s' is not supported." % (fmt,))

            return datetime.datetime(
                *[int(value[start:stop]) for start, stop in fields])
        return process
class MaxDate(sqltypes.Date):
    """The DATE data type.

    Values are exchanged with the DB-API as strings whose layout is
    selected by ``dialect.datetimeformat`` (``'internal'`` or ``'iso'``).
    """

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'DATE'

    def bind_processor(self, dialect):
        """Return a callable turning a date into a bind string.

        ``None`` and strings pass through untouched.  Any other
        ``datetimeformat`` raises ``exc.InvalidRequestError``.
        """
        def process(value):
            if value is None:
                return None
            if isinstance(value, basestring):
                return value

            fmt = dialect.datetimeformat
            if fmt == 'internal':
                return value.strftime("%Y%m%d")
            if fmt == 'iso':
                return value.strftime("%Y-%m-%d")
            raise exc.InvalidRequestError(
                "datetimeformat '%s' is not supported." % (fmt,))
        return process

    def result_processor(self, dialect):
        """Return a callable parsing a date string into ``datetime.date``.

        ``None`` passes through.  Any other ``datetimeformat`` raises
        ``exc.InvalidRequestError``.
        """
        def process(value):
            if value is None:
                return None

            fmt = dialect.datetimeformat
            # (start, stop) slices for year, month and day.
            if fmt == 'internal':
                fields = ((0, 4), (4, 6), (6, 8))
            elif fmt == 'iso':
                fields = ((0, 4), (5, 7), (8, 10))
            else:
                raise exc.InvalidRequestError(
                    "datetimeformat '%s' is not supported." % (fmt,))

            return datetime.date(
                *[int(value[start:stop]) for start, stop in fields])
        return process
class MaxTime(sqltypes.Time):
    """The TIME data type.

    Values are exchanged with the DB-API as strings whose layout is
    selected by ``dialect.datetimeformat`` (``'internal'`` or ``'iso'``).
    """

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'TIME'

    def bind_processor(self, dialect):
        """Return a callable turning a time into a bind string.

        ``None`` and strings pass through untouched.

        Raises:
            exc.InvalidRequestError: ``dialect.datetimeformat`` is neither
                ``'internal'`` nor ``'iso'``.
        """
        def process(value):
            if value is None:
                return None
            elif isinstance(value, basestring):
                return value
            elif dialect.datetimeformat == 'internal':
                return value.strftime("%H%M%S")
            elif dialect.datetimeformat == 'iso':
                # ISO separates time fields with ':'; '-' is the date
                # separator and is not a valid ISO time delimiter.
                # NOTE(review): not verified against a live MaxDB server.
                return value.strftime("%H:%M:%S")
            else:
                raise exc.InvalidRequestError(
                    "datetimeformat '%s' is not supported." % (
                    dialect.datetimeformat,))
        return process

    def result_processor(self, dialect):
        """Return a callable parsing a time string into ``datetime.time``.

        ``None`` passes through.  In both layouts the hour is read from
        the first four characters of the value.

        Raises:
            exc.InvalidRequestError: ``dialect.datetimeformat`` is neither
                ``'internal'`` nor ``'iso'``.
        """
        def process(value):
            if value is None:
                return None
            elif dialect.datetimeformat == 'internal':
                # Contiguous fields: hour [0:4], minute [4:6], second [6:8].
                return datetime.time(
                    *[int(v) for v in (value[0:4], value[4:6], value[6:8])])
            elif dialect.datetimeformat == 'iso':
                # Same fields with a one-character separator between them.
                return datetime.time(
                    *[int(v) for v in (value[0:4], value[5:7], value[8:10])])
            else:
                raise exc.InvalidRequestError(
                    "datetimeformat '%s' is not supported." % (
                    dialect.datetimeformat,))
        return process
class MaxBoolean(sqltypes.Boolean):
    """Boolean type rendered as ``BOOLEAN``."""

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'BOOLEAN'
class MaxBlob(sqltypes.Binary):
    """Binary type rendered as ``LONG BYTE``."""

    def get_col_spec(self):
        """Return the DDL type name."""
        return 'LONG BYTE'

    def bind_processor(self, dialect):
        """Return a callable that stringifies non-None bind values."""
        def process(value):
            return None if value is None else str(value)
        return process

    def result_processor(self, dialect):
        """Return a callable that drains a fetched LONG value.

        Non-None results are expected to expose ``read()`` and
        ``remainingLength()``; the whole remaining content is read.
        """
        def process(value):
            if value is not None:
                return value.read(value.remainingLength())
            return None
        return process
# Maps generic SQLAlchemy types to their MaxDB implementations.  Passed to
# sqltypes.adapt_type() by MaxDBDialect.type_descriptor().
colspecs = {
sqltypes.Integer: MaxInteger,
sqltypes.Smallinteger: MaxSmallInteger,
sqltypes.Numeric: MaxNumeric,
sqltypes.Float: MaxFloat,
sqltypes.DateTime: MaxTimestamp,
sqltypes.Date: MaxDate,
sqltypes.Time: MaxTime,
sqltypes.String: MaxString,
sqltypes.Binary: MaxBlob,
sqltypes.Boolean: MaxBoolean,
sqltypes.Text: MaxText,
sqltypes.CHAR: MaxChar,
sqltypes.TIMESTAMP: MaxTimestamp,
sqltypes.BLOB: MaxBlob,
sqltypes.Unicode: MaxUnicode,
}
# Maps lower-case MaxDB type names to the type classes of this module.
# NOTE(review): presumably consulted during table reflection -- the
# consumer is not visible in this part of the file.
ischema_names = {
    'boolean': MaxBoolean,
    'char': MaxChar,
    'character': MaxChar,
    'date': MaxDate,
    'fixed': MaxNumeric,
    'float': MaxFloat,
    'int': MaxInteger,
    'integer': MaxInteger,
    'long binary': MaxBlob,
    'long unicode': MaxText,
    'long': MaxText,
    'smallint': MaxSmallInteger,
    'time': MaxTime,
    'timestamp': MaxTimestamp,
    'varchar': MaxString,
}
class MaxDBExecutionContext(default.DefaultExecutionContext):
    """Execution context with MaxDB-specific post-INSERT handling."""

    def post_exec(self):
        """Recover the autoserial value after a single-row INSERT.

        When the inserted ids are missing or unreliable for the table's
        autoserial column, the value is fetched with a follow-up
        ``SELECT <table>.CURRVAL FROM DUAL`` and stored in
        ``_last_inserted_ids``.  The base class hook always runs last.
        """
        # DB-API bug: if there were any functions as values,
        # then do another select and pull CURRVAL from the
        # autoincrement column's implicit sequence... ugh
        if self.compiled.isinsert and not self.executemany:
            table = self.compiled.statement.table
            index, serial_col = _autoserial_column(table)

            if serial_col and (not self.compiled._safeserial
                               or not self._last_inserted_ids
                               or self._last_inserted_ids[index] in (None, 0)):
                # Tables without an explicit schema are addressed through
                # CURRENT_SCHEMA.
                if table.schema:
                    template = "SELECT %s.CURRVAL FROM DUAL"
                else:
                    template = "SELECT CURRENT_SCHEMA.%s.CURRVAL FROM DUAL"
                query = template % (
                    self.compiled.preparer.format_table(table),)

                engine = self.connection.engine
                if engine._should_log_info:
                    engine.logger.info(query)
                result = self.cursor.execute(query)
                serial_value = result.fetchone()[0]
                if engine._should_log_debug:
                    engine.logger.debug([serial_value])

                if not self._last_inserted_ids:
                    # This shouldn't ever be > 1? Right?
                    pk_count = len(table.primary_key.columns)
                    self._last_inserted_ids = [None] * pk_count
                self._last_inserted_ids[index] = serial_value

        super(MaxDBExecutionContext, self).post_exec()

    def get_result_proxy(self):
        """Return a result proxy suited to the cursor's column types.

        A ``MaxDBResultProxy`` is used as soon as any described column is
        one of the LONG types; otherwise the stock ``ResultProxy``.
        """
        long_types = ('Long Binary', 'Long', 'Long Unicode')
        if self.cursor.description is not None:
            for described in self.cursor.description:
                if described[1] in long_types:
                    return MaxDBResultProxy(self)
        return engine_base.ResultProxy(self)
class MaxDBCachedColumnRow(engine_base.RowProxy):
    """A RowProxy that only runs result_processors once per column."""

    def __init__(self, parent, row):
        super(MaxDBCachedColumnRow, self).__init__(parent, row)
        self.columns = {}
        self._row = row
        self._parent = parent

    def _get_col(self, key):
        """Return the processed value for *key*, caching it per key."""
        cache = self.columns
        if key not in cache:
            cache[key] = self._parent._get_col(self._row, key)
        return cache[key]

    def __iter__(self):
        for position in xrange(len(self._row)):
            yield self._get_col(position)

    def __repr__(self):
        return repr(list(self))

    def __eq__(self, other):
        if other is self:
            return True
        values = tuple([self._get_col(position)
                        for position in xrange(len(self._row))])
        return other == values

    def __getitem__(self, key):
        if not isinstance(key, slice):
            return self._get_col(key)
        # Slices are resolved against the raw row length and returned as
        # a tuple of processed values.
        start, stop, step = key.indices(len(self._row))
        return tuple([self._get_col(position)
                      for position in xrange(start, stop, step)])

    def __getattr__(self, name):
        # Attribute access falls back to a column lookup by name.
        try:
            return self._get_col(name)
        except KeyError:
            raise AttributeError(name)
class MaxDBResultProxy(engine_base.ResultProxy):
    """ResultProxy whose rows are ``MaxDBCachedColumnRow`` objects."""

    _process_row = MaxDBCachedColumnRow
class MaxDBDialect(default.DefaultDialect):
# Dialect identification and capability settings.
# NOTE(review): how each flag is consumed is defined by the engine and
# DefaultDialect, which are not visible in this file.
name = 'maxdb'
supports_alter = True
supports_unicode_statements = True
max_identifier_length = 32
supports_sane_rowcount = True
supports_sane_multi_rowcount = False
preexecute_pk_sequences = True
# MaxDB-specific: selects the string layout used by the date/time type
# processors in this module, which accept 'internal' or 'iso'.
datetimeformat = 'internal'
def __init__(self, _raise_known_sql_errors=False, **kw):
    """Set up the dialect and its DB-API type map.

    The type map stays empty when no DB-API module is available.
    NOTE(review): the keys look like DB-API column type descriptors
    (names and Python classes) -- confirm against the sapdb driver.
    """
    super(MaxDBDialect, self).__init__(**kw)
    self._raise_known = _raise_known_sql_errors

    if self.dbapi is None:
        self.dbapi_type_map = {}
        return

    self.dbapi_type_map = {
        'Long Binary': MaxBlob(),
        'Long byte_t': MaxBlob(),
        'Long Unicode': MaxText(),
        'Timestamp': MaxTimestamp(),
        'Date': MaxDate(),
        'Time': MaxTime(),
        datetime.datetime: MaxTimestamp(),
        datetime.date: MaxDate(),
        datetime.time: MaxTime(),
    }
@classmethod
def dbapi(cls):
    """Import and return the ``sapdb.dbapi`` module."""
    from sapdb import dbapi as sapdb_dbapi
    return sapdb_dbapi
def create_connect_args(self, url):
    """Build the ``(args, kwargs)`` pair for the DB-API ``connect()``.

    The URL's username is passed under the ``user`` keyword, and the
    URL's query entries are merged on top of the translated arguments.
    """
    connect_kwargs = url.translate_connect_args(username='user')
    connect_kwargs.update(url.query)
    return [], connect_kwargs
def type_descriptor(self, typeobj):
    """Return the MaxDB implementation for a generic type.

    A type class is instantiated first.  Unicode types are adapted
    straight to ``MaxUnicode``; everything else is resolved through
    ``colspecs``.
    """
    if isinstance(typeobj, type):
        typeobj = typeobj()
    if not isinstance(typeobj, sqltypes.Unicode):
        return sqltypes.adapt_type(typeobj, colspecs)
    return typeobj.adapt(MaxUnicode)
def do_execute(self, cursor, statement, parameters, context=None):
    """Execute *statement* and record an integer result as the rowcount.

    The value returned by ``cursor.execute()`` is stored on
    ``context._rowcount`` only when a context is supplied and the value
    is an int.
    """
    outcome = cursor.execute(statement, parameters)
    if context is not None and isinstance(outcome, int):
        context._rowcount = outcome
def do_release_savepoint(self, connection, name):
# Does MaxDB truly support RELEASE SAVEPOINT